当前位置: 首页 > news >正文

ROS2 Action 通信详解:从自定义消息到 Server/Client 实现(附 MoveIt! 联动示例)

以下示例均基于 ROS2(Humble/Iron 等主流版本),核心遵循 ROS2 的命令行规范(ros2 action/ros2 run/colcon build),分两部分讲解:

  1. Action 控制示例(自定义 Action + 运行 Server/Client,体现 Action 核心通信逻辑);
  2. MoveIt! 示例(基于其封装的 Action 接口,体现 MoveIt! 运动规划功能,与 Action 联动)。

一、Action 控制:基础命令与完整示例(自定义“任务进度”Action)

Action 的核心是“自定义 Action 消息 + 实现 Server/Client”,以下是从创建到运行的完整流程,场景:模拟“任务执行(如机器人巡逻)”,反馈实时进度,支持取消。

1. 步骤1:创建功能包(依赖 Action 核心库)

首先创建 ROS2 功能包,依赖 Action 必需的消息库、C++/Python 接口:

# 创建工作空间(若已存在可跳过)
mkdir -p ~/ros2_ws/src && cd ~/ros2_ws/src# 创建功能包(命名为 action_demo,依赖 action_msgs、rclcpp、rclpy 等)
ros2 pkg create action_demo --build-type ament_cmake --dependencies action_msgs rclcpp rclpy rosidl_default_generators
cd action_demo
2. 步骤2:自定义 Action 消息(.action 文件)

在功能包下创建 action 文件夹,编写 Action 消息(包含 Goal/Feedback/Result):

# 创建 action 文件夹
mkdir action && cd action# 编写 Action 消息文件(命名为 Task.action)
echo '
# Goal:任务目标(比如巡逻的圈数)
uint32 target_count  # 目标执行次数
---
# Result:任务结果(比如实际完成次数)
uint32 completed_count
bool success
---
# Feedback:过程反馈(比如当前进度)
uint32 current_count
float32 progress  # 进度百分比(0.0~1.0)
' > Task.action
3. 步骤3:修改配置文件(让 ROS2 编译 Action 消息)

需修改 CMakeLists.txtpackage.xml,告知编译器生成 Action 相关代码:

(1)修改 CMakeLists.txt(关键部分)

action_demo/CMakeLists.txt 中添加以下内容:

