当前位置: 首页 > news >正文

ROS2系列 (8) : Python话题通信节点——发布者示例

ROS2系列 (8) : Python话题通信节点——发布者示例

话题通信是ROS2中节点间异步数据传输的核心方式。本文将通过“下载小说并每隔5秒发布一行”的实战案例,详细讲解Python节点中话题发布者、定时器、队列的结合使用,同时完整覆盖功能包创建、依赖配置、本地服务器搭建等流程,确保每个步骤可复现。

一、环境准备与文件结构

1.1 创建小说文本与目录结构

首先准备小说文件,按以下目录结构存放(以用户主目录为例):

~/ros2_ws/                  # 工作空间
└── novel_server/           # 存放小说文件的目录└── novel1.txt          # 小说内容文件

创建小说文件:

# 新建目录
mkdir -p ~/ros2_ws/novel_server
# 创建小说文件并写入内容
cat > ~/ros2_ws/novel_server/novel1.txt << EOF
第一章 初识ROS2
ROS2是一个灵活的机器人操作系统。
本章将介绍话题通信的基本使用。第二章 实战案例
通过Python节点发布小说内容,
是理解话题通信的好方法。
EOF

1.2 启动本地HTTP服务器

进入小说文件所在目录,启动Python内置服务器(端口8000):

cd ~/ros2_ws/novel_server
python3 -m http.server

终端显示Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...即启动成功,保持该终端运行。

二、创建功能包与配置依赖

2.1 创建Python功能包

在工作空间的src目录下创建功能包:

# 进入工作空间src目录
cd ~/ros2_ws/src
# 创建功能包,指定依赖
ros2 pkg create demo_python_topic --build-type ament_python \--dependencies rclpy example_interfaces requests \--license Apache-2.0
  • --dependencies:明确声明依赖(rclpy是ROS2 Python库,example_interfaces提供String消息,requests用于HTTP下载)。

2.2 配置package.xml

手动检查并补充依赖(确保package.xml包含以下内容):

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"><name>demo_python_topic</name><version>0.0.0</version><description>Python话题通信示例</description><maintainer email="your@email.com">your_name</maintainer><license>Apache-2.0</license><!-- 核心依赖声明 --><depend>rclpy</depend><depend>example_interfaces</depend><depend>requests</depend>  <!-- 新增requests依赖 --><buildtool_depend>ament_python</buildtool_depend><test_depend>ament_copyright</test_depend><test_depend>ament_flake8</test_depend><test_depend>ament_pep257</test_depend><test_depend>pytest</test_depend><export><build_type>ament_python</build_type></export>
</package>

2.3 配置setup.py(注册节点入口)

修改demo_python_topic/setup.py,在entry_points中添加节点入口:

