【ROS2学习笔记】话题通信篇:python话题订阅与发布
前言
本系列博文是本人的学习笔记,自用为主,不是教程,学习请移步其他大佬的相关教程。主要学习途径为@鱼香ROS大佬的教程,欢迎各位大佬交流学习,若有错误,轻喷。
一、核心概念(零基础先了解)
- 话题(Topic):ROS2 中节点间 “发布 - 订阅” 模式的通信方式。一个节点发布消息到话题,其他节点订阅该话题就能收到消息。
- 节点(Node):ROS2 的最小执行单元,负责具体功能(如发布小说、订阅并朗读小说)。
- 消息(Message):话题中传递的数据格式,这里用
example_interfaces
包的String
类型(存字符串)。
二、步骤 1:创建工作空间与功能包
要先搭建 ROS2 的工作空间和功能包,才能编写节点代码。
1. 创建目录结构
- 在主目录下新建
chapt3
文件夹,用 VS Code 打开它。 - 在
chapt3
内创建工作空间文件夹topic_ws
。 - 在
topic_ws
内创建src
文件夹(功能包要放在src
里)。
2. 创建功能包
打开终端,进入topic_ws/src
目录,执行命令:
ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
--build-type ament_python
:指定功能包是 Python 类型(ROS2 推荐用ament
构建)。--dependencies rclpy example_interfaces
:添加依赖(rclpy
是 ROS2 Python 核心库;example_interfaces
提供String
等基础消息类型)。- 执行后,
src
下会生成demo_python_topic
功能包,且package.xml
会自动包含这两个依赖。
三、步骤 2:编写 “小说发布” 节点(发布者)
功能:从网络下载小说,按行发布到/novel
话题。
1. 创建代码文件
在src/demo_python_topic/demo_python_topic/
路径下,新建novel_pub_node.py
。
2. 代码逐行解析
# 1. 导入必要的库
import rclpy # ROS2 Python核心库
from rclpy.node import Node # ROS2 节点基类
import requests # 用于HTTP下载小说(需要联网或本地服务器)
from example_interfaces.msg import String # 消息类型:字符串
from queue import Queue # 队列,用于缓存小说行(避免下载和发布速度不匹配)# 2. 定义“小说发布节点”类,继承自Node
class NovelPubNode(Node):def __init__(self, node_name):super().__init__(node_name) # 初始化父类(Node)self.novels_queue_ = Queue() # 创建队列,存放小说内容# 创建“话题发布者”:参数分别是【消息类型、话题名、历史消息长度】self.novel_publisher_ = self.create_publisher(String, 'novel', 10)# 创建“定时器”:每5秒执行一次timer_callback函数self.timer_ = self.create_timer(5, self.timer_callback)# 3. 下载小说的函数def download_novel(self, url):response = requests.get(url) # 发送HTTP请求下载小说response.encoding = 'utf-8' # 设置编码为UTF-8(避免中文乱码)self.get_logger().info(f'下载完成: {url}') # 打印日志# 按行分割小说内容,存入队列for line in response.text.splitlines():self.novels_queue_.put(line)# 4. 定时器回调函数:定时从队列取内容并发布def timer_callback(self):if self.novels_queue_.qsize() > 0: # 队列有内容时msg = String() # 实例化String类型的消息msg.data = self.novels_queue_.get() # 从队列取一行小说self.novel_publisher_.publish(msg) # 发布消息到“novel”话题self.get_logger().info(f'发布了一行小说: {msg.data}')# 5. 主函数:启动节点的入口
def main():rclpy.init() # 初始化rclpy(必须第一步)node = NovelPubNode('novel_pub') # 实例化“小说发布节点”,命名为novel_pub# 下载小说(示例地址:本地服务器的小说文件,需提前启动服务器)node.download_novel('http://localhost:8000/novel1.txt')rclpy.spin(node) # 让节点保持运行,处理回调(如定时器、发布逻辑)rclpy.shutdown() # 关闭rclpy(程序结束时执行)# 6. 执行主函数
if __name__ == '__main__':main()
3. 运行发布节点
- 先启动本地 Python 服务器(参考教材 2.5.3 节,提供
novel1.txt
小说文件)。 - 进入
topic_ws
工作空间,编译功能包:hcolcon build
- 加载环境变量(每次新开终端或编译后都要执行):
source install/setup.bash
- 运行节点:
ros2 run demo_python_topic novel_pub_node
- 验证发布:打开新终端,用以下命令查看:
- 查看话题列表:
ros2 topic list -v
(能看到/novel
话题)。 - 查看话题内容:
ros2 topic echo /novel
(能看到发布的小说行)。
- 查看话题列表:
四、步骤 3:编写 “订阅小说并朗读” 节点(订阅者 + 语音合成)
功能:订阅/novel
话题,将收到的小说内容合成语音朗读。
1. 安装语音合成依赖
打开终端,依次执行:
sudo apt install python3-pip -y # 安装Python3的pip包管理工具
sudo apt install espeak-ng -y # 安装开源语音合成引擎
pip3 install espeakng # 安装Python版的espeakng库(代码中调用)
2. 创建代码文件
在src/demo_python_topic/demo_python_topic/
路径下,新建novel_sub_node.py
。
3. 代码逐行解析
# 1. 导入必要的库
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # 同发布端的消息类型
import threading # 多线程:避免朗读阻塞节点主循环
from queue import Queue # 队列:缓存收到的小说内容(朗读比接收慢,需缓存)
import time
import espeakng # 语音合成库# 2. 定义“小说订阅+朗读节点”类,继承自Node
class NovelSubNode(Node):def __init__(self, node_name):super().__init__(node_name) # 初始化父类(Node)self.novels_queue_ = Queue() # 创建队列,存放订阅到的小说内容# 创建“话题订阅者”:参数分别是【消息类型、话题名、回调函数、历史消息长度】self.novel_subscriber_ = self.create_subscription(String, 'novel', self.novel_callback, 10)# 创建“朗读线程”:用多线程避免朗读阻塞节点主循环self.speech_thread_ = threading.Thread(target=self.speak_thread)self.speech_thread_.start() # 启动朗读线程# 3. 订阅回调函数:收到消息后存入队列def novel_callback(self, msg):self.novels_queue_.put(msg.data) # 将收到的小说行放入队列# 4. 朗读线程的函数:循环从队列取内容并朗读def speak_thread(self):speaker = espeakng.Speaker() # 创建语音合成对象speaker.voice = 'zh' # 设置为中文语音(如需英文可设为'en')# 只要ROS2运行正常(rclpy.ok()为True),就持续朗读while rclpy.ok():if self.novels_queue_.qsize() > 0: # 队列有内容时text = self.novels_queue_.get() # 从队列取一行小说self.get_logger().info(f'正在朗读 {text}') # 打印日志speaker.say(text) # 调用语音合成朗读speaker.wait() # 等待朗读完成else:time.sleep(1) # 队列空时,休眠1秒(减少CPU占用)# 5. 主函数:启动节点的入口
def main(args=None):rclpy.init(args=args) # 初始化rclpynode = NovelSubNode('novel_read') # 实例化“小说朗读节点”,命名为novel_readrclpy.spin(node) # 启动节点主循环(处理订阅回调等)rclpy.shutdown() # 关闭rclpy# 6. 执行主函数
if __name__ == '__main__':main()
4. 运行订阅节点
- 确保发布节点已运行(或用
ros2 topic pub
手动发布内容)。 - 编译工作空间(若修改了代码):
colcon build
- 加载环境变量:
source install/setup.bash
- 运行节点:
ros2 run demo_python_topic novel_sub_node
- 此时会听到小说内容的朗读,终端也会输出 “正在朗读 xxx”。
五、总结(零基础重点记忆)
- 话题通信流程:发布者(发布小说)→ 话题(
/novel
)→ 订阅者(订阅并朗读)。 - 核心代码逻辑:
- 发布者:
create_publisher(消息类型, 话题名, 历史长度)
+ 定时器 / 循环发布。 - 订阅者:
create_subscription(消息类型, 话题名, 回调函数, 历史长度)
+ 处理收到的消息。
- 发布者:
- ROS2 节点基础:启动节点前要
rclpy.init()
,运行时要rclpy.spin(节点)
,结束时rclpy.shutdown()
。 - 拓展:结合第三方库(
requests
下载、espeakng
语音合成),可实现更丰富的功能。
通过这个例子,能快速掌握 ROS2 Python 话题通信的核心逻辑,后续可拓展到自定义消息、更复杂的发布订阅场景~