Michael Rabinovich Cursor commited on
Commit
b0f4559
·
1 Parent(s): 07430f9

submit: stream live eval progress to the submitter's status panel

Browse files

Adds a small in-process progress registry (progress.py) that the eval
worker publishes stage notes to (queued -> waiting for GPU -> evaluating
-> collecting results -> done/failed, plus an "X of N chunks done" count
for sharded runs). handle_submit keeps its generator alive after queuing
and streams those notes into the personal status panel until a terminal
stage or a backstop deadline. The shared leaderboard table stays coarse;
this is the personal-view feedback only, so it adds no write load to the
results file. Also fixes the _with_hub_retries tests for the newer
huggingface_hub HfHubHTTPError(response=...) signature.

Co-authored-by: Cursor <cursoragent@cursor.com>

Files changed (4) hide show
  1. progress.py +111 -0
  2. submit.py +170 -4
  3. tests/test_progress.py +133 -0
  4. tests/test_submit.py +11 -5
progress.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Copyright 2026 Hugging Face
2
+ #
3
+ # Licensed under the Apache License, Version 2.0 (the "License");
4
+ # you may not use this file except in compliance with the License.
5
+ # You may obtain a copy of the License at
6
+ #
7
+ # http://www.apache.org/licenses/LICENSE-2.0
8
+ #
9
+ # Unless required by applicable law or agreed to in writing, software
10
+ # distributed under the License is distributed on an "AS IS" BASIS,
11
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ # See the License for the specific language governing permissions and
13
+ # limitations under the License.
14
+
15
+ """In-process live-progress registry for the submitter's status panel.
16
+
17
+ This is the *personal-view* half of the progress story (the shared
18
+ leaderboard table stays deliberately coarse, driven only by the
19
+ ``status`` field on the results row). The background eval worker
20
+ publishes short, human-readable stage notes here as it advances a
21
+ submission; the Submit-tab generator observes them and streams them
22
+ into that submitter's status panel until the submission reaches a
23
+ terminal stage.
24
+
25
+ Deliberately **ephemeral + in-memory**: it is *not* the source of
26
+ truth for a submission's outcome (that's the row the worker writes to
27
+ ``results.jsonl``, which the leaderboard table reads). A Space restart,
28
+ or a submitter whose request is served by a different process, simply
29
+ loses the fine-grained notes and the personal view falls back to the
30
+ coarse row state. Keeping this layer out of the shared file is exactly
31
+ what lets the progress feedback be granular without adding write load
32
+ to the leaderboard's single source of truth.
33
+ """
34
+ from __future__ import annotations
35
+
36
+ import threading
37
+ import time
38
+ from dataclasses import dataclass
39
+
40
+ # Coarse lifecycle states a submission moves through *after* it has been
41
+ # accepted and queued. ``QUEUED`` / ``RUNNING`` are transient; the
42
+ # ``message`` carried alongside is what actually varies (e.g. "waiting
43
+ # for a GPU" vs "evaluating" vs "3 of 8 chunks done"). ``COMPLETED`` /
44
+ # ``FAILED`` are terminal: once a submission reaches one, the observing
45
+ # generator stops streaming.
46
+ QUEUED = "queued"
47
+ RUNNING = "running"
48
+ COMPLETED = "completed"
49
+ FAILED = "failed"
50
+
51
+ _TERMINAL = frozenset({COMPLETED, FAILED})
52
+
53
+ # Entries untouched for this long are pruned on the next publish so the
54
+ # registry can't grow unbounded across a long-lived Space process. Well
55
+ # above any realistic eval wall-time, so a still-streaming submission is
56
+ # never pruned out from under its observer.
57
+ ENTRY_TTL_SECONDS = 60 * 60
58
+
59
+
60
+ @dataclass(frozen=True)
61
+ class Snapshot:
62
+ """An immutable point-in-time view of one submission's progress."""
63
+
64
+ state: str
65
+ message: str
66
+ updated_at: float
67
+
68
+
69
+ _LOCK = threading.Lock()
70
+ _ENTRIES: dict[str, Snapshot] = {}
71
+
72
+
73
+ def is_terminal(state: str) -> bool:
74
+ """True for states the observer should stop streaming on."""
75
+ return state in _TERMINAL
76
+
77
+
78
+ def publish(submission_id: str, state: str, message: str) -> None:
79
+ """Record the latest progress note for *submission_id*.
80
+
81
+ Overwrites any prior note (the registry keeps only the most recent
82
+ snapshot per submission). Prunes stale entries opportunistically so
83
+ no separate reaper thread is needed.
84
+ """
85
+ now = time.time()
86
+ with _LOCK:
87
+ _ENTRIES[submission_id] = Snapshot(state, message, now)
88
+ _prune_locked(now)
89
+
90
+
91
+ def get(submission_id: str) -> Snapshot | None:
92
+ """Return the latest snapshot for *submission_id*, or ``None``."""
93
+ with _LOCK:
94
+ return _ENTRIES.get(submission_id)
95
+
96
+
97
+ def clear() -> None:
98
+ """Drop every entry. Test helper; not used by the app at runtime."""
99
+ with _LOCK:
100
+ _ENTRIES.clear()
101
+
102
+
103
+ def _prune_locked(now: float) -> None:
104
+ """Remove entries older than the TTL. Caller must hold ``_LOCK``."""
105
+ stale = [
106
+ sid
107
+ for sid, snap in _ENTRIES.items()
108
+ if now - snap.updated_at > ENTRY_TTL_SECONDS
109
+ ]
110
+ for sid in stale:
111
+ del _ENTRIES[sid]
submit.py CHANGED
@@ -115,6 +115,7 @@ from huggingface_hub import (
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
 
118
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
119
 
120
  logger = logging.getLogger(__name__)
@@ -172,6 +173,18 @@ EVAL_JOB_NAMESPACE = "michaelr27"
172
  EVAL_JOB_TIMEOUT = "30m"
173
  EVAL_JOB_WORKER_COUNT = "8"
174
 
 
 
 
 
 
 
 
 
 
 
 
 
175
  # Poll cadence + outer deadline guarding inspect_job. 5 s is fast
176
  # enough that a 60 s eval lands in <10 s of completion, slow enough
177
  # that we don't hammer the API. Deadline matches the Job's own
@@ -288,13 +301,60 @@ def _submit_status(state: str, message: str) -> str:
288
  The panel is the durable counterpart to the transient ``gr.Info`` /
289
  ``gr.Error`` toasts: a submitter always sees the current stage and
290
  any rejection reason without having to catch an ephemeral toast.
291
- *state* picks the leading glyph (``working`` / ``queued`` /
292
- ``error``).
293
  """
294
- glyph = {"working": "⏳", "queued": "✅", "error": "❌"}.get(state, "•")
 
 
295
  return f"{glyph} {message}"
296
 
297
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
298
  def handle_submit(
299
  zip_file,
300
  profile: gr.OAuthProfile | None,
@@ -389,18 +449,64 @@ def handle_submit(
389
  yield _submit_status("error", msg)
390
  raise gr.Error(msg)
391
 
 
 
 
 
 
 
 
 
392
  _spawn_worker(submission_id, blob_url, sorted(fixture_names))
393
  yield _submit_status(
394
  "queued",
395
  f"Submission `{submission_id}` queued ({len(fixture_names)} "
396
  f"fixtures). The eval runs on an HF Jobs GPU; your row appears on "
397
  f"the **Unvalidated** leaderboard and flips to completed when the "
398
- f"job finishes (typically 1–3 minutes).",
399
  )
 
 
 
 
 
400
  finally:
401
  shutil.rmtree(tmp, ignore_errors=True)
402
 
403
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
404
  def _validate_form(zip_file) -> str | None:
405
  """Form-level check before any zip parsing.
406
 
@@ -871,16 +977,34 @@ def _run_worker(
871
  submission_id, submission_blob_url, fixture_names,
872
  )
873
  return
 
 
 
 
 
874
  job_id = _dispatch_eval_job(submission_id, submission_blob_url)
875
  logger.info("Dispatched eval job %s for %s", job_id, submission_id)
876
  stage, status_message = _poll_until_done(job_id, submission_id)
877
  if stage == "COMPLETED":
 
 
 
 
 
878
  summary = _fetch_run_summary_from_report(submission_id)
879
  _flip_row_to_completed(submission_id, summary)
 
 
 
 
 
880
  logger.info("Worker completed for %s", submission_id)
881
  return
882
  reason = _job_failure_reason(job_id, stage, status_message)
883
  _flip_row_to_failed(submission_id, reason)
 
 
 
884
  logger.warning(
885
  "Eval job %s for %s ended %s: %s",
886
  job_id, submission_id, stage, reason,
@@ -888,6 +1012,9 @@ def _run_worker(
888
  except Exception as e: # noqa: BLE001 - broad on purpose; we map to row state
889
  logger.exception("Worker failed for %s", submission_id)
890
  reason = f"{type(e).__name__}: {str(e)}"[:FAILURE_REASON_MAX_CHARS]
 
 
 
891
  try:
892
  _flip_row_to_failed(submission_id, reason)
893
  except Exception:
@@ -931,6 +1058,11 @@ def _run_worker_sharded(
931
  "Sharded eval for %s: %d fixtures -> %d shard(s)",
932
  submission_id, len(fixture_names), len(shards),
933
  )
 
 
 
 
 
934
  for shard_id, st in shards.items():
935
  _dispatch_shard(submission_id, submission_blob_url, shard_id, st)
936
 
@@ -942,13 +1074,24 @@ def _run_worker_sharded(
942
  :FAILURE_REASON_MAX_CHARS
943
  ]
944
  _flip_row_to_failed(submission_id, reason)
 
 
 
945
  logger.warning("Sharded eval for %s failed: %s", submission_id, reason)
946
  return
947
 
 
 
 
 
 
948
  summary = _merge_shards_and_publish(
949
  submission_id, list(shards.keys()), fixture_names,
950
  )
951
  _flip_row_to_completed(submission_id, summary)
 
 
 
952
  logger.info("Sharded worker completed for %s", submission_id)
953
  _cleanup_shard_artifacts(submission_id)
954
 
@@ -1058,11 +1201,23 @@ def _poll_shards_until_done(
1058
  """
1059
  deadline = time.monotonic() + SHARD_POLL_DEADLINE_SECONDS
1060
  consecutive_errors = 0
 
 
1061
  while True:
1062
  running = [
1063
  sid for sid, st in shards.items()
1064
  if st["stage"] not in ("COMPLETED", "FAILED")
1065
  ]
 
 
 
 
 
 
 
 
 
 
1066
  if not running:
1067
  break
1068
  for shard_id in running:
@@ -1137,6 +1292,7 @@ def _poll_until_done(
1137
  """
1138
  deadline = time.monotonic() + JOB_POLL_DEADLINE_SECONDS
1139
  consecutive_errors = 0
 
1140
  while True:
1141
  try:
1142
  info = inspect_job(job_id=job_id)
@@ -1157,6 +1313,16 @@ def _poll_until_done(
1157
  message = info.status.message
1158
  if stage in ("COMPLETED", "ERROR"):
1159
  return stage, message
 
 
 
 
 
 
 
 
 
 
1160
  if time.monotonic() >= deadline:
1161
  return "ERROR", (
1162
  f"Space-side poll deadline exceeded "
 
115
  )
116
  from huggingface_hub.errors import EntryNotFoundError, HfHubHTTPError
117
 
118
+ import progress
119
  from leaderboard import HF_DATA_REPO, HF_ORG, HF_SUBMISSIONS_REPO
120
 
121
  logger = logging.getLogger(__name__)
 
173
  EVAL_JOB_TIMEOUT = "30m"
174
  EVAL_JOB_WORKER_COUNT = "8"
175
 
176
+ # Live personal-view progress streaming. After a submission is queued,
177
+ # handle_submit keeps its generator alive and re-reads the in-process
178
+ # progress registry (which the background worker writes to) every few
179
+ # seconds, pushing each new note into the submitter's status panel. The
180
+ # deadline is a generous backstop: the stream normally ends the moment
181
+ # the worker publishes a terminal note, well before this trips. If it
182
+ # does trip (worker died, very long sharded run), the panel tells the
183
+ # submitter the eval continues in the background and to watch the
184
+ # leaderboard, rather than hanging forever.
185
+ PROGRESS_STREAM_POLL_SECONDS = 3
186
+ PROGRESS_STREAM_DEADLINE_SECONDS = 45 * 60
187
+
188
  # Poll cadence + outer deadline guarding inspect_job. 5 s is fast
189
  # enough that a 60 s eval lands in <10 s of completion, slow enough
190
  # that we don't hammer the API. Deadline matches the Job's own
 
301
  The panel is the durable counterpart to the transient ``gr.Info`` /
302
  ``gr.Error`` toasts: a submitter always sees the current stage and
303
  any rejection reason without having to catch an ephemeral toast.
304
+ *state* picks the leading glyph (``working`` / ``queued`` / ``done``
305
+ / ``error``).
306
  """
307
+ glyph = {"working": "⏳", "queued": "✅", "done": "🎉", "error": "❌"}.get(
308
+ state, "•"
309
+ )
310
  return f"{glyph} {message}"
311
 
312
 
313
+ # Maps the progress registry's coarse state to the `_submit_status`
314
+ # glyph state. The registry's transient states (queued waiting for a
315
+ # slot, running on the GPU) both read as "in progress" in the panel;
316
+ # the terminal ones get the celebratory / error glyph.
317
+ _PROGRESS_PANEL_STATE = {
318
+ progress.QUEUED: "queued",
319
+ progress.RUNNING: "working",
320
+ progress.COMPLETED: "done",
321
+ progress.FAILED: "error",
322
+ }
323
+
324
+
325
+ def _running_message_for_stage(stage: str) -> str:
326
+ """Friendly note for a non-terminal HF Jobs stage.
327
+
328
+ The Jobs API exposes a stage string per poll. We only care about
329
+ one distinction the submitter actually feels: actively *running* vs
330
+ still *waiting for a machine*. Treating any non-RUNNING, non-terminal
331
+ stage as "queued on the GPU" keeps the message robust to the exact
332
+ set of intermediate stage names the API may use.
333
+ """
334
+ if stage == "RUNNING":
335
+ return "Evaluating your submission on a GPU…"
336
+ return "Evaluation queued on a GPU — waiting for a free machine…"
337
+
338
+
339
+ def _completed_progress_message(summary: dict[str, Any]) -> str:
340
+ """Terminal success note, surfacing the headline score when present."""
341
+ score = summary.get("aggregate_score")
342
+ if isinstance(score, (int, float)):
343
+ return (
344
+ f"Done — scored {float(score):.4f}. Your row is on the "
345
+ f"Unvalidated leaderboard."
346
+ )
347
+ return "Done — your row is on the Unvalidated leaderboard."
348
+
349
+
350
+ def _failed_progress_message(reason: str | None) -> str:
351
+ """Terminal failure note, appending the short reason when there is one."""
352
+ reason = (reason or "").strip()
353
+ if reason:
354
+ return f"Evaluation failed: {reason}"
355
+ return "Evaluation failed."
356
+
357
+
358
  def handle_submit(
359
  zip_file,
360
  profile: gr.OAuthProfile | None,
 
449
  yield _submit_status("error", msg)
450
  raise gr.Error(msg)
451
 
452
+ # Seed the registry so the stream below has something to show
453
+ # in the gap before the worker publishes its first stage note.
454
+ progress.publish(
455
+ submission_id,
456
+ progress.QUEUED,
457
+ f"Queued ({len(fixture_names)} fixtures) — waiting for the "
458
+ f"evaluation to start…",
459
+ )
460
  _spawn_worker(submission_id, blob_url, sorted(fixture_names))
461
  yield _submit_status(
462
  "queued",
463
  f"Submission `{submission_id}` queued ({len(fixture_names)} "
464
  f"fixtures). The eval runs on an HF Jobs GPU; your row appears on "
465
  f"the **Unvalidated** leaderboard and flips to completed when the "
466
+ f"job finishes (typically 1–3 minutes). Live progress below.",
467
  )
468
+ # Keep the generator alive, observing the in-process progress
469
+ # registry the worker writes to, until the submission reaches a
470
+ # terminal stage (or the backstop deadline). This is the
471
+ # personal-view live feedback; the shared table stays coarse.
472
+ yield from _stream_submission_progress(submission_id)
473
  finally:
474
  shutil.rmtree(tmp, ignore_errors=True)
475
 
476
 
477
+ def _stream_submission_progress(submission_id: str):
478
+ """Yield panel markdown as the worker advances *submission_id*.
479
+
480
+ Polls the in-process :mod:`progress` registry every
481
+ :data:`PROGRESS_STREAM_POLL_SECONDS` and yields a fresh status panel
482
+ only when the human-readable note changes (so the panel updates on
483
+ real transitions, not every tick). Returns when the submission
484
+ reaches a terminal state, or yields a "still running in the
485
+ background" note and returns if the backstop deadline trips first
486
+ (worker death, an unusually long sharded run, etc.).
487
+ """
488
+ deadline = time.monotonic() + PROGRESS_STREAM_DEADLINE_SECONDS
489
+ last_message: str | None = None
490
+ while True:
491
+ snap = progress.get(submission_id)
492
+ if snap is not None and snap.message != last_message:
493
+ last_message = snap.message
494
+ yield _submit_status(
495
+ _PROGRESS_PANEL_STATE.get(snap.state, "working"), snap.message,
496
+ )
497
+ if snap is not None and progress.is_terminal(snap.state):
498
+ return
499
+ if time.monotonic() >= deadline:
500
+ yield _submit_status(
501
+ "queued",
502
+ "Evaluation is taking longer than expected; it continues in "
503
+ "the background. Check the **Unvalidated** leaderboard for "
504
+ "the final result.",
505
+ )
506
+ return
507
+ time.sleep(PROGRESS_STREAM_POLL_SECONDS)
508
+
509
+
510
  def _validate_form(zip_file) -> str | None:
511
  """Form-level check before any zip parsing.
512
 
 
977
  submission_id, submission_blob_url, fixture_names,
978
  )
979
  return
980
+ progress.publish(
981
+ submission_id,
982
+ progress.RUNNING,
983
+ "Evaluation dispatched — waiting for a GPU…",
984
+ )
985
  job_id = _dispatch_eval_job(submission_id, submission_blob_url)
986
  logger.info("Dispatched eval job %s for %s", job_id, submission_id)
987
  stage, status_message = _poll_until_done(job_id, submission_id)
988
  if stage == "COMPLETED":
989
+ progress.publish(
990
+ submission_id,
991
+ progress.RUNNING,
992
+ "Evaluation finished — collecting results…",
993
+ )
994
  summary = _fetch_run_summary_from_report(submission_id)
995
  _flip_row_to_completed(submission_id, summary)
996
+ progress.publish(
997
+ submission_id,
998
+ progress.COMPLETED,
999
+ _completed_progress_message(summary),
1000
+ )
1001
  logger.info("Worker completed for %s", submission_id)
1002
  return
1003
  reason = _job_failure_reason(job_id, stage, status_message)
1004
  _flip_row_to_failed(submission_id, reason)
1005
+ progress.publish(
1006
+ submission_id, progress.FAILED, _failed_progress_message(reason),
1007
+ )
1008
  logger.warning(
1009
  "Eval job %s for %s ended %s: %s",
1010
  job_id, submission_id, stage, reason,
 
1012
  except Exception as e: # noqa: BLE001 - broad on purpose; we map to row state
1013
  logger.exception("Worker failed for %s", submission_id)
1014
  reason = f"{type(e).__name__}: {str(e)}"[:FAILURE_REASON_MAX_CHARS]
1015
+ progress.publish(
1016
+ submission_id, progress.FAILED, _failed_progress_message(reason),
1017
+ )
1018
  try:
1019
  _flip_row_to_failed(submission_id, reason)
1020
  except Exception:
 
1058
  "Sharded eval for %s: %d fixtures -> %d shard(s)",
1059
  submission_id, len(fixture_names), len(shards),
1060
  )
1061
+ progress.publish(
1062
+ submission_id,
1063
+ progress.RUNNING,
1064
+ f"Evaluation split into {len(shards)} chunks — dispatching to GPUs…",
1065
+ )
1066
  for shard_id, st in shards.items():
1067
  _dispatch_shard(submission_id, submission_blob_url, shard_id, st)
1068
 
 
1074
  :FAILURE_REASON_MAX_CHARS
1075
  ]
1076
  _flip_row_to_failed(submission_id, reason)
1077
+ progress.publish(
1078
+ submission_id, progress.FAILED, _failed_progress_message(reason),
1079
+ )
1080
  logger.warning("Sharded eval for %s failed: %s", submission_id, reason)
1081
  return
1082
 
1083
+ progress.publish(
1084
+ submission_id,
1085
+ progress.RUNNING,
1086
+ "All chunks evaluated — merging results…",
1087
+ )
1088
  summary = _merge_shards_and_publish(
1089
  submission_id, list(shards.keys()), fixture_names,
1090
  )
1091
  _flip_row_to_completed(submission_id, summary)
1092
+ progress.publish(
1093
+ submission_id, progress.COMPLETED, _completed_progress_message(summary),
1094
+ )
1095
  logger.info("Sharded worker completed for %s", submission_id)
1096
  _cleanup_shard_artifacts(submission_id)
1097
 
 
1201
  """
1202
  deadline = time.monotonic() + SHARD_POLL_DEADLINE_SECONDS
1203
  consecutive_errors = 0
1204
+ last_done = -1
1205
+ total = len(shards)
1206
  while True:
1207
  running = [
1208
  sid for sid, st in shards.items()
1209
  if st["stage"] not in ("COMPLETED", "FAILED")
1210
  ]
1211
+ # Push an "N of M chunks done" note to the submitter's panel
1212
+ # whenever the completed count advances.
1213
+ done = sum(1 for st in shards.values() if st["stage"] == "COMPLETED")
1214
+ if done != last_done:
1215
+ last_done = done
1216
+ progress.publish(
1217
+ submission_id,
1218
+ progress.RUNNING,
1219
+ f"Evaluating… {done} of {total} chunks done.",
1220
+ )
1221
  if not running:
1222
  break
1223
  for shard_id in running:
 
1292
  """
1293
  deadline = time.monotonic() + JOB_POLL_DEADLINE_SECONDS
1294
  consecutive_errors = 0
1295
+ last_stage: str | None = None
1296
  while True:
1297
  try:
1298
  info = inspect_job(job_id=job_id)
 
1313
  message = info.status.message
1314
  if stage in ("COMPLETED", "ERROR"):
1315
  return stage, message
1316
+ # Surface the running-vs-waiting distinction to the submitter's
1317
+ # panel, but only when the stage actually changes (not every
1318
+ # tick), so the personal view reflects real transitions.
1319
+ if stage != last_stage:
1320
+ last_stage = stage
1321
+ progress.publish(
1322
+ submission_id,
1323
+ progress.RUNNING,
1324
+ _running_message_for_stage(stage),
1325
+ )
1326
  if time.monotonic() >= deadline:
1327
  return "ERROR", (
1328
  f"Space-side poll deadline exceeded "
tests/test_progress.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Unit tests for the live personal-view progress feedback.
2
+
3
+ Two surfaces:
4
+
5
+ 1. :mod:`progress` - the in-process registry the eval worker writes to
6
+ (publish / get / terminal classification / TTL pruning).
7
+ 2. :mod:`submit`'s observer side - the message helpers and the
8
+ :func:`submit._stream_submission_progress` generator that streams
9
+ registry notes into the submitter's status panel until a terminal
10
+ stage or the backstop deadline.
11
+
12
+ All time + registry access is monkeypatched, so the suite has zero
13
+ network traffic and never actually sleeps.
14
+ """
15
+ from __future__ import annotations
16
+
17
+ import progress
18
+ import submit
19
+
20
+
21
+ def test_publish_get_roundtrip():
22
+ progress.clear()
23
+ progress.publish("a", progress.RUNNING, "evaluating")
24
+ snap = progress.get("a")
25
+ assert snap is not None
26
+ assert snap.state == progress.RUNNING
27
+ assert snap.message == "evaluating"
28
+ assert progress.get("does-not-exist") is None
29
+
30
+
31
+ def test_publish_overwrites_prior_note():
32
+ progress.clear()
33
+ progress.publish("a", progress.QUEUED, "queued")
34
+ progress.publish("a", progress.RUNNING, "running")
35
+ snap = progress.get("a")
36
+ assert snap is not None
37
+ assert snap.state == progress.RUNNING
38
+ assert snap.message == "running"
39
+
40
+
41
+ def test_is_terminal_classification():
42
+ assert progress.is_terminal(progress.COMPLETED)
43
+ assert progress.is_terminal(progress.FAILED)
44
+ assert not progress.is_terminal(progress.QUEUED)
45
+ assert not progress.is_terminal(progress.RUNNING)
46
+
47
+
48
+ def test_prune_drops_stale_entries(monkeypatch):
49
+ progress.clear()
50
+ base = 1_000.0
51
+ monkeypatch.setattr(progress.time, "time", lambda: base)
52
+ progress.publish("old", progress.RUNNING, "x")
53
+ # A later publish past the TTL window prunes the untouched entry.
54
+ monkeypatch.setattr(
55
+ progress.time, "time", lambda: base + progress.ENTRY_TTL_SECONDS + 1
56
+ )
57
+ progress.publish("new", progress.RUNNING, "y")
58
+ assert progress.get("old") is None
59
+ assert progress.get("new") is not None
60
+
61
+
62
+ def test_running_message_distinguishes_running_from_waiting():
63
+ running = submit._running_message_for_stage("RUNNING")
64
+ waiting = submit._running_message_for_stage("QUEUED")
65
+ assert "Evaluating" in running
66
+ assert "waiting" in waiting.lower()
67
+ assert running != waiting
68
+
69
+
70
+ def test_completed_message_surfaces_score_when_present():
71
+ msg = submit._completed_progress_message({"aggregate_score": 0.8086})
72
+ assert "0.8086" in msg
73
+ # No score -> still a clean terminal note, no crash.
74
+ assert "Done" in submit._completed_progress_message({})
75
+
76
+
77
+ def test_failed_message_appends_reason():
78
+ assert "boom" in submit._failed_progress_message("boom")
79
+ assert submit._failed_progress_message(None) == "Evaluation failed."
80
+ assert submit._failed_progress_message(" ") == "Evaluation failed."
81
+
82
+
83
+ def test_stream_yields_on_change_and_stops_on_terminal(monkeypatch):
84
+ """The panel updates on real transitions and stops at a terminal state."""
85
+ monkeypatch.setattr(submit.time, "sleep", lambda *_: None)
86
+ snaps = [
87
+ progress.Snapshot(progress.QUEUED, "queued msg", 0.0),
88
+ progress.Snapshot(progress.RUNNING, "running msg", 0.0),
89
+ # Duplicate message -> no new yield.
90
+ progress.Snapshot(progress.RUNNING, "running msg", 0.0),
91
+ progress.Snapshot(progress.COMPLETED, "done msg", 0.0),
92
+ ]
93
+ it = iter(snaps)
94
+ monkeypatch.setattr(submit.progress, "get", lambda _sid: next(it))
95
+
96
+ out = list(submit._stream_submission_progress("x"))
97
+
98
+ assert len(out) == 3
99
+ assert "queued msg" in out[0]
100
+ assert "running msg" in out[1]
101
+ assert "done msg" in out[2]
102
+ # Terminal success gets the celebratory glyph.
103
+ assert out[2].startswith("🎉")
104
+
105
+
106
+ def test_stream_emits_failure_glyph_on_failed_terminal(monkeypatch):
107
+ monkeypatch.setattr(submit.time, "sleep", lambda *_: None)
108
+ it = iter([progress.Snapshot(progress.FAILED, "it broke", 0.0)])
109
+ monkeypatch.setattr(submit.progress, "get", lambda _sid: next(it))
110
+
111
+ out = list(submit._stream_submission_progress("x"))
112
+
113
+ assert len(out) == 1
114
+ assert out[0].startswith("❌")
115
+ assert "it broke" in out[0]
116
+
117
+
118
+ def test_stream_backstop_deadline_emits_background_note(monkeypatch):
119
+ """If the registry never goes terminal, the stream ends gracefully."""
120
+ monkeypatch.setattr(submit.time, "sleep", lambda *_: None)
121
+ # First monotonic() sets the deadline; the second trips it.
122
+ ticks = iter([0.0, submit.PROGRESS_STREAM_DEADLINE_SECONDS + 1])
123
+ monkeypatch.setattr(
124
+ submit.time,
125
+ "monotonic",
126
+ lambda: next(ticks, submit.PROGRESS_STREAM_DEADLINE_SECONDS + 1),
127
+ )
128
+ monkeypatch.setattr(submit.progress, "get", lambda _sid: None)
129
+
130
+ out = list(submit._stream_submission_progress("x"))
131
+
132
+ assert len(out) == 1
133
+ assert "background" in out[0].lower()
tests/test_submit.py CHANGED
@@ -22,12 +22,18 @@ import submit
22
  def _hub_http_error(status: int, headers: dict | None = None) -> submit.HfHubHTTPError:
23
  """An ``HfHubHTTPError`` with a minimal response carrying *status*.
24
 
25
- Built without going through the real Hub: construct the exception
26
- with no response, then attach a stand-in so ``_with_hub_retries``
27
- can read ``response.status_code`` / ``response.headers``.
 
 
 
28
  """
29
- err = submit.HfHubHTTPError(f"HTTP {status}")
30
- err.response = SimpleNamespace(status_code=status, headers=headers or {})
 
 
 
31
  return err
32
 
33
 
 
22
  def _hub_http_error(status: int, headers: dict | None = None) -> submit.HfHubHTTPError:
23
  """An ``HfHubHTTPError`` with a minimal response carrying *status*.
24
 
25
+ Built without going through the real Hub: a ``SimpleNamespace``
26
+ stands in for the httpx response so ``_with_hub_retries`` can read
27
+ ``response.status_code`` / ``response.headers``. Newer
28
+ ``huggingface_hub`` makes ``response`` a required keyword-only
29
+ constructor argument, so it's passed in directly (and re-assigned
30
+ afterwards for the older positional-optional signature too).
31
  """
32
+ response = SimpleNamespace(
33
+ status_code=status, headers=headers or {}, request=None,
34
+ )
35
+ err = submit.HfHubHTTPError(f"HTTP {status}", response=response)
36
+ err.response = response
37
  return err
38
 
39