ROS

ROS订阅流程分析1

ROS开发

Posted by Jiayang Hu on September 20, 2025

ROS订阅调用的语句逻辑

1. 从显式调用层面理解订阅逻辑

经典的cpp-ros调用是如下所示的

subscriber = nh.subscribe("chatter", 10, &MyNode::cb, this);

而不是显示的调用下面的方式

subscriber = nh.subscribe<std_msgs::String, MyNode>("chatter", 10, &MyNode::cb, this);

编译器就会自动挑选合适的重载并推导出模板参数,不需要手写 <M, T>,具体原理在于(T::*fp)(M)

template<class M, class T>
Subscriber subscribe(const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj, const TransportHints& transport_hints = TransportHints())
{
  SubscribeOptions ops;
  ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, boost::placeholders::_1));
  ops.transport_hints = transport_hints;
  return subscribe(ops);
}

这个东西会根据&MyNode::cb推导出T就是MyNode

假如说回调函数长这样

void MyNode::cb(const std_msgs::String::ConstPtr& msg) {
    ROS_INFO("recv: %s", msg->data.c_str());
}

那么M就是 M = std_msgs::String::ConstPtr

(更精确地说,ROS 实际里常用 const boost::shared_ptr<const std_msgs::String>& 之类的签名,源码里有一层模板包装,可以查看第三节内容)

2. 订阅底层通用模板介绍

通用模板如下

Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
{
  ops.topic = resolveName(ops.topic);
  if (ops.callback_queue == 0)
  {
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }

  if (TopicManager::instance()->subscribe(ops))
  {
    Subscriber sub(ops.topic, *this, ops.helper);

    {
      boost::mutex::scoped_lock lock(collection_->mutex_);
      collection_->subs_.push_back(sub.impl_);
    }

    return sub;
  }

  return Subscriber();
}

  • resolveName函数的作用:

将用户写的相对名称(可能包含 ~、相对命名空间)解析成全限定 ROS 名称。 这样内部统一使用 fully qualified name,避免命名冲突。resolveName 会参考:

  • 当前 NodeHandle 的 namespace
  • remapping 规则
  • private (~) / relative / global (/) 名称规范

3. SubscribeOptions介绍

这个是一个类,但是是用struct的 具体函数是

  • initByFullCallbackType用于确定初始化的回调函数类型
  template<class P>
  void initByFullCallbackType(const std::string& _topic, uint32_t _queue_size,
       const boost::function<void (P)>& _callback,
       const boost::function<boost::shared_ptr<typename ParameterAdapter<P>::Message>(void)>& factory_fn = DefaultMessageCreator<typename ParameterAdapter<P>::Message>())
  {
    typedef typename ParameterAdapter<P>::Message MessageType;
    topic = _topic;
    queue_size = _queue_size;
    md5sum = message_traits::md5sum<MessageType>();
    datatype = message_traits::datatype<MessageType>();
    helper = boost::make_shared<SubscriptionCallbackHelperT<P> >(_callback, factory_fn);
  }

理解下面的代码逻辑

factory_fn = DefaultMessageCreator<typename ParameterAdapter<P>::Message>
  1. ParameterAdapter的作用是去引用、去const
template<typename M>
struct ParameterAdapter
{
  typedef typename boost::remove_reference<typename boost::remove_const<M>::type>::type Message;
  typedef ros::MessageEvent<Message const> Event;
  typedef M Parameter;
  static const bool is_const = true;

  static Parameter getParameter(const Event& event)
  {
    return *event.getMessage();
  }
};
  1. DefaultMessageCreator返回一个仿函数,类型是boost::function<…>
template<typename M>
struct DefaultMessageCreator
{
  boost::shared_ptr<M> operator()()
  {
    return boost::make_shared<M>();
  }
};

4. helper介绍

helper 是一个多态包装指针(SubscriptionCallbackHelperPtr),指向具体的 SubscriptionCallbackHelperT<P> 实例。它的职责是“把网络层收到的序列化数据变成用户回调可直接使用的参数并调用用户函数”

SubscriptionCallbackHelperT(const Callback& callback, 
		      const CreateFunction& create = DefaultMessageCreator<NonConstType>())
  : callback_(callback)
  , create_(create)
{ }

在helper中最值得说明的是反序列化,使用了模板方法的递归调用

  virtual VoidConstPtr deserialize(const SubscriptionCallbackHelperDeserializeParams& params)
  {
    namespace ser = serialization;

    NonConstTypePtr msg = create_();

    if (!msg)
    {
      ROS_DEBUG("Allocation failed for message of type [%s]", getTypeInfo().name());
      return VoidConstPtr();
    }

    ser::PreDeserializeParams<NonConstType> predes_params;
    predes_params.message = msg;
    predes_params.connection_header = params.connection_header;
    ser::PreDeserialize<NonConstType>::notify(predes_params);

    ser::IStream stream(params.buffer, params.length);
    ser::deserialize(stream, *msg);

    return VoidConstPtr(msg);
  }

而这里的msg就是最开始的回调函数中的参数类型M,本例中也就是std_msgs::String::ConstPtr

错了,不准确。确切的讲是,

反序列化阶段需要“可写”对象,所以用的是 boost::shared_ptr<std_msgs::String>(非 const)。 返回给上层时会包成 VoidConstPtr(即 boost::shared_ptr)。 在 helper 调用用户回调前,通过 ParameterAdapter 把它“再解释”为 boost::shared_ptr<const std_msgs::String>(或引用等形式)——从此对用户只读。

ser::deserialize(stream, *msg); 这个最终会走到serialization.h的SFINAE版本各种特化版本中(这边就是会走到字符串的版本)