lainwired commited on
Commit
e1f4a4f
·
verified ·
1 Parent(s): c97e1f0

API: HF dataset mirror for leaderboard + ckpts; fix ego_fn obs.get bug

Browse files
Dockerfile CHANGED
@@ -6,7 +6,9 @@ ENV PYTHONUNBUFFERED=1 \
6
  GUNICORN_WORKERS=2 \
7
  GUNICORN_TIMEOUT=180 \
8
  PORT=7860 \
9
- BENCHMARK_UI_STORAGE=/data
 
 
10
 
11
  RUN apt-get update && apt-get install -y --no-install-recommends \
12
  curl ca-certificates build-essential git \
 
6
  GUNICORN_WORKERS=2 \
7
  GUNICORN_TIMEOUT=180 \
8
  PORT=7860 \
9
+ BENCHMARK_UI_STORAGE=/data \
10
+ DATA_ROOT=/data \
11
+ BENCHMARK_HF_REPO=lainwired/jaxaht-benchmark-leaderboard
12
 
13
  RUN apt-get update && apt-get install -y --no-install-recommends \
14
  curl ca-certificates build-essential git \
benchmark_ui/backend/app.py CHANGED
@@ -535,6 +535,7 @@ def _register_routes(app: Flask) -> None: # noqa: C901 (route fan-out)
535
  adapter = get_env(env)
536
  ego_factory = None
537
  upload_holder: dict[str, Any] = {}
 
538
 
539
  if payload.ego_kind == "builtin":
540
  if not payload.builtin_key:
@@ -563,6 +564,15 @@ def _register_routes(app: Flask) -> None: # noqa: C901 (route fan-out)
563
  idx=payload.idx,
564
  )
565
  upload_holder["uploaded"] = uploaded
 
 
 
 
 
 
 
 
 
566
 
567
  def ego_factory():
568
  env_obj = adapter.make_env(adapter.DEFAULT_KWARGS or {})
@@ -571,7 +581,6 @@ def _register_routes(app: Flask) -> None: # noqa: C901 (route fan-out)
571
  raise ValueError(f"unknown ego_kind '{payload.ego_kind}'")
572
 
573
  jobs: JobManager = app.extensions["jobs"]
574
- backends = app.extensions["backends"]
575
 
576
  def _run(progress_cb):
577
  try:
@@ -598,6 +607,11 @@ def _register_routes(app: Flask) -> None: # noqa: C901 (route fan-out)
598
  "per_partner": result["per_partner"],
599
  "wall_clock_seconds": result["wall_clock_seconds"],
600
  "notes": payload.notes,
 
 
 
 
 
601
  }
602
  entry_id = backends.leaderboard.add_entry(env, payload.version, entry)
603
  return {"entry_id": entry_id, **result}
 
535
  adapter = get_env(env)
536
  ego_factory = None
537
  upload_holder: dict[str, Any] = {}
538
+ backends = app.extensions["backends"]
539
 
540
  if payload.ego_kind == "builtin":
541
  if not payload.builtin_key:
 
564
  idx=payload.idx,
565
  )
566
  upload_holder["uploaded"] = uploaded
567
+ # persist the raw upload so submissions survive container rebuilds
568
+ ckpt_id = backends.checkpoint.save_upload(uploaded_bytes, {
569
+ "agent_name": payload.agent_name,
570
+ "env": env,
571
+ "actor_type": payload.actor_type,
572
+ "ckpt_key": payload.ckpt_key,
573
+ "idx": payload.idx,
574
+ })
575
+ upload_holder["ckpt_id"] = ckpt_id
576
 
577
  def ego_factory():
578
  env_obj = adapter.make_env(adapter.DEFAULT_KWARGS or {})
 
581
  raise ValueError(f"unknown ego_kind '{payload.ego_kind}'")
582
 
583
  jobs: JobManager = app.extensions["jobs"]
 
584
 
585
  def _run(progress_cb):
586
  try:
 
607
  "per_partner": result["per_partner"],
608
  "wall_clock_seconds": result["wall_clock_seconds"],
609
  "notes": payload.notes,
610
+ "checkpoint_id": upload_holder.get("ckpt_id"),
611
+ "checkpoint_sha256": (
612
+ backends.checkpoint.get_sha256(upload_holder["ckpt_id"])
613
+ if "ckpt_id" in upload_holder else None
614
+ ),
615
  }
616
  entry_id = backends.leaderboard.add_entry(env, payload.version, entry)
617
  return {"entry_id": entry_id, **result}
