"""In-memory event hub for WebSockets and live dashboard feeds."""

from __future__ import annotations

import asyncio
import copy
import logging
from collections import deque
from datetime import datetime, timezone
from typing import Any

from fastapi import WebSocket

_logger = logging.getLogger(__name__)

# Event types recorded into the replay buffer. Everything else (including the
# "replay" envelopes produced by a replay run) is broadcast but not recorded.
_REPLAYABLE_TYPES = frozenset({"threat_feed", "detection", "incident", "ai_report"})


class EventHub:
    """Fan out JSON events to connected WebSocket clients and keep recent history.

    Attributes:
        live_feed: Every broadcast message, newest first, bounded by ``maxlen``.
        agent_log: Agent activity rows written by ``log_agent``, newest first.
        replay_buffer: Deep copies of replayable events in arrival order.
    """

    def __init__(
        self,
        maxlen: int = 2000,
        *,
        agent_log_maxlen: int = 500,
        replay_maxlen: int = 400,
    ) -> None:
        """Create an empty hub.

        Args:
            maxlen: Capacity of ``live_feed``.
            agent_log_maxlen: Capacity of ``agent_log``.
            replay_maxlen: Capacity of ``replay_buffer``.
        """
        self._clients: set[WebSocket] = set()
        self.live_feed: deque[dict[str, Any]] = deque(maxlen=maxlen)
        self.agent_log: deque[dict[str, Any]] = deque(maxlen=agent_log_maxlen)
        self.replay_buffer: deque[dict[str, Any]] = deque(maxlen=replay_maxlen)
        # Guards the three buffers above. asyncio.Lock is not reentrant, so it
        # must never be held across a call to broadcast().
        self._lock = asyncio.Lock()
        self._replay_task: asyncio.Task[None] | None = None

    async def connect(self, ws: WebSocket) -> None:
        """Accept the WebSocket handshake and register the client."""
        await ws.accept()
        self._clients.add(ws)

    def disconnect(self, ws: WebSocket) -> None:
        """Unregister a client; a no-op if it is not registered."""
        self._clients.discard(ws)

    async def broadcast(self, message: dict[str, Any]) -> None:
        """Record ``message`` and send it to every connected client.

        The message is always pushed onto ``live_feed``; a deep copy is also
        appended to ``replay_buffer`` when its ``"type"`` is replayable.
        Clients whose send raises are dropped from the hub.
        """
        async with self._lock:
            self.live_feed.appendleft(message)
            kind = message.get("type")
            # Replay envelopes are never re-recorded, even if the replayable
            # set were ever extended to contain "replay".
            if kind != "replay" and kind in _REPLAYABLE_TYPES:
                self.replay_buffer.append(copy.deepcopy(message))

        # Sends happen outside the lock so a slow client does not hold up
        # buffer writes. Iterate over a snapshot: the set can change while we
        # are suspended in send_json.
        dead: list[WebSocket] = []
        for client in list(self._clients):
            try:
                await client.send_json(message)
            except Exception:  # noqa: BLE001
                # Best effort: any send failure means the client is dropped.
                _logger.debug("Dropping WebSocket client after failed send", exc_info=True)
                dead.append(client)
        for client in dead:
            self.disconnect(client)

    async def log_agent(self, agent: str, status: str, detail: str = "") -> None:
        """Append an agent activity row and broadcast it as ``agent_activity``.

        Args:
            agent: Agent name.
            status: Status label for the row.
            detail: Optional free-form detail text.
        """
        row = {
            "agent": agent,
            "status": status,
            "detail": detail,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        async with self._lock:
            self.agent_log.appendleft(row)
        # Must run after the lock is released: broadcast() takes the same lock.
        await self.broadcast({"type": "agent_activity", **row})

    async def replay_attack_chain(self, delay_s: float = 0.45) -> int:
        """Re-broadcast buffered SOC events for judge demos (chronological).

        Emits a ``begin`` envelope, one ``frame`` envelope per buffered event
        (sleeping ``delay_s`` seconds after each), then an ``end`` envelope.
        When the buffer is empty a single ``empty`` envelope is sent instead.

        Args:
            delay_s: Pause after each frame, in seconds.

        Returns:
            The number of frames replayed (0 when the buffer was empty).
        """
        # Snapshot under the lock, then release it before broadcasting.
        async with self._lock:
            items = list(self.replay_buffer)

        if not items:
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "empty",
                    "message": "Replay buffer empty — trigger an attack simulation first.",
                }
            )
            return 0

        total = len(items)
        await self.broadcast({"type": "replay", "phase": "begin", "total": total})
        for i, msg in enumerate(items):
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "frame",
                    "index": i,
                    "total": total,
                    "data": msg,
                }
            )
            await asyncio.sleep(delay_s)
        await self.broadcast({"type": "replay", "phase": "end", "total": total})
        return total

    def schedule_replay(self, delay_ms: int = 450) -> None:
        """Start a replay in the background, cancelling any replay still running.

        Must be called while an event loop is running, since it uses
        ``asyncio.create_task``.

        Args:
            delay_ms: Pause between frames in milliseconds; clamped to at
                least 50 ms.
        """
        delay_s = max(0.05, delay_ms / 1000.0)

        async def _run() -> None:
            await self.replay_attack_chain(delay_s=delay_s)

        if self._replay_task and not self._replay_task.done():
            self._replay_task.cancel()
        # Keep a reference so the task is not garbage-collected mid-run.
        self._replay_task = asyncio.create_task(_run())