"""
============================================
RUHI-CORE - WebSocket Handler for Live Logs
============================================
"""

import json
import asyncio
from typing import Dict, Set, Optional
from datetime import datetime

from fastapi import WebSocket, WebSocketDisconnect
from loguru import logger

from core.config import settings


class ConnectionManager:
    """Manages WebSocket connections for live updates.

    Three registries are kept:

    * ``dashboard_connections`` - sockets that receive every dashboard
      broadcast (metrics, service updates).
    * ``log_connections`` - per-service sets of sockets that receive the log
      lines of that service.
    * ``terminal_connections`` - mapping of key -> socket; it is only counted
      by ``total_connections`` here and is not populated by any method of
      this class.
    """

    def __init__(self):
        # General dashboard connections
        self.dashboard_connections: Set[WebSocket] = set()
        # Per-service log connections: service_id -> set of websockets
        self.log_connections: Dict[str, Set[WebSocket]] = {}
        # Terminal connections
        self.terminal_connections: Dict[str, WebSocket] = {}
        logger.info("🔌 WebSocket ConnectionManager initialized")

    async def connect_dashboard(self, websocket: WebSocket):
        """Accept a dashboard WebSocket client and register it."""
        await websocket.accept()
        self.dashboard_connections.add(websocket)
        logger.info(f"📡 Dashboard WebSocket connected. Total: {len(self.dashboard_connections)}")

    async def disconnect_dashboard(self, websocket: WebSocket):
        """Unregister a dashboard WebSocket client (no-op if unknown)."""
        self.dashboard_connections.discard(websocket)
        logger.info(f"📡 Dashboard WebSocket disconnected. Total: {len(self.dashboard_connections)}")

    async def connect_logs(self, websocket: WebSocket, service_id: str):
        """Accept a log viewer WebSocket client and register it for a service."""
        await websocket.accept()
        self.log_connections.setdefault(service_id, set()).add(websocket)
        logger.info(f"📋 Log WebSocket connected for service {service_id}")

    async def disconnect_logs(self, websocket: WebSocket, service_id: str):
        """Unregister a log viewer client; drop the service entry once empty."""
        watchers = self.log_connections.get(service_id)
        if watchers is None:
            return
        watchers.discard(websocket)
        if not watchers:
            del self.log_connections[service_id]

    @staticmethod
    async def _send_to_all(connections: Set[WebSocket], message: str) -> Set[WebSocket]:
        """Send ``message`` to every socket and return the ones that failed.

        Iterates over a snapshot: each ``await`` lets other coroutines run,
        and they may add or remove sockets from ``connections``. Iterating
        the live set would then raise
        ``RuntimeError: Set changed size during iteration``.
        """
        dead_connections: Set[WebSocket] = set()
        for ws in tuple(connections):
            try:
                await ws.send_text(message)
            except Exception:
                # Best-effort broadcast: a failed send marks the socket as
                # dead instead of aborting delivery to the other clients.
                dead_connections.add(ws)
        return dead_connections

    async def broadcast_dashboard(self, data: dict):
        """Broadcast data to all dashboard connections.

        ``data`` is serialized with ``json.dumps(..., default=str)``, so
        values that are not JSON-serializable are sent as their ``str()``.
        Sockets whose send fails are removed from the registry.
        """
        if not self.dashboard_connections:
            return

        message = json.dumps(data, default=str)
        dead_connections = await self._send_to_all(self.dashboard_connections, message)

        # Cleanup dead connections
        self.dashboard_connections.difference_update(dead_connections)

    async def broadcast_log(self, service_id: str, log_line: str):
        """Broadcast a log line to all watchers of a service.

        The payload is a JSON object with ``type``, ``service_id``,
        ``timestamp`` (``datetime.now().isoformat()``, i.e. naive local time)
        and ``message``. Sockets whose send fails are removed, and the
        service entry is deleted when no watcher is left.
        """
        connections = self.log_connections.get(service_id)
        if not connections:
            return

        message = json.dumps({
            "type": "log",
            "service_id": service_id,
            "timestamp": datetime.now().isoformat(),
            "message": log_line
        })
        dead_connections = await self._send_to_all(connections, message)
        if not dead_connections:
            return

        # Re-read the registry: the entry may have been removed or replaced
        # by a disconnect/connect that ran while the sends were awaited.
        current = self.log_connections.get(service_id)
        if current is None:
            return
        current.difference_update(dead_connections)
        if not current:
            # Mirror disconnect_logs: do not keep empty per-service sets.
            del self.log_connections[service_id]

    async def send_metrics(self, metrics: dict):
        """Send metrics update to dashboard (message type ``"metrics"``)."""
        await self.broadcast_dashboard({
            "type": "metrics",
            "data": metrics
        })

    async def send_service_update(self, service_info: dict):
        """Send service status update to dashboard (type ``"service_update"``)."""
        await self.broadcast_dashboard({
            "type": "service_update",
            "data": service_info
        })

    @property
    def total_connections(self) -> int:
        """Number of registered dashboard, log and terminal sockets combined."""
        log_conns = sum(len(conns) for conns in self.log_connections.values())
        return len(self.dashboard_connections) + log_conns + len(self.terminal_connections)


# Global instance
ws_manager = ConnectionManager()