Spaces:
Running
Running
| from fastapi import WebSocket | |
| import json | |
| from typing import List | |
| # WebSocket connection manager | |
| class ConnectionManager: | |
| def __init__(self): | |
| self.active_connections: List[WebSocket] = [] | |
| async def connect(self, websocket: WebSocket): | |
| await websocket.accept() | |
| self.active_connections.append(websocket) | |
| def disconnect(self, websocket: WebSocket): | |
| self.active_connections.remove(websocket) | |
| async def send_personal_message(self, message: str, websocket: WebSocket): | |
| await websocket.send_text(message) | |
| async def broadcast(self, message: dict): | |
| for connection in self.active_connections: | |
| try: | |
| await connection.send_text(json.dumps(message, ensure_ascii=False)) | |
| except: | |
| # Remove disconnected connections | |
| if connection in self.active_connections: | |
| self.active_connections.remove(connection) | |
| manager = ConnectionManager() |