import asyncio import json import logging from datetime import datetime from typing import Optional, Dict, Any, AsyncGenerator logger = logging.getLogger(__name__) class LiveTelemetry: """ In-memory broadcaster for Server-Sent Events (SSE). Dystrybuuje logi telemetryczne do podłączonych klientów admina. """ def __init__(self, max_history: int = 1000): self.queues = set() self.history = [] self.max_history = max_history def log( self, level: str, agent: str, message: str, metadata: Optional[Dict[str, Any]] = None, trace_id: Optional[str] = None, ): """ Główne wejście dla telemetrii na żywo. """ event = { "timestamp": datetime.utcnow().isoformat() + "Z", "level": level.upper(), "agent": agent, "message": message, "metadata": metadata or {}, "trace_id": trace_id, } # Zapis do in-memory historii self.history.append(event) if len(self.history) > self.max_history: self.history.pop(0) # Powiadomienie wszystkich subskrybentów (nieblokujące) event_str = json.dumps(event) for q in list(self.queues): try: q.put_nowait(event_str) except asyncio.QueueFull: logger.warning("Telemetry queue is full, dropping event for a client.") async def subscribe(self) -> AsyncGenerator[str, None]: """ Generator SSE wysyłający zdarzenia do klienta. """ q = asyncio.Queue(maxsize=100) self.queues.add(q) try: # Wypchnięcie najnowszej historii przy starcie (max 50 dla płynności) recent_history = self.history[-50:] for event in recent_history: yield f"event: telemetry_log\ndata: {json.dumps(event)}\n\n" while True: # Oczekiwanie na nowe zdarzenia event_str = await q.get() yield f"event: telemetry_log\ndata: {event_str}\n\n" except asyncio.CancelledError: pass finally: self.queues.discard(q) # Globalny singleton telemetryczny telemetry = LiveTelemetry()