当前位置: 首页 > news >正文

ROS-Jazzy_rclpy

一、rclpy 是什么?

rclpy 是 ROS 2(Robot Operating System 2)官方提供的 Python 客户端库,用于在 Python 中编写 ROS 2 节点。它基于 C 底层库 rcl 封装,提供了:

  • 节点(Node)的创建与管理
  • 发布者(Publisher)、订阅者(Subscriber)
  • 服务(Service)、客户端(Client)
  • 定时器(Timer)、参数(Parameter)
  • 生命周期管理

用 Python 写 ROS2 节点时,大部分场景都会用到 rclpy


二、主要功能模块

  1. 节点(Node)

    • ROS 2 中的基本运行单元,所有的 pub/sub/service 都依附于节点。
    • rclpy 中,Node 是一个类,可以继承或直接实例化。
  2. 发布者(Publisher)/订阅者(Subscriber)

    • 用于话题通信,异步消息机制。
  3. 服务(Service)/客户端(Client)

    • 请求/响应模式,常用于一次性操作。
  4. 定时器(Timer)

    • 用于周期性执行某些任务。
  5. 参数(Parameter)

    • 节点运行时的配置数据,可以动态修改。

三、使用示例

1. 最简单的节点

import rclpy
from rclpy.node import Nodedef main(args=None):rclpy.init(args=args)   # 初始化node = Node("simple_node")  # 创建节点node.get_logger().info("Hello ROS2 from rclpy!")  # 打印日志rclpy.spin(node)  # 进入事件循环node.destroy_node()rclpy.shutdown()if __name__ == "__main__":main()

运行后,你会看到终端输出:

[INFO] [<time>] [simple_node]: Hello ROS2 from rclpy!

2. 发布者与订阅者

发布者(Publisher)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Talker(Node):def __init__(self):super().__init__('talker')self.publisher_ = self.create_publisher(String, 'chatter', 10)self.timer = self.create_timer(1.0, self.timer_callback)  # 每秒发布一次self.count = 0def timer_callback(self):msg = String()msg.data = f"Hello ROS2: {self.count}"self.publisher_.publish(msg)self.get_logger().info(f"Publishing: '{msg.data}'")self.count += 1def main(args=None):rclpy.init(args=args)node = Talker()rclpy.spin(node)node.destroy_node()rclpy.shutdown()
订阅者(Subscriber)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Stringclass Listener(Node):def __init__(self):super().__init__('listener')self.subscription = self.create_subscription(String,'chatter',self.listener_callback,10)self.subscription  # 防止被垃圾回收def listener_callback(self, msg):self.get_logger().info(f"I heard: '{msg.data}'")def main(args=None):rclpy.init(args=args)node = Listener()rclpy.spin(node)node.destroy_node()rclpy.shutdown()

运行两个节点后,一个会发布消息,另一个会打印收到的内容。


3. 服务(Service)与客户端(Client)

服务端(Service)
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Nodeclass AddTwoIntsServer(Node):def __init__(self):super().__init__('add_two_ints_server')self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback)def add_callback(self, request, response):response.sum = request.a + request.bself.get_logger().info(f"Incoming request: {request.a} + {request.b}")return responsedef main(args=None):rclpy.init(args=args)node = AddTwoIntsServer()rclpy.spin(node)node.destroy_node()rclpy.shutdown()
客户端(Client)
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Nodeclass AddTwoIntsClient(Node):def __init__(self):super().__init__('add_two_ints_client')self.client = self.create_client(AddTwoInts, 'add_two_ints')while not self.client.wait_for_service(timeout_sec=1.0):self.get_logger().info("Service not available, waiting...")self.request = AddTwoInts.Request()self.request.a = 3self.request.b = 5def send_request(self):return self.client.call_async(self.request)def main(args=None):rclpy.init(args=args)node = AddTwoIntsClient()future = node.send_request()rclpy.spin_until_future_complete(node, future)if future.result() is not None:node.get_logger().info(f"Result: {future.result().sum}")else:node.get_logger().error("Service call failed")node.destroy_node()rclpy.shutdown()

