| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import time |
| from dataclasses import replace |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
|
|
| from render_pose_sequence import solve_ik |
| from scene_tools import load_scene_calibration, render_scene, robot_tool_position_world |
|
|
|
|
| def main() -> None: |
| t0 = time.time() |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--checkpoint-dir", required=True) |
| parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear") |
| parser.add_argument("--sync-row-index", type=int, default=52) |
| parser.add_argument("--config-name", default="pi05_kinova_teddybear") |
| parser.add_argument("--prompt", default="pick up the teddy bear and place it in the red box") |
| parser.add_argument("--rollout-steps", type=int, default=32) |
| parser.add_argument("--scene-model-json", default="/workspace/kinova_scene_sim/outputs/robot_scene_model.json") |
| parser.add_argument("--output-gif", default=None) |
| parser.add_argument("--output-json", default="/workspace/kinova_scene_sim/outputs/pi_rollout_rendered.json") |
| args = parser.parse_args() |
|
|
| session_root = Path(args.session_root) |
| sync = pd.read_csv(session_root / "sync_index.csv") |
| row = sync.iloc[args.sync_row_index] |
| calibration = load_scene_calibration(session_root, sync_row_index=0) |
| if args.scene_model_json and Path(args.scene_model_json).exists(): |
| scene_model = json.loads(Path(args.scene_model_json).read_text()) |
| calibration = replace( |
| calibration, |
| robot_world_rvec=np.asarray(scene_model["robot_world_rvec"], dtype=np.float64), |
| robot_world_tvec=np.asarray(scene_model["robot_world_tvec"], dtype=np.float64), |
| ) |
| background_path = Path(scene_model["background_path"]) |
| else: |
| background_path = session_root / row["azure_rgb_file"] |
| base_background = np.array(Image.open(background_path).convert("RGB")) |
| wrist_rgb = np.array(Image.open(session_root / row["rgb_file"]).convert("RGB")) |
|
|
| current_state = np.array( |
| [ |
| row["tool_x_m"], |
| row["tool_y_m"], |
| row["tool_z_m"], |
| row["tool_theta_x_deg"], |
| row["tool_theta_y_deg"], |
| row["tool_theta_z_deg"], |
| row["gripper_pos"], |
| ], |
| dtype=np.float32, |
| ) |
| q_current = row[[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) |
| print(f"[rendered] start row={args.sync_row_index}", flush=True) |
| print("[rendered] importing openpi modules", flush=True) |
| from openpi.policies import policy_config as _policy_config |
| from openpi.training import config as _config |
|
|
| print(f"[rendered] imported openpi in {time.time() - t0:.2f}s", flush=True) |
| train_config = _config.get_config(args.config_name) |
| policy = _policy_config.create_trained_policy(train_config, Path(args.checkpoint_dir)) |
| print(f"[rendered] policy ready in {time.time() - t0:.2f}s", flush=True) |
|
|
| states = [current_state.copy()] |
| qs = [q_current.copy()] |
| chunks = [] |
| rendered_files: list[str] = [] |
| teddy_distances_m = [ |
| float( |
| np.linalg.norm( |
| robot_tool_position_world( |
| current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec |
| )[:2] |
| - calibration.teddy_world_center[:2] |
| ) |
| ) |
| ] |
|
|
| output_dir = Path(args.output_json).parent / "rendered_frames" |
| output_dir.mkdir(parents=True, exist_ok=True) |
|
|
| for step_idx in range(args.rollout_steps): |
| rendered = render_scene(calibration, q_current, background=base_background.copy(), alpha=1.0) |
| rendered_path = output_dir / f"frame_{step_idx:03d}.png" |
| Image.fromarray(rendered).save(rendered_path) |
| rendered_files.append(str(rendered_path)) |
|
|
| results = policy.infer( |
| { |
| "observation/state": current_state, |
| "observation/image": rendered, |
| "observation/wrist_image": wrist_rgb, |
| "prompt": args.prompt, |
| } |
| ) |
| chunk = np.asarray(results["actions"], dtype=np.float32) |
| chunks.append(chunk) |
| current_state = chunk[0].copy() |
| q_current = solve_ik(current_state, q_current) |
| states.append(current_state.copy()) |
| qs.append(q_current.copy()) |
| teddy_distances_m.append( |
| float( |
| np.linalg.norm( |
| robot_tool_position_world( |
| current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec |
| )[:2] |
| - calibration.teddy_world_center[:2] |
| ) |
| ) |
| ) |
| print( |
| f"[rendered] step={step_idx + 1}/{args.rollout_steps} " |
| f"xyz={current_state[:3].tolist()} grip={float(current_state[6]):.3f} " |
| f"teddy_xy_m={teddy_distances_m[-1]:.4f}", |
| flush=True, |
| ) |
|
|
| output = { |
| "checkpoint_dir": str(Path(args.checkpoint_dir)), |
| "session_root": str(session_root), |
| "sync_row_index": int(args.sync_row_index), |
| "states": np.stack(states, axis=0).tolist(), |
| "actions": np.stack(states[1:], axis=0).tolist(), |
| "chunks": np.stack(chunks, axis=0).tolist(), |
| "joint_sequences_deg": np.stack(qs, axis=0).tolist(), |
| "rendered_files": rendered_files, |
| "teddy_xy_distance_m": teddy_distances_m, |
| } |
| output_path = Path(args.output_json) |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| output_path.write_text(json.dumps(output, indent=2)) |
| output_gif = Path(args.output_gif) if args.output_gif else output_path.with_suffix(".gif") |
| frames = [Image.open(frame_path) for frame_path in rendered_files] |
| frames[0].save( |
| output_gif, |
| save_all=True, |
| append_images=frames[1:], |
| loop=0, |
| duration=130, |
| ) |
| print(f"[rendered] saved rollout to {output_path}", flush=True) |
| print(f"[rendered] saved gif to {output_gif}", flush=True) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|