from __future__ import annotations
import os
from pathlib import Path
import signal
import subprocess
import sys
import tempfile
import time
# Directory one level above the directory that contains this file; used as the
# working directory for the child processes started in this module.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Interpreter running this module; reused to launch every child process.
PYTHON_BIN = sys.executable
# Script that is executed against the running server in each cycle.
INFERENCE_PATH = PROJECT_ROOT / "inference.py"
def _read_rss_kb(pid: int) -> int:
    """Return the resident set size of process *pid* as reported by procfs.

    The value is the number on the ``VmRSS:`` line of ``/proc/<pid>/status``
    (procfs reports it in kB).

    Args:
        pid: Process id to inspect.

    Returns:
        The ``VmRSS`` value, or ``0`` when the status file cannot be read
        (process already gone, no procfs on this platform), when it has no
        ``VmRSS:`` line, or when that line carries no numeric value.
    """
    status_path = Path(f"/proc/{pid}/status")
    try:
        status_text = status_path.read_text()
    except OSError:
        # EAFP instead of exists()-then-read: the process can disappear
        # between the two calls, so any read failure means "no measurement".
        return 0

    for line in status_text.splitlines():
        if not line.startswith("VmRSS:"):
            continue
        fields = line.split()
        if len(fields) >= 2 and fields[1].isdigit():
            return int(fields[1])
    return 0
def _read_cpu_ticks(pid: int) -> int:
    """Return the user + system CPU time of process *pid* in clock ticks.

    Reads ``/proc/<pid>/stat`` and adds the ``utime`` and ``stime`` fields
    (fields 14 and 15 in proc(5) numbering).

    Args:
        pid: Process id to inspect.

    Returns:
        ``utime + stime``, or ``0`` when the stat file cannot be read or does
        not have the expected layout.
    """
    stat_path = Path(f"/proc/{pid}/stat")
    try:
        stat_text = stat_path.read_text()
    except OSError:
        # The process may have exited (or procfs may be unavailable).
        return 0

    # Field 2 (comm) is wrapped in parentheses and may itself contain spaces
    # or parentheses, so a plain whitespace split can shift every later
    # index. Everything after the *last* ")" is safely space-separated.
    _, closing_paren, remainder = stat_text.rpartition(")")
    if not closing_paren:
        return 0

    # fields[0] is field 3 (state), hence utime/stime sit at offsets 11/12.
    fields = remainder.split()
    if len(fields) < 13:
        return 0
    try:
        return int(fields[11]) + int(fields[12])
    except ValueError:
        return 0
def _wait_for_health(port: int, timeout: float = 15.0) -> None:
    """Block until ``http://127.0.0.1:<port>/health`` answers or time runs out.

    Each probe runs in a short-lived child interpreter that performs one
    ``urllib.request.urlopen`` call with a 2 second timeout; an exit status
    of 0 counts as "ready". Failed probes are retried every 0.25 seconds.

    Args:
        port: TCP port on 127.0.0.1 that the server is expected to listen on.
        timeout: Maximum number of seconds to keep probing.

    Raises:
        RuntimeError: If no probe succeeded before the deadline.
    """
    # Built once: the script does not change between probes.
    probe_script = (
        "import sys, urllib.request\n"
        f"urllib.request.urlopen('http://127.0.0.1:{port}/health', timeout=2).read()\n"
    )
    # Monotonic clock so that wall-clock adjustments (NTP, manual changes)
    # can neither shorten nor extend the waiting period.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        probe = subprocess.run(
            [PYTHON_BIN, "-c", probe_script],
            capture_output=True,
            text=True,
        )
        if probe.returncode == 0:
            return
        time.sleep(0.25)
    raise RuntimeError("server health check did not become ready")
def _open_capture_file():
    """Return an anonymous text-mode temporary file for capturing child output."""
    return tempfile.TemporaryFile(mode="w+", errors="replace")


def _stop_process(process: subprocess.Popen) -> None:
    """Stop *process* with SIGTERM, escalating to SIGKILL after 10 seconds."""
    if process.poll() is not None:
        return
    process.send_signal(signal.SIGTERM)
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait(timeout=5)


