Spaces:
Running
Running
| """In-memory event hub for WebSockets and live dashboard feeds.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import copy | |
| from collections import deque | |
| from typing import Any | |
| from fastapi import WebSocket | |
| _REPLAYABLE_TYPES = frozenset({"threat_feed", "detection", "incident", "ai_report"}) | |
| class EventHub: | |
| def __init__(self, maxlen: int = 2000) -> None: | |
| self._clients: set[WebSocket] = set() | |
| self.live_feed: deque[dict[str, Any]] = deque(maxlen=maxlen) | |
| self.agent_log: deque[dict[str, Any]] = deque(maxlen=500) | |
| self.replay_buffer: deque[dict[str, Any]] = deque(maxlen=400) | |
| self._lock = asyncio.Lock() | |
| self._replay_task: asyncio.Task | None = None | |
| async def connect(self, ws: WebSocket) -> None: | |
| await ws.accept() | |
| self._clients.add(ws) | |
| def disconnect(self, ws: WebSocket) -> None: | |
| self._clients.discard(ws) | |
| async def broadcast(self, message: dict[str, Any]) -> None: | |
| async with self._lock: | |
| self.live_feed.appendleft(message) | |
| if message.get("type") != "replay" and message.get("type") in _REPLAYABLE_TYPES: | |
| self.replay_buffer.append(copy.deepcopy(message)) | |
| dead: list[WebSocket] = [] | |
| for client in list(self._clients): | |
| try: | |
| await client.send_json(message) | |
| except Exception: # noqa: BLE001 | |
| dead.append(client) | |
| for c in dead: | |
| self.disconnect(c) | |
| async def log_agent(self, agent: str, status: str, detail: str = "") -> None: | |
| from datetime import datetime, timezone | |
| row = {"agent": agent, "status": status, "detail": detail, "ts": datetime.now(timezone.utc).isoformat()} | |
| async with self._lock: | |
| self.agent_log.appendleft(row) | |
| await self.broadcast({"type": "agent_activity", **row}) | |
| async def replay_attack_chain(self, delay_s: float = 0.45) -> int: | |
| """Re-broadcast buffered SOC events for judge demos (chronological).""" | |
| async with self._lock: | |
| items = list(self.replay_buffer) | |
| if not items: | |
| await self.broadcast( | |
| { | |
| "type": "replay", | |
| "phase": "empty", | |
| "message": "Replay buffer empty — trigger an attack simulation first.", | |
| } | |
| ) | |
| return 0 | |
| await self.broadcast({"type": "replay", "phase": "begin", "total": len(items)}) | |
| for i, msg in enumerate(items): | |
| await self.broadcast( | |
| { | |
| "type": "replay", | |
| "phase": "frame", | |
| "index": i, | |
| "total": len(items), | |
| "data": msg, | |
| } | |
| ) | |
| await asyncio.sleep(delay_s) | |
| await self.broadcast({"type": "replay", "phase": "end", "total": len(items)}) | |
| return len(items) | |
| def schedule_replay(self, delay_ms: int = 450) -> None: | |
| delay_s = max(0.05, delay_ms / 1000.0) | |
| async def _run() -> None: | |
| await self.replay_attack_chain(delay_s=delay_s) | |
| if self._replay_task and not self._replay_task.done(): | |
| self._replay_task.cancel() | |
| self._replay_task = asyncio.create_task(_run()) | |