best / backend /app /api /websocket.py
anky2002's picture
feat: Add unit tests, production Docker, rate limiting, WebSocket notifications, data export (GDPR), error handling
56d23bb verified
Raw
History Blame Contribute Delete
5.08 kB
"""
WebSocket real-time notifications.
"""
import json
import logging
from typing import Dict, Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.core.security import verify_token
logger = logging.getLogger(__name__)
router = APIRouter(tags=["websocket"])
class ConnectionManager:
"""Manage WebSocket connections per user."""
def __init__(self):
self.active_connections: Dict[str, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
if user_id not in self.active_connections:
self.active_connections[user_id] = set()
self.active_connections[user_id].add(websocket)
logger.info(f"WebSocket connected: user={user_id}")
def disconnect(self, websocket: WebSocket, user_id: str):
if user_id in self.active_connections:
self.active_connections[user_id].discard(websocket)
if not self.active_connections[user_id]:
del self.active_connections[user_id]
logger.info(f"WebSocket disconnected: user={user_id}")
async def send_to_user(self, user_id: str, message: dict):
"""Send a message to all connections for a user."""
if user_id in self.active_connections:
dead_connections = set()
for connection in self.active_connections[user_id]:
try:
await connection.send_json(message)
except Exception:
dead_connections.add(connection)
# Clean up dead connections
for conn in dead_connections:
self.active_connections[user_id].discard(conn)
async def broadcast(self, message: dict):
"""Broadcast to all connected users."""
for user_id in list(self.active_connections.keys()):
await self.send_to_user(user_id, message)
@property
def connected_users(self) -> int:
return len(self.active_connections)
# Global connection manager
manager = ConnectionManager()
@router.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket):
"""
WebSocket endpoint for real-time notifications.
Client connects with token in query params:
ws://localhost:8000/ws/notifications?token=<access_token>
Server sends JSON messages:
{"type": "notification", "data": {...}}
{"type": "job_alert", "data": {...}}
{"type": "application_update", "data": {...}}
"""
# Authenticate via query param
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Missing token")
return
payload = verify_token(token)
if not payload:
await websocket.close(code=4001, reason="Invalid token")
return
user_id = payload["sub"]
# Connect
await manager.connect(websocket, user_id)
try:
# Send initial message
await websocket.send_json({
"type": "connected",
"data": {"message": "Connected to notifications"},
})
# Keep connection alive and handle incoming messages
while True:
data = await websocket.receive_text()
# Handle client messages (ping/pong, read receipts, etc.)
try:
message = json.loads(data)
if message.get("type") == "ping":
await websocket.send_json({"type": "pong"})
elif message.get("type") == "mark_read":
# Client acknowledged a notification
notification_id = message.get("notification_id")
if notification_id:
# TODO: Mark notification as read in DB
pass
except json.JSONDecodeError:
pass
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
except Exception as e:
logger.error(f"WebSocket error for user {user_id}: {e}")
manager.disconnect(websocket, user_id)
# Helper function to send notifications from other parts of the app
async def notify_user(user_id: str, notification_type: str, data: dict):
"""Send a real-time notification to a user."""
await manager.send_to_user(user_id, {
"type": notification_type,
"data": data,
})
async def notify_job_alert(user_id: str, alert_name: str, jobs_count: int, jobs: list[dict]):
"""Send a job alert notification."""
await manager.send_to_user(user_id, {
"type": "job_alert",
"data": {
"alert_name": alert_name,
"jobs_count": jobs_count,
"preview_jobs": jobs[:3],
},
})
async def notify_application_update(user_id: str, application_id: str, new_status: str):
"""Notify user of application status change."""
await manager.send_to_user(user_id, {
"type": "application_update",
"data": {
"application_id": application_id,
"new_status": new_status,
},
})