pi05tests-openpi-multiarm / openpi /scripts /run_preprocess_aloha.sh
lsnu's picture
Upload openpi/scripts/run_preprocess_aloha.sh with huggingface_hub
566d923 verified
#!/usr/bin/env bash
set -euo pipefail
IFS=$'\n\t'
export PATH="$HOME/.local/bin:$PATH"
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
OPENPI_ROOT="${OPENPI_ROOT:-$(cd -- "$SCRIPT_DIR/.." && pwd)}"
log() {
printf '%s %s\n' "$(date -u +'%Y-%m-%dT%H:%M:%SZ')" "$*"
}
on_exit() {
local rc=$?
if [[ $rc -eq 0 ]]; then
log "Run exited successfully"
else
log "Run exited with status $rc"
fi
}
trap on_exit EXIT
require_env() {
local name="$1"
if [[ -z "${!name:-}" ]]; then
echo "Required environment variable is not set: $name" >&2
exit 1
fi
}
project_python_path() {
echo "$OPENPI_ROOT/.venv/bin/python"
}
hf_cli() {
local venv_python
venv_python="$(project_python_path)"
if [[ -x "$venv_python" ]]; then
"$venv_python" -m huggingface_hub.commands.huggingface_cli "$@"
return
fi
if command -v huggingface-cli >/dev/null 2>&1; then
huggingface-cli "$@"
return
fi
if command -v hf >/dev/null 2>&1; then
hf "$@"
return
fi
echo "No Hugging Face CLI found in PATH or project venv." >&2
exit 1
}
project_python() {
local venv_python
venv_python="$(project_python_path)"
if [[ -x "$venv_python" ]]; then
"$venv_python" "$@"
return
fi
python3 "$@"
}
ensure_hf_auth() {
if hf_cli whoami >/dev/null 2>&1; then
return 0
fi
if [[ -n "${HF_TOKEN:-}" ]]; then
hf_cli login --token "$HF_TOKEN" >/dev/null
hf_cli whoami >/dev/null
return 0
fi
echo "Hugging Face auth is not available. Set HF_TOKEN or login first." >&2
exit 1
}
export HF_HUB_ENABLE_HF_TRANSFER=0
export ROOT="${ROOT:-$HOME/pi05prep-work}"
export LOGDIR="${LOGDIR:-$ROOT/logs}"
export SOURCE_CACHE="${SOURCE_CACHE:-$ROOT/aloha-source}"
export MODEL_REPO="${MODEL_REPO:-lsnu/pi05tests-openpi-multiarm}"
export HF_HOME="${HF_HOME:-$ROOT/hf-home}"
export HF_HUB_CACHE="${HF_HUB_CACHE:-$HF_HOME/hub}"
export HF_DATASETS_CACHE="${HF_DATASETS_CACHE:-$HF_HOME/datasets}"
export HF_LEROBOT_HOME="${HF_LEROBOT_HOME:-$HF_HOME/lerobot}"
export STATS_BATCH_SIZE="${STATS_BATCH_SIZE:-64}"
export STATS_NUM_WORKERS="${STATS_NUM_WORKERS:-0}"
export STATS_MAX_FRAMES="${STATS_MAX_FRAMES:-}"
export TARGET_PRIVATE="${TARGET_PRIVATE:-true}"
export ALOHA_FILTER="${ALOHA_FILTER:-}"
unset LEROBOT_HOME || true
mkdir -p "$LOGDIR" "$SOURCE_CACHE" "$HF_HOME" "$HF_HUB_CACHE" "$HF_DATASETS_CACHE" "$HF_LEROBOT_HOME"
prepare_openpi_env() {
require_env HF_TOKEN
cd "$OPENPI_ROOT"
if grep -q 'jax\[cuda12\]==0\.5\.3' pyproject.toml; then
echo "pyproject.toml still points at jax[cuda12]; patch it before running." >&2
exit 1
fi
log "Authenticating Hugging Face CLI"
hf_cli login --token "$HF_TOKEN"
hf_cli whoami
log "Syncing uv environment"
GIT_LFS_SKIP_SMUDGE=1 uv sync --python 3.11
GIT_LFS_SKIP_SMUDGE=1 uv pip install -e .
}
dataset_specs() {
cat <<'EOF'
training|pen_uncap_diverse|physical-intelligence/aloha_pen_uncap_diverse|lsnu/pi05tests-openpi-multiarm-aloha-training-pen-uncap-diverse|pi0_aloha_pen_uncap,pi05_aloha_pen_uncap
training|transfer_cube_human|lerobot/aloha_sim_transfer_cube_human|lsnu/pi05tests-openpi-multiarm-aloha-training-transfer-cube-human|pi0_aloha_sim
benchmark|transfer_cube_scripted|lerobot/aloha_sim_transfer_cube_scripted|lsnu/pi05tests-openpi-multiarm-aloha-benchmark-transfer-cube-scripted|
benchmark|insertion_human|lerobot/aloha_sim_insertion_human|lsnu/pi05tests-openpi-multiarm-aloha-benchmark-insertion-human|
benchmark|insertion_scripted|lerobot/aloha_sim_insertion_scripted|lsnu/pi05tests-openpi-multiarm-aloha-benchmark-insertion-scripted|
EOF
}
matches_filter() {
local label="$1"
local source_repo="$2"
local target_repo="$3"
if [[ -z "$ALOHA_FILTER" ]]; then
return 0
fi
[[ "$label" == *"$ALOHA_FILTER"* || "$source_repo" == *"$ALOHA_FILTER"* || "$target_repo" == *"$ALOHA_FILTER"* ]]
}
create_target_repo() {
local repo_id="$1"
local private_flag="$2"
project_python - "$repo_id" "$private_flag" <<'PY'
import sys
from huggingface_hub import HfApi
repo_id, private_flag = sys.argv[1:3]
private = private_flag.lower() in {"1", "true", "yes", "y"}
HfApi().create_repo(repo_id, repo_type="dataset", exist_ok=True, private=private)
print(repo_id)
PY
}
download_source_dataset() {
local source_repo="$1"
local local_dir="$2"
project_python - "$source_repo" "$local_dir" <<'PY'
from pathlib import Path
import sys
from huggingface_hub import snapshot_download
source_repo, local_dir = sys.argv[1:3]
Path(local_dir).mkdir(parents=True, exist_ok=True)
snapshot_download(
repo_id=source_repo,
repo_type="dataset",
local_dir=local_dir,
allow_patterns=[".gitattributes", "README.md", "meta/**", "data/**", "videos/**"],
)
print(local_dir)
PY
}
verify_local_dataset() {
local dataset_root="$1"
project_python - "$dataset_root" <<'PY'
import json
import sys
from pathlib import Path
root = Path(sys.argv[1])
if not root.exists():
raise SystemExit(f"Local dataset directory not found: {root}")
paths = [p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file()]
if "meta/info.json" not in paths:
raise SystemExit(f"Missing meta/info.json in {root}")
has_tasks = "meta/tasks.parquet" in paths or "meta/tasks.jsonl" in paths
if not has_tasks:
raise SystemExit(f"Missing task metadata in {root}")
has_episodes = "meta/episodes.jsonl" in paths or any(
p.startswith("meta/episodes/") and p.endswith(".parquet") for p in paths
)
if not has_episodes:
raise SystemExit(f"Missing episode metadata in {root}")
data_parquet = [p for p in paths if p.startswith("data/") and p.endswith(".parquet")]
if not data_parquet:
raise SystemExit(f"No parquet data files found under {root}")
video_files = [p for p in paths if p.startswith("videos/") and p.endswith(".mp4")]
info = json.loads((root / "meta" / "info.json").read_text())
features = info.get("features", {})
if "action" not in features:
raise SystemExit(f"Dataset is missing 'action' feature in {root / 'meta' / 'info.json'}")
if "observation.state" not in features:
raise SystemExit(f"Dataset is missing 'observation.state' feature in {root / 'meta' / 'info.json'}")
print(
json.dumps(
{
"root": str(root),
"files": len(paths),
"data_parquet": len(data_parquet),
"videos": len(video_files),
"features": sorted(features),
}
)
)
PY
}
verify_remote_dataset() {
local repo_id="$1"
project_python - "$repo_id" <<'PY'
import json
import sys
from huggingface_hub import HfApi
repo_id = sys.argv[1]
api = HfApi()
paths = [item.path for item in api.list_repo_tree(repo_id, repo_type="dataset", recursive=True, expand=True)]
if "meta/info.json" not in paths:
raise SystemExit(f"Remote dataset is missing meta/info.json: {repo_id}")
if "meta/tasks.parquet" not in paths and "meta/tasks.jsonl" not in paths:
raise SystemExit(f"Remote dataset is missing task metadata: {repo_id}")
if "meta/episodes.jsonl" not in paths and not any(
p.startswith("meta/episodes/") and p.endswith(".parquet") for p in paths
):
raise SystemExit(f"Remote dataset is missing episode metadata: {repo_id}")
data_parquet = [p for p in paths if p.startswith("data/") and p.endswith(".parquet")]
if not data_parquet:
raise SystemExit(f"Remote dataset {repo_id} has no parquet data files")
video_files = [p for p in paths if p.startswith("videos/") and p.endswith(".mp4")]
print(
json.dumps(
{
"repo_id": repo_id,
"files": len(paths),
"data_parquet": len(data_parquet),
"video_files": len(video_files),
}
)
)
PY
}
upload_dataset() {
local local_dir="$1"
local repo_id="$2"
log "Uploading dataset repo: $repo_id"
hf_cli upload-large-folder "$repo_id" "$local_dir" --repo-type dataset --num-workers 16
}
stats_one() {
local config_name="$1"
local repo_id="$2"
local log_file="$LOGDIR/${config_name//\//_}__${repo_id//\//_}.stats.log"
local -a cmd=(
.venv/bin/python -u scripts/compute_norm_stats_repo.py
--config-name "$config_name"
--repo-id "$repo_id"
--batch-size "$STATS_BATCH_SIZE"
--num-workers "$STATS_NUM_WORKERS"
--assets-base-dir ./assets
)
if [[ -n "$STATS_MAX_FRAMES" ]]; then
cmd+=(--max-frames "$STATS_MAX_FRAMES")
fi
log "Computing norm stats: $config_name / $repo_id"
(
cd "$OPENPI_ROOT"
PYTHONUNBUFFERED=1 "${cmd[@]}"
) >"$log_file" 2>&1
tail -n 40 "$log_file" || true
}
upload_stats() {
local config_name="$1"
local repo_id="$2"
local src_dir="$OPENPI_ROOT/assets/$config_name/$repo_id"
local dst_dir="openpi/assets/$config_name/$repo_id"
log "Uploading norm stats: $MODEL_REPO::$dst_dir"
hf_cli upload "$MODEL_REPO" "$src_dir" "$dst_dir"
}
verify_remote_stats() {
local config_name="$1"
local repo_id="$2"
project_python - "$MODEL_REPO" "$config_name" "$repo_id" <<'PY'
import sys
from huggingface_hub import HfApi
model_repo, config_name, repo_id = sys.argv[1:4]
target = f"openpi/assets/{config_name}/{repo_id}/norm_stats.json"
paths = [item.path for item in HfApi().list_repo_tree(model_repo, repo_type="model", recursive=True, expand=True)]
if target not in paths:
raise SystemExit(f"Missing remote norm stats file: {target}")
print(target)
PY
}
cleanup_local_source() {
local source_dir="$1"
rm -rf "$source_dir"
}
sync_dataset_if_needed() {
local label="$1"
local source_repo="$2"
local target_repo="$3"
local source_dir="$SOURCE_CACHE/${target_repo//\//__}"
if verify_remote_dataset "$target_repo" >"$LOGDIR/${label}_verify_remote.json" 2>/dev/null; then
log "Remote dataset already verified; skipping sync for $target_repo"
return 0
fi
log "Ensuring target dataset repo exists: $target_repo"
create_target_repo "$target_repo" "$TARGET_PRIVATE" >"$LOGDIR/${label}_create_repo.txt"
log "Downloading source dataset: $source_repo"
download_source_dataset "$source_repo" "$source_dir" >"$LOGDIR/${label}_download.txt"
verify_local_dataset "$source_dir" | tee "$LOGDIR/${label}_verify_local.json"
upload_dataset "$source_dir" "$target_repo"
verify_remote_dataset "$target_repo" | tee "$LOGDIR/${label}_verify_remote.json"
cleanup_local_source "$source_dir"
}
process_training_dataset() {
local label="$1"
local source_repo="$2"
local target_repo="$3"
local config_names="$4"
log "=== Training dataset: $label ==="
sync_dataset_if_needed "$label" "$source_repo" "$target_repo"
local cfg
IFS=',' read -r -a cfgs <<<"$config_names"
for cfg in "${cfgs[@]}"; do
if verify_remote_stats "$cfg" "$target_repo" >"$LOGDIR/${label}_${cfg}_stats_remote.txt" 2>/dev/null; then
log "Norm stats already verified remotely; skipping $cfg / $target_repo"
continue
fi
stats_one "$cfg" "$target_repo"
upload_stats "$cfg" "$target_repo"
verify_remote_stats "$cfg" "$target_repo" | tee "$LOGDIR/${label}_${cfg}_stats_remote.txt"
done
}
process_benchmark_dataset() {
local label="$1"
local source_repo="$2"
local target_repo="$3"
log "=== Benchmark dataset: $label ==="
sync_dataset_if_needed "$label" "$source_repo" "$target_repo"
}
main() {
if [[ "${SKIP_PREPARE:-0}" == "1" ]]; then
ensure_hf_auth
log "Skipping environment bootstrap; using HF_HOME=$HF_HOME"
else
prepare_openpi_env
fi
while IFS='|' read -r category label source_repo target_repo config_names; do
[[ -z "$category" ]] && continue
if ! matches_filter "$label" "$source_repo" "$target_repo"; then
continue
fi
if [[ "$category" == "training" ]]; then
process_training_dataset "$label" "$source_repo" "$target_repo" "$config_names"
else
process_benchmark_dataset "$label" "$source_repo" "$target_repo"
fi
done < <(dataset_specs)
log "All ALOHA tasks completed"
}
if [[ "${BASH_SOURCE[0]}" == "$0" ]]; then
main "$@"
fi