ROS2系列 (8) : Python话题通信节点——发布者示例
ROS2系列 (8) : Python话题通信节点——发布者示例
话题通信是ROS2中节点间异步数据传输的核心方式。本文将通过“下载小说并每隔5秒发布一行”的实战案例,详细讲解Python节点中话题发布者、定时器、队列的结合使用,同时完整覆盖功能包创建、依赖配置、本地服务器搭建等流程,确保每个步骤可复现。
一、环境准备与文件结构
1.1 创建小说文本与目录结构
首先准备小说文件,按以下目录结构存放(以用户主目录为例):
~/ros2_ws/ # 工作空间
└── novel_server/ # 存放小说文件的目录└── novel1.txt # 小说内容文件
创建小说文件:
# 新建目录
mkdir -p ~/ros2_ws/novel_server
# 创建小说文件并写入内容
cat > ~/ros2_ws/novel_server/novel1.txt << EOF
第一章 初识ROS2
ROS2是一个灵活的机器人操作系统。
本章将介绍话题通信的基本使用。第二章 实战案例
通过Python节点发布小说内容,
是理解话题通信的好方法。
EOF
1.2 启动本地HTTP服务器
进入小说文件所在目录,启动Python内置服务器(端口8000):
cd ~/ros2_ws/novel_server
python3 -m http.server
终端显示Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...即启动成功,保持该终端运行。
二、创建功能包与配置依赖
2.1 创建Python功能包
在工作空间的src目录下创建功能包:
# 进入工作空间src目录
cd ~/ros2_ws/src
# 创建功能包,指定依赖
ros2 pkg create demo_python_topic --build-type ament_python \--dependencies rclpy example_interfaces requests \--license Apache-2.0
--dependencies:明确声明依赖(rclpy是ROS2 Python库,example_interfaces提供String消息,requests用于HTTP下载)。
2.2 配置package.xml
手动检查并补充依赖(确保package.xml包含以下内容):
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"><name>demo_python_topic</name><version>0.0.0</version><description>Python话题通信示例</description><maintainer email="your@email.com">your_name</maintainer><license>Apache-2.0</license><!-- 核心依赖声明 --><depend>rclpy</depend><depend>example_interfaces</depend><depend>requests</depend> <!-- 新增requests依赖 --><buildtool_depend>ament_python</buildtool_depend><test_depend>ament_copyright</test_depend><test_depend>ament_flake8</test_depend><test_depend>ament_pep257</test_depend><test_depend>pytest</test_depend><export><build_type>ament_python</build_type></export>
</package>
2.3 配置setup.py(注册节点入口)
修改demo_python_topic/setup.py,在entry_points中添加节点入口:
from setuptools import setuppackage_name = 'demo_python_topic'setup(name=package_name,version='0.0.0',packages=[package_name],data_files=[('share/ament_index/resource_index/packages',['resource/' + package_name]),('share/' + package_name, ['package.xml']),],install_requires=['setuptools'],zip_safe=True,maintainer='your_name',maintainer_email='your@email.com',description='Python话题通信示例',license='Apache-2.0',tests_require=['pytest'],entry_points={'console_scripts': [# 注册节点命令:novel_pub -> 对应模块的main函数'novel_pub = demo_python_topic.novel_pub:main',],},
)
三、编写话题发布节点代码
在demo_python_topic/demo_python_topic/目录下创建novel_pub.py:
import rclpy
from rclpy.node import Node
import requests # 用于HTTP下载
from example_interfaces.msg import String # 消息类型
from queue import Queue # 队列用于缓存小说内容class NovelPubNode(Node):def __init__(self, node_name):super().__init__(node_name)# 1. 初始化队列(缓存小说行)self.novels_queue_ = Queue() # 2. 创建话题发布者:话题名`novel`,消息类型String,队列长度10self.novel_publisher_ = self.create_publisher(String, 'novel', 10)self.get_logger().info('话题发布者已创建')# 3. 创建定时器:每5秒执行一次回调函数self.timer_ = self.create_timer(5.0, self.timer_callback) self.get_logger().info('定时器已启动(5秒间隔)')def download_novel(self, url):"""下载小说并按行存入队列"""try:response = requests.get(url, timeout=10)response.encoding = 'utf-8' # 确保中文显示正常# 按行分割文本,过滤空行for line in response.text.splitlines():if line.strip(): # 跳过空行self.novels_queue_.put(line)self.get_logger().info(f'成功下载小说,共{self.novels_queue_.qsize()}行')except Exception as e:self.get_logger().error(f'下载失败:{str(e)}')def timer_callback(self):"""定时器回调:从队列取一行发布"""if not self.novels_queue_.empty(): # 队列非空时发布msg = String()msg.data = self.novels_queue_.get() # 取出一行self.novel_publisher_.publish(msg)self.get_logger().info(f'发布:{msg.data}')else:self.get_logger().info('队列已空,无内容可发布')def main(args=None):rclpy.init(args=args) # 初始化ROS2node = NovelPubNode('novel_pub_node') # 创建节点# 下载小说(本地服务器地址)node.download_novel('http://localhost:8000/novel1.txt')rclpy.spin(node) # 保持节点运行rclpy.shutdown() # 关闭ROS2
四、代码核心模块解析
4.1 Python Queue队列操作
队列用于缓存下载的小说行,实现“下载-发布”解耦:
Queue():创建空队列。queue.put(item):将数据放入队列(线程安全)。queue.get():从队列取出数据(队列为空时阻塞)。queue.empty():判断队列是否为空(替代qsize()更安全)。
4.2 ROS2定时器(Timer)
create_timer(period, callback)实现周期性任务:
period=5.0:每5秒执行一次timer_callback。- 优势:无需手动编写循环,由ROS2事件循环管理,更符合节点运行逻辑。
4.3 话题发布者(Publisher)
create_publisher(String, 'novel', 10):创建发布者,指定消息类型为String,话题名为novel,QoS(消息队列长度)为10。publish(msg):将String类型消息发布到话题,订阅者可接收。
五、构建与运行节点
5.1 构建功能包
回到工作空间根目录,执行构建:
cd ~/ros2_ws
colcon build --packages-select demo_python_topic
构建成功会显示Finished <<< demo_python_topic。
5.2 运行节点
打开新终端,加载环境变量并启动节点:
cd ~/ros2_ws
source install/setup.bash
ros2 run demo_python_topic novel_pub
5.3 验证话题通信
再打开一个终端,订阅novel话题查看内容:
source install/setup.bash
ros2 topic echo /novel
终端会每隔5秒显示一行小说内容,与发布者输出一致。
六、常见问题解决
- 下载失败:检查本地服务器是否启动、小说文件路径是否正确、URL是否为
http://localhost:8000/novel1.txt。 - 节点找不到:确保
setup.py中entry_points配置正确,且已执行colcon build和source。 - 依赖缺失:若提示
No module named 'requests',需在setup.py的install_requires中添加requests,并重新构建。
总结
本文通过完整流程实现了“下载小说-定时发布”的话题通信案例,涵盖:
- 本地服务器搭建与小说文件准备;
- 功能包依赖配置(
package.xml与setup.py); - 队列、定时器、发布者的协同使用。
掌握这些内容后,你可以灵活应用话题通信实现传感器数据发布、日志广播等各类异步传输场景。