四、总结

  • rclpy 是 ROS2 Python 节点开发的核心库。
  • 它封装了 ROS2 的通信机制:话题(pub/sub)、服务(srv/client)、参数、定时器等。
  • 适合快速原型开发与脚本化场景(但高性能、实时场景推荐用 C++ 的 rclcpp)。


例子 1:一个节点既发布消息又提供服务

这个例子中:

  • 节点每秒发布一次计数消息到话题 /counter
  • 节点还提供一个服务 /reset_counter,用来把计数器清零。
# 文件名:counter_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Trigger  # 标准的触发型服务,无输入参数,只返回成功状态class CounterNode(Node):def __init__(self):super().__init__('counter_node')# 发布者self.publisher_ = self.create_publisher(Int32, 'counter', 10)# 定时器:每秒调用一次self.timer = self.create_timer(1.0, self.timer_callback)self.count = 0# 服务self.srv = self.create_service(Trigger, 'reset_counter', self.reset_callback)def timer_callback(self):msg = Int32()msg.data = self.countself.publisher_.publish(msg)self.get_logger().info(f'Publishing counter: {self.count}')self.count += 1def reset_callback(self, request, response):self.count = 0self.get_logger().info('Counter has been reset via service call!')response.success = Trueresponse.message = "Counter reset successfully"return responsedef main(args=None):rclpy.init(args=args)node = CounterNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()

运行步骤:

# 终端 1:运行节点
ros2 run your_package counter_node.py# 终端 2:调用服务清零计数器
ros2 service call /reset_counter example_interfaces/srv/Trigger

例子 2:一个节点发布消息,另一个节点订阅并调用服务

这个例子中:

  • 节点 A 每 2 秒发布一个随机数到话题 /random_number
  • 节点 B 订阅这个话题,并在收到消息时打印出来,如果随机数大于 50,就调用节点 A 提供的服务 /reset_number 让它重置随机数。

节点 A:发布者 + 服务

# 文件名:random_number_server.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Trigger
import randomclass RandomNumberServer(Node):def __init__(self):super().__init__('random_number_server')self.publisher_ = self.create_publisher(Int32, 'random_number', 10)self.timer = self.create_timer(2.0, self.timer_callback)self.number = random.randint(0, 100)self.srv = self.create_service(Trigger, 'reset_number', self.reset_callback)def timer_callback(self):msg = Int32()msg.data = self.numberself.publisher_.publish(msg)self.get_logger().info(f'Publishing random number: {self.number}')self.number = random.randint(0, 100)def reset_callback(self, request, response):self.number = 0self.get_logger().info('Random number reset to 0!')response.success = Trueresponse.message = "Random number reset"return responsedef main(args=None):rclpy.init(args=args)node = RandomNumberServer()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()

节点 B:订阅者 + 客户端

# 文件名:random_number_client.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from example_interfaces.srv import Triggerclass RandomNumberClient(Node):def __init__(self):super().__init__('random_number_client')self.subscription = self.create_subscription(Int32,'random_number',self.listener_callback,10)self.subscription  # 防止被垃圾回收self.client = self.create_client(Trigger, 'reset_number')while not self.client.wait_for_service(timeout_sec=1.0):self.get_logger().info('Waiting for reset_number service...')def listener_callback(self, msg):self.get_logger().info(f'Received random number: {msg.data}')if msg.data > 50:self.get_logger().info('Number > 50, calling reset service...')req = Trigger.Request()future = self.client.call_async(req)future.add_done_callback(self.callback_response)def callback_response(self, future):try:response = future.result()if response.success:self.get_logger().info(f'Service call result: {response.message}')else:self.get_logger().warn('Service call failed!')except Exception as e:self.get_logger().error(f'Service call exception: {e}')def main(args=None):rclpy.init(args=args)node = RandomNumberClient()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()

运行步骤:

