ros2 消息订阅与发布示例 python
以下是一个完整的ROS2 Python消息订阅与发布示例,包含节点创建、发布者、订阅者及定时器回调的详细实现:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass MinimalNode(Node):def __init__(self):super().__init__('minimal_node')self.count_ = 0# 创建发布者(话题名:message,队列大小10)self.publisher_ = self.create_publisher(String, 'message', 10)# 创建订阅者(话题名:chatter,队列大小10)self.subscription_ = self.create_subscription(String,'chatter',self.topic_callback,10)# 创建定时器(周期2秒)self.timer_ = self.create_timer(2.0, self.timer_callback)self.get_logger().info("节点已启动,等待消息...")def timer_callback(self):# 创建并发布消息msg = String()msg.data = f"Hello ROS2! Count: {self.count_}"self.count_ += 1self.publisher_.publish(msg)self.get_logger().info(f'发布消息: "{msg.data}"')def topic_callback(self, msg):# 处理接收到的订阅消息self.get_logger().info(f'收到消息: "{msg.data}"')def main():rclpy.init()node = MinimalNode()try:rclpy.spin(node) # 保持节点运行except KeyboardInterrupt:node.get_logger().info("节点已终止")finally:node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
代码解析
-
节点初始化:
- 继承
rclpy.node.Node
创建MinimalNode
类 - 构造函数中初始化节点名称为
minimal_node
- 计数器
count_
用于生成递增消息
- 继承
-
发布者设置:
- 使用
create_publisher
创建发布者 - 话题名称:
message
- 消息类型:
std_msgs/String
- 队列大小:10(QoS配置)
- 使用
-
订阅者设置:
- 使用
create_subscription
创建订阅者 - 话题名称:
chatter
- 绑定回调函数
topic_callback
- 消息类型:
std_msgs/String
- 使用
-
定时器机制:
- 每2秒触发一次
timer_callback
- 发布包含计数器的字符串消息
- 每2秒触发一次
-
回调函数:
timer_callback
:构造并发布消息topic_callback
:处理接收到的订阅消息
配置与运行
-
创建功能包(若尚未创建):
ros2 pkg create --build-type ament_python py_example
-
代码文件:
- 将代码保存为
py_example/py_example/minimal_node.py
- 将代码保存为
-
setup.py配置:
setup(name="py_example",version="0.1.0",packages=["py_example"],install_requires=["rclpy", "std_msgs"],entry_points={'console_scripts': ['minimal_node = py_example.minimal_node:main'],}, )
-
编译与运行:
colcon build --packages-select py_example source install/setup.bash ros2 run py_example minimal_node
测试验证
-
终端1:运行节点
ros2 run py_example minimal_node
-
终端2:发布测试消息
ros2 topic pub /chatter std_msgs/String "data: 'Test Message'"
-
终端3:查看发布的话题
ros2 topic echo /message
关键特性说明
-
QoS配置:
- 默认使用
QoSProfile(depth=10)
- 可通过
create_publisher(qos_profile=...)
自定义配置
- 默认使用
-
日志系统:
- 使用
get_logger().info()
输出日志 - 支持不同级别:DEBUG, INFO, WARN, ERROR
- 使用
-
生命周期管理:
- 自动处理节点初始化、自旋和关闭
- 支持优雅退出(Ctrl+C)
-
消息类型:
- 使用标准消息类型
std_msgs/String
- 可扩展自定义消息类型(需额外配置)
- 使用标准消息类型
此示例展示了ROS2 Python开发的核心模式,包括节点创建、话题通信、定时器使用及日志处理,可直接用于基础应用开发。