ROS

ROS订阅流程分析2

ROS开发

Posted by Jiayang Hu on September 20, 2025

分层订阅设计逻辑

上一篇文章重要是从订阅数据类型的角度进行了分析,这一篇从分层的角度继续深入。

1. NodeHandle层级的订阅任务

在上篇文章中,我介绍了下面的这个代码的一小部分,这次再详细介绍下这个代码

Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
{
  ops.topic = resolveName(ops.topic);
  if (ops.callback_queue == 0)
  {
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }

  if (TopicManager::instance()->subscribe(ops))
  {
    Subscriber sub(ops.topic, *this, ops.helper);

    {
      boost::mutex::scoped_lock lock(collection_->mutex_);
      collection_->subs_.push_back(sub.impl_);
    }

    return sub;
  }

  return Subscriber();
}

2. TopicManager层的订阅任务

TopicManager::instance()->subscribe(ops)这个是订阅的判断

/*
    1. 加锁(线程安全)
    2. 复用已有订阅回调?(是→直接返回true;否→继续)
    3. 系统正在关闭?(是→返回false)
    4. 参数校验(有空→抛异常)
    5. 创建订阅对象,添加回调
    6. 注册到订阅管理里(失败→警告+关闭+返回false)
    7. 加入管理列表
    8. 返回true
*/
bool TopicManager::subscribe(const SubscribeOptions& ops)
{
  // 保证订阅操作线程安全
  boost::mutex::scoped_lock lock(subs_mutex_);

  // 如果已经有同样的topic和参数的订阅存在,只需要增加一个回调/引用,不需要重复订阅。
  // 如果addSubCallback返回true,直接返回true,不会再往下新建订阅。
  if (addSubCallback(ops))
  {
    return true;
  }

  if (isShuttingDown())
  {
    return false;
  }

  if (ops.md5sum.empty())
  {
    throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
  }

  if (ops.datatype.empty())
  {
    throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
  }

  if (!ops.helper)
  {
    throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
  }

  const std::string& md5sum = ops.md5sum;
  std::string datatype = ops.datatype;

  SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
  s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);

  if (!registerSubscriber(s, ops.datatype))
  {
    ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
    s->shutdown();
    return false;
  }

  subscriptions_.push_back(s);

  return true;
}

上面代码的逻辑就是先去判断是否已经有订阅回调,如果没有就去创建Subscription的对象,然后给这个对象添加回调,并去注册订阅,当然添加回调前需要校验。下面的是一些类型定义。

typedef boost::shared_ptr<Subscription> SubscriptionPtr;
typedef std::list<SubscriptionPtr> L_Subscription;
L_Subscription subscriptions_;

addSubCallback判断逻辑

接下来详细介绍下addSubCallback判断逻辑

最核心的策略是从L_Subscription中遍历,找到的要求是topic和md5sum都要匹配

  {
    if (isShuttingDown())
    {
      return false;
    }

    for (L_Subscription::iterator s = subscriptions_.begin();
         s != subscriptions_.end() && !found; ++s)
    {
      sub = *s;
      if (!sub->isDropped() && sub->getName() == ops.topic)
      {
        found_topic = true;
        if (md5sumsMatch(ops.md5sum, sub->md5sum()))
        {
          found = true;
        }
        break;
      }
    }
  }

注册订阅registerSubscriber

(这个应该放在第三节Subscription层的订阅任务之后介绍,可以先略过本小节去浏览第三节的思路)

本小节,真正把一个本地 Subscription 向 ROS Master 注册,并建立(或准备建立)与现有 publisher 的连接,同时检查是否有“本进程内的本地 publisher 可直接做内存级(intra-process)连接”。 registerSubscriber() 做三件事:

  1. 向 ROS Master 发送 XML-RPC 请求:registerSubscriber,告知“我(这个节点)要订阅某个 topic,类型是什么,我的 XML-RPC URI 是什么”;
  2. 从 Master 返回的 publisher URI 列表中整理出当前已存在的远程 publishers,并调用 s->pubUpdate(pub_uris) 触发后续连接建立流程;
  3. 检测是否“本进程内自己已经 advertise 了同名 topic”,若有且类型兼容,则建立一条本地(零拷贝/共享队列层面的)连接:s->addLocalConnection(pub)。

其他校验的代码我不贴了,只贴下面的核心代码

  XmlRpcValue args, result, payload;
  args[0] = this_node::getName();
  args[1] = s->getName();
  args[2] = datatype;
  args[3] = xmlrpc_manager_->getServerURI();

  if (!master::execute("registerSubscriber", args, result, payload, true))
  {
    return false;
  }

  vector<string> pub_uris;
  for (int i = 0; i < payload.size(); i++)
  {
    if (payload[i] != xmlrpc_manager_->getServerURI())
    {
      pub_uris.push_back(string(payload[i]));
    }
  }
  // ...
  // ...
  s->pubUpdate(pub_uris);
  if (self_subscribed)
  {
    s->addLocalConnection(pub);
  }