def _run_full_cycle(port: int) -> dict[str, float | int | str]:
    """Start the server on *port*, run the inference script once, and measure.

    A uvicorn process serving ``server.app:app`` is started on 127.0.0.1,
    the function waits for its health endpoint, then runs ``INFERENCE_PATH``
    to completion while sampling memory every 0.05 seconds. Both children
    receive the ``SYSADMIN_ENV_*`` variables set below; the dotenv path points
    into the temp directory (presumably at a file that does not exist, so no
    local dotenv is picked up -- confirm against the consumer).

    Args:
        port: TCP port for the server on 127.0.0.1.

    Returns:
        A dict with the keys ``"output"`` (combined stdout/stderr of the
        inference script), ``"wall_seconds"`` (time from server start to the
        end of the inference run), ``"peak_rss_kb"`` (largest single RSS
        sample seen for either child) and ``"cpu_seconds"`` (user + system
        CPU time of the server process only).

    Raises:
        RuntimeError: If the server never became healthy (message includes
            the captured server output) or the inference script exited with
            a non-zero status (message is its captured output).
    """
    env = os.environ.copy()
    env["SYSADMIN_ENV_SERVER_URL"] = f"ws://127.0.0.1:{port}/ws"
    env["SYSADMIN_ENV_HEALTHCHECK_URL"] = f"http://127.0.0.1:{port}/health"
    env["SYSADMIN_ENV_TASKS_URL"] = f"http://127.0.0.1:{port}/tasks"
    env["SYSADMIN_ENV_TASK_ID"] = ""
    env["SYSADMIN_ENV_DOTENV_PATH"] = str(Path(tempfile.gettempdir()) / f"sysadmin_env_missing_dotenv_{port}")

    # Child output goes to temporary files rather than pipes: a pipe that is
    # not drained while the child runs fills up, the child blocks on write,
    # and the polling loop below would never see it exit.
    with _open_capture_file() as server_log, _open_capture_file() as inference_log:
        server = subprocess.Popen(
            [
                PYTHON_BIN,
                "-m",
                "uvicorn",
                "server.app:app",
                "--host",
                "127.0.0.1",
                "--port",
                str(port),
            ],
            stdout=server_log,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=PROJECT_ROOT,
        )
        inference = None
        peak_rss_kb = 0
        cpu_start = _read_cpu_ticks(server.pid)
        wall_start = time.perf_counter()
        try:
            try:
                _wait_for_health(port)
            except RuntimeError as exc:
                # Stop the server first so it no longer writes to the log,
                # then surface what it printed to make the failure diagnosable.
                _stop_process(server)
                server_log.seek(0)
                raise RuntimeError(f"{exc}\n{server_log.read()}") from exc

            inference = subprocess.Popen(
                [PYTHON_BIN, str(INFERENCE_PATH)],
                stdout=inference_log,
                stderr=subprocess.STDOUT,
                env=env,
                cwd=PROJECT_ROOT,
            )
            while inference.poll() is None:
                peak_rss_kb = max(
                    peak_rss_kb,
                    _read_rss_kb(server.pid),
                    _read_rss_kb(inference.pid),
                )
                time.sleep(0.05)
            peak_rss_kb = max(peak_rss_kb, _read_rss_kb(server.pid))

            # The inference process has exited, so the file is complete.
            inference_log.seek(0)
            inference_output = inference_log.read()
            if inference.returncode != 0:
                raise RuntimeError(inference_output)

            wall_elapsed = time.perf_counter() - wall_start
            cpu_end = _read_cpu_ticks(server.pid)
            ticks_per_second = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
            # Clamp at zero in case one of the readings was unavailable.
            cpu_seconds = max(cpu_end - cpu_start, 0) / ticks_per_second
            return {
                "output": inference_output,
                "wall_seconds": wall_elapsed,
                "peak_rss_kb": peak_rss_kb,
                "cpu_seconds": cpu_seconds,
            }
        finally:
            # Never leave a child behind, even when interrupted mid-run.
            if inference is not None:
                _stop_process(inference)
            _stop_process(server)
def test_three_consecutive_clean_runs_stay_within_phase_eight_envelope():
    """Run three full cycles on ports 8200-8202 and check each result.

    Every run must show three ``[START]``/``[END]`` markers, at least three
    ``[STEP]`` markers and all three task names in the inference output, and
    must stay below the wall-time, memory and CPU limits asserted below.
    """
    expected_tasks = ("nginx_crash", "disk_full", "network_broken")
    completed = []
    for offset in range(3):
        result = _run_full_cycle(8200 + offset)
        completed.append(result)
        transcript = str(result["output"])

        # Episode markers: exactly three episodes, each with at least one step.
        for marker in ("[START]", "[END]"):
            assert transcript.count(marker) == 3
        assert transcript.count("[STEP]") >= 3
        for task_name in expected_tasks:
            assert task_name in transcript

        # Resource limits: 1200 s wall time, 8 * 1024 * 1024 kB of RSS, and
        # server CPU time below 2.2x the elapsed wall time.
        elapsed = float(result["wall_seconds"])
        assert elapsed < 1200.0
        assert int(result["peak_rss_kb"]) < 8 * 1024 * 1024
        assert float(result["cpu_seconds"]) < elapsed * 2.2
    assert len(completed) == 3
|