| """ |
| Prepare test demos for LIBERO-10 task suite. |
| The demonstration that has the shortest frames sequence is regarded as the expert demonstration. |
| When preparing the test demo, we exclude the expert demonstration. |
| For the rest of the demonstrations, we randomly sample up to 40 demonstrations (configurable). |
| For each demonstration, we sample 4 frames with equal interval (configurable), including the initial frame and the success frame. |
| We save the frames as PNGs in a folder. |
| |
| Usage: |
| # Process all LIBERO-10 tasks |
| python prepare_test_demo_single_task.py --process-all-tasks --libero-data-dir <data_dir> --output-root <output_root> |
| |
| # Process a single task |
    python prepare_test_demo_single_task.py --task-name <task_name> --hdf5-path <path_to_hdf5> --output-dir <output_dir>
| |
| Examples: |
| # Process all LIBERO-10 tasks |
| python prepare_test_demo_single_task.py \ |
| --process-all-tasks \ |
| --libero-data-dir /home/zechen/Data/Robo/LIBERO_Regen/libero_10_regen \ |
| --output-root toy_test_demos_LIBERO_10 \ |
| --num-demos 40 \ |
| --frames-per-demo 4 |
| |
| # Process a single task |
| python prepare_test_demo_single_task.py \ |
| --task-name KITCHEN_SCENE3_turn_on_the_stove_and_put_the_moka_pot_on_it \ |
| --hdf5-path /path/to/KITCHEN_SCENE3_turn_on_the_stove_and_put_the_moka_pot_on_it.hdf5 \ |
| --output-dir /path/to/output \ |
| --num-demos 40 \ |
| --frames-per-demo 4 |
| |
| """ |
|
|
| import argparse |
| import json |
| import random |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| import h5py |
| import numpy as np |
| from PIL import Image |
| from tqdm import tqdm |
|
|
| |
# The 10 long-horizon tasks of the LIBERO-10 suite. Each entry doubles as the
# stem of its HDF5 demonstration file name (see find_hdf5_file below).
LIBERO_10_TASKS = [
    "KITCHEN_SCENE3_turn_on_the_stove_and_put_the_moka_pot_on_it",
    "KITCHEN_SCENE4_put_the_black_bowl_in_the_bottom_drawer_of_the_cabinet_and_close_it",
    "KITCHEN_SCENE6_put_the_yellow_and_white_mug_in_the_microwave_and_close_it",
    "KITCHEN_SCENE8_put_both_moka_pots_on_the_stove",
    "LIVING_ROOM_SCENE1_put_both_the_alphabet_soup_and_the_cream_cheese_box_in_the_basket",
    "LIVING_ROOM_SCENE2_put_both_the_alphabet_soup_and_the_tomato_sauce_in_the_basket",
    "LIVING_ROOM_SCENE2_put_both_the_cream_cheese_box_and_the_butter_in_the_basket",
    "LIVING_ROOM_SCENE5_put_the_white_mug_on_the_left_plate_and_put_the_yellow_and_white_mug_on_the_right_plate",
    "LIVING_ROOM_SCENE6_put_the_white_mug_on_the_plate_and_put_the_chocolate_pudding_to_the_right_of_the_plate",
    "STUDY_SCENE1_pick_up_the_book_and_place_it_in_the_back_compartment_of_the_caddy",
]
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse and validate command-line arguments.

    Two mutually exclusive modes are supported: batch mode over every
    LIBERO-10 task (--process-all-tasks) and single-task mode driven by
    --task-name / --hdf5-path / --output-dir. Each mode's required options
    are validated after parsing; a missing one triggers ``parser.error``.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Prepare test demos for LIBERO-10 task suite")

    # Mode selector.
    parser.add_argument("--process-all-tasks", action="store_true", help="Process all LIBERO-10 tasks")

    # Batch-mode options.
    parser.add_argument("--libero-data-dir", type=Path, help="Directory containing all LIBERO-10 HDF5 files (required with --process-all-tasks)")
    parser.add_argument("--output-root", type=Path, help="Root output directory (required with --process-all-tasks)")

    # Single-task options.
    parser.add_argument("--task-name", help="Name of the task (for single task mode)")
    parser.add_argument("--hdf5-path", type=Path, help="Path to the HDF5 file (for single task mode)")
    parser.add_argument("--output-dir", type=Path, help="Output directory (for single task mode)")

    # Sampling options shared by both modes.
    parser.add_argument("--num-demos", type=int, default=40, help="Number of test demos to sample (default: 40)")
    parser.add_argument("--frames-per-demo", type=int, default=4, help="Number of frames per demo (default: 4)")
    parser.add_argument("--random-seed", type=int, default=42, help="Random seed for reproducibility (default: 42)")

    args = parser.parse_args()

    # Cross-argument validation: each mode has its own required options.
    if args.process_all_tasks:
        for value, flag in (
            (args.libero_data_dir, "--libero-data-dir"),
            (args.output_root, "--output-root"),
        ):
            if not value:
                parser.error(f"{flag} is required when using --process-all-tasks")
    else:
        for value, flag in (
            (args.task_name, "--task-name"),
            (args.hdf5_path, "--hdf5-path"),
            (args.output_dir, "--output-dir"),
        ):
            if not value:
                parser.error(f"{flag} is required for single task mode")

    return args
