ros2 多线程节点调度MultiThreadedExecutor
为什么需要多线程?
- 在ROS 2中,节点可能需要同时处理多个任务(如接收传感器数据、响应服务请求、执行定时任务等)。如果使用单线程执行器,所有回调函数会按顺序执行,可能导致延迟。
例如:一个节点同时订阅激光雷达数据和摄像头数据,若激光雷达回调函数耗时较长,摄像头回调会被阻塞,导致数据处理延迟。 - MultiThreadedExecutor的优势:并行处理多个回调函数,减少阻塞。提高实时性,适用于高频传感器数据或计算密集型任务。
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from std_msgs.msg import String
import timeclass DemoNode(Node):def __init__(self):super().__init__('demo_node')# 订阅两个话题self.sub1 = self.create_subscription(String, 'topic1', self.callback1, 10)self.sub2 = self.create_subscription(String, 'topic2', self.callback2, 10)self.get_logger().info("节点已启动")def callback1(self, msg):self.get_logger().info(f'回调1开始处理: {msg.data}')time.sleep(2) # 模拟耗时操作self.get_logger().info('回调1处理完成')def callback2(self, msg):self.get_logger().info(f'回调2开始处理: {msg.data}')time.sleep(1) # 模拟耗时操作self.get_logger().info('回调2处理完成')def main(args=None):rclpy.init(args=args)node = DemoNode()# 使用 MultiThreadedExecutor(多线程)executor = MultiThreadedExecutor(num_threads=4) # 默认线程数为CPU核心数# 对比:使用 SingleThreadedExecutor(单线程)# executor = SingleThreadedExecutor()executor.add_node(node)try:executor.spin() # 开始执行回调except KeyboardInterrupt:passfinally:executor.shutdown()node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
通过两个终端分别发布话题消息:
# 终端1:发布 topic1
ros2 topic pub /topic1 std_msgs/String "data: 'Hello1'" -1# 终端2:发布 topic2
ros2 topic pub /topic2 std_msgs/String "data: 'Hello2'" -1
单线程:回调函数按顺序执行,callback2 必须等待 callback1 完成。
[INFO] [demo_node]: 回调1开始处理: Hello1
[INFO] [demo_node]: 回调1处理完成 # 2秒后
[INFO] [demo_node]: 回调2开始处理: Hello2
[INFO] [demo_node]: 回调2处理完成 # 1秒后(总耗时3秒)
多线程:两个回调函数并行执行,总耗时仅取决于最长的回调(2秒)。
[INFO] [demo_node]: 回调1开始处理: Hello1
[INFO] [demo_node]: 回调2开始处理: Hello2
[INFO] [demo_node]: 回调2处理完成 # 1秒后
[INFO] [demo_node]: 回调1处理完成 # 2秒后(总耗时2秒)