分层订阅设计逻辑
上一篇文章重要是从订阅数据类型的角度进行了分析,这一篇从分层的角度继续深入。
1. NodeHandle层级的订阅任务
在上篇文章中,我介绍了下面的这个代码的一小部分,这次再详细介绍下这个代码
Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
{
ops.topic = resolveName(ops.topic);
if (ops.callback_queue == 0)
{
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else
{
ops.callback_queue = getGlobalCallbackQueue();
}
}
if (TopicManager::instance()->subscribe(ops))
{
Subscriber sub(ops.topic, *this, ops.helper);
{
boost::mutex::scoped_lock lock(collection_->mutex_);
collection_->subs_.push_back(sub.impl_);
}
return sub;
}
return Subscriber();
}
2. TopicManager层的订阅任务
TopicManager::instance()->subscribe(ops)这个是订阅的判断
/*
1. 加锁(线程安全)
2. 复用已有订阅回调?(是→直接返回true;否→继续)
3. 系统正在关闭?(是→返回false)
4. 参数校验(有空→抛异常)
5. 创建订阅对象,添加回调
6. 注册到订阅管理里(失败→警告+关闭+返回false)
7. 加入管理列表
8. 返回true
*/
bool TopicManager::subscribe(const SubscribeOptions& ops)
{
// 保证订阅操作线程安全
boost::mutex::scoped_lock lock(subs_mutex_);
// 如果已经有同样的topic和参数的订阅存在,只需要增加一个回调/引用,不需要重复订阅。
// 如果addSubCallback返回true,直接返回true,不会再往下新建订阅。
if (addSubCallback(ops))
{
return true;
}
if (isShuttingDown())
{
return false;
}
if (ops.md5sum.empty())
{
throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
}
if (ops.datatype.empty())
{
throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
}
if (!ops.helper)
{
throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
}
const std::string& md5sum = ops.md5sum;
std::string datatype = ops.datatype;
SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
if (!registerSubscriber(s, ops.datatype))
{
ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
s->shutdown();
return false;
}
subscriptions_.push_back(s);
return true;
}
上面代码的逻辑就是先去判断是否已经有订阅回调,如果没有就去创建Subscription的对象,然后给这个对象添加回调,并去注册订阅,当然添加回调前需要校验。下面的是一些类型定义。
typedef boost::shared_ptr<Subscription> SubscriptionPtr;
typedef std::list<SubscriptionPtr> L_Subscription;
L_Subscription subscriptions_;
addSubCallback判断逻辑
接下来详细介绍下addSubCallback判断逻辑
最核心的策略是从L_Subscription中遍历,找到的要求是topic和md5sum都要匹配
{
if (isShuttingDown())
{
return false;
}
for (L_Subscription::iterator s = subscriptions_.begin();
s != subscriptions_.end() && !found; ++s)
{
sub = *s;
if (!sub->isDropped() && sub->getName() == ops.topic)
{
found_topic = true;
if (md5sumsMatch(ops.md5sum, sub->md5sum()))
{
found = true;
}
break;
}
}
}
注册订阅registerSubscriber
(这个应该放在第三节Subscription层的订阅任务之后介绍,可以先略过本小节去浏览第三节的思路)
本小节,真正把一个本地 Subscription 向 ROS Master 注册,并建立(或准备建立)与现有 publisher 的连接,同时检查是否有“本进程内的本地 publisher 可直接做内存级(intra-process)连接”。 registerSubscriber() 做三件事:
- 向 ROS Master 发送 XML-RPC 请求:registerSubscriber,告知“我(这个节点)要订阅某个 topic,类型是什么,我的 XML-RPC URI 是什么”;
- 从 Master 返回的 publisher URI 列表中整理出当前已存在的远程 publishers,并调用 s->pubUpdate(pub_uris) 触发后续连接建立流程;
- 检测是否“本进程内自己已经 advertise 了同名 topic”,若有且类型兼容,则建立一条本地(零拷贝/共享队列层面的)连接:s->addLocalConnection(pub)。
其他校验的代码我不贴了,只贴下面的核心代码
XmlRpcValue args, result, payload;
args[0] = this_node::getName();
args[1] = s->getName();
args[2] = datatype;
args[3] = xmlrpc_manager_->getServerURI();
if (!master::execute("registerSubscriber", args, result, payload, true))
{
return false;
}
vector<string> pub_uris;
for (int i = 0; i < payload.size(); i++)
{
if (payload[i] != xmlrpc_manager_->getServerURI())
{
pub_uris.push_back(string(payload[i]));
}
}
// ...
// ...
s->pubUpdate(pub_uris);
if (self_subscribed)
{
s->addLocalConnection(pub);
}
3. Subscription层的订阅任务
单topic多callback设计思想
addSubCallback判断逻辑中实际上还有
if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
{
return false;
}
这里先补充下设计的思想
用户层调用
ros::NodeHandle nh;
void cb1(const std_msgs::String::ConstPtr& msg) {
ROS_INFO_STREAM("cb1: " << msg->data);
}
void cb2(const std_msgs::String::ConstPtr& msg) {
ROS_INFO_STREAM("cb2: " << msg->data);
}
int main(int argc, char** argv) {
ros::init(argc, argv, "multi_cb_demo");
ros::Subscriber s1 = nh.subscribe("/chatter", 10, cb1);
ros::Subscriber s2 = nh.subscribe("/chatter", 50, cb2); // 不同 queue_size
ros::spin();
}
内部模型设计
TopicManager
└─ Subscription (对应 /chatter + md5 + transport hints)
├─ CallbackEntry #1 (cb1, queue_size=10, callback_queue=A)
└─ CallbackEntry #2 (cb2, queue_size=50, callback_queue=A)
Publisher 发来一条消息 ->
Subscription 反序列化一次 ->
遍历 callbacks_:
根据各自 queue/policy 封装成调度单元 ->
放入各自 CallbackQueue -> 最终执行
整体模型介绍
Topic: /chatter
+----------------------+
Network | Subscription S | <-- 只建一次 (topic + md5 + transport)
Layer | - connection(s) |
+----------+-----------+
|
+---------+-----------------------------+
| | |
Callback A Callback B Callback C
(来自第一次) (第二次 subscribe) (第三次 subscribe)
每个回调对应一个 Subscriber 句柄;全部销毁 -> S 被自动 drop
添加回调的函数设计
为同一个 topic 的既有 Subscription:
校验/固化类型(md5) 构造并登记一个新的回调条目(CallbackInfo) 给它配置独立的消息缓冲(SubscriptionQueue) 处理“刚订阅就要收到 latched 最近一条消息”的语义 准备统计与后续快速分发的缓存结构 它不做:与 master 通信、网络建链(这些在第一次订阅时完成),只做“复用 + 回调挂接”。
latched场景设计——锁存(可忽略)
这块我也没有理解的很到位,暂时做一下记录
在 ROS1 中,如果一个 Publisher 以 latched=true 方式创建(例如:
ros::Publisher pub = nh.advertise<std_msgs::String>("/status", 1, true);
那么它发布的“最后一条消息”会被缓存在连接层。任何后来订阅该 topic 的订阅者,一旦订阅成功,应该立即收到这条“最近一次”消息,而不必等下一次发布。这就实现了一种“状态型”语义(类似参数广播 / 初始状态同步)。
本函数(Subscription::addCallback)中这段代码的目的: 当你“追加一个新的回调”到已有的 Subscription 上时,如果已经有 latched publisher 的历史消息,就立即把它投递到这个新回调对应的队列里,让它也马上触发一次。
if (!latched_messages_.empty())
{
boost::mutex::scoped_lock lock(publisher_links_mutex_);
V_PublisherLink::iterator it = publisher_links_.begin();
V_PublisherLink::iterator end = publisher_links_.end();
for (; it != end;++it)
{
const PublisherLinkPtr& link = *it;
if (link->isLatched())
{
M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
if (des_it != latched_messages_.end())
{
const LatchInfo& latch_info = des_it->second;
MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
bool was_full = false;
info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
if (!was_full)
{
info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
}
}
}
}
}
MessageDeserializer 的作用(延迟 + 复用)
它内部保存:
用户回调 helper(知道消息类型) 序列化的字节 header 执行阶段(在 spinner 线程上)再真正把序列化数据 decode 成具体 message 对象,此时可以:
结合“是否所有回调都是 const”来复用一个反序列化结果 避免在锁内做 CPU 重操作 允许某些“还没真正需要就不用解”的早期丢弃场景(如果队列满被 drop)
触发连接建立pubUpdate
这个函数是bool Subscription::pubUpdate(const V_string& new_pubs),是注册订阅registerSubscriber的实现逻辑,代码非常多。主要的功能是:
根据 Master(或其它来源)提供的”当前该 topic 所有 publisher 的 XML-RPC URI 列表”,计算增量变化(新增 / 删减),然后:
- 对已消失的 publisher → drop 连接
- 对新出现的 publisher → 发起连接协商(negotiateConnection) 保持 publisher_links_(已建立的连接)与”真实可用 publisher 集合”同步。

(1) 前置检查:防止关闭时修改
boost::mutex::scoped_lock lock(shutdown_mutex_);
if (shutting_down_ || dropped_)
{
return false;
}
如果该 Subscription 正在或已经 shutdown / drop(例如所有回调都注销了),则拒绝更新,返回 false。 避免”析构中还在尝试建新连接”的资源泄漏或野指针。
(2) 调试日志
收集三类信息打印到 DEBUG 级日志:
- new_pubs:这次通知的完整 URI 列表
- publisher_links_:当前已建立的连接
- pending_connections_:正在建立但尚未完成的连接(避免重复发起) 用于诊断:为什么某个 publisher 没连上、是否有重复、是否有挂起等。
(3) 差异计算
集合比较:找出”要删除的旧连接”和”要新增的连接”
- 找出需要删除的旧连接(subtractions)
V_string additions; // 新增 URI
V_PublisherLink subtractions; // 要删除的已有连接
V_PublisherLink to_add; // (未使用? 可能历史遗留或其它分支)
boost::mutex::scoped_lock lock(publisher_links_mutex_);
for (V_PublisherLink::iterator spc = publisher_links_.begin();
spc != publisher_links_.end(); ++spc)
{
bool found = false;
for (V_string::const_iterator up_i = new_pubs.begin();
!found && up_i != new_pubs.end(); ++up_i)
{
if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
{
found = true;
break;
}
}
if (!found)
subtractions.push_back(*spc);
}
遍历当前已有的 publisher_links_,看它在不在 new_pubs 里。 如果不在 → 说明该 publisher 已经 unadvertise / 下线,加入 subtractions 等待关闭。 urisEqual(a, b):ROS 定义的 URI 比较(可能会处理协议、主机名大小写等规范化)。
- 找出需要新增的连接(additions)
for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
{
bool found = false;
// 先在已建立的连接中找
for (V_PublisherLink::iterator spc = publisher_links_.begin();
!found && spc != publisher_links_.end(); ++spc)
{
if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
{
found = true;
break;
}
}
// 如果还没找到,再检查"正在建立连接"队列(pending_connections_)
if (!found)
{
boost::mutex::scoped_lock lock(pending_connections_mutex_);
for (S_PendingConnection::iterator it = pending_connections_.begin();
it != pending_connections_.end(); ++it)
{
if (urisEqual(*up_i, (*it)->getRemoteURI()))
{
found = true;
break;
}
}
}
if (!found)
additions.push_back(*up_i);
}
- 遍历 new_pubs,看是否已经在”已建立连接”或”待建立连接”中。
- 三种情况都没有 → 真正的新 URI,加入 additions。
- 为什么还检查 pending_connections_?
- 防止”Master 连续两次回调”或网络延迟导致”同一个 URI 被重复发起 negotiateConnection”。
- 如果已经在 pending 队列里,说明第一次还没完成,不应该再发起第二次。
(4) 删除旧连接(drop)
for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
{
const PublisherLinkPtr& link = *i;
if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
{
ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
link->getCallerID().c_str(), name_.c_str(),
link->getPublisherXMLRPCURI().c_str());
link->drop();
}
else
{
ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
}
}
- 遍历要删除的连接。
- 特殊判断:如果该 link 的 URI 是”自己”(XMLRPCManager 的 serverURI),跳过(本地连接通过 addLocalConnection 建立,不由 pubUpdate 管理)。
- 否则:调用 link->drop(),关闭底层 socket、清理资源、从 publisher_links_ 移除(由 drop 内部完成)。
为什么要跳过自己?
- 本地(intra-process)连接在 registerSubscriber 中单独建立(addLocalConnection),也由单独路径移除(当 publication drop 时)。
- 避免误 drop 本地优化路径。
(5) 新建连接(negotiateConnection)
for (V_string::iterator i = additions.begin(); i != additions.end(); ++i)
{
if (XMLRPCManager::instance()->getServerURI() != *i)
{
retval &= negotiateConnection(*i);
}
else
{
ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(),
XMLRPCManager::instance()->getServerURI().c_str());
}
}
negotiateConnection的逻辑下篇文章再说明,本篇文章内容过多了
4. 总结
NodeHandle::subscribe()
-> TopicManager::subscribe(ops)
-> addSubCallback(ops) // 如果已有同名+类型 Subscription,则只加回调
-> (若未复用) 创建 Subscription 对象
-> registerSubscriber(subscription, datatype)
-> master::execute("registerSubscriber", ...)
-> s->pubUpdate(existing_publishers_from_master)
-> 若本进程内存在同名 publisher: s->addLocalConnection(pub)