"""
WebSocket real-time notifications.

Exposes a ``/ws/notifications`` endpoint plus helper coroutines that other
parts of the application call to push JSON messages to connected users.
"""

import json
import logging
from typing import Dict, Set

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from app.core.security import verify_token

logger = logging.getLogger(__name__)
router = APIRouter(tags=["websocket"])

# Application-defined WebSocket close code used for every authentication failure.
_AUTH_FAILED_CLOSE_CODE = 4001


class ConnectionManager:
    """Manage WebSocket connections per user.

    ``active_connections`` maps a user id to the set of sockets currently
    registered for that user. A user id is only present while it has at least
    one registered socket, so ``connected_users`` counts users, not sockets.
    """

    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str) -> None:
        """Accept the handshake and register ``websocket`` under ``user_id``."""
        await websocket.accept()
        self.active_connections.setdefault(user_id, set()).add(websocket)
        logger.info("WebSocket connected: user=%s", user_id)

    def _remove(self, websocket: WebSocket, user_id: str) -> None:
        """Unregister one socket and drop the user entry once it is empty."""
        connections = self.active_connections.get(user_id)
        if connections is None:
            return
        connections.discard(websocket)
        if not connections:
            del self.active_connections[user_id]

    def disconnect(self, websocket: WebSocket, user_id: str) -> None:
        """Unregister ``websocket``; safe to call for an unknown user/socket."""
        self._remove(websocket, user_id)
        logger.info("WebSocket disconnected: user=%s", user_id)

    async def send_to_user(self, user_id: str, message: dict) -> None:
        """Send a message to all connections for a user.

        Delivery is best-effort: a socket whose send fails is treated as dead
        and unregistered; the failure is not propagated to the caller.
        """
        connections = self.active_connections.get(user_id)
        if not connections:
            return

        dead_connections = []
        # Iterate over a snapshot: each ``await`` yields to the event loop, and
        # a concurrent connect()/disconnect() would otherwise mutate the set
        # mid-iteration ("Set changed size during iteration").
        for connection in tuple(connections):
            try:
                await connection.send_json(message)
            except Exception:
                logger.debug(
                    "Dropping dead WebSocket for user=%s", user_id, exc_info=True
                )
                dead_connections.append(connection)

        # Go through _remove so an emptied user entry is pruned and a user that
        # disappeared during the awaits above does not raise KeyError.
        for connection in dead_connections:
            self._remove(connection, user_id)

    async def broadcast(self, message: dict) -> None:
        """Broadcast to all connected users."""
        # Snapshot the keys: send_to_user may prune entries while we iterate.
        for user_id in list(self.active_connections):
            await self.send_to_user(user_id, message)

    @property
    def connected_users(self) -> int:
        """Number of distinct users with at least one registered socket."""
        return len(self.active_connections)


# Global connection manager
manager = ConnectionManager()


async def _handle_client_message(websocket: WebSocket, raw: str) -> None:
    """Process one text frame from the client (ping/pong, read receipts).

    Frames that are not valid JSON, or whose JSON value is not an object, are
    ignored rather than treated as errors.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return
    if not isinstance(message, dict):
        # e.g. `[1, 2]` or `"ping"`: valid JSON but not a message object.
        return

    message_type = message.get("type")
    if message_type == "ping":
        await websocket.send_json({"type": "pong"})
    elif message_type == "mark_read":
        # Client acknowledged a notification
        notification_id = message.get("notification_id")
        if notification_id:
            # TODO: Mark notification as read in DB
            pass


@router.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket):
    """
    WebSocket endpoint for real-time notifications.

    Client connects with token in query params:
        ws://localhost:8000/ws/notifications?token=<token>

    Server sends JSON messages:
        {"type": "notification", "data": {...}}
        {"type": "job_alert", "data": {...}}
        {"type": "application_update", "data": {...}}

    The connection is closed with code 4001 when the token is missing,
    rejected by ``verify_token``, or carries no ``sub`` claim.
    """
    # Authenticate via query param.
    # NOTE(review): these close() calls happen before accept(); the ASGI server
    # may surface that to the client as a rejected handshake rather than as
    # close code 4001 - confirm against the deployed server.
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=_AUTH_FAILED_CLOSE_CODE, reason="Missing token")
        return

    payload = verify_token(token)
    if not payload:
        await websocket.close(code=_AUTH_FAILED_CLOSE_CODE, reason="Invalid token")
        return

    try:
        user_id = payload["sub"]
    except (KeyError, TypeError):
        # A token without a subject cannot be attributed to any user.
        await websocket.close(code=_AUTH_FAILED_CLOSE_CODE, reason="Invalid token")
        return

    # Connect
    await manager.connect(websocket, user_id)

    try:
        # Send initial message
        await websocket.send_json({
            "type": "connected",
            "data": {"message": "Connected to notifications"},
        })

        # Keep connection alive and handle incoming messages
        while True:
            data = await websocket.receive_text()
            await _handle_client_message(websocket, data)
    except WebSocketDisconnect:
        # Normal client-initiated close; nothing to report.
        pass
    except Exception as e:
        logger.exception("WebSocket error for user %s: %s", user_id, e)
    finally:
        # Runs on every exit path, including task cancellation, so the socket
        # never stays registered after the handler is gone.
        manager.disconnect(websocket, user_id)


# Helper functions to send notifications from other parts of the app
async def notify_user(user_id: str, notification_type: str, data: dict):
    """Send a real-time notification to a user.

    The message is delivered as ``{"type": notification_type, "data": data}``
    to every socket registered for ``user_id``.
    """
    await manager.send_to_user(user_id, {
        "type": notification_type,
        "data": data,
    })


async def notify_job_alert(user_id: str, alert_name: str, jobs_count: int, jobs: list[dict]):
    """Send a job alert notification.

    Only the first three entries of ``jobs`` are included, as
    ``preview_jobs``; ``jobs_count`` is passed through unchanged.
    """
    await notify_user(user_id, "job_alert", {
        "alert_name": alert_name,
        "jobs_count": jobs_count,
        "preview_jobs": jobs[:3],
    })


async def notify_application_update(user_id: str, application_id: str, new_status: str):
    """Notify user of application status change."""
    await notify_user(user_id, "application_update", {
        "application_id": application_id,
        "new_status": new_status,
    })