from setuptools import setuppackage_name = 'demo_python_topic'setup(name=package_name,version='0.0.0',packages=[package_name],data_files=[('share/ament_index/resource_index/packages',['resource/' + package_name]),('share/' + package_name, ['package.xml']),],install_requires=['setuptools'],zip_safe=True,maintainer='your_name',maintainer_email='your@email.com',description='Python话题通信示例',license='Apache-2.0',tests_require=['pytest'],entry_points={'console_scripts': [# 注册节点命令:novel_pub -> 对应模块的main函数'novel_pub = demo_python_topic.novel_pub:main',],},
)

三、编写话题发布节点代码

demo_python_topic/demo_python_topic/目录下创建novel_pub.py

import rclpy
from rclpy.node import Node
import requests  # 用于HTTP下载
from example_interfaces.msg import String  # 消息类型
from queue import Queue  # 队列用于缓存小说内容class NovelPubNode(Node):def __init__(self, node_name):super().__init__(node_name)# 1. 初始化队列(缓存小说行)self.novels_queue_ = Queue()  # 2. 创建话题发布者:话题名`novel`,消息类型String,队列长度10self.novel_publisher_ = self.create_publisher(String, 'novel', 10)self.get_logger().info('话题发布者已创建')# 3. 创建定时器:每5秒执行一次回调函数self.timer_ = self.create_timer(5.0, self.timer_callback)  self.get_logger().info('定时器已启动(5秒间隔)')def download_novel(self, url):"""下载小说并按行存入队列"""try:response = requests.get(url, timeout=10)response.encoding = 'utf-8'  # 确保中文显示正常# 按行分割文本,过滤空行for line in response.text.splitlines():if line.strip():  # 跳过空行self.novels_queue_.put(line)self.get_logger().info(f'成功下载小说,共{self.novels_queue_.qsize()}行')except Exception as e:self.get_logger().error(f'下载失败:{str(e)}')def timer_callback(self):"""定时器回调:从队列取一行发布"""if not self.novels_queue_.empty():  # 队列非空时发布msg = String()msg.data = self.novels_queue_.get()  # 取出一行self.novel_publisher_.publish(msg)self.get_logger().info(f'发布:{msg.data}')else:self.get_logger().info('队列已空,无内容可发布')def main(args=None):rclpy.init(args=args)  # 初始化ROS2node = NovelPubNode('novel_pub_node')  # 创建节点# 下载小说(本地服务器地址)node.download_novel('http://localhost:8000/novel1.txt')rclpy.spin(node)  # 保持节点运行rclpy.shutdown()  # 关闭ROS2

四、代码核心模块解析

4.1 Python Queue队列操作

队列用于缓存下载的小说行,实现“下载-发布”解耦:

  • Queue():创建空队列。
  • queue.put(item):将数据放入队列(线程安全)。
  • queue.get():从队列取出数据(队列为空时阻塞)。
  • queue.empty():判断队列是否为空(替代qsize()更安全)。

4.2 ROS2定时器(Timer)

create_timer(period, callback)实现周期性任务:

  • period=5.0:每5秒执行一次timer_callback
  • 优势:无需手动编写循环,由ROS2事件循环管理,更符合节点运行逻辑。

4.3 话题发布者(Publisher)

  • create_publisher(String, 'novel', 10):创建发布者,指定消息类型为String,话题名为novel,QoS(消息队列长度)为10。
  • publish(msg):将String类型消息发布到话题,订阅者可接收。

五、构建与运行节点

5.1 构建功能包

回到工作空间根目录,执行构建:

cd ~/ros2_ws
colcon build --packages-select demo_python_topic

构建成功会显示Finished <<< demo_python_topic

5.2 运行节点

打开新终端,加载环境变量并启动节点:

cd ~/ros2_ws
source install/setup.bash
ros2 run demo_python_topic novel_pub

5.3 验证话题通信

再打开一个终端,订阅novel话题查看内容:

source install/setup.bash
ros2 topic echo /novel

终端会每隔5秒显示一行小说内容,与发布者输出一致。

六、常见问题解决

  1. 下载失败:检查本地服务器是否启动、小说文件路径是否正确、URL是否为http://localhost:8000/novel1.txt
  2. 节点找不到:确保setup.pyentry_points配置正确,且已执行colcon buildsource
  3. 依赖缺失:若提示No module named 'requests',需在setup.pyinstall_requires中添加requests,并重新构建。

总结

本文通过完整流程实现了“下载小说-定时发布”的话题通信案例,涵盖:

  • 本地服务器搭建与小说文件准备;
  • 功能包依赖配置(package.xmlsetup.py);
  • 队列、定时器、发布者的协同使用。

掌握这些内容后,你可以灵活应用话题通信实现传感器数据发布、日志广播等各类异步传输场景。

http://www.dtcms.com/a/546847.html

相关文章:

  • 公司做网站找谁wordpress显示文章时分秒代码
  • dp与px转换原理
  • 网站开发推广方案策划书网站seo规范
  • 那个网站做二手车好沙洋网站定制
  • 权限管理混乱会造成哪些安全隐患
  • 官方网站建设必要性网站建设维护与管理实训总结
  • 《Zookeeper 常用命令手册:客户端操作、集群管理与监控指令》
  • 浙江电商网站建设销售百度投流运营
  • 构建新能源智能调度大脑:7分支并行算法架构的工程实践
  • 教育网站建设的策划1122t
  • 仓颉语言布局系统深度解析:从算法到自定义组件实践
  • 社区网站怎么建做搜索的网站有哪些
  • 广西建设厅官网站张家口建设厅网站
  • 网站开发协议模版开源课程 视频网站模板
  • 苏州要服务网站建设平台网站开发公司组织架构
  • dedecms网站搬家进行网站建设
  • 深入 Rust 之心:Serde 如何实现真正的“零成本抽象”
  • 智能建站网站网站开发的外文翻译
  • 做信息采集的网站邯郸 网站建设
  • 肝脏肿瘤MRI图像分类数据集
  • NX603NX604美光SSD固态NX605NX606
  • 网站建设为主题调研材料网站开发与维护是干什么的
  • 盐城做企业网站的价格wordpress 投票插件
  • No酒类网站建设2019建设银行招聘网站
  • 从零快速学习RNN:循环神经网络完全指南
  • 购买建立网站费怎么做会计凭证wordpress 快速回复
  • 用cms做网站的缺点wordpress阅读设置
  • 求职网站开发开题报告flash网站开发工具
  • wap手机网站开发小规模企业所得税怎么算
  • 海淀重庆网站建设企业网站推广 知乎