Michael Rabinovich
submit: stream live eval progress to the submitter's status panel
b0f4559
# Copyright 2026 Hugging Face
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""In-process live-progress registry for the submitter's status panel.
This is the *personal-view* half of the progress story (the shared
leaderboard table stays deliberately coarse, driven only by the
``status`` field on the results row). The background eval worker
publishes short, human-readable stage notes here as it advances a
submission; the Submit-tab generator observes them and streams them
into that submitter's status panel until the submission reaches a
terminal stage.
Deliberately **ephemeral + in-memory**: it is *not* the source of
truth for a submission's outcome (that's the row the worker writes to
``results.jsonl``, which the leaderboard table reads). After a Space
restart, or when a submitter's request is served by a different
process, the fine-grained notes are simply unavailable and the
personal view falls back to the coarse row state. Keeping this layer
out of the shared file is exactly
what lets the progress feedback be granular without adding write load
to the leaderboard's single source of truth.
"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
# Lifecycle states for a submission once it has been accepted and queued.
# The state itself stays coarse; the fine detail lives in the *message*
# published alongside it (e.g. "waiting for a GPU" vs "evaluating" vs
# "3 of 8 chunks done").
QUEUED = "queued"  # transient
RUNNING = "running"  # transient
COMPLETED = "completed"  # terminal
FAILED = "failed"  # terminal

# Reaching any of these ends the stream: the observing generator stops
# forwarding notes once a submission's snapshot carries one of them.
_TERMINAL = frozenset((COMPLETED, FAILED))
# Time-to-live for registry entries, in seconds (one hour). Every publish()
# sweeps out all entries whose *last* update is older than this, which keeps
# the registry bounded across a long-lived Space process. The value is meant
# to sit well above any realistic eval wall-time so that an entry still being
# streamed is not pruned out from under its observer. Note that the clock
# restarts on each publish for that submission, so what must stay under this
# limit is the gap between consecutive notes, not the length of the whole run.
ENTRY_TTL_SECONDS = 3600
@dataclass(eq=True, frozen=True)
class Snapshot:
    """Read-only record of where one submission stood at a given moment.

    Instances are frozen, so a snapshot handed out by ``get`` can be shared
    with the caller as-is; its fields cannot be reassigned afterwards.
    """

    # Expected to be one of QUEUED / RUNNING / COMPLETED / FAILED
    # (publish() does not validate this).
    state: str
    # Short, human-readable stage note for the submitter's status panel.
    message: str
    # Wall-clock ``time.time()`` value captured when the note was published.
    updated_at: float
# A single process-wide lock serialises every read and write of
# ``_ENTRIES``; publish(), get(), clear() and the TTL sweep (called from
# publish) all run under it.
_LOCK = threading.Lock()
# submission id -> most recent Snapshot published for that submission.
_ENTRIES: dict[str, Snapshot] = dict()
def is_terminal(state: str) -> bool:
    """Tell whether *state* is a final state for a submission.

    ``COMPLETED`` and ``FAILED`` count as terminal; an observer that sees
    one of them should stop streaming.
    """
    return state in _TERMINAL
def publish(submission_id: str, state: str, message: str) -> None:
    """Store *state* and *message* as the newest note for *submission_id*.

    Only the latest snapshot per submission is kept, so any earlier one is
    replaced. Each call also sweeps out entries whose last update is older
    than ``ENTRY_TTL_SECONDS``, which means no separate reaper thread is
    needed.
    """
    stamp = time.time()
    # Building the immutable snapshot touches no shared state, so it can
    # happen before the lock is taken.
    snapshot = Snapshot(state=state, message=message, updated_at=stamp)
    with _LOCK:
        _ENTRIES[submission_id] = snapshot
        _prune_locked(stamp)
def get(submission_id: str) -> Snapshot | None:
    """Look up the newest snapshot recorded for *submission_id*.

    Returns ``None`` when nothing has been published for it, or when its
    entry has since been pruned or cleared.
    """
    with _LOCK:
        found = _ENTRIES.get(submission_id)
    # Snapshots are frozen, so returning one after releasing the lock is safe.
    return found
def clear() -> None:
    """Forget every recorded snapshot.

    Meant for tests that need a clean registry; the app itself does not
    call it at runtime.
    """
    with _LOCK:
        _ENTRIES.clear()
def _prune_locked(now: float) -> None:
    """Evict entries last updated more than ``ENTRY_TTL_SECONDS`` before *now*.

    The caller must already hold ``_LOCK``; this helper mutates
    ``_ENTRIES`` without acquiring the lock itself.
    """
    # Walk over a copy of the items so deleting keys mid-loop is safe.
    for sid, snap in list(_ENTRIES.items()):
        if now - snap.updated_at > ENTRY_TTL_SECONDS:
            del _ENTRIES[sid]