当前位置: 首页 > news >正文

【ROS2学习笔记】话题通信篇:python话题订阅与发布

前言

本系列博文是本人的学习笔记,自用为主,不是教程,学习请移步其他大佬的相关教程。主要学习途径为@鱼香ROS大佬的教程,欢迎各位大佬交流学习,若有错误,轻喷。

一、核心概念(零基础先了解)

  • 话题(Topic):ROS2 中节点间 “发布 - 订阅” 模式的通信方式。一个节点发布消息到话题,其他节点订阅该话题就能收到消息。
  • 节点(Node):ROS2 的最小执行单元,负责具体功能(如发布小说、订阅并朗读小说)。
  • 消息(Message):话题中传递的数据格式,这里用example_interfaces包的String类型(存字符串)。

二、步骤 1:创建工作空间与功能包

要先搭建 ROS2 的工作空间和功能包,才能编写节点代码。

1. 创建目录结构
  • 在主目录下新建chapt3文件夹,用 VS Code 打开它。
  • chapt3内创建工作空间文件夹topic_ws
  • topic_ws内创建src文件夹(功能包要放在src里)。
2. 创建功能包

打开终端,进入topic_ws/src目录,执行命令:

ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
  • --build-type ament_python:指定功能包是 Python 类型(ROS2 推荐用ament构建)。
  • --dependencies rclpy example_interfaces:添加依赖(rclpy是 ROS2 Python 核心库;example_interfaces提供String等基础消息类型)。
  • 执行后,src下会生成demo_python_topic功能包,且package.xml会自动包含这两个依赖。

三、步骤 2:编写 “小说发布” 节点(发布者)

功能:从网络下载小说,按行发布到/novel话题。

1. 创建代码文件

src/demo_python_topic/demo_python_topic/路径下,新建novel_pub_node.py

2. 代码逐行解析
# 1. 导入必要的库
import rclpy  # ROS2 Python核心库
from rclpy.node import Node  # ROS2 节点基类
import requests  # 用于HTTP下载小说(需要联网或本地服务器)
from example_interfaces.msg import String  # 消息类型:字符串
from queue import Queue  # 队列,用于缓存小说行(避免下载和发布速度不匹配)# 2. 定义“小说发布节点”类,继承自Node
class NovelPubNode(Node):def __init__(self, node_name):super().__init__(node_name)  # 初始化父类(Node)self.novels_queue_ = Queue()  # 创建队列,存放小说内容# 创建“话题发布者”:参数分别是【消息类型、话题名、历史消息长度】self.novel_publisher_ = self.create_publisher(String, 'novel', 10)# 创建“定时器”:每5秒执行一次timer_callback函数self.timer_ = self.create_timer(5, self.timer_callback)# 3. 下载小说的函数def download_novel(self, url):response = requests.get(url)  # 发送HTTP请求下载小说response.encoding = 'utf-8'  # 设置编码为UTF-8(避免中文乱码)self.get_logger().info(f'下载完成: {url}')  # 打印日志# 按行分割小说内容,存入队列for line in response.text.splitlines():self.novels_queue_.put(line)# 4. 定时器回调函数:定时从队列取内容并发布def timer_callback(self):if self.novels_queue_.qsize() > 0:  # 队列有内容时msg = String()  # 实例化String类型的消息msg.data = self.novels_queue_.get()  # 从队列取一行小说self.novel_publisher_.publish(msg)  # 发布消息到“novel”话题self.get_logger().info(f'发布了一行小说: {msg.data}')# 5. 主函数:启动节点的入口
def main():rclpy.init()  # 初始化rclpy(必须第一步)node = NovelPubNode('novel_pub')  # 实例化“小说发布节点”,命名为novel_pub# 下载小说(示例地址:本地服务器的小说文件,需提前启动服务器)node.download_novel('http://localhost:8000/novel1.txt')rclpy.spin(node)  # 让节点保持运行,处理回调(如定时器、发布逻辑)rclpy.shutdown()  # 关闭rclpy(程序结束时执行)# 6. 执行主函数
if __name__ == '__main__':main()
3. 运行发布节点
  • 先启动本地 Python 服务器(参考教材 2.5.3 节,提供novel1.txt小说文件)。
  • 进入topic_ws工作空间,编译功能包:h
    colcon build
    
  • 加载环境变量(每次新开终端或编译后都要执行):
    source install/setup.bash
    
  • 运行节点:
    ros2 run demo_python_topic novel_pub_node
    
  • 验证发布:打开新终端,用以下命令查看:
    • 查看话题列表:ros2 topic list -v(能看到/novel话题)。
    • 查看话题内容:ros2 topic echo /novel(能看到发布的小说行)。

四、步骤 3:编写 “订阅小说并朗读” 节点(订阅者 + 语音合成)

功能:订阅/novel话题,将收到的小说内容合成语音朗读。

1. 安装语音合成依赖

打开终端,依次执行:

sudo apt install python3-pip -y  # 安装Python3的pip包管理工具
sudo apt install espeak-ng -y   # 安装开源语音合成引擎
pip3 install espeakng          # 安装Python版的espeakng库(代码中调用)
2. 创建代码文件

src/demo_python_topic/demo_python_topic/路径下,新建novel_sub_node.py

