sysadmin-env / tests /test_integrationt.py
huggingmenfordays's picture
Upload folder using huggingface_hub
4732653 verified
from __future__ import annotations
import os
from pathlib import Path
import signal
import subprocess
import sys
import tempfile
import time
PROJECT_ROOT = Path(__file__).resolve().parents[1]
PYTHON_BIN = sys.executable
INFERENCE_PATH = PROJECT_ROOT / "inference.py"
def _read_rss_kb(pid: int) -> int:
status_path = Path(f"/proc/{pid}/status")
if not status_path.exists():
return 0
for line in status_path.read_text().splitlines():
if line.startswith("VmRSS:"):
parts = line.split()
if len(parts) >= 2:
return int(parts[1])
return 0
def _read_cpu_ticks(pid: int) -> int:
stat_path = Path(f"/proc/{pid}/stat")
if not stat_path.exists():
return 0
parts = stat_path.read_text().split()
if len(parts) < 15:
return 0
return int(parts[13]) + int(parts[14])
def _wait_for_health(port: int, timeout: float = 15.0) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
probe = subprocess.run(
[
PYTHON_BIN,
"-c",
(
"import sys, urllib.request\n"
f"urllib.request.urlopen('http://127.0.0.1:{port}/health', timeout=2).read()\n"
),
],
capture_output=True,
text=True,
)
if probe.returncode == 0:
return
time.sleep(0.25)
raise RuntimeError("server health check did not become ready")
def _run_full_cycle(port: int) -> dict[str, float | int | str]:
env = os.environ.copy()
env["SYSADMIN_ENV_SERVER_URL"] = f"ws://127.0.0.1:{port}/ws"
env["SYSADMIN_ENV_HEALTHCHECK_URL"] = f"http://127.0.0.1:{port}/health"
env["SYSADMIN_ENV_TASKS_URL"] = f"http://127.0.0.1:{port}/tasks"
env["SYSADMIN_ENV_TASK_ID"] = ""
env["SYSADMIN_ENV_DOTENV_PATH"] = str(Path(tempfile.gettempdir()) / f"sysadmin_env_missing_dotenv_{port}")
server = subprocess.Popen(
[
PYTHON_BIN,
"-m",
"uvicorn",
"server.app:app",
"--host",
"127.0.0.1",
"--port",
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
cwd=PROJECT_ROOT,
)
peak_rss_kb = 0
cpu_start = _read_cpu_ticks(server.pid)
wall_start = time.perf_counter()
try:
_wait_for_health(port)
inference = subprocess.Popen(
[PYTHON_BIN, str(INFERENCE_PATH)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
cwd=PROJECT_ROOT,
)
while inference.poll() is None:
peak_rss_kb = max(peak_rss_kb, _read_rss_kb(server.pid), _read_rss_kb(inference.pid))
time.sleep(0.05)
peak_rss_kb = max(peak_rss_kb, _read_rss_kb(server.pid))
inference_output = inference.stdout.read() if inference.stdout is not None else ""
if inference.returncode != 0:
raise RuntimeError(inference_output)
wall_elapsed = time.perf_counter() - wall_start
cpu_end = _read_cpu_ticks(server.pid)
ticks_per_second = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
cpu_seconds = max(cpu_end - cpu_start, 0) / ticks_per_second
return {
"output": inference_output,
"wall_seconds": wall_elapsed,
"peak_rss_kb": peak_rss_kb,
"cpu_seconds": cpu_seconds,
}
finally:
if server.poll() is None:
server.send_signal(signal.SIGTERM)
try:
server.wait(timeout=10)
except subprocess.TimeoutExpired:
server.kill()
server.wait(timeout=5)
def test_three_consecutive_clean_runs_stay_within_phase_eight_envelope():
runs = []
for index in range(3):
port = 8200 + index
run = _run_full_cycle(port)
runs.append(run)
output = str(run["output"])
assert output.count("[START]") == 3
assert output.count("[END]") == 3
assert output.count("[STEP]") >= 3
assert "nginx_crash" in output
assert "disk_full" in output
assert "network_broken" in output
assert float(run["wall_seconds"]) < 1200.0
assert int(run["peak_rss_kb"]) < 8 * 1024 * 1024
assert float(run["cpu_seconds"]) < float(run["wall_seconds"]) * 2.2
assert len(runs) == 3