File size: 5,217 Bytes
edcd2ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
"""
WebSocket API Endpoint - Phase 5
Real-time sync for multi-client updates
"""
import json
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, status
from fastapi.exceptions import HTTPException
from src.services.websocket_manager import get_websocket_manager
from src.utils.logger import get_logger
router = APIRouter()
logger = get_logger(__name__)
manager = get_websocket_manager()
@router.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
user_id: Optional[str] = Query(..., description="User ID for the connection")
):
"""
WebSocket endpoint for real-time task updates.
Connect to this endpoint to receive live updates when:
- Tasks are created, updated, completed, or deleted
- Reminders are created or triggered
- Recurring tasks generate new occurrences
Connection URL: ws://localhost:8000/ws?user_id=USER_ID
Message Types Received by Client:
- connected: Connection established
- task_update: Task changed (created, updated, completed, deleted)
- reminder_created: New reminder created
- recurring_task_generated: New recurring task occurrence created
Example client code:
```javascript
const ws = new WebSocket('ws://localhost:8000/ws?user_id=USER_ID');
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log('Received:', message);
if (message.type === 'task_update') {
// Update UI with new task data
if (message.update_type === 'created') {
addTaskToUI(message.data);
} else if (message.update_type === 'completed') {
markTaskCompleted(message.data);
}
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log('Disconnected from real-time sync');
};
```
"""
if not user_id:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
logger.warning("WebSocket connection rejected: missing user_id")
return
try:
# Accept and track connection
await manager.connect(websocket, user_id)
# Keep connection alive and handle incoming messages
while True:
# Receive message from client (for keepalive/ping)
try:
data = await websocket.receive_text()
# Parse client message
try:
message = json.loads(data)
# Handle ping/pong for keepalive
if message.get("type") == "ping":
await websocket.send_json({
"type": "pong",
"timestamp": message.get("timestamp")
})
# Handle client requests
elif message.get("type") == "subscribe":
# Client can filter what updates they want
# For now, we send everything
await websocket.send_json({
"type": "subscribed",
"message": "Subscribed to all updates"
})
except json.JSONDecodeError:
logger.warning("Invalid JSON received from WebSocket client", user_id=user_id)
except WebSocketDisconnect:
# Client disconnected normally
logger.info("WebSocket disconnected by client", user_id=user_id)
break
except Exception as e:
logger.error(
"WebSocket error",
user_id=user_id,
error=str(e),
exc_info=True
)
break
except Exception as e:
logger.error(
"WebSocket connection error",
user_id=user_id,
error=str(e),
exc_info=True
)
finally:
# Clean up connection
await manager.disconnect(websocket)
@router.get("/ws/stats")
async def websocket_stats():
"""
Get WebSocket connection statistics.
Returns information about active WebSocket connections.
"""
connected_users = manager.get_connected_users()
return {
"total_users_connected": len(connected_users),
"total_connections": manager.get_connection_count(),
"connected_users": connected_users,
"status": "running"
}
@router.post("/ws/broadcast")
async def test_broadcast(
user_id: str,
message: str,
update_type: str = "test"
):
"""
Test endpoint to broadcast a message to a user's connections.
This is primarily for testing and demonstration purposes.
In production, broadcasts are triggered by Kafka events.
"""
await manager.send_personal_message({
"type": "test",
"update_type": update_type,
"message": message,
"timestamp": asyncio.get_event_loop().time()
}, user_id)
return {
"status": "sent",
"user_id": user_id,
"message": message
}
# Note: Need to import asyncio at the top
import asyncio
|