File size: 2,305 Bytes
afd56bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, Dict, List, Optional, Set

# Module-level logger; used to warn when a subscriber queue is full.
logger = logging.getLogger(__name__)


class LiveTelemetry:
    """
    In-memory broadcaster for Server-Sent Events (SSE).

    Keeps a bounded history of telemetry events and distributes every new
    event to the connected admin clients, each of which owns one queue
    registered by ``subscribe()``.
    """

    def __init__(self, max_history: int = 1000):
        """
        Create an empty broadcaster.

        Args:
            max_history: Maximum number of events kept in ``history``; the
                oldest events are dropped once this bound is exceeded.
        """
        # One asyncio.Queue per active subscribe() generator.
        self.queues: Set[asyncio.Queue] = set()
        # Event dicts, oldest first.
        self.history: List[Dict[str, Any]] = []
        self.max_history = max_history

    @staticmethod
    def _serialize(event: Dict[str, Any]) -> str:
        """
        Encode an event dict as a JSON string.

        Values that JSON cannot represent are stringified, and metadata that
        still cannot be encoded is replaced by its ``repr`` so that a single
        odd payload cannot break logging or history replay.
        """
        try:
            # default=str is deliberate: telemetry metadata is free-form and
            # a lossy string beats raising inside the caller's code path.
            # For plain JSON-compatible events the output is unchanged.
            return json.dumps(event, default=str)
        except (TypeError, ValueError):
            # Reached for e.g. non-scalar dict keys or circular references,
            # which ``default`` cannot repair.
            degraded = dict(event)
            degraded["metadata"] = {"unserializable": repr(event.get("metadata"))}
            return json.dumps(degraded, default=str)

    def log(
        self,
        level: str,
        agent: str,
        message: str,
        metadata: Optional[Dict[str, Any]] = None,
        trace_id: Optional[str] = None,
    ):
        """
        Main entry point for live telemetry.

        Records the event in the in-memory history and pushes it, without
        blocking, to every subscriber queue.

        Args:
            level: Severity label; stored upper-cased.
            agent: Name of the component emitting the event.
            message: Human-readable message.
            metadata: Optional extra payload; stored as ``{}`` when omitted.
            trace_id: Optional correlation identifier.
        """
        # Aware "now" instead of the deprecated datetime.utcnow(); tzinfo is
        # stripped so the emitted string keeps the exact "<iso>Z" format.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        event = {
            "timestamp": now_utc.isoformat() + "Z",
            "level": level.upper(),
            "agent": agent,
            "message": message,
            "metadata": metadata or {},
            "trace_id": trace_id,
        }

        # Serialize once up front; the same string goes to all subscribers.
        event_str = self._serialize(event)

        # Store in the in-memory history and enforce the size bound (the
        # slice delete also copes with max_history being lowered later).
        self.history.append(event)
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            del self.history[:overflow]

        # Notify all subscribers (non-blocking): a slow client loses events
        # instead of stalling the producer.
        for q in list(self.queues):
            try:
                q.put_nowait(event_str)
            except asyncio.QueueFull:
                logger.warning("Telemetry queue is full, dropping event for a client.")

    async def subscribe(self) -> AsyncGenerator[str, None]:
        """
        SSE generator that streams events to one client.

        Yields ``telemetry_log`` SSE frames: first up to the 50 most recent
        history events, then every event logged while the generator is
        alive. The subscriber queue is unregistered when the generator is
        closed or the consuming task is cancelled; cancellation itself is
        propagated to the caller rather than swallowed.
        """
        q: asyncio.Queue = asyncio.Queue(maxsize=100)
        # Register before replaying so events logged during the replay are
        # queued rather than lost.
        self.queues.add(q)
        try:
            # Replay the most recent history on connect (capped at 50 to
            # keep the initial burst small). The slice is a snapshot copy.
            for event in self.history[-50:]:
                yield f"event: telemetry_log\ndata: {self._serialize(event)}\n\n"

            while True:
                # Wait for new events.
                event_str = await q.get()
                yield f"event: telemetry_log\ndata: {event_str}\n\n"
        finally:
            self.queues.discard(q)

# Global telemetry singleton (uses the default max_history).
telemetry = LiveTelemetry()