LeRobot 项目部署运行逻辑(四)——control_robot.py
在上一篇中已经记录了标准的 ALOHA 配置流程,但实验室使用的 Mobile ALOHA 硬件有区别,所以要进一步拆解代码
Lerobot 的操作代码基本都在 lerobot/scripts 里面,每个功能命名都很清晰明了:
硬件齐套的情况下,首先就是标定然后遥操作,就是运行此脚本
目前 huggingface 上有很多开源数据集可以直接使用:Lerobot
我们基本都清楚具身智能是 data-driven 的,基本流程就是数据采集-可视化筛选-策略训练-策略评估四个基本流程,而 control_robot.py 是为其中的第一步和第四步
因此,control_robot.py 是最重要的脚本之一,也包含了非常多功能
-
机器人校准(Calibrate)
-
遥操作(Teleoperate)
-
数据集录制(Record)
-
数据回放(Replay)
-
远程机器人操作(Remote Robot)
再来说下 Mobile ALOHA,目前来说已经有非常多版本,很多相机和舵机不一样,或者说有一些简化版本 so100 之类的,但是都是可以通用的,流程是一样的,但是要根据硬件做简单更改和适配
目录
1 命令行接口
2 代码详解
2.1 核心组件和函数
2.2 库引用
2.3 主函数
2.4 机器人校准 calibrate(robot, cfg)
2.5 机器人遥操作 teleoperate(robot, cfg)
*2.6 数据集录制 record(robot, cfg)
2.7 数据回放 replay(robot, cfg)
2.8 可视化初始化 _init_rerun(control_config, session_name)
3 运行效果
1 命令行接口
脚本在模块顶部给出了全部的调用示例,常见命令:
# 标定机器人
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=calibrate# 无限制高频率遥操作(~200Hz ),CTRL+C 退出
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=teleoperate# 屏幕显示相机图像
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=teleoperate--control.display_data=True# 限频到 30Hz(模拟数据录制频率)
python control_robot.py \--robot.type=aloha \--control.type=teleoperate \--control.fps=30# 记录单个episode,用于测试回放
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=record \--control.fps=30 \--control.single_task="Grasp a block and put it in the box." \--control.repo_id=yejiangchen\grasp_test \--control.num_episodes=1 \--control.push_to_hub=False- 数据集可视化
python lerobot/scripts/visualize_dataset.py \--repo-id yejiangchen\grasp_test \--episode-index 0# 回放刚才记录的第0号episode
python lerobot/scripts/control_robot.py replay \--robot.type=aloha \--control.type=replay \--control.fps=30 \--control.repo_id=yejiangchen\grasp_test \--control.episode=0
***最重要命令:
最常用的就两条:
1. 数据采集
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=record \--control.fps=30 \ --control.repo_id=lerobot/eval_act_aloha_new_test \--control.warmup_time_s=3 \--control.episode_time_s=30 \--control.reset_time_s=5 \ --control.num_episodes=10 \--control.push_to_hub=false --control.policy.path=outputs/train/act_square_into_box_new/checkpoints/last/pretrained_model \--control.num_image_writer_processes=1
2. 真机部署
# 真机部署训练好的策略
python lerobot/scripts/control_robot.py \--robot.type=aloha \--control.type=record \--control.fps=30 \--control.repo_id=lerobot/eval_act_aloha_new_test \--control.warmup_time_s=3 \--control.episode_time_s=30 \--control.reset_time_s=5 \--control.num_episodes=10 \--control.push_to_hub=false --control.policy.path=outputs/train/act_square_into_box_new/checkpoints/150000/pretrained_model \--control.num_image_writer_processes=1
2 代码详解
2.1 核心组件和函数
核心组件:
- Robot:表示抽象的机器人设备,提供连接、断开、发送动作等方法
- LeRobotDataset:处理数据集的创建、加载、保存、上传
- make_policy:用于加载预训练的策略模型,以便在录制时自动控制机器人
- rerun (rr): 一个用于实时数据可视化的框架
- parser: 处理命令行参数输入,动态生成配置对象
核心函数:
- calibrate() :机器人校准,删除已有校准文件,强制重新校准
- teleoperate() :手动遥操作机器人,实时显示数据
- record():录制机器人操作数据,支持自动策略与手动控制
- replay():重放已录制数据,用于验证
2.2 库引用
import logging # 标准日志库,用于输出调试和运行信息
import os # 操作系统功能,用于环境变量和路径操作
import time # 时间库,用于计时和延时
from dataclasses import asdict # 将 dataclass 对象转换为字典,便于打印配置
from pprint import pformat # 格式化打印复杂数据结构import rerun as rr # Rerun SDK,用于实时可视化和数据调试# 可选安全张量 I/O (视需求启用)
# from safetensors.torch import load_file, save_filefrom lerobot.common.datasets.lerobot_dataset import LeRobotDataset # 数据集类,支持创建和加载
from lerobot.common.policies.factory import make_policy # 策略工厂,创建预训练策略实例
from lerobot.common.robot_devices.control_configs import (CalibrateControlConfig, # 校准模式配置类ControlConfig, # 基础控制配置类ControlPipelineConfig, # 控制管道顶层配置RecordControlConfig, # 数据录制模式配置类RemoteRobotConfig, # 远程控制模式配置类ReplayControlConfig, # 数据重放模式配置类TeleoperateControlConfig # 遥操作模式配置类
)
from lerobot.common.robot_devices.control_utils import (control_loop, # 通用控制循环函数,用于遥操作和数据回放init_keyboard_listener, # 初始化键盘监听器,捕获用户按键事件is_headless, # 检查是否在无头环境(无显示)运行log_control_info, # 记录控制循环的延时和 FPS 信息record_episode, # 录制单个 episode 数据reset_environment, # 重置机器人环境sanity_check_dataset_name, # 校验数据集命名是否合法sanity_check_dataset_robot_compatibility, # 校验数据集与机器人配置兼容性stop_recording, # 停止录制并清理资源warmup_record # 录制前的预热过程
)
from lerobot.common.robot_devices.robots.utils import Robot, make_robot_from_config # 机器人工厂,根据配置创建机器人实例
from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect # 忙等工具和安全断开装饰器
from lerobot.common.utils.utils import has_method, init_logging, log_say # 通用工具函数
from lerobot.configs import parser # 命令行参数解析器
2.3 主函数
@parser.wrap()
def control_robot(cfg: ControlPipelineConfig):"""脚本入口:根据解析后的配置选择对应控制模式并执行。"""init_logging() # 初始化日志系统logging.info(pformat(asdict(cfg))) # 打印完整配置# 创建机器人实例robot = make_robot_from_config(cfg.robot)# 根据 control 类型分发到不同函数if isinstance(cfg.control, CalibrateControlConfig):calibrate(robot, cfg.control)elif isinstance(cfg.control, TeleoperateControlConfig):_init_rerun(cfg.control, session_name="lerobot_control_loop_teleop")teleoperate(robot, cfg.control)elif isinstance(cfg.control, RecordControlConfig):_init_rerun(cfg.control, session_name="lerobot_control_loop_record")record(robot, cfg.control)elif isinstance(cfg.control, ReplayControlConfig):replay(robot, cfg.control)elif isinstance(cfg.control, RemoteRobotConfig):# 远程 LeKiwifrom lerobot.common.robot_devices.robots.lekiwi_remote import run_lekiwi_init_rerun(cfg.control, session_name="lerobot_control_loop_remote")run_lekiwi(cfg.robot)# 结束时安全断开连接,避免摄像头线程残留导致 core dumpif robot.is_connected:robot.disconnect()if __name__ == "__main__":# 执行主函数control_robot()
1. 初始化日志系统
2. 根据命令行参数动态创建 robot 实例
3. 根据 cfg.control 类型确定执行模式:
- 校准模式 -> calibrate()
- 遥操作模式 -> teleoperate()
- 录制模式 -> record()
- 回放模式 -> replay()
- 远程控制模式 -> 调用run_lekiwi()
完成操作后确保机器人安全断开连接
2.4 机器人校准 calibrate(robot, cfg)
1. 检查机器人类型:
- 若是stretch,则检查是否已连接、归零(home)
- 若是lekiwi,分别调用calibrate_follower()或calibrate_leader()
2. 其他类型则删除原校准文件,重新调用连接方法,自动执行校准流程。
########################################################################################
# 控制模式函数
########################################################################################@safe_disconnect
def calibrate(robot: Robot, cfg: CalibrateControlConfig):"""校准模式:删除旧校准文件并执行校准。对不同机器人类型存在特殊逻辑。"""# Stretch 机器人类型:连接后直接执行 homingif robot.robot_type.startswith("stretch"):if not robot.is_connected:robot.connect() # 先连接机器人if not robot.is_homed():robot.home() # 执行归零return# 确定需校准的臂列表arms = robot.available_arms if cfg.arms is None else cfg.armsunknown_arms = [arm_id for arm_id in arms if arm_id not in robot.available_arms]available_arms_str = " ".join(robot.available_arms)unknown_arms_str = " ".join(unknown_arms)# 若未提供臂或提供无效臂,则抛出错误提示可用臂if arms is None or len(arms) == 0:raise ValueError("未指定臂,请使用 `--arms` 参数。可用臂:``{}``".format(available_arms_str))if unknown_arms:raise ValueError("未知臂 `{}`,可用臂:``{}``".format(unknown_arms_str, available_arms_str))# 删除对应臂的旧校准文件for arm_id in arms:arm_calib_path = robot.calibration_dir / f"{arm_id}.json"if arm_calib_path.exists():print(f"移除旧校准文件: {arm_calib_path}")arm_calib_path.unlink()else:print(f"未找到校准文件: {arm_calib_path}")# 若已连接,则先断开以确保后续重新连接时执行校准if robot.is_connected:robot.disconnect()# LeKiwi 机器人:分别处理主从臂校准if robot.robot_type.startswith("lekiwi") and "main_follower" in arms:print("校准 LeKiwi 从臂 main_follower...")robot.calibrate_follower()returnif robot.robot_type.startswith("lekiwi") and "main_leader" in arms:print("校准 LeKiwi 主臂 main_leader...")robot.calibrate_leader()return# 默认流程:连接机器人时若缺少校准文件会自动执行校准robot.connect()robot.disconnect()print("校准完成,可开始遥操作和数据录制。")
PS:源码中标定好的文件还蛮好用的,ALOHA不建议更换哈
2.5 机器人遥操作 teleoperate(robot, cfg)
调用通用的 control_loop() 方法,支持实时遥操作
显示实时机器人状态与传感器数据
@safe_disconnect
def teleoperate(robot: Robot, cfg: TeleoperateControlConfig):"""遥操作模式:启动控制循环,实时手动控制机器人。"""control_loop(robot,control_time_s=cfg.teleop_time_s, # 遥操作持续时间fps=cfg.fps, # 控制频率teleoperate=True, # 启用手动模式display_data=cfg.display_data, # 是否显示传感器/相机数据)
*2.6 数据集录制 record(robot, cfg)
1. 支持断点续录(resume参数)
2. 根据设置创建或加载 LeRobotDataset 对象
3. 初始化键盘监听,支持录制过程中实时交互:
- 右箭头→:提前结束当前环节(录制或重置环境)
- 左箭头←:重录当前 episode
- ESC键:终止数据录制
4. 支持加载策略自动控制机器人(若有)
5. 通过 init_keyboard_listener() 捕获键盘输入,动态控制录制流程
6. 流程步骤:
- 预热阶段(Warmup):手动或策略控制,提供准备时间
- 录制阶段:按照设定时长、帧率记录 episode
- 重置环境阶段:提供时间重置机器人环境
- 上传数据(可选):录制完成后推送数据至 HuggingFace Hub
@safe_disconnect
def record(robot: Robot, cfg: RecordControlConfig) -> LeRobotDataset:"""数据录制模式:- 支持断点续录- 创建或加载 LeRobotDataset- 根据配置可选地加载预训练策略- 支持键盘交互控制录制流程- 录制完成可选上传到 HuggingFace Hub"""if cfg.resume:# 续录:加载已有数据集dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)# 若有摄像头则启动并行图像写入if robot.cameras:dataset.start_image_writer(num_processes=cfg.num_image_writer_processes,num_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),)# 校验数据集与机器人配置兼容sanity_check_dataset_robot_compatibility(dataset, robot, cfg.fps, cfg.video)else:# 新录制:校验名称,创建数据集sanity_check_dataset_name(cfg.repo_id, cfg.policy)dataset = LeRobotDataset.create(cfg.repo_id,cfg.fps,root=cfg.root,robot=robot,use_videos=cfg.video,image_writer_processes=cfg.num_image_writer_processes,image_writer_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),)# 加载策略(如指定)policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)# 确保机器人已连接if not robot.is_connected:robot.connect()# 初始化键盘监听,支持以下快捷键:# 右箭头 -> 结束当前录制/重置流程# 左箭头 <- 重新录制当前 episode# Esc 键 退出录制listener, events = init_keyboard_listener()# 预热阶段:手动或策略驱动,给机器人和摄像头启动准备时间enable_teleoperation = (policy is None)log_say("预热录制", cfg.play_sounds)warmup_record(robot, events, enable_teleoperation, cfg.warmup_time_s, cfg.display_data, cfg.fps)# 如果机器人支持安全停止,触发安全停止函数if has_method(robot, "teleop_safety_stop"):robot.teleop_safety_stop()recorded_episodes = 0# 进入录制循环while recorded_episodes < cfg.num_episodes:log_say(f"录制 Episode {dataset.num_episodes}", cfg.play_sounds)record_episode(robot=robot,dataset=dataset,events=events,episode_time_s=cfg.episode_time_s,display_data=cfg.display_data,policy=policy,fps=cfg.fps,single_task=cfg.single_task,)# 若未按停止且需重置环境,则重置if not events["stop_recording"] and (recorded_episodes < cfg.num_episodes - 1 or events["rerecord_episode"]):log_say("重置环境", cfg.play_sounds)reset_environment(robot, events, cfg.reset_time_s, cfg.fps)# 处理重录事件if events["rerecord_episode"]:log_say("重新录制当前 Episode", cfg.play_sounds)events["rerecord_episode"] = Falseevents["exit_early"] = Falsedataset.clear_episode_buffer() # 清除缓存continue# 保存已录制数据dataset.save_episode()recorded_episodes += 1# 若按停止键,则退出if events["stop_recording"]:break# 结束录制并清理资源log_say("停止录制", cfg.play_sounds, blocking=True)stop_recording(robot, listener, cfg.display_data)# 若标记推送,则上传到 HuggingFace Hubif cfg.push_to_hub:dataset.push_to_hub(tags=cfg.tags, private=cfg.private)log_say("录制流程结束,退出。", cfg.play_sounds)return dataset
2.7 数据回放 replay(robot, cfg)
加载指定的 episode 数据
按照录制动作序列逐帧向机器人发送动作,以验证数据质量
@safe_disconnect
def replay(robot: Robot, cfg: ReplayControlConfig):"""数据重放模式:按帧读取动作并发送给机器人,以回放录制的 Episode。"""# 加载指定 Episode 的数据集dataset = LeRobotDataset(cfg.repo_id, root=cfg.root, episodes=[cfg.episode])# 从 HF 数据集中提取动作列actions = dataset.hf_dataset.select_columns("action")# 确保连接if not robot.is_connected:robot.connect()log_say("开始重放 Episode", cfg.play_sounds, blocking=True)# 按帧循环重放for idx in range(dataset.num_frames):start_t = time.perf_counter() # 记录起始时间action = actions[idx]["action"] # 获取动作字典robot.send_action(action) # 发送给机器人# 控制频率:1/fpsdt = time.perf_counter() - start_tbusy_wait(max(0, 1 / cfg.fps - dt))# 记录时延和 FPStotal_dt = time.perf_counter() - start_tlog_control_info(robot, total_dt, fps=cfg.fps)
2.8 可视化初始化 _init_rerun(control_config, session_name)
根据配置确定是否本地启动或连接远程rerun
支持远程机器人配置,传输数据至远程显示器
# Rerun 可视化初始化函数
def _init_rerun(control_config: ControlConfig, session_name: str = "lerobot_control_loop") -> None:"""根据配置启动或连接到 Rerun 可视化会话。Args:control_config: 控制模式配置,用于判断显示参数session_name: 会话名称,默认为 'lerobot_control_loop'Raises:ValueError: 当远程模式且未配置查看器地址时抛出"""# 需要显示数据且非无头模式,或远程模式下也允许if (control_config.display_data and not is_headless()) or (control_config.display_data and isinstance(control_config, RemoteRobotConfig)):# 设置环境变量控制批量发送大小batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_sizerr.init(session_name) # 初始化本地 Rerunif isinstance(control_config, RemoteRobotConfig):viewer_ip = control_config.viewer_ipviewer_port = control_config.viewer_portif not viewer_ip or not viewer_port:raise ValueError("远程模式需要设置 viewer_ip 和 viewer_port,或关闭 display_data。")logging.info(f"连接远程 Rerun 查看器:{viewer_ip}:{viewer_port}")rr.connect_tcp(f"{viewer_ip}:{viewer_port}")else:# 本地模式下根据环境变量或默认启动查看器memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")rr.spawn(memory_limit=memory_limit)
3 运行效果
PS:遥操作过程可以打开摄像头查看图像数据