|
|
def load_demo_arrays(demo_group: "h5py.Group"):
    """Return (frames, dones) for one demo group.

    ``frames`` is the raw agent-view RGB dataset, returned un-materialized
    so callers can index single frames without loading the whole sequence.
    ``dones`` is materialized into a numpy array, or None when the demo has
    no "dones" record.
    """
    frames = demo_group["obs/agentview_rgb"]
    raw_dones = demo_group.get("dones")
    if raw_dones is None:
        return frames, None
    return frames, np.asarray(raw_dones[:])
|
|
def first_success_index(dones: Optional[np.ndarray], total_frames: int) -> int:
    """Return the index of the earliest frame where ``dones == 1``.

    Falls back to the last frame index when there is no dones array or no
    frame is marked as done.
    """
    fallback = total_frames - 1
    if dones is None:
        return fallback
    hits = np.flatnonzero(dones == 1)
    if hits.size == 0:
        return fallback
    return int(hits[0])
|
|
def find_expert_demo(data_group: "h5py.Group") -> Optional[Tuple[str, int, int]]:
    """Select the expert demo: the shortest trajectory with success index > 0.

    Demos are scanned in sorted name order. A demo's success index is the
    first frame where ``dones == 1``, falling back to the final frame index
    when no "dones" record exists or nothing is marked done. Demos whose
    success index is 0 are skipped entirely.

    Returns:
        (demo_name, total_frames, success_index) for the winner, or None
        when no demo qualifies.
    """
    best: Optional[Tuple[str, int, int]] = None

    for name in sorted(data_group.keys()):
        demo = data_group[name]
        frames = demo["obs/agentview_rgb"]
        n_frames = frames.shape[0]

        # Inline success-index computation (mirrors first_success_index).
        dones_ds = demo.get("dones")
        if dones_ds is None:
            success = n_frames - 1
        else:
            hits = np.flatnonzero(np.asarray(dones_ds[:]) == 1)
            success = int(hits[0]) if hits.size else n_frames - 1

        if success <= 0:
            continue

        # Ties on length keep the earliest (alphabetically first) demo.
        if best is None or n_frames < best[1]:
            best = (name, n_frames, success)

    return best