# 查找依赖包
find_package(ament_cmake REQUIRED)
find_package(action_msgs REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(rosidl_default_generators REQUIRED)# 声明 Action 消息文件
set(action_files"action/Task.action"
)# 生成 Action 相关代码(C++/Python)
rosidl_generate_interfaces(${PROJECT_NAME}${action_files}DEPENDENCIES action_msgs
)# 安装 Python 脚本(后续会写 Client/Server 脚本)
install(PROGRAMSscripts/action_server.pyscripts/action_client.pyDESTINATION lib/${PROJECT_NAME}
)ament_package()
(2)修改 package.xml(添加依赖)

action_demo/package.xml 中添加:

<depend>action_msgs</depend>
<depend>rclcpp</depend>
<depend>rclpy</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
4. 步骤4:编写 Action Server/Client 脚本(Python 示例)

在功能包下创建 scripts 文件夹,编写 Server(执行任务)和 Client(发送任务):

(1)Action Server 脚本(scripts/action_server.py)
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_demo.action import Task  # 导入自定义 Action 消息
import timeclass TaskActionServer(Node):def __init__(self):super().__init__('task_action_server')# 创建 Action Server(类型:Task,名称:task_execution)self._action_server = ActionServer(self,Task,'task_execution',  # Action 服务名称(Client 需对应)self.execute_callback  # 任务执行回调函数)self.get_logger().info("Action Server 启动:等待任务请求...")def execute_callback(self, goal_handle):self.get_logger().info(f"收到任务:执行 {goal_handle.request.target_count} 次")# 模拟任务执行(循环反馈进度)completed = 0for i in range(goal_handle.request.target_count):# 检查是否收到取消请求if goal_handle.is_cancel_requested():goal_handle.canceled()self.get_logger().info("任务被取消!")result = Task.Result()result.completed_count = completedresult.success = Falsereturn result# 模拟执行耗时time.sleep(1)completed += 1# 发送进度反馈feedback = Task.Feedback()feedback.current_count = completedfeedback.progress = completed / goal_handle.request.target_countgoal_handle.publish_feedback(feedback)self.get_logger().info(f"进度:{feedback.progress:.2f}{completed}/{goal_handle.request.target_count})")# 任务完成,返回结果goal_handle.succeed()result = Task.Result()result.completed_count = completedresult.success = Trueself.get_logger().info("任务完成!")return resultdef main(args=None):rclpy.init(args=args)action_server = TaskActionServer()rclpy.spin(action_server)rclpy.shutdown()if __name__ == '__main__':main()
(2)Action Client 脚本(scripts/action_client.py)
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_demo.action import Taskclass TaskActionClient(Node):def __init__(self):super().__init__('task_action_client')# 创建 Action Client(对应 Server 的服务名称:task_execution)self._action_client = ActionClient(self, Task, 'task_execution')self.get_logger().info("Action Client 启动:准备发送任务...")def send_goal(self, target_count):# 等待 Server 上线if not self._action_client.wait_for_server(timeout_sec=5.0):self.get_logger().error("未找到 Action Server!")return# 构造任务目标(Goal)goal_msg = Task.Goal()goal_msg.target_count = target_count  # 设定执行次数(比如 3 次)# 发送目标,并注册反馈回调(接收进度)和结果回调(接收最终结果)self._send_goal_future = self._action_client.send_goal_async(goal_msg,feedback_callback=self.feedback_callback)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info("任务被 Server 拒绝!")returnself.get_logger().info("任务被 Server 接受,等待执行...")# 等待任务结果self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def feedback_callback(self, feedback_msg):# 接收进度反馈feedback = feedback_msg.feedbackself.get_logger().info(f"收到反馈:进度 {feedback.progress:.2f}{feedback.current_count} 次)")def get_result_callback(self, future):result = future.result().resultif result.success:self.get_logger().info(f"任务成功!实际执行 {result.completed_count} 次")else:self.get_logger().info(f"任务失败!实际执行 {result.completed_count} 次")# 任务结束,关闭节点rclpy.shutdown()def main(args=None):rclpy.init(args=args)action_client = TaskActionClient()# 发送任务:执行 3 次(可修改为其他数字)action_client.send_goal(3)rclpy.spin(action_client)if __name__ == '__main__':main()
5. 步骤5:编译与运行 Action Server/Client
# 回到工作空间根目录编译
cd ~/ros2_ws
colcon build --packages-select action_demo
# 加载环境变量(每次新终端都要执行)
source install/setup.bash# 终端1:运行 Action Server
ros2 run action_demo action_server.py# 终端2:运行 Action Client(发送 3 次任务)
ros2 run action_demo action_client.py
6. Action 常用辅助命令(调试用)
# 1. 列出当前活跃的 Action 服务
ros2 action list# 2. 查看某个 Action 的消息结构(比如我们的 task_execution)
ros2 action show action_demo/action/Task# 3. 直接用命令行发送 Action Goal(无需写 Client 脚本,快速测试)
ros2 action send_goal --feedback /task_execution action_demo/action/Task "{target_count: 2}"
# --feedback 参数:实时显示进度反馈

二、MoveIt! 示例(基于 Action 接口,机械臂运动规划)

MoveIt! 已内置 MoveGroupAction 接口(无需自定义 Action),核心命令是“启动 MoveIt! 节点 + 发送运动目标”,以下是常见场景示例(假设你已生成机械臂的 MoveIt! 配置包,命名为 my_robot_moveit_config,机械臂模型为 panda 或自定义模型)。

前提:生成 MoveIt! 配置包(若未生成)

若还没有 MoveIt! 配置包,先通过 moveit_setup_assistant 生成(基于 URDF 模型):

# 安装 MoveIt!(ROS2 Humble 示例)
sudo apt install ros-humble-moveit# 启动 MoveIt! 配置助手
ros2 run moveit_setup_assistant moveit_setup_assistant

→ 按照向导加载 URDF 文件,生成配置包(命名为 my_robot_moveit_config),放入 ros2_ws/src 并编译。

示例1:启动 MoveIt! 核心节点 + RViz 可视化

这是最基础的命令,启动 MoveGroup 节点(Action Server)和 RViz(加载 MoveIt! 配置):

# 编译配置包(若未编译)
cd ~/ros2_ws
colcon build --packages-select my_robot_moveit_config
source install/setup.bash# 启动 MoveIt! + RViz(通过 launch 文件,不同配置包的 launch 文件名可能不同)
# 格式:ros2 launch [配置包名] [launch文件名]
ros2 launch my_robot_moveit_config move_group.launch.py
# 或同时启动 RViz(部分配置包的 launch 会自动加载 RViz)
ros2 launch my_robot_moveit_config demo.launch.py

→ 启动后 RViz 会显示机械臂模型,MoveGroup 节点作为 Action Server 等待运动目标。

示例2:用命令行发送 MoveIt! Action Goal(快速测试运动)

MoveIt! 的核心 Action 服务名称为 move_action,消息类型为 moveit_msgs/action/MoveGroup,可直接用 ros2 action send_goal 发送简单运动目标(比如“机械臂回到初始位置”):

# 前提:已启动 MoveIt! 核心节点(示例1的命令)
# 发送“回到初始位置”的目标(joint_state 为关节角度,根据自己的机械臂调整)
ros2 action send_goal --feedback /move_action moveit_msgs/action/MoveGroup "{goal_request: {group_name: 'panda_arm',  # 机械臂组名(在 MoveIt! 配置中定义)goal_constraints: [{joint_constraints: [{joint_name: 'panda_joint1', position: 0.0},{joint_name: 'panda_joint2', position: -0.785},{joint_name: 'panda_joint3', position: 0.0},{joint_name: 'panda_joint4', position: -2.356},{joint_name: 'panda_joint5', position: 0.0},{joint_name: 'panda_joint6', position: 1.571},{joint_name: 'panda_joint7', position: 0.785}]}]}
}"

