File size: 4,307 Bytes
9684770
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
============================================
RUHI-CORE - WebSocket Handler for Live Logs
============================================
"""

import json
import asyncio
from typing import Dict, Set, Optional
from datetime import datetime

from fastapi import WebSocket, WebSocketDisconnect
from loguru import logger

from core.config import settings


class ConnectionManager:
    """Manages WebSocket connections for live updates.

    Three registries are kept:

    * ``dashboard_connections`` - sockets that receive dashboard broadcasts.
    * ``log_connections`` - per-service sets of sockets watching a service's
      logs, keyed by service id.
    * ``terminal_connections`` - a mapping of string keys to sockets. This
      class only counts its entries (see ``total_connections``); it never
      adds or removes them itself.
    """

    def __init__(self):
        # General dashboard connections
        self.dashboard_connections: Set[WebSocket] = set()

        # Per-service log connections: service_id -> set of websockets
        self.log_connections: Dict[str, Set[WebSocket]] = {}

        # Terminal connections
        self.terminal_connections: Dict[str, WebSocket] = {}

        logger.info("🔌 WebSocket ConnectionManager initialized")

    async def connect_dashboard(self, websocket: WebSocket):
        """Accept a dashboard WebSocket client and register it for broadcasts."""
        await websocket.accept()
        self.dashboard_connections.add(websocket)
        logger.info(f"📡 Dashboard WebSocket connected. Total: {len(self.dashboard_connections)}")

    async def disconnect_dashboard(self, websocket: WebSocket):
        """Unregister a dashboard WebSocket client (no-op if it is unknown)."""
        self.dashboard_connections.discard(websocket)
        logger.info(f"📡 Dashboard WebSocket disconnected. Total: {len(self.dashboard_connections)}")

    async def connect_logs(self, websocket: WebSocket, service_id: str):
        """Accept a log viewer WebSocket client and register it for a service.

        Args:
            websocket: The client socket; it is accepted before registration.
            service_id: Id of the service whose log lines the client watches.
        """
        await websocket.accept()
        self.log_connections.setdefault(service_id, set()).add(websocket)
        logger.info(f"📋 Log WebSocket connected for service {service_id}")

    async def disconnect_logs(self, websocket: WebSocket, service_id: str):
        """Unregister a log viewer WebSocket client from a service.

        The service entry is dropped entirely once its last watcher is gone.
        Unknown services are ignored.
        """
        watchers = self.log_connections.get(service_id)
        if watchers is None:
            return

        watchers.discard(websocket)
        if not watchers:
            del self.log_connections[service_id]
        logger.info(f"📋 Log WebSocket disconnected for service {service_id}")

    @staticmethod
    async def _send_to_all(connections: Set[WebSocket], message: str) -> Set[WebSocket]:
        """Send ``message`` to every socket and return the ones that failed.

        Delivery is best-effort: a failing socket is logged at debug level and
        reported back to the caller for pruning; it never aborts the loop.
        """
        dead: Set[WebSocket] = set()

        # Iterate over a snapshot. Each ``await`` hands control back to the
        # event loop, so other coroutines may connect or disconnect clients
        # in the meantime; iterating the live set would then raise
        # "RuntimeError: Set changed size during iteration".
        for ws in list(connections):
            try:
                await ws.send_text(message)
            except Exception as exc:
                logger.debug(f"Dropping unreachable WebSocket: {exc}")
                dead.add(ws)

        return dead

    async def broadcast_dashboard(self, data: dict):
        """Broadcast data to all dashboard connections.

        ``data`` is serialised with ``json.dumps(..., default=str)``, so values
        that are not JSON-native are sent as their ``str()`` form. Sockets
        whose send fails are removed from the dashboard registry.
        """
        if not self.dashboard_connections:
            return

        message = json.dumps(data, default=str)
        dead = await self._send_to_all(self.dashboard_connections, message)

        # Cleanup dead connections
        if dead:
            self.dashboard_connections.difference_update(dead)

    async def broadcast_log(self, service_id: str, log_line: str):
        """Broadcast a log line to all watchers of a service.

        The payload is a JSON object with the keys ``type`` (always
        ``"log"``), ``service_id``, ``timestamp`` (``datetime.now()`` in ISO
        format) and ``message``. Sockets whose send fails are pruned, and the
        service entry is removed once no watcher is left.
        """
        watchers = self.log_connections.get(service_id)
        if not watchers:
            return

        message = json.dumps({
            "type": "log",
            "service_id": service_id,
            "timestamp": datetime.now().isoformat(),
            "message": log_line
        })

        dead = await self._send_to_all(watchers, message)
        if not dead:
            return

        # Look the set up again: the entry may have been removed or replaced
        # by a disconnect/connect that ran while we were awaiting sends.
        current = self.log_connections.get(service_id)
        if current is None:
            return

        current.difference_update(dead)
        if not current:
            # Mirror disconnect_logs: do not keep empty per-service sets.
            del self.log_connections[service_id]

    async def send_metrics(self, metrics: dict):
        """Send metrics update to dashboard (message type ``"metrics"``)."""
        await self.broadcast_dashboard({
            "type": "metrics",
            "data": metrics
        })

    async def send_service_update(self, service_info: dict):
        """Send service status update to dashboard (type ``"service_update"``)."""
        await self.broadcast_dashboard({
            "type": "service_update",
            "data": service_info
        })

    @property
    def total_connections(self) -> int:
        """Number of registered dashboard, log and terminal sockets combined."""
        log_conns = sum(len(conns) for conns in self.log_connections.values())
        return len(self.dashboard_connections) + log_conns + len(self.terminal_connections)


# Global instance: constructed once when this module is first imported, so
# code importing `ws_manager` works against the same connection registries.
ws_manager = ConnectionManager()