| import subprocess |
| import threading |
| from pathlib import Path |
|
|
| import httpx |
|
|
| from .models import SystemMetrics |
|
|
|
|
| class MetricsPoller(threading.Thread): |
| """Background metrics poller with last-known-good caching.""" |
|
|
| def __init__(self, poll_interval_s: float = 2.0) -> None: |
| super().__init__(daemon=True) |
| self.poll_interval_s = poll_interval_s |
| self._stop_event = threading.Event() |
| self._lock = threading.Lock() |
| self._latest: dict[str, float | int] = { |
| "gateway_success_rate": 0.0, |
| "gateway_p99_latency_ms": 0.0, |
| "queue_depth": 0, |
| "worker_restart_count": 0, |
| "consumer_stall_count": 0, |
| } |
|
|
| def stop(self) -> None: |
| self._stop_event.set() |
|
|
| def _read_counter(self, path: str) -> int: |
| file_path = Path(path) |
| if not file_path.exists(): |
| return 0 |
| try: |
| return int(file_path.read_text().strip() or "0") |
| except ValueError: |
| return 0 |
|
|
| def _poll_gateway(self) -> dict[str, float]: |
| with httpx.Client(timeout=1.0) as client: |
| response = client.get("http://localhost:3000/health") |
| response.raise_for_status() |
| payload = response.json() |
|
|
| success_rate = float( |
| payload.get("success_rate", payload.get("gateway_success_rate", 0.0)) |
| ) |
| p99 = float( |
| payload.get("p99_latency_ms", payload.get("gateway_p99_latency_ms", 0.0)) |
| ) |
| return { |
| "gateway_success_rate": max(0.0, min(1.0, success_rate)), |
| "gateway_p99_latency_ms": max(0.0, p99), |
| } |
|
|
| def _poll_queue_depth(self) -> int: |
| result = subprocess.run( |
| ["redis-cli", "LLEN", "job_queue"], |
| capture_output=True, |
| text=True, |
| timeout=2, |
| check=False, |
| ) |
| if result.returncode != 0: |
| return int(self._latest["queue_depth"]) |
| try: |
| return max(0, int(result.stdout.strip() or "0")) |
| except ValueError: |
| return int(self._latest["queue_depth"]) |
|
|
| def poll_once(self) -> None: |
| snapshot = dict(self._latest) |
| try: |
| snapshot.update(self._poll_gateway()) |
| except Exception: |
| pass |
|
|
| snapshot["queue_depth"] = self._poll_queue_depth() |
| snapshot["worker_restart_count"] = self._read_counter( |
| "/tmp/worker_restart_count" |
| ) |
| snapshot["consumer_stall_count"] = self._read_counter( |
| "/tmp/consumer_stall_count" |
| ) |
|
|
| with self._lock: |
| self._latest = snapshot |
|
|
| def run(self) -> None: |
| while not self._stop_event.is_set(): |
| self.poll_once() |
| self._stop_event.wait(self.poll_interval_s) |
|
|
| def get_current_metrics(self) -> SystemMetrics: |
| with self._lock: |
| snapshot = dict(self._latest) |
| return SystemMetrics.model_validate(snapshot) |
|
|