→ 发送后,RViz 中的机械臂会按目标关节角度运动,命令行会显示“规划中→执行中→成功”的反馈。

示例3:用 Python 脚本发送 MoveIt! Action 目标(实际开发常用)

通过 moveit_commander(MoveIt! 的 Python 接口)简化 Action 调用,无需手动构造复杂的 Action 消息:

# 安装 moveit_commander(ROS2 Humble)
sudo apt install ros-humble-moveit-commander

编写脚本 scripts/moveit_demo.py(放入 my_robot_moveit_config/scripts,或新建功能包):

import rclpy
from moveit_commander import MoveGroupCommanderdef main(args=None):rclpy.init(args=args)node = rclpy.create_node('moveit_demo_node')# 创建 MoveGroup 实例(对应机械臂组名,需与 MoveIt! 配置一致)move_group = MoveGroupCommander("panda_arm", node=node)# 1. 设定目标:回到初始位置(关节空间)node.get_logger().info("移动到初始位置...")joint_goal = move_group.get_current_joint_values()joint_goal[0] = 0.0    # panda_joint1joint_goal[1] = -0.785 # panda_joint2joint_goal[2] = 0.0    # panda_joint3joint_goal[3] = -2.356 # panda_joint4joint_goal[4] = 0.0    # panda_joint5joint_goal[5] = 1.571  # panda_joint6joint_goal[6] = 0.785  # panda_joint7# 发送目标并等待完成(超时 5 秒)move_group.go(joint_goal, wait=True)# 停止运动(防止残留速度)move_group.stop()# 2. 设定目标:笛卡尔空间运动(机械臂末端移动到指定坐标)node.get_logger().info("笛卡尔空间移动...")pose_goal = move_group.get_current_pose().posepose_goal.position.x = 0.5  # X 坐标(根据自己的机械臂工作空间调整)pose_goal.position.y = 0.0  # Y 坐标pose_goal.position.z = 0.5  # Z 坐标move_group.set_pose_target(pose_goal)# 规划并执行(返回是否成功)success = move_group.go(wait=True)move_group.stop()move_group.clear_pose_targets()  # 清除目标if success:node.get_logger().info("运动成功!")else:node.get_logger().error("运动失败!")rclpy.shutdown()if __name__ == '__main__':main()

