# Source: VLAdaptorBench/code/scripts/launch_oven_fuller_logging_batch.py
# Latest commit bd2d170 by lsnu (verified): "Normalize non-finite metric export to null"
# (Repository web-page header; file size listed as 12.6 kB.)
from pathlib import Path
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from typing import Dict, List, Optional, Sequence
# Repository root: the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Put the repository root at the front of the import search path, but only
# if it is not listed already, so packages located there are importable.
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
def _configure_thread_env() -> None:
    """Seed default thread-count and allocator variables in ``os.environ``.

    Every variable is written with ``setdefault``, so a value that is already
    present in the environment is left untouched.
    """
    single_thread_vars = (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    )
    for name in single_thread_vars:
        os.environ.setdefault(name, "1")
    os.environ.setdefault("MALLOC_ARENA_MAX", "2")
def _configure_coppeliasim_env() -> None:
    """Make sure the CoppeliaSim root is listed in ``LD_LIBRARY_PATH``.

    ``COPPELIASIM_ROOT`` falls back to ``/workspace/coppelia_sim`` when it is
    not set. ``LD_LIBRARY_PATH`` is then rewritten without empty entries, with
    the root prepended unless it is already one of the entries.
    """
    root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    # filter(None, ...) drops the empty strings produced by "::" or an unset
    # variable.
    search_dirs = list(filter(None, os.environ.get("LD_LIBRARY_PATH", "").split(":")))
    if root not in search_dirs:
        search_dirs = [root, *search_dirs]
    os.environ["LD_LIBRARY_PATH"] = ":".join(search_dirs)
