File size: 5,981 Bytes
b641d3d 3d75167 b641d3d 3d75167 b641d3d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | import os
from server.constants import TaskName
from inference import (
_attempt_history_block,
_episode_score,
_format_end_line,
_parse_tasks,
_single_line,
_task_symptom_block,
build_prompt,
extract_command,
extract_reasoning,
)
from server.models import Observation, SystemMetrics
def test_extract_command_rejects_non_json_code_fence() -> None:
    """A fenced shell snippet (no JSON payload) must not produce a command."""
    fenced_shell = "```bash\nredis-cli LLEN job_queue\n```"
    parsed = extract_command(fenced_shell)
    assert parsed is None
def test_extract_command_returns_none_when_empty() -> None:
    """Whitespace-only model output yields no command."""
    blank_output = " "
    parsed = extract_command(blank_output)
    assert parsed is None
def test_extract_command_reads_json_payload() -> None:
    """A bare JSON object exposes its ``command`` value."""
    expected = "redis-cli LLEN job_queue"
    parsed = extract_command('{"command":"redis-cli LLEN job_queue"}')
    assert parsed == expected
def test_extract_command_reads_fenced_json_payload() -> None:
    """A JSON object wrapped in a ```json fence is still parsed."""
    fenced_json = '```json\n{"command":"ps -ef"}\n```'
    parsed = extract_command(fenced_json)
    assert parsed == "ps -ef"
def test_extract_command_reads_json_embedded_in_text() -> None:
    """A JSON object surrounded by prose on the same line is located and parsed."""
    chatty_output = 'Use this command: {"command":"redis-cli LLEN job_queue"} thanks.'
    parsed = extract_command(chatty_output)
    assert parsed == "redis-cli LLEN job_queue"
def test_extract_command_reads_json_after_reasoning_preamble() -> None:
    """A JSON object on the line after a free-text preamble yields both fields."""
    preamble = "I'll start by checking process state.\n"
    payload = '{"command":"ps aux","reasoning":"list processes"}'
    model_output = preamble + payload

    command = extract_command(model_output)
    assert command == "ps aux"

    reasoning = extract_reasoning(model_output)
    assert reasoning == "list processes"
def test_extract_command_prefers_first_json_object_with_command() -> None:
    """An earlier JSON object lacking ``command`` is skipped for a later one that has it."""
    two_objects = '{"meta":"skip"} then {"command":"ls -la","reasoning":"explore"}'
    parsed = extract_command(two_objects)
    assert parsed == "ls -la"
def test_extract_reasoning_when_present() -> None:
    """Both ``command`` and ``reasoning`` are returned when the payload has both."""
    payload = '{"command":"redis-cli LLEN job_queue","reasoning":"check queue depth first"}'

    command = extract_command(payload)
    assert command == "redis-cli LLEN job_queue"

    reasoning = extract_reasoning(payload)
    assert reasoning == "check queue depth first"
def test_extract_command_requires_command_even_with_reasoning() -> None:
    """A payload with only ``reasoning`` yields neither a command nor reasoning."""
    reasoning_only = '{"reasoning":"i should inspect logs"}'

    command = extract_command(reasoning_only)
    assert command is None

    reasoning = extract_reasoning(reasoning_only)
    assert reasoning is None
def test_single_line_removes_newlines() -> None:
    """An embedded newline is replaced by a single space."""
    two_lines = "echo a\necho b"
    flattened = _single_line(two_lines)
    assert flattened == "echo a echo b"
def test_task_symptom_block_is_non_empty() -> None:
    """The route-partition symptom text describes the issue without naming the task slug."""
    symptoms = _task_symptom_block(TaskName.ROUTE_PARTITION)

    # The symptom description must be present ...
    assert "connectivity path issue" in symptoms
    # ... while the task identifier itself must not appear in the text.
    assert "route-partition" not in symptoms
def test_task_symptom_block_includes_new_tasks() -> None:
    """Symptom text exists for the registry-corruption and job-generator-runaway tasks."""
    registry_symptoms = _task_symptom_block(TaskName.REGISTRY_CORRUPTION)
    runaway_symptoms = _task_symptom_block(TaskName.JOB_GENERATOR_RUNAWAY)

    # Keyword checks are case-insensitive.
    registry_lowered = registry_symptoms.lower()
    runaway_lowered = runaway_symptoms.lower()
    assert "registry" in registry_lowered
    assert "queue" in runaway_lowered

    # The slug check is against the raw (un-lowered) text.
    assert "job-generator-runaway" not in runaway_symptoms
