Spaces:
Sleeping
Sleeping
| from fastapi import WebSocket | |
| from typing import List | |
| from src.utils import logger | |
class ConnectionService:
    """Keep a list of accepted WebSocket connections and send text to them.

    Connections are stored in ``active_connections`` in the order they were
    accepted. Every operation is best-effort: failures are logged through
    the module-level ``logger`` and are not re-raised to the caller.

    The instance can be used with ``async with``; entering returns the
    service itself and exiting performs no cleanup.
    """

    def __init__(self):
        # Sockets that were accepted by connect() and not yet removed.
        self.active_connections: List[WebSocket] = []

    async def __aenter__(self):
        """Return the service itself for use as an async context manager."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions from the ``async with`` body propagate."""
        return None

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and add it to ``active_connections``.

        If accepting raises, the error is logged and the socket is not
        registered; the exception is not propagated.
        """
        try:
            await websocket.accept()
            self.active_connections.append(websocket)
            logger.info("WebSocket connection established.")
        except Exception as e:
            logger.error(f"Failed to establish WebSocket connection: {e}")

    def disconnect(self, websocket: WebSocket):
        """Remove ``websocket`` from ``active_connections``.

        This only drops the bookkeeping entry; it does not call ``close`` on
        the socket. If removal fails (for example the socket is not in the
        list), the error is logged and swallowed.
        """
        try:
            self.active_connections.remove(websocket)
            logger.info("WebSocket connection closed.")
        except Exception as e:
            logger.error(f"Failed to close WebSocket connection: {e}")

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` as text to a single ``websocket``.

        Send failures are logged and not re-raised.
        """
        try:
            await websocket.send_text(message)
        except Exception as e:
            logger.error(f"Failed to send message: {e}")

    async def broadcast(self, message: str):
        """Send ``message`` as text to every active connection, in order.

        A connection whose send raises is logged and removed via
        ``disconnect``; the remaining connections still receive the message.
        """
        # Iterate over a snapshot: disconnect() mutates active_connections,
        # and removing from a list while iterating it makes the iterator
        # skip the element that follows each removed one.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Failed to broadcast message: {e}")
                self.disconnect(connection)