| |
| """LIBERO eval client with speed conditioning, partial-suite support, and step-count tracking. |
| |
| Connects to a websocket policy server (see ``scripts/serve_policy.py``), runs |
| rollouts for a chosen task suite (or a subset of task IDs within that suite), |
| records per-episode success and step count, and prints a summary plus an |
| optional JSON file. |
| |
| Designed to be invoked by ``scripts/eval_libero_8gpu.sh``, which fans out the |
| work across 8 GPUs (3 short suites + libero_10 split 5 ways). |
| |
| Example: |
| |
| uv run python scripts/eval_libero_speed.py \\ |
| --task-suite-name libero_spatial \\ |
| --host 0.0.0.0 --port 8000 \\ |
| --speed 1.0 \\ |
| --video-out-path videos/spatial_1x \\ |
| --results-json results/spatial_1x.json |
| """ |
| from __future__ import annotations |
|
|
| import collections |
| import dataclasses |
| import json |
| import logging |
| import math |
| import pathlib |
| import time |
|
|
| import imageio |
| from libero.libero import benchmark |
| from libero.libero import get_libero_path |
| from libero.libero.envs import OffScreenRenderEnv |
| import numpy as np |
| from openpi_client import image_tools |
| from openpi_client import websocket_client_policy as _websocket_client_policy |
| import tqdm |
| import tyro |
|
|
|
|
# Action stepped through the environment while an episode is still in its
# initial wait period: six zeros followed by -1.0 (seven values in total).
LIBERO_DUMMY_ACTION = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -1.0]
# Height and width, in pixels, requested from the simulator cameras.
LIBERO_ENV_RESOLUTION = 256
|
|
|
|
| |
| |
| _MAX_STEPS = { |
| "libero_spatial": 220, |
| "libero_object": 280, |
| "libero_goal": 300, |
| "libero_10": 520, |
| "libero_90": 400, |
| } |
|
|
|
|
@dataclasses.dataclass
class Args:
    """Command-line options for the LIBERO evaluation client."""

    # Host and port handed to the websocket policy client.
    host: str = "0.0.0.0"
    port: int = 8000

    # Name of the benchmark suite to evaluate; it must also have an entry in
    # the per-suite step-limit table.
    task_suite_name: str = "libero_spatial"
    # "all", or a comma-separated list of task ids and inclusive ranges
    # (for example "0,2-4").
    task_ids: str = "all"
    # Number of episodes rolled out for every selected task.
    num_trials_per_task: int = 50

    # Speed value sent to the policy with every inference request; also used
    # to build the label embedded in video file names and in the results.
    speed: float = 1.0

    # Number of actions taken from each returned action chunk before the
    # policy is queried again.
    replan_steps: int = 5
    # Side length the camera images are resized (with padding) to before
    # being sent to the policy.
    resize_size: int = 224
    # Number of dummy-action steps executed at the start of every episode
    # before the policy takes over.
    num_steps_wait: int = 10
    # Seed applied to NumPy's global RNG and to each environment.
    seed: int = 7

    # Directory that receives the per-episode videos (created even when
    # save_videos is false).
    video_out_path: str = "videos/libero_eval"
    # Optional path of a JSON file with the summary and per-episode records.
    results_json: str | None = None
    # Whether to write one mp4 per episode.
    save_videos: bool = True

    # Worker index; used in log lines, video file names and the JSON summary
    # to tell parallel invocations apart.
    rank: int = 0
