当前位置: 首页 > news >正文

LeRobot 项目部署运行逻辑(四)——control_robot.py

在上一篇中已经记录了标准的 ALOHA 配置流程,但实验室使用的 Mobile ALOHA 硬件有区别,所以要进一步拆解代码

Lerobot 的操作代码基本都在 lerobot/scripts 里面,每个功能命名都很清晰明了:

硬件齐套的情况下,首先就是标定然后遥操作,就是运行此脚本

目前 huggingface 上有很多开源数据集可以直接使用:Lerobot

我们基本都清楚具身智能是 data-driven 的,基本流程就是数据采集-可视化筛选-策略训练-策略评估四个基本流程,而 control_robot.py 是为其中的第一步和第四步

因此,control_robot.py 是最重要的脚本之一,也包含了非常多功能

  • 机器人校准(Calibrate)

  • 遥操作(Teleoperate)

  • 数据集录制(Record)

  • 数据回放(Replay)

  • 远程机器人操作(Remote Robot)

再来说下 Mobile ALOHA,目前来说已经有非常多版本,很多相机和舵机不一样,或者说有一些简化版本 so100 之类的,但是都是可以通用的,流程是一样的,但是要根据硬件做简单更改和适配

目录

1 命令行接口

2 代码详解

2.1 核心组件和函数

2.2 库引用

2.3 主函数

2.4 机器人校准 calibrate(robot, cfg)

2.5 机器人遥操作 teleoperate(robot, cfg)

*2.6 数据集录制 record(robot, cfg)

2.7 数据回放 replay(robot, cfg)

2.8 可视化初始化 _init_rerun(control_config, session_name)

3 运行效果


1 命令行接口

脚本在模块顶部给出了全部的调用示例,常见命令:

# 标定机器人
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=calibrate# 无限制高频率遥操作(~200Hz ),CTRL+C 退出
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=teleoperate# 屏幕显示相机图像
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=teleoperate--control.display_data=True# 限频到 30Hz(模拟数据录制频率)
python control_robot.py \--robot.type=aloha \--control.type=teleoperate \--control.fps=30# 记录单个episode,用于测试回放
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=record \--control.fps=30 \--control.single_task="Grasp a block and put it in the box." \--control.repo_id=yejiangchen\grasp_test \--control.num_episodes=1 \--control.push_to_hub=False- 数据集可视化
python lerobot/scripts/visualize_dataset.py \--repo-id yejiangchen\grasp_test \--episode-index 0# 回放刚才记录的第0号episode
python lerobot/scripts/control_robot.py replay \--robot.type=aloha \--control.type=replay \--control.fps=30 \--control.repo_id=yejiangchen\grasp_test \--control.episode=0

***最重要命令:

最常用的就两条:

1. 数据采集

python lerobot/scripts/control_robot.py  \--robot.type=aloha \--control.type=record \--control.fps=30 \  --control.repo_id=lerobot/eval_act_aloha_new_test \--control.warmup_time_s=3 \--control.episode_time_s=30 \--control.reset_time_s=5 \ --control.num_episodes=10 \--control.push_to_hub=false  --control.policy.path=outputs/train/act_square_into_box_new/checkpoints/last/pretrained_model \--control.num_image_writer_processes=1

2. 真机部署

# 真机部署训练好的策略
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=record \--control.fps=30 \--control.repo_id=lerobot/eval_act_aloha_new_test \--control.warmup_time_s=3 \--control.episode_time_s=30 \--control.reset_time_s=5 \--control.num_episodes=10 \--control.push_to_hub=false  --control.policy.path=outputs/train/act_square_into_box_new/checkpoints/150000/pretrained_model \--control.num_image_writer_processes=1

2 代码详解

2.1 核心组件和函数

核心组件:

  • Robot:表示抽象的机器人设备,提供连接、断开、发送动作等方法
  • LeRobotDataset:处理数据集的创建、加载、保存、上传
  • make_policy:用于加载预训练的策略模型,以便在录制时自动控制机器人
  • rerun (rr): 一个用于实时数据可视化的框架
  • parser: 处理命令行参数输入,动态生成配置对象

