File size: 2,889 Bytes
6d9770a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging

from hackathon_advisor.profiling import (
    TurnProfiler,
    configure_logging,
    messages_processed,
    next_message_index,
    resource_snapshot,
)


def _turn_events() -> list[dict]:
    return [
        {"type": "start"},
        {"type": "stage", "stage": "planning"},
        {"type": "model_progress", "tokens": 5, "max_tokens": 180},
        {"type": "model_progress", "tokens": 12, "max_tokens": 180},
        {"type": "stage", "stage": "running_tool"},
        {"type": "tool_event", "name": "save_idea"},
        {"type": "tool_event", "name": "score_idea"},
        {"type": "stage", "stage": "writing"},
        {"type": "token", "text": "hello "},
        {"type": "done"},
    ]


def test_profiler_observes_tokens_tools_and_stage_durations() -> None:
    profiler = TurnProfiler(message_index=1, compute="cpu", backend="minicpm-transformers")
    for event in _turn_events():
        profiler.observe(event)

    durations = profiler.durations()

    assert profiler.tokens == 12
    assert profiler.tool_count == 2
    assert profiler.fell_back is False
    assert set(durations) >= {"total_ms", "decode_ms", "tools_ms", "write_ms"}
    assert all(value >= 0 for value in durations.values())


def test_profiler_logs_start_and_summary() -> None:
    configure_logging()  # the advisor logger does not propagate, so capture it directly
    logger = logging.getLogger("hackathon_advisor")
    messages: list[str] = []
    handler = logging.Handler()
    handler.emit = lambda record: messages.append(record.getMessage())
    logger.addHandler(handler)
    try:
        profiler = TurnProfiler(message_index=7, compute="gpu", backend="rules", message_chars=42)
        profiler.log_start()
        for event in _turn_events():
            profiler.observe(event)
        profiler.log_summary()
        profiler.log_summary()  # idempotent: a second call must not log again
    finally:
        logger.removeHandler(handler)

    summaries = [message for message in messages if "turn #7" in message]
    assert any("start" in message for message in summaries)
    assert sum("done" in message for message in summaries) == 1  # log_summary is idempotent


def test_profiler_marks_fallback() -> None:
    profiler = TurnProfiler(message_index=2, compute="gpu", backend="minicpm-transformers")
    profiler.observe({"type": "fallback", "to": "cpu"})

    assert profiler.fell_back is True


def test_resource_snapshot_is_best_effort_dict() -> None:
    snapshot = resource_snapshot()

    assert isinstance(snapshot, dict)
    # rss is available on the platforms we run on; never raises regardless.
    assert "rss_mb" in snapshot


def test_message_counter_increments() -> None:
    start = messages_processed()
    first = next_message_index()
    second = next_message_index()

    assert second == first + 1
    assert messages_processed() >= start + 2