File size: 8,322 Bytes
24dc421
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""

WebSocket server implementation for handling real-time connections.

"""
import uuid
import json
from typing import Optional
from fastapi import WebSocket, WebSocketDisconnect, status
from app.config import get_logger
from app.server.connection_manager import get_connection_manager
from app.server.heartbeat import get_heartbeat_manager
from app.messaging.message_router import get_message_router
from app.messaging.protocol import MessageType, create_error_message

logger = get_logger(__name__)


class WebSocketServer:
    """WebSocket server for handling client connections."""
    
    def __init__(self):
        """Initialize WebSocket server."""
        self.connection_manager = get_connection_manager()
        self.heartbeat_manager = get_heartbeat_manager()
        self.message_router = get_message_router()
        
        logger.info("websocket_server_initialized")
    
    async def handle_connection(self, websocket: WebSocket) -> None:
        """Handle a new WebSocket connection.

        

        Args:

            websocket: WebSocket instance

        """
        connection_id = str(uuid.uuid4())
        
        try:
            # Accept connection
            await self.connection_manager.connect(websocket, connection_id)
            
            # Start heartbeat monitoring
            self.heartbeat_manager.start_heartbeat(
                connection_id,
                websocket,
                on_timeout_callback=self._handle_timeout
            )
            
            logger.info(
                "websocket_connection_established",
                connection_id=connection_id
            )
            
            # Main message loop
            await self._message_loop(websocket, connection_id)
        
        except WebSocketDisconnect as e:
            logger.info(
                "websocket_disconnect",
                connection_id=connection_id,
                code=e.code
            )
        
        except Exception as e:
            logger.error(
                "websocket_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )
        
        finally:
            # Clean up
            await self._cleanup_connection(connection_id)
    
    async def _message_loop(self, websocket: WebSocket, connection_id: str) -> None:
        """Main loop for receiving and processing messages.

        

        Args:

            websocket: WebSocket instance

            connection_id: Connection identifier

        """
        while True:
            # Receive message (text or binary)
            message = await websocket.receive()
            
            # Handle different message types
            if "text" in message:
                await self._handle_text_message(
                    connection_id,
                    message["text"]
                )
            
            elif "bytes" in message:
                await self._handle_binary_message(
                    connection_id,
                    message["bytes"]
                )
            
            elif message.get("type") == "websocket.disconnect":
                logger.info("websocket_client_disconnect", connection_id=connection_id)
                break
    
    async def _handle_text_message(self, connection_id: str, text: str) -> None:
        """Handle incoming text message.

        

        Args:

            connection_id: Connection identifier

            text: Message text (JSON)

        """
        try:
            # Parse JSON message
            message = json.loads(text)
            message_type = message.get("type")
            
            logger.debug(
                "text_message_received",
                connection_id=connection_id,
                message_type=message_type
            )
            
            # Handle pong messages (heartbeat response)
            if message_type == MessageType.PONG:
                self.heartbeat_manager.record_pong(connection_id)
                return
            
            # Route message to appropriate handler
            await self.message_router.route_message(
                connection_id,
                message
            )
        
        except json.JSONDecodeError as e:
            logger.error(
                "invalid_json_message",
                connection_id=connection_id,
                error=str(e)
            )
            await self._send_error(
                connection_id,
                "INVALID_MESSAGE",
                "Invalid JSON format"
            )
        
        except Exception as e:
            logger.error(
                "text_message_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )
            await self._send_error(
                connection_id,
                "INTERNAL_ERROR",
                "Failed to process message"
            )
    
    async def _handle_binary_message(self, connection_id: str, data: bytes) -> None:
        """Handle incoming binary message (audio data).

        

        Args:

            connection_id: Connection identifier

            data: Binary data

        """
        try:
            logger.debug(
                "binary_message_received",
                connection_id=connection_id,
                size=len(data)
            )
            
            # Route binary message to appropriate handler
            await self.message_router.route_binary(
                connection_id,
                data
            )
        
        except Exception as e:
            logger.error(
                "binary_message_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )
    
    async def _handle_timeout(self, connection_id: str) -> None:
        """Handle connection timeout.

        

        Args:

            connection_id: Connection identifier

        """
        logger.warning("connection_timeout", connection_id=connection_id)
        
        # Get websocket and close it
        user_id = self.connection_manager.disconnect(connection_id)
        
        if user_id:
            # Notify room members about disconnection
            await self.message_router.handle_user_disconnect(user_id)
    
    async def _send_error(

        self,

        connection_id: str,

        error_code: str,

        message: str

    ) -> None:
        """Send error message to client.

        

        Args:

            connection_id: Connection identifier

            error_code: Error code

            message: Error message

        """
        error_msg = create_error_message(error_code, message)
        
        # Find user for this connection
        for user_id, conn_id in self.connection_manager.user_connections.items():
            if conn_id == connection_id:
                await self.connection_manager.send_to_user(user_id, error_msg)
                break
    
    async def _cleanup_connection(self, connection_id: str) -> None:
        """Clean up connection resources.

        

        Args:

            connection_id: Connection identifier

        """
        logger.info("cleaning_up_connection", connection_id=connection_id)
        
        # Stop heartbeat
        self.heartbeat_manager.stop_heartbeat(connection_id)
        
        # Disconnect from connection manager
        user_id = self.connection_manager.disconnect(connection_id)
        
        # Handle user disconnect in message router
        if user_id:
            await self.message_router.handle_user_disconnect(user_id)
        
        logger.info(
            "connection_cleanup_complete",
            connection_id=connection_id,
            user_id=user_id
        )


# Global WebSocket server instance
websocket_server = WebSocketServer()


def get_websocket_server() -> WebSocketServer:
    """Get the global WebSocket server instance.

    

    Returns:

        WebSocketServer instance

    """
    return websocket_server