3. Subscription层的订阅任务

单topic多callback设计思想

addSubCallback判断逻辑中实际上还有

if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
{
    return false;
}

这里先补充下设计的思想

用户层调用

ros::NodeHandle nh;

void cb1(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO_STREAM("cb1: " << msg->data);
}

void cb2(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO_STREAM("cb2: " << msg->data);
}

int main(int argc, char** argv) {
  ros::init(argc, argv, "multi_cb_demo");
  ros::Subscriber s1 = nh.subscribe("/chatter", 10, cb1);
  ros::Subscriber s2 = nh.subscribe("/chatter", 50, cb2);  // 不同 queue_size
  ros::spin();
}

内部模型设计

TopicManager
  └─ Subscription (对应 /chatter + md5 + transport hints)
        ├─ CallbackEntry #1 (cb1, queue_size=10, callback_queue=A)
        └─ CallbackEntry #2 (cb2, queue_size=50, callback_queue=A)

Publisher 发来一条消息 ->
  Subscription 反序列化一次 ->
    遍历 callbacks_:
       根据各自 queue/policy 封装成调度单元 ->
       放入各自 CallbackQueue -> 最终执行

整体模型介绍

Topic: /chatter
          +----------------------+
Network   |   Subscription S     |  <-- 只建一次 (topic + md5 + transport)
Layer     |  - connection(s)     |
          +----------+-----------+
                     |
           +---------+-----------------------------+
           |         |                             |
      Callback A  Callback B                  Callback C
      (来自第一次) (第二次 subscribe)       (第三次 subscribe)

每个回调对应一个 Subscriber 句柄;全部销毁 -> S 被自动 drop

添加回调的函数设计

为同一个 topic 的既有 Subscription:

校验/固化类型(md5) 构造并登记一个新的回调条目(CallbackInfo) 给它配置独立的消息缓冲(SubscriptionQueue) 处理“刚订阅就要收到 latched 最近一条消息”的语义 准备统计与后续快速分发的缓存结构 它不做:与 master 通信、网络建链(这些在第一次订阅时完成),只做“复用 + 回调挂接”。

latched场景设计——锁存(可忽略)

这块我也没有理解的很到位,暂时做一下记录

