from __future__ import annotations import argparse import json import time from dataclasses import replace from pathlib import Path import numpy as np import pandas as pd from PIL import Image from render_pose_sequence import solve_ik from scene_tools import load_scene_calibration, render_scene, robot_tool_position_world def main() -> None: t0 = time.time() parser = argparse.ArgumentParser() parser.add_argument("--checkpoint-dir", required=True) parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear") parser.add_argument("--sync-row-index", type=int, default=52) parser.add_argument("--config-name", default="pi05_kinova_teddybear") parser.add_argument("--prompt", default="pick up the teddy bear and place it in the red box") parser.add_argument("--rollout-steps", type=int, default=32) parser.add_argument("--scene-model-json", default="/workspace/kinova_scene_sim/outputs/robot_scene_model.json") parser.add_argument("--output-gif", default=None) parser.add_argument("--output-json", default="/workspace/kinova_scene_sim/outputs/pi_rollout_rendered.json") args = parser.parse_args() session_root = Path(args.session_root) sync = pd.read_csv(session_root / "sync_index.csv") row = sync.iloc[args.sync_row_index] calibration = load_scene_calibration(session_root, sync_row_index=0) if args.scene_model_json and Path(args.scene_model_json).exists(): scene_model = json.loads(Path(args.scene_model_json).read_text()) calibration = replace( calibration, robot_world_rvec=np.asarray(scene_model["robot_world_rvec"], dtype=np.float64), robot_world_tvec=np.asarray(scene_model["robot_world_tvec"], dtype=np.float64), ) background_path = Path(scene_model["background_path"]) else: background_path = session_root / row["azure_rgb_file"] base_background = np.array(Image.open(background_path).convert("RGB")) wrist_rgb = np.array(Image.open(session_root / row["rgb_file"]).convert("RGB")) current_state = np.array( [ row["tool_x_m"], row["tool_y_m"], row["tool_z_m"], row["tool_theta_x_deg"], row["tool_theta_y_deg"], row["tool_theta_z_deg"], row["gripper_pos"], ], dtype=np.float32, ) q_current = row[[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) print(f"[rendered] start row={args.sync_row_index}", flush=True) print("[rendered] importing openpi modules", flush=True) from openpi.policies import policy_config as _policy_config from openpi.training import config as _config print(f"[rendered] imported openpi in {time.time() - t0:.2f}s", flush=True) train_config = _config.get_config(args.config_name) policy = _policy_config.create_trained_policy(train_config, Path(args.checkpoint_dir)) print(f"[rendered] policy ready in {time.time() - t0:.2f}s", flush=True) states = [current_state.copy()] qs = [q_current.copy()] chunks = [] rendered_files: list[str] = [] teddy_distances_m = [ float( np.linalg.norm( robot_tool_position_world( current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec )[:2] - calibration.teddy_world_center[:2] ) ) ] output_dir = Path(args.output_json).parent / "rendered_frames" output_dir.mkdir(parents=True, exist_ok=True) for step_idx in range(args.rollout_steps): rendered = render_scene(calibration, q_current, background=base_background.copy(), alpha=1.0) rendered_path = output_dir / f"frame_{step_idx:03d}.png" Image.fromarray(rendered).save(rendered_path) rendered_files.append(str(rendered_path)) results = policy.infer( { "observation/state": current_state, "observation/image": rendered, "observation/wrist_image": wrist_rgb, "prompt": args.prompt, } ) chunk = np.asarray(results["actions"], dtype=np.float32) chunks.append(chunk) current_state = chunk[0].copy() q_current = solve_ik(current_state, q_current) states.append(current_state.copy()) qs.append(q_current.copy()) teddy_distances_m.append( float( np.linalg.norm( robot_tool_position_world( current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec )[:2] - calibration.teddy_world_center[:2] ) ) ) print( f"[rendered] step={step_idx + 1}/{args.rollout_steps} " f"xyz={current_state[:3].tolist()} grip={float(current_state[6]):.3f} " f"teddy_xy_m={teddy_distances_m[-1]:.4f}", flush=True, ) output = { "checkpoint_dir": str(Path(args.checkpoint_dir)), "session_root": str(session_root), "sync_row_index": int(args.sync_row_index), "states": np.stack(states, axis=0).tolist(), "actions": np.stack(states[1:], axis=0).tolist(), "chunks": np.stack(chunks, axis=0).tolist(), "joint_sequences_deg": np.stack(qs, axis=0).tolist(), "rendered_files": rendered_files, "teddy_xy_distance_m": teddy_distances_m, } output_path = Path(args.output_json) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(output, indent=2)) output_gif = Path(args.output_gif) if args.output_gif else output_path.with_suffix(".gif") frames = [Image.open(frame_path) for frame_path in rendered_files] frames[0].save( output_gif, save_all=True, append_images=frames[1:], loop=0, duration=130, ) print(f"[rendered] saved rollout to {output_path}", flush=True) print(f"[rendered] saved gif to {output_gif}", flush=True) if __name__ == "__main__": main()