Spaces:
Sleeping
Sleeping
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from fastapi import WebSocket, WebSocketDisconnect, APIRouter | |
| from pydantic import BaseModel | |
| import json | |
| import time | |
| # Cấu hình logging | |
| logger = logging.getLogger(__name__) | |
| # Models cho Swagger documentation | |
| class ConnectionStatus(BaseModel): | |
| user_id: str | |
| active: bool | |
| connection_count: int | |
| last_activity: Optional[float] = None | |
| class UserConnection(BaseModel): | |
| user_id: str | |
| connection_count: int | |
| class AllConnectionsStatus(BaseModel): | |
| total_users: int | |
| total_connections: int | |
| users: List[UserConnection] | |
| # Khởi tạo router | |
| router = APIRouter( | |
| prefix="/ws", | |
| tags=["WebSockets"], | |
| ) | |
| class ConnectionManager: | |
| """Quản lý các kết nối WebSocket""" | |
| def __init__(self): | |
| # Lưu trữ các kết nối theo user_id | |
| self.active_connections: Dict[str, List[WebSocket]] = {} | |
| async def connect(self, websocket: WebSocket, user_id: str): | |
| """Kết nối một WebSocket mới""" | |
| await websocket.accept() | |
| if user_id not in self.active_connections: | |
| self.active_connections[user_id] = [] | |
| self.active_connections[user_id].append(websocket) | |
| logger.info(f"New WebSocket connection for user {user_id}. Total connections: {len(self.active_connections[user_id])}") | |
| def disconnect(self, websocket: WebSocket, user_id: str): | |
| """Ngắt kết nối WebSocket""" | |
| if user_id in self.active_connections: | |
| if websocket in self.active_connections[user_id]: | |
| self.active_connections[user_id].remove(websocket) | |
| # Xóa user_id khỏi dict nếu không còn kết nối nào | |
| if not self.active_connections[user_id]: | |
| del self.active_connections[user_id] | |
| logger.info(f"WebSocket disconnected for user {user_id}") | |
| async def send_message(self, message: Dict[str, Any], user_id: str): | |
| """Gửi tin nhắn tới tất cả kết nối của một user""" | |
| if user_id in self.active_connections: | |
| disconnected_websockets = [] | |
| for websocket in self.active_connections[user_id]: | |
| try: | |
| await websocket.send_text(json.dumps(message)) | |
| except Exception as e: | |
| logger.error(f"Error sending message to WebSocket: {str(e)}") | |
| disconnected_websockets.append(websocket) | |
| # Xóa các kết nối bị ngắt | |
| for websocket in disconnected_websockets: | |
| self.disconnect(websocket, user_id) | |
| def get_connection_status(self, user_id: str = None) -> Dict[str, Any]: | |
| """Lấy thông tin về trạng thái kết nối WebSocket""" | |
| if user_id: | |
| # Trả về thông tin kết nối cho user cụ thể | |
| if user_id in self.active_connections: | |
| return { | |
| "user_id": user_id, | |
| "active": True, | |
| "connection_count": len(self.active_connections[user_id]), | |
| "last_activity": time.time() | |
| } | |
| else: | |
| return { | |
| "user_id": user_id, | |
| "active": False, | |
| "connection_count": 0, | |
| "last_activity": None | |
| } | |
| else: | |
| # Trả về thông tin tất cả kết nối | |
| result = { | |
| "total_users": len(self.active_connections), | |
| "total_connections": sum(len(connections) for connections in self.active_connections.values()), | |
| "users": [] | |
| } | |
| for uid, connections in self.active_connections.items(): | |
| result["users"].append({ | |
| "user_id": uid, | |
| "connection_count": len(connections) | |
| }) | |
| return result | |
| # Tạo instance của ConnectionManager | |
| manager = ConnectionManager() | |
| # Test route for manual WebSocket sending | |
| async def test_websocket_send(user_id: str): | |
| """ | |
| Test route to manually send a WebSocket message to a user | |
| This is useful for debugging WebSocket connections | |
| """ | |
| logger.info(f"Attempting to send test message to user: {user_id}") | |
| # Check if user has a connection | |
| status = manager.get_connection_status(user_id) | |
| if not status["active"]: | |
| logger.warning(f"No active WebSocket connection for user: {user_id}") | |
| return {"success": False, "message": f"No active WebSocket connection for user: {user_id}"} | |
| # Send test message | |
| await manager.send_message({ | |
| "type": "test_message", | |
| "message": "This is a test WebSocket message", | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| logger.info(f"Test message sent to user: {user_id}") | |
| return {"success": True, "message": f"Test message sent to user: {user_id}"} | |
| async def websocket_endpoint(websocket: WebSocket, user_id: str): | |
| """Endpoint WebSocket để cập nhật tiến trình xử lý PDF""" | |
| logger.info(f"WebSocket connection request received for user: {user_id}") | |
| try: | |
| await manager.connect(websocket, user_id) | |
| logger.info(f"WebSocket connection accepted for user: {user_id}") | |
| # Send a test message to confirm connection | |
| await manager.send_message({ | |
| "type": "connection_established", | |
| "message": "WebSocket connection established successfully", | |
| "user_id": user_id, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| try: | |
| while True: | |
| # Đợi tin nhắn từ client (chỉ để giữ kết nối) | |
| data = await websocket.receive_text() | |
| logger.debug(f"Received from client: {data}") | |
| # Echo back to confirm receipt | |
| if data != "heartbeat": # Don't echo heartbeats | |
| await manager.send_message({ | |
| "type": "echo", | |
| "message": f"Received: {data}", | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| except WebSocketDisconnect: | |
| logger.info(f"WebSocket disconnected for user: {user_id}") | |
| manager.disconnect(websocket, user_id) | |
| except Exception as e: | |
| logger.error(f"WebSocket error: {str(e)}") | |
| manager.disconnect(websocket, user_id) | |
| except Exception as e: | |
| logger.error(f"Failed to establish WebSocket connection: {str(e)}") | |
| # Ensure the connection is closed properly | |
| if websocket.client_state != 4: # 4 = CLOSED | |
| await websocket.close(code=1011, reason=f"Server error: {str(e)}") | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from fastapi import WebSocket, WebSocketDisconnect, APIRouter | |
| from pydantic import BaseModel | |
| import json | |
| import time | |
| # Cấu hình logging | |
| logger = logging.getLogger(__name__) | |
| # Models cho Swagger documentation | |
| class ConnectionStatus(BaseModel): | |
| user_id: str | |
| active: bool | |
| connection_count: int | |
| last_activity: Optional[float] = None | |
| class UserConnection(BaseModel): | |
| user_id: str | |
| connection_count: int | |
| class AllConnectionsStatus(BaseModel): | |
| total_users: int | |
| total_connections: int | |
| users: List[UserConnection] | |
| # Khởi tạo router | |
| router = APIRouter( | |
| prefix="", | |
| tags=["WebSockets"], | |
| ) | |
| class ConnectionManager: | |
| """Quản lý các kết nối WebSocket""" | |
| def __init__(self): | |
| # Lưu trữ các kết nối theo user_id | |
| self.active_connections: Dict[str, List[WebSocket]] = {} | |
| async def connect(self, websocket: WebSocket, user_id: str): | |
| """Kết nối một WebSocket mới""" | |
| await websocket.accept() | |
| if user_id not in self.active_connections: | |
| self.active_connections[user_id] = [] | |
| self.active_connections[user_id].append(websocket) | |
| logger.info(f"New WebSocket connection for user {user_id}. Total connections: {len(self.active_connections[user_id])}") | |
| def disconnect(self, websocket: WebSocket, user_id: str): | |
| """Ngắt kết nối WebSocket""" | |
| if user_id in self.active_connections: | |
| if websocket in self.active_connections[user_id]: | |
| self.active_connections[user_id].remove(websocket) | |
| # Xóa user_id khỏi dict nếu không còn kết nối nào | |
| if not self.active_connections[user_id]: | |
| del self.active_connections[user_id] | |
| logger.info(f"WebSocket disconnected for user {user_id}") | |
| async def send_message(self, message: Dict[str, Any], user_id: str): | |
| """Gửi tin nhắn tới tất cả kết nối của một user""" | |
| if user_id in self.active_connections: | |
| disconnected_websockets = [] | |
| for websocket in self.active_connections[user_id]: | |
| try: | |
| await websocket.send_text(json.dumps(message)) | |
| except Exception as e: | |
| logger.error(f"Error sending message to WebSocket: {str(e)}") | |
| disconnected_websockets.append(websocket) | |
| # Xóa các kết nối bị ngắt | |
| for websocket in disconnected_websockets: | |
| self.disconnect(websocket, user_id) | |
| def get_connection_status(self, user_id: str = None) -> Dict[str, Any]: | |
| """Lấy thông tin về trạng thái kết nối WebSocket""" | |
| if user_id: | |
| # Trả về thông tin kết nối cho user cụ thể | |
| if user_id in self.active_connections: | |
| return { | |
| "user_id": user_id, | |
| "active": True, | |
| "connection_count": len(self.active_connections[user_id]), | |
| "last_activity": time.time() | |
| } | |
| else: | |
| return { | |
| "user_id": user_id, | |
| "active": False, | |
| "connection_count": 0, | |
| "last_activity": None | |
| } | |
| else: | |
| # Trả về thông tin tất cả kết nối | |
| result = { | |
| "total_users": len(self.active_connections), | |
| "total_connections": sum(len(connections) for connections in self.active_connections.values()), | |
| "users": [] | |
| } | |
| for uid, connections in self.active_connections.items(): | |
| result["users"].append({ | |
| "user_id": uid, | |
| "connection_count": len(connections) | |
| }) | |
| return result | |
| # Tạo instance của ConnectionManager | |
| manager = ConnectionManager() | |
| async def websocket_endpoint(websocket: WebSocket, user_id: str): | |
| """Endpoint WebSocket để cập nhật tiến trình xử lý PDF""" | |
| await manager.connect(websocket, user_id) | |
| try: | |
| while True: | |
| # Đợi tin nhắn từ client (chỉ để giữ kết nối) | |
| await websocket.receive_text() | |
| except WebSocketDisconnect: | |
| manager.disconnect(websocket, user_id) | |
| except Exception as e: | |
| logger.error(f"WebSocket error: {str(e)}") | |
| manager.disconnect(websocket, user_id) | |
| # API endpoints để kiểm tra trạng thái WebSocket | |
| async def get_all_websocket_connections(): | |
| """ | |
| Lấy thông tin về tất cả kết nối WebSocket hiện tại. | |
| Endpoint này trả về: | |
| - Tổng số người dùng đang kết nối | |
| - Tổng số kết nối WebSocket | |
| - Danh sách người dùng kèm theo số lượng kết nối của mỗi người | |
| """ | |
| return manager.get_connection_status() | |
| async def get_user_websocket_status(user_id: str): | |
| """ | |
| Lấy thông tin về kết nối WebSocket của một người dùng cụ thể. | |
| Parameters: | |
| - **user_id**: ID của người dùng cần kiểm tra | |
| Returns: | |
| - Thông tin về trạng thái kết nối, bao gồm: | |
| - active: Có đang kết nối hay không | |
| - connection_count: Số lượng kết nối hiện tại | |
| - last_activity: Thời gian hoạt động gần nhất | |
| """ | |
| return manager.get_connection_status(user_id) | |
| # Các hàm gửi thông báo cập nhật trạng thái | |
| async def send_pdf_upload_started(user_id: str, filename: str, document_id: str): | |
| """Gửi thông báo bắt đầu upload PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_upload_started", | |
| "document_id": document_id, | |
| "filename": filename, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_upload_progress(user_id: str, document_id: str, step: str, progress: float, message: str): | |
| """Gửi thông báo tiến độ upload PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_upload_progress", | |
| "document_id": document_id, | |
| "step": step, | |
| "progress": progress, | |
| "message": message, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_upload_completed(user_id: str, document_id: str, filename: str, chunks: int): | |
| """Gửi thông báo hoàn thành upload PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_upload_completed", | |
| "document_id": document_id, | |
| "filename": filename, | |
| "chunks": chunks, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_upload_failed(user_id: str, document_id: str, filename: str, error: str): | |
| """Gửi thông báo lỗi upload PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_upload_failed", | |
| "document_id": document_id, | |
| "filename": filename, | |
| "error": error, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_delete_started(user_id: str, namespace: str): | |
| """Gửi thông báo bắt đầu xóa PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_delete_started", | |
| "namespace": namespace, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_delete_completed(user_id: str, namespace: str, deleted_count: int = 0): | |
| """Gửi thông báo hoàn thành xóa PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_delete_completed", | |
| "namespace": namespace, | |
| "deleted_count": deleted_count, | |
| "timestamp": int(time.time()) | |
| }, user_id) | |
| async def send_pdf_delete_failed(user_id: str, namespace: str, error: str): | |
| """Gửi thông báo lỗi xóa PDF""" | |
| await manager.send_message({ | |
| "type": "pdf_delete_failed", | |
| "namespace": namespace, | |
| "error": error, | |
| "timestamp": int(time.time()) | |
| }, user_id) |