运行脚本:

# 终端1:启动 MoveIt! 核心节点 + RViz
ros2 launch my_robot_moveit_config demo.launch.py# 终端2:运行 Python 脚本(发送运动目标)
ros2 run my_robot_moveit_config moveit_demo.py

三、核心命令总结

功能场景Action 控制命令MoveIt! 命令
列出活跃服务ros2 action listros2 action list(服务名通常为 /move_action
查看消息结构ros2 action show <包名>/action/<消息名>ros2 action show moveit_msgs/action/MoveGroup
命令行发送目标ros2 action send_goal --feedback <服务名> <消息类型> "<目标数据>"ros2 action send_goal --feedback /move_action moveit_msgs/action/MoveGroup "<目标数据>"
启动核心节点ros2 run <功能包名> <server脚本>ros2 launch <moveit配置包> move_group.launch.py
代码调用(Python)rclpy.action.ActionClient 发送目标moveit_commander.MoveGroupCommander 简化调用

关键注意事项

  1. ROS2 版本适配:MoveIt! 的命令和包名随 ROS2 版本略有差异(如 Humble 是 ros-humble-moveit,Iron 是 ros-iron-moveit),需对应版本安装;
  2. MoveIt! 配置包:必须先基于 URDF 生成 moveit_config 包,否则无法启动 MoveGroup 节点;
  3. 机械臂组名:MoveIt! 的 group_name 需在配置包中定义(比如 panda_arm 是 Panda 机械臂的默认组名),需与自己的配置一致;
  4. 环境变量:每次新终端都要执行 source install/setup.bash,否则无法找到包或节点。
http://www.dtcms.com/a/597888.html

相关文章:

  • 个人建什么网站最赚钱智慧团建网站登录平台pc端
  • 使用新版本cpu编译vLLM cpu端(编译完成,但是SCNet公网转发8000端口后,连不上)
  • 双浩建设网站aspnet网站开发pdf
  • 云南网站制作公司在线生成短链接
  • 上海企业网站制作报价燕郊网站建设
  • 东航数科开源软件治理体系的建设实践”荣获OSCAR开源+安全及风险治理案例
  • 查企业免费的网站郑州同济医院妇科怎么样
  • 基于SAM2的眼动数据跟踪2.1——修正目标消失的记录方式
  • 网站开发包含网站维护吗建设一个网站可以采用那几方案
  • 【C++】--模板进阶
  • 如何选择企业网站建设wordpress 自动跳转
  • 设计深圳网站制作新北方app下载
  • 【Janet】函数
  • 【微服务 - easy视频 | day04】Seata解决分布式事务
  • 网站关键词没有排名怎么用ip做网站
  • Jmeter超详细使用教程
  • 北京网站优化技术学科分类目录
  • 网站源码下载安全吗找一个免费域名的网站
  • 【Git、GitHub、Gitee】GitLab的概念、注册流程、远程仓库操作以及高级功能详解(超详细)
  • 2025三掌柜赠书活动第四十一期 AI Agent 开发实战:MCP+A2A+LangGraph 驱动的智能体全流程开发
  • 1 NLP导论及环境准备
  • 龙岩做网站开发大概价格网页软件有哪些
  • 设计软件网站wordpress付费看
  • C#中,FirstOrDefault
  • 【INVSR 代码解析】encode_first_stage函数,以及一个知识点普通编码器与VAE编码器的区别
  • 面试题:说说Redis的三大问题和解决方案
  • 大型企业网站wordpress评论框制作
  • EtherCAT通信PDO和SDO的区别和使用
  • dedecms本地可以更换网站模板出现网站模板不存在3800给做网站
  • 漯河哪里做网站柳州市住房和城乡建设局网站首页