benchmark_ui/backend/checkpoint_loader.py CHANGED
@@ -188,7 +188,7 @@ def _generic_load_ego(env, uploaded: UploadedCheckpoint) -> Callable:
188
  hstate_holder[0] = None
189
 
190
  if isinstance(obs, dict):
191
- agent_obs = obs.get("agent_0") or next(iter(obs.values()))
192
  else:
193
  agent_obs = obs
194
 
 
188
  hstate_holder[0] = None
189
 
190
  if isinstance(obs, dict):
191
+ agent_obs = obs["agent_0"] if "agent_0" in obs else next(iter(obs.values()))
192
  else:
193
  agent_obs = obs
194
 
benchmark_ui/backend/storage/__init__.py CHANGED
@@ -1,9 +1,10 @@
1
  """Storage layer. Interface in ``base``; file impl in ``file``.
2
 
3
- Routes import ``build_backends`` and never touch a subclass directly.
4
- Swap with a Postgres impl later via STORAGE=postgres env var.
5
  """
6
  import os
 
7
 
8
  from .base import (
9
  Backends,
@@ -17,19 +18,46 @@ from .file import build_backends as _build_file
17
 
18
 
19
  def build_backends() -> Backends:
20
- """Construct the storage bundle according to STORAGE env var.
21
-
22
- Default: file-backed under ./benchmark_ui/data.
23
- """
24
  backend = os.environ.get("STORAGE", "file").lower()
25
- if backend == "file":
26
- root = os.environ.get("DATA_ROOT", "./benchmark_ui/data")
27
- return _build_file(root)
28
- if backend == "postgres":
29
- raise NotImplementedError(
30
- "Postgres backend not implemented yet. Set STORAGE=file."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
  )
32
- raise ValueError(f"Unknown STORAGE: {backend}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
 
35
  __all__ = [
 
1
  """Storage layer. Interface in ``base``; file impl in ``file``.
2
 
3
+ Set BENCHMARK_HF_REPO=<dataset_id> to mirror leaderboard + ckpts to HF
4
+ so they survive container rebuilds.
5
  """
6
  import os
7
+ from pathlib import Path
8
 
9
  from .base import (
10
  Backends,
 
18
 
19
 
20
  def build_backends() -> Backends:
 
 
 
 
21
  backend = os.environ.get("STORAGE", "file").lower()
22
+ if backend != "file":
23
+ if backend == "postgres":
24
+ raise NotImplementedError("Postgres backend not implemented yet.")
25
+ raise ValueError(f"Unknown STORAGE: {backend}")
26
+
27
+ root = os.environ.get("DATA_ROOT", "./benchmark_ui/data")
28
+ hf_repo = os.environ.get("BENCHMARK_HF_REPO")
29
+
30
+ if hf_repo:
31
+ from .hf_mirror import bootstrap_from_hf, HFMirrorCheckpointStore, HFMirrorLeaderboardStore
32
+ bootstrap_root = Path(root)
33
+ bootstrap_from_hf(hf_repo, bootstrap_root)
34
+ _migrate_bootstrap_paths(bootstrap_root)
35
+
36
+ backends = _build_file(root)
37
+
38
+ if hf_repo:
39
+ backends = Backends(
40
+ leaderboard=HFMirrorLeaderboardStore(backends.leaderboard, hf_repo),
41
+ trajectory=backends.trajectory,
42
+ checkpoint=HFMirrorCheckpointStore(backends.checkpoint, hf_repo),
43
+ session=backends.session,
44
+ job=backends.job,
45
  )
46
+ return backends
47
+
48
+
49
+ def _migrate_bootstrap_paths(root: Path) -> None:
50
+ # the HF dataset uses "leaderboard/" while FileLeaderboardStore reads from "leaderboards/"
51
+ src = root / "leaderboard"
52
+ dst = root / "leaderboards"
53
+ if not src.exists():
54
+ return
55
+ dst.mkdir(exist_ok=True)
56
+ for f in src.iterdir():
57
+ if f.is_file():
58
+ target = dst / f.name
59
+ if not target.exists():
60
+ f.replace(target)
61
 
62
 
63
  __all__ = [
benchmark_ui/backend/storage/hf_mirror.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # HF Hub mirror for leaderboard + checkpoint stores.
2
+ # Local file backs writes; after each successful write we push to HF dataset.
3
+ # Bootstrap: snapshot_download pulls the dataset into the local root before serving.
4
+ from __future__ import annotations
5
+
6
+ import logging
7
+ import threading
8
+ from concurrent.futures import ThreadPoolExecutor
9
+ from pathlib import Path
10
+
11
+ from .base import CheckpointStore, LeaderboardStore
12
+ from .file import FileCheckpointStore, FileLeaderboardStore
13
+
14
+ log = logging.getLogger("hf_mirror")
15
+
16
+
17
+ def bootstrap_from_hf(repo_id: str, local_root: Path) -> None:
18
+ # pull existing leaderboard + ckpts from the dataset on container start.
19
+ # idempotent: snapshot_download skips files that already exist locally.
20
+ from huggingface_hub import snapshot_download
21
+ local_root = Path(local_root)
22
+ local_root.mkdir(parents=True, exist_ok=True)
23
+ try:
24
+ snapshot_download(
25
+ repo_id=repo_id,
26
+ repo_type="dataset",
27
+ local_dir=str(local_root),
28
+ allow_patterns=["leaderboard/*", "checkpoints/**"],
29
+ )
30
+ log.info("bootstrapped storage from %s into %s", repo_id, local_root)
31
+ except Exception as exc:
32
+ log.warning("failed to bootstrap from %s (%s); starting empty", repo_id, exc)
33
+
34
+
35
+ class HFMirrorLeaderboardStore(LeaderboardStore):
36
+ def __init__(self, inner: FileLeaderboardStore, repo_id: str):
37
+ self.inner = inner
38
+ self.repo_id = repo_id
39
+ self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="hf-push")
40
+ self._lock = threading.Lock()
41
+
42
+ def _push(self, env: str, version: str) -> None:
43
+ from huggingface_hub import HfApi, CommitOperationAdd
44
+ local = self.inner._path(env, version)
45
+ if not local.exists():
46
+ return
47
+ rel = f"leaderboard/{env}_{version}.json"
48
+ try:
49
+ HfApi().create_commit(
50
+ repo_id=self.repo_id, repo_type="dataset",
51
+ operations=[CommitOperationAdd(path_in_repo=rel, path_or_fileobj=str(local))],
52
+ commit_message=f"update {rel}",
53
+ )
54
+ log.info("pushed %s to HF", rel)
55
+ except Exception as exc:
56
+ log.exception("HF push failed for %s: %s", rel, exc)
57
+
58
+ def add_entry(self, env: str, version: str, entry: dict) -> str:
59
+ entry_id = self.inner.add_entry(env, version, entry)
60
+ # fire-and-forget push so the API response isn't blocked
61
+ self._executor.submit(self._push, env, version)
62
+ return entry_id
63
+
64
+ def list_entries(self, env: str, version: str) -> list[dict]:
65
+ return self.inner.list_entries(env, version)
66
+
67
+ def get_entry(self, env: str, version: str, entry_id: str):
68
+ return self.inner.get_entry(env, version, entry_id)
69
+
70
+ def clear(self, env: str, version: str) -> None:
71
+ self.inner.clear(env, version)
72
+ self._executor.submit(self._push, env, version)
73
+
74
+
75
+ class HFMirrorCheckpointStore(CheckpointStore):
76
+ # mirrors user-uploaded ckpts to HF so they survive container restarts.
77
+ # the local FileCheckpointStore stays the source of truth for in-process reads.
78
+ def __init__(self, inner: FileCheckpointStore, repo_id: str):
79
+ self.inner = inner
80
+ self.repo_id = repo_id
81
+ self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="hf-ckpt-push")
82
+
83
+ def _push(self, ckpt_id: str) -> None:
84
+ # Push the whole ckpt dir (zip + extracted + meta) so a future bootstrap can rehydrate it fully.
85
+ from huggingface_hub import HfApi
86
+ ckpt_dir = self.inner.root / ckpt_id
87
+ if not ckpt_dir.exists():
88
+ return
89
+ try:
90
+ HfApi().upload_folder(
91
+ folder_path=str(ckpt_dir),
92
+ path_in_repo=f"checkpoints/{ckpt_id}",
93
+ repo_id=self.repo_id,
94
+ repo_type="dataset",
95
+ commit_message=f"upload ckpt {ckpt_id}",
96
+ )
97
+ log.info("pushed ckpt %s to HF", ckpt_id)
98
+ except Exception as exc:
99
+ log.exception("HF ckpt push failed for %s: %s", ckpt_id, exc)
100
+
101
+ def save_upload(self, zip_bytes: bytes, metadata: dict | None = None) -> str:
102
+ ckpt_id = self.inner.save_upload(zip_bytes, metadata)
103
+ self._executor.submit(self._push, ckpt_id)
104
+ return ckpt_id
105
+
106
+ def get_path(self, ckpt_id: str):
107
+ return self.inner.get_path(ckpt_id)
108
+
109
+ def get_sha256(self, ckpt_id: str):
110
+ return self.inner.get_sha256(ckpt_id)