# 终端 1:运行节点 A
ros2 run your_package random_number_server.py# 终端 2:运行节点 B
ros2 run your_package random_number_client.py

你会看到:

  • 节点 A 每两秒发布一个随机数。
  • 节点 B 打印收到的随机数。
  • 如果收到的数 > 50,节点 B 自动调用服务把节点 A 的随机数重置为 0。

总结

例子 1:单节点既发布消息又提供服务,适合“一个节点自给自足”的场景(比如传感器驱动 + 配置接口)。
例子 2:双节点协作,一个专门生产数据,一个专门消费并触发控制逻辑,更贴近真实机器人系统的多节点通信模式。



🟢例子 1:单节点(发布 + 服务)

+--------------------+
|   CounterNode      |
|                    |
|  [Publisher] ----> /counter (话题)
|                    |
|  [Service] <-----  /reset_counter (Trigger)
+--------------------+
  • 节点 CounterNode 每秒往话题 /counter 发布一个整数。
  • 外部任何节点都可以通过调用 /reset_counter 服务把计数器清零。

例子 2:双节点(发布 + 订阅 + 服务)

+-------------------------+                  +---------------------------+
| RandomNumberServer      |                  | RandomNumberClient        |
|                         |                  |                           |
|  [Publisher] -------> /random_number ----> [Subscriber]                |
|                         |                  |                           |
|  [Service] <-------  /reset_number <----- [Client] (条件触发调用)       |
+-------------------------+                  +---------------------------+
  • RandomNumberServer 每 2 秒发布一个随机数到 /random_number
  • RandomNumberClient 订阅 /random_number,打印接收到的数值。
  • 如果收到的数值 > 50,就调用 /reset_number 服务让服务器把随机数重置为 0。

  • 箭头方向表示消息/请求的流动方向。
  • 话题是广播式,一对多通信。
  • 服务是请求/应答模式,一对一通信。

http://www.dtcms.com/a/414348.html

相关文章:

  • Socket 编程 TCP(准备阶段)
  • 【Ultralytics】评估报错:解决 KeyError: ‘info‘ 错误
  • 哪些是实名制网站母了猜猜看游戏做网站
  • 【Linux】TCP原理
  • 论文阅读:arxiv 2024 Fast Adversarial Attacks on Language Models In One GPU Minute
  • OpenJDK 17 方法链接与同步方法入口点生成机制深度解析
  • qt-C++笔记之自定义绘制:QWidget中的paintEvent 与 QGraphicsItem中的paint
  • 项目:智能排队控制系统
  • LeetCode:71.字符串解码
  • LeetCode:66.搜索旋转排序数组
  • 阿帕奇网站搭建六安做网站的
  • wordpress去除评论表单电子商务seo优化
  • deepseek kotlin flow快生产者和慢消费者解决策略
  • 20.NFS iSCSI服务器
  • uniapp 搭建vue项目,快速搭建项目
  • 自动网页浏览助手:基于 Selenium + GLM-4V 的百度自动搜索与内容提取系统
  • 网站地图什么时候提交好网站自响应
  • 深度学习笔记(一)——线性回归、Softmax回归、多层感知机、环境和分布偏移
  • 网站建设教程要去d湖南岚鸿询 问2022年企业年报网上申报流程
  • js构造函数—11
  • Kotlin轻量级互斥锁Mutext与轻量级信号量Semaphore异同比较
  • 【MySQL✨】MySQL 入门之旅 · 第十篇:数据库备份与恢复
  • k8s里三种探针的使用场景
  • 8.基于 Ingress-nginx-controller 实现 k8s 七层代理
  • Kling-Audio-Eval - 高质量视频到音频生成评估数据集
  • LeetCode 812.最大三角形的面积
  • 做网站都需要服务器吗域名类型
  • js逆向实战:爬取淘宝男装商品
  • 前端3.0
  • 机器视觉检测中,最小尺寸多少像素可以检测到?重点解析传统算法和深度学习,对比度很致命