File size: 5,981 Bytes
b641d3d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d75167
b641d3d
3d75167
 
b641d3d
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import os

from server.constants import TaskName
from inference import (
    _attempt_history_block,
    _episode_score,
    _format_end_line,
    _parse_tasks,
    _single_line,
    _task_symptom_block,
    build_prompt,
    extract_command,
    extract_reasoning,
)
from server.models import Observation, SystemMetrics


def test_extract_command_rejects_non_json_code_fence() -> None:
    """A fenced bash snippet (no JSON payload) must not yield a command."""
    fenced_shell = "```bash\nredis-cli LLEN job_queue\n```"
    command = extract_command(fenced_shell)
    assert command is None


def test_extract_command_returns_none_when_empty() -> None:
    """Whitespace-only input produces no command."""
    blank_response = "   "
    command = extract_command(blank_response)
    assert command is None


def test_extract_command_reads_json_payload() -> None:
    """A bare JSON object exposes its ``command`` value."""
    response = '{"command":"redis-cli LLEN job_queue"}'
    command = extract_command(response)
    assert command == "redis-cli LLEN job_queue"


def test_extract_command_reads_fenced_json_payload() -> None:
    """A JSON object wrapped in a ``json`` code fence is still parsed."""
    fenced_json = '```json\n{"command":"ps -ef"}\n```'
    command = extract_command(fenced_json)
    assert command == "ps -ef"


def test_extract_command_reads_json_embedded_in_text() -> None:
    """A JSON object surrounded by prose on the same line is located and parsed."""
    response = 'Use this command: {"command":"redis-cli LLEN job_queue"} thanks.'
    command = extract_command(response)
    assert command == "redis-cli LLEN job_queue"


def test_extract_command_reads_json_after_reasoning_preamble() -> None:
    """A prose line ahead of the JSON object does not hide its fields."""
    preamble = "I'll start by checking process state.\n"
    payload = '{"command":"ps aux","reasoning":"list processes"}'
    response = preamble + payload

    command = extract_command(response)
    assert command == "ps aux"

    reasoning = extract_reasoning(response)
    assert reasoning == "list processes"


def test_extract_command_prefers_first_json_object_with_command() -> None:
    """A leading JSON object lacking ``command`` is passed over for the next one."""
    response = '{"meta":"skip"} then {"command":"ls -la","reasoning":"explore"}'
    command = extract_command(response)
    assert command == "ls -la"


def test_extract_reasoning_when_present() -> None:
    """Both ``command`` and ``reasoning`` are readable from one JSON object."""
    response = '{"command":"redis-cli LLEN job_queue","reasoning":"check queue depth first"}'

    command = extract_command(response)
    assert command == "redis-cli LLEN job_queue"

    reasoning = extract_reasoning(response)
    assert reasoning == "check queue depth first"


def test_extract_command_requires_command_even_with_reasoning() -> None:
    """A JSON object with only ``reasoning`` yields neither command nor reasoning."""
    reasoning_only = '{"reasoning":"i should inspect logs"}'

    command = extract_command(reasoning_only)
    assert command is None

    reasoning = extract_reasoning(reasoning_only)
    assert reasoning is None


def test_single_line_removes_newlines() -> None:
    """A two-line string is flattened with a single space at the line break."""
    multiline = "echo a\necho b"
    flattened = _single_line(multiline)
    assert flattened == "echo a echo b"


def test_task_symptom_block_is_non_empty() -> None:
    """The ROUTE_PARTITION symptom text describes the issue without the task slug."""
    symptoms = _task_symptom_block(TaskName.ROUTE_PARTITION)
    # Expected symptom wording is present ...
    assert "connectivity path issue" in symptoms
    # ... while the literal task identifier is absent.
    assert "route-partition" not in symptoms


def test_task_symptom_block_includes_new_tasks() -> None:
    """Symptom text exists for the registry-corruption and runaway-generator tasks."""
    registry_symptoms = _task_symptom_block(TaskName.REGISTRY_CORRUPTION)
    runaway_symptoms = _task_symptom_block(TaskName.JOB_GENERATOR_RUNAWAY)

    # Keyword checks are case-insensitive.
    registry_lowered = registry_symptoms.lower()
    assert "registry" in registry_lowered
    runaway_lowered = runaway_symptoms.lower()
    assert "queue" in runaway_lowered

    # The task slug itself must not appear in the runaway text.
    assert "job-generator-runaway" not in runaway_symptoms