在 ROS1 中,如果一个 Publisher 以 latched=true 方式创建(例如:

ros::Publisher pub = nh.advertise<std_msgs::String>("/status", 1, true);

那么它发布的“最后一条消息”会被缓存在连接层。任何后来订阅该 topic 的订阅者,一旦订阅成功,应该立即收到这条“最近一次”消息,而不必等下一次发布。这就实现了一种“状态型”语义(类似参数广播 / 初始状态同步)。

本函数(Subscription::addCallback)中这段代码的目的: 当你“追加一个新的回调”到已有的 Subscription 上时,如果已经有 latched publisher 的历史消息,就立即把它投递到这个新回调对应的队列里,让它也马上触发一次。

    if (!latched_messages_.empty())
    {
      boost::mutex::scoped_lock lock(publisher_links_mutex_);

      V_PublisherLink::iterator it = publisher_links_.begin();
      V_PublisherLink::iterator end = publisher_links_.end();
      for (; it != end;++it)
      {
        const PublisherLinkPtr& link = *it;
        if (link->isLatched())
        {
          M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
          if (des_it != latched_messages_.end())
          {
            const LatchInfo& latch_info = des_it->second;

            MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
            bool was_full = false;
            info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
            if (!was_full)
            {
              info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
            }
          }
        }
      }
    }

MessageDeserializer 的作用(延迟 + 复用)

它内部保存:

用户回调 helper(知道消息类型) 序列化的字节 header 执行阶段(在 spinner 线程上)再真正把序列化数据 decode 成具体 message 对象,此时可以:

结合“是否所有回调都是 const”来复用一个反序列化结果 避免在锁内做 CPU 重操作 允许某些“还没真正需要就不用解”的早期丢弃场景(如果队列满被 drop)

触发连接建立pubUpdate

这个函数是bool Subscription::pubUpdate(const V_string& new_pubs),是注册订阅registerSubscriber的实现逻辑,代码非常多。主要的功能是:

根据 Master(或其它来源)提供的”当前该 topic 所有 publisher 的 XML-RPC URI 列表”,计算增量变化(新增 / 删减),然后:

  • 对已消失的 publisher → drop 连接
  • 对新出现的 publisher → 发起连接协商(negotiateConnection) 保持 publisher_links_(已建立的连接)与”真实可用 publisher 集合”同步。

(1) 前置检查:防止关闭时修改

  boost::mutex::scoped_lock lock(shutdown_mutex_);

  if (shutting_down_ || dropped_)
  {
    return false;
  }

如果该 Subscription 正在或已经 shutdown / drop(例如所有回调都注销了),则拒绝更新,返回 false。 避免”析构中还在尝试建新连接”的资源泄漏或野指针。

(2) 调试日志

收集三类信息打印到 DEBUG 级日志:

  • new_pubs:这次通知的完整 URI 列表
  • publisher_links_:当前已建立的连接
  • pending_connections_:正在建立但尚未完成的连接(避免重复发起) 用于诊断:为什么某个 publisher 没连上、是否有重复、是否有挂起等。

(3) 差异计算

集合比较:找出”要删除的旧连接”和”要新增的连接”

  1. 找出需要删除的旧连接(subtractions)
V_string additions;             // 新增 URI
V_PublisherLink subtractions;   // 要删除的已有连接
V_PublisherLink to_add;         // (未使用? 可能历史遗留或其它分支)

  boost::mutex::scoped_lock lock(publisher_links_mutex_);
  for (V_PublisherLink::iterator spc = publisher_links_.begin();
       spc != publisher_links_.end(); ++spc)
  {
    bool found = false;
    for (V_string::const_iterator up_i = new_pubs.begin();
         !found && up_i != new_pubs.end(); ++up_i)
    {
      if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
      {
        found = true;
        break;
      }
    }

    if (!found)
      subtractions.push_back(*spc);
  }

遍历当前已有的 publisher_links_,看它在不在 new_pubs 里。 如果不在 → 说明该 publisher 已经 unadvertise / 下线,加入 subtractions 等待关闭。 urisEqual(a, b):ROS 定义的 URI 比较(可能会处理协议、主机名大小写等规范化)。

  1. 找出需要新增的连接(additions)
  for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
  {
    bool found = false;

    // 先在已建立的连接中找
    for (V_PublisherLink::iterator spc = publisher_links_.begin();
         !found && spc != publisher_links_.end(); ++spc)
    {
      if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
      {
        found = true;
        break;
      }
    }

    // 如果还没找到,再检查"正在建立连接"队列(pending_connections_)
    if (!found)
    {
      boost::mutex::scoped_lock lock(pending_connections_mutex_);
      for (S_PendingConnection::iterator it = pending_connections_.begin();
           it != pending_connections_.end(); ++it)
      {
        if (urisEqual(*up_i, (*it)->getRemoteURI()))
        {
          found = true;
          break;
        }
      }
    }

    if (!found)
      additions.push_back(*up_i);
  }
  • 遍历 new_pubs,看是否已经在”已建立连接”或”待建立连接”中。
  • 三种情况都没有 → 真正的新 URI,加入 additions。
  • 为什么还检查 pending_connections_?
    • 防止”Master 连续两次回调”或网络延迟导致”同一个 URI 被重复发起 negotiateConnection”。
    • 如果已经在 pending 队列里,说明第一次还没完成,不应该再发起第二次。

(4) 删除旧连接(drop)

for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
{
  const PublisherLinkPtr& link = *i;
  if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
  {
    ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
                     link->getCallerID().c_str(), name_.c_str(),
                     link->getPublisherXMLRPCURI().c_str());
    link->drop();
  }
  else
  {
    ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
  }
}
  • 遍历要删除的连接。
  • 特殊判断:如果该 link 的 URI 是”自己”(XMLRPCManager 的 serverURI),跳过(本地连接通过 addLocalConnection 建立,不由 pubUpdate 管理)。
  • 否则:调用 link->drop(),关闭底层 socket、清理资源、从 publisher_links_ 移除(由 drop 内部完成)。

为什么要跳过自己?

  • 本地(intra-process)连接在 registerSubscriber 中单独建立(addLocalConnection),也由单独路径移除(当 publication drop 时)。
  • 避免误 drop 本地优化路径。

(5) 新建连接(negotiateConnection)

for (V_string::iterator i = additions.begin(); i != additions.end(); ++i)
{
  if (XMLRPCManager::instance()->getServerURI() != *i)
  {
    retval &= negotiateConnection(*i);
  }
  else
  {
    ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(),
                     XMLRPCManager::instance()->getServerURI().c_str());
  }
}

negotiateConnection的逻辑下篇文章再说明,本篇文章内容过多了

4. 总结

NodeHandle::subscribe()
  -> TopicManager::subscribe(ops)
      -> addSubCallback(ops)  // 如果已有同名+类型 Subscription,则只加回调
      -> (若未复用) 创建 Subscription 对象
         -> registerSubscriber(subscription, datatype)
             -> master::execute("registerSubscriber", ...)
             -> s->pubUpdate(existing_publishers_from_master)
             -> 若本进程内存在同名 publisher: s->addLocalConnection(pub)