"""EDEN OS V2 — Elevated Watchdog Sidecar.

Runs every 3 seconds:
- Samples last 15 frames from shared queue
- Computes motion score (pixel variance) + expression variance
- Calls router's /evaluate-failover for agent-enhanced decision
- Falls back to consecutive_bad >= 2 rule if agent unavailable
"""
|
|
| import asyncio |
| import base64 |
| import logging |
| import time |
| from pathlib import Path |
|
|
| import httpx |
| import numpy as np |
| from fastapi import FastAPI |
| from pydantic_settings import BaseSettings |
|
|
# Module-level logger used by every watchdog function in this file.
logger = logging.getLogger("eden.watchdog")
|
|
|
|
class WatchdogSettings(BaseSettings):
    """Runtime configuration for the watchdog sidecar.

    Each field carries a default; ``model_config`` names ``.env`` as an
    additional settings source and tells pydantic-settings to ignore
    unknown keys rather than reject them.
    """

    # Base URL of the router; /status, /evaluate-failover and /failover are
    # requested relative to it.
    router_url: str = "http://router:8100"
    # Parent directory of the "frames" folder the watchdog samples from.
    shared_dir: str = "/shared"
    # Seconds slept between two consecutive checks.
    check_interval: float = 3.0
    # Motion scores strictly below this value count as a bad check.
    motion_threshold: float = 0.05
    # Consecutive bad checks that trigger a failover when the router's
    # /evaluate-failover endpoint cannot be consulted.
    consecutive_fails_to_failover: int = 2

    model_config = {"env_file": ".env", "extra": "ignore"}
|
|
|
|
# Settings are resolved once at import time and shared by the whole module.
cfg = WatchdogSettings()
app = FastAPI(title="EDEN Watchdog", version="2.0.0")

# --- Mutable watchdog state: written by the check loop, read by /health ---
# Number of consecutive low-motion checks, keyed by pipeline id.
consecutive_bad: dict[int, int] = {}
# time.time() captured when the most recent motion score was computed.
last_check_time: float = 0.0
# Summary dict describing the most recent check (empty until the first one).
last_check_result: dict = {}
# True while the check loop is supposed to keep running.
watchdog_running: bool = False
|
|
|
|
| def _read_recent_frames(frame_dir: Path, n: int = 15) -> list[bytes]: |
| """Read the N most recent frame files from the shared frame directory.""" |
| if not frame_dir.exists(): |
| return [] |
| files = sorted(frame_dir.glob("*.jpg"), key=lambda f: f.stat().st_mtime, reverse=True)[:n] |
| frames = [] |
| for f in files: |
| try: |
| frames.append(f.read_bytes()) |
| except Exception: |
| pass |
| return list(reversed(frames)) |
|
|
|
|
def compute_motion_score(frames: list[bytes]) -> float:
    """Compute motion score from raw frame bytes.

    For each pair of consecutive frames the raw bytes are compared over their
    common prefix and the mean absolute byte difference is taken. The score
    is the average of those means divided by 25, capped at 1.0.

    Args:
        frames: Frame file contents, oldest first.

    Returns:
        A value in ``[0.0, 1.0]``. ``0.0`` when fewer than two non-empty
        frames are available, ``0.5`` if the computation fails unexpectedly.
    """
    if len(frames) < 2:
        return 0.0
    try:
        # Zero-length frames (e.g. a file caught mid-write) are dropped: the
        # mean over an empty slice is NaN, and min(1.0, NaN) evaluates to 1.0,
        # which would report a frozen feed as full motion.
        arrays = [np.frombuffer(f, dtype=np.uint8) for f in frames if len(f) > 0]
        if len(arrays) < 2:
            return 0.0

        diffs = []
        for prev, curr in zip(arrays, arrays[1:]):
            common = min(len(prev), len(curr))
            # Widen before subtracting so uint8 arithmetic cannot wrap around.
            delta = curr[:common].astype(np.int16) - prev[:common].astype(np.int16)
            diffs.append(np.abs(delta).mean())

        return min(1.0, float(np.mean(diffs) / 25.0))
    except Exception as e:
        logger.warning(f"Motion score error: {e}")
        return 0.5
