biteve / watchdog /main.py
AIBRUH's picture
Upload folder using huggingface_hub
db72923 verified
"""EDEN OS V2 — Elevated Watchdog Sidecar.
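Runs every `check_interval` seconds (3 by default):
- Samples the 15 most recent *.jpg frames from the shared frame directory
- Computes a motion score (mean absolute byte difference between consecutive frames); no expression-variance metric is computed in this file
- Calls the router's /evaluate-failover for an agent-enhanced decision
- Falls back to the consecutive_bad >= 2 rule if the agent is unavailable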
"""
import asyncio
import base64
import logging
import time
from pathlib import Path
import httpx
import numpy as np
from fastapi import FastAPI
from pydantic_settings import BaseSettings
# Named logger used by every function in this module.
logger = logging.getLogger("eden.watchdog")
class WatchdogSettings(BaseSettings):
    """Tunable parameters for the watchdog sidecar."""

    # Base URL of the router service that receives status/failover calls.
    router_url: str = "http://router:8100"
    # Root of the shared volume; frames are read from its "frames" subdirectory.
    shared_dir: str = "/shared"
    # Seconds to sleep between two consecutive checks.
    check_interval: float = 3.0
    # A check is considered bad when the motion score is below this value.
    motion_threshold: float = 0.05
    # Fallback rule: failover once this many consecutive bad checks are seen.
    consecutive_fails_to_failover: int = 2

    model_config = {
        "env_file": ".env",
        "extra": "ignore",
    }
# Settings are instantiated once, at import time.
cfg = WatchdogSettings()
# ASGI application exposing the /health endpoint and the startup hook.
app = FastAPI(title="EDEN Watchdog", version="2.0.0")
# State (module-level, mutated by the check loop and read by /health)
consecutive_bad: dict[int, int] = {} # pipeline_id → consecutive bad count
# Wall-clock time (time.time()) of the most recent completed motion check.
last_check_time: float = 0.0
# Summary dict of the most recent check; empty until the first check runs.
last_check_result: dict = {}
# Set to True when the check loop starts; the loop keeps running while it is True.
watchdog_running: bool = False
def _read_recent_frames(frame_dir: Path, n: int = 15) -> list[bytes]:
    """Read the N most recent frame files from the shared frame directory.

    Args:
        frame_dir: Directory scanned (non-recursively) for ``*.jpg`` files.
        n: Maximum number of frames to return; values <= 0 yield an empty list.

    Returns:
        Raw contents of the newest ``n`` readable frames, ordered from oldest
        to newest by modification time. Empty when the directory is missing.
    """
    if n <= 0 or not frame_dir.exists():
        return []

    # stat() each candidate individually: a file can vanish (or be a dangling
    # symlink) between the directory listing and the stat call, and that must
    # only drop the one entry rather than abort the whole read.
    stamped: list[tuple[float, Path]] = []
    for path in frame_dir.glob("*.jpg"):
        try:
            stamped.append((path.stat().st_mtime, path))
        except OSError:
            continue
    stamped.sort(key=lambda entry: entry[0], reverse=True)

    frames: list[bytes] = []
    for _, path in stamped[:n]:
        try:
            frames.append(path.read_bytes())
        except OSError as exc:
            # Best effort: an unreadable frame is skipped, not fatal.
            logger.debug(f"Skipping unreadable frame {path}: {exc}")

    # Newest-first selection, oldest-first result.
    frames.reverse()
    return frames
def compute_motion_score(frames: list[bytes]) -> float:
    """Compute a motion score from raw frame bytes.

    The frames are compared as raw bytes (they are not decoded): for every
    pair of consecutive frames the mean absolute byte difference over their
    common prefix is taken, the per-pair values are averaged, divided by 25
    and capped at 1.0.

    Args:
        frames: Frame payloads in chronological order.

    Returns:
        A score in [0.0, 1.0]. 0.0 when fewer than two non-empty frames are
        available; 0.5 if the computation fails unexpectedly.
    """
    # Zero-length frames carry no data. Comparing against one takes the mean
    # of an empty slice (NaN), and min(1.0, NaN) evaluates to 1.0 — which
    # would report maximum motion for a stream that is actually broken.
    usable = [f for f in frames if f]
    if len(usable) < 2:
        return 0.0
    try:
        # int16 holds the full -255..255 range of a uint8 difference.
        arrays = [np.frombuffer(f, dtype=np.uint8).astype(np.int16) for f in usable]
        diffs = []
        for prev, curr in zip(arrays, arrays[1:]):
            overlap = min(len(prev), len(curr))
            diffs.append(np.mean(np.abs(curr[:overlap] - prev[:overlap])))
        avg_diff = np.mean(diffs)
        return min(1.0, float(avg_diff / 25.0))
    except Exception as e:
        logger.warning(f"Motion score error: {e}")
        return 0.5
async def _evaluate_failover_with_agent(pipeline_id: int, motion_score: float, consecutive: int, frame_count: int) -> bool:
    """Query the router's agent-enhanced endpoint for a failover verdict.

    A 200 response yields its "should_failover" field (False when absent).
    Any other status, or any error during the call, falls back to the plain
    rule: consecutive >= cfg.consecutive_fails_to_failover.
    """
    endpoint = f"{cfg.router_url}/evaluate-failover"
    payload = {
        "pipeline_id": pipeline_id,
        "motion_score": motion_score,
        "consecutive_bad": consecutive,
        "frame_count": frame_count,
    }
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.post(endpoint, json=payload)
        if resp.status_code == 200:
            decision = resp.json().get("should_failover", False)
            logger.info(f"Agent failover decision for P{pipeline_id}: {decision}")
            return decision
    except Exception as e:
        logger.warning(f"Agent failover eval failed, using threshold fallback: {e}")

    # Fallback: original threshold rule.
    threshold = cfg.consecutive_fails_to_failover
    return consecutive >= threshold
async def _trigger_failover(pipeline_id: int):
    """Ask the router to fail the given pipeline and swap to the next one.

    POSTs the pipeline id to the router's /failover endpoint. The response
    status is logged; any error is logged and otherwise ignored.
    """
    endpoint = f"{cfg.router_url}/failover"
    body = {"pipeline_id": pipeline_id}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.post(endpoint, json=body)
    except Exception as e:
        logger.error(f"Failover trigger failed: {e}")
    else:
        logger.warning(f"Failover triggered for pipeline {pipeline_id}: {resp.status_code}")
async def _fetch_active_pipelines() -> list[dict]:
    """Return the router's pipelines whose status is "ready" or "busy".

    Any failure (network error, malformed payload) is logged and yields an
    empty list, so the caller simply evaluates no pipelines on that cycle.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{cfg.router_url}/status")
        status = resp.json()
        return [
            p for p in status.get("pipelines", [])
            if p["status"] in ("ready", "busy")
        ]
    except Exception as e:
        logger.warning(f"Router status fetch failed; no pipelines evaluated this cycle: {e}")
        return []


async def _check_loop():
    """Main watchdog loop — runs once every cfg.check_interval seconds.

    Each cycle reads the most recent frames, computes the motion score,
    publishes a summary in ``last_check_result`` and, when the score is below
    ``cfg.motion_threshold``, bumps the per-pipeline bad-check counter and
    asks the router whether to fail over. A good check resets all counters.

    ``watchdog_running`` is cleared when the loop exits for any reason
    (including task cancellation) so /health does not report a dead loop
    as running.
    """
    global last_check_time, last_check_result, watchdog_running
    watchdog_running = True
    frame_dir = Path(cfg.shared_dir) / "frames"
    logger.info(f"Watchdog loop started (interval={cfg.check_interval}s, threshold={cfg.motion_threshold})")
    try:
        while watchdog_running:
            try:
                frames = _read_recent_frames(frame_dir)
                if len(frames) < 2:
                    # Not enough data to compare; wait for more frames.
                    await asyncio.sleep(cfg.check_interval)
                    continue

                motion = compute_motion_score(frames)
                last_check_time = time.time()

                # Get current active pipelines from router
                active_pipelines = await _fetch_active_pipelines()

                last_check_result = {
                    "motion_score": round(motion, 3),
                    "frame_count": len(frames),
                    "is_healthy": motion >= cfg.motion_threshold,
                    "active_pipelines": len(active_pipelines),
                    "timestamp": last_check_time,
                }

                if motion < cfg.motion_threshold:
                    # Bad check — ask router's agent-enhanced failover decision
                    for p in active_pipelines:
                        pid = p["id"]
                        # A missing display name must not abort the check.
                        name = p.get("name", pid)
                        consecutive_bad[pid] = consecutive_bad.get(pid, 0) + 1
                        logger.warning(
                            f"Bad check #{consecutive_bad[pid]} for pipeline {name} "
                            f"(motion={motion:.3f} < {cfg.motion_threshold})"
                        )
                        should_failover = await _evaluate_failover_with_agent(
                            pid, motion, consecutive_bad[pid], len(frames)
                        )
                        if should_failover:
                            await _trigger_failover(pid)
                            consecutive_bad[pid] = 0
                else:
                    # Good check — reset counters (keys are kept so /health
                    # still lists every pipeline seen so far).
                    for pid in list(consecutive_bad.keys()):
                        consecutive_bad[pid] = 0
            except Exception as e:
                # Keep the loop alive; include the traceback for diagnosis.
                logger.exception(f"Watchdog check error: {e}")
            await asyncio.sleep(cfg.check_interval)
    finally:
        watchdog_running = False
@app.get("/health")
async def health():
    """Report liveness plus the watchdog's latest check and bad-check counters."""
    snapshot = {
        "status": "ok",
        "watchdog_running": watchdog_running,
        "last_check": last_check_result,
        "consecutive_bad": consecutive_bad,
    }
    return snapshot
# Strong references to background tasks. The event loop only keeps weak
# references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


@app.on_event("startup")
async def startup():
    """Start the watchdog check loop as a background task on app startup."""
    logger.info("EDEN Watchdog starting...")
    task = asyncio.create_task(_check_loop())
    _background_tasks.add(task)
    # Drop the reference once the task completes so the set does not grow.
    task.add_done_callback(_background_tasks.discard)