3. 代码逐行解析
# 1. 导入必要的库
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String  # 同发布端的消息类型
import threading  # 多线程:避免朗读阻塞节点主循环
from queue import Queue  # 队列:缓存收到的小说内容(朗读比接收慢,需缓存)
import time
import espeakng  # 语音合成库# 2. 定义“小说订阅+朗读节点”类,继承自Node
class NovelSubNode(Node):def __init__(self, node_name):super().__init__(node_name)  # 初始化父类(Node)self.novels_queue_ = Queue()  # 创建队列,存放订阅到的小说内容# 创建“话题订阅者”:参数分别是【消息类型、话题名、回调函数、历史消息长度】self.novel_subscriber_ = self.create_subscription(String, 'novel', self.novel_callback, 10)# 创建“朗读线程”:用多线程避免朗读阻塞节点主循环self.speech_thread_ = threading.Thread(target=self.speak_thread)self.speech_thread_.start()  # 启动朗读线程# 3. 订阅回调函数:收到消息后存入队列def novel_callback(self, msg):self.novels_queue_.put(msg.data)  # 将收到的小说行放入队列# 4. 朗读线程的函数:循环从队列取内容并朗读def speak_thread(self):speaker = espeakng.Speaker()  # 创建语音合成对象speaker.voice = 'zh'  # 设置为中文语音(如需英文可设为'en')# 只要ROS2运行正常(rclpy.ok()为True),就持续朗读while rclpy.ok():if self.novels_queue_.qsize() > 0:  # 队列有内容时text = self.novels_queue_.get()  # 从队列取一行小说self.get_logger().info(f'正在朗读 {text}')  # 打印日志speaker.say(text)  # 调用语音合成朗读speaker.wait()  # 等待朗读完成else:time.sleep(1)  # 队列空时,休眠1秒(减少CPU占用)# 5. 主函数:启动节点的入口
def main(args=None):rclpy.init(args=args)  # 初始化rclpynode = NovelSubNode('novel_read')  # 实例化“小说朗读节点”,命名为novel_readrclpy.spin(node)  # 启动节点主循环(处理订阅回调等)rclpy.shutdown()  # 关闭rclpy# 6. 执行主函数
if __name__ == '__main__':main()
4. 运行订阅节点
  • 确保发布节点已运行(或用ros2 topic pub手动发布内容)。
  • 编译工作空间(若修改了代码):
    colcon build
    
  • 加载环境变量:
    source install/setup.bash
    
  • 运行节点:
    ros2 run demo_python_topic novel_sub_node
    
  • 此时会听到小说内容的朗读,终端也会输出 “正在朗读 xxx”。

五、总结(零基础重点记忆)

  1. 话题通信流程:发布者(发布小说)→ 话题(/novel)→ 订阅者(订阅并朗读)。
  2. 核心代码逻辑
    • 发布者:create_publisher(消息类型, 话题名, 历史长度) + 定时器 / 循环发布。
    • 订阅者:create_subscription(消息类型, 话题名, 回调函数, 历史长度) + 处理收到的消息。
  3. ROS2 节点基础:启动节点前要rclpy.init(),运行时要rclpy.spin(节点),结束时rclpy.shutdown()
  4. 拓展:结合第三方库(requests下载、espeakng语音合成),可实现更丰富的功能。

通过这个例子,能快速掌握 ROS2 Python 话题通信的核心逻辑,后续可拓展到自定义消息、更复杂的发布订阅场景~

http://www.dtcms.com/a/418195.html

相关文章:

  • 【碎片化学习】SpringBoot服务的启动过程
  • 儿童网站模板 html做百度快照要先有网站吗
  • Games101 第六章 Shading(着色)
  • 电子电气架构 --- 智能座舱域环境感知和人机交互系统
  • 数字营销网站主页优化制作网页动画的软件
  • CSS详篇
  • Memblock-3
  • 大数据毕业设计选题推荐-基于大数据的全国饮品门店数据可视化分析系统-Hadoop-Spark-数据可视化-BigData
  • 【后端开发】golang部分中间件介绍(任务调度/服务治理/数据库/缓存/服务通信/流量治理)
  • 建设一个自己的网站需要多少钱站长统计官方网站
  • 烟台装修公司网站建设注册公司流程和费用时间
  • java设计模式:工厂方法 + 建造者模式
  • 3、Lombok进阶功能实战:Builder模式、异常处理与资源管理高级用法
  • Linux 内核开发 的核心知识点
  • 【Linux系列】并发世界的基石:透彻理解 Linux 进程 — 理解操作系统
  • 小米17对比iPhone 17:2025年深度对比
  • 借助Aspose.Email,使用 Python 将 EML 转换为 MHTML
  • 免费做名片的网站专业网站开发设计
  • 微服务项目->在线oj系统(Java-Spring)-后台管理(1)
  • 怎么防止网站攻击做网站价格表
  • 1、Product-Service 服务设计规范文档
  • 网深科技NetInside网络流量异常分析报告
  • 基于TCP的简易端口扫描器
  • 大腾智能助力深圳市联合创造科技,实现精密触控屏行业数字化升级
  • MR+无人机-增强态势感知、更高效的现场检查和增强
  • 无人机遥控器使用与信号连接问题处理
  • 德克西尔抗振倾角仪,守护无人机云台稳定清晰
  • Mosquitto:MQTT Broker入门与分布式部署最佳实践
  • 无人机姿态控制技术详解
  • 做网站所需要的公司细责及条款微信小程序推广