|
|
|
|
async def _evaluate_failover_with_agent(pipeline_id: int, motion_score: float, consecutive: int, frame_count: int) -> bool:
    """Ask router's agent-enhanced endpoint whether to failover.

    Falls back to consecutive_bad >= threshold if call fails.

    Args:
        pipeline_id: Id of the pipeline being evaluated.
        motion_score: Motion score of the latest check.
        consecutive: Number of consecutive bad checks for this pipeline.
        frame_count: Number of frames the score was computed from.

    Returns:
        The router's ``should_failover`` answer when it responds with HTTP
        200; otherwise ``consecutive >= cfg.consecutive_fails_to_failover``.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.post(
                f"{cfg.router_url}/evaluate-failover",
                json={
                    "pipeline_id": pipeline_id,
                    "motion_score": motion_score,
                    "consecutive_bad": consecutive,
                    "frame_count": frame_count,
                },
            )
            if resp.status_code == 200:
                # Coerce so the declared bool return type holds even if the
                # payload carries null or another non-boolean value.
                decision = bool(resp.json().get("should_failover", False))
                logger.info(f"Agent failover decision for P{pipeline_id}: {decision}")
                return decision
            # A non-200 answer is not a decision; record it before falling back.
            logger.warning(
                f"Agent failover eval returned HTTP {resp.status_code}, using threshold fallback"
            )
    except Exception as e:
        logger.warning(f"Agent failover eval failed, using threshold fallback: {e}")

    # Threshold fallback: fail over after enough consecutive bad checks.
    return consecutive >= cfg.consecutive_fails_to_failover
|
|
|
|
async def _trigger_failover(pipeline_id: int) -> None:
    """Tell the Router to mark a pipeline as failed and swap to next.

    Posts ``{"pipeline_id": ...}`` to the router's ``/failover`` endpoint.
    Errors are logged rather than raised so the watchdog loop keeps running.

    Args:
        pipeline_id: Id of the pipeline to fail over.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.post(
                f"{cfg.router_url}/failover",
                json={"pipeline_id": pipeline_id},
            )
    except Exception as e:
        logger.error(f"Failover trigger failed: {e}")
        return

    # Only report success when the router actually accepted the request.
    if 200 <= resp.status_code < 300:
        logger.warning(f"Failover triggered for pipeline {pipeline_id}: {resp.status_code}")
    else:
        logger.error(f"Failover request for pipeline {pipeline_id} rejected: {resp.status_code}")
|
|
|
|
async def _fetch_active_pipelines() -> list[dict]:
    """Return the router's pipelines whose status is "ready" or "busy".

    Any failure (network error, non-JSON body, unexpected payload shape) is
    logged and yields an empty list so one bad status call cannot stop the
    watchdog loop.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{cfg.router_url}/status")
            status = resp.json()
        return [
            p for p in status.get("pipelines", [])
            if p.get("status") in ("ready", "busy")
        ]
    except Exception as e:
        logger.warning(f"Router status fetch failed: {e}")
        return []


async def _run_single_check(frame_dir: Path) -> None:
    """Run one watchdog cycle: sample frames, score motion, update counters."""
    global last_check_time, last_check_result

    # Reading frame files is blocking I/O; run it in a worker thread so the
    # event loop (and with it the /health endpoint) stays responsive.
    frames = await asyncio.to_thread(_read_recent_frames, frame_dir)
    if len(frames) < 2:
        # Not enough data to compare; try again on the next cycle.
        return

    motion = compute_motion_score(frames)
    last_check_time = time.time()
    active_pipelines = await _fetch_active_pipelines()

    last_check_result = {
        "motion_score": round(motion, 3),
        "frame_count": len(frames),
        "is_healthy": motion >= cfg.motion_threshold,
        "active_pipelines": len(active_pipelines),
        "timestamp": last_check_time,
    }

    if motion < cfg.motion_threshold:
        # Low motion: count a bad check against every active pipeline.
        for p in active_pipelines:
            pid = p.get("id")
            if pid is None:
                logger.warning(f"Skipping pipeline entry without id: {p}")
                continue
            consecutive_bad[pid] = consecutive_bad.get(pid, 0) + 1
            logger.warning(
                f"Bad check #{consecutive_bad[pid]} for pipeline {p.get('name', pid)} "
                f"(motion={motion:.3f} < {cfg.motion_threshold})"
            )

            # Let the router decide; the helper applies the threshold rule
            # itself when the router cannot be consulted.
            should_failover = await _evaluate_failover_with_agent(
                pid, motion, consecutive_bad[pid], len(frames)
            )
            if should_failover:
                await _trigger_failover(pid)
                consecutive_bad[pid] = 0
    else:
        # Healthy motion resets every counter. Only values change, so
        # iterating the dict directly is safe.
        for pid in consecutive_bad:
            consecutive_bad[pid] = 0


async def _check_loop():
    """Main watchdog loop — runs one check every ``cfg.check_interval`` seconds.

    Sets ``watchdog_running`` while active and clears it again when the loop
    ends for any reason (flag flipped elsewhere or task cancelled), so
    /health never reports a dead loop as running. Errors inside a single
    check are logged and do not stop the loop.
    """
    global watchdog_running
    watchdog_running = True
    frame_dir = Path(cfg.shared_dir) / "frames"

    logger.info(f"Watchdog loop started (interval={cfg.check_interval}s, threshold={cfg.motion_threshold})")

    try:
        while watchdog_running:
            try:
                await _run_single_check(frame_dir)
            except Exception as e:
                logger.error(f"Watchdog check error: {e}")

            await asyncio.sleep(cfg.check_interval)
    finally:
        watchdog_running = False
|
|
|
|
@app.get("/health")
async def health():
    """Report the watchdog's liveness and its most recent check.

    Returns:
        A dict with a constant ``"status": "ok"``, the ``watchdog_running``
        flag, the summary of the last check and the per-pipeline
        consecutive-bad counters.
    """
    return {
        "status": "ok",
        "watchdog_running": watchdog_running,
        "last_check": last_check_result,
        "consecutive_bad": consecutive_bad,
    }
|
|
|
|
@app.on_event("startup")
async def startup():
    """Start the background watchdog loop when the application starts."""
    logger.info("EDEN Watchdog starting...")
    # The event loop keeps only weak references to tasks, so a task whose
    # handle is discarded may be garbage-collected before it finishes. Park
    # the handle on the application state to keep it alive.
    app.state.watchdog_task = asyncio.create_task(_check_loop())
|
|