核心函数:

  • calibrate() :机器人校准,删除已有校准文件,强制重新校准
  • teleoperate() :手动遥操作机器人,实时显示数据
  • record():录制机器人操作数据,支持自动策略与手动控制
  • replay():重放已录制数据,用于验证

2.2 库引用

import logging                 # 标准日志库,用于输出调试和运行信息
import os                      # 操作系统功能,用于环境变量和路径操作
import time                    # 时间库,用于计时和延时
from dataclasses import asdict # 将 dataclass 对象转换为字典,便于打印配置
from pprint import pformat     # 格式化打印复杂数据结构import rerun as rr             # Rerun SDK,用于实时可视化和数据调试# 可选安全张量 I/O (视需求启用)
# from safetensors.torch import load_file, save_filefrom lerobot.common.datasets.lerobot_dataset import LeRobotDataset  # 数据集类,支持创建和加载
from lerobot.common.policies.factory import make_policy              # 策略工厂,创建预训练策略实例
from lerobot.common.robot_devices.control_configs import (CalibrateControlConfig,   # 校准模式配置类ControlConfig,            # 基础控制配置类ControlPipelineConfig,    # 控制管道顶层配置RecordControlConfig,      # 数据录制模式配置类RemoteRobotConfig,        # 远程控制模式配置类ReplayControlConfig,      # 数据重放模式配置类TeleoperateControlConfig  # 遥操作模式配置类
)
from lerobot.common.robot_devices.control_utils import (control_loop,                       # 通用控制循环函数,用于遥操作和数据回放init_keyboard_listener,             # 初始化键盘监听器,捕获用户按键事件is_headless,                        # 检查是否在无头环境(无显示)运行log_control_info,                   # 记录控制循环的延时和 FPS 信息record_episode,                     # 录制单个 episode 数据reset_environment,                  # 重置机器人环境sanity_check_dataset_name,          # 校验数据集命名是否合法sanity_check_dataset_robot_compatibility, # 校验数据集与机器人配置兼容性stop_recording,                     # 停止录制并清理资源warmup_record                       # 录制前的预热过程
)
from lerobot.common.robot_devices.robots.utils import Robot, make_robot_from_config  # 机器人工厂,根据配置创建机器人实例
from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect       # 忙等工具和安全断开装饰器
from lerobot.common.utils.utils import has_method, init_logging, log_say        # 通用工具函数
from lerobot.configs import parser                  # 命令行参数解析器

2.3 主函数

@parser.wrap()
def control_robot(cfg: ControlPipelineConfig):"""脚本入口:根据解析后的配置选择对应控制模式并执行。"""init_logging()  # 初始化日志系统logging.info(pformat(asdict(cfg)))  # 打印完整配置# 创建机器人实例robot = make_robot_from_config(cfg.robot)# 根据 control 类型分发到不同函数if isinstance(cfg.control, CalibrateControlConfig):calibrate(robot, cfg.control)elif isinstance(cfg.control, TeleoperateControlConfig):_init_rerun(cfg.control, session_name="lerobot_control_loop_teleop")teleoperate(robot, cfg.control)elif isinstance(cfg.control, RecordControlConfig):_init_rerun(cfg.control, session_name="lerobot_control_loop_record")record(robot, cfg.control)elif isinstance(cfg.control, ReplayControlConfig):replay(robot, cfg.control)elif isinstance(cfg.control, RemoteRobotConfig):# 远程 LeKiwifrom lerobot.common.robot_devices.robots.lekiwi_remote import run_lekiwi_init_rerun(cfg.control, session_name="lerobot_control_loop_remote")run_lekiwi(cfg.robot)# 结束时安全断开连接,避免摄像头线程残留导致 core dumpif robot.is_connected:robot.disconnect()if __name__ == "__main__":# 执行主函数control_robot()

1. 初始化日志系统

