(二)deepseek控制机械臂-机械臂提示词设置测试
为了能够实现deepseek生成新的机械臂位置,需要deepseek输出特定格式的位置。
参考开源工程https://github.com/SummerPer/LLM-Based-Control-of-Robotic-Arms
主要需要设置的
(1)api的key
(2)模型类型
(3)提示词
1.python实现代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenAI API接口脚本
用于将自然语言指令转换为机械臂控制坐标,并集成ROS2功能
"""import sys
import json
import os
import time
import requests
import logging
from openai import OpenAI
from typing import Dict, List, Any
from geometry_msgs.msg import PoseStamped# 打印调试信息
def print_debug_info(message):print(f"[调试] {message}", file=sys.stderr)class OpenAIInterface():"""OpenAI API接口类,用于将自然语言转换为机械臂控制指令,并集成ROS2功能"""def __init__(self, model: str = "deepseek-chat", base_url: str = "https://api.deepseek.com"):"""初始化OpenAI客户端和ROS2节点Args:model: 使用的OpenAI模型名称,默认使用deepseek-chatbase_url: API端点URL,默认为https://api.deepseek.com"""# 初始化ROS2节点# super().__init__("openai_api_interface")# 从环境变量获取API密钥self.api_key = os.environ.get('OPENAI_API_KEY')if not self.api_key:raise ValueError("未提供OpenAI API密钥,请通过环境变量OPENAI_API_KEY设置")self.model = modelself.base_url = base_url# 初始化OpenAI客户端,使用指定的base_urlself.client = OpenAI(api_key="DEEPSEEK API KEY", base_url="https://api.deepseek.com")# 订阅末端执行器位置话题self.pose_topic_name = "/end_effector_pose"#self.current_pose = Noneself.current_pose = [0,0,0,0,0,0,1]# self.pose_subscription = self.create_subscription(# PoseStamped,# self.pose_topic_name,# self.pose_callback,# 10# )self.get_logger().info(f"订阅末端执行器位置话题: {self.pose_topic_name}")# 设置系统提示词self.system_prompt = ("你是一个机械臂控制助手,负责将自然语言指令转换为机械臂坐标。\n""你的回答应该只包含一个JSON对象,格式如下:\n""{\n" \" \"position\": [x, y, z, qx, qy, qz, qw],\n" \" \"gripper\": \"open\"或\"close\",\n" \" \"description\": \"对指令的解释\"\n" \"}\n" \"其中x, y, z是位置坐标,qx, qy, qz, qw是四元数表示的姿态,gripper是夹爪状态。\n" \"不要输出任何其他文字!\n" \"如果提供了当前位置信息,请根据当前位置规划合理的运动路径。\n" \"现在请处理用户的指令。")# 等待获取初始位置信息#self.wait_for_initial_pose()def get_logger(self):"""创建一个配置好的日志记录器。"""# 获取一个日志记录器,使用当前模块的名字(__name__)是个好习惯logger = logging.getLogger(__name__)# 设置日志记录器的级别,只有级别大于等于此的日志才会被处理logger.setLevel(logging.INFO)# 如果该记录器还没有处理器,才添加,避免重复记录:cite[7]if not logger.handlers:# 创建一个控制台处理器ch = logging.StreamHandler()ch.setLevel(logging.INFO)# 创建日志格式formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')ch.setFormatter(formatter)# 将处理器添加到日志记录器logger.addHandler(ch)return logger def pose_callback(self, msg):"""位置话题回调函数"""# 保存当前位置信息self.current_pose = [msg.pose.position.x,msg.pose.position.y,msg.pose.position.z,msg.pose.orientation.x,msg.pose.orientation.y,msg.pose.orientation.z,msg.pose.orientation.w]self.get_logger().info(f"接收到末端执行器位置: x={msg.pose.position.x:.4f}, y={msg.pose.position.y:.4f}, z={msg.pose.position.z:.4f}")def convert_natural_language_to_position(self, command: str, current_position: List[float] = None) -> Dict[str, Any]:"""将自然语言指令转换为机械臂坐标Args:command: 自然语言指令current_position: 末端执行器当前位置 [x, y, z, qx, qy, qz, qw],可选Returns:包含位置、夹爪状态和描述的字典"""try:# 优先使用节点中保存的当前位置,如果没有则使用传入的位置或默认位置pose_to_use = self.current_pose if self.current_pose is not None else (current_position if current_position is not None else [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])# 构建用户消息,包含当前位置信息user_message = commanduser_message += f"\n\n当前末端执行器位置: {pose_to_use}"self.get_logger().info(f"正在调用OpenAI API,发送指令: {command[:50]}...")self.get_logger().debug(f"发送的完整消息: {user_message}")# 调用OpenAI APIresponse = self.client.chat.completions.create(model=self.model,messages=[{"role": "system", "content": self.system_prompt},{"role": "user", "content": user_message}],response_format={"type": "json_object"},temperature=0.7 # 控制输出的随机性)# 获取响应内容content = response.choices[0].message.content.strip()self.get_logger().debug(f"OpenAI响应内容: {content}")# 添加INFO级别日志记录原始响应,用于调试JSON解析问题self.get_logger().info(f"原始OpenAI响应(前200字符): {content[:200]}...")# 解析JSON响应result = json.loads(content)# 验证结果格式if not self._validate_response(result):raise ValueError("OpenAI返回的响应格式不正确")return resultexcept json.JSONDecodeError:self.get_logger().error("无法解析OpenAI的响应为JSON格式")raiseexcept Exception as e:self.get_logger().error(f"调用OpenAI API时发生错误: {str(e)}")raisedef _validate_response(self, response: Dict[str, Any]) -> bool:"""验证OpenAI返回的响应格式是否严格符合ControlRvizArm服务要求Args:response: OpenAI返回的响应字典Returns:格式是否正确"""# 检查必要字段是否存在if not all(key in response for key in ['position', 'gripper', 'description']):self.get_logger().error(f"响应缺少必要字段,当前字段: {list(response.keys())}")return False# 检查position是否为长度为7的数组(严格符合ControlRvizArm服务的7元素float64[]要求)if not isinstance(response['position'], list) or len(response['position']) != 7:self.get_logger().error(f"position格式错误,必须是长度为7的数组,当前: {response['position']}")return False# 检查position中的元素是否为数字(确保符合float64[]类型要求)try:# 尝试将所有元素转换为浮点数,确保类型兼容性for i, val in enumerate(response['position']):float(val)except (ValueError, TypeError):self.get_logger().error(f"position数组中包含非数字元素: {response['position']}")return False# 检查gripper是否为有效的值(严格符合ControlRvizArm服务的open_or_close字符串要求)if response['gripper'] not in ['open', 'close']:self.get_logger().error(f"gripper值无效,必须是'open'或'close',当前: {response['gripper']}")return Falsereturn Truedef main():"""主函数,启动对话模式"""try:# 获取命令行参数(如果有)model="deepseek-chat"base_url ="https://api.deepseek.com"# 创建OpenAI接口实例if base_url:openai_interface = OpenAIInterface(model=model, base_url=base_url)else:openai_interface = OpenAIInterface(model=model)print("机械臂对话控制系统已启动!")print("输入自然语言指令控制机械臂,输入'退出'或'quit'结束会话")print("\n示例指令:")print("- 移动到桌子中央")print("- 向上移动10厘米")print("- 抓取前方物体\n")# 启动对话循环while True:try:# 获取用户输入command = input("请输入指令: ")# 检查退出条件if command.lower() in ['退出', 'quit', 'exit']:print("感谢使用机械臂对话控制系统,再见!")break# 处理空输入if not command.strip():continue# 转换自然语言指令为机械臂坐标openai_interface.get_logger().info(f"处理自然语言指令: {command}")result = openai_interface.convert_natural_language_to_position(command)# 输出结果到控制台openai_interface.get_logger().info(f"OpenAI处理结果: {json.dumps(result)}")print(f"\n执行结果: {result['description']}")print(f"目标位置: {result['position'][:3]}")print(f"夹爪状态: {result['gripper']}\n")except KeyboardInterrupt:print("\n程序被用户中断")breakexcept Exception as e:print(f"处理指令时发生错误: {str(e)}")print("请尝试重新输入指令或检查系统状态")sys.exit(0)except Exception as e:# 输出错误信息print(f"错误: {str(e)}", file=sys.stderr)sys.exit(1)finally:print("\n退出")if __name__ == "__main__":main()
2.输入输出
输入输出如图