当前位置: 首页 > news >正文

机器人控制器开发(通讯——建图和导航模式切换)

文章总览

1 方案描述

机器人控制器需要拥有建图模式和定位导航模式任意切换,并且能够保存地图和加载不同地图,为了方便调度和上层系统对接,通常可以使用websocket进行通讯,实现对应的模式切换和请求。

2 开发节点管理服务

下面是websockt服务代码:

#!/usr/bin/env python3
import asyncio
import websockets
import json
import subprocess
import signal
import os
import time
from typing import Optionalclass ROS2ModeManager:def __init__(self):self.current_process: Optional[subprocess.Popen] = Noneself.current_pid: Optional[int] = Noneself.current_mode: str = "navigation"  # 初始模式为导航def start_navigation(self):"""启动导航模式"""# 终止当前进程(如果存在)self.kill_current_process()# 启动nav2_bringuptry:process = subprocess.Popen(["ros2", "launch", "nav2_bringup", "bringup_launch.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,preexec_fn=os.setsid  # 创建新的进程组,便于终止整个进程树)self.current_process = processself.current_pid = process.pidself.current_mode = "navigation"print(f"导航模式已启动,PID: {self.current_pid}")return True, self.current_pidexcept Exception as e:print(f"启动导航模式失败: {e}")return False, Nonedef start_mapping(self):"""启动建图模式"""# 终止当前进程(如果存在)self.kill_current_process()# 启动cartographer_rostry:process = subprocess.Popen(["ros2", "launch", "cartographer_ros", "backpack_2d.launch.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,preexec_fn=os.setsid  # 创建新的进程组,便于终止整个进程树)self.current_process = processself.current_pid = process.pidself.current_mode = "mapping"print(f"建图模式已启动,PID: {self.current_pid}")return True, self.current_pidexcept Exception as e:print(f"启动建图模式失败: {e}")return False, Nonedef save_mapping(self, map_name):"""保存地图功能:param map_name: 地图名称:return: (成功与否, 进程PID)"""try:# 确保地图目录存在map_dir = "/home/cat/map"if not os.path.exists(map_dir):os.makedirs(map_dir)print(f"创建地图目录: {map_dir}")# 构建pbstream文件路径pbstream_path = os.path.join(map_dir, "map.pbstream")# 检查pbstream文件是否存在if not os.path.exists(pbstream_path):error_msg = f"pbstream文件不存在: {pbstream_path}"print(error_msg)return False, error_msg# 执行地图保存命令process = subprocess.Popen(["ros2", "run", "cartographer_ros", "cartographer_pbstream_to_ros_map",f"-map_filestem={map_name}",f"-pbstream_filename={pbstream_path}","-resolution=0.02"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,cwd=map_dir,  # 在工作目录中运行,文件将保存在这里preexec_fn=os.setsid)# 等待命令完成stdout, stderr = process.communicate()if process.returncode == 0:print(f"地图保存成功: {map_name}")# 检查生成的文件pgm_file = os.path.join(map_dir, f"{map_name}.pgm")yaml_file = os.path.join(map_dir, f"{map_name}.yaml")if os.path.exists(pgm_file) and os.path.exists(yaml_file):return True, f"地图已保存为: {pgm_file}{yaml_file}"else:return False, "地图文件生成失败"else:error_msg = f"地图保存失败: {stderr.decode().strip()}"print(error_msg)return False, error_msgexcept Exception as e:error_msg = f"保存地图时发生异常: {str(e)}"print(error_msg)return False, error_msgdef kill_current_process(self):"""终止当前进程"""if self.current_process is not None:try:# 终止整个进程组os.killpg(os.getpgid(self.current_process.pid), signal.SIGTERM)self.current_process.wait(timeout=5)  # 等待进程终止print(f"已终止进程(PID: {self.current_pid})")except subprocess.TimeoutExpired:print("进程正常终止超时,尝试强制终止")try:os.killpg(os.getpgid(self.current_process.pid), signal.SIGKILL)except ProcessLookupError:print("进程已终止")except ProcessLookupError:print("进程已终止")except Exception as e:print(f"终止进程时出错: {e}")finally:self.current_process = Noneself.current_pid = Nonedef get_status(self):"""获取当前状态"""return {"mode": self.current_mode,"pid": self.current_pid,"status": "running" if self.current_process is not None else "stopped"}# 创建模式管理器实例
mode_manager = ROS2ModeManager()async def handle_websocket(websocket):"""处理WebSocket连接和消息"""print(f"客户端连接成功")try:async for message in websocket:try:data = json.loads(message)command = data.get("command")response = {}if command == "start_mapping":# 切换到建图模式success, pid = mode_manager.start_mapping()response = {"command": "start_mapping","success": success,"pid": pid,"mode": "mapping"}elif command == "stop_mapping":# 切换回导航模式success, pid = mode_manager.start_navigation()response = {"command": "stop_mapping","success": success,"pid": pid,"mode": "navigation"}elif command == "status":# 获取当前状态status = mode_manager.get_status()response = {"command": "status","success": True,"status": status}elif command == "save_mapping":# 保存地图map_name = data.get("map_name", "default_map")  # 获取地图名称,默认为default_mapsuccess, result = mode_manager.save_mapping(map_name)response = {"command": "save_mapping","success": success,"message": result,"map_name": map_name}else:response = {"command": command,"success": False,"error": "未知命令"}# 发送响应await websocket.send(json.dumps(response))except json.JSONDecodeError:error_response = {"success": False,"error": "无效的JSON格式"}await websocket.send(json.dumps(error_response))except Exception as e:error_response = {"success": False,"error": f"处理请求时出错: {str(e)}"}await websocket.send(json.dumps(error_response))print(f"处理消息时出错: {e}")except websockets.exceptions.ConnectionClosed:print("客户端断开连接")except Exception as e:print(f"WebSocket连接处理错误: {e}")async def main():# 默认启动导航模式print("正在启动默认导航模式...")success, pid = mode_manager.start_navigation()if success:print(f"导航模式已启动,PID: {pid}")else:print("导航模式启动失败")# 即使导航启动失败,也继续启动WebSocket服务器# 启动WebSocket服务器server = await websockets.serve(handle_websocket, "0.0.0.0", 8765)print("WebSocket服务器已启动,监听端口 8765")print("等待连接...")# 保持服务器运行await server.wait_closed()if __name__ == "__main__":# 确保正常清理进程def signal_handler(sig, frame):print("\n正在清理进程...")mode_manager.kill_current_process()exit(0)signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)try:# 运行主程序asyncio.run(main())except KeyboardInterrupt:print("\n程序被用户中断")signal_handler(signal.SIGINT, None)except Exception as e:print(f"程序运行出错: {e}")signal_handler(signal.SIGTERM, None)

