SentinelAI / services /event_hub.py
iitian's picture
Sync SentinelAI project and add Hugging Face Docker Space layout.
8b3905d
"""In-memory event hub for WebSockets and live dashboard feeds."""
from __future__ import annotations
import asyncio
import copy
from collections import deque
from typing import Any
from fastapi import WebSocket
# Event types that broadcast() copies into the replay buffer. The "replay"
# wrapper type is not listed, so replayed frames are never re-recorded.
_REPLAYABLE_TYPES = frozenset({"threat_feed", "detection", "incident", "ai_report"})


class EventHub:
    """Fan JSON messages out to WebSocket clients and keep bounded histories.

    Three ``deque`` histories are maintained:

    * ``live_feed``     - every broadcast message, newest first.
    * ``agent_log``     - rows written by :meth:`log_agent`, newest first.
    * ``replay_buffer`` - deep copies of messages whose ``"type"`` is in
      ``_REPLAYABLE_TYPES``, oldest first (used by
      :meth:`replay_attack_chain`).
    """

    def __init__(
        self,
        maxlen: int = 2000,
        *,
        agent_log_maxlen: int = 500,
        replay_maxlen: int = 400,
    ) -> None:
        """Create an empty hub.

        Args:
            maxlen: Capacity of ``live_feed``.
            agent_log_maxlen: Capacity of ``agent_log``.
            replay_maxlen: Capacity of ``replay_buffer``.
        """
        self._clients: set[WebSocket] = set()
        self.live_feed: deque[dict[str, Any]] = deque(maxlen=maxlen)
        self.agent_log: deque[dict[str, Any]] = deque(maxlen=agent_log_maxlen)
        self.replay_buffer: deque[dict[str, Any]] = deque(maxlen=replay_maxlen)
        # Guards the deques. asyncio.Lock is NOT reentrant, so no method may
        # call broadcast() while holding it.
        self._lock = asyncio.Lock()
        # Most recent task started by schedule_replay(), if any.
        self._replay_task: asyncio.Task | None = None

    async def connect(self, ws: WebSocket) -> None:
        """Accept the WebSocket handshake and register *ws* for broadcasts."""
        await ws.accept()
        self._clients.add(ws)

    def disconnect(self, ws: WebSocket) -> None:
        """Unregister *ws*; a no-op if it is not registered."""
        self._clients.discard(ws)

    async def broadcast(self, message: dict[str, Any]) -> None:
        """Record *message* and send it to every registered client.

        The message object itself is pushed onto the front of ``live_feed``;
        if its ``"type"`` is replayable, a deep copy is appended to
        ``replay_buffer``. Clients whose send raises are unregistered.
        """
        async with self._lock:
            self.live_feed.appendleft(message)
            if message.get("type") in _REPLAYABLE_TYPES:
                # Deep copy so later mutation of the caller's dict cannot
                # alter what gets replayed.
                self.replay_buffer.append(copy.deepcopy(message))

        # Iterate over a snapshot: the client set may change while we are
        # suspended in an individual send.
        for client in list(self._clients):
            try:
                await client.send_json(message)
            except Exception:  # noqa: BLE001
                # Best effort: any failure to send is treated as a dead
                # connection and the client is dropped.
                self.disconnect(client)

    async def log_agent(self, agent: str, status: str, detail: str = "") -> None:
        """Record an agent status row and broadcast it as ``agent_activity``.

        The row carries ``agent``, ``status``, ``detail`` and ``ts`` (the
        current UTC time in ISO-8601 form).
        """
        from datetime import datetime, timezone

        row = {
            "agent": agent,
            "status": status,
            "detail": detail,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        async with self._lock:
            self.agent_log.appendleft(row)
        # Lock released above: broadcast() acquires it again.
        await self.broadcast({"type": "agent_activity", **row})

    async def replay_attack_chain(self, delay_s: float = 0.45) -> int:
        """Re-broadcast buffered SOC events for judge demos (chronological).

        Emits a ``begin`` message, one ``frame`` message per buffered event
        (each followed by a pause of *delay_s* seconds) and an ``end``
        message. If the buffer is empty a single ``empty`` message is sent.

        Args:
            delay_s: Pause after each frame, in seconds.

        Returns:
            The number of events replayed (0 when the buffer was empty).
        """
        async with self._lock:
            # Snapshot so events arriving during the replay are not included.
            items = list(self.replay_buffer)

        if not items:
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "empty",
                    "message": "Replay buffer empty — trigger an attack simulation first.",
                }
            )
            return 0

        total = len(items)
        await self.broadcast({"type": "replay", "phase": "begin", "total": total})
        for index, event in enumerate(items):
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "frame",
                    "index": index,
                    "total": total,
                    "data": event,
                }
            )
            await asyncio.sleep(delay_s)
        await self.broadcast({"type": "replay", "phase": "end", "total": total})
        return total

    def schedule_replay(self, delay_ms: int = 450) -> None:
        """Start :meth:`replay_attack_chain` as a background task.

        A replay that is still running is cancelled first, so at most one
        scheduled replay is active. Uses ``asyncio.create_task``, so it has
        to be called while an event loop is running.

        Args:
            delay_ms: Pause between frames in milliseconds; values below
                50 ms are raised to 50 ms.
        """
        delay_s = max(0.05, delay_ms / 1000.0)
        previous = self._replay_task
        if previous is not None and not previous.done():
            previous.cancel()
        self._replay_task = asyncio.create_task(
            self.replay_attack_chain(delay_s=delay_s)
        )