def test_attempt_history_block_renders_all_attempts() -> None:
    """Every attempt is rendered with command, reasoning and error, but never reward."""
    first_attempt = {
        "step": 1,
        "command": "redis-cli LLEN job_queue",
        "reasoning": "check backlog",
        "reward": 0.12,
        "error": None,
    }
    second_attempt = {
        "step": 2,
        "command": "curl -s localhost:3000/health",
        "reasoning": None,
        "reward": 0.08,
        "error": "timeout",
    }

    rendered = _attempt_history_block([first_attempt, second_attempt])

    expected_fragments = (
        "step 1: command=redis-cli LLEN job_queue",
        "step 2: command=curl -s localhost:3000/health",
        "reasoning=check backlog",
        "error=timeout",
    )
    for fragment in expected_fragments:
        assert fragment in rendered

    # Rewards are supplied in the input but must not appear in the rendered text.
    assert "reward=" not in rendered
def test_build_prompt_contains_symptoms_and_history() -> None:
    """The prompt carries the symptom, history and output sections and omits rewards."""
    metrics = SystemMetrics(
        gateway_success_rate=0.32,
        gateway_p99_latency_ms=1500.0,
        queue_depth=412,
        worker_restart_count=3,
        consumer_stall_count=2,
    )
    observation = Observation(
        command_output="service checks show partial failures",
        metrics=metrics,
        process_status={"gateway": "running", "worker": "running"},
    )
    history = [
        {
            "step": 1,
            "command": "redis-cli LLEN job_queue",
            "reasoning": "measure backlog",
            "reward": 0.10,
            "error": None,
        }
    ]

    rendered = build_prompt(
        obs=observation,
        step_num=3,
        task_name=TaskName.BACKPRESSURE_CASCADE,
        attempt_history=history,
    )

    required_fragments = (
        "TASK SYMPTOMS:",
        "PREVIOUS ATTEMPTS:",
        "step 1: command=redis-cli LLEN job_queue",
        "LATEST COMMAND OUTPUT:",
    )
    for fragment in required_fragments:
        assert fragment in rendered

    # The reward from the history entry must not appear in the prompt.
    assert "reward=" not in rendered
def test_parse_tasks_default_and_override() -> None:
    """``_parse_tasks`` uses a default list when TASKS_CSV is unset and honours overrides.

    The caller's TASKS_CSV value is saved up front and restored in ``finally``
    so the test does not leak environment changes.
    """
    saved_value = os.environ.get("TASKS_CSV")
    try:
        # No variable set: the default task list applies.
        os.environ.pop("TASKS_CSV", None)
        default_tasks = _parse_tasks()
        assert default_tasks == [
            TaskName.CASCADING_TIMEOUT,
            TaskName.BYZANTINE_QUEUE_FAULT,
            TaskName.DISTRIBUTED_LOCK_STARVATION,
        ]

        # Comma-separated overrides map to TaskName members in order.
        override_cases = (
            (
                "route-partition,backpressure-cascade",
                [TaskName.ROUTE_PARTITION, TaskName.BACKPRESSURE_CASCADE],
            ),
            (
                "registry-corruption,job-generator-runaway",
                [TaskName.REGISTRY_CORRUPTION, TaskName.JOB_GENERATOR_RUNAWAY],
            ),
        )
        for csv_value, expected_tasks in override_cases:
            os.environ["TASKS_CSV"] = csv_value
            assert _parse_tasks() == expected_tasks
    finally:
        if saved_value is not None:
            os.environ["TASKS_CSV"] = saved_value
        else:
            os.environ.pop("TASKS_CSV", None)
def test_episode_score_clamps_terminal_reward_to_unit_interval() -> None:
    """Check ``_episode_score`` on empty, in-range, too-high and negative reward lists."""
    cases = (
        ([], 0.01),
        ([0.2, 0.8], 0.8),
        ([1.2], 0.99),
        ([-0.1], 0.01),
    )
    for rewards, expected_score in cases:
        assert _episode_score(rewards) == expected_score
def test_end_log_line_includes_score_and_reward_list() -> None:
    """The ``[END]`` line reports success, steps, score and rewards with two decimals."""
    expected_line = "[END] success=true steps=3 score=0.99 rewards=0.00,0.12,1.00"
    actual_line = _format_end_line(
        success=True,
        steps=3,
        score=0.987,
        rewards=[0.0, 0.125, 1.0],
    )
    assert actual_line == expected_line