|
|
def save_frame(array: np.ndarray, file_path: Path) -> None:
    """Write one RGB frame array to ``file_path`` as a PNG.

    Parent directories are created on demand. The image is flipped
    vertically before saving — presumably because the stored agent-view
    renders are upside down; confirm against the dataset if this changes.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)
    image = Image.fromarray(array)
    flipped = image.transpose(Image.FLIP_TOP_BOTTOM)
    flipped.save(file_path)
|
|
|
|
def process_single_task(
    task_name: str,
    hdf5_path: Path,
    output_dir: Path,
    num_demos: int,
    frames_per_demo: int,
    random_seed: int,
) -> bool:
    """Process a single task and generate test demos.

    Finds the expert (shortest successful) demonstration, excludes it,
    randomly samples up to ``num_demos`` of the remaining demos, and exports
    ``frames_per_demo`` evenly spaced frames from each (always including the
    initial frame and the success frame) as PNGs, together with per-demo
    metadata and a task-level manifest JSON.

    Args:
        task_name: Name of the task.
        hdf5_path: Path to the HDF5 file.
        output_dir: Output directory for this task.
        num_demos: Maximum number of demos to sample.
        frames_per_demo: Number of frames per demo (clamped to >= 2 so both
            endpoints can be included).
        random_seed: Random seed for reproducibility.

    Returns:
        True if successful, False otherwise.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    if not hdf5_path.exists():
        print(f"Error: HDF5 file not found: {hdf5_path}")
        return False

    # Validate/clamp once up front. Previously this check lived inside the
    # per-demo export loop and was re-evaluated on every iteration.
    if frames_per_demo < 2:
        print("Warning: frames_per_demo must be at least 2, setting to 2")
        frames_per_demo = 2

    print(f"\n{'='*80}")
    print(f"Processing task: {task_name}")
    print(f"HDF5 file: {hdf5_path}")
    print(f"Output directory: {output_dir}")
    print(f"{'='*80}")

    with h5py.File(hdf5_path, "r") as handle:
        data_group = handle.get("data")
        if data_group is None:
            print("Error: No data group found in HDF5 file")
            return False

        # --- Expert selection (excluded from the test set) --------------
        print("\nFinding expert demonstration...")
        expert_info = find_expert_demo(data_group)
        if expert_info is None:
            print("Error: No successful demonstrations found")
            return False

        expert_name, expert_frames, expert_success = expert_info
        print(f"Expert demo: {expert_name} ({expert_frames} frames, success at {expert_success})")

        # --- Collect candidates: every demo except the expert -----------
        demo_names = sorted(data_group.keys())
        candidate_demos: List[Tuple[str, int, int]] = []

        for demo_name in demo_names:
            if demo_name == expert_name:
                continue

            demo_group = data_group[demo_name]
            frames, dones = load_demo_arrays(demo_group)
            total_frames = frames.shape[0]
            success_idx = first_success_index(dones, total_frames)

            # A success index of 0 means the demo is "done" at its first
            # frame — nothing meaningful to sample from it.
            if success_idx <= 0:
                continue

            candidate_demos.append((demo_name, total_frames, success_idx))

        print(f"\nFound {len(candidate_demos)} candidate test demos (excluding expert)")

        if len(candidate_demos) == 0:
            print("Error: No test demos available after excluding expert")
            return False

        # --- Reproducible sampling --------------------------------------
        random.seed(random_seed)
        sampled_demos = random.sample(candidate_demos, min(num_demos, len(candidate_demos)))
        print(f"Sampling {len(sampled_demos)} test demos")

        # --- Export frames and per-demo metadata -------------------------
        manifest: List[Dict] = []

        for demo_name, total_frames, success_idx in tqdm(sampled_demos, desc="Exporting demos"):
            demo_group = data_group[demo_name]
            frames, _ = load_demo_arrays(demo_group)

            # Evenly spaced indices from frame 0 through the success frame.
            indices = np.linspace(0, success_idx, num=frames_per_demo, dtype=int)
            indices = sorted(set(indices))
            # linspace includes both endpoints, but re-assert them in case
            # integer rounding collapsed duplicates away.
            if 0 not in indices:
                indices.insert(0, 0)
            if success_idx not in indices:
                indices.append(success_idx)

            demo_folder = output_dir / f"{task_name}_{demo_name}"
            demo_folder.mkdir(parents=True, exist_ok=True)

            frame_paths: List[str] = []
            for i, frame_idx in enumerate(indices):
                frame_path = demo_folder / f"frame_{i:04d}.png"
                save_frame(frames[frame_idx], frame_path)
                # Manifest paths are relative to output_dir so the folder
                # stays relocatable.
                frame_paths.append(str(frame_path.relative_to(output_dir)))

            demo_metadata = {
                "task_name": task_name,
                "demo_name": demo_name,
                "total_frames": int(total_frames),
                "success_index": int(success_idx),
                "sampled_frame_indices": [int(idx) for idx in indices],
                "frame_paths": frame_paths,
            }
            manifest.append(demo_metadata)

            # Per-demo metadata lives next to its frames.
            metadata_path = demo_folder / "metadata.json"
            with metadata_path.open("w", encoding="utf-8") as f:
                json.dump(demo_metadata, f, indent=2)

    # --- Task-level manifest (no HDF5 access needed past this point) -----
    manifest_path = output_dir / f"{task_name}_test_manifest.json"
    manifest_data = {
        "task_name": task_name,
        "expert_demo": expert_name,
        "num_test_demos": len(sampled_demos),
        "frames_per_demo": frames_per_demo,
        "demos": manifest,
    }

    with manifest_path.open("w", encoding="utf-8") as f:
        json.dump(manifest_data, f, indent=2)

    print(f"\nSuccessfully exported {len(sampled_demos)} test demos")
    print(f"Output directory: {output_dir}")
    print(f"Manifest: {manifest_path}")

    return True
