Instructions to use StrongRoboticsLab/pi05-so100-diverse with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- LeRobot
How to use StrongRoboticsLab/pi05-so100-diverse with LeRobot:
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """ | |
| Evaluate a Pi0.5 checkpoint in the SO-100 MuJoCo sim. | |
| Renders a video of the model controlling the arm. | |
| Usage: | |
| python eval_sim.py --checkpoint outputs/scale_up_1k/checkpoints/000500/pretrained_model | |
| python eval_sim.py --checkpoint lerobot/pi05_base # test base model | |
| """ | |
| import argparse | |
| import sys | |
| from pathlib import Path | |
| import imageio | |
| import numpy as np | |
| import torch | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| sys.path.insert(0, str(Path.home() / "lerobot" / "src")) | |
| from gym_so100.env import SO100Env | |
| from gym_so100.constants import normalize_lerobot_to_gym_so100 | |
| def load_policy(checkpoint_path, device="cuda"): | |
| """Load Pi0.5 policy from checkpoint.""" | |
| from lerobot.policies.pi05.modeling_pi05 import PI05Policy | |
| print(f"Loading policy from {checkpoint_path}...") | |
| policy = PI05Policy.from_pretrained(str(checkpoint_path)) | |
| policy = policy.to(device) | |
| policy.eval() | |
| return policy | |
| def build_batch(obs, task, stats, device="cuda"): | |
| """Convert sim observation to Pi0.5 batch format.""" | |
| # Image: sim gives (H, W, 3) uint8 -> (1, 3, H, W) float32 [0,1] | |
| image = torch.from_numpy(obs["pixels"]).permute(2, 0, 1).float() / 255.0 | |
| image = image.unsqueeze(0) # add batch dim | |
| # Resize to 224x224 | |
| import torchvision.transforms.functional as TF | |
| image_224 = TF.resize(image, [224, 224], antialias=True) | |
| # ImageNet normalization | |
| mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1) | |
| std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1) | |
| image_224 = (image_224 - mean) / std | |
| # State: sim gives radians, convert to degrees (LeRobot scale) | |
| agent_pos = obs["agent_pos"].copy() | |
| agent_pos_degrees = np.degrees(agent_pos) | |
| state = torch.tensor(agent_pos_degrees, dtype=torch.float32).unsqueeze(0) | |
| # Normalize state with our stats | |
| state_mean = torch.tensor(stats["observation.state"]["mean"], dtype=torch.float32) | |
| state_std = torch.tensor(stats["observation.state"]["std"], dtype=torch.float32) | |
| state = (state - state_mean) / (state_std + 1e-8) | |
| # Pad state to 32 dims | |
| state_padded = torch.zeros(1, 32) | |
| state_padded[:, :6] = state | |
| # Tokenize task | |
| from transformers import AutoTokenizer | |
| tokenizer = AutoTokenizer.from_pretrained("google/paligemma-3b-pt-224") | |
| # Discretize state for prompt (Pi0.5 format) | |
| state_discrete = ((state[0].clamp(-1, 1) + 1) / 2 * 255).int() | |
| state_str = " ".join(str(v.item()) for v in state_discrete) | |
| prompt = f"Task: {task}, State: {state_str};\nAction: " | |
| tokens = tokenizer( | |
| prompt, | |
| padding="max_length", | |
| max_length=200, | |
| truncation=True, | |
| return_tensors="pt", | |
| ) | |
| batch = { | |
| "observation.images.base_0_rgb": image_224.to(device), | |
| "observation.images.left_wrist_0_rgb": image_224.to(device), | |
| "observation.state": state_padded.to(device), | |
| "observation.language.tokens": tokens["input_ids"].to(device), | |
| "observation.language.attention_mask": tokens["attention_mask"].bool().to(device), | |
| } | |
| return batch | |
| def decode_actions(raw_actions, stats): | |
| """Convert model output actions back to LeRobot scale, then to sim radians.""" | |
| actions = raw_actions[0, :, :6].cpu().numpy() # (chunk_size, 6) | |
| # Unnormalize from MEAN_STD | |
| action_mean = np.array(stats["action"]["mean"]) | |
| action_std = np.array(stats["action"]["std"]) | |
| actions = actions * action_std + action_mean | |
| # Now in LeRobot degree-scale. Convert to radians for sim. | |
| actions_rad = np.radians(actions) | |
| return actions_rad | |
| def main(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--checkpoint", type=str, required=True) | |
| parser.add_argument("--task", type=str, default="pick up the cube and place it in the bin") | |
| parser.add_argument("--steps", type=int, default=200) | |
| parser.add_argument("--output", type=str, default="sim_eval.mp4") | |
| parser.add_argument("--device", type=str, default="cuda") | |
| args = parser.parse_args() | |
| import json | |
| with open(Path(__file__).parent / "norm_stats.json") as f: | |
| stats = json.load(f) | |
| # Load policy | |
| policy = load_policy(args.checkpoint, args.device) | |
| # Create sim | |
| env = SO100Env(task="so100_cube_to_bin", obs_type="so100_pixels_agent_pos") | |
| obs, info = env.reset() | |
| frames = [] | |
| print(f"Running {args.steps} sim steps with task: '{args.task}'") | |
| chunk_actions = None | |
| chunk_idx = 0 | |
| for step in range(args.steps): | |
| # Get new action chunk from policy every N steps | |
| if chunk_actions is None or chunk_idx >= len(chunk_actions): | |
| with torch.no_grad(): | |
| batch = build_batch(obs, args.task, stats, args.device) | |
| action = policy.select_action(batch) | |
| chunk_actions = decode_actions(action.unsqueeze(0), stats) | |
| chunk_idx = 0 | |
| # Apply one action from the chunk | |
| action = chunk_actions[chunk_idx] | |
| chunk_idx += 1 | |
| # Normalize radians to sim's [-1, 1] action space | |
| joint_mins = np.array([-1.92, -3.32, -0.174, -1.66, -2.79, -0.174]) | |
| joint_maxs = np.array([1.92, 0.174, 3.14, 1.66, 2.79, 1.75]) | |
| sim_action = 2.0 * (action - joint_mins) / (joint_maxs - joint_mins) - 1.0 | |
| sim_action = np.clip(sim_action, -1.0, 1.0) | |
| obs, reward, terminated, truncated, info = env.step(sim_action.astype(np.float32)) | |
| frame = env.render() | |
| frames.append(frame) | |
| if step % 20 == 0: | |
| pos = obs["agent_pos"] | |
| print(f" step {step:>3}: pos=[{pos[0]:.2f} {pos[1]:.2f} {pos[2]:.2f} {pos[3]:.2f} {pos[4]:.2f} {pos[5]:.3f}] reward={reward:.3f}") | |
| if terminated or truncated: | |
| print(f"Episode ended at step {step}") | |
| break | |
| # Save video | |
| imageio.mimsave(args.output, frames, fps=25) | |
| print(f"Saved {len(frames)} frames to {args.output}") | |
| if __name__ == "__main__": | |
| main() | |