|
|
|
|
| def _parse_task_ids(spec: str, n_total: int) -> list[int]: |
| if spec.strip().lower() == "all": |
| return list(range(n_total)) |
| out: list[int] = [] |
| for part in spec.split(","): |
| part = part.strip() |
| if not part: |
| continue |
| if "-" in part: |
| lo, hi = part.split("-", 1) |
| out.extend(range(int(lo), int(hi) + 1)) |
| else: |
| out.append(int(part)) |
| bad = [i for i in out if i < 0 or i >= n_total] |
| if bad: |
| raise ValueError(f"task_ids out of range [0, {n_total}): {bad}") |
| return sorted(set(out)) |
|
|
|
|
def _get_libero_env(task, resolution: int, seed: int):
    """Create a seeded off-screen environment for ``task``.

    The BDDL file is located under the ``bddl_files`` LIBERO path, inside the
    task's problem folder. Both camera dimensions are set to ``resolution``.

    Returns:
        A ``(env, task_description)`` pair, where ``task_description`` is
        ``task.language``.
    """
    bddl_root = pathlib.Path(get_libero_path("bddl_files"))
    env_kwargs = {
        "bddl_file_name": bddl_root / task.problem_folder / task.bddl_file,
        "camera_heights": resolution,
        "camera_widths": resolution,
    }
    env = OffScreenRenderEnv(**env_kwargs)
    env.seed(seed)
    return env, task.language
|
|
|
|
| def _quat2axisangle(quat): |
| if quat[3] > 1.0: |
| quat[3] = 1.0 |
| elif quat[3] < -1.0: |
| quat[3] = -1.0 |
| den = np.sqrt(1.0 - quat[3] * quat[3]) |
| if math.isclose(den, 0.0): |
| return np.zeros(3) |
| return (quat[:3] * 2.0 * math.acos(quat[3])) / den |
|
|
|
|
| def _speed_label(speed: float) -> str: |
| text = f"{speed:g}".replace(".", "p") |
| return f"{text}x" |
|
|
|
|
| def _summary_string(speed: float, suite: str, rank: int, episodes: list[dict]) -> str: |
| n = len(episodes) |
| if n == 0: |
| return f"[rank={rank}] {suite} speed={speed:g}x no episodes" |
| successes = [e for e in episodes if e["success"]] |
| failures = [e for e in episodes if not e["success"]] |
| succ_steps = [e["steps"] for e in successes] |
| all_steps = [e["steps"] for e in episodes] |
| sr = len(successes) / n |
| succ_mean = float(np.mean(succ_steps)) if succ_steps else float("nan") |
| succ_median = float(np.median(succ_steps)) if succ_steps else float("nan") |
| all_mean = float(np.mean(all_steps)) |
| fail_mean = float(np.mean([e["steps"] for e in failures])) if failures else float("nan") |
| return ( |
| f"[rank={rank}] {suite} speed={speed:g}x " |
| f"success={len(successes)}/{n} ({sr * 100:.1f}%) " |
| f"mean_steps_success={succ_mean:.1f} median={succ_median:.1f} " |
| f"mean_steps_failure={fail_mean:.1f} " |
| f"mean_steps_all={all_mean:.1f}" |
| ) |
|
|
|
|
def _run_episode(
    env,
    client,
    args: Args,
    initial_state,
    task_description,
    max_steps: int,
    speed_label: str,
    task_id: int,
    episode_idx: int,
):
    """Roll out one episode and return ``(success, policy_steps, frames)``.

    ``success`` is true when the environment reported done, ``policy_steps``
    counts environment steps driven by policy actions (wait steps excluded),
    and ``frames`` holds the resized agent-view images (empty when videos are
    disabled). Exceptions raised inside the step loop are logged and end the
    episode as a failure; errors from resetting the environment propagate.
    """
    env.reset()
    action_plan: collections.deque = collections.deque()
    obs = env.set_init_state(initial_state)

    t = 0
    policy_steps_executed = 0
    replay_images: list[np.ndarray] = []
    success = False
    step_budget = max_steps + args.num_steps_wait

    try:
        while t < step_budget:
            if t < args.num_steps_wait:
                # Step with the dummy action during the initial wait period.
                obs, _, _, _ = env.step(LIBERO_DUMMY_ACTION)
                t += 1
                continue

            # Reverse the first two axes of each camera image, then resize
            # with padding and convert to uint8 for the policy.
            img = np.ascontiguousarray(obs["agentview_image"][::-1, ::-1])
            wrist_img = np.ascontiguousarray(obs["robot0_eye_in_hand_image"][::-1, ::-1])
            img = image_tools.convert_to_uint8(
                image_tools.resize_with_pad(img, args.resize_size, args.resize_size)
            )
            wrist_img = image_tools.convert_to_uint8(
                image_tools.resize_with_pad(wrist_img, args.resize_size, args.resize_size)
            )
            if args.save_videos:
                # Frames are only needed for the video, so skip buffering
                # them when no video will be written.
                replay_images.append(img)

            if not action_plan:
                # The previous chunk is exhausted: query the policy again.
                element = {
                    "observation/image": img,
                    "observation/wrist_image": wrist_img,
                    "observation/state": np.concatenate(
                        (
                            obs["robot0_eef_pos"],
                            _quat2axisangle(obs["robot0_eef_quat"]),
                            obs["robot0_gripper_qpos"],
                        )
                    ),
                    "prompt": str(task_description),
                    # The speed is sent both as a float32 array and as a
                    # string label; how the server consumes them is not
                    # visible from this client.
                    "speed": np.array([args.speed], dtype=np.float32),
                    "speed_label": speed_label,
                }
                action_chunk = client.infer(element)["actions"]
                if len(action_chunk) < args.replan_steps:
                    raise RuntimeError(
                        f"replan_steps={args.replan_steps} but policy returned "
                        f"{len(action_chunk)} actions"
                    )
                action_plan.extend(action_chunk[: args.replan_steps])

            action = action_plan.popleft()
            obs, _, env_done, _ = env.step(action.tolist())
            policy_steps_executed += 1
            if env_done:
                success = True
                break
            t += 1
    except Exception as e:
        # Best effort: one broken episode is recorded as a failure instead of
        # aborting the whole run. Keep the traceback so the cause is findable.
        logging.error(
            f"[rank={args.rank}] task={task_id} ep={episode_idx} caught: {e}",
            exc_info=True,
        )

    return success, policy_steps_executed, replay_images


