Veer15's picture
chore: deploy distributed systems debug environment
b641d3d verified
import subprocess
import threading
from pathlib import Path
import httpx
from .models import SystemMetrics
class MetricsPoller(threading.Thread):
    """Background metrics poller with last-known-good caching.

    The thread repeatedly samples a gateway health endpoint, a Redis list
    length and two counter files, and publishes the combined result as a
    snapshot dict. When a source cannot be sampled, the previously published
    value for that source is kept (gateway, queue depth) or ``0`` is reported
    (counter files), so a single failing source never stops the polling loop.
    """

    def __init__(self, poll_interval_s: float = 2.0) -> None:
        """Create a daemon poller thread.

        Args:
            poll_interval_s: Seconds to wait between two polling rounds.
        """
        super().__init__(daemon=True)
        self.poll_interval_s = poll_interval_s
        self._stop_event = threading.Event()
        # Guards publication of ``_latest`` and reads in get_current_metrics().
        self._lock = threading.Lock()
        # Published snapshots are replaced wholesale, never mutated in place.
        self._latest: dict[str, float | int] = {
            "gateway_success_rate": 0.0,
            "gateway_p99_latency_ms": 0.0,
            "queue_depth": 0,
            "worker_restart_count": 0,
            "consumer_stall_count": 0,
        }

    def stop(self) -> None:
        """Ask the polling loop in :meth:`run` to exit."""
        self._stop_event.set()

    def _read_counter(self, path: str) -> int:
        """Return the integer stored in the file at *path*.

        Returns ``0`` when the file is missing, unreadable, empty, or does
        not contain an integer.
        """
        # EAFP: reading directly avoids the exists()/read_text() race, and
        # OSError also covers permission errors and paths that are directories.
        try:
            return int(Path(path).read_text().strip() or "0")
        except (OSError, ValueError):
            return 0

    def _poll_gateway(self) -> dict[str, float]:
        """Fetch gateway health and return the normalised gateway metrics.

        The success rate is clamped to ``[0.0, 1.0]`` and the p99 latency to
        ``>= 0.0``. Each value is read from the short key first and from the
        ``gateway_``-prefixed key as a fallback.

        Raises:
            Exception: Any HTTP, decoding or conversion error is propagated;
                :meth:`poll_once` treats it as "keep the previous values".
        """
        with httpx.Client(timeout=1.0) as client:
            response = client.get("http://localhost:3000/health")
            response.raise_for_status()
            payload = response.json()
        success_rate = float(
            payload.get("success_rate", payload.get("gateway_success_rate", 0.0))
        )
        p99 = float(
            payload.get("p99_latency_ms", payload.get("gateway_p99_latency_ms", 0.0))
        )
        return {
            "gateway_success_rate": max(0.0, min(1.0, success_rate)),
            "gateway_p99_latency_ms": max(0.0, p99),
        }

    def _poll_queue_depth(self) -> int:
        """Return the length of the ``job_queue`` Redis list via ``redis-cli``.

        Falls back to the last published queue depth when ``redis-cli`` is
        missing, cannot be started, times out, exits non-zero, or prints
        something that is not an integer.
        """
        last_known = int(self._latest["queue_depth"])
        try:
            result = subprocess.run(
                ["redis-cli", "LLEN", "job_queue"],
                capture_output=True,
                text=True,
                timeout=2,
                check=False,
            )
        except (OSError, subprocess.SubprocessError):
            # OSError: binary missing / not executable.
            # SubprocessError: includes TimeoutExpired from ``timeout=2``.
            # Either would otherwise escape run() and kill the poller thread.
            return last_known
        if result.returncode != 0:
            return last_known
        try:
            return max(0, int(result.stdout.strip() or "0"))
        except ValueError:
            return last_known

    def poll_once(self) -> None:
        """Sample every source once and publish the resulting snapshot."""
        snapshot = dict(self._latest)
        try:
            snapshot.update(self._poll_gateway())
        except Exception:
            # Deliberate best-effort: on any gateway failure the previous
            # gateway values copied into ``snapshot`` above are kept.
            pass
        snapshot["queue_depth"] = self._poll_queue_depth()
        snapshot["worker_restart_count"] = self._read_counter(
            "/tmp/worker_restart_count"
        )
        snapshot["consumer_stall_count"] = self._read_counter(
            "/tmp/consumer_stall_count"
        )
        with self._lock:
            self._latest = snapshot

    def run(self) -> None:
        """Poll until :meth:`stop` is called, waiting between rounds."""
        while not self._stop_event.is_set():
            self.poll_once()
            # Event.wait doubles as an interruptible sleep: stop() wakes it.
            self._stop_event.wait(self.poll_interval_s)

    def get_current_metrics(self) -> "SystemMetrics":
        """Return the most recently published snapshot as ``SystemMetrics``."""
        with self._lock:
            snapshot = dict(self._latest)
        return SystemMetrics.model_validate(snapshot)