"""WebSocket real-time communication""" from fastapi import APIRouter, WebSocket, WebSocketDisconnect, WebSocketException from typing import Set, Dict, Any import json import asyncio import logging from datetime import datetime router = APIRouter() # Store active connections active_connections: Set[WebSocket] = set() connection_info: Dict[WebSocket, dict] = {} logger = logging.getLogger(__name__) class ConnectionManager: """Manage WebSocket connections""" def __init__(self): self.active_connections: Set[WebSocket] = set() self.connection_info: Dict[WebSocket, dict] = {} async def connect(self, websocket: WebSocket, client_info: dict): await websocket.accept() self.active_connections.add(websocket) self.connection_info[websocket] = { "connected_at": datetime.utcnow().isoformat(), "client_info": client_info, "subscriptions": set() } logger.info(f"WebSocket connected: {client_info}") def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) if websocket in self.connection_info: del self.connection_info[websocket] logger.info("WebSocket disconnected") async def send_personal_message(self, message: dict, websocket: WebSocket): try: await websocket.send_json(message) except: self.disconnect(websocket) async def broadcast(self, message: dict): """Broadcast message to all connected clients""" disconnected = set() for connection in self.active_connections: try: await connection.send_json(message) except: disconnected.add(connection) # Clean up disconnected for conn in disconnected: self.disconnect(conn) def subscribe(self, websocket: WebSocket, topic: str): """Subscribe client to a topic""" if websocket in self.connection_info: self.connection_info[websocket]["subscriptions"].add(topic) def unsubscribe(self, websocket: WebSocket, topic: str): """Unsubscribe client from a topic""" if websocket in self.connection_info: self.connection_info[websocket]["subscriptions"].discard(topic) def get_subscribers(self, topic: str) -> Set[WebSocket]: """Get all clients subscribed to a topic""" subscribers = set() for ws, info in self.connection_info.items(): if topic in info["subscriptions"]: subscribers.add(ws) return subscribers manager = ConnectionManager() @router.websocket("/v1/realtime") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time data""" # Get client info from query parameters client_info = { "user_agent": websocket.headers.get("user-agent", "unknown"), "client_id": websocket.query_params.get("client_id", "anonymous"), "protocol": websocket.headers.get("sec-websocket-protocol", "none") } await manager.connect(websocket, client_info) try: # Send initial connection confirmation await manager.send_personal_message({ "type": "connection", "status": "connected", "timestamp": datetime.utcnow().isoformat(), "message": "Connected to TSU-WAVE real-time stream" }, websocket) # Handle incoming messages while True: data = await websocket.receive_text() try: message = json.loads(data) await handle_message(websocket, message) except json.JSONDecodeError: await manager.send_personal_message({ "type": "error", "message": "Invalid JSON format" }, websocket) except Exception as e: await manager.send_personal_message({ "type": "error", "message": str(e) }, websocket) except WebSocketDisconnect: manager.disconnect(websocket) except Exception as e: logger.error(f"WebSocket error: {e}") manager.disconnect(websocket) async def handle_message(websocket: WebSocket, message: dict): """Handle incoming WebSocket messages""" msg_type = message.get("type", "") if msg_type == "subscribe": # Subscribe to topics topics = message.get("topics", []) for topic in topics: manager.subscribe(websocket, topic) await manager.send_personal_message({ "type": "subscribed", "topics": topics, "timestamp": datetime.utcnow().isoformat() }, websocket) elif msg_type == "unsubscribe": # Unsubscribe from topics topics = message.get("topics", []) for topic in topics: manager.unsubscribe(websocket, topic) await manager.send_personal_message({ "type": "unsubscribed", "topics": topics }, websocket) elif msg_type == "ping": # Ping-pong for connection testing await manager.send_personal_message({ "type": "pong", "timestamp": datetime.utcnow().isoformat() }, websocket) elif msg_type == "request": # Request specific data data_type = message.get("data_type") if data_type == "parameters": await send_parameters(websocket, message.get("zone", "global")) elif data_type == "chi": await send_chi(websocket, message.get("zone", "global")) elif data_type == "event": await send_event(websocket, message.get("event_id")) else: await manager.send_personal_message({ "type": "error", "message": f"Unknown data type: {data_type}" }, websocket) else: await manager.send_personal_message({ "type": "error", "message": f"Unknown message type: {msg_type}" }, websocket) async def send_parameters(websocket: WebSocket, zone: str): """Send parameter data to client""" # Placeholder for parameter data await manager.send_personal_message({ "type": "parameters", "zone": zone, "timestamp": datetime.utcnow().isoformat(), "data": { "wcc": 1.31, "kpr": 1.44, "hfsi": 0.63, "becf": 4.80, "sdb": 1.20, "sbsp": 0.67, "smvi": 0.29 } }, websocket) async def send_chi(websocket: WebSocket, zone: str): """Send CHI data to client""" await manager.send_personal_message({ "type": "chi", "zone": zone, "timestamp": datetime.utcnow().isoformat(), "data": { "value": 0.72, "status": "HIGH", "contributions": { "wcc": 0.12, "kpr": 0.19, "hfsi": 0.24, "becf": 0.21, "sdb": 0.08, "sbsp": 0.11, "smvi": 0.05 } } }, websocket) async def send_event(websocket: WebSocket, event_id: str): """Send event data to client""" await manager.send_personal_message({ "type": "event", "event_id": event_id, "timestamp": datetime.utcnow().isoformat(), "data": { "name": "Tōhoku 2011", "magnitude": 9.0, "runup": 40.5, "status": "archived" } }, websocket) # Background task for broadcasting updates async def broadcast_updates(): """Broadcast periodic updates to all connected clients""" while True: await asyncio.sleep(5) # Update every 5 seconds if manager.active_connections: # Get subscribers for each topic chi_subscribers = manager.get_subscribers("chi") param_subscribers = manager.get_subscribers("parameters") # Send updates to appropriate subscribers if chi_subscribers: update = { "type": "chi_update", "timestamp": datetime.utcnow().isoformat(), "data": { "global": 0.72, "hilo_bay": 0.65, "miyako": 0.91 } } for ws in chi_subscribers: try: await ws.send_json(update) except: pass if param_subscribers: update = { "type": "parameters_update", "timestamp": datetime.utcnow().isoformat(), "data": { "wcc": 1.31 + 0.01, "kpr": 1.44 + 0.02, "hfsi": 0.63 - 0.01 } } for ws in param_subscribers: try: await ws.send_json(update) except: pass # Start background task on router initialization @router.on_event("startup") async def startup_event(): asyncio.create_task(broadcast_updates())