def eval_libero(args: Args) -> int:
    """Evaluate the remote policy on the selected LIBERO tasks.

    For every selected task an environment is created, ``num_trials_per_task``
    episodes are rolled out against the websocket policy, and each episode's
    success flag and policy-step count are recorded. A summary line is
    printed; when ``args.results_json`` is set, the summary and all episode
    records are also written to that file. Videos are written per episode
    when ``args.save_videos`` is true.

    Args:
        args: Parsed command-line options.

    Returns:
        Always ``0``; intended to be used as the process exit code.

    Raises:
        ValueError: If the suite has no step limit configured, if the task-id
            specification is invalid, or if it selects no tasks.
    """
    np.random.seed(args.seed)

    # Reject unsupported suites before looking them up in the benchmark
    # registry, so the caller gets this message rather than a lookup error.
    if args.task_suite_name not in _MAX_STEPS:
        raise ValueError(f"Unknown task suite: {args.task_suite_name}")
    max_steps = _MAX_STEPS[args.task_suite_name]

    benchmark_dict = benchmark.get_benchmark_dict()
    task_suite = benchmark_dict[args.task_suite_name]()
    n_total_tasks = task_suite.n_tasks
    task_ids = _parse_task_ids(args.task_ids, n_total_tasks)
    if not task_ids:
        raise ValueError(f"No tasks selected for {args.task_suite_name} (task_ids='{args.task_ids}')")

    pathlib.Path(args.video_out_path).mkdir(parents=True, exist_ok=True)
    logging.info(
        f"[rank={args.rank}] suite={args.task_suite_name} task_ids={task_ids} "
        f"n_trials={args.num_trials_per_task} speed={args.speed:g} max_steps={max_steps}"
    )

    client = _websocket_client_policy.WebsocketClientPolicy(args.host, args.port)
    speed_label = _speed_label(args.speed)

    episodes: list[dict] = []
    t_start = time.time()

    for task_id in tqdm.tqdm(task_ids, desc=f"rank={args.rank}/{args.task_suite_name}"):
        task = task_suite.get_task(task_id)
        initial_states = task_suite.get_task_init_states(task_id)
        env, task_description = _get_libero_env(task, LIBERO_ENV_RESOLUTION, args.seed)

        try:
            for episode_idx in range(args.num_trials_per_task):
                done, policy_steps_executed, replay_images = _run_episode(
                    env,
                    client,
                    args,
                    initial_states[episode_idx],
                    task_description,
                    max_steps,
                    speed_label,
                    task_id,
                    episode_idx,
                )

                episodes.append(
                    {
                        "task_id": int(task_id),
                        "task_description": str(task_description),
                        "episode_idx": int(episode_idx),
                        "success": bool(done),
                        "steps": int(policy_steps_executed),
                        "max_steps": int(max_steps),
                        "wait_steps": int(args.num_steps_wait),
                        "speed": float(args.speed),
                        "suite": str(args.task_suite_name),
                    }
                )

                # Skip the video when no frame was captured (for example the
                # episode failed during the wait steps); there is nothing to
                # encode in that case.
                if args.save_videos and replay_images:
                    suffix = "success" if done else "failure"
                    vid_path = (
                        pathlib.Path(args.video_out_path)
                        / f"rank{args.rank}_{args.task_suite_name}_speed{speed_label}"
                        f"_t{task_id:02d}_e{episode_idx:02d}_{suffix}.mp4"
                    )
                    imageio.mimwrite(vid_path, [np.asarray(x) for x in replay_images], fps=10)
                # Drop the frame buffer before the next episode allocates its own.
                del replay_images
        finally:
            # Release the simulator and its render context before the next
            # task creates a new environment. A failing close must not throw
            # away the results collected so far.
            try:
                env.close()
            except Exception:
                logging.warning(
                    f"[rank={args.rank}] task={task_id} failed to close environment",
                    exc_info=True,
                )

    elapsed = time.time() - t_start
    summary_str = _summary_string(args.speed, args.task_suite_name, args.rank, episodes)
    print(summary_str)
    print(f"[rank={args.rank}] elapsed={elapsed:.1f}s")

    if args.results_json:
        out_path = pathlib.Path(args.results_json)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        successes = [e for e in episodes if e["success"]]
        failures = [e for e in episodes if not e["success"]]
        # Statistics over an empty group are stored as null rather than NaN so
        # the file stays valid JSON.
        summary = {
            "rank": args.rank,
            "suite": args.task_suite_name,
            "task_ids": task_ids,
            "num_trials_per_task": args.num_trials_per_task,
            "speed": float(args.speed),
            "speed_label": speed_label,
            "n_episodes": len(episodes),
            "n_success": len(successes),
            "n_failure": len(failures),
            "success_rate": len(successes) / max(len(episodes), 1),
            "mean_steps_success": float(np.mean([e["steps"] for e in successes])) if successes else None,
            "median_steps_success": float(np.median([e["steps"] for e in successes])) if successes else None,
            "mean_steps_failure": float(np.mean([e["steps"] for e in failures])) if failures else None,
            "mean_steps_all": float(np.mean([e["steps"] for e in episodes])) if episodes else None,
            "elapsed_seconds": elapsed,
            "summary_line": summary_str,
        }
        with out_path.open("w") as f:
            json.dump({"summary": summary, "episodes": episodes}, f, indent=2)
            f.write("\n")
        print(f"[rank={args.rank}] wrote {out_path}")

    return 0
|
|
|
|
if __name__ == "__main__":
    # Timestamped INFO-level logging for the whole run.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%H:%M:%S")
    cli_args = tyro.cli(Args)
    exit_code = eval_libero(cli_args)
    # Propagate the evaluation's return value as the process exit status.
    raise SystemExit(exit_code)
|
|