【ROS2】DDS通信协议介绍
引言
ROS2(Robot Operating System 2)是一个广泛用于机器人开发的框架,其核心通信机制依赖于DDS(Data Distribution Service)协议。DDS提供了一种高效、可靠的数据分发方式,支持实时系统需求。本文将逐步介绍DDS在ROS2中的作用、原理和实际应用,帮助您快速上手。
1. DDS协议概述
DDS是一种面向数据的中间件标准,专为分布式实时系统设计。它基于发布-订阅模型,核心概念包括:
- 主题(Topic):数据单元的唯一标识符,例如传感器数据主题。
- 发布者(Publisher):发送数据的节点。
- 订阅者(Subscriber):接收数据的节点。
- 服务质量(QoS):可配置的策略,确保数据传输的可靠性、实时性和容错性。例如,带宽限制可表示为:
最大带宽=数据包大小×频率1000 kbps \text{最大带宽} = \frac{\text{数据包大小} \times \text{频率}}{1000} \text{ kbps} 最大带宽=1000数据包大小×频率 kbps
其中,text数据包大小text{数据包大小}text数据包大小 以字节为单位,频率\text{频率}频率 以Hz为单位。
DDS的优势在于其高吞吐量、低延迟和可扩展性,非常适合机器人系统中的传感器数据流和控制命令传输。
但是,我们要知道一点,DDS并不是专门为ROS2而设计的一种通信协议,而是在此之前就已经被其它领域广泛应用了,比如航空,国防,交通,医疗,能源等。
比如在同样备受关注的自动驾驶领域,通常会存在感知,预测,决策和定位等模块,这些模块都需要非常高速和频繁地交换数据。借助DDS,可以很好地满足它们的通信需求。
2. ROS2与DDS的集成
DDS在ROS2系统中的位置至关重要,所有上层建设都建立在DDS之上。在这个ROS2的架构图中,蓝色和红色部分就是DDS。
ROS2使用DDS作为默认中间件,通过抽象层(如rmw
接口)支持多种DDS实现,例如Fast DDS或Cyclone DDS。集成方式包括:
- 节点通信:ROS2节点通过DDS主题进行交互,无需直接处理网络细节。
- QoS策略:ROS2允许自定义QoS,如设置可靠性(Reliable vs. Best Effort)或持久性(Transient Local)。例如,在实时控制中,可配置为:
延迟预算<10 ms \text{延迟预算} < 10 \text{ ms} 延迟预算<10 ms
确保数据及时到达。 - 发现机制:DDS自动处理节点发现,当新节点加入时,系统动态建立连接。
同时为了应对多个DDS的兼容,ROS设计了一个Middleware中间件,也就是一个统一的标准,不管我们用那个DDS,保证上层编程使用的函数接口都是一样的。此时兼容性的问题就转移给了DDS厂商,如果他们想让自己的DDS系统进入ROS生态,就得按照ROS的接口标准,开发一个驱动,也就是这个部分。这种集成简化了开发,同时保证了性能,适用于从单机到大型集群的场景。
3. DDS在ROS2中的关键特性
- 实时性:DDS支持优先级调度,确保关键数据(如紧急停止命令)优先传输。
- 可靠性:通过QoS策略,可实现零丢失数据传输,适合安全关键应用。
- 可扩展性:DDS处理节点动态变化,支持数百个节点并发通信。系统吞吐量公式为:
总吞吐量=∑i=1n主题i带宽 \text{总吞吐量} = \sum_{i=1}^{n} \text{主题}_i \text{带宽} 总吞吐量=i=1∑n主题i带宽
其中,nnn 是活动主题数。 - 容错性:DDS内置冗余机制,当节点故障时,数据自动重路由。
这些特性使ROS2适用于工业机器人、自动驾驶等复杂环境。
4. 质量服务策略QoS
DDS为ROS的通信系统提供提供了哪些特性呢?我们通过这个通信模型图来看下。
DDS中另外一个重要特性就是质量服务策略,QoS。
QoS是一种网络传输策略,应用程序指定所需要的网络传输质量行为,QoS服务实现这种行为要求,尽可能地满足客户对通信质量的需求,可以理解为数据提供者和接收者之间的合约。
比较常见的策略如下:
- DEADLINE策略,表示通信数据必须要在每次截止时间内完成一次通信;
- HISTORY策略,表示针对历史数据的一个缓存大小;
- RELIABILITY策略,表示数据通信的模式,配置成BEST_EFFORT,就是尽力传输模式,网络情况不好的时候,也要保证数据流畅,此时可能会导致数据丢失,配置成RELIABLE,就是可信赖模式,可以在通信中尽量保证图像的完整性,我们可以根据应用功能场景选择合适的通信模式;
- DURABILITY策略,可以配置针对晚加入的节点,也保证有一定的历史数据发送过去,可以让新节点快速适应系统。
所有这些策略在ROS系统中都可以通过类似这样的结构体配置,如果不配置的话,系统也会使用默认的参数。
5. 实际示例:创建含QoS配置的ROS2发布-订阅系统
在ROS2通信中,QoS(服务质量) 是核心机制之一——它通过定义可靠性、消息历史、存活时间等策略,让发布者与订阅者根据实际场景(如工业实时控制、弱网数据传输、传感器高频采样)适配通信需求。以下Python示例将在基础发布-订阅逻辑上,添加QoS配置,展示如何通过QoS控制DDS底层的通信行为。
步骤1: 安装ROS2并确认环境
确保ROS2 Humble环境已配置完成(若未安装,可通过鱼香ROS脚本一键部署):
# 下载并执行鱼香ROS安装脚本
wget http://fishros.com/install -O fishros && . fishros
# 执行后按终端提示选择“1. 安装ROS”,再选择“ROS2 Humble”版本完成安装
安装后通过ros2 --version
验证环境,确保输出ros2 humble
相关信息。
步骤2: 创建ROS2功能包
沿用原目录结构,创建支持Python的功能包:
# 创建工作空间(若已存在可跳过)
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src# 创建功能包,依赖rclpy(ROS2 Python核心)和std_msgs(标准消息类型)
ros2 pkg create qos_tutorial --build-type ament_python --dependencies rclpy std_msgs# 进入功能包根目录(后续需修改setup.py)
cd qos_tutorial
步骤3: 配置setup.py(关键:定义节点入口)
ROS2通过setup.py
识别Python节点的可执行文件,需在功能包根目录(~/ros2_ws/src/qos_tutorial
)中修改该文件,添加节点与脚本的映射关系(该步骤也可以在编写完发布代码和订阅代码后配置):
from setuptools import setuppackage_name = 'qos_tutorial' # 必须与功能包名称一致setup(name=package_name,version='0.0.0',packages=[package_name], # 对应代码存放的目录data_files=[('share/ament_index/resource_index/packages',['resource/' + package_name]),('share/' + package_name, ['package.xml']),],install_requires=['setuptools'],zip_safe=True,maintainer='Your Name',maintainer_email='your_email@example.com',description='ROS2 QoS 发布-订阅示例',license='Apache-2.0',tests_require=['pytest'],# 核心配置:将Python脚本映射为终端可执行命令entry_points={'console_scripts': [# 格式:"终端命令 = 包名.脚本文件名:主函数名"'qos_publisher = qos_tutorial.qos_publisher:main','qos_subscriber = qos_tutorial.qos_subscriber:main'],},
)
配置说明:
entry_points
中每一行对应一个节点,左侧为终端中使用的命令(如qos_publisher
),右侧指定脚本路径(qos_tutorial
包下的qos_publisher.py
)和主函数(main
)。- 若缺少此配置,
ros2 run
会提示“找不到可执行文件”。
步骤4: 编写含QoS配置的发布者代码
进入代码目录(~/ros2_ws/src/qos_tutorial/qos_tutorial
),创建qos_publisher.py
:
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import Stringclass QoSPublisher(Node):def __init__(self):super().__init__('qos_publisher')# 定义QoS配置:可靠传输+保留最新10条消息self.qos_profile = QoSProfile(reliability=ReliabilityPolicy.RELIABLE, # 确保消息不丢失(丢失则重发)history=HistoryPolicy.KEEP_LAST, # 仅保留最新消息depth=10 # 消息队列深度)# 创建发布者时传入QoS配置self.publisher = self.create_publisher(String, 'qos_topic', self.qos_profile)# 1Hz频率发送消息self.timer = self.create_timer(1.0, self.timer_callback)self.counter = 0def timer_callback(self):msg = String()msg.data = f'QoS Message: Count = {self.counter}'self.publisher.publish(msg)self.get_logger().info(f'Publishing (QoS Reliable): "{msg.data}"')self.counter += 1def main(args=None):rclpy.init(args=args)node = QoSPublisher()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
步骤5: 编写匹配QoS的订阅者代码
在同一代码目录下创建qos_subscriber.py
:
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import Stringclass QoSSubscriber(Node):def __init__(self):super().__init__('qos_subscriber')# 使用与发布者一致的QoS配置(确保兼容性)self.qos_profile = QoSProfile(reliability=ReliabilityPolicy.RELIABLE,history=HistoryPolicy.KEEP_LAST,depth=10)self.subscription = self.create_subscription(String, 'qos_topic', self.listener_callback, self.qos_profile)self.subscription # 防止未使用警告def listener_callback(self, msg):self.get_logger().info(f'Received (QoS Reliable): "{msg.data}"')def main(args=None):rclpy.init(args=args)node = QoSSubscriber()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
步骤6: 编译并运行示例
-
编译功能包:回到工作空间根目录,编译目标包:
cd ~/ros2_ws colcon build --packages-select qos_tutorial
-
加载环境变量:
source install/setup.bash
-
启动节点:
-
终端1(发布者):
ros2 run qos_tutorial qos_publisher
输出示例:
[INFO] [qos_publisher]: Publishing (QoS Reliable): "QoS Message: Count = 0"
-
终端2(订阅者):
ros2 run qos_tutorial qos_subscriber
输出示例:
[INFO] [qos_subscriber]: Received (QoS Reliable): "QoS Message: Count = 0"
-
关键说明:QoS策略的适配场景
- 可靠传输(RELIABLE):适用于控制指令等关键消息(如机械臂运动指令),确保消息100%送达(代价是可能增加延迟)。
- 尽力传输(BEST_EFFORT):适用于高频非关键数据(如摄像头图像),不重发丢失消息,优先保证实时性。
- 历史策略:
KEEP_LAST
(保留最新N条)适合持续流数据,KEEP_ALL
(保留所有)适合需回溯的场景(慎用,易占用内存)。
注意:发布者与订阅者的QoS策略必须兼容(如RELIABLE发布者可匹配RELIABLE/BEST_EFFORT订阅者),否则通信会失败。
6. 总结
DDS协议是ROS2的核心,提供高效、可靠的数据分发。通过本教程,您了解了DDS的基本原理、ROS2集成方式,以及如何实现简单应用。建议进一步探索:
- 高级QoS设置(如Deadline或Lifespan)。
- 使用不同DDS实现(如切换Fast DDS)。
- 性能优化技巧,例如调整数据包大小以减少延迟。
DDS在ROS2中使机器人系统更健壮,适用于各种实时场景。如需深入学习,请参考ROS2官方文档和DDS标准规范。