2. 根据命令行参数动态创建 robot 实例

3. 根据 cfg.control 类型确定执行模式:

  • 校准模式 -> calibrate()
  • 遥操作模式 -> teleoperate()
  • 录制模式 -> record()
  • 回放模式 -> replay()
  • 远程控制模式 -> 调用run_lekiwi()

完成操作后确保机器人安全断开连接

2.4 机器人校准 calibrate(robot, cfg)

1. 检查机器人类型:

  • 若是stretch,则检查是否已连接、归零(home)
  • 若是lekiwi,分别调用calibrate_follower()或calibrate_leader()

2. 其他类型则删除原校准文件,重新调用连接方法,自动执行校准流程。

########################################################################################
# 控制模式函数
########################################################################################@safe_disconnect
def calibrate(robot: Robot, cfg: CalibrateControlConfig):"""校准模式:删除旧校准文件并执行校准。对不同机器人类型存在特殊逻辑。"""# Stretch 机器人类型:连接后直接执行 homingif robot.robot_type.startswith("stretch"):if not robot.is_connected:robot.connect()  # 先连接机器人if not robot.is_homed():robot.home()     # 执行归零return# 确定需校准的臂列表arms = robot.available_arms if cfg.arms is None else cfg.armsunknown_arms = [arm_id for arm_id in arms if arm_id not in robot.available_arms]available_arms_str = " ".join(robot.available_arms)unknown_arms_str = " ".join(unknown_arms)# 若未提供臂或提供无效臂,则抛出错误提示可用臂if arms is None or len(arms) == 0:raise ValueError("未指定臂,请使用 `--arms` 参数。可用臂:``{}``".format(available_arms_str))if unknown_arms:raise ValueError("未知臂 `{}`,可用臂:``{}``".format(unknown_arms_str, available_arms_str))# 删除对应臂的旧校准文件for arm_id in arms:arm_calib_path = robot.calibration_dir / f"{arm_id}.json"if arm_calib_path.exists():print(f"移除旧校准文件: {arm_calib_path}")arm_calib_path.unlink()else:print(f"未找到校准文件: {arm_calib_path}")# 若已连接,则先断开以确保后续重新连接时执行校准if robot.is_connected:robot.disconnect()# LeKiwi 机器人:分别处理主从臂校准if robot.robot_type.startswith("lekiwi") and "main_follower" in arms:print("校准 LeKiwi 从臂 main_follower...")robot.calibrate_follower()returnif robot.robot_type.startswith("lekiwi") and "main_leader" in arms:print("校准 LeKiwi 主臂 main_leader...")robot.calibrate_leader()return# 默认流程:连接机器人时若缺少校准文件会自动执行校准robot.connect()robot.disconnect()print("校准完成,可开始遥操作和数据录制。")

PS:源码中标定好的文件还蛮好用的,ALOHA不建议更换哈

2.5 机器人遥操作 teleoperate(robot, cfg)

调用通用的 control_loop() 方法,支持实时遥操作

显示实时机器人状态与传感器数据

