# Environment Processors

Environment processors are a critical layer in LeRobot's data processing architecture that handles **environment-specific** transformations, separate from policy-specific processing. This separation of concerns enables cleaner code, better modularity, and easier experimentation with different environments and policies.

## Why Environment Processors?

When working with different robot environments (LIBERO, MetaWorld, Aloha, etc.), each environment often has unique data formats, coordinate systems, and conventions that need standardization **before** policy processing. Without environment processors, these transformations would be:

1. **Hardcoded in environment code** - making it difficult to experiment with different state representations
2. **Duplicated across policies** - each policy would need to handle environment-specific quirks
3. **Mixed with policy logic** - violating separation of concerns and making debugging harder

Environment processors solve this by providing a **dedicated processing layer** between raw environment observations and policy inputs.
## Data Flow

Here's how data flows through the complete processing pipeline during evaluation:

```python
# 1. Environment produces a raw observation
raw_observation = env.step(action)

# 2. Convert the raw observation into the LeRobot format
observation = preprocess_observation(raw_observation)

# 3. Attach the task description from the environment
observation = add_envs_task(env, observation)

# 4. Environment-specific preprocessing
#    (e.g. flatten robot state, rotate images)
observation = env_preprocessor(observation)

# 5. Policy-specific preprocessing
#    (e.g. normalization, device placement)
observation = preprocessor(observation)

# 6. Policy predicts an action
action = policy.select_action(observation)

# 7. Policy-specific postprocessing (e.g. unnormalization)
action = postprocessor(action)

# 8. Environment-specific postprocessing (currently a no-op)
action_transition = {"action": action}
action_transition = env_postprocessor(action_transition)
action = action_transition["action"]

# 9. Step the environment with the processed action
env.step(action)
```
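The pipeline objects above can be modeled as an ordered list of steps applied in sequence. Here is a minimal, self-contained sketch of that idea; `SimplePipeline` and the two step functions are hypothetical stand-ins, not the real `PolicyProcessorPipeline` API:

```python
from typing import Any, Callable

class SimplePipeline:
    """Applies a list of processing steps in order (illustrative sketch)."""
    def __init__(self, steps: list[Callable[[dict], dict]]):
        self.steps = steps

    def __call__(self, data: dict[str, Any]) -> dict[str, Any]:
        for step in self.steps:
            data = step(data)
        return data

# Hypothetical steps standing in for environment/policy processors.
def flatten_state(obs):
    obs = dict(obs)
    obs["observation.state"] = obs.pop("robot_state")["eef_pos"]
    return obs

def normalize_state(obs):
    obs = dict(obs)
    obs["observation.state"] = [x / 10.0 for x in obs["observation.state"]]
    return obs

env_preprocessor = SimplePipeline(steps=[flatten_state])
policy_preprocessor = SimplePipeline(steps=[normalize_state])

obs = {"robot_state": {"eef_pos": [1.0, 2.0, 3.0]}}
obs = policy_preprocessor(env_preprocessor(obs))
print(obs["observation.state"])  # [0.1, 0.2, 0.3]
```

Because each stage is just a callable over a dict, composing an environment pipeline with a policy pipeline is a matter of calling one after the other, exactly as steps 4 and 5 above do.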
## Key Benefits

### 1. Separation of Concerns

Environment processors handle transformations specific to the **environment's data format**, while policy processors handle transformations specific to the **model's requirements**.

```python
# Before: environment-specific and policy-specific logic
# mixed inside the policy (hypothetical example)
class LiberoVLAPolicy:
    def preprocess(self, obs):
        # Environment-specific: flatten the nested robot state
        state = self._flatten_robot_state(obs["robot_state"])
        # Policy-specific: normalize with dataset statistics
        state = self.normalizer(state)
        return state

# After: each concern lives in its own processor step
env_preprocessor = LiberoProcessorStep()                             # environment-specific
policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats)   # policy-specific
```
### 2. Modularity

The same policy can work with different environment processors, and the same environment processor can work with different policies:

```python
# SmolVLA evaluated on LIBERO
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg)

# ACT evaluated on LIBERO reuses the same environment processors
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)
```
### 3. Easy Experimentation

Want to try different state representations for LIBERO? Just create a new processor:

```python
# Position-only state: eef position + axis-angle orientation + gripper
@ProcessorStepRegistry.register("libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        robot_state = obs["robot_state"]
        eef_pos = robot_state["eef"]["pos"]
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])
        gripper = robot_state["gripper"]["qpos"]
        state = torch.cat([eef_pos, eef_axisangle, gripper], dim=-1)
        return state

# Alternative: also include velocities in the state
@ProcessorStepRegistry.register("libero_velocity_processor")
class LiberoVelocityProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        robot_state = obs["robot_state"]
        eef_pos = robot_state["eef"]["pos"]
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])
        eef_vel = robot_state["eef"]["vel"]
        gripper_pos = robot_state["gripper"]["qpos"]
        gripper_vel = robot_state["gripper"]["qvel"]
        state = torch.cat([eef_pos, eef_axisangle, eef_vel,
                           gripper_pos, gripper_vel], dim=-1)
        return state
```
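The `@ProcessorStepRegistry.register(...)` decorator used above maps a string name to a step class so that processors can be looked up and reconstructed by name. A minimal sketch of how such a decorator-based registry can work; `StepRegistry` and `IdentityStep` here are simplified stand-ins, not LeRobot's actual implementation:

```python
class StepRegistry:
    """Maps string names to processor step classes (illustrative sketch)."""
    _registry: dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        # Returns a decorator that records the class under `name`
        def decorator(step_cls: type) -> type:
            cls._registry[name] = step_cls
            return step_cls
        return decorator

    @classmethod
    def get(cls, name: str) -> type:
        return cls._registry[name]

@StepRegistry.register("identity_processor")
class IdentityStep:
    def __call__(self, obs: dict) -> dict:
        return obs

# Look the step up by name and instantiate it
step = StepRegistry.get("identity_processor")()
print(step({"a": 1}))  # {'a': 1}
```

Registering steps by name is what lets pipelines be serialized to configuration files and rebuilt later without importing the step class explicitly.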
### 4. Environments Expose All Data

Environments expose **all available data** without needing to know what downstream models will use:

```python
# Raw observation from the environment: rich, nested, unfiltered
observation = {
    "pixels": {"image": img, "image2": img2},
    "robot_state": {
        "eef": {"pos": ..., "quat": ..., "vel": ..., "mat": ..., "axisangle": ...},
        "gripper": {"qpos": ..., "qvel": ...},
        "joints": {"pos": ..., "vel": ...}
    }
}

# Each processor selects and transforms only the fields it needs.
```
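To see why this matters, here is a pure-Python sketch of two hypothetical state builders consuming different subsets of the same rich observation; neither requires any change to the environment:

```python
# A rich, nested observation as an environment might expose it
rich_obs = {
    "robot_state": {
        "eef": {"pos": [0.1, 0.2, 0.3], "vel": [0.0, 0.0, 0.1]},
        "gripper": {"qpos": [0.04, 0.04]},
    }
}

def position_state(o):
    # Uses only positions: 3-D eef position + 2-D gripper = 5-D state
    rs = o["robot_state"]
    return rs["eef"]["pos"] + rs["gripper"]["qpos"]

def position_velocity_state(o):
    # Also uses velocities: 3 + 3 + 2 = 8-D state
    rs = o["robot_state"]
    return rs["eef"]["pos"] + rs["eef"]["vel"] + rs["gripper"]["qpos"]

print(len(position_state(rich_obs)))           # 5
print(len(position_velocity_state(rich_obs)))  # 8
```

Swapping state representations is a one-line change in the pipeline configuration, not an environment modification.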
## The Factory Function

### Usage

The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:

```python
from lerobot.envs.factory import make_env_pre_post_processors
from lerobot.envs.configs import LiberoEnv, PushtEnv

# LIBERO gets its dedicated processor step
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)

# PushT needs no special processing, so the pipelines are no-ops
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
```
### Implementation

```python
def make_env_pre_post_processors(
    env_cfg: EnvConfig,
) -> tuple[
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
    """
    Create preprocessor and postprocessor pipelines for environment observations.

    Args:
        env_cfg: The configuration of the environment.

    Returns:
        A tuple containing:
            - preprocessor: Pipeline that processes environment observations
            - postprocessor: Pipeline that processes environment outputs
    """
    # Select environment-specific preprocessing steps
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    else:
        # Default: no environment-specific preprocessing
        preprocessor = PolicyProcessorPipeline(steps=[])

    # Postprocessing is currently a no-op for all environments
    postprocessor = PolicyProcessorPipeline(steps=[])

    return preprocessor, postprocessor
```
## Usage in Evaluation

In `lerobot_eval.py`, the environment processors are created once and used throughout:

```python
def eval_main(cfg: EvalPipelineConfig):
    # Create the vectorized environments
    envs = make_env(cfg.env, n_envs=cfg.eval.batch_size)

    # Create the policy
    policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env)

    # Policy-specific processors
    preprocessor, postprocessor = make_pre_post_processors(
        policy_cfg=cfg.policy,
        pretrained_path=cfg.policy.pretrained_path,
    )

    # Environment-specific processors
    env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env)

    # Run evaluation with both processor pairs
    eval_policy_all(
        envs=envs,
        policy=policy,
        env_preprocessor=env_preprocessor,
        env_postprocessor=env_postprocessor,
        preprocessor=preprocessor,
        postprocessor=postprocessor,
        n_episodes=cfg.eval.n_episodes,
    )
```
## Example: The LIBERO Processor

The `LiberoProcessorStep` demonstrates a real-world environment processor:

```python
from dataclasses import dataclass

import torch

from lerobot.processor.pipeline import ObservationProcessorStep


@dataclass
@ProcessorStepRegistry.register(name="libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    """
    Processes LIBERO observations into the LeRobot format.

    **State Processing:**
    - Extracts end-effector position (3D)
    - Converts quaternion to axis-angle representation (3D)
    - Extracts gripper joint positions (2D)
    - Concatenates into 8D state vector

    **Image Processing:**
    - Rotates images 180° to match HuggingFaceVLA/libero convention
    """

    def _process_observation(self, observation):
        processed_obs = observation.copy()

        # Rotate images 180° by flipping both spatial dimensions
        for key in list(processed_obs.keys()):
            if key.startswith("observation.images."):
                img = processed_obs[key]
                img = torch.flip(img, dims=[2, 3])
                processed_obs[key] = img

        # Flatten the nested robot state into a single vector
        if "observation.robot_state" in processed_obs:
            robot_state = processed_obs.pop("observation.robot_state")

            eef_pos = robot_state["eef"]["pos"]
            eef_quat = robot_state["eef"]["quat"]
            gripper_qpos = robot_state["gripper"]["qpos"]

            # Convert quaternion to axis-angle representation
            eef_axisangle = self._quat2axisangle(eef_quat)

            # Concatenate into the 8D state vector policies expect
            state = torch.cat((eef_pos, eef_axisangle, gripper_qpos), dim=-1)
            state = state.float()

            processed_obs["observation.state"] = state

        return processed_obs
```
### Why These Transformations?

1. **Image rotation**: The HuggingFaceVLA/libero dataset has images rotated 180° from the raw LIBERO simulator. The processor handles this convention mismatch so policies trained on the dataset work seamlessly.

2. **State flattening**: The raw LIBERO environment exposes nested dictionaries with all available state information (position, quaternion, velocity, matrix representation, etc.). The processor:
   - Selects the relevant components (pos, quat, gripper)
   - Converts the quaternion to axis-angle (more suitable for learning)
   - Flattens everything into the single 8D vector that policies expect

3. **Flexibility**: The environment still exposes **all** raw data. If you want to try different state representations (e.g., including velocities, or using the matrix representation instead of axis-angle), you can create a new processor without modifying the environment code.
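For reference, the quaternion-to-axis-angle conversion mentioned in point 2 can be sketched as a standalone function. This is an illustrative implementation (not LeRobot's `_quat2axisangle`) assuming an `(x, y, z, w)` quaternion layout; verify the convention used by your environment before relying on it:

```python
import math

def quat_to_axisangle(q):
    """Convert a unit quaternion (x, y, z, w) to an axis-angle vector
    (rotation axis scaled by rotation angle in radians)."""
    x, y, z, w = q
    w = max(-1.0, min(1.0, w))          # guard the acos domain
    angle = 2.0 * math.acos(w)
    s = math.sqrt(max(1.0 - w * w, 0.0))
    if s < 1e-8:                        # near-zero rotation: axis undefined
        return [0.0, 0.0, 0.0]
    return [x / s * angle, y / s * angle, z / s * angle]

# Identity rotation maps to the zero vector
print(quat_to_axisangle([0.0, 0.0, 0.0, 1.0]))  # [0.0, 0.0, 0.0]

# A 90° rotation about z yields an axis-angle z component of pi/2
aa = quat_to_axisangle([0.0, 0.0, math.sin(math.pi / 4), math.cos(math.pi / 4)])
print(round(aa[2], 6))  # 1.570796
```

The axis-angle form avoids the unit-norm constraint and double-cover ambiguity of quaternions, which is one reason it is often preferred as a learning target.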
## Adding a New Environment

To add environment processors for a new environment:

### Step 1: Create a Processor Step

```python
from dataclasses import dataclass

@dataclass
@ProcessorStepRegistry.register(name="myenv_processor")
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyEnv."""

    def _process_observation(self, observation):
        processed = observation.copy()

        # Map environment-specific keys to the standard LeRobot keys
        if "myenv.specific.state" in processed:
            state = processed.pop("myenv.specific.state")
            processed["observation.state"] = self._transform_state(state)

        return processed
```
### Step 2: Register It in the Factory

```python
# In make_env_pre_post_processors, add a branch for the new environment
def make_env_pre_post_processors(env_cfg: EnvConfig):
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()])
    else:
        preprocessor = PolicyProcessorPipeline(steps=[])

    postprocessor = PolicyProcessorPipeline(steps=[])
    return preprocessor, postprocessor
```
### Step 3: Run Evaluation

No changes needed! The evaluation script automatically uses the appropriate processor (the flags below are illustrative):

```bash
lerobot-eval \
    --env.type=myenv \
    --policy.path=path/to/pretrained_model
```
## Future: Environment Postprocessors

Currently, postprocessors are identity (no-op) for all environments. Future use cases include:

### Action Format Conversion

```python
@dataclass
class MyEnvActionPostprocessor(ProcessorStep):
    """Convert policy actions to environment-specific format."""

    action_space: str        # "eef" or "joint"
    min_action: float
    max_action: float

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Convert to joint space if the environment expects joint actions
        # (self.ik_solver is assumed to be provided by the implementation)
        if self.action_space == "joint":
            action = self.ik_solver(action)

        # Clamp to the environment's action limits
        action = torch.clamp(action, self.min_action, self.max_action)

        transition["action"] = action
        return transition
```
### Coordinate Transformations

```python
@dataclass
class CoordinateTransformPostprocessor(ProcessorStep):
    """Transform actions between coordinate systems."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Transform the action from the world frame to the robot base frame
        action = self.world_to_base_transform(action)

        transition["action"] = action
        return transition
```
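As a concrete illustration of such a transform, here is a minimal planar (2D) sketch of a hypothetical `world_to_base` rotation; a real implementation would handle full 3D poses:

```python
import math

def world_to_base(action_xy, base_yaw):
    """Rotate a planar (x, y) action from the world frame into a robot
    base frame whose heading is base_yaw radians (illustrative example)."""
    c, s = math.cos(base_yaw), math.sin(base_yaw)
    x, y = action_xy
    # Inverse rotation takes world coordinates into the base frame
    return [c * x + s * y, -s * x + c * y]

# With the robot rotated 90°, a world +x action becomes a base -y action
bx, by = world_to_base([1.0, 0.0], math.pi / 2)
print(round(bx, 6), round(by, 6))  # 0.0 -1.0
```

Keeping such frame conversions in a postprocessor means the policy can always be trained and queried in one consistent frame.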
## Best Practices

1. **Keep environment processors simple**: They should only handle environment-specific data format issues, not complex learning-related transformations.

2. **Use policy processors for model requirements**: Normalization, batching, device placement, and tokenization belong in policy processors.

3. **Expose all data from environments**: Let processors decide what to use rather than hardcoding choices in the environment.

4. **Document conventions**: Clearly document any coordinate system conventions, camera orientations, or data formats that your processor handles.

5. **Test independently**: Environment processors should be testable without loading full policies or environments.
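Following practice 5, a processor step can be exercised with a plain dictionary, with no environment or policy loaded. A minimal sketch, where `DummyFlattenStep` is a hypothetical example step:

```python
# Hypothetical processor step tested in isolation with a plain dict --
# no environment or policy needs to be instantiated.
class DummyFlattenStep:
    def __call__(self, observation: dict) -> dict:
        out = dict(observation)
        rs = out.pop("robot_state")
        out["observation.state"] = rs["eef"]["pos"] + rs["gripper"]["qpos"]
        return out

def test_dummy_flatten_step():
    obs = {"robot_state": {"eef": {"pos": [1.0, 2.0, 3.0]},
                           "gripper": {"qpos": [0.0, 0.0]}}}
    out = DummyFlattenStep()(obs)
    # The state is flattened and the raw key is removed
    assert out["observation.state"] == [1.0, 2.0, 3.0, 0.0, 0.0]
    assert "robot_state" not in out

test_dummy_flatten_step()
```

Such tests run in milliseconds, which makes it cheap to cover edge cases like missing keys or unexpected shapes.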
## Summary

Environment processors provide a **clean separation** between environment-specific data transformations and policy-specific model requirements. This architecture:

- ✅ Enables easy experimentation with different state representations
- ✅ Allows policies to work seamlessly across different environments
- ✅ Keeps environment code focused on the simulation/hardware interface
- ✅ Makes processor pipelines more maintainable and debuggable
- ✅ Follows the single responsibility principle

The key insight: **environments define data formats, processors standardize them, policies consume standardized data.** Each layer has a clear, focused responsibility.