【ros2】ROS2话题(Topic)通信完全指南:订阅与发布详解
文章目录
- ROS2话题(Topic)通信完全指南:订阅与发布详解
-
- 一、话题通信核心概念
-
- 1. 基本原理
- 2. 核心特点
- 二、话题发布与订阅的通用步骤
- 三、Python实现:发布者与订阅者
-
- 1. 创建功能包
- 2. 实现发布者节点(发布字符串消息)
- 3. 实现订阅者节点(接收字符串消息)
- 4. 配置节点入口(setup.py)
- 四、C++实现:发布者与订阅者
-
- 1. 创建功能包
- 2. 实现发布者节点
- 3. 实现订阅者节点
- 4. 配置编译规则(CMakeLists.txt)
- 五、编译与运行验证
-
- 1. 编译功能包
- 2. 运行节点测试
- 六、话题相关高级知识
-
- 1. 消息类型与自定义消息
- 2. 话题工具与调试
- 3. 话题QoS(服务质量)设置
- 4. 话题重映射与命名空间
- 5. 话题通信的性能优化
- 七、常见问题与解决方案
- 八、总结
ROS2话题(Topic)通信完全指南:订阅与发布详解
话题(Topic)是ROS2中最常用的通信机制,基于发布-订阅(Publish-Subscribe)模式,适用于单向、异步、高频的数据传输(如传感器数据、控制指令等)。本文将详细介绍话题的核心概念、发布与订阅的实现步骤(含Python和C++示例)及相关高级知识。
一、话题通信核心概念
1. 基本原理
- 发布者(Publisher):向指定话题发送数据的节点。
- 订阅者(Subscriber):接收指定话题数据的节点。
- 消息(Message):话题传输的数据格式,由
.msg文件定义(如std_msgs/String、sensor_msgs/Image)。 - 话题名称:全局唯一的标识符(如
/camera/image、/cmd_vel),发布者和订阅者通过名称匹配通信。
2. 核心特点
- 多对多通信:一个话题可被多个发布者发布,也可被多个订阅者订阅。
- 异步通信:发布者和订阅者无需同步运行,数据通过ROS2底层缓存传递。
- 数据流向:单向传输(发布者→订阅者),适合传感器数据、状态监控等场景。
- 消息类型严格匹配:发布者和订阅者必须使用相同的消息类型,否则无法通信。
二、话题发布与订阅的通用步骤
- 确定消息类型:选择现有标准消息(如
std_msgs/String)或自定义消息(需先定义.msg文件)。 - 创建功能包:包含发布者和订阅者节点,依赖对应消息包。
- 实现发布者节点:创建发布者对象,周期性或按需发送消息。
- 实现订阅者节点:创建订阅者对象,定义消息接收后的处理回调函数。
- 编译与运行:编译功能包,启动节点验证通信。
三、Python实现:发布者与订阅者
1. 创建功能包
# 创建Python功能包,依赖rclpy(ROS2 Python库)和std_msgs(标准消息)
ros2 pkg create py_topic_demo --build-type ament_python --dependencies rclpy std_msgs
2. 实现发布者节点(发布字符串消息)
在py_topic_demo/py_topic_demo目录下创建talker.py:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String # 导入标准字符串消息类型class TalkerNode(Node):def __init__(self):super().__init__('python_talker') # 节点名称:python_talker# 创建发布者:参数(消息类型,话题名称,队列长度)self.publisher_ = self.create_publisher(String, # 消息类型'chatter', # 话题名称10 # 队列长度(缓存10条消息))# 创建定时器:每1秒触发一次回调(周期性发布消息)self.timer_ = self.create_timer(1.0, # 时间间隔(秒)self.timer_callback # 回调函数)self.count_ = 0 # 消息计数器self.get_logger().info("Python发布者已启动,话题:chatter")def timer_callback(self):# 构造消息内容msg = String()msg.data = f"Hello ROS2! 第 {self.count_} 条消息"self.count_ += 1# 发布消息self.publisher_.publish(msg)self.get_logger().info(f"发布: {msg.data}") # 打印日志def main(args=None):rclpy.init(args=args) # 初始化ROS2node = TalkerNode() # 创建节点实例rclpy.spin(node) # 运行节点(进入事件循环)node.destroy_node() # 退出时销毁节点rclpy.shutdown() # 关闭ROS2if __name__ == '__main__':main(