1. HIL-SERL简介与核心思想
HIL-SERL(《Precise and Dexterous Robotic Manipulation via Human-in-the-Loop Reinforcement Learning》)是一种面向真实机器人场景的高效强化学习算法框架。它通过融合人类演示、奖励分类器和在线人工干预,极大提升了机器人在复杂任务中的学习效率与安全性。HIL-SERL的核心流程包括:
- 离线人类演示与奖励分类器训练:首先采集少量人类遥操作演示数据,并基于视觉观测训练奖励分类器,实现自动化的成功检测与奖励分配。
- 分布式Actor-Learner在线训练:采用分布式SAC(Soft Actor Critic)架构,Actor在物理机器人上探索,Learner在服务器端异步更新策略权重。
- 人机交互与安全干预:训练过程中,操作者可随时通过游戏手柄或键盘介入,实时修正策略的危险或无效行为,保障硬件安全并加速策略收敛。
HIL-SERL适用于多种机器人操作任务,尤其在样本采集成本高、硬件安全要求高的场景下表现突出。通过合理配置工作空间边界、裁剪视觉ROI、自动化奖励判别等机制,HIL-SERL实现了高成功率与高训练效率,显著优于传统纯模仿学习或无干预的RL方法。
2. HIL-SERL完整使用流程与工程要点
HIL-SERL的工程实践流程涵盖了从环境配置、硬件准备到数据采集、奖励建模、分布式训练与人机协作的全链路。整体流程如下图所示:
环境与硬件配置:根据任务需求,配置机器人、遥操作设备(如游戏手柄/引导臂)、相机、GPU等硬件,并在配置文件中指定各项参数。推荐优先使用官方
HILSerlRobotEnvConfig
类,确保兼容性与可扩展性。设定工作空间边界:通过
find_joint_limits.py
脚本,采集机器人末端执行器的安全操作范围,避免训练过程中的越界与硬件损伤。将边界参数写入遥操作配置(end_effector_bounds
),为后续动作空间裁剪和安全控制打下基础。采集人类演示数据:在
record
模式下,操作者通过遥操作设备完成若干任务演示,系统自动记录观测、动作、奖励等数据。建议每个任务采集10-20条高质量演示,便于后续奖励建模和策略预热。裁剪参数与数据预处理:利用
crop_dataset_roi.py
工具,交互式选取相机图像的ROI区域,过滤无关背景,提升视觉特征的有效性。将裁剪参数写入配置文件,统一图像分辨率(如128×128),为视觉策略训练做准备。
训练奖励分类器:基于演示数据,配置并训练视觉奖励分类器,实现自动化的成功判别。奖励分类器可选用ResNet等主流视觉模型,训练完成后在环境配置中指定其路径,实现奖励信号的自动生成。
分布式Actor-Learner训练:采用分布式SAC架构,分别启动
learner
和actor
进程,前者负责策略更新,后者负责与物理机器人交互。两者通过gRPC通信,支持高效异步训练与权重同步。人工干预与策略改进:训练过程中,操作者可随时通过手柄/键盘介入,修正策略异常行为。干预频率可通过WandB等工具实时监控,理想状态下干预率随策略提升逐步下降。
监控与评估:全流程支持日志、视频、奖励等多维度监控,便于问题定位与性能评估。训练完成后可导出模型权重,部署于实际机器人系统。
本流程强调安全、效率与可复现性,建议严格按照上述步骤执行,结合官方文档与工程代码灵活调整细节。
3. gym_manipulator.py与Gym Wrapper机制深度解析
3.1 文件定位与功能概述
在HIL-SERL的工程实现中,gym_manipulator.py
承担了环境构建、数据流转和干预机制的核心角色。其设计充分利用了Gym的Wrapper体系,实现了观测、动作、奖励等多维度的灵活扩展与安全控制。下图展示了主要Wrapper的链式结构:
3.2 环境工厂函数:make_robot_env
环境的创建入口是 make_robot_env(cfg)
。该函数根据配置自动组合各类包装器,最终返回一个功能齐全、可直接用于训练、演示或回放的 Gym 环境对象。
核心代码片段:
def make_robot_env(cfg: EnvConfig) -> gym.Env:
# ...(省略HIL模式分支)
robot = make_robot_from_config(cfg.robot)
teleop_device = make_teleoperator_from_config(cfg.teleop)
teleop_device.connect()
env = RobotEnv(
robot=robot,
use_gripper=cfg.wrapper.use_gripper,
display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
)
# 观测与图像处理包装
if cfg.wrapper:
if cfg.wrapper.add_joint_velocity_to_observation:
env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
if cfg.wrapper.add_current_to_observation:
env = AddCurrentToObservation(env=env)
if cfg.wrapper.add_ee_pose_to_observation:
env = EEObservationWrapper(env=env, ee_pose_limits=robot.end_effector_bounds)
env = ConvertToLeRobotObservation(env=env, device=cfg.device)
if cfg.wrapper and cfg.wrapper.crop_params_dict is not None:
env = ImageCropResizeWrapper(
env=env,
crop_params_dict=cfg.wrapper.crop_params_dict,
resize_size=cfg.wrapper.resize_size,
)
# 奖励与控制包装
reward_classifier = init_reward_classifier(cfg)
if reward_classifier is not None:
env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper and cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
penalty=cfg.wrapper.gripper_penalty,
)
# 控制模式包装
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "keyboard_ee":
env = KeyboardControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
else:
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
)
env = BatchCompatibleWrapper(env=env)
env = TorchActionWrapper(env=env, device=cfg.device)
return env
工程解读:
- 该工厂函数通过链式组合包装器,极大提升了环境的灵活性和可维护性。
- 每个包装器只关注单一功能,便于调试和扩展。
- 支持多种控制模式(手柄、键盘、引导臂),可灵活适配不同实验场景。
- 观测、动作、奖励、重置、批量推理等功能均可按需插拔。
3.3 基础环境实现:RobotEnv
RobotEnv
是所有包装器的底层核心,负责与机器人硬件通信、观测采集、动作下发和状态管理。
核心代码片段:
class RobotEnv(gym.Env):
def __init__(self, robot, use_gripper=False, display_cameras=False):
super().__init__()
self.robot = robot
self.display_cameras = display_cameras
if not self.robot.is_connected:
self.robot.connect()
self.current_step = 0
self.episode_data = None
self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors]
self._image_keys = self.robot.cameras.keys()
self.current_observation: dict[str, Any] | None = None
self.use_gripper = use_gripper
self._setup_spaces()
def _get_observation(self) -> dict[str, Any]:
obs_dict = self.robot.get_observation()
joint_positions = np.array([obs_dict[name] for name in self._joint_names])
images = {key: obs_dict[key] for key in self._image_keys}
self.current_observation = {"agent_pos": joint_positions, "pixels": images}
return self.current_observation
def step(self, action) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2]}
action_dict["gripper"] = action[3] if self.use_gripper else 1.0
self.robot.send_action(action_dict)
self._get_observation()
if self.display_cameras:
self.render()
self.current_step += 1
reward = 0.0
terminated = False
truncated = False
return (
self.current_observation,
reward,
terminated,
truncated,
{"is_intervention": False},
)
工程解读:
- 该类负责与机器人底层硬件通信,采集观测、下发动作。
- 观测空间和动作空间根据机器人配置动态生成,支持多种传感器和控制模式。
- 支持夹爪控制、摄像头画面显示等功能。
- 是所有包装器的“地基”,包装器通过继承和重载其方法实现功能扩展。
3.4 观测增强与图像处理包装器
HIL-SERL的观测流极为丰富,支持关节速度、电流、末端执行器位姿、图像裁剪与归一化等多种增强。以下是典型包装器实现:
关节速度观测包装器:
class AddJointVelocityToObservation(gym.ObservationWrapper):
def __init__(self, env, joint_velocity_limits=100.0, fps=30, num_dof=6):
super().__init__(env)
# ...(省略空间扩展代码)
self.dt = 1.0 / fps
def observation(self, observation):
joint_velocities = (observation["agent_pos"] - self.last_joint_positions) / self.dt
self.last_joint_positions = observation["agent_pos"]
observation["agent_pos"] = np.concatenate([observation["agent_pos"], joint_velocities], axis=-1)
return observation
图像裁剪与缩放包装器:
class ImageCropResizeWrapper(gym.Wrapper):
def __init__(self, env, crop_params_dict, resize_size=None):
super().__init__(env)
self.crop_params_dict = crop_params_dict
self.resize_size = resize_size or (128, 128)
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
for k in self.crop_params_dict:
device = obs[k].device
obs[k] = F.crop(obs[k], *self.crop_params_dict[k])
obs[k] = F.resize(obs[k], self.resize_size)
obs[k] = obs[k].clamp(0.0, 1.0)
obs[k] = obs[k].to(device)
return obs, reward, terminated, truncated, info
工程解读:
- 观测增强包装器通过重载
observation()
方法,将物理量、图像等信息动态拼接进观测空间。 - 图像处理包装器支持ROI裁剪、分辨率归一化,极大提升视觉特征的有效性。
- 所有观测增强均可链式组合,便于灵活扩展和调试。
3.5 奖励自动化与安全控制包装器
HIL-SERL支持基于视觉的奖励分类器、时间限制、夹爪惩罚等多种奖励与安全机制。
奖励分类器包装器:
class RewardWrapper(gym.Wrapper):
def __init__(self, env, reward_classifier, device="cuda"):
self.env = env
self.device = device
self.reward_classifier = torch.compile(reward_classifier)
self.reward_classifier.to(self.device)
def step(self, action):
observation, _, terminated, truncated, info = self.env.step(action)
images = {k: observation[k].to(self.device) for k in observation if "image" in k}
with torch.inference_mode():
success = self.reward_classifier.predict_reward(images, threshold=0.7) if self.reward_classifier is not None else 0.0
reward = 1.0 if success == 1.0 else 0.0
if success == 1.0:
terminated = True
return observation, reward, terminated, truncated, info
时间限制包装器:
class TimeLimitWrapper(gym.Wrapper):
def __init__(self, env, control_time_s, fps):
self.env = env
self.control_time_s = control_time_s
self.fps = fps
self.max_episode_steps = int(self.control_time_s * self.fps)
self.current_step = 0
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.current_step += 1
if self.current_step >= self.max_episode_steps:
terminated = True
return obs, reward, terminated, truncated, info
def reset(self, seed=None, options=None):
self.current_step = 0
return self.env.reset(seed=seed, options=options)
工程解读:
- 奖励包装器可集成视觉奖励分类器,实现自动化奖励分配,极大提升训练效率和一致性。
- 时间限制包装器保障每个回合不会无限延长,提升训练安全性和可控性。
- 夹爪惩罚、重置包装等包装器进一步增强了环境的安全性和工程健壮性。
3.6 人机干预与多种控制模式
HIL-SERL支持多种人机交互方式,包括游戏手柄、键盘、引导臂等,极大提升了实验灵活性和安全性。
游戏手柄控制包装器:
class GamepadControlWrapper(gym.Wrapper):
def __init__(self, env, teleop_device, use_gripper=False, auto_reset=False):
super().__init__(env)
self.teleop_device = teleop_device
if hasattr(self.teleop_device, "connect") and not self.teleop_device.is_connected:
self.teleop_device.connect()
self.auto_reset = auto_reset
self.use_gripper = use_gripper
def get_teleop_commands(self):
# ...(省略手柄输入解析代码)
return (
intervention_is_active,
gamepad_action_np,
terminate_episode,
success,
rerecord_episode,
)
def step(self, action):
(
is_intervention,
gamepad_action,
terminate_episode,
success,
rerecord_episode,
) = self.get_teleop_commands()
action = gamepad_action if is_intervention else action
obs, reward, terminated, truncated, info = self.env.step(action)
terminated = terminated or truncated or terminate_episode
if success:
reward = 1.0
info["is_intervention"] = is_intervention
info["action_intervention"] = action
info["rerecord_episode"] = rerecord_episode
if terminated or truncated:
info["next.success"] = success
if self.auto_reset:
obs, reset_info = self.reset()
info.update(reset_info)
return obs, reward, terminated, truncated, info
工程解读:
- 控制包装器通过解析手柄/键盘/引导臂输入,实现人工干预与策略切换。
- 支持回合终止、成功标记、重录等多种实验管理功能。
- 通过
is_intervention
等标志,训练过程中可灵活切换自动/人工控制,保障安全。
3.7 数据采集与回放
HIL-SERL支持高效的数据采集与回放,便于离线训练、评估和调试。
数据采集函数:
def record_dataset(env, policy, cfg):
from lerobot.datasets.lerobot_dataset import LeRobotDataset
action = env.action_space.sample() * 0.0
# ...(省略特征配置)
dataset = LeRobotDataset.create(
cfg.repo_id,
cfg.fps,
root=cfg.dataset_root,
use_videos=True,
image_writer_threads=4,
image_writer_processes=0,
features=features,
)
episode_index = 0
while episode_index < cfg.num_episodes:
obs, _ = env.reset()
# ...(省略回合循环与采集逻辑)
dataset.save_episode()
episode_index += 1
if cfg.push_to_hub:
dataset.push_to_hub()
数据回放函数:
def replay_episode(env, cfg):
from lerobot.datasets.lerobot_dataset import LeRobotDataset
dataset = LeRobotDataset(cfg.repo_id, root=cfg.dataset_root, episodes=[cfg.episode])
env.reset()
actions = dataset.hf_dataset.select_columns("action")
for idx in range(dataset.num_frames):
action = actions[idx]["action"]
env.step(action)
工程解读:
- 数据采集函数支持策略采样和人工遥操作两种模式,自动记录观测、动作、奖励等信息。
- 支持回合重录、成功标记、采集频率控制等功能,保证数据质量。
- 数据回放函数可用于可视化、评估和调试,极大提升实验效率。
3.8 主入口与使用流程
gym_manipulator.py
提供了统一的主入口,支持训练、数据采集、回放等多种模式。
主入口代码:
@parser.wrap()
def main(cfg: EnvConfig):
env = make_robot_env(cfg)
if cfg.mode == "record":
policy = None
if cfg.pretrained_policy_name_or_path is not None:
from lerobot.policies.sac.modeling_sac import SACPolicy
policy = SACPolicy.from_pretrained(cfg.pretrained_policy_name_or_path)
policy.to(cfg.device)
policy.eval()
record_dataset(env, policy=policy, cfg=cfg)
exit()
if cfg.mode == "replay":
replay_episode(env, cfg=cfg)
exit()
env.reset()
smoothed_action = env.action_space.sample() * 0.0
alpha = 1.0
num_episode = 0
successes = []
while num_episode < 10:
new_random_action = env.action_space.sample()
smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
obs, reward, terminated, truncated, info = env.step(smoothed_action)
if terminated or truncated:
successes.append(reward)
env.reset()
num_episode += 1
工程解读:
- 通过配置文件选择不同模式(record、replay、train等),自动加载策略、采集或回放数据。
- 支持批量采集、自动评估、实验复现等多种工程需求。
- 入口函数高度解耦,便于集成到更大规模的实验系统中。
4. 深入理解Gym:基础原理与工程实践
4.1 Gym环境的基本用法与交互流程
Gym是强化学习领域最主流的环境接口库。它将“智能体-环境”交互抽象为标准API,极大简化了算法开发和实验复现。初始化环境非常简单:
import gym
env = gym.make('CartPole-v0')
每个环境都实现了“智能体-环境循环”:智能体根据观测选择动作,环境根据动作返回新的观测、奖励和终止信号。如下图所示,智能体与环境不断交互,形成时间步的循环:
例如,以下代码演示了在LunarLander-v2
环境中随机采样动作并与环境交互的完整流程:
import gym
env = gym.make("LunarLander-v2", render_mode="human")
env.action_space.seed(42)
observation, info = env.reset(seed=42)
for _ in range(1000):
observation, reward, terminated, truncated, info = env.step(env.action_space.sample())
if terminated or truncated:
observation, info = env.reset()
env.close()
每个环境都通过action_space
和observation_space
属性明确动作和观测的格式,便于算法自动适配。
4.2 空间(Spaces):动作与观测的结构化表达
Gym通过Space
体系对动作和观测的取值范围进行结构化描述。常见类型包括:
- Box:连续空间,适用于机械臂等连续控制任务。
- Discrete:离散空间,适用于分类动作。
- Dict、Tuple:支持多模态观测或多分量动作。
- MultiBinary、MultiDiscrete:适合多维二进制或多离散变量。
示例代码如下:
from gym.spaces import Box, Discrete, Dict, Tuple, MultiBinary, MultiDiscrete
import numpy as np
# 连续空间
observation_space = Box(low=-1.0, high=2.0, shape=(3,), dtype=np.float32)
print(observation_space.sample())
# 离散空间
observation_space = Discrete(4)
print(observation_space.sample())
# 组合空间
observation_space = Dict({"position": Discrete(2), "velocity": Discrete(3)})
print(observation_space.sample())
通过空间的定义,Gym环境可以灵活适配各种复杂任务,极大提升了通用性和可扩展性。
4.3 环境基类Env与核心方法
Gym的所有环境都继承自Env
基类,需实现如下核心方法:
reset()
:重置环境,返回初始观测。step(action)
:执行动作,返回新观测、奖励、终止标志和信息字典。render()
:可视化环境状态。close()
:资源清理。
例如:
class MyEnv(gym.Env):
def step(self, action):
# 计算新状态、奖励、终止条件
return observation, reward, terminated, truncated, info
def reset(self, seed=None, options=None):
# 初始化状态
return observation, info
这些方法标准化了环境接口,使算法与环境解耦,便于算法复用和大规模实验。
4.4 包装器(Wrappers):环境功能的模块化扩展
Gym的最大亮点之一是包装器(Wrapper)体系。通过链式包装,可以在不修改底层环境代码的前提下,灵活扩展观测、动作、奖励等功能。例如:
import gym
from gym.wrappers import RescaleAction
base_env = gym.make("BipedalWalker-v3")
wrapped_env = RescaleAction(base_env, min_action=0, max_action=1)
包装器分为三大类:
- ObservationWrapper:处理观测流,如图像裁剪、特征拼接。
- ActionWrapper:处理动作流,如归一化、离散化。
- RewardWrapper:处理奖励信号,如裁剪、平滑。
自定义包装器示例:
class MyObsWrapper(gym.ObservationWrapper):
def observation(self, obs):
return obs * 2
env = MyObsWrapper(gym.make('CartPole-v1'))
包装器可以多层嵌套,形成功能链。通过.unwrapped
属性可访问最底层环境,便于调试和底层操作。
4.5 图解Gym环境与包装器结构
如上图所示,Gym的包装器体系高度模块化,便于功能叠加和工程维护。HIL-SERL正是通过链式组合多种包装器,实现了观测增强、奖励自动化和人机干预等复杂功能。
4.6 ObservationWrapper、ActionWrapper、RewardWrapper的继承关系与作用详解
在Gym的包装器体系中,ObservationWrapper
、ActionWrapper
和RewardWrapper
均继承自Wrapper
基类,并分别针对观测、动作和奖励流进行模块化处理。它们的设计极大提升了环境的灵活性和可维护性,是工程实践中不可或缺的工具。