Spaces:
Running
Running
File size: 3,303 Bytes
8b3905d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | """In-memory event hub for WebSockets and live dashboard feeds."""
from __future__ import annotations
import asyncio
import copy
from collections import deque
from typing import Any
from fastapi import WebSocket
_REPLAYABLE_TYPES = frozenset({"threat_feed", "detection", "incident", "ai_report"})
class EventHub:
def __init__(self, maxlen: int = 2000) -> None:
self._clients: set[WebSocket] = set()
self.live_feed: deque[dict[str, Any]] = deque(maxlen=maxlen)
self.agent_log: deque[dict[str, Any]] = deque(maxlen=500)
self.replay_buffer: deque[dict[str, Any]] = deque(maxlen=400)
self._lock = asyncio.Lock()
self._replay_task: asyncio.Task | None = None
async def connect(self, ws: WebSocket) -> None:
await ws.accept()
self._clients.add(ws)
def disconnect(self, ws: WebSocket) -> None:
self._clients.discard(ws)
async def broadcast(self, message: dict[str, Any]) -> None:
async with self._lock:
self.live_feed.appendleft(message)
if message.get("type") != "replay" and message.get("type") in _REPLAYABLE_TYPES:
self.replay_buffer.append(copy.deepcopy(message))
dead: list[WebSocket] = []
for client in list(self._clients):
try:
await client.send_json(message)
except Exception: # noqa: BLE001
dead.append(client)
for c in dead:
self.disconnect(c)
async def log_agent(self, agent: str, status: str, detail: str = "") -> None:
from datetime import datetime, timezone
row = {"agent": agent, "status": status, "detail": detail, "ts": datetime.now(timezone.utc).isoformat()}
async with self._lock:
self.agent_log.appendleft(row)
await self.broadcast({"type": "agent_activity", **row})
async def replay_attack_chain(self, delay_s: float = 0.45) -> int:
"""Re-broadcast buffered SOC events for judge demos (chronological)."""
async with self._lock:
items = list(self.replay_buffer)
if not items:
await self.broadcast(
{
"type": "replay",
"phase": "empty",
"message": "Replay buffer empty — trigger an attack simulation first.",
}
)
return 0
await self.broadcast({"type": "replay", "phase": "begin", "total": len(items)})
for i, msg in enumerate(items):
await self.broadcast(
{
"type": "replay",
"phase": "frame",
"index": i,
"total": len(items),
"data": msg,
}
)
await asyncio.sleep(delay_s)
await self.broadcast({"type": "replay", "phase": "end", "total": len(items)})
return len(items)
def schedule_replay(self, delay_ms: int = 450) -> None:
delay_s = max(0.05, delay_ms / 1000.0)
async def _run() -> None:
await self.replay_attack_chain(delay_s=delay_s)
if self._replay_task and not self._replay_task.done():
self._replay_task.cancel()
self._replay_task = asyncio.create_task(_run())
|