# Spaces:
# Running
# Running
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, Dict, Optional
# Module-level logger named after this module; used below to warn when a
# subscriber queue is full and an event is dropped.
logger = logging.getLogger(__name__)
class LiveTelemetry:
    """In-memory broadcaster for Server-Sent Events (SSE).

    Distributes telemetry log events to connected admin clients. Every
    subscriber owns a bounded ``asyncio.Queue``; ``log()`` fans each event out
    to all registered queues without blocking, and keeps a bounded in-memory
    history that is partially replayed to new subscribers.
    """

    def __init__(self, max_history: int = 1000):
        """Create an empty broadcaster.

        Args:
            max_history: Maximum number of events retained in ``history``.
        """
        # One asyncio.Queue per active subscriber (see ``subscribe``).
        self.queues = set()
        # Event dicts, oldest first, capped at ``max_history`` entries.
        self.history = []
        self.max_history = max_history

    @staticmethod
    def _serialize(event: Dict[str, Any]) -> str:
        """Return *event* as a JSON string, stringifying non-JSON values.

        ``default=str`` is deliberate: caller-supplied metadata may hold
        arbitrary objects, and telemetry must not raise because of them.
        """
        return json.dumps(event, default=str)

    def log(
        self,
        level: str,
        agent: str,
        message: str,
        metadata: Optional[Dict[str, Any]] = None,
        trace_id: Optional[str] = None,
    ):
        """Record a telemetry event and broadcast it to all subscribers.

        This is the main entry point for live telemetry.

        Args:
            level: Severity label; stored upper-cased.
            agent: Name of the component emitting the event.
            message: Human-readable event text.
            metadata: Optional extra data; stored as ``{}`` when falsy.
            trace_id: Optional correlation identifier.
        """
        # Aware "now" in UTC, rendered exactly like the former naive
        # ``utcnow().isoformat() + "Z"`` (no "+00:00" offset in the output).
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        event = {
            "timestamp": now_utc.isoformat() + "Z",
            "level": level.upper(),
            "agent": agent,
            "message": message,
            "metadata": metadata or {},
            "trace_id": trace_id,
        }

        # Serialize before touching history so an event that cannot be
        # rendered never ends up stored and replayed to future subscribers.
        event_str = self._serialize(event)

        # Store in the in-memory history and drop the oldest entries so the
        # bound holds even if ``max_history`` was lowered after construction.
        self.history.append(event)
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            del self.history[:overflow]

        # Notify every subscriber without blocking. Iterate over a copy, as
        # the original did, so the set can change while we iterate.
        for q in list(self.queues):
            try:
                q.put_nowait(event_str)
            except asyncio.QueueFull:
                logger.warning("Telemetry queue is full, dropping event for a client.")

    async def subscribe(self) -> AsyncGenerator[str, None]:
        """Yield SSE-formatted telemetry frames for one client.

        First replays up to the 50 most recent events from ``history``, then
        yields each newly logged event as it arrives. The subscriber's queue
        is unregistered when the generator is closed or an exception
        (including task cancellation) passes through it.

        Yields:
            Strings of the form ``"event: telemetry_log\\ndata: <json>\\n\\n"``.
        """
        q = asyncio.Queue(maxsize=100)
        self.queues.add(q)
        try:
            # Replay the most recent history on connect (at most 50 events).
            # The slice is a copy, so concurrent ``log()`` calls cannot
            # disturb the replay.
            for event in self.history[-50:]:
                yield f"event: telemetry_log\ndata: {self._serialize(event)}\n\n"

            while True:
                # Wait for the next live event.
                event_str = await q.get()
                yield f"event: telemetry_log\ndata: {event_str}\n\n"
        finally:
            # No ``except CancelledError`` here: cancellation must propagate
            # to the consuming task; cleanup happens regardless.
            self.queues.discard(q)
# Global telemetry singleton: a module-level LiveTelemetry instance created
# with the default history size.
telemetry = LiveTelemetry()