| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """In-process live-progress registry for the submitter's status panel. |
| |
| This is the *personal-view* half of the progress story (the shared |
| leaderboard table stays deliberately coarse, driven only by the |
| ``status`` field on the results row). The background eval worker |
| publishes short, human-readable stage notes here as it advances a |
| submission; the Submit-tab generator observes them and streams them |
| into that submitter's status panel until the submission reaches a |
| terminal stage. |
| |
| Deliberately **ephemeral + in-memory**: it is *not* the source of |
| truth for a submission's outcome (that's the row the worker writes to |
| ``results.jsonl``, which the leaderboard table reads). A Space restart, |
| or a submitter whose request is served by a different process, simply |
| loses the fine-grained notes and the personal view falls back to the |
| coarse row state. Keeping this layer out of the shared file is exactly |
| what lets the progress feedback be granular without adding write load |
| to the leaderboard's single source of truth. |
| """ |
| from __future__ import annotations |
|
|
| import threading |
| import time |
| from dataclasses import dataclass |
|
|
| |
| |
| |
| |
| |
| |
| QUEUED = "queued" |
| RUNNING = "running" |
| COMPLETED = "completed" |
| FAILED = "failed" |
|
|
| _TERMINAL = frozenset({COMPLETED, FAILED}) |
|
|
| |
| |
| |
| |
| ENTRY_TTL_SECONDS = 60 * 60 |
|
|
|
|
| @dataclass(frozen=True) |
| class Snapshot: |
| """An immutable point-in-time view of one submission's progress.""" |
|
|
| state: str |
| message: str |
| updated_at: float |
|
|
|
|
| _LOCK = threading.Lock() |
| _ENTRIES: dict[str, Snapshot] = {} |
|
|
|
|
| def is_terminal(state: str) -> bool: |
| """True for states the observer should stop streaming on.""" |
| return state in _TERMINAL |
|
|
|
|
| def publish(submission_id: str, state: str, message: str) -> None: |
| """Record the latest progress note for *submission_id*. |
| |
| Overwrites any prior note (the registry keeps only the most recent |
| snapshot per submission). Prunes stale entries opportunistically so |
| no separate reaper thread is needed. |
| """ |
| now = time.time() |
| with _LOCK: |
| _ENTRIES[submission_id] = Snapshot(state, message, now) |
| _prune_locked(now) |
|
|
|
|
| def get(submission_id: str) -> Snapshot | None: |
| """Return the latest snapshot for *submission_id*, or ``None``.""" |
| with _LOCK: |
| return _ENTRIES.get(submission_id) |
|
|
|
|
| def clear() -> None: |
| """Drop every entry. Test helper; not used by the app at runtime.""" |
| with _LOCK: |
| _ENTRIES.clear() |
|
|
|
|
| def _prune_locked(now: float) -> None: |
| """Remove entries older than the TTL. Caller must hold ``_LOCK``.""" |
| stale = [ |
| sid |
| for sid, snap in _ENTRIES.items() |
| if now - snap.updated_at > ENTRY_TTL_SECONDS |
| ] |
| for sid in stale: |
| del _ENTRIES[sid] |
|
|