# Source: TeddyBearKinova / bundle / sim / run_policy_preview.py
# Hugging Face Hub page header: uploaded by lsnu using huggingface_hub
# (commit d93804e, verified).
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
def checkpoint_ready(checkpoint_dir: Path) -> bool:
    """Return True when both checkpoint marker files are present.

    The markers checked are ``_CHECKPOINT_METADATA`` directly inside
    ``checkpoint_dir`` and ``_METADATA`` inside its ``params`` subdirectory.
    Each must exist as a regular file; a directory of the same name does not
    count.
    """
    required_markers = (
        checkpoint_dir / "_CHECKPOINT_METADATA",
        checkpoint_dir / "params" / "_METADATA",
    )
    return all(marker.is_file() for marker in required_markers)
def wait_for_checkpoint(checkpoint_dir: Path, poll_s: float) -> None:
    """Block until ``checkpoint_ready(checkpoint_dir)`` reports True.

    While the checkpoint is not ready, a progress line is printed (flushed
    immediately) and the function sleeps ``poll_s`` seconds before checking
    again. There is no timeout.
    """
    while True:
        if checkpoint_ready(checkpoint_dir):
            return
        print(f"waiting for checkpoint: {checkpoint_dir}", flush=True)
        time.sleep(poll_s)
def pid_running(pid: int) -> bool:
    """Return True when a ``/proc/<pid>`` entry exists for ``pid``.

    This is a plain filesystem existence check, so it only gives meaningful
    answers on systems that expose processes under ``/proc``.
    """
    proc_entry = Path("/proc") / str(pid)
    return proc_entry.exists()
def wait_for_pid_exit(pid: int, poll_s: float) -> None:
    """Block until ``pid_running(pid)`` reports False.

    While the process is still reported as running, a progress line is
    printed (flushed immediately) and the function sleeps ``poll_s`` seconds
    before checking again. There is no timeout.
    """
    while True:
        if not pid_running(pid):
            return
        print(f"waiting for pid to exit: {pid}", flush=True)
        time.sleep(poll_s)
def stage_checkpoint(checkpoint_dir: Path, stage_root: Path) -> Path:
    """Hardlink-copy ``checkpoint_dir`` into ``stage_root`` and return the copy.

    The staged copy lives at ``stage_root / checkpoint_dir.name``. Anything
    already at that location is removed first so the staged directory is
    rebuilt from the current checkpoint contents.

    Args:
        checkpoint_dir: Checkpoint step directory to stage.
        stage_root: Directory that receives the staged copy; created
            (including parents) if it does not exist.

    Returns:
        Path of the staged checkpoint directory.

    Raises:
        ValueError: If the staged location resolves to ``checkpoint_dir``
            itself or to one of its ancestors. Clearing that location would
            delete the source checkpoint before it could be copied.
        subprocess.CalledProcessError: If ``cp -al`` exits with a non-zero
            status.
    """
    staged_dir = stage_root / checkpoint_dir.name

    # Check before touching the filesystem: the cleanup below removes
    # ``staged_dir`` recursively, which would destroy the source if the two
    # locations coincide or the source lives underneath the staged path.
    source_real = checkpoint_dir.resolve()
    staged_real = staged_dir.resolve()
    if staged_real == source_real or staged_real in source_real.parents:
        raise ValueError(
            f"refusing to stage {checkpoint_dir} into {stage_root}: "
            f"staged path {staged_dir} would overwrite the source checkpoint"
        )

    stage_root.mkdir(parents=True, exist_ok=True)
    if staged_dir.exists():
        shutil.rmtree(staged_dir)
    # ``cp -a`` copies recursively in archive mode; ``-l`` makes hard links
    # instead of copying file data.
    subprocess.run(["cp", "-al", str(checkpoint_dir), str(stage_root)], check=True)
    return staged_dir
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the policy preview pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--checkpoint-dir", default=None, help="Checkpoint step directory, e.g. .../4000")
    parser.add_argument(
        "--checkpoint-root",
        default="/workspace/openpi/checkpoints/pi05_kinova_teddybear/teddybear_kinova_ft_v4_b24_w12_fork",
        help="Checkpoint experiment root used with --step.",
    )
    parser.add_argument("--step", type=int, default=None, help="Checkpoint step under --checkpoint-root.")
    parser.add_argument("--wait", action="store_true", help="Wait until the checkpoint has finalized before exporting.")
    parser.add_argument("--poll-s", type=float, default=15.0)
    parser.add_argument("--wait-for-pid-exit", type=int, default=None, help="Wait for this PID to exit before export.")
    parser.add_argument(
        "--stage-root",
        default=None,
        help="Optional directory where the finalized checkpoint is hardlink-staged before export.",
    )
    parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear")
    parser.add_argument("--sync-row-index", type=int, default=0)
    parser.add_argument("--prompt", default="pick up the teddy bear and place it in the red box")
    parser.add_argument(
        "--python",
        default="/workspace/openpi/.venv/bin/python",
        help="Python interpreter used to run the OpenPI export step.",
    )
    parser.add_argument(
        "--jax-platforms",
        default="cpu",
        help="Value for JAX_PLATFORMS during action export. Use 'cuda' once training has stopped.",
    )
    parser.add_argument(
        "--output-json",
        default="/workspace/kinova_scene_sim/outputs/pi_actions_preview.json",
    )
    parser.add_argument(
        "--output-gif",
        default="/workspace/kinova_scene_sim/outputs/pi_policy_preview.gif",
    )
    parser.add_argument("--fps", type=int, default=4)
    return parser


