File size: 4,822 Bytes
b0f4559
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Unit tests for the live personal-view progress feedback.

Two surfaces:

1. :mod:`progress` - the in-process registry the eval worker writes to
   (publish / get / terminal classification / TTL pruning).
2. :mod:`submit`'s observer side - the message helpers and the
   :func:`submit._stream_submission_progress` generator that streams
   registry notes into the submitter's status panel until a terminal
   stage or the backstop deadline.

Sleeps (and the clock, where a test depends on it) are monkeypatched, and
the stream tests stub the registry lookup, so the suite has zero network
traffic and never actually sleeps.
"""
from __future__ import annotations

import progress
import submit


def test_publish_get_roundtrip():
    """A published note reads back intact; an unknown id reads as ``None``."""
    progress.clear()
    progress.publish("a", progress.RUNNING, "evaluating")

    stored = progress.get("a")

    assert stored is not None
    assert stored.state == progress.RUNNING
    assert stored.message == "evaluating"
    # An id that was never published has no snapshot.
    missing = progress.get("does-not-exist")
    assert missing is None


def test_publish_overwrites_prior_note():
    """Publishing twice under one id keeps only the most recent note."""
    progress.clear()
    notes = (
        (progress.QUEUED, "queued"),
        (progress.RUNNING, "running"),
    )
    for state, message in notes:
        progress.publish("a", state, message)

    latest = progress.get("a")

    assert latest is not None
    assert latest.state == progress.RUNNING
    assert latest.message == "running"


def test_is_terminal_classification():
    """COMPLETED and FAILED are terminal; QUEUED and RUNNING are not."""
    for finished in (progress.COMPLETED, progress.FAILED):
        assert progress.is_terminal(finished)
    for in_flight in (progress.QUEUED, progress.RUNNING):
        assert not progress.is_terminal(in_flight)


def test_prune_drops_stale_entries(monkeypatch):
    """A publish after the TTL window removes an entry left untouched."""
    progress.clear()
    start = 1_000.0
    after_ttl = start + progress.ENTRY_TTL_SECONDS + 1

    # Freeze the clock at ``start`` while the soon-to-be-stale entry lands.
    monkeypatch.setattr(progress.time, "time", lambda: start)
    progress.publish("old", progress.RUNNING, "x")

    # Jump past the TTL window; the next publish should prune "old".
    monkeypatch.setattr(progress.time, "time", lambda: after_ttl)
    progress.publish("new", progress.RUNNING, "y")

    assert progress.get("old") is None
    assert progress.get("new") is not None


def test_running_message_distinguishes_running_from_waiting():
    """The RUNNING and QUEUED stages produce different, recognisable notes."""
    active_note = submit._running_message_for_stage("RUNNING")
    queued_note = submit._running_message_for_stage("QUEUED")

    assert "Evaluating" in active_note
    # The waiting wording is matched case-insensitively.
    assert "waiting" in queued_note.lower()
    assert active_note != queued_note


def test_completed_message_surfaces_score_when_present():
    """The completion note shows the aggregate score when one is supplied."""
    scored_note = submit._completed_progress_message({"aggregate_score": 0.8086})
    assert "0.8086" in scored_note

    # An empty result must still yield a clean terminal note, not an error.
    unscored_note = submit._completed_progress_message({})
    assert "Done" in unscored_note


def test_failed_message_appends_reason():
    """A reason is included when given; blank or absent reasons are omitted."""
    with_reason = submit._failed_progress_message("boom")
    assert "boom" in with_reason

    # ``None`` and whitespace-only reasons both fall back to the bare note.
    for empty_reason in (None, "  "):
        assert submit._failed_progress_message(empty_reason) == "Evaluation failed."


def test_stream_yields_on_change_and_stops_on_terminal(monkeypatch):
    """Emit only changed notes and stop once a terminal snapshot is seen."""

    def _no_sleep(*_):
        return None

    monkeypatch.setattr(submit.time, "sleep", _no_sleep)

    script = [
        (progress.QUEUED, "queued msg"),
        (progress.RUNNING, "running msg"),
        # Same note twice in a row: the repeat must not produce a yield.
        (progress.RUNNING, "running msg"),
        (progress.COMPLETED, "done msg"),
    ]
    feed = iter([progress.Snapshot(state, note, 0.0) for state, note in script])

    def _next_snapshot(_sid):
        return next(feed)

    monkeypatch.setattr(submit.progress, "get", _next_snapshot)

    emitted = list(submit._stream_submission_progress("x"))

    assert len(emitted) == 3
    first, second, last = emitted
    assert "queued msg" in first
    assert "running msg" in second
    assert "done msg" in last
    # A successful terminal note is prefixed with the celebratory glyph.
    assert last.startswith("🎉")


def test_stream_emits_failure_glyph_on_failed_terminal(monkeypatch):
    """A FAILED snapshot yields one note carrying the failure glyph."""

    def _no_sleep(*_):
        return None

    monkeypatch.setattr(submit.time, "sleep", _no_sleep)

    # The registry reports a failure on the very first lookup.
    feed = iter([progress.Snapshot(progress.FAILED, "it broke", 0.0)])

    def _next_snapshot(_sid):
        return next(feed)

    monkeypatch.setattr(submit.progress, "get", _next_snapshot)

    emitted = list(submit._stream_submission_progress("x"))

    assert len(emitted) == 1
    only_note = emitted[0]
    assert only_note.startswith("❌")
    assert "it broke" in only_note


def test_stream_backstop_deadline_emits_background_note(monkeypatch):
    """With no snapshot ever available, the stream ends at the deadline."""

    def _no_sleep(*_):
        return None

    monkeypatch.setattr(submit.time, "sleep", _no_sleep)

    # The first clock read returns 0.0; every later read is already past
    # the deadline, so the backstop trips on the second read.
    past_deadline = submit.PROGRESS_STREAM_DEADLINE_SECONDS + 1
    clock = iter([0.0, past_deadline])
    monkeypatch.setattr(
        submit.time,
        "monotonic",
        lambda: next(clock, past_deadline),
    )

    # The registry never has anything to report for this submission.
    monkeypatch.setattr(submit.progress, "get", lambda _sid: None)

    emitted = list(submit._stream_submission_progress("x"))

    assert len(emitted) == 1
    assert "background" in emitted[0].lower()