GodSpeed / src /anomaly /notifier.py
Ananth Shyam
Implement anomaly detection and forecasting features
451d52a
"""In-process WebSocket broadcast bridge for anomaly signals.
Celery workers run in a separate OS process, so `_notification_clients`
in src/ws/router.py is always empty there. Real-time push is handled here:
a FastAPI BackgroundTask (running inside the API server process) polls for
recently detected critical/high signals and calls broadcast_notification().
Rate-limited to one check per 5 minutes via a module-level timestamp.
"""
from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)

# Naive-UTC time of the most recent dispatch attempt; read and rebound by
# broadcast_new_critical_signals(). It starts five minutes in the past so the
# first call after import is not throttled by the 5-minute rate limit.
# datetime.now(timezone.utc) replaces datetime.utcnow(), which is deprecated
# since Python 3.12; tzinfo is stripped so the value stays naive.
_last_notified: datetime = (
    datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=5)
)
# Minimum spacing between two dispatch runs.
_DISPATCH_INTERVAL = timedelta(minutes=5)
# Signals detected longer ago than this are treated as stale and not pushed.
_RECENCY_WINDOW = timedelta(minutes=10)
# Only these severities are pushed.
_PUSH_SEVERITIES = ("critical", "high")
# Signal types sent to clients as "escalation_spike"; every other type is
# sent as "knowledge_gap".
_ESCALATION_SIGNAL_TYPES = ("query_spike", "query_drop", "escalation_trend")


def _utc_now() -> datetime:
    """Return the current time as a naive datetime expressed in UTC."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_detected_at(raw: object) -> datetime:
    """Parse an ISO-8601 timestamp into a naive datetime expressed in UTC.

    Args:
        raw: The timestamp value; it is converted with ``str()`` first, and a
            trailing ``Z`` is accepted as a synonym for ``+00:00``.

    Returns:
        A naive datetime. Offset-aware inputs are converted to UTC before the
        offset is dropped; naive inputs are returned as parsed (they are
        assumed to already be UTC — TODO confirm against the writer).

    Raises:
        ValueError: If the text is not a valid ISO-8601 timestamp.
    """
    parsed = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
    if parsed.utcoffset() is not None:
        # Convert *before* stripping tzinfo: dropping the offset directly would
        # treat e.g. 07:00-05:00 as 07:00 UTC and misjudge the signal's age.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


async def broadcast_new_critical_signals() -> None:
    """Check for new critical/high anomaly signals and push via WebSocket.

    Called as a BackgroundTask from GET /api/anomaly/signals.
    No-ops if fewer than 5 minutes have elapsed since the last dispatch.

    Fetches up to 20 unresolved signals, keeps those with severity
    ``critical`` or ``high`` that were detected within the last 10 minutes,
    and awaits ``broadcast_notification`` once per remaining signal.

    This is best-effort: every failure is logged as a warning and never
    propagated to the caller. A signal that cannot be processed is skipped
    without preventing the remaining signals from being broadcast.
    """
    global _last_notified

    now = _utc_now()
    if now - _last_notified < _DISPATCH_INTERVAL:
        return
    # Record the attempt before doing any work, so a failing run is also
    # rate-limited instead of being retried on every request.
    _last_notified = now

    try:
        # Imported inside the try so that an import failure is logged like any
        # other error instead of propagating to the caller.
        from src.anomaly.db import get_signals
        from src.ws.router import broadcast_notification

        # NOTE(review): get_signals is called without await; if it performs
        # blocking I/O it will stall the event loop for its duration — confirm.
        recent_signals = get_signals(
            team_id=None,
            severity=None,
            signal_type=None,
            resolved=False,
            limit=20,
        )

        # NOTE(review): the recency window (10 min) is longer than the dispatch
        # interval (5 min), so one signal can be pushed by two consecutive
        # runs — confirm whether clients de-duplicate.
        cutoff = now - _RECENCY_WINDOW

        for sig in recent_signals:
            try:
                if sig.get("severity") not in _PUSH_SEVERITIES:
                    continue

                try:
                    detected = _parse_detected_at(sig["detected_at"])
                except (KeyError, TypeError, ValueError):
                    # Missing or unparseable timestamp: skip silently.
                    continue
                if detected < cutoff:
                    continue

                ws_type = (
                    "escalation_spike"
                    if sig["signal_type"] in _ESCALATION_SIGNAL_TYPES
                    else "knowledge_gap"
                )
                entity_suffix = (
                    f" — {sig['entity_id']}" if sig.get("entity_id") else ""
                )
                await broadcast_notification({
                    "type": ws_type,
                    "message": (
                        sig["signal_type"].replace("_", " ").title()
                        + entity_suffix
                    ),
                    "severity": sig["severity"],
                    "timestamp": sig["detected_at"],
                })
            except Exception:
                # Isolate failures per signal so one malformed row or one
                # failed push does not drop the rest of the batch.
                logger.warning(
                    "notifier: skipping signal that could not be broadcast",
                    exc_info=True,
                )
    except Exception:
        logger.warning("notifier: broadcast_new_critical_signals failed", exc_info=True)