ROS-Jazzy_rclpy
一、rclpy
是什么?
rclpy
是 ROS 2(Robot Operating System 2)官方提供的 Python 客户端库,用于在 Python 中编写 ROS 2 节点。它基于 C 底层库 rcl
封装,提供了:
- 节点(Node)的创建与管理
- 发布者(Publisher)、订阅者(Subscriber)
- 服务(Service)、客户端(Client)
- 定时器(Timer)、参数(Parameter)
- 生命周期管理
用 Python 写 ROS2 节点时,大部分场景都会用到 rclpy
。
二、主要功能模块
-
节点(Node)
- ROS 2 中的基本运行单元,所有的 pub/sub/service 都依附于节点。
- 在
rclpy
中,Node
是一个类,可以继承或直接实例化。
-
发布者(Publisher)/订阅者(Subscriber)
- 用于话题通信,异步消息机制。
-
服务(Service)/客户端(Client)
- 请求/响应模式,常用于一次性操作。
-
定时器(Timer)
- 用于周期性执行某些任务。
-
参数(Parameter)
- 节点运行时的配置数据,可以动态修改。
三、使用示例
1. 最简单的节点
import rclpy
from rclpy.node import Nodedef main(args=None):rclpy.init(args=args) # 初始化node = Node("simple_node") # 创建节点node.get_logger().info("Hello ROS2 from rclpy!") # 打印日志rclpy.spin(node) # 进入事件循环node.destroy_node()rclpy.shutdown()if __name__ == "__main__":main()
运行后,你会看到终端输出:
[INFO] [<time>] [simple_node]: Hello ROS2 from rclpy!
2. 发布者与订阅者
发布者(Publisher)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Talker(Node):def __init__(self):super().__init__('talker')self.publisher_ = self.create_publisher(String, 'chatter', 10)self.timer = self.create_timer(1.0, self.timer_callback) # 每秒发布一次self.count = 0def timer_callback(self):msg = String()msg.data = f"Hello ROS2: {self.count}"self.publisher_.publish(msg)self.get_logger().info(f"Publishing: '{msg.data}'")self.count += 1def main(args=None):rclpy.init(args=args)node = Talker()rclpy.spin(node)node.destroy_node()rclpy.shutdown()
订阅者(Subscriber)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Listener(Node):def __init__(self):super().__init__('listener')self.subscription = self.create_subscription(String,'chatter',self.listener_callback,10)self.subscription # 防止被垃圾回收def listener_callback(self, msg):self.get_logger().info(f"I heard: '{msg.data}'")def main(args=None):rclpy.init(args=args)node = Listener()rclpy.spin(node)node.destroy_node()rclpy.shutdown()
运行两个节点后,一个会发布消息,另一个会打印收到的内容。
3. 服务(Service)与客户端(Client)
服务端(Service)
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Nodeclass AddTwoIntsServer(Node):def __init__(self):super().__init__('add_two_ints_server')self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback)def add_callback(self, request, response):response.sum = request.a + request.bself.get_logger().info(f"Incoming request: {request.a} + {request.b}")return responsedef main(args=None):rclpy.init(args=args)node = AddTwoIntsServer()rclpy.spin(node)node.destroy_node()rclpy.shutdown()
客户端(Client)
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Nodeclass AddTwoIntsClient(Node):def __init__(self):super().__init__('add_two_ints_client')self.client = self.create_client(AddTwoInts, 'add_two_ints')while not self.client.wait_for_service(timeout_sec=1.0):self.get_logger().info("Service not available, waiting...")self.request = AddTwoInts.Request()self.request.a = 3self.request.b = 5def send_request(self):return self.client.call_async(self.request)def main(args=None):rclpy.init(args=args)node = AddTwoIntsClient()future = node.send_request()rclpy.spin_until_future_complete(node, future)if future.result() is not None:node.get_logger().info(f"Result: {future.result().sum}")else:node.get_logger().error("Service call failed")node.destroy_node()rclpy.shutdown()
四、总结
rclpy
是 ROS2 Python 节点开发的核心库。- 它封装了 ROS2 的通信机制:话题(pub/sub)、服务(srv/client)、参数、定时器等。
- 适合快速原型开发与脚本化场景(但高性能、实时场景推荐用 C++ 的
rclcpp
)。
例子 1:一个节点既发布消息又提供服务
这个例子中:
- 节点每秒发布一次计数消息到话题
/counter
。 - 节点还提供一个服务
/reset_counter
,用来把计数器清零。
# 文件名:counter_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Trigger # 标准的触发型服务,无输入参数,只返回成功状态class CounterNode(Node):def __init__(self):super().__init__('counter_node')# 发布者self.publisher_ = self.create_publisher(Int32, 'counter', 10)# 定时器:每秒调用一次self.timer = self.create_timer(1.0, self.timer_callback)self.count = 0# 服务self.srv = self.create_service(Trigger, 'reset_counter', self.reset_callback)def timer_callback(self):msg = Int32()msg.data = self.countself.publisher_.publish(msg)self.get_logger().info(f'Publishing counter: {self.count}')self.count += 1def reset_callback(self, request, response):self.count = 0self.get_logger().info('Counter has been reset via service call!')response.success = Trueresponse.message = "Counter reset successfully"return responsedef main(args=None):rclpy.init(args=args)node = CounterNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
运行步骤:
# 终端 1:运行节点
ros2 run your_package counter_node.py# 终端 2:调用服务清零计数器
ros2 service call /reset_counter example_interfaces/srv/Trigger
例子 2:一个节点发布消息,另一个节点订阅并调用服务
这个例子中:
- 节点 A 每 2 秒发布一个随机数到话题
/random_number
。 - 节点 B 订阅这个话题,并在收到消息时打印出来,如果随机数大于 50,就调用节点 A 提供的服务
/reset_number
让它重置随机数。
节点 A:发布者 + 服务
# 文件名:random_number_server.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Trigger
import randomclass RandomNumberServer(Node):def __init__(self):super().__init__('random_number_server')self.publisher_ = self.create_publisher(Int32, 'random_number', 10)self.timer = self.create_timer(2.0, self.timer_callback)self.number = random.randint(0, 100)self.srv = self.create_service(Trigger, 'reset_number', self.reset_callback)def timer_callback(self):msg = Int32()msg.data = self.numberself.publisher_.publish(msg)self.get_logger().info(f'Publishing random number: {self.number}')self.number = random.randint(0, 100)def reset_callback(self, request, response):self.number = 0self.get_logger().info('Random number reset to 0!')response.success = Trueresponse.message = "Random number reset"return responsedef main(args=None):rclpy.init(args=args)node = RandomNumberServer()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
节点 B:订阅者 + 客户端
# 文件名:random_number_client.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Triggerclass RandomNumberClient(Node):def __init__(self):super().__init__('random_number_client')self.subscription = self.create_subscription(Int32,'random_number',self.listener_callback,10)self.subscription # 防止被垃圾回收self.client = self.create_client(Trigger, 'reset_number')while not self.client.wait_for_service(timeout_sec=1.0):self.get_logger().info('Waiting for reset_number service...')def listener_callback(self, msg):self.get_logger().info(f'Received random number: {msg.data}')if msg.data > 50:self.get_logger().info('Number > 50, calling reset service...')req = Trigger.Request()future = self.client.call_async(req)future.add_done_callback(self.callback_response)def callback_response(self, future):try:response = future.result()if response.success:self.get_logger().info(f'Service call result: {response.message}')else:self.get_logger().warn('Service call failed!')except Exception as e:self.get_logger().error(f'Service call exception: {e}')def main(args=None):rclpy.init(args=args)node = RandomNumberClient()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()
运行步骤:
# 终端 1:运行节点 A
ros2 run your_package random_number_server.py# 终端 2:运行节点 B
ros2 run your_package random_number_client.py
你会看到:
- 节点 A 每两秒发布一个随机数。
- 节点 B 打印收到的随机数。
- 如果收到的数 > 50,节点 B 自动调用服务把节点 A 的随机数重置为 0。
总结
✅ 例子 1:单节点既发布消息又提供服务,适合“一个节点自给自足”的场景(比如传感器驱动 + 配置接口)。
✅ 例子 2:双节点协作,一个专门生产数据,一个专门消费并触发控制逻辑,更贴近真实机器人系统的多节点通信模式。
🟢例子 1:单节点(发布 + 服务)
+--------------------+
| CounterNode |
| |
| [Publisher] ----> /counter (话题)
| |
| [Service] <----- /reset_counter (Trigger)
+--------------------+
- 节点 CounterNode 每秒往话题
/counter
发布一个整数。 - 外部任何节点都可以通过调用
/reset_counter
服务把计数器清零。
例子 2:双节点(发布 + 订阅 + 服务)
+-------------------------+ +---------------------------+
| RandomNumberServer | | RandomNumberClient |
| | | |
| [Publisher] -------> /random_number ----> [Subscriber] |
| | | |
| [Service] <------- /reset_number <----- [Client] (条件触发调用) |
+-------------------------+ +---------------------------+
- RandomNumberServer 每 2 秒发布一个随机数到
/random_number
。 - RandomNumberClient 订阅
/random_number
,打印接收到的数值。 - 如果收到的数值 > 50,就调用
/reset_number
服务让服务器把随机数重置为 0。
- 箭头方向表示消息/请求的流动方向。
- 话题是广播式,一对多通信。
- 服务是请求/应答模式,一对一通信。