def main() -> None:
    """Export policy actions from a checkpoint and render them as a GIF.

    Steps, in order:

    1. Resolve the checkpoint directory from ``--checkpoint-dir`` or from
       ``--checkpoint-root`` plus ``--step``.
    2. With ``--wait``, poll until the checkpoint is ready; otherwise fail
       immediately if it is not.
    3. With ``--stage-root``, hardlink-stage the checkpoint and use the
       staged copy from then on.
    4. With ``--wait-for-pid-exit``, poll until that PID is gone.
    5. Run the export script with ``--python`` and ``JAX_PLATFORMS`` set,
       then run the render script with the current interpreter.

    Command-line usage errors (no checkpoint selected, or an unusable
    ``--poll-s`` while a wait was requested) are reported through
    ``parser.error``, which prints the usage message and exits with status 2.

    Raises:
        FileNotFoundError: If the checkpoint is not ready and ``--wait`` was
            not given.
        subprocess.CalledProcessError: If the export or render subprocess
            exits with a non-zero status.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # The poll interval is only used by the two wait loops. Reject values
    # that would busy-loop (0) or make time.sleep raise (negative, NaN, inf),
    # but only when a wait was actually requested.
    polling_requested = args.wait or args.wait_for_pid_exit is not None
    if polling_requested and not 0 < args.poll_s < float("inf"):
        parser.error("--poll-s must be a positive, finite number of seconds")

    if args.checkpoint_dir is None:
        if args.step is None:
            parser.error("provide either --checkpoint-dir or --step")
        checkpoint_dir = Path(args.checkpoint_root) / str(args.step)
    else:
        # An explicit directory takes precedence; --step is ignored here.
        checkpoint_dir = Path(args.checkpoint_dir)

    if args.wait:
        wait_for_checkpoint(checkpoint_dir, args.poll_s)
    elif not checkpoint_ready(checkpoint_dir):
        raise FileNotFoundError(f"checkpoint not ready: {checkpoint_dir}")

    # Staging runs before the PID wait.
    # NOTE(review): presumably so the hardlinked copy is taken while the other
    # process is still running - confirm the ordering is intentional.
    if args.stage_root is not None:
        checkpoint_dir = stage_checkpoint(checkpoint_dir, Path(args.stage_root))
        print(f"staged checkpoint to {checkpoint_dir}", flush=True)

    if args.wait_for_pid_exit is not None:
        wait_for_pid_exit(args.wait_for_pid_exit, args.poll_s)

    export_cmd = [
        args.python,
        "/workspace/kinova_scene_sim/export_pi_actions.py",
        "--checkpoint-dir",
        str(checkpoint_dir),
        "--session-root",
        args.session_root,
        "--sync-row-index",
        str(args.sync_row_index),
        "--prompt",
        args.prompt,
        "--output-json",
        args.output_json,
    ]
    # Only the export subprocess sees the JAX_PLATFORMS override; this
    # process's own environment is left untouched.
    export_env = os.environ.copy()
    export_env["JAX_PLATFORMS"] = args.jax_platforms
    subprocess.run(export_cmd, check=True, env=export_env)

    # Rendering uses the interpreter running this script, not --python.
    render_cmd = [
        sys.executable,
        "/workspace/kinova_scene_sim/render_pose_sequence.py",
        "--session-root",
        args.session_root,
        "--poses-json",
        args.output_json,
        "--output",
        args.output_gif,
        "--fps",
        str(args.fps),
    ]
    subprocess.run(render_cmd, check=True)
    print(f"saved preview gif to {args.output_gif}")
# Run the preview pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()