【ROS2学习笔记】DDS(数据分发服务)
前言
本系列博文是本人的学习笔记,自用为主,不是教程,学习请移步其他大佬的相关教程。前几篇学习资源来自鱼香ROS大佬的详细教程,适合深入学习,但对本人这样的初学者不算友好,后续笔记将以@古月居的ROS2入门21讲为主,侵权即删。
一、学习目标
- 理解 4 种通信模型的区别,明白为什么 ROS2 选择 DDS 作为底层通信
- 搞懂 DDS 的核心概念(数据为中心、标准性),知道它在 ROS2 中的作用
- 掌握 QoS(质量服务策略)的常用配置,能结合机器人场景选择合适策略
- 学会用 “命令行” 和 “代码” 配置 QoS,验证不同策略的通信效果
- 记住 ROS2 中 DDS 的架构逻辑,不用深究底层也能灵活使用
二、先搞懂:为什么需要 DDS?(从通信模型对比入手)
ROS2 选择 DDS,是因为它解决了其他通信模型的痛点。先通过 “生活类比” 理解 4 种常用通信模型的区别:
通信模型 | 工作方式(小白理解) | 优点 | 缺点 | 类比场景 |
---|---|---|---|---|
点对点模型 | 每个客户端直接连服务器(比如你直接给快递员打电话寄件) | 通信直接,延迟低 | 1. 客户端要知道服务器地址(地址变了就用不了);2. 节点多了连接混乱 | 打电话、一对一视频通话 |
Broker 模型 | 所有节点通过 “中间中转站” 通信(比如所有快递先到驿站,再取件) | 客户端不用记服务器地址 | 1. 中转站(Broker)是瓶颈(忙不过来会卡顿);2. 中转站坏了整个系统瘫痪 | ROS1 的 Master、快递驿站 |
广播模型 | 一个节点发消息,所有节点都能收到(比如小区广播通知) | 不用建连接,地址灵活 | 所有节点都收到无关消息(浪费资源),比如你收到邻居的快递通知 | 小区喇叭广播、微信群 @所有人 |
DDS 模型 | 基于 “数据总线”,节点只收自己关心的消息(比如旋转火锅,只拿自己想吃的) | 1. 只收有用消息(省资源);2. 无中心瓶颈;3. 支持高实时性 | 配置比前三种复杂(但 ROS2 简化了) | 旋转火锅、定制化新闻推送 |
核心结论:DDS 是 4 种模型中最适合机器人的 —— 机器人有大量节点(相机、电机、视觉),需要 “实时、不浪费资源、无中心故障点” 的通信,DDS 刚好满足。
三、什么是 DDS?
3.1 一句话定义:成熟的 “数据分发标准”
DDS 的全称是 Data Distribution Service(数据分发服务),不是 ROS2 发明的,而是:
- 2004 年由OMG 组织(对象管理组织,搞标准的权威机构)发布的通信标准;
- 最早用在美国海军,解决舰船复杂网络的软件兼容问题;
- 现在广泛用于航空、国防、自动驾驶、医疗(比如无人机、手术机器人)。
可以理解为:DDS 是通信的 “通用语言标准”,就像 4G/5G 是手机通信的标准,不同厂商(比如华为、高通)都能按这个标准做产品,互相兼容。
3.2 DDS 的核心特点(为什么适合机器人)
- 以数据为中心:节点只关心 “数据内容”(比如 “相机图像”“电机速度”),不关心 “谁发的”“发给谁”,收到自己需要的数据就处理;
- 高实时性:支持微秒级延迟,适合机器人的电机控制、传感器数据(比如无人机不能等 1 秒才收到指令);
- 高可靠性:可以配置 “数据不丢失”(比如重要的控制指令),也可以配置 “优先流畅性”(比如图像掉帧没关系);
- 无中心架构:没有像 ROS1 Master 那样的 “总开关”,某个节点坏了不影响其他节点(比如相机坏了,电机还能正常运行)。
3.3 补充:OMG 组织是什么?(不用记,了解就行)
OMG(Object Management Group)是 1989 年成立的国际标准组织,专门制定软件标准,除了 DDS,还有你可能听过的:
- UML(统一建模语言,画流程图的标准);
- SysML(系统建模语言,用于复杂设备设计);
- 简单说:OMG 是 “通信规则的制定者”,DDS 是它制定的 “机器人通信规则”。
四、DDS 在 ROS2 中的作用(架构简化版)
ROS2 把 DDS 作为 “底层通信骨架”,所有上层功能(话题、服务、动作)都基于 DDS 实现。不用看懂复杂架构图,记住 3 个核心点就行:
4.1 ROS2 的 DDS 架构:兼容多个 DDS 实现
DDS 是 “标准”,不是 “具体产品”,有很多厂商按 DDS 标准做了具体的通信工具(比如 FastDDS、CycloneDDS、RTI DDS),ROS2 的做法是:
- 不绑定某一个 DDS:让用户自己选(比如个人开发用免费的 FastDDS,工业场景用付费的 RTI DDS);
- 通过 Middleware(中间件)兼容:ROS2 做了一个 “统一接口层”,不管你用哪个 DDS,上层代码(比如
create_publisher
)都一样,不用改代码。
类比:Middleware 就像 “电源适配器”,不管你用华为还是苹果充电器(不同 DDS),只要适配器兼容,都能给电脑(ROS2 节点)供电。
4.2 ROS2 中 DDS 的关键概念:Domain(域)
之前学分布式时配置的ROS_DOMAIN_ID
,就来自 DDS 的 “Domain(域)” 概念:
- Domain 是数据分组:只有
ROS_DOMAIN_ID
相同的节点才能通信; - 作用:避免不同机器人的消息互相干扰(比如你的机器人和邻居的机器人都用 DDS,域不同就不会收到对方的消息)。
比如:你的树莓派和笔记本都设ROS_DOMAIN_ID=10
,就能通信;邻居设=20
,就和你没关系。
五、DDS 的核心:QoS(质量服务策略)
QoS 是 DDS 的 “灵魂”,全称 Quality of Service(质量服务策略),可以理解为:数据发送方和接收方的 “通信合约”—— 双方约定好 “怎么传数据”(比如要不要保证不丢、要不要保留历史数据),DDS 按约定执行。
ROS2 简化了 QoS 的配置,常用的策略只有 4 种,结合机器人场景讲清楚:
5.1 常用 QoS 策略(作用 + 场景 + 例子)
QoS 策略 | 作用(小白理解) | 常用配置选项 | 机器人场景例子 |
---|---|---|---|
RELIABILITY(可靠性) | 决定数据是否允许丢失 | 1. RELIABLE (可靠):保证数据不丢失(重发直到收到);2. BEST_EFFORT (尽力):优先流畅,允许丢失 | - 电机指令、机器人状态:用RELIABLE (不能丢,丢了会失控);- 相机图像、激光点云:用BEST_EFFORT (掉帧没关系,流畅更重要) |
HISTORY(历史记录) | 决定保留多少历史数据 | 1. KEEP_LAST (保留最新 N 条);2. KEEP_ALL (保留所有,慎用) | - 保留最新 10 条机器人位置:KEEP_LAST+depth=10 ;- 不需要历史数据:KEEP_LAST+depth=1 |
DURABILITY(持久性) | 决定晚启动的节点是否能收到历史数据 | 1. VOLATILE (不持久):晚启动的节点收不到之前的消息;2. TRANSIENT_LOCAL (本地持久):晚启动的节点能收到历史数据 | - 机器人参数节点:用TRANSIENT_LOCAL (新启动的监控节点能看到之前的参数);- 实时图像:用VOLATILE (晚启动的节点不用看之前的图像) |
DEADLINE(截止时间) | 决定数据必须在多久内送达(超时会报警) | 设置时间(比如 0.1 秒) | 无人机避障指令:设DEADLINE=0.1秒 (超时没收到指令,无人机自动悬停) |
关键规则:发布者和订阅者的 QoS 策略必须 “兼容” 才能通信,比如:
- 发布者用
BEST_EFFORT
,订阅者必须也用BEST_EFFORT
(用RELIABLE
收不到); - 发布者用
KEEP_LAST+depth=5
,订阅者用KEEP_LAST+depth=3
(兼容,取较小的 3)。
六、实战案例:QoS 的配置与验证
从 “命令行” 到 “代码”,逐步掌握 QoS 配置,所有步骤都有详细说明。
案例 1:命令行配置 QoS(快速验证)
用ros2 topic
命令直接配置 QoS,验证 “可靠性策略” 的兼容性:
步骤 1:启动发布者(用 BEST_EFFORT 策略)
打开终端 1,发布/chatter
话题,数据类型是Int32
(整数),QoS 设为best_effort
:
# 命令格式:ros2 topic pub <话题名> <数据类型> <数据> --qos-reliability <策略>
ros2 topic pub /chatter std_msgs/msg/Int32 "data: 42" --qos-reliability best_effort
# 预期输出:循环发布 "data: 42",每0.5秒一次
步骤 2:订阅者用 RELIABLE 策略(收不到数据)
打开终端 2,订阅/chatter
,QoS 设为reliable
:
# 命令格式:ros2 topic echo <话题名> --qos-reliability <策略>
ros2 topic echo /chatter --qos-reliability reliable
# 预期结果:终端2没有任何输出(策略不兼容,收不到数据)
步骤 3:订阅者改用 BEST_EFFORT 策略(能收到数据)
终端 2 按Ctrl+C
停止,重新执行命令,策略改为best_effort
:
ros2 topic echo /chatter --qos-reliability best_effort
# 预期结果:终端2不断输出 "data: 42"(策略兼容,通信成功)
步骤 4:查看话题的 QoS 信息(验证配置)
打开终端 3,用--verbose
查看/chatter
的 QoS 配置:
ros2 topic info /chatter --verbose
# 预期输出:显示发布者和订阅者的QoS策略,比如 "Reliability: BEST_EFFORT"
案例 2:代码配置 QoS(Hello World 话题)
在之前的 “Hello World” 话题代码中加入 QoS 配置,理解代码层面如何控制策略。
步骤 1:创建功能包
先创建存放 QoS 代码的功能包learning_qos
:
cd dev_ws/src
ros2 pkg create learning_qos --build-type ament_python --dependencies rclpy std_msgs
步骤 2:发布者代码(带 QoS 配置,逐行注释)
文件名:learning_qos/qos_helloworld_pub.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ROS2 QoS发布者示例:
- 发布"Hello World"话题
- 配置QoS策略:RELIABLE(可靠传输)、KEEP_LAST(保留最新1条)
"""# 1. 导入需要的库
import rclpy # ROS2 Python核心库
from rclpy.node import Node # ROS2节点类
from std_msgs.msg import String # 字符串消息类型(发布"Hello World"用)
# 导入QoS相关类:QoSProfile(QoS配置容器)、策略枚举
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy# 2. 定义发布者节点类
class PublisherNode(Node):def __init__(self, name):super().__init__(name) # 初始化节点,节点名=qos_helloworld_pub# 3. 配置QoS策略(核心部分)self.qos_profile = QoSProfile(# 可靠性策略:RELIABLE(保证数据不丢失),适合重要消息reliability=QoSReliabilityPolicy.RELIABLE,# 历史记录策略:KEEP_LAST(保留最新N条),避免占内存history=QoSHistoryPolicy.KEEP_LAST,# 保留最新1条数据(depth=1),如果需要保留更多,改数字(比如10)depth=1)# 4. 创建发布者:指定QoS策略self.pub = self.create_publisher(String, # 消息类型(字符串)"chatter", # 话题名(和订阅者一致)self.qos_profile # 传入QoS配置(关键!之前没QoS是用默认配置))# 5. 创建定时器:每0.5秒发布一次消息self.timer = self.create_timer(0.5, self.timer_callback)def timer_callback(self):"""定时器回调函数:生成并发布消息"""msg = String() # 创建字符串消息对象msg.data = "Hello World" # 消息内容self.pub.publish(msg) # 发布消息# 打印日志:提示发布成功,包含QoS策略(方便验证)self.get_logger().info(f"Publishing: {msg.data} | QoS: RELIABLE, KEEP_LAST(1)")# 3. 主入口函数
def main(args=None):rclpy.init(args=args) # 初始化ROS2node = PublisherNode("qos_helloworld_pub") # 创建发布者节点rclpy.spin(node) # 启动节点循环node.destroy_node() # 销毁节点rclpy.shutdown() # 关闭ROS2
步骤 3:订阅者代码(匹配 QoS 配置)
文件名:learning_qos/qos_helloworld_sub.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ROS2 QoS订阅者示例:
- 订阅"Hello World"话题
- 配置和发布者一致的QoS策略(RELIABLE、KEEP_LAST(1))
"""# 1. 导入库(和发布者一致)
import rclpy # ROS2核心库
from rclpy.node import Node # 节点类
from std_msgs.msg import String # 字符串消息类型
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy# 2. 定义订阅者节点类
class SubscriberNode(Node):def __init__(self, name):super().__init__(name) # 初始化节点,节点名=qos_helloworld_sub# 3. 配置QoS策略:必须和发布者兼容(这里完全一致)self.qos_profile = QoSProfile(reliability=QoSReliabilityPolicy.RELIABLE, # 和发布者一样:可靠传输history=QoSHistoryPolicy.KEEP_LAST, # 和发布者一样:保留最新depth=1 # 和发布者一样:保留1条)# 4. 创建订阅者:指定QoS策略self.sub = self.create_subscription(String, # 消息类型(和发布者一致)"chatter", # 话题名(和发布者一致)self.listener_callback, # 收到消息的回调函数self.qos_profile # 传入QoS配置(关键!))def listener_callback(self, msg):"""回调函数:处理收到的消息"""# 打印日志:包含收到的消息和QoS策略self.get_logger().info(f"Heard: {msg.data} | QoS: RELIABLE, KEEP_LAST(1)")# 3. 主入口函数
def main(args=None):rclpy.init(args=args) # 初始化ROS2node = SubscriberNode("qos_helloworld_sub") # 创建订阅者节点rclpy.spin(node) # 启动节点循环node.destroy_node() # 销毁节点rclpy.shutdown() # 关闭ROS2
步骤 4:配置节点入口(让 ROS 找到程序)
打开learning_qos/setup.py
,在entry_points
的console_scripts
中添加入口:
entry_points={'console_scripts': [# 发布者命令:ros2 run learning_qos qos_helloworld_pub'qos_helloworld_pub = learning_qos.qos_helloworld_pub:main',# 订阅者命令:ros2 run learning_qos qos_helloworld_sub'qos_helloworld_sub = learning_qos.qos_helloworld_sub:main',],
},
步骤 5:编译与运行(验证效果)
编译功能包:
cd dev_ws colcon build --packages-select learning_qos source install/setup.bash # 加载环境变量
启动发布者(终端 1):
ros2 run learning_qos qos_helloworld_pub # 预期输出:每0.5秒打印 "Publishing: Hello World | QoS: RELIABLE, KEEP_LAST(1)"
启动订阅者(终端 2):
ros2 run learning_qos qos_helloworld_sub # 预期输出:每0.5秒打印 "Heard: Hello World | QoS: RELIABLE, KEEP_LAST(1)"(通信成功)
修改策略验证不兼容(可选):
- 把发布者的
reliability
改成QoSReliabilityPolicy.BEST_EFFORT
,重新编译; - 订阅者还是
RELIABLE
,再运行会发现订阅者收不到消息(策略不兼容)。
- 把发布者的
七、复习要点总结(小白必背)
- DDS 的核心优势:以数据为中心、无中心瓶颈、支持高实时性,解决了 ROS1 Master 的痛点;
- ROS2 中 DDS 的关键概念:
- Domain(域):
ROS_DOMAIN_ID
相同才能通信,避免干扰; - QoS(质量服务):通信的 “合约”,决定数据怎么传;
- Domain(域):
- 常用 QoS 策略场景:
- 重要指令(电机、参数):
RELIABLE + KEEP_LAST
; - 实时流数据(图像、激光):
BEST_EFFORT + KEEP_LAST
;
- 重要指令(电机、参数):
- QoS 通信规则:发布者和订阅者的 QoS 必须兼容(比如可靠性策略一致),否则收不到数据;
- 配置方式:命令行用
--qos-xxx
参数,代码用QoSProfile
类。
DDS 底层很复杂,但 ROS2 已经帮我们简化了 90%,初学者不用深究底层实现,重点掌握 “QoS 策略怎么选”“怎么配置”,就能满足 90% 的机器人开发需求