TeddyBearKinova / bundle /sim_model_based_20260329 /code /rollout_pi_policy_rendered.py
lsnu's picture
Upload folder using huggingface_hub
f81aa8a verified
from __future__ import annotations
import argparse
import json
import time
from dataclasses import replace
from pathlib import Path
import numpy as np
import pandas as pd
from PIL import Image
from render_pose_sequence import solve_ik
from scene_tools import load_scene_calibration, render_scene, robot_tool_position_world
def main() -> None:
t0 = time.time()
parser = argparse.ArgumentParser()
parser.add_argument("--checkpoint-dir", required=True)
parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear")
parser.add_argument("--sync-row-index", type=int, default=52)
parser.add_argument("--config-name", default="pi05_kinova_teddybear")
parser.add_argument("--prompt", default="pick up the teddy bear and place it in the red box")
parser.add_argument("--rollout-steps", type=int, default=32)
parser.add_argument("--scene-model-json", default="/workspace/kinova_scene_sim/outputs/robot_scene_model.json")
parser.add_argument("--output-gif", default=None)
parser.add_argument("--output-json", default="/workspace/kinova_scene_sim/outputs/pi_rollout_rendered.json")
args = parser.parse_args()
session_root = Path(args.session_root)
sync = pd.read_csv(session_root / "sync_index.csv")
row = sync.iloc[args.sync_row_index]
calibration = load_scene_calibration(session_root, sync_row_index=0)
if args.scene_model_json and Path(args.scene_model_json).exists():
scene_model = json.loads(Path(args.scene_model_json).read_text())
calibration = replace(
calibration,
robot_world_rvec=np.asarray(scene_model["robot_world_rvec"], dtype=np.float64),
robot_world_tvec=np.asarray(scene_model["robot_world_tvec"], dtype=np.float64),
)
background_path = Path(scene_model["background_path"])
else:
background_path = session_root / row["azure_rgb_file"]
base_background = np.array(Image.open(background_path).convert("RGB"))
wrist_rgb = np.array(Image.open(session_root / row["rgb_file"]).convert("RGB"))
current_state = np.array(
[
row["tool_x_m"],
row["tool_y_m"],
row["tool_z_m"],
row["tool_theta_x_deg"],
row["tool_theta_y_deg"],
row["tool_theta_z_deg"],
row["gripper_pos"],
],
dtype=np.float32,
)
q_current = row[[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64)
print(f"[rendered] start row={args.sync_row_index}", flush=True)
print("[rendered] importing openpi modules", flush=True)
from openpi.policies import policy_config as _policy_config
from openpi.training import config as _config
print(f"[rendered] imported openpi in {time.time() - t0:.2f}s", flush=True)
train_config = _config.get_config(args.config_name)
policy = _policy_config.create_trained_policy(train_config, Path(args.checkpoint_dir))
print(f"[rendered] policy ready in {time.time() - t0:.2f}s", flush=True)
states = [current_state.copy()]
qs = [q_current.copy()]
chunks = []
rendered_files: list[str] = []
teddy_distances_m = [
float(
np.linalg.norm(
robot_tool_position_world(
current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec
)[:2]
- calibration.teddy_world_center[:2]
)
)
]
output_dir = Path(args.output_json).parent / "rendered_frames"
output_dir.mkdir(parents=True, exist_ok=True)
for step_idx in range(args.rollout_steps):
rendered = render_scene(calibration, q_current, background=base_background.copy(), alpha=1.0)
rendered_path = output_dir / f"frame_{step_idx:03d}.png"
Image.fromarray(rendered).save(rendered_path)
rendered_files.append(str(rendered_path))
results = policy.infer(
{
"observation/state": current_state,
"observation/image": rendered,
"observation/wrist_image": wrist_rgb,
"prompt": args.prompt,
}
)
chunk = np.asarray(results["actions"], dtype=np.float32)
chunks.append(chunk)
current_state = chunk[0].copy()
q_current = solve_ik(current_state, q_current)
states.append(current_state.copy())
qs.append(q_current.copy())
teddy_distances_m.append(
float(
np.linalg.norm(
robot_tool_position_world(
current_state[:3], calibration.robot_world_rvec, calibration.robot_world_tvec
)[:2]
- calibration.teddy_world_center[:2]
)
)
)
print(
f"[rendered] step={step_idx + 1}/{args.rollout_steps} "
f"xyz={current_state[:3].tolist()} grip={float(current_state[6]):.3f} "
f"teddy_xy_m={teddy_distances_m[-1]:.4f}",
flush=True,
)
output = {
"checkpoint_dir": str(Path(args.checkpoint_dir)),
"session_root": str(session_root),
"sync_row_index": int(args.sync_row_index),
"states": np.stack(states, axis=0).tolist(),
"actions": np.stack(states[1:], axis=0).tolist(),
"chunks": np.stack(chunks, axis=0).tolist(),
"joint_sequences_deg": np.stack(qs, axis=0).tolist(),
"rendered_files": rendered_files,
"teddy_xy_distance_m": teddy_distances_m,
}
output_path = Path(args.output_json)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(output, indent=2))
output_gif = Path(args.output_gif) if args.output_gif else output_path.with_suffix(".gif")
frames = [Image.open(frame_path) for frame_path in rendered_files]
frames[0].save(
output_gif,
save_all=True,
append_images=frames[1:],
loop=0,
duration=130,
)
print(f"[rendered] saved rollout to {output_path}", flush=True)
print(f"[rendered] saved gif to {output_gif}", flush=True)
if __name__ == "__main__":
main()