# Environment Processors

Environment processors are a critical layer in LeRobot's data processing architecture that handle **environment-specific** transformations, separate from policy-specific processing. This separation of concerns enables cleaner code, better modularity, and easier experimentation with different environments and policies.

## Why Environment Processors?

When working with different robot environments (LIBERO, MetaWorld, Aloha, etc.), each environment often has unique data formats, coordinate systems, and conventions that need standardization **before** policy processing. Without environment processors, these transformations would be:

1. **Hardcoded in environment code** - Making it difficult to experiment with different state representations
2. **Duplicated across policies** - Each policy would need to handle environment-specific quirks
3. **Mixed with policy logic** - Violating separation of concerns and making debugging harder

Environment processors solve this by providing a **dedicated processing layer** between raw environment observations and policy inputs.

## The Processing Pipeline

Here's how data flows through the complete processing pipeline during evaluation:

```python
# In lerobot_eval.py rollout() function:

# 1. Raw environment observation (numpy arrays, various formats)
#    A Gymnasium-style step also returns reward, termination flags, and info.
raw_observation, reward, terminated, truncated, info = env.step(action)

# 2. Convert numpy to torch, normalize images [0,1]
observation = preprocess_observation(raw_observation)

# 3. Add task metadata (for multi-task environments)
observation = add_envs_task(env, observation)

# 4. ENVIRONMENT-SPECIFIC preprocessing (NEW!)
#    - Flatten robot states
#    - Rotate images to match dataset conventions
#    - Handle environment-specific coordinate systems
observation = env_preprocessor(observation)

# 5. POLICY-SPECIFIC preprocessing
#    - Normalize with dataset statistics
#    - Add batch dimensions
#    - Move to GPU
#    - Tokenize language instructions
observation = preprocessor(observation)

# 6. Policy inference
action = policy.select_action(observation)

# 7. POLICY-SPECIFIC postprocessing
#    - Unnormalize actions
#    - Remove batch dimensions
action = postprocessor(action)

# 8. ENVIRONMENT-SPECIFIC postprocessing (NEW!)
#    - Convert action formats if needed
#    - Apply environment-specific constraints
action_transition = {"action": action}
action_transition = env_postprocessor(action_transition)
action = action_transition["action"]

# 9. Execute in environment (the returned observation feeds step 1 of the next iteration)
env.step(action)
```

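The ordering above can be sketched without LeRobot at all. The toy example below is only an illustration: `ToyPipeline`, `flatten_state`, `normalize_state`, and the statistics are hypothetical names and values, not part of the LeRobot API. It shows an environment-specific step feeding a policy-specific step, and that a pipeline with no steps behaves as an identity.

```python
from typing import Any, Callable

Step = Callable[[dict[str, Any]], dict[str, Any]]


class ToyPipeline:
    """Hypothetical stand-in for a processor pipeline: applies steps in order."""

    def __init__(self, steps: list[Step]) -> None:
        self.steps = steps

    def __call__(self, data: dict[str, Any]) -> dict[str, Any]:
        for step in self.steps:
            data = step(data)
        return data


def flatten_state(obs: dict[str, Any]) -> dict[str, Any]:
    """Environment-specific: flatten a nested robot state into one flat list."""
    out = {k: v for k, v in obs.items() if k != "robot_state"}
    nested = obs["robot_state"]
    out["state"] = list(nested["pos"]) + list(nested["gripper"])
    return out


def normalize_state(obs: dict[str, Any]) -> dict[str, Any]:
    """Policy-specific: scale the flat state with made-up dataset statistics."""
    mean, std = 1.0, 2.0
    return {**obs, "state": [(x - mean) / std for x in obs["state"]]}


env_preprocessor = ToyPipeline([flatten_state])
policy_preprocessor = ToyPipeline([normalize_state])
identity = ToyPipeline([])  # an empty pipeline returns its input unchanged

raw = {"robot_state": {"pos": [1.0, 3.0, 5.0], "gripper": [1.0]}}
policy_input = policy_preprocessor(env_preprocessor(raw))
print(policy_input["state"])  # [0.0, 1.0, 2.0, 0.0]
```

Swapping `flatten_state` for another environment step leaves `normalize_state` untouched, which is the point of keeping the two layers separate.
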
## The Benefits

### 1. **Separation of Concerns**

Environment processors handle transformations specific to the **environment's data format**, while policy processors handle transformations specific to the **model's requirements**.

```python
# ❌ Before: Mixed concerns
class LiberoVLAPolicy:
    def preprocess(self, obs):
        # Environment-specific: Flatten robot state (shouldn't be in policy!)
        state = self._flatten_robot_state(obs["robot_state"])
        # Policy-specific: Normalize with dataset stats
        state = self.normalizer(state)
        return state

# ✅ After: Clear separation
# Environment processor: Handles LIBERO's nested robot state
env_preprocessor = LiberoProcessorStep()  # Flattens robot_state

# Policy processor: Handles model requirements
policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats)
```

### 2. **Flexibility and Reusability**

The same policy can work with different environment processors, and the same environment processor can work with different policies:

```python
# Use SmolVLA policy with LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
    env_cfg=libero_cfg,
    policy_cfg=smolvla_cfg,
)
smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg)

# Or use ACT policy with the same LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(
    env_cfg=libero_cfg,
    policy_cfg=act_cfg,
)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)
```

### 3. **Easier Experimentation**

Want to try different state representations for LIBERO? Just create a new processor:

```python
# Original: 8D state (pos + quat→axisangle + gripper)
@ProcessorStepRegistry.register("libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        robot_state = obs["observation.robot_state"]
        eef_pos = robot_state["eef"]["pos"]  # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        gripper = robot_state["gripper"]["qpos"]  # 2D
        state = torch.cat([eef_pos, eef_axisangle, gripper], dim=-1)  # 8D
        return state  # simplified; the full step stores this under "observation.state"

# Experiment: Add velocity for better control
@ProcessorStepRegistry.register("libero_velocity_processor")
class LiberoVelocityProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        # Include velocities for a 13D state
        robot_state = obs["observation.robot_state"]
        eef_pos = robot_state["eef"]["pos"]  # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        eef_vel = robot_state["eef"]["vel"]  # 3D (NEW)
        gripper_pos = robot_state["gripper"]["qpos"]  # 2D
        gripper_vel = robot_state["gripper"]["qvel"]  # 2D (NEW), same size as qpos
        state = torch.cat([eef_pos, eef_axisangle, eef_vel,
                           gripper_pos, gripper_vel], dim=-1)  # 13D
        return state
```

### 4. **Cleaner Environment Code**

Environments expose **all available data** without needing to know what downstream models will use:

```python
# LIBERO environment exposes full robot state
observation = {
    "pixels": {"image": img, "image2": img2},
    "robot_state": {
        "eef": {"pos": ..., "quat": ..., "vel": ..., "mat": ..., "axisangle": ...},
        "gripper": {"qpos": ..., "qvel": ...},
        "joints": {"pos": ..., "vel": ...}
    }
}

# Environment processor decides what to use
# Policy processor handles model-specific transformations
```

## Using Environment Processors

### Factory Function

The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies:

```python
from lerobot.envs.configs import LiberoEnv, PushtEnv
from lerobot.envs.factory import make_env_pre_post_processors

# For LIBERO: Returns LiberoProcessorStep in preprocessor
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)

# For other environments: Returns identity processors (no-op)
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)
```

### Implementation in `envs/factory.py`

```python
def make_env_pre_post_processors(
    env_cfg: EnvConfig,
) -> tuple[
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
    """
    Create preprocessor and postprocessor pipelines for environment observations.

    Args:
        env_cfg: The configuration of the environment.

    Returns:
        A tuple containing:
            - preprocessor: Pipeline that processes environment observations
            - postprocessor: Pipeline that processes environment outputs
    """
    # For LIBERO environments, add the LiberoProcessorStep to preprocessor
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    else:
        # For all other environments, return an identity preprocessor
        preprocessor = PolicyProcessorPipeline(steps=[])

    # Postprocessor is currently identity for all environments
    # Future: Could add environment-specific action transformations
    postprocessor = PolicyProcessorPipeline(steps=[])

    return preprocessor, postprocessor
```

### Integration in Evaluation

In `lerobot_eval.py`, the environment processors are created once and used throughout:

```python
def eval_main(cfg: EvalPipelineConfig):
    # Create environment
    envs = make_env(cfg.env, n_envs=cfg.eval.batch_size)

    # Create policy
    policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env)

    # Create policy processors
    preprocessor, postprocessor = make_pre_post_processors(
        policy_cfg=cfg.policy,
        pretrained_path=cfg.policy.pretrained_path,
    )

    # Create environment processors (NEW!)
    env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env)

    # Run evaluation with both processor types
    eval_policy_all(
        envs=envs,
        policy=policy,
        env_preprocessor=env_preprocessor,  # Environment-specific
        env_postprocessor=env_postprocessor,  # Environment-specific
        preprocessor=preprocessor,  # Policy-specific
        postprocessor=postprocessor,  # Policy-specific
        n_episodes=cfg.eval.n_episodes,
    )
```

## Example: LIBERO Environment Processor

The `LiberoProcessorStep` demonstrates a real-world environment processor:

```python
from dataclasses import dataclass

import torch

from lerobot.processor import ObservationProcessorStep, ProcessorStepRegistry

@dataclass
@ProcessorStepRegistry.register(name="libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    """
    Processes LIBERO observations into the LeRobot format.

    **State Processing:**
    - Extracts end-effector position (3D)
    - Converts quaternion to axis-angle representation (3D)
    - Extracts gripper joint positions (2D)
    - Concatenates into 8D state vector

    **Image Processing:**
    - Rotates images 180° to match HuggingFaceVLA/libero convention
    """

    def _process_observation(self, observation):
        processed_obs = observation.copy()

        # Process images: rotate 180° for camera convention
        for key in list(processed_obs.keys()):
            if key.startswith("observation.images."):
                img = processed_obs[key]
                # Flipping both H and W of a (B, C, H, W) tensor is a 180° rotation
                img = torch.flip(img, dims=[2, 3])
                processed_obs[key] = img

        # Process robot_state: Flatten to 8D vector
        if "observation.robot_state" in processed_obs:
            robot_state = processed_obs.pop("observation.robot_state")

            eef_pos = robot_state["eef"]["pos"]  # (B, 3)
            eef_quat = robot_state["eef"]["quat"]  # (B, 4)
            gripper_qpos = robot_state["gripper"]["qpos"]  # (B, 2)

            # Convert quaternion to axis-angle (helper method not shown here)
            eef_axisangle = self._quat2axisangle(eef_quat)  # (B, 3)

            # Concatenate into single state vector
            state = torch.cat((eef_pos, eef_axisangle, gripper_qpos), dim=-1)
            state = state.float()

            processed_obs["observation.state"] = state

        return processed_obs
```

### Why These Transformations?

1. **Image Rotation**: The HuggingFaceVLA/libero dataset has images rotated 180° from the raw LIBERO simulator. The processor handles this convention mismatch so policies trained on the dataset work seamlessly.

2. **State Flattening**: The raw LIBERO environment exposes nested dictionaries with all available state information (position, quaternion, velocity, matrix representation, etc.). The processor:
   - Selects the relevant components (pos, quat, gripper)
   - Converts quaternion to axis-angle (more suitable for learning)
   - Flattens to a single 8D vector that policies expect

3. **Flexibility**: The environment still exposes **all** raw data. If you want to try different state representations (e.g., including velocities, using matrix representation instead of axis-angle), you can create a new processor without modifying the environment code.

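To make the two transformations concrete, here is a minimal pure-Python sketch. It is not the LeRobot implementation: `quat_to_axisangle` and `rotate_180` are hypothetical helpers, the quaternion is assumed to be a unit quaternion in `(x, y, z, w)` order, and the sample values are made up. The conversion uses the standard identities angle = 2·acos(w) and axis = (x, y, z) / sqrt(1 − w²); the rotation reverses rows and columns, which is what flipping both H and W does to a tensor.

```python
import math


def quat_to_axisangle(quat: tuple[float, float, float, float]) -> list[float]:
    """Convert a unit quaternion (x, y, z, w) to an axis-angle vector (axis * angle)."""
    x, y, z, w = quat
    w = max(-1.0, min(1.0, w))  # guard acos against rounding error
    sin_half = math.sqrt(1.0 - w * w)
    if sin_half < 1e-8:
        return [0.0, 0.0, 0.0]  # (near-)zero rotation: the axis is undefined
    angle = 2.0 * math.acos(w)
    return [c * angle / sin_half for c in (x, y, z)]


def rotate_180(image: list[list[int]]) -> list[list[int]]:
    """Rotate an H x W image by 180 degrees by reversing both rows and columns."""
    return [row[::-1] for row in image[::-1]]


# A 90 degree rotation about z: q = (0, 0, sin(45 deg), cos(45 deg))
half = math.pi / 4
axisangle = quat_to_axisangle((0.0, 0.0, math.sin(half), math.cos(half)))

# Made-up sample values for position and gripper joints
eef_pos = [0.5, 0.0, 0.2]
gripper_qpos = [0.04, -0.04]
state = eef_pos + axisangle + gripper_qpos  # 3 + 3 + 2 = 8 values

print(len(state))                      # 8
print(rotate_180([[1, 2], [3, 4]]))    # [[4, 3], [2, 1]]
```
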
## Adding Environment Processors for New Environments

To add environment processors for a new environment:

### 1. Create the Processor Step

```python
# In src/lerobot/processor/env_processor.py
@dataclass
@ProcessorStepRegistry.register(name="myenv_processor")
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyEnv."""

    def _process_observation(self, observation):
        processed = observation.copy()

        # Your environment-specific transformations
        if "myenv.specific.state" in processed:
            state = processed.pop("myenv.specific.state")
            # Transform to standard format
            processed["observation.state"] = self._transform_state(state)

        return processed
```

### 2. Update the Factory

```python
# In src/lerobot/envs/factory.py
def make_env_pre_post_processors(env_cfg: EnvConfig):
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()])
    else:
        preprocessor = PolicyProcessorPipeline(steps=[])

    postprocessor = PolicyProcessorPipeline(steps=[])
    return preprocessor, postprocessor
```

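The dispatch logic can be exercised in isolation. The sketch below mirrors the branch structure of the factory using hypothetical stand-ins (`ToyEnvConfig`, `ToyPipeline`, `tag_myenv`, `make_toy_env_processors`); none of these are LeRobot classes. It illustrates that a recognized environment type gets its own step, while any other type falls back to an empty, no-op pipeline.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

Step = Callable[[dict[str, Any]], dict[str, Any]]


@dataclass
class ToyEnvConfig:
    """Hypothetical stand-in for EnvConfig; only the `type` string matters here."""
    type: str


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a processor pipeline."""
    steps: list[Step] = field(default_factory=list)

    def __call__(self, data: dict[str, Any]) -> dict[str, Any]:
        for step in self.steps:
            data = step(data)
        return data


def tag_myenv(obs: dict[str, Any]) -> dict[str, Any]:
    """Toy environment step: mark the observation so the dispatch is visible."""
    return {**obs, "processed_by": "myenv"}


def make_toy_env_processors(env_cfg: ToyEnvConfig) -> tuple[ToyPipeline, ToyPipeline]:
    """Pick a preprocessor by environment type; fall back to identity."""
    if "myenv" in env_cfg.type:
        preprocessor = ToyPipeline([tag_myenv])
    else:
        preprocessor = ToyPipeline()  # identity for unknown environments
    postprocessor = ToyPipeline()  # identity for every environment
    return preprocessor, postprocessor


pre, post = make_toy_env_processors(ToyEnvConfig(type="myenv"))
fallback_pre, _ = make_toy_env_processors(ToyEnvConfig(type="pusht"))

print(pre({"x": 1}))           # {'x': 1, 'processed_by': 'myenv'}
print(fallback_pre({"x": 1}))  # {'x': 1}
```
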
### 3. Use in Evaluation

No changes needed! The evaluation script automatically uses the appropriate processor:

```bash
# --env.type=myenv automatically selects MyEnvProcessorStep
lerobot-eval \
  --policy.path=lerobot/my_policy \
  --env.type=myenv \
  --eval.n_episodes=10
```

## Future: Environment Postprocessors

Currently, postprocessors are identity (no-op) for all environments. Future use cases include:

### Action Space Transformations

```python
@dataclass
class MyEnvActionPostprocessor(ProcessorStep):
    """Convert policy actions to environment-specific format."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: Convert from Cartesian to joint space
        if self.action_space == "joint":
            action = self.ik_solver(action)

        # Example: Apply environment-specific safety limits
        action = torch.clamp(action, self.min_action, self.max_action)

        transition["action"] = action
        return transition
```

### Coordinate System Conversions

```python
@dataclass
class CoordinateTransformPostprocessor(ProcessorStep):
    """Transform actions between coordinate systems."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: Policy outputs in world frame, env expects base frame
        action = self.world_to_base_transform(action)

        transition["action"] = action
        return transition
```

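As a runnable illustration of the safety-limit idea, the following sketch clips actions inside a plain dictionary transition, the same `{"action": ...}` wrapping used in the rollout loop. `ClampActionStep` and its `low`/`high` limits are hypothetical; a real step would subclass the library's processor base class and likely operate on tensors.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ClampActionStep:
    """Hypothetical postprocessor step: clip each action value to [low, high]."""
    low: float
    high: float

    def __call__(self, transition: dict[str, Any]) -> dict[str, Any]:
        clipped = [min(self.high, max(self.low, a)) for a in transition["action"]]
        # Return a new dict so the caller's transition is left unmodified
        return {**transition, "action": clipped}


step = ClampActionStep(low=-1.0, high=1.0)
transition = {"action": [0.3, -2.5, 1.7]}
result = step(transition)

print(result["action"])  # [0.3, -1.0, 1.0]
```
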
## Best Practices

1. **Keep environment processors simple**: They should only handle environment-specific data format issues, not complex learning-related transformations.
2. **Use policy processors for model requirements**: Normalization, batching, device placement, and tokenization belong in policy processors.
3. **Expose all data from environments**: Let processors decide what to use rather than hardcoding choices in the environment.
4. **Document conventions**: Clearly document any coordinate system conventions, camera orientations, or data formats that your processor handles.
5. **Test independently**: Environment processors should be testable without loading full policies or environments.

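Independent testing can be as small as feeding a hand-built dictionary to the step. The sketch below uses the standard `unittest` module against `toy_env_step`, a hypothetical function that mirrors the key renaming shown in `MyEnvProcessorStep`; no policy, simulator, or LeRobot import is involved.

```python
import unittest


def toy_env_step(observation: dict) -> dict:
    """Hypothetical step: move a made-up raw key to the standard state key."""
    processed = observation.copy()
    if "myenv.specific.state" in processed:
        processed["observation.state"] = processed.pop("myenv.specific.state")
    return processed


class ToyEnvStepTest(unittest.TestCase):
    def test_renames_state_key(self):
        out = toy_env_step({"myenv.specific.state": [0.1, 0.2]})
        self.assertEqual(out, {"observation.state": [0.1, 0.2]})

    def test_leaves_input_untouched(self):
        obs = {"myenv.specific.state": [0.1, 0.2]}
        toy_env_step(obs)
        self.assertIn("myenv.specific.state", obs)

    def test_passes_through_other_observations(self):
        obs = {"observation.images.cam": "pixels"}
        self.assertEqual(toy_env_step(obs), obs)


# Run the tests programmatically so the snippet works as a plain script
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ToyEnvStepTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```
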
## Summary

Environment processors provide a **clean separation** between environment-specific data transformations and policy-specific model requirements. This architecture:

- ✅ Enables easy experimentation with different state representations
- ✅ Allows policies to work seamlessly across different environments
- ✅ Keeps environment code focused on simulation/hardware interface
- ✅ Makes processor pipelines more maintainable and debuggable
- ✅ Follows the single responsibility principle

The key insight: **Environments define data formats, processors standardize them, policies consume standardized data.** Each layer has a clear, focused responsibility.