# Apply the environment defaults before the project import below runs.
# NOTE(review): presumably this ordering is deliberate so that whatever the
# import loads sees these variables already set - confirm before moving the
# import to the top of the file.
_configure_thread_env()
_configure_coppeliasim_env()
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs, _json_safe
def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Return the episode indices to process, in processing order.

    Args:
        total_episodes: Number of episodes available; valid indices are
            ``0 .. total_episodes - 1``.
        episode_offset: First index of the contiguous window. Only used when
            ``episode_indices`` is ``None``.
        max_episodes: Upper bound on the window length, or ``None`` for no
            bound. Only used when ``episode_indices`` is ``None``.
        episode_indices: Explicit indices. When given, they take precedence
            over the offset/limit window; duplicates are dropped while the
            order of first occurrence is kept.

    Returns:
        The selected indices; an empty list when the window is empty.

    Raises:
        ValueError: If an explicit index is outside the available range, or
            if ``episode_offset`` is negative in window mode.
    """
    if episode_indices is not None:
        selected: List[int] = []
        seen = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < total_episodes):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
                )
            if episode_index in seen:
                continue
            selected.append(episode_index)
            seen.add(episode_index)
        return selected

    # A negative offset would otherwise produce negative indices, which index
    # the episode list from the end and do not match any "episode<N>" name.
    if episode_offset < 0:
        raise ValueError(f"episode offset {episode_offset} must be non-negative")

    remaining = max(0, total_episodes - episode_offset)
    if max_episodes is not None:
        remaining = min(remaining, max_episodes)
    if remaining <= 0:
        return []
    return list(range(episode_offset, episode_offset + remaining))
def _is_complete_episode_dir(output_dir: Path, episode_name: str) -> bool:
    """Report whether ``output_dir`` holds every expected output file.

    Four files are named after the episode (``<episode>.dense.csv``,
    ``<episode>.keyframes.csv``, ``<episode>.debug.jsonl``,
    ``<episode>.metrics.json``) and three have fixed names (``summary.json``,
    ``templates.json``, ``templates.pkl``). Only existence is checked.
    """
    per_episode_suffixes = (".dense.csv", ".keyframes.csv", ".debug.jsonl", ".metrics.json")
    fixed_names = ("summary.json", "templates.json", "templates.pkl")

    expected_names = [f"{episode_name}{suffix}" for suffix in per_episode_suffixes]
    expected_names.extend(fixed_names)

    for file_name in expected_names:
        if not output_dir.joinpath(file_name).exists():
            return False
    return True
def _load_metrics(result_dir: Path, episode_names: Sequence[str]) -> List[Dict[str, object]]:
    """Load ``<result_dir>/<episode>/<episode>.metrics.json`` for each episode.

    Episodes whose metrics file does not exist are skipped. The returned list
    follows the order of ``episode_names``.
    """
    loaded: List[Dict[str, object]] = []
    for name in episode_names:
        candidate = result_dir.joinpath(name, f"{name}.metrics.json")
        if not candidate.exists():
            continue
        loaded.append(json.loads(candidate.read_text(encoding="utf-8")))
    return loaded
def _write_json(path: Path, payload: Dict[str, object]) -> None:
    """Write ``payload`` to ``path`` as indented, strictly valid JSON.

    The payload is passed through ``_json_safe`` and encoded with
    ``allow_nan=False``, so ``NaN``/``Infinity`` tokens are never emitted.
    Missing parent directories are created.

    Raises:
        ValueError: If a non-finite float is still present after
            ``_json_safe``.
        TypeError: If the payload contains a value JSON cannot represent.
    """
    # Serialize before opening the file: opening with "w" truncates, so an
    # encoding error raised afterwards would destroy the previous contents.
    text = json.dumps(_json_safe(payload), indent=2, allow_nan=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(text)
def _run_episode(
    dataset_root: Path,
    episode_dir: Path,
    output_dir: Path,
    checkpoint_stride: int,
    num_workers: int,
    base_display: int,
    templates_json: Path,
    stagger_seconds: float,
    thread_count: int,
    log_path: Path,
) -> int:
    """Run ``scripts/recompute_oven_episode_parallel.py`` for one episode.

    The script is started with the current interpreter from ``PROJECT_ROOT``.
    Its stdout and stderr are both written to ``log_path``, which is
    overwritten. The child environment is a copy of the current one with the
    thread-count variables forced to ``thread_count``, ``MALLOC_ARENA_MAX``
    set to ``2`` and ``PYTHONUNBUFFERED`` set to ``1``.

    Returns:
        The exit status of the child process, after waiting for it to finish.
    """
    threads = str(thread_count)
    child_env = dict(os.environ)
    for variable in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        child_env[variable] = threads
    child_env["MALLOC_ARENA_MAX"] = "2"
    child_env["PYTHONUNBUFFERED"] = "1"

    worker_script = PROJECT_ROOT.joinpath("scripts", "recompute_oven_episode_parallel.py")
    option_values = (
        ("--dataset-root", dataset_root),
        ("--episode-dir", episode_dir),
        ("--output-dir", output_dir),
        ("--checkpoint-stride", checkpoint_stride),
        ("--num-workers", num_workers),
        ("--base-display", base_display),
        ("--templates-json", templates_json),
        ("--stagger-seconds", stagger_seconds),
    )
    command = [sys.executable, str(worker_script)]
    for flag, value in option_values:
        command.extend((flag, str(value)))

    with log_path.open("w", encoding="utf-8") as log_handle:
        child = subprocess.Popen(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=child_env,
        )
        return child.wait()
def _write_progress(
    progress_path: Path,
    current_episode: Optional[str],
    completed: Sequence[int],
    failed: Sequence[Dict[str, object]],
    total_selected: int,
    current_attempt: Optional[int] = None,
    finished: bool = False,
) -> None:
    """Write one progress snapshot; the single place that defines its layout."""
    payload: Dict[str, object] = {"current_episode": current_episode}
    if current_attempt is not None:
        payload["current_attempt"] = current_attempt
    payload["completed_episode_indices"] = list(completed)
    payload["failed"] = list(failed)
    payload["total_selected"] = total_selected
    # The final snapshot carries "finished_at_epoch" instead of "updated_at_epoch".
    timestamp_key = "finished_at_epoch" if finished else "updated_at_epoch"
    payload[timestamp_key] = time.time()
    _write_json(progress_path, payload)


def _refresh_summary(result_dir: Path, episode_names: Sequence[str]) -> None:
    """Rewrite ``summary.json`` from whatever per-episode metrics exist so far."""
    metrics = _load_metrics(result_dir, episode_names)
    if metrics:
        _write_json(result_dir.joinpath("summary.json"), _aggregate_summary(metrics))


def main() -> int:
    """Process the selected episodes sequentially and track progress on disk.

    Steps:
      1. Parse the command line and resolve which episode indices to run
         (``--episode-indices`` as a comma-separated list, otherwise the
         ``--episode-offset`` / ``--max-episodes`` window).
      2. Write ``run_manifest.json`` into the result directory.
      3. For each episode: skip it if its result directory is already
         complete; otherwise run it into a hidden temporary directory, with up
         to ``--max-retries`` retries, and rename that directory into place on
         success. Output left by a failed attempt is moved under
         ``failed_attempts/``.
      4. After every processed episode and once at the end, refresh
         ``summary.json`` and ``progress.json``.

    Returns:
        ``0`` when every selected episode is complete.

    Raises:
        FileNotFoundError: If the ``--templates-json`` file does not exist.
        RuntimeError: If an episode still has no complete result after all of
            its attempts; the batch stops at that episode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument("--result-dir", required=True)
    parser.add_argument("--templates-json", required=True)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int, default=100)
    parser.add_argument("--episode-indices")
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=24)
    parser.add_argument("--base-display", type=int, default=900)
    parser.add_argument("--stagger-seconds", type=float, default=0.15)
    parser.add_argument("--thread-count", type=int, default=1)
    parser.add_argument("--max-retries", type=int, default=2)
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    templates_json = Path(args.templates_json)
    if not templates_json.exists():
        raise FileNotFoundError(f"missing templates json: {templates_json}")

    all_episode_dirs = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        explicit_episode_indices = [
            int(chunk.strip()) for chunk in args.episode_indices.split(",") if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    selected_episode_names = [f"episode{index}" for index in selected_episode_indices]
    total_selected = len(selected_episode_indices)

    manifest = {
        "dataset_root": str(dataset_root.resolve()),
        "result_dir": str(result_dir.resolve()),
        "templates_json": str(templates_json.resolve()),
        "episode_indices": selected_episode_indices,
        "checkpoint_stride": args.checkpoint_stride,
        "num_workers": args.num_workers,
        "base_display": args.base_display,
        "stagger_seconds": args.stagger_seconds,
        "thread_count": args.thread_count,
        "max_retries": args.max_retries,
        "started_at_epoch": time.time(),
    }
    _write_json(result_dir.joinpath("run_manifest.json"), manifest)

    progress_path = result_dir.joinpath("progress.json")
    logs_dir = result_dir.joinpath("logs")
    logs_dir.mkdir(parents=True, exist_ok=True)

    completed: List[int] = []
    failed: List[Dict[str, object]] = []

    def report(
        current_episode: Optional[str],
        current_attempt: Optional[int] = None,
        finished: bool = False,
    ) -> None:
        _write_progress(
            progress_path,
            current_episode,
            completed,
            failed,
            total_selected,
            current_attempt=current_attempt,
            finished=finished,
        )

    # One initial attempt plus the requested retries. A negative --max-retries
    # still gets one attempt instead of failing the episode without running it.
    attempts_per_episode = max(1, args.max_retries + 1)

    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_dir = all_episode_dirs[episode_index]
        final_output_dir = result_dir.joinpath(episode_name)

        # Results left by an earlier run are reused as-is.
        if _is_complete_episode_dir(final_output_dir, episode_name):
            completed.append(episode_index)
            report(None)
            continue

        # Each attempt writes into a scratch directory so that a partial
        # result never appears under the final episode name.
        temp_output_dir = result_dir.joinpath(f".{episode_name}.tmp")
        last_failure: Dict[str, object] = {
            "episode_index": episode_index,
            "episode_name": episode_name,
        }
        for attempt_index in range(1, attempts_per_episode + 1):
            if temp_output_dir.exists():
                shutil.rmtree(temp_output_dir)
            log_path = logs_dir.joinpath(f"{episode_name}.attempt{attempt_index:02d}.log")
            report(episode_name, current_attempt=attempt_index)

            return_code = _run_episode(
                dataset_root=dataset_root,
                episode_dir=episode_dir,
                output_dir=temp_output_dir,
                checkpoint_stride=args.checkpoint_stride,
                num_workers=args.num_workers,
                base_display=args.base_display,
                templates_json=templates_json,
                stagger_seconds=args.stagger_seconds,
                thread_count=args.thread_count,
                log_path=log_path,
            )

            # A zero exit status alone is not enough; all output files must exist.
            if return_code == 0 and _is_complete_episode_dir(temp_output_dir, episode_name):
                if final_output_dir.exists():
                    shutil.rmtree(final_output_dir)
                temp_output_dir.rename(final_output_dir)
                completed.append(episode_index)
                break

            last_failure = {
                "episode_index": episode_index,
                "episode_name": episode_name,
                "attempt": attempt_index,
                "return_code": return_code,
                "log_path": str(log_path),
                "updated_at_epoch": time.time(),
            }
            # Keep whatever the failed attempt produced for later inspection.
            if temp_output_dir.exists():
                failed_dir = result_dir.joinpath(
                    "failed_attempts", f"{episode_name}.attempt{attempt_index:02d}"
                )
                failed_dir.parent.mkdir(parents=True, exist_ok=True)
                if failed_dir.exists():
                    shutil.rmtree(failed_dir)
                temp_output_dir.rename(failed_dir)
        else:
            # The loop ended without a break: every attempt failed.
            failed.append(last_failure)
            report(None)
            raise RuntimeError(f"failed to produce complete result for {episode_name}")

        _refresh_summary(result_dir, selected_episode_names)
        report(None)

    _refresh_summary(result_dir, selected_episode_names)
    report(None, finished=True)
    return 0
if __name__ == "__main__":
    # Use main()'s integer result as the process exit status.
    sys.exit(main())