|
|
""" |
|
|
WebSocket API Endpoint - Phase 5 |
|
|
Real-time sync for multi-client updates |
|
|
""" |
|
|
|
|
|
import json |
|
|
from typing import Optional |
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, status |
|
|
from fastapi.exceptions import HTTPException |
|
|
|
|
|
from src.services.websocket_manager import get_websocket_manager |
|
|
from src.utils.logger import get_logger |
|
|
|
|
|
# Router on which the WebSocket endpoint and its helper HTTP routes
# defined in this module are registered.
router = APIRouter()

# Logger keyed to this module's import path.
logger = get_logger(__name__)

# Connection manager obtained from the websocket_manager service; every
# endpoint in this module goes through this single module-level reference.
manager = get_websocket_manager()
|
|
|
|
|
|
|
|
@router.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    user_id: Optional[str] = Query(..., description="User ID for the connection")
):
    """
    WebSocket endpoint for real-time task updates.

    Connect to this endpoint to receive live updates when:
    - Tasks are created, updated, completed, or deleted
    - Reminders are created or triggered
    - Recurring tasks generate new occurrences

    Connection URL: ws://localhost:8000/ws?user_id=USER_ID

    Message Types Received by Client:
    - connected: Connection established
    - task_update: Task changed (created, updated, completed, deleted)
    - reminder_created: New reminder created
    - recurring_task_generated: New recurring task occurrence created
    - pong: Reply to a client "ping"; echoes the client's "timestamp" field
    - subscribed: Acknowledgement of a client "subscribe" message

    Message Types Accepted from Client (JSON objects):
    - {"type": "ping", "timestamp": ...}
    - {"type": "subscribe"}
    Any other message type is ignored. Frames that are not valid JSON, or
    whose JSON value is not an object, are logged and skipped without
    closing the connection.

    Example client code:
    ```javascript
    const ws = new WebSocket('ws://localhost:8000/ws?user_id=USER_ID');

    ws.onmessage = (event) => {
        const message = JSON.parse(event.data);
        console.log('Received:', message);

        if (message.type === 'task_update') {
            // Update UI with new task data
            if (message.update_type === 'created') {
                addTaskToUI(message.data);
            } else if (message.update_type === 'completed') {
                markTaskCompleted(message.data);
            }
        }
    };

    ws.onerror = (error) => {
        console.error('WebSocket error:', error);
    };

    ws.onclose = () => {
        console.log('Disconnected from real-time sync');
    };
    ```

    Args:
        websocket: The incoming WebSocket connection.
        user_id: Required query parameter identifying the connecting user.
    """
    # The query parameter is declared required, so this guard mainly rejects
    # an empty value such as `?user_id=`.
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        logger.warning("WebSocket connection rejected: missing user_id")
        return

    try:
        await manager.connect(websocket, user_id)

        while True:
            try:
                data = await websocket.receive_text()

                # Keep the try body minimal: only the parse can raise
                # JSONDecodeError.
                try:
                    message = json.loads(data)
                except json.JSONDecodeError:
                    logger.warning("Invalid JSON received from WebSocket client", user_id=user_id)
                    continue

                # Valid JSON is not necessarily an object ("[]", "42", "null").
                # Without this check `.get` would raise AttributeError and the
                # generic handler below would drop the connection.
                if not isinstance(message, dict):
                    logger.warning(
                        "Non-object JSON message received from WebSocket client",
                        user_id=user_id
                    )
                    continue

                message_type = message.get("type")

                if message_type == "ping":
                    # Echo the client's timestamp back unchanged.
                    await websocket.send_json({
                        "type": "pong",
                        "timestamp": message.get("timestamp")
                    })

                elif message_type == "subscribe":
                    await websocket.send_json({
                        "type": "subscribed",
                        "message": "Subscribed to all updates"
                    })

            except WebSocketDisconnect:
                logger.info("WebSocket disconnected by client", user_id=user_id)
                break

            except Exception as e:
                # Any other failure while receiving or replying ends the session.
                logger.error(
                    "WebSocket error",
                    user_id=user_id,
                    error=str(e),
                    exc_info=True
                )
                break

    except Exception as e:
        # Reached when establishing the connection through the manager fails.
        logger.error(
            "WebSocket connection error",
            user_id=user_id,
            error=str(e),
            exc_info=True
        )

    finally:
        # Always hand the socket back to the manager, however the session ended.
        await manager.disconnect(websocket)
|
|
|
|
|
|
|
|
@router.get("/ws/stats")
async def websocket_stats():
    """
    Report statistics about active WebSocket connections.

    Returns:
        A dict with the number of connected users, the manager's total
        connection count, the connected users as reported by the manager,
        and a fixed "running" status marker.
    """
    users = manager.get_connected_users()
    user_total = len(users)
    connection_total = manager.get_connection_count()

    stats = {
        "total_users_connected": user_total,
        "total_connections": connection_total,
        "connected_users": users,
        "status": "running",
    }
    return stats
|
|
|
|
|
|
|
|
@router.post("/ws/broadcast")
async def test_broadcast(
    user_id: str,
    message: str,
    update_type: str = "test"
):
    """
    Test endpoint to broadcast a message to a user's connections.

    This is primarily for testing and demonstration purposes.
    In production, broadcasts are triggered by Kafka events.

    Args:
        user_id: User whose connections should receive the message.
        message: Text placed in the payload's "message" field.
        update_type: Value placed in the payload's "update_type" field.

    Returns:
        A dict echoing ``user_id`` and ``message`` with status "sent". The
        status is returned unconditionally once the manager call completes;
        it does not report how many connections received the payload.
    """
    # get_running_loop() is the preferred way to reach the loop from inside a
    # coroutine. The timestamp is the loop's monotonic clock, not wall-clock
    # or epoch time, so it is only meaningful relative to other readings from
    # the same loop.
    loop_time = asyncio.get_running_loop().time()

    payload = {
        "type": "test",
        "update_type": update_type,
        "message": message,
        "timestamp": loop_time
    }
    await manager.send_personal_message(payload, user_id)

    return {
        "status": "sent",
        "user_id": user_id,
        "message": message
    }
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
|
|