Spaces:
Sleeping
Sleeping
| """In-process WebSocket broadcast bridge for anomaly signals. | |
| Celery workers run in a separate OS process, so `_notification_clients` | |
| in src/ws/router.py is always empty there. Real-time push is handled here: | |
| a FastAPI BackgroundTask (running inside the API server process) polls for | |
| recently-detected critical/high signals and calls broadcast_notification(). | |
| Rate-limited to one check per 5 minutes via a module-level timestamp. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| _last_notified: datetime = datetime.utcnow() - timedelta(minutes=5) | |
| async def broadcast_new_critical_signals() -> None: | |
| """Check for new critical/high anomaly signals and push via WebSocket. | |
| Called as a BackgroundTask from GET /api/anomaly/signals. | |
| No-ops if fewer than 5 minutes have elapsed since the last dispatch. | |
| """ | |
| global _last_notified | |
| now = datetime.utcnow() | |
| if (now - _last_notified).total_seconds() < 300: | |
| return | |
| _last_notified = now | |
| try: | |
| from src.anomaly.db import get_signals | |
| from src.ws.router import broadcast_notification | |
| recent_signals = get_signals( | |
| team_id=None, | |
| severity=None, | |
| signal_type=None, | |
| resolved=False, | |
| limit=20, | |
| ) | |
| ten_minutes_ago = now - timedelta(minutes=10) | |
| for sig in recent_signals: | |
| if sig.get("severity") not in ("critical", "high"): | |
| continue | |
| try: | |
| detected = datetime.fromisoformat( | |
| str(sig["detected_at"]).replace("Z", "+00:00") | |
| ).replace(tzinfo=None) | |
| except Exception: | |
| continue | |
| if detected < ten_minutes_ago: | |
| continue | |
| ws_type = ( | |
| "escalation_spike" | |
| if sig["signal_type"] in ("query_spike", "query_drop", "escalation_trend") | |
| else "knowledge_gap" | |
| ) | |
| entity_suffix = f" — {sig['entity_id']}" if sig.get("entity_id") else "" | |
| await broadcast_notification({ | |
| "type": ws_type, | |
| "message": sig["signal_type"].replace("_", " ").title() + entity_suffix, | |
| "severity": sig["severity"], | |
| "timestamp": sig["detected_at"], | |
| }) | |
| except Exception: | |
| logger.warning("notifier: broadcast_new_critical_signals failed", exc_info=True) | |