| """ |
| WebSocket real-time notifications. |
| """ |
| import json |
| import logging |
| from typing import Dict, Set |
|
|
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| from app.core.security import verify_token |
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


# Router on which the WebSocket endpoint in this module is registered; tagged "websocket".
router = APIRouter(tags=["websocket"])
|
|
|
|
class ConnectionManager:
    """Track the open WebSocket connections of each user and send messages to them.

    A user may hold several connections at the same time, so connections are
    stored in a set keyed by user id. A key is removed as soon as its set
    becomes empty, which keeps ``connected_users`` equal to the number of
    users that have at least one registered connection.
    """

    def __init__(self):
        # user_id -> set of that user's registered sockets (never an empty set).
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept the WebSocket handshake and register the socket for ``user_id``.

        Args:
            websocket: The connection to accept and track.
            user_id: Key under which the connection is stored.
        """
        await websocket.accept()
        self.active_connections.setdefault(user_id, set()).add(websocket)
        logger.info("WebSocket connected: user=%s", user_id)

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Forget ``websocket``; drop the user's entry when no connection is left.

        Safe to call for a socket or user that is not (or no longer) registered.
        """
        connections = self.active_connections.get(user_id)
        if connections is not None:
            connections.discard(websocket)
            if not connections:
                del self.active_connections[user_id]
        logger.info("WebSocket disconnected: user=%s", user_id)

    async def send_to_user(self, user_id: str, message: dict):
        """Send ``message`` as JSON to every connection registered for ``user_id``.

        Delivery is best-effort: a connection whose ``send_json`` raises is
        treated as dead and unregistered instead of failing the whole call.
        Does nothing when the user has no registered connection.

        Args:
            user_id: Recipient whose connections receive the message.
            message: JSON-serialisable payload passed to ``send_json``.
        """
        connections = self.active_connections.get(user_id)
        if not connections:
            return

        dead_connections = set()
        # Iterate over a snapshot: every await hands control back to the event
        # loop, where connect()/disconnect() may mutate the live set, and
        # mutating a set while iterating it raises RuntimeError.
        for connection in tuple(connections):
            try:
                await connection.send_json(message)
            except Exception:
                # Any send failure marks the socket as dead; it is pruned below.
                dead_connections.add(connection)

        if not dead_connections:
            return

        # Look the user up again: the entry may have been removed (or replaced)
        # while the sends above were awaited.
        remaining = self.active_connections.get(user_id)
        if remaining is None:
            return
        remaining.difference_update(dead_connections)
        if not remaining:
            # Keep the "no empty sets" invariant so connected_users stays exact.
            del self.active_connections[user_id]

    async def broadcast(self, message: dict):
        """Send ``message`` to every user that currently has a connection."""
        # Snapshot the keys: send_to_user() may delete entries while we loop.
        for user_id in list(self.active_connections):
            await self.send_to_user(user_id, message)

    @property
    def connected_users(self) -> int:
        """Number of users with at least one registered connection."""
        return len(self.active_connections)
|
|
|
|
| |
# Single module-level ConnectionManager shared by the WebSocket endpoint and the notify_* helpers.
manager = ConnectionManager()
|
|
|
|
@router.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket):
    """
    WebSocket endpoint for real-time notifications.

    Client connects with token in query params:
        ws://localhost:8000/ws/notifications?token=<access_token>

    The connection is closed with code 4001 when the token is missing, is
    rejected by ``verify_token``, or carries no ``sub`` entry.
    NOTE(review): these closes happen before the handshake is accepted;
    depending on the ASGI server the client may see a rejected handshake
    rather than close code 4001 - confirm.

    Server sends JSON messages:
        {"type": "connected", "data": {...}}   (once, right after connecting)
        {"type": "pong"}                       (reply to a client "ping")
        {"type": "notification", "data": {...}}
        {"type": "job_alert", "data": {...}}
        {"type": "application_update", "data": {...}}

    Client may send JSON objects:
        {"type": "ping"}
        {"type": "mark_read", "notification_id": ...}   (accepted, no effect yet)
    Frames that are not valid JSON objects are ignored.
    """
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=4001, reason="Missing token")
        return

    payload = verify_token(token)
    if not payload:
        await websocket.close(code=4001, reason="Invalid token")
        return

    try:
        user_id = payload["sub"]
    except (KeyError, TypeError):
        # A verified payload without a subject cannot be mapped to a user.
        await websocket.close(code=4001, reason="Invalid token")
        return

    await manager.connect(websocket, user_id)

    try:
        await websocket.send_json({
            "type": "connected",
            "data": {"message": "Connected to notifications"},
        })

        while True:
            data = await websocket.receive_text()

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                # Malformed frames are ignored rather than ending the session.
                continue

            if not isinstance(message, dict):
                # Valid JSON that is not an object (e.g. 123 or []) has no
                # "type" to dispatch on; ignore it like malformed input.
                continue

            message_type = message.get("type")
            if message_type == "ping":
                await websocket.send_json({"type": "pong"})
            elif message_type == "mark_read":
                # NOTE(review): "mark_read" (with its "notification_id") is
                # accepted but nothing is done with it yet.
                pass

    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    except Exception as e:
        logger.exception("WebSocket error for user %s: %s", user_id, e)
    finally:
        # Runs for every exit path, including task cancellation, which is not
        # an ``Exception`` and would otherwise leave the socket registered.
        manager.disconnect(websocket, user_id)
|
|
|
|
| |
async def notify_user(user_id: str, notification_type: str, data: dict):
    """Push one real-time message to every open connection of ``user_id``.

    The message is delivered through the module-level connection manager as
    ``{"type": notification_type, "data": data}``.
    """
    envelope = {"type": notification_type, "data": data}
    await manager.send_to_user(user_id, envelope)
|
|
|
|
async def notify_job_alert(user_id: str, alert_name: str, jobs_count: int, jobs: list[dict]):
    """Push a ``job_alert`` message to ``user_id``.

    The payload carries the alert name, the given job count and, as a
    preview, at most the first three entries of ``jobs``.
    """
    preview = jobs[:3]
    details = {
        "alert_name": alert_name,
        "jobs_count": jobs_count,
        "preview_jobs": preview,
    }
    await manager.send_to_user(user_id, {"type": "job_alert", "data": details})
|
|
|
|
async def notify_application_update(user_id: str, application_id: str, new_status: str):
    """Push an ``application_update`` message to ``user_id``.

    The payload identifies the application and carries its new status.
    """
    details = {
        "application_id": application_id,
        "new_status": new_status,
    }
    update = {"type": "application_update", "data": details}
    await manager.send_to_user(user_id, update)
|
|