# File-viewer page residue (not part of the module): Spaces: Sleeping | File size: 8,322 Bytes | 24dc421
"""
WebSocket server implementation for handling real-time connections.
"""
import uuid
import json
from typing import Optional
from fastapi import WebSocket, WebSocketDisconnect, status
from app.config import get_logger
from app.server.connection_manager import get_connection_manager
from app.server.heartbeat import get_heartbeat_manager
from app.messaging.message_router import get_message_router
from app.messaging.protocol import MessageType, create_error_message
# Module-level logger obtained from the project's get_logger factory, keyed by this
# module's name. Calls in this file pass an event name plus keyword fields
# (e.g. connection_id=...).
logger = get_logger(__name__)
class WebSocketServer:
    """WebSocket server for handling client connections.

    Each connection is given a generated connection id, registered with the
    connection manager, monitored by the heartbeat manager, and has its
    incoming frames dispatched through the message router until it ends.
    """

    def __init__(self):
        """Initialize WebSocket server.

        Stores the objects returned by ``get_connection_manager()``,
        ``get_heartbeat_manager()`` and ``get_message_router()``.
        """
        self.connection_manager = get_connection_manager()
        self.heartbeat_manager = get_heartbeat_manager()
        self.message_router = get_message_router()
        logger.info("websocket_server_initialized")

    async def handle_connection(self, websocket: WebSocket) -> None:
        """Handle a new WebSocket connection from accept to cleanup.

        ``WebSocketDisconnect`` and any other ``Exception`` raised while
        connecting or processing messages are logged rather than propagated.
        ``_cleanup_connection`` always runs afterwards.

        Args:
            websocket: WebSocket instance
        """
        connection_id = str(uuid.uuid4())

        try:
            # Accept connection
            await self.connection_manager.connect(websocket, connection_id)

            # Start heartbeat monitoring; _handle_timeout is registered as the
            # timeout callback.
            self.heartbeat_manager.start_heartbeat(
                connection_id,
                websocket,
                on_timeout_callback=self._handle_timeout
            )

            logger.info(
                "websocket_connection_established",
                connection_id=connection_id
            )

            # Main message loop; returns once a disconnect event is received.
            await self._message_loop(websocket, connection_id)

        except WebSocketDisconnect as e:
            logger.info(
                "websocket_disconnect",
                connection_id=connection_id,
                code=e.code
            )
        except Exception as e:
            # Top-level boundary for this connection: log with traceback and
            # fall through to cleanup.
            logger.error(
                "websocket_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )
        finally:
            # Clean up
            await self._cleanup_connection(connection_id)

    async def _message_loop(self, websocket: WebSocket, connection_id: str) -> None:
        """Main loop for receiving and processing messages.

        Runs until a ``websocket.disconnect`` event is received. Frames are
        routed on the *value* of the ``text`` / ``bytes`` fields rather than on
        key presence, because an ASGI server may include both keys with one of
        them set to ``None``.

        Args:
            websocket: WebSocket instance
            connection_id: Connection identifier
        """
        while True:
            # Receive message (text or binary)
            message = await websocket.receive()

            if message.get("type") == "websocket.disconnect":
                logger.info("websocket_client_disconnect", connection_id=connection_id)
                break

            text = message.get("text")
            if text is not None:
                await self._handle_text_message(connection_id, text)
                continue

            data = message.get("bytes")
            if data is not None:
                await self._handle_binary_message(connection_id, data)

    async def _handle_text_message(self, connection_id: str, text: str) -> None:
        """Handle incoming text message.

        The text is parsed as JSON and must decode to an object. PONG messages
        are recorded with the heartbeat manager; everything else is passed to
        the message router. Malformed input is answered with an
        ``INVALID_MESSAGE`` error, and any other failure with
        ``INTERNAL_ERROR``.

        Args:
            connection_id: Connection identifier
            text: Message text (JSON)
        """
        try:
            # Parse JSON message
            message = json.loads(text)

            # Valid JSON that is not an object (list, string, number, null) has
            # no "type" field; report it as a client error instead of letting
            # .get() raise AttributeError and surface as INTERNAL_ERROR.
            if not isinstance(message, dict):
                logger.error(
                    "invalid_message_structure",
                    connection_id=connection_id,
                    received_type=type(message).__name__
                )
                await self._send_error(
                    connection_id,
                    "INVALID_MESSAGE",
                    "Message must be a JSON object"
                )
                return

            message_type = message.get("type")

            logger.debug(
                "text_message_received",
                connection_id=connection_id,
                message_type=message_type
            )

            # Handle pong messages (heartbeat response)
            if message_type == MessageType.PONG:
                self.heartbeat_manager.record_pong(connection_id)
                return

            # Route message to appropriate handler
            await self.message_router.route_message(
                connection_id,
                message
            )

        except json.JSONDecodeError as e:
            logger.error(
                "invalid_json_message",
                connection_id=connection_id,
                error=str(e)
            )
            await self._send_error(
                connection_id,
                "INVALID_MESSAGE",
                "Invalid JSON format"
            )
        except Exception as e:
            logger.error(
                "text_message_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )
            await self._send_error(
                connection_id,
                "INTERNAL_ERROR",
                "Failed to process message"
            )

    async def _handle_binary_message(self, connection_id: str, data: bytes) -> None:
        """Handle incoming binary message (audio data).

        Failures in the router are logged and swallowed so that one bad frame
        does not end the connection; no error is sent back to the client.

        Args:
            connection_id: Connection identifier
            data: Binary data
        """
        try:
            logger.debug(
                "binary_message_received",
                connection_id=connection_id,
                size=len(data)
            )

            # Route binary message to appropriate handler
            await self.message_router.route_binary(
                connection_id,
                data
            )

        except Exception as e:
            logger.error(
                "binary_message_error",
                connection_id=connection_id,
                error=str(e),
                exc_info=True
            )

    async def _handle_timeout(self, connection_id: str) -> None:
        """Handle connection timeout.

        Registered with the heartbeat manager as ``on_timeout_callback`` in
        ``handle_connection``.

        Args:
            connection_id: Connection identifier
        """
        logger.warning("connection_timeout", connection_id=connection_id)

        # Drop the connection from the connection manager; it returns the
        # associated user id, if any.
        # NOTE(review): this method does not close the socket or stop the
        # heartbeat itself - confirm whether disconnect() closes the socket.
        user_id = self.connection_manager.disconnect(connection_id)

        if user_id:
            # Notify room members about disconnection
            await self.message_router.handle_user_disconnect(user_id)

    async def _send_error(
        self,
        connection_id: str,
        error_code: str,
        message: str
    ) -> None:
        """Send error message to client.

        The error is delivered through the user mapped to ``connection_id`` in
        ``connection_manager.user_connections``. If no user is mapped to the
        connection, nothing is sent and a debug entry is logged.

        Args:
            connection_id: Connection identifier
            error_code: Error code
            message: Error message
        """
        error_msg = create_error_message(error_code, message)

        # Find user for this connection
        for user_id, conn_id in self.connection_manager.user_connections.items():
            if conn_id == connection_id:
                await self.connection_manager.send_to_user(user_id, error_msg)
                break
        else:
            logger.debug(
                "error_message_undeliverable",
                connection_id=connection_id,
                error_code=error_code
            )

    async def _cleanup_connection(self, connection_id: str) -> None:
        """Clean up connection resources.

        Stops the heartbeat, removes the connection from the connection
        manager and, when a user id is returned, tells the message router that
        the user disconnected.

        Args:
            connection_id: Connection identifier
        """
        logger.info("cleaning_up_connection", connection_id=connection_id)

        # Stop heartbeat
        self.heartbeat_manager.stop_heartbeat(connection_id)

        # Disconnect from connection manager
        user_id = self.connection_manager.disconnect(connection_id)

        # Handle user disconnect in message router
        if user_id:
            await self.message_router.handle_user_disconnect(user_id)

        logger.info(
            "connection_cleanup_complete",
            connection_id=connection_id,
            user_id=user_id
        )
# Global WebSocket server instance.
# Created when this module is imported, so WebSocketServer.__init__ (which fetches the
# connection, heartbeat and routing managers and logs "websocket_server_initialized")
# runs at import time. get_websocket_server() returns this object.
websocket_server = WebSocketServer()
def get_websocket_server() -> WebSocketServer:
    """Return the module-wide WebSocket server.

    Returns:
        The ``WebSocketServer`` object bound to the module-level name
        ``websocket_server``.
    """
    return websocket_server
|