机器人控制器开发(通讯——建图和导航模式切换)
文章总览
1 方案描述
机器人控制器需要拥有建图模式和定位导航模式任意切换,并且能够保存地图和加载不同地图,为了方便调度和上层系统对接,通常可以使用websocket进行通讯,实现对应的模式切换和请求。
2 开发节点管理服务
下面是websockt服务代码:
#!/usr/bin/env python3
import asyncio
import websockets
import json
import subprocess
import signal
import os
import time
from typing import Optionalclass ROS2ModeManager:def __init__(self):self.current_process: Optional[subprocess.Popen] = Noneself.current_pid: Optional[int] = Noneself.current_mode: str = "navigation" # 初始模式为导航def start_navigation(self):"""启动导航模式"""# 终止当前进程(如果存在)self.kill_current_process()# 启动nav2_bringuptry:process = subprocess.Popen(["ros2", "launch", "nav2_bringup", "bringup_launch.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,preexec_fn=os.setsid # 创建新的进程组,便于终止整个进程树)self.current_process = processself.current_pid = process.pidself.current_mode = "navigation"print(f"导航模式已启动,PID: {self.current_pid}")return True, self.current_pidexcept Exception as e:print(f"启动导航模式失败: {e}")return False, Nonedef start_mapping(self):"""启动建图模式"""# 终止当前进程(如果存在)self.kill_current_process()# 启动cartographer_rostry:process = subprocess.Popen(["ros2", "launch", "cartographer_ros", "backpack_2d.launch.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,preexec_fn=os.setsid # 创建新的进程组,便于终止整个进程树)self.current_process = processself.current_pid = process.pidself.current_mode = "mapping"print(f"建图模式已启动,PID: {self.current_pid}")return True, self.current_pidexcept Exception as e:print(f"启动建图模式失败: {e}")return False, Nonedef save_mapping(self, map_name):"""保存地图功能:param map_name: 地图名称:return: (成功与否, 进程PID)"""try:# 确保地图目录存在map_dir = "/home/cat/map"if not os.path.exists(map_dir):os.makedirs(map_dir)print(f"创建地图目录: {map_dir}")# 构建pbstream文件路径pbstream_path = os.path.join(map_dir, "map.pbstream")# 检查pbstream文件是否存在if not os.path.exists(pbstream_path):error_msg = f"pbstream文件不存在: {pbstream_path}"print(error_msg)return False, error_msg# 执行地图保存命令process = subprocess.Popen(["ros2", "run", "cartographer_ros", "cartographer_pbstream_to_ros_map",f"-map_filestem={map_name}",f"-pbstream_filename={pbstream_path}","-resolution=0.02"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,cwd=map_dir, # 在工作目录中运行,文件将保存在这里preexec_fn=os.setsid)# 等待命令完成stdout, stderr = process.communicate()if process.returncode == 0:print(f"地图保存成功: {map_name}")# 检查生成的文件pgm_file = os.path.join(map_dir, f"{map_name}.pgm")yaml_file = os.path.join(map_dir, f"{map_name}.yaml")if os.path.exists(pgm_file) and os.path.exists(yaml_file):return True, f"地图已保存为: {pgm_file} 和 {yaml_file}"else:return False, "地图文件生成失败"else:error_msg = f"地图保存失败: {stderr.decode().strip()}"print(error_msg)return False, error_msgexcept Exception as e:error_msg = f"保存地图时发生异常: {str(e)}"print(error_msg)return False, error_msgdef kill_current_process(self):"""终止当前进程"""if self.current_process is not None:try:# 终止整个进程组os.killpg(os.getpgid(self.current_process.pid), signal.SIGTERM)self.current_process.wait(timeout=5) # 等待进程终止print(f"已终止进程(PID: {self.current_pid})")except subprocess.TimeoutExpired:print("进程正常终止超时,尝试强制终止")try:os.killpg(os.getpgid(self.current_process.pid), signal.SIGKILL)except ProcessLookupError:print("进程已终止")except ProcessLookupError:print("进程已终止")except Exception as e:print(f"终止进程时出错: {e}")finally:self.current_process = Noneself.current_pid = Nonedef get_status(self):"""获取当前状态"""return {"mode": self.current_mode,"pid": self.current_pid,"status": "running" if self.current_process is not None else "stopped"}# 创建模式管理器实例
mode_manager = ROS2ModeManager()async def handle_websocket(websocket):"""处理WebSocket连接和消息"""print(f"客户端连接成功")try:async for message in websocket:try:data = json.loads(message)command = data.get("command")response = {}if command == "start_mapping":# 切换到建图模式success, pid = mode_manager.start_mapping()response = {"command": "start_mapping","success": success,"pid": pid,"mode": "mapping"}elif command == "stop_mapping":# 切换回导航模式success, pid = mode_manager.start_navigation()response = {"command": "stop_mapping","success": success,"pid": pid,"mode": "navigation"}elif command == "status":# 获取当前状态status = mode_manager.get_status()response = {"command": "status","success": True,"status": status}elif command == "save_mapping":# 保存地图map_name = data.get("map_name", "default_map") # 获取地图名称,默认为default_mapsuccess, result = mode_manager.save_mapping(map_name)response = {"command": "save_mapping","success": success,"message": result,"map_name": map_name}else:response = {"command": command,"success": False,"error": "未知命令"}# 发送响应await websocket.send(json.dumps(response))except json.JSONDecodeError:error_response = {"success": False,"error": "无效的JSON格式"}await websocket.send(json.dumps(error_response))except Exception as e:error_response = {"success": False,"error": f"处理请求时出错: {str(e)}"}await websocket.send(json.dumps(error_response))print(f"处理消息时出错: {e}")except websockets.exceptions.ConnectionClosed:print("客户端断开连接")except Exception as e:print(f"WebSocket连接处理错误: {e}")async def main():# 默认启动导航模式print("正在启动默认导航模式...")success, pid = mode_manager.start_navigation()if success:print(f"导航模式已启动,PID: {pid}")else:print("导航模式启动失败")# 即使导航启动失败,也继续启动WebSocket服务器# 启动WebSocket服务器server = await websockets.serve(handle_websocket, "0.0.0.0", 8765)print("WebSocket服务器已启动,监听端口 8765")print("等待连接...")# 保持服务器运行await server.wait_closed()if __name__ == "__main__":# 确保正常清理进程def signal_handler(sig, frame):print("\n正在清理进程...")mode_manager.kill_current_process()exit(0)signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)try:# 运行主程序asyncio.run(main())except KeyboardInterrupt:print("\n程序被用户中断")signal_handler(signal.SIGINT, None)except Exception as e:print(f"程序运行出错: {e}")signal_handler(signal.SIGTERM, None)
3 测试
安装wscat
sudo npm install -g wscat
连接服务器
wscat -c ws://localhost:8765
发送websocket请求命令
{"command": "start_mapping"} // 切换到建图模式
{"command": "stop_mapping"} // 切换回导航模式
{"command": "status"} // 获取当前状态
定位导航模式
建图模式
结束建图后,保存地图
切换地图