|
|
|
|
def find_hdf5_file(data_dir: Path, task_name: str) -> Optional[Path]:
    """Locate the HDF5 file for ``task_name`` inside ``data_dir``.

    Probes the file-name patterns commonly used by LIBERO datasets, in
    order, and returns the first one that exists on disk.

    Returns:
        The matching path, or None when no pattern matches.
    """
    for suffix in ("", "_demo", "_demos"):
        candidate = data_dir / f"{task_name}{suffix}.hdf5"
        if candidate.exists():
            return candidate
    return None
|
|
|
|
def main() -> None:
    """CLI entry point: dispatch to batch or single-task processing.

    Batch mode iterates LIBERO_10_TASKS, resolving each task's HDF5 file
    and collecting a success/failure summary. Single-task mode processes
    one task and raises ``SystemExit(1)`` on failure.
    """
    args = parse_args()

    if args.process_all_tasks:
        # --- Batch mode -------------------------------------------------
        libero_data_dir = args.libero_data_dir.expanduser()
        output_root = args.output_root.expanduser()

        if not libero_data_dir.exists():
            print(f"Error: LIBERO data directory not found: {libero_data_dir}")
            return

        output_root.mkdir(parents=True, exist_ok=True)

        print("="*80)
        print("PROCESSING ALL LIBERO-10 TASKS")
        print("="*80)
        print(f"Data directory: {libero_data_dir}")
        print(f"Output root: {output_root}")
        print(f"Number of demos per task: {args.num_demos}")
        print(f"Frames per demo: {args.frames_per_demo}")
        print(f"Random seed: {args.random_seed}")
        print(f"Total tasks to process: {len(LIBERO_10_TASKS)}")
        print("="*80)

        successful_tasks = []
        failed_tasks = []

        for idx, task_name in enumerate(LIBERO_10_TASKS, 1):
            print(f"\n[{idx}/{len(LIBERO_10_TASKS)}] Processing: {task_name}")

            # A missing file is recorded as a failure but does not abort
            # the remaining tasks.
            hdf5_path = find_hdf5_file(libero_data_dir, task_name)
            if hdf5_path is None:
                print(f" [ERROR] HDF5 file not found for task: {task_name}")
                failed_tasks.append(task_name)
                continue

            task_output_dir = output_root / task_name

            success = process_single_task(
                task_name=task_name,
                hdf5_path=hdf5_path,
                output_dir=task_output_dir,
                num_demos=args.num_demos,
                frames_per_demo=args.frames_per_demo,
                random_seed=args.random_seed,
            )

            if success:
                successful_tasks.append(task_name)
            else:
                failed_tasks.append(task_name)

        # --- Summary ----------------------------------------------------
        print("\n" + "="*80)
        print("PROCESSING COMPLETE")
        print("="*80)
        print(f"Successfully processed: {len(successful_tasks)}/{len(LIBERO_10_TASKS)} tasks")
        print(f"Failed: {len(failed_tasks)}/{len(LIBERO_10_TASKS)} tasks")

        if failed_tasks:
            print("\nFailed tasks:")
            for task in failed_tasks:
                print(f"  - {task}")

        print(f"\nOutput directory: {output_root}")
        print("="*80)

    else:
        # --- Single-task mode -------------------------------------------
        success = process_single_task(
            task_name=args.task_name,
            hdf5_path=args.hdf5_path.expanduser(),
            output_dir=args.output_dir.expanduser(),
            num_demos=args.num_demos,
            frames_per_demo=args.frames_per_demo,
            random_seed=args.random_seed,
        )

        if not success:
            print("\nProcessing failed!")
            # ``exit`` is a site-module convenience and is not guaranteed to
            # exist in every environment; raise SystemExit directly instead.
            raise SystemExit(1)
|
|
|
|
# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()
|
|