#!/usr/bin/env python3
"""
Build a filtered training index from community_dataset_v3 on disk.
Applies:
- Robot type filter (so100/so101 variants only)
- Schema filter (2 cameras, 6-DOF, 30fps)
- Episode length filter (5s-60s)
- Per-task cap (default 200)
- Per-contributor cap (default 200)
- Excludes datasets with file count mismatches
Outputs filtered_index.json with all info needed to train.
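
Usage:
    python build_index.py --data-root ~/lap/community_dataset_v3 \
        --max-per-task 200 --max-per-contributor 200 --seed 42

The output JSON contains "filters", "summary", "tasks", "datasets_used", and an
"episodes" list of {dataset, episode_index, task, task_index, num_frames} entries
(see build_index below).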
"""
import argparse
import glob
import json
import random
from collections import defaultdict
from pathlib import Path
import pandas as pd


def load_dataset_meta(dataset_root: Path) -> dict | None:
"""Load and validate a single dataset's metadata."""
info_path = dataset_root / "meta" / "info.json"
if not info_path.exists():
return None
    with open(info_path) as f:
        info = json.load(f)
# Robot type filter
robot = info.get("robot_type", "")
if robot not in ("so100", "so101", "so100_follower", "so101_follower"):
return None
# Schema filter: exactly the 2-camera, 6-DOF schema
features = info.get("features", {})
expected_keys = {
"action", "episode_index", "frame_index", "index",
"observation.images.image", "observation.images.image2",
"observation.state", "task_index", "timestamp",
}
if set(features.keys()) != expected_keys:
return None
# Dimension check
if features.get("action", {}).get("shape") != [6]:
return None
if features.get("observation.state", {}).get("shape") != [6]:
return None
# FPS check
if info.get("fps") != 30:
return None
# Resolution check
for cam_key in ("observation.images.image", "observation.images.image2"):
shape = features.get(cam_key, {}).get("shape", [])
if len(shape) < 2 or shape[0] != 480 or shape[1] != 640:
return None
# Load tasks
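    # Each tasks.jsonl line maps a task index to its prompt, e.g.
    # {"task_index": 0, "task": "pick up the cube"}  (illustrative prompt, not from the data)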
tasks_path = dataset_root / "meta" / "tasks.jsonl"
tasks = {}
if tasks_path.exists():
        with open(tasks_path) as f:
            for line in f:
                line = line.strip()
                if line:
                    t = json.loads(line)
                    tasks[t["task_index"]] = t["task"]
# Integrity check: video and parquet file counts
total_eps = info.get("total_episodes", 0)
vids = glob.glob(str(dataset_root / "videos" / "**" / "*.mp4"), recursive=True)
parquets = glob.glob(str(dataset_root / "data" / "**" / "*.parquet"), recursive=True)
expected_vids = total_eps * 2 # 2 cameras
if len(vids) != expected_vids or len(parquets) != total_eps:
return None
# Load episode metadata if available
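    # Only "episode_index" and "task_index" are read from these records later,
    # to map each episode to its task prompt.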
episodes = []
ep_jsonl = dataset_root / "meta" / "episodes.jsonl"
if ep_jsonl.exists():
        with open(ep_jsonl) as f:
            for line in f:
                line = line.strip()
                if line:
                    episodes.append(json.loads(line))
return {
"robot_type": robot,
"total_episodes": total_eps,
"total_frames": info.get("total_frames", 0),
"fps": info["fps"],
"tasks": tasks,
"episodes": episodes,
"features": {k: v.get("shape") for k, v in features.items()},
}


def build_index(
data_root: Path,
max_per_task: int = 200,
max_per_contributor: int = 200,
min_episode_frames: int = 150,
max_episode_frames: int = 1800,
seed: int = 42,
) -> dict:
"""Build filtered training index."""
rng = random.Random(seed)
# Discover all contributor/dataset pairs
contributors = sorted([
d for d in data_root.iterdir()
if d.is_dir() and not d.name.startswith(".")
])
# Phase 1: Load all valid datasets
all_episodes = [] # (contributor, dataset_name, episode_idx, task, num_frames)
datasets_passed = 0
datasets_rejected = 0
skipped_missing = 0
for contrib_dir in contributors:
if not contrib_dir.is_dir():
continue
contributor = contrib_dir.name
for ds_dir in sorted(contrib_dir.iterdir()):
if not ds_dir.is_dir():
continue
meta = load_dataset_meta(ds_dir)
if meta is None:
datasets_rejected += 1
continue
datasets_passed += 1
dataset_name = f"{contributor}/{ds_dir.name}"
# Default task if none specified
if not meta["tasks"]:
meta["tasks"] = {0: "(no task)"}
# Build episode list by reading actual parquet files
# Trust the parquet row count, not metadata
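            # Note: episode files are looked up only under chunk-000; episodes whose
            # files live elsewhere are counted as missing and skipped.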
for ep_idx in range(meta["total_episodes"]):
parquet_path = ds_dir / f"data/chunk-000/episode_{ep_idx:06d}.parquet"
if not parquet_path.exists():
skipped_missing += 1
continue
                # Read the actual row count by loading just one column from the parquet file
pf = pd.read_parquet(parquet_path, columns=["frame_index"])
actual_length = len(pf)
if actual_length < min_episode_frames or actual_length > max_episode_frames:
continue
# Also verify both video files exist
vid1 = ds_dir / f"videos/chunk-000/observation.images.image/episode_{ep_idx:06d}.mp4"
vid2 = ds_dir / f"videos/chunk-000/observation.images.image2/episode_{ep_idx:06d}.mp4"
if not vid1.exists() or not vid2.exists():
skipped_missing += 1
continue
# Get task from episodes.jsonl if available, else default
task_idx = 0
if meta["episodes"]:
for ep_meta in meta["episodes"]:
if ep_meta.get("episode_index") == ep_idx:
task_idx = ep_meta.get("task_index", 0)
break
task = meta["tasks"].get(task_idx, "(no task)")
all_episodes.append((contributor, dataset_name, ep_idx, task, actual_length))
print(f"Datasets: {datasets_passed} passed, {datasets_rejected} rejected")
print(f"Episodes verified: {len(all_episodes)}, skipped (missing files): {skipped_missing}")
print(f"Episodes before caps: {len(all_episodes)}")
# Phase 2: Apply per-task cap
task_buckets = defaultdict(list)
for ep in all_episodes:
task_buckets[ep[3]].append(ep)
after_task_cap = []
tasks_capped = 0
for task, eps in task_buckets.items():
rng.shuffle(eps)
if len(eps) > max_per_task:
tasks_capped += 1
after_task_cap.extend(eps[:max_per_task])
print(f"Episodes after per-task cap ({max_per_task}): {len(after_task_cap)} ({tasks_capped} tasks capped)")
# Phase 3: Apply per-contributor cap
contrib_buckets = defaultdict(list)
for ep in after_task_cap:
contrib_buckets[ep[0]].append(ep)
final_episodes = []
contribs_capped = 0
for contributor, eps in contrib_buckets.items():
rng.shuffle(eps)
if len(eps) > max_per_contributor:
contribs_capped += 1
final_episodes.extend(eps[:max_per_contributor])
print(f"Episodes after per-contributor cap ({max_per_contributor}): {len(final_episodes)} ({contribs_capped} contributors capped)")
# Phase 4: Build the index
# Sort for determinism
final_episodes.sort(key=lambda x: (x[1], x[2]))
# Collect unique tasks
unique_tasks = sorted(set(ep[3] for ep in final_episodes))
task_to_idx = {t: i for i, t in enumerate(unique_tasks)}
# Collect unique datasets used
datasets_used = sorted(set(ep[1] for ep in final_episodes))
# Build episode entries
entries = []
total_frames = 0
for contributor, dataset_name, ep_idx, task, num_frames in final_episodes:
entries.append({
"dataset": dataset_name,
"episode_index": ep_idx,
"task": task,
"task_index": task_to_idx[task],
"num_frames": num_frames,
})
total_frames += num_frames
index = {
"source_repo": "HuggingFaceVLA/community_dataset_v3",
"filters": {
"max_per_task": max_per_task,
"max_per_contributor": max_per_contributor,
"min_episode_frames": min_episode_frames,
"max_episode_frames": max_episode_frames,
"seed": seed,
},
"summary": {
"datasets": len(datasets_used),
"episodes": len(entries),
"unique_tasks": len(unique_tasks),
"total_frames": total_frames,
"est_hours": total_frames / 30 / 3600,
},
"tasks": unique_tasks,
"datasets_used": datasets_used,
"episodes": entries,
}
return index
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-root", type=Path, default=Path.home() / "lap" / "community_dataset_v3")
parser.add_argument("--output", type=Path, default=Path(__file__).parent / "filtered_index.json")
parser.add_argument("--max-per-task", type=int, default=200)
parser.add_argument("--max-per-contributor", type=int, default=200)
parser.add_argument("--seed", type=int, default=42)
args = parser.parse_args()
index = build_index(
args.data_root,
max_per_task=args.max_per_task,
max_per_contributor=args.max_per_contributor,
seed=args.seed,
)
args.output.parent.mkdir(parents=True, exist_ok=True)
with open(args.output, "w") as f:
json.dump(index, f, indent=2)
print(f"\nSaved to {args.output}")
print(f" Datasets: {index['summary']['datasets']}")
print(f" Episodes: {index['summary']['episodes']}")
print(f" Tasks: {index['summary']['unique_tasks']}")
print(f" Frames: {index['summary']['total_frames']:,}")
print(f" Est. hours: {index['summary']['est_hours']:.1f}")