def test_attempt_history_block_renders_all_attempts() -> None:
    """Every attempt's step, command, reasoning and error are rendered; reward is not."""
    first_attempt = {
        "step": 1,
        "command": "redis-cli LLEN job_queue",
        "reasoning": "check backlog",
        "reward": 0.12,
        "error": None,
    }
    second_attempt = {
        "step": 2,
        "command": "curl -s localhost:3000/health",
        "reasoning": None,
        "reward": 0.08,
        "error": "timeout",
    }

    rendered = _attempt_history_block([first_attempt, second_attempt])

    expected_fragments = (
        "step 1: command=redis-cli LLEN job_queue",
        "step 2: command=curl -s localhost:3000/health",
        "reasoning=check backlog",
        "error=timeout",
    )
    for fragment in expected_fragments:
        assert fragment in rendered

    # Reward values are supplied in the input but must not show up in the output.
    assert "reward=" not in rendered


def test_build_prompt_contains_symptoms_and_history() -> None:
    """The prompt carries the expected section headers and history, without rewards."""
    metrics = SystemMetrics(
        gateway_success_rate=0.32,
        gateway_p99_latency_ms=1500.0,
        queue_depth=412,
        worker_restart_count=3,
        consumer_stall_count=2,
    )
    observation = Observation(
        command_output="service checks show partial failures",
        metrics=metrics,
        process_status={"gateway": "running", "worker": "running"},
    )
    history = [
        {
            "step": 1,
            "command": "redis-cli LLEN job_queue",
            "reasoning": "measure backlog",
            "reward": 0.10,
            "error": None,
        }
    ]

    rendered = build_prompt(
        obs=observation,
        step_num=3,
        task_name=TaskName.BACKPRESSURE_CASCADE,
        attempt_history=history,
    )

    required_fragments = (
        "TASK SYMPTOMS:",
        "PREVIOUS ATTEMPTS:",
        "step 1: command=redis-cli LLEN job_queue",
        "LATEST COMMAND OUTPUT:",
    )
    for fragment in required_fragments:
        assert fragment in rendered

    # The history entry includes a reward, but the prompt must not expose it.
    assert "reward=" not in rendered


def test_parse_tasks_default_and_override() -> None:
    """Check ``_parse_tasks`` with ``TASKS_CSV`` unset and with explicit overrides.

    With the variable removed, the three default tasks are expected. With a
    comma-separated value set, the matching ``TaskName`` members are expected
    in the same order.
    """
    # Function-scope import so the block does not depend on the file header.
    from unittest.mock import patch

    override_cases = [
        (
            "route-partition,backpressure-cascade",
            [TaskName.ROUTE_PARTITION, TaskName.BACKPRESSURE_CASCADE],
        ),
        (
            "registry-corruption,job-generator-runaway",
            [TaskName.REGISTRY_CORRUPTION, TaskName.JOB_GENERATOR_RUNAWAY],
        ),
    ]

    # patch.dict snapshots os.environ on entry and restores it on exit, even
    # when an assertion fails, so the caller's TASKS_CSV (set or unset) is
    # preserved without a manual try/finally.
    with patch.dict(os.environ):
        os.environ.pop("TASKS_CSV", None)
        assert _parse_tasks() == [
            TaskName.CASCADING_TIMEOUT,
            TaskName.BYZANTINE_QUEUE_FAULT,
            TaskName.DISTRIBUTED_LOCK_STARVATION,
        ]

        for csv_value, expected_tasks in override_cases:
            os.environ["TASKS_CSV"] = csv_value
            assert _parse_tasks() == expected_tasks



def test_episode_score_clamps_terminal_reward_to_unit_interval() -> None:
    """Pin ``_episode_score`` for empty, in-range, too-high and negative rewards."""
    # (reward list, expected score), checked in order.
    cases = (
        ([], 0.01),
        ([0.2, 0.8], 0.8),
        ([1.2], 0.99),
        ([-0.1], 0.01),
    )
    for rewards, expected_score in cases:
        assert _episode_score(rewards) == expected_score


def test_end_log_line_includes_score_and_reward_list() -> None:
    """The ``[END]`` line shows success, steps, a two-decimal score and rewards."""
    expected = "[END]   success=true steps=3 score=0.99 rewards=0.00,0.12,1.00"
    rendered = _format_end_line(
        success=True,
        steps=3,
        score=0.987,
        rewards=[0.0, 0.125, 1.0],
    )
    assert rendered == expected