File size: 3,303 Bytes
8b3905d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""In-memory event hub for WebSockets and live dashboard feeds."""

from __future__ import annotations

import asyncio
import copy
from collections import deque
from typing import Any

from fastapi import WebSocket

_REPLAYABLE_TYPES = frozenset({"threat_feed", "detection", "incident", "ai_report"})


class EventHub:
    def __init__(self, maxlen: int = 2000) -> None:
        self._clients: set[WebSocket] = set()
        self.live_feed: deque[dict[str, Any]] = deque(maxlen=maxlen)
        self.agent_log: deque[dict[str, Any]] = deque(maxlen=500)
        self.replay_buffer: deque[dict[str, Any]] = deque(maxlen=400)
        self._lock = asyncio.Lock()
        self._replay_task: asyncio.Task | None = None

    async def connect(self, ws: WebSocket) -> None:
        await ws.accept()
        self._clients.add(ws)

    def disconnect(self, ws: WebSocket) -> None:
        self._clients.discard(ws)

    async def broadcast(self, message: dict[str, Any]) -> None:
        async with self._lock:
            self.live_feed.appendleft(message)
            if message.get("type") != "replay" and message.get("type") in _REPLAYABLE_TYPES:
                self.replay_buffer.append(copy.deepcopy(message))
        dead: list[WebSocket] = []
        for client in list(self._clients):
            try:
                await client.send_json(message)
            except Exception:  # noqa: BLE001
                dead.append(client)
        for c in dead:
            self.disconnect(c)

    async def log_agent(self, agent: str, status: str, detail: str = "") -> None:
        from datetime import datetime, timezone

        row = {"agent": agent, "status": status, "detail": detail, "ts": datetime.now(timezone.utc).isoformat()}
        async with self._lock:
            self.agent_log.appendleft(row)
        await self.broadcast({"type": "agent_activity", **row})

    async def replay_attack_chain(self, delay_s: float = 0.45) -> int:
        """Re-broadcast buffered SOC events for judge demos (chronological)."""
        async with self._lock:
            items = list(self.replay_buffer)
        if not items:
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "empty",
                    "message": "Replay buffer empty — trigger an attack simulation first.",
                }
            )
            return 0
        await self.broadcast({"type": "replay", "phase": "begin", "total": len(items)})
        for i, msg in enumerate(items):
            await self.broadcast(
                {
                    "type": "replay",
                    "phase": "frame",
                    "index": i,
                    "total": len(items),
                    "data": msg,
                }
            )
            await asyncio.sleep(delay_s)
        await self.broadcast({"type": "replay", "phase": "end", "total": len(items)})
        return len(items)

    def schedule_replay(self, delay_ms: int = 450) -> None:
        delay_s = max(0.05, delay_ms / 1000.0)

        async def _run() -> None:
            await self.replay_attack_chain(delay_s=delay_s)

        if self._replay_task and not self._replay_task.done():
            self._replay_task.cancel()
        self._replay_task = asyncio.create_task(_run())