ROS2 Action 通信详解:从自定义消息到 Server/Client 实现(附 MoveIt! 联动示例)
以下示例均基于 ROS2(Humble/Iron 等主流版本),核心遵循 ROS2 的命令行规范(ros2 action/ros2 run/colcon build),分两部分讲解:
- Action 控制示例(自定义 Action + 运行 Server/Client,体现 Action 核心通信逻辑);
- MoveIt! 示例(基于其封装的 Action 接口,体现 MoveIt! 运动规划功能,与 Action 联动)。
一、Action 控制:基础命令与完整示例(自定义“任务进度”Action)
Action 的核心是“自定义 Action 消息 + 实现 Server/Client”,以下是从创建到运行的完整流程,场景:模拟“任务执行(如机器人巡逻)”,反馈实时进度,支持取消。
1. 步骤1:创建功能包(依赖 Action 核心库)
首先创建 ROS2 功能包,依赖 Action 必需的消息库、C++/Python 接口:
# 创建工作空间(若已存在可跳过)
mkdir -p ~/ros2_ws/src && cd ~/ros2_ws/src# 创建功能包(命名为 action_demo,依赖 action_msgs、rclcpp、rclpy 等)
ros2 pkg create action_demo --build-type ament_cmake --dependencies action_msgs rclcpp rclpy rosidl_default_generators
cd action_demo
2. 步骤2:自定义 Action 消息(.action 文件)
在功能包下创建 action 文件夹,编写 Action 消息(包含 Goal/Feedback/Result):
# 创建 action 文件夹
mkdir action && cd action# 编写 Action 消息文件(命名为 Task.action)
echo '
# Goal:任务目标(比如巡逻的圈数)
uint32 target_count # 目标执行次数
---
# Result:任务结果(比如实际完成次数)
uint32 completed_count
bool success
---
# Feedback:过程反馈(比如当前进度)
uint32 current_count
float32 progress # 进度百分比(0.0~1.0)
' > Task.action
3. 步骤3:修改配置文件(让 ROS2 编译 Action 消息)
需修改 CMakeLists.txt 和 package.xml,告知编译器生成 Action 相关代码:
(1)修改 CMakeLists.txt(关键部分)
在 action_demo/CMakeLists.txt 中添加以下内容:
# 查找依赖包
find_package(ament_cmake REQUIRED)
find_package(action_msgs REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(rosidl_default_generators REQUIRED)# 声明 Action 消息文件
set(action_files"action/Task.action"
)# 生成 Action 相关代码(C++/Python)
rosidl_generate_interfaces(${PROJECT_NAME}${action_files}DEPENDENCIES action_msgs
)# 安装 Python 脚本(后续会写 Client/Server 脚本)
install(PROGRAMSscripts/action_server.pyscripts/action_client.pyDESTINATION lib/${PROJECT_NAME}
)ament_package()
(2)修改 package.xml(添加依赖)
在 action_demo/package.xml 中添加:
<depend>action_msgs</depend>
<depend>rclcpp</depend>
<depend>rclpy</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
4. 步骤4:编写 Action Server/Client 脚本(Python 示例)
在功能包下创建 scripts 文件夹,编写 Server(执行任务)和 Client(发送任务):
(1)Action Server 脚本(scripts/action_server.py)
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_demo.action import Task # 导入自定义 Action 消息
import timeclass TaskActionServer(Node):def __init__(self):super().__init__('task_action_server')# 创建 Action Server(类型:Task,名称:task_execution)self._action_server = ActionServer(self,Task,'task_execution', # Action 服务名称(Client 需对应)self.execute_callback # 任务执行回调函数)self.get_logger().info("Action Server 启动:等待任务请求...")def execute_callback(self, goal_handle):self.get_logger().info(f"收到任务:执行 {goal_handle.request.target_count} 次")# 模拟任务执行(循环反馈进度)completed = 0for i in range(goal_handle.request.target_count):# 检查是否收到取消请求if goal_handle.is_cancel_requested():goal_handle.canceled()self.get_logger().info("任务被取消!")result = Task.Result()result.completed_count = completedresult.success = Falsereturn result# 模拟执行耗时time.sleep(1)completed += 1# 发送进度反馈feedback = Task.Feedback()feedback.current_count = completedfeedback.progress = completed / goal_handle.request.target_countgoal_handle.publish_feedback(feedback)self.get_logger().info(f"进度:{feedback.progress:.2f}({completed}/{goal_handle.request.target_count})")# 任务完成,返回结果goal_handle.succeed()result = Task.Result()result.completed_count = completedresult.success = Trueself.get_logger().info("任务完成!")return resultdef main(args=None):rclpy.init(args=args)action_server = TaskActionServer()rclpy.spin(action_server)rclpy.shutdown()if __name__ == '__main__':main()
(2)Action Client 脚本(scripts/action_client.py)
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_demo.action import Taskclass TaskActionClient(Node):def __init__(self):super().__init__('task_action_client')# 创建 Action Client(对应 Server 的服务名称:task_execution)self._action_client = ActionClient(self, Task, 'task_execution')self.get_logger().info("Action Client 启动:准备发送任务...")def send_goal(self, target_count):# 等待 Server 上线if not self._action_client.wait_for_server(timeout_sec=5.0):self.get_logger().error("未找到 Action Server!")return# 构造任务目标(Goal)goal_msg = Task.Goal()goal_msg.target_count = target_count # 设定执行次数(比如 3 次)# 发送目标,并注册反馈回调(接收进度)和结果回调(接收最终结果)self._send_goal_future = self._action_client.send_goal_async(goal_msg,feedback_callback=self.feedback_callback)self._send_goal_future.add_done_callback(self.goal_response_callback)def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info("任务被 Server 拒绝!")returnself.get_logger().info("任务被 Server 接受,等待执行...")# 等待任务结果self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)def feedback_callback(self, feedback_msg):# 接收进度反馈feedback = feedback_msg.feedbackself.get_logger().info(f"收到反馈:进度 {feedback.progress:.2f}({feedback.current_count} 次)")def get_result_callback(self, future):result = future.result().resultif result.success:self.get_logger().info(f"任务成功!实际执行 {result.completed_count} 次")else:self.get_logger().info(f"任务失败!实际执行 {result.completed_count} 次")# 任务结束,关闭节点rclpy.shutdown()def main(args=None):rclpy.init(args=args)action_client = TaskActionClient()# 发送任务:执行 3 次(可修改为其他数字)action_client.send_goal(3)rclpy.spin(action_client)if __name__ == '__main__':main()
5. 步骤5:编译与运行 Action Server/Client
# 回到工作空间根目录编译
cd ~/ros2_ws
colcon build --packages-select action_demo
# 加载环境变量(每次新终端都要执行)
source install/setup.bash# 终端1:运行 Action Server
ros2 run action_demo action_server.py# 终端2:运行 Action Client(发送 3 次任务)
ros2 run action_demo action_client.py
6. Action 常用辅助命令(调试用)
# 1. 列出当前活跃的 Action 服务
ros2 action list# 2. 查看某个 Action 的消息结构(比如我们的 task_execution)
ros2 action show action_demo/action/Task# 3. 直接用命令行发送 Action Goal(无需写 Client 脚本,快速测试)
ros2 action send_goal --feedback /task_execution action_demo/action/Task "{target_count: 2}"
# --feedback 参数:实时显示进度反馈
二、MoveIt! 示例(基于 Action 接口,机械臂运动规划)
MoveIt! 已内置 MoveGroupAction 接口(无需自定义 Action),核心命令是“启动 MoveIt! 节点 + 发送运动目标”,以下是常见场景示例(假设你已生成机械臂的 MoveIt! 配置包,命名为 my_robot_moveit_config,机械臂模型为 panda 或自定义模型)。
前提:生成 MoveIt! 配置包(若未生成)
若还没有 MoveIt! 配置包,先通过 moveit_setup_assistant 生成(基于 URDF 模型):
# 安装 MoveIt!(ROS2 Humble 示例)
sudo apt install ros-humble-moveit# 启动 MoveIt! 配置助手
ros2 run moveit_setup_assistant moveit_setup_assistant
→ 按照向导加载 URDF 文件,生成配置包(命名为 my_robot_moveit_config),放入 ros2_ws/src 并编译。
示例1:启动 MoveIt! 核心节点 + RViz 可视化
这是最基础的命令,启动 MoveGroup 节点(Action Server)和 RViz(加载 MoveIt! 配置):
# 编译配置包(若未编译)
cd ~/ros2_ws
colcon build --packages-select my_robot_moveit_config
source install/setup.bash# 启动 MoveIt! + RViz(通过 launch 文件,不同配置包的 launch 文件名可能不同)
# 格式:ros2 launch [配置包名] [launch文件名]
ros2 launch my_robot_moveit_config move_group.launch.py
# 或同时启动 RViz(部分配置包的 launch 会自动加载 RViz)
ros2 launch my_robot_moveit_config demo.launch.py
→ 启动后 RViz 会显示机械臂模型,MoveGroup 节点作为 Action Server 等待运动目标。
示例2:用命令行发送 MoveIt! Action Goal(快速测试运动)
MoveIt! 的核心 Action 服务名称为 move_action,消息类型为 moveit_msgs/action/MoveGroup,可直接用 ros2 action send_goal 发送简单运动目标(比如“机械臂回到初始位置”):
# 前提:已启动 MoveIt! 核心节点(示例1的命令)
# 发送“回到初始位置”的目标(joint_state 为关节角度,根据自己的机械臂调整)
ros2 action send_goal --feedback /move_action moveit_msgs/action/MoveGroup "{goal_request: {group_name: 'panda_arm', # 机械臂组名(在 MoveIt! 配置中定义)goal_constraints: [{joint_constraints: [{joint_name: 'panda_joint1', position: 0.0},{joint_name: 'panda_joint2', position: -0.785},{joint_name: 'panda_joint3', position: 0.0},{joint_name: 'panda_joint4', position: -2.356},{joint_name: 'panda_joint5', position: 0.0},{joint_name: 'panda_joint6', position: 1.571},{joint_name: 'panda_joint7', position: 0.785}]}]}
}"
→ 发送后,RViz 中的机械臂会按目标关节角度运动,命令行会显示“规划中→执行中→成功”的反馈。
示例3:用 Python 脚本发送 MoveIt! Action 目标(实际开发常用)
通过 moveit_commander(MoveIt! 的 Python 接口)简化 Action 调用,无需手动构造复杂的 Action 消息:
# 安装 moveit_commander(ROS2 Humble)
sudo apt install ros-humble-moveit-commander
编写脚本 scripts/moveit_demo.py(放入 my_robot_moveit_config/scripts,或新建功能包):
import rclpy
from moveit_commander import MoveGroupCommanderdef main(args=None):rclpy.init(args=args)node = rclpy.create_node('moveit_demo_node')# 创建 MoveGroup 实例(对应机械臂组名,需与 MoveIt! 配置一致)move_group = MoveGroupCommander("panda_arm", node=node)# 1. 设定目标:回到初始位置(关节空间)node.get_logger().info("移动到初始位置...")joint_goal = move_group.get_current_joint_values()joint_goal[0] = 0.0 # panda_joint1joint_goal[1] = -0.785 # panda_joint2joint_goal[2] = 0.0 # panda_joint3joint_goal[3] = -2.356 # panda_joint4joint_goal[4] = 0.0 # panda_joint5joint_goal[5] = 1.571 # panda_joint6joint_goal[6] = 0.785 # panda_joint7# 发送目标并等待完成(超时 5 秒)move_group.go(joint_goal, wait=True)# 停止运动(防止残留速度)move_group.stop()# 2. 设定目标:笛卡尔空间运动(机械臂末端移动到指定坐标)node.get_logger().info("笛卡尔空间移动...")pose_goal = move_group.get_current_pose().posepose_goal.position.x = 0.5 # X 坐标(根据自己的机械臂工作空间调整)pose_goal.position.y = 0.0 # Y 坐标pose_goal.position.z = 0.5 # Z 坐标move_group.set_pose_target(pose_goal)# 规划并执行(返回是否成功)success = move_group.go(wait=True)move_group.stop()move_group.clear_pose_targets() # 清除目标if success:node.get_logger().info("运动成功!")else:node.get_logger().error("运动失败!")rclpy.shutdown()if __name__ == '__main__':main()
运行脚本:
# 终端1:启动 MoveIt! 核心节点 + RViz
ros2 launch my_robot_moveit_config demo.launch.py# 终端2:运行 Python 脚本(发送运动目标)
ros2 run my_robot_moveit_config moveit_demo.py
三、核心命令总结
| 功能场景 | Action 控制命令 | MoveIt! 命令 |
|---|---|---|
| 列出活跃服务 | ros2 action list | ros2 action list(服务名通常为 /move_action) |
| 查看消息结构 | ros2 action show <包名>/action/<消息名> | ros2 action show moveit_msgs/action/MoveGroup |
| 命令行发送目标 | ros2 action send_goal --feedback <服务名> <消息类型> "<目标数据>" | ros2 action send_goal --feedback /move_action moveit_msgs/action/MoveGroup "<目标数据>" |
| 启动核心节点 | ros2 run <功能包名> <server脚本> | ros2 launch <moveit配置包> move_group.launch.py |
| 代码调用(Python) | 用 rclpy.action.ActionClient 发送目标 | 用 moveit_commander.MoveGroupCommander 简化调用 |
关键注意事项
- ROS2 版本适配:MoveIt! 的命令和包名随 ROS2 版本略有差异(如 Humble 是
ros-humble-moveit,Iron 是ros-iron-moveit),需对应版本安装; - MoveIt! 配置包:必须先基于 URDF 生成
moveit_config包,否则无法启动 MoveGroup 节点; - 机械臂组名:MoveIt! 的
group_name需在配置包中定义(比如panda_arm是 Panda 机械臂的默认组名),需与自己的配置一致; - 环境变量:每次新终端都要执行
source install/setup.bash,否则无法找到包或节点。