3 测试

安装wscat

sudo npm install -g wscat

连接服务器

wscat -c ws://localhost:8765

发送websocket请求命令

{"command": "start_mapping"}    // 切换到建图模式
{"command": "stop_mapping"}     // 切换回导航模式
{"command": "status"}           // 获取当前状态

在这里插入图片描述
定位导航模式
在这里插入图片描述
建图模式
在这里插入图片描述
结束建图后,保存地图

切换地图


文章转载自:

http://I7BOL9al.gynLc.cn
http://8gn910Vk.gynLc.cn
http://Pc9Xg6KG.gynLc.cn
http://wXYGGZJy.gynLc.cn
http://OaPUDU2k.gynLc.cn
http://f2tJEmVN.gynLc.cn
http://1apz3fvZ.gynLc.cn
http://xBRyF1Bm.gynLc.cn
http://FSBDOkD9.gynLc.cn
http://Uv2FfEhz.gynLc.cn
http://3ejd4FdH.gynLc.cn
http://wGTaIa6H.gynLc.cn
http://DBrxvC9J.gynLc.cn
http://FjRcDXZE.gynLc.cn
http://vaXC8I6U.gynLc.cn
http://GIvFZt0q.gynLc.cn
http://Wqg64k86.gynLc.cn
http://5nSOUWeC.gynLc.cn
http://BV7aKDUf.gynLc.cn
http://vhOWIhQ2.gynLc.cn
http://T5r8WmYG.gynLc.cn
http://ahL9UhnE.gynLc.cn
http://3EsVjFmD.gynLc.cn
http://xEQO1zmU.gynLc.cn
http://w2UYvBny.gynLc.cn
http://BdiJwRiw.gynLc.cn
http://H01QP1TO.gynLc.cn
http://l4G1s2tm.gynLc.cn
http://Wx7SXI4O.gynLc.cn
http://MOIv8ghm.gynLc.cn
http://www.dtcms.com/a/386448.html

相关文章:

  • 容器化部署项目05
  • AI如何深度驱动数据资产入表业务开展-一线经验
  • Chromium 138 编译指南 macOS 篇:Xcode 与开发工具安装配置(二)
  • 网络.1 UDP
  • 在 Mac 环境安装全局默认版本 Python
  • 小迪安全v2023学习笔记(八十三讲)—— 组件安全JacksonFastJsonXStreamCVE复现
  • 鲁能大师显卡跑分计算规则,与算力对应关系?
  • 边缘智能的“隐形引擎”——TinyML 模型在 ARM Cortex-M 系列上的极致量化与加速实战
  • kernel32.dll如何修复?科普kernel32.dll缺失的故障问题的多种解决方法
  • git推送远程仓库
  • 「日拱一码」091 机器学习——集成学习
  • MTK Linux Charger驱动分析(五)- adapter_class.c
  • HarmonyOS 5.0应用开发——V2装饰器@once的使用
  • Day25_【深度学习(3)—PyTorch使用(5)—张量形状操作】
  • 微服务分布式事务解决方案梳理
  • 告别“长明灯”——塔能科技地下车库照明改造的极致节能
  • vue 3 阿里云视频播放器 如何让用户自己给视频添加节点
  • LinkedList 底层实现与 ArrayList 对比分析
  • 滚珠花键在半导体制造设备中承担怎样的核心功能?
  • 服装制造企业痛点解决方案:EFISH-SBC-RK3588 柔性化吊挂调度方案
  • 10cm钢板矫平机:工业制造中的“整形医生”
  • html表单登录模式代码
  • QUIC 协议域名封堵:核心原理、关键技术与实现路径(C/C++代码实现)
  • 8 基于机器学习进行遥感影像的地物分类-以随机森林为例
  • Qt读写SQLite示例
  • Jmeter性能测试之阶梯式场景、波浪式场景、服务器监控
  • 黄昏时刻复古胶片风格人像风光摄影后期Lr调色教程,手机滤镜PS+Lightroom预设下载!
  • Django ORM多对多关系实战指南
  • 【从零开始java学习|第十七篇】面向对象进阶
  • Three.js 开发实战教程(一):环境搭建与第一个 3D 场景