Spaces:
Sleeping
Sleeping
| """ | |
| ============================================ | |
| RUHI-CORE - WebSocket Handler for Live Logs | |
| ============================================ | |
| """ | |
| import json | |
| import asyncio | |
| from typing import Dict, Set, Optional | |
| from datetime import datetime | |
| from fastapi import WebSocket, WebSocketDisconnect | |
| from loguru import logger | |
| from core.config import settings | |
class ConnectionManager:
    """Manages WebSocket connections for live updates.

    Three registries are kept:

    * ``dashboard_connections`` - sockets that receive every dashboard
      broadcast (metrics, service updates).
    * ``log_connections`` - maps a service id to the set of sockets that
      receive that service's log lines.
    * ``terminal_connections`` - maps a string key to a single socket; it is
      only counted by ``total_connections`` in this class.
    """

    def __init__(self):
        # General dashboard connections
        self.dashboard_connections: Set[WebSocket] = set()
        # Per-service log connections: service_id -> set of websockets
        self.log_connections: Dict[str, Set[WebSocket]] = {}
        # Terminal connections
        self.terminal_connections: Dict[str, WebSocket] = {}
        logger.info("π WebSocket ConnectionManager initialized")

    async def connect_dashboard(self, websocket: WebSocket):
        """Accept the handshake and register a dashboard WebSocket client."""
        await websocket.accept()
        self.dashboard_connections.add(websocket)
        logger.info(f"π‘ Dashboard WebSocket connected. Total: {len(self.dashboard_connections)}")

    async def disconnect_dashboard(self, websocket: WebSocket):
        """Unregister a dashboard WebSocket client (no-op if it is unknown)."""
        self.dashboard_connections.discard(websocket)
        logger.info(f"π‘ Dashboard WebSocket disconnected. Total: {len(self.dashboard_connections)}")

    async def connect_logs(self, websocket: WebSocket, service_id: str):
        """Accept the handshake and register a log viewer for ``service_id``."""
        await websocket.accept()
        self.log_connections.setdefault(service_id, set()).add(websocket)
        logger.info(f"π Log WebSocket connected for service {service_id}")

    async def disconnect_logs(self, websocket: WebSocket, service_id: str):
        """Unregister a log viewer; drop the service entry once it is empty."""
        watchers = self.log_connections.get(service_id)
        if watchers is None:
            return
        watchers.discard(websocket)
        if not watchers:
            del self.log_connections[service_id]

    @staticmethod
    async def _send_to_all(connections: Set[WebSocket], message: str) -> Set[WebSocket]:
        """Send ``message`` to every socket and return the ones that failed.

        Iterates over a snapshot of ``connections``: every ``await`` hands
        control back to the event loop, where another handler may add or
        remove a socket. Iterating the live set would then raise
        ``RuntimeError: Set changed size during iteration``.
        """
        dead: Set[WebSocket] = set()
        for ws in tuple(connections):
            try:
                await ws.send_text(message)
            except Exception:
                # Best effort: any send failure marks the socket as dead so
                # the caller can prune it; one bad client must not stop the
                # broadcast to the others.
                dead.add(ws)
        return dead

    async def broadcast_dashboard(self, data: dict):
        """Broadcast ``data`` as JSON to all dashboard connections.

        Values that are not JSON-serializable are converted with ``str``.
        Sockets whose send raises are removed from the registry.
        """
        if not self.dashboard_connections:
            return
        message = json.dumps(data, default=str)
        dead = await self._send_to_all(self.dashboard_connections, message)
        # Cleanup dead connections
        self.dashboard_connections -= dead

    async def broadcast_log(self, service_id: str, log_line: str):
        """Broadcast a log line to all watchers of a service.

        The payload is a JSON object with ``type``, ``service_id``,
        ``timestamp`` (``datetime.now().isoformat()``) and ``message``.
        Sockets whose send raises are removed; if that leaves the service
        without watchers its entry is deleted, as ``disconnect_logs`` does.
        """
        watchers = self.log_connections.get(service_id)
        if not watchers:
            return
        message = json.dumps({
            "type": "log",
            "service_id": service_id,
            "timestamp": datetime.now().isoformat(),
            "message": log_line,
        })
        dead = await self._send_to_all(watchers, message)
        if not dead:
            return
        # Look the entry up again: it may have been removed while we awaited.
        current = self.log_connections.get(service_id)
        if current is None:
            return
        current.difference_update(dead)
        if not current:
            del self.log_connections[service_id]

    async def send_metrics(self, metrics: dict):
        """Send a ``{"type": "metrics", "data": metrics}`` dashboard broadcast."""
        await self.broadcast_dashboard({
            "type": "metrics",
            "data": metrics,
        })

    async def send_service_update(self, service_info: dict):
        """Send a ``{"type": "service_update", "data": ...}`` dashboard broadcast."""
        await self.broadcast_dashboard({
            "type": "service_update",
            "data": service_info,
        })

    def total_connections(self) -> int:
        """Return the number of dashboard, log and terminal connections combined."""
        log_conns = sum(len(conns) for conns in self.log_connections.values())
        return len(self.dashboard_connections) + log_conns + len(self.terminal_connections)
# Global instance, created once when this module is imported.
# NOTE(review): presumably imported by route handlers and background tasks so
# they all share the same connection registries - confirm against callers.
ws_manager = ConnectionManager()