当前位置: 首页 > news >正文

【代码讲解】Rerun 实时数据采集并显示 + 手柄控制 6 轴机械臂运动

【代码讲解】Rerun 实时记录并显示 + 手柄控制 6 轴机械臂运动数据

Mujoco部分:https://github.com/LitchiCheng/mujoco-learning

SO-ARM100部分:https://github.com/LitchiCheng/SO-ARM100

本期介绍下使用 Rerun 在线实时记录并显示,手柄控制6个关节,以及实际6个关节的数据

将lerobot中的rerun部分提取出来,保证数据格式类似,方便后面的数据集统一,如下为lerobot的rerun部分

def _init_rerun(session_name: str = "lerobot_control_loop") -> None:"""Initializes the Rerun SDK for visualizing the control loop."""batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_sizerr.init(session_name)memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")rr.spawn(memory_limit=memory_limit)def log_rerun_data(observation: dict[str | Any], action: dict[str | Any]):for obs, val in observation.items():if isinstance(val, float):rr.log(f"observation.{obs}", rr.Scalars(val))elif isinstance(val, np.ndarray):if val.ndim == 1:for i, v in enumerate(val):rr.log(f"observation.{obs}_{i}", rr.Scalars(float(v)))else:rr.log(f"observation.{obs}", rr.Image(val), static=True)for act, val in action.items():if isinstance(val, float):rr.log(f"action.{act}", rr.Scalars(val))elif isinstance(val, np.ndarray):for i, v in enumerate(val):rr.log(f"action.{act}_{i}", rr.Scalars(float(v)))

将机械臂被控时的控制量记为action,实际当前位置记为observation,如下为完整代码。


import zmq
import json
import sys
import os
import time
import math
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from hardware import FeetechMotor as fmfrom typing import Any
import numpy as np
import rerun as rrmotors_key=["shoulder_pan","shoulder_lift","elbow_flex","wrist_flex","wrist_roll","gripper"
]def _init_rerun(session_name: str = "lerobot_control_loop") -> None:"""Initializes the Rerun SDK for visualizing the control loop."""batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_sizerr.init(session_name)memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")rr.spawn(memory_limit=memory_limit)def log_rerun_data(observation: dict[str | Any], action: dict[str | Any]):for obs, val in observation.items():if isinstance(val, float):rr.log(f"observation.{obs}", rr.Scalars(val))elif isinstance(val, np.ndarray):if val.ndim == 1:for i, v in enumerate(val):rr.log(f"observation.{obs}_{i}", rr.Scalars(float(v)))else:rr.log(f"observation.{obs}", rr.Image(val), static=True)for act, val in action.items():if isinstance(val, float):rr.log(f"action.{act}", rr.Scalars(val))elif isinstance(val, np.ndarray):for i, v in enumerate(val):rr.log(f"action.{act}_{i}", rr.Scalars(float(v)))# 初始化机械臂
motor = fm.FeetechMotor(1, "/dev/ttyACM0")
motor.connect()# # ZMQ配置
# ZMQ_IP = "127.0.0.1"
# ZMQ_PORT = "5555"
# context = zmq.Context()
# socket = context.socket(zmq.SUB)
# socket.connect(f"tcp://{ZMQ_IP}:{ZMQ_PORT}")
# # 订阅所有消息(空字符串表示订阅所有主题)
# socket.setsockopt_string(zmq.SUBSCRIBE, "")# print(f"等待仿真端发布数据... 连接到:tcp://{ZMQ_IP}:{ZMQ_PORT}")# ZMQ IPC配置 - 使用进程间通信协议
ZMQ_IPC_PATH = "ipc:///tmp/robot_arm_comm.ipc"  # 与发布者相同的IPC路径
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(ZMQ_IPC_PATH)  # 连接到IPC路径
socket.setsockopt_string(zmq.SUBSCRIBE, "")  # 订阅所有消息print(f"控制端订阅者启动,连接到:{ZMQ_IPC_PATH}")
_init_rerun()try:while True:data = socket.recv_string().strip()if not data:continueaction = json.loads(data)joint_pos_deg = [val for key, val in action.items() if key.endswith(".pos")]joint_now_pos = []# 发送控制指令给实际机械臂for i in range(1, 7):motor.setMotorId(i)motor.setSpeed(1000)  # 设置目标速度为1000步/秒motor.printFlag(False)  # 打开打印if i == 1 or i == 2:motor.setPID(16, 16, 0)elif i == 5:motor.setPID(16, 16, 0)else:   motor.setPID(32, 32, 0)  # 设置PID参数motor.setPosition(math.degrees(joint_pos_deg[i - 1]))joint_now_pos.append(math.radians(motor.getPosition()))observation = {f"{motor}.pos": val for motor, val in zip(motors_key, joint_now_pos)}log_rerun_data(observation, action)except KeyboardInterrupt:print("程序被用户中断")
except Exception as e:print(f"接收/控制错误:{e}")
finally:# 关闭连接与机械臂socket.close()context.term()motor.disconnect()

http://www.dtcms.com/a/416599.html

相关文章:

  • 设计企业门户网站新手做网站流程
  • h5网站页面网页制作工具可分为
  • 微网站建设哪家便宜设计之家官网首页
  • 油猴,悬浮评论区窗口模版
  • 东莞网站建设求职棋牌软件开发定制软件
  • 做网站应该注意什么外贸推广服务公司
  • 网站建设机构培训wordpress高亮
  • 宣传工作网站建设作用网站备案名称的影响
  • 网站制作前需要进行规划设计wordpress主题修改教程
  • 微网站建设资讯wordpress 禁止升级
  • 内容网站模板电竞网站建设方案
  • dedecms 网站名称标签好的php网站
  • 湖南建设厅网站设计开发流程
  • 外贸网站虚拟主机昆明经济技术开发区官方门户网站
  • 风云格式工厂 6.8.0.apk:
  • 春节网站怎么做typecho还是WordPress
  • 公司建一个网站要多少钱wordpress源码解析
  • 没内容的网站怎么优化免费表白网站制作
  • 有用的网站地址无锡专业网站制作
  • 【ROS学习笔记】ROS1和ROS2对比
  • 郑州网站提升排名wordpress文章页面添加打赏
  • 江阴公司做网站wordpress 插件 手机
  • 海口网站建设方案报价百度产品优化排名软件
  • 怎么样建设一个电影网站视频下载免费行情的软件大全下载
  • 微信上优惠券的网站怎么做的苏州360推广 网站建设
  • 附近有木有做网站扁平化企业网站模板
  • 上海电商网站开发公司网站微信建设运维经验
  • 网站系统建设方案站嗨建站
  • 个人做电影网站违法吗杭州市住建局官网
  • 用宝塔面板快速构建LNMP环境