@safe_disconnect
def teleoperate(robot: Robot, cfg: TeleoperateControlConfig):"""遥操作模式:启动控制循环,实时手动控制机器人。"""control_loop(robot,control_time_s=cfg.teleop_time_s,  # 遥操作持续时间fps=cfg.fps,                       # 控制频率teleoperate=True,                  # 启用手动模式display_data=cfg.display_data,     # 是否显示传感器/相机数据)

*2.6 数据集录制 record(robot, cfg)

1. 支持断点续录(resume参数)

2. 根据设置创建或加载 LeRobotDataset 对象

3. 初始化键盘监听,支持录制过程中实时交互

  • 右箭头→:提前结束当前环节(录制或重置环境)
  • 左箭头←:重录当前 episode
  • ESC键:终止数据录制

4. 支持加载策略自动控制机器人(若有)

5. 通过 init_keyboard_listener() 捕获键盘输入,动态控制录制流程

6. 流程步骤:

  • 预热阶段(Warmup):手动或策略控制,提供准备时间
  • 录制阶段:按照设定时长、帧率记录 episode
  • 重置环境阶段:提供时间重置机器人环境
  • 上传数据(可选):录制完成后推送数据至 HuggingFace Hub
@safe_disconnect
def record(robot: Robot, cfg: RecordControlConfig) -> LeRobotDataset:"""数据录制模式:- 支持断点续录- 创建或加载 LeRobotDataset- 根据配置可选地加载预训练策略- 支持键盘交互控制录制流程- 录制完成可选上传到 HuggingFace Hub"""if cfg.resume:# 续录:加载已有数据集dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)# 若有摄像头则启动并行图像写入if robot.cameras:dataset.start_image_writer(num_processes=cfg.num_image_writer_processes,num_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),)# 校验数据集与机器人配置兼容sanity_check_dataset_robot_compatibility(dataset, robot, cfg.fps, cfg.video)else:# 新录制:校验名称,创建数据集sanity_check_dataset_name(cfg.repo_id, cfg.policy)dataset = LeRobotDataset.create(cfg.repo_id,cfg.fps,root=cfg.root,robot=robot,use_videos=cfg.video,image_writer_processes=cfg.num_image_writer_processes,image_writer_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),)# 加载策略(如指定)policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)# 确保机器人已连接if not robot.is_connected:robot.connect()# 初始化键盘监听,支持以下快捷键:# 右箭头 -> 结束当前录制/重置流程# 左箭头 <- 重新录制当前 episode# Esc 键 退出录制listener, events = init_keyboard_listener()# 预热阶段:手动或策略驱动,给机器人和摄像头启动准备时间enable_teleoperation = (policy is None)log_say("预热录制", cfg.play_sounds)warmup_record(robot, events, enable_teleoperation, cfg.warmup_time_s, cfg.display_data, cfg.fps)# 如果机器人支持安全停止,触发安全停止函数if has_method(robot, "teleop_safety_stop"):robot.teleop_safety_stop()recorded_episodes = 0# 进入录制循环while recorded_episodes < cfg.num_episodes:log_say(f"录制 Episode {dataset.num_episodes}", cfg.play_sounds)record_episode(robot=robot,dataset=dataset,events=events,episode_time_s=cfg.episode_time_s,display_data=cfg.display_data,policy=policy,fps=cfg.fps,single_task=cfg.single_task,)# 若未按停止且需重置环境,则重置if not events["stop_recording"] and (recorded_episodes < cfg.num_episodes - 1 or events["rerecord_episode"]):log_say("重置环境", cfg.play_sounds)reset_environment(robot, events, cfg.reset_time_s, cfg.fps)# 处理重录事件if events["rerecord_episode"]:log_say("重新录制当前 Episode", cfg.play_sounds)events["rerecord_episode"] = Falseevents["exit_early"] = Falsedataset.clear_episode_buffer()  # 清除缓存continue# 保存已录制数据dataset.save_episode()recorded_episodes += 1# 若按停止键,则退出if events["stop_recording"]:break# 结束录制并清理资源log_say("停止录制", cfg.play_sounds, blocking=True)stop_recording(robot, listener, cfg.display_data)# 若标记推送,则上传到 HuggingFace Hubif cfg.push_to_hub:dataset.push_to_hub(tags=cfg.tags, private=cfg.private)log_say("录制流程结束,退出。", cfg.play_sounds)return dataset

2.7 数据回放 replay(robot, cfg)

加载指定的 episode 数据

按照录制动作序列逐帧向机器人发送动作,以验证数据质量

@safe_disconnect
def replay(robot: Robot, cfg: ReplayControlConfig):"""数据重放模式:按帧读取动作并发送给机器人,以回放录制的 Episode。"""# 加载指定 Episode 的数据集dataset = LeRobotDataset(cfg.repo_id, root=cfg.root, episodes=[cfg.episode])# 从 HF 数据集中提取动作列actions = dataset.hf_dataset.select_columns("action")# 确保连接if not robot.is_connected:robot.connect()log_say("开始重放 Episode", cfg.play_sounds, blocking=True)# 按帧循环重放for idx in range(dataset.num_frames):start_t = time.perf_counter()  # 记录起始时间action = actions[idx]["action"]  # 获取动作字典robot.send_action(action)         # 发送给机器人# 控制频率:1/fpsdt = time.perf_counter() - start_tbusy_wait(max(0, 1 / cfg.fps - dt))# 记录时延和 FPStotal_dt = time.perf_counter() - start_tlog_control_info(robot, total_dt, fps=cfg.fps)

2.8 可视化初始化 _init_rerun(control_config, session_name)

根据配置确定是否本地启动或连接远程rerun

支持远程机器人配置,传输数据至远程显示器

# Rerun 可视化初始化函数
def _init_rerun(control_config: ControlConfig, session_name: str = "lerobot_control_loop") -> None:"""根据配置启动或连接到 Rerun 可视化会话。Args:control_config: 控制模式配置,用于判断显示参数session_name: 会话名称,默认为 'lerobot_control_loop'Raises:ValueError: 当远程模式且未配置查看器地址时抛出"""# 需要显示数据且非无头模式,或远程模式下也允许if (control_config.display_data and not is_headless()) or (control_config.display_data and isinstance(control_config, RemoteRobotConfig)):# 设置环境变量控制批量发送大小batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_sizerr.init(session_name)  # 初始化本地 Rerunif isinstance(control_config, RemoteRobotConfig):viewer_ip = control_config.viewer_ipviewer_port = control_config.viewer_portif not viewer_ip or not viewer_port:raise ValueError("远程模式需要设置 viewer_ip 和 viewer_port,或关闭 display_data。")logging.info(f"连接远程 Rerun 查看器:{viewer_ip}:{viewer_port}")rr.connect_tcp(f"{viewer_ip}:{viewer_port}")else:# 本地模式下根据环境变量或默认启动查看器memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")rr.spawn(memory_limit=memory_limit)

3 运行效果

PS:遥操作过程可以打开摄像头查看图像数据

相关文章:

  • 使用 Spring Data Redis 实现 Redis 数据存储详解
  • L35.【LeetCode题解】转置矩阵(C语言)
  • 11.Spring Boot 3.1.5 中使用 SpringDoc OpenAPI(替代 Swagger)生成 API 文档
  • 2025.4.28 Vue.js 学习笔记
  • Rancher 2.6.3企业级容器管理平台部署实践
  • 百家号等新媒体私信入口是否可以聚合到企业微信的客服,如何实现
  • E. Unpleasant Strings【Educational Codeforces Round 178 (Rated for Div. 2)】
  • SpringAI实现AI应用-搭建知识库
  • 核心技能:ArcGIS洪水灾害普查、风险评估及淹没制图
  • 【数学建模国奖速成系列】优秀论文绘图复现代码(三)
  • X²+1素数素数
  • 《Python实战进阶》 No46:CPython的GIL与多线程优化
  • 直播美颜SDK是什么?跨平台美颜SDK开发与接入全解析
  • errorno 和WSAGetlasterror的区别
  • Java写数据结构:队列
  • [CPCTF 2025] Crypto
  • 西门子PLC S7-1200电动机软启动、软停止的控制实例
  • nvm for windows 安装低版本 node 丢失 npm 安装
  • Kubernetes Ingress 深度解析
  • Java @Transactional事物隔离级别和默认值详解
  • 八成盈利,2024年沪市主板公司实现净利润4.35万亿元
  • 夜读丨春天要去动物园
  • 史学巨擘的思想地图与学术路径——王汎森解析梁启超、陈寅恪、傅斯年
  • 荆州市委书记汪元程:全市各级干部要做到慎微、慎初、慎独、慎友
  • 浙江官宣:五一假期,没电、没气、没油车辆全部免费拖离高速
  • 发出“美利坚名存实亡”呼号的卡尼,将带领加拿大走向何方?