Spaces:
Sleeping
Sleeping
| """WebSocket real-time communication""" | |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect, WebSocketException | |
| from typing import Set, Dict, Any | |
| import json | |
| import asyncio | |
| import logging | |
| from datetime import datetime | |
# Router object created for this module's endpoints.
# NOTE(review): no route registration on this router is visible in this view
# of the file — confirm where the endpoint below is attached.
router = APIRouter()
# Store active connections
# NOTE(review): nothing visible in this file reads or writes these two
# module-level containers; ConnectionManager keeps its own per-instance
# ``active_connections`` / ``connection_info``. Confirm whether other modules
# import them before relying on (or removing) them.
active_connections: Set[WebSocket] = set()
connection_info: Dict[WebSocket, dict] = {}
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
class ConnectionManager:
    """Track accepted WebSocket connections and their topic subscriptions.

    Two structures are kept, both keyed by the WebSocket object itself:

    * ``active_connections`` -- the set of sockets that have been accepted.
    * ``connection_info`` -- per-socket metadata with the keys
      ``connected_at`` (ISO timestamp), ``client_info`` (the dict passed to
      :meth:`connect`) and ``subscriptions`` (a set of topic names).
    """

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        self.connection_info: Dict[WebSocket, dict] = {}

    async def connect(self, websocket: WebSocket, client_info: dict):
        """Accept *websocket* and register it with no subscriptions."""
        await websocket.accept()
        self.active_connections.add(websocket)
        self.connection_info[websocket] = {
            "connected_at": datetime.utcnow().isoformat(),
            "client_info": client_info,
            "subscriptions": set(),
        }
        logger.info("WebSocket connected: %s", client_info)

    def disconnect(self, websocket: WebSocket):
        """Forget *websocket*.

        Idempotent: a socket can be dropped both by a failed send and by the
        endpoint's own cleanup, so removing an unknown socket is a no-op
        instead of a ``KeyError``.
        """
        was_registered = (
            websocket in self.active_connections
            or websocket in self.connection_info
        )
        self.active_connections.discard(websocket)
        self.connection_info.pop(websocket, None)
        if was_registered:
            logger.info("WebSocket disconnected")

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        """Send *message* as JSON to one client; drop the client on failure."""
        try:
            await websocket.send_json(message)
        except Exception:
            # ``Exception`` (not a bare except) so task cancellation and
            # interpreter shutdown still propagate.
            logger.warning("WebSocket send failed; dropping client", exc_info=True)
            self.disconnect(websocket)

    async def broadcast(self, message: dict):
        """Broadcast message to all connected clients.

        Clients whose send fails are removed after the loop.
        """
        failed = set()
        # Iterate over a snapshot: other coroutines may connect/disconnect
        # while we are suspended in ``send_json``, and mutating a set during
        # iteration raises ``RuntimeError``.
        for connection in tuple(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                failed.add(connection)
        for connection in failed:
            self.disconnect(connection)

    def subscribe(self, websocket: WebSocket, topic: str):
        """Subscribe client to a topic (ignored for unregistered sockets)."""
        info = self.connection_info.get(websocket)
        if info is not None:
            info["subscriptions"].add(topic)

    def unsubscribe(self, websocket: WebSocket, topic: str):
        """Unsubscribe client from a topic (ignored for unregistered sockets)."""
        info = self.connection_info.get(websocket)
        if info is not None:
            info["subscriptions"].discard(topic)

    def get_subscribers(self, topic: str) -> Set[WebSocket]:
        """Return a new set of all clients currently subscribed to *topic*."""
        return {
            ws
            for ws, info in self.connection_info.items()
            if topic in info["subscriptions"]
        }


# Shared manager instance used by the endpoint and the background broadcaster.
manager = ConnectionManager()
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time data.

    Registers the client with ``manager``, sends a connection confirmation,
    then reads text frames in a loop. Each frame must be a JSON object and is
    passed to ``handle_message``; malformed frames get an ``error`` reply and
    the loop continues. The client is always unregistered on exit.

    Args:
        websocket: The incoming connection. ``client_id`` is read from the
            query string; ``user-agent`` and ``sec-websocket-protocol`` are
            read from the headers.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file — confirm it is registered on ``router`` elsewhere.
    client_info = {
        "user_agent": websocket.headers.get("user-agent", "unknown"),
        "client_id": websocket.query_params.get("client_id", "anonymous"),
        "protocol": websocket.headers.get("sec-websocket-protocol", "none"),
    }

    await manager.connect(websocket, client_info)
    try:
        # Send initial connection confirmation
        await manager.send_personal_message({
            "type": "connection",
            "status": "connected",
            "timestamp": datetime.utcnow().isoformat(),
            "message": "Connected to TSU-WAVE real-time stream",
        }, websocket)

        # Handle incoming messages
        while True:
            data = await websocket.receive_text()

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await manager.send_personal_message({
                    "type": "error",
                    "message": "Invalid JSON format",
                }, websocket)
                continue

            # Valid JSON that is not an object (list, number, ...) has no
            # "type" to dispatch on; reject it explicitly instead of leaking
            # an AttributeError message to the client.
            if not isinstance(message, dict):
                await manager.send_personal_message({
                    "type": "error",
                    "message": "Message must be a JSON object",
                }, websocket)
                continue

            try:
                await handle_message(websocket, message)
            except WebSocketDisconnect:
                raise
            except Exception as e:
                # Keep the connection alive, but leave a server-side trace.
                logger.exception("Error while handling WebSocket message")
                await manager.send_personal_message({
                    "type": "error",
                    "message": str(e),
                }, websocket)
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error("WebSocket error: %s", e)
    finally:
        # Single cleanup point; also runs when the handler task is cancelled.
        # The membership check avoids a second removal when a failed send has
        # already dropped this socket.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
def _normalize_topics(raw_topics):
    """Return *raw_topics* as a list of topic strings, or ``None`` if malformed.

    A single string is treated as one topic rather than being iterated
    character by character.
    """
    if isinstance(raw_topics, str):
        return [raw_topics]
    if isinstance(raw_topics, (list, tuple)) and all(
        isinstance(topic, str) for topic in raw_topics
    ):
        return list(raw_topics)
    return None


async def handle_message(websocket: WebSocket, message: dict):
    """Handle incoming WebSocket messages.

    Dispatches on ``message["type"]``:

    * ``subscribe`` / ``unsubscribe`` -- update the client's topic set from
      ``message["topics"]`` (a string or a list of strings; missing means
      empty) and acknowledge.
    * ``ping`` -- reply with ``pong``.
    * ``request`` -- reply with data selected by ``message["data_type"]``
      (``parameters``, ``chi`` or ``event``).

    Anything else, including malformed input, gets an ``error`` reply.
    """
    if not isinstance(message, dict):
        await manager.send_personal_message({
            "type": "error",
            "message": "Message must be a JSON object",
        }, websocket)
        return

    msg_type = message.get("type", "")

    if msg_type in ("subscribe", "unsubscribe"):
        topics = _normalize_topics(message.get("topics", []))
        if topics is None:
            await manager.send_personal_message({
                "type": "error",
                "message": "topics must be a string or a list of strings",
            }, websocket)
            return

        if msg_type == "subscribe":
            for topic in topics:
                manager.subscribe(websocket, topic)
            await manager.send_personal_message({
                "type": "subscribed",
                "topics": topics,
                "timestamp": datetime.utcnow().isoformat(),
            }, websocket)
        else:
            for topic in topics:
                manager.unsubscribe(websocket, topic)
            await manager.send_personal_message({
                "type": "unsubscribed",
                "topics": topics,
            }, websocket)

    elif msg_type == "ping":
        # Ping-pong for connection testing
        await manager.send_personal_message({
            "type": "pong",
            "timestamp": datetime.utcnow().isoformat(),
        }, websocket)

    elif msg_type == "request":
        # Request specific data
        data_type = message.get("data_type")
        if data_type == "parameters":
            await send_parameters(websocket, message.get("zone", "global"))
        elif data_type == "chi":
            await send_chi(websocket, message.get("zone", "global"))
        elif data_type == "event":
            await send_event(websocket, message.get("event_id"))
        else:
            await manager.send_personal_message({
                "type": "error",
                "message": f"Unknown data type: {data_type}",
            }, websocket)

    else:
        await manager.send_personal_message({
            "type": "error",
            "message": f"Unknown message type: {msg_type}",
        }, websocket)
async def send_parameters(websocket: WebSocket, zone: str):
    """Reply to *websocket* with a ``parameters`` message for *zone*.

    The values are hard-coded placeholders, not live measurements.
    """
    placeholder_values = {
        "wcc": 1.31,
        "kpr": 1.44,
        "hfsi": 0.63,
        "becf": 4.80,
        "sdb": 1.20,
        "sbsp": 0.67,
        "smvi": 0.29,
    }
    payload = {
        "type": "parameters",
        "zone": zone,
        "timestamp": datetime.utcnow().isoformat(),
        "data": placeholder_values,
    }
    await manager.send_personal_message(payload, websocket)
async def send_chi(websocket: WebSocket, zone: str):
    """Reply to *websocket* with a ``chi`` message for *zone*.

    The value, status and per-parameter contributions are fixed literals.
    """
    contributions = {
        "wcc": 0.12,
        "kpr": 0.19,
        "hfsi": 0.24,
        "becf": 0.21,
        "sdb": 0.08,
        "sbsp": 0.11,
        "smvi": 0.05,
    }
    chi_data = {
        "value": 0.72,
        "status": "HIGH",
        "contributions": contributions,
    }
    payload = {
        "type": "chi",
        "zone": zone,
        "timestamp": datetime.utcnow().isoformat(),
        "data": chi_data,
    }
    await manager.send_personal_message(payload, websocket)
async def send_event(websocket: WebSocket, event_id: str):
    """Reply to *websocket* with an ``event`` message echoing *event_id*.

    The event details are a fixed literal record; *event_id* is only echoed
    back and does not select the data.
    """
    event_record = {
        "name": "Tōhoku 2011",
        "magnitude": 9.0,
        "runup": 40.5,
        "status": "archived",
    }
    payload = {
        "type": "event",
        "event_id": event_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": event_record,
    }
    await manager.send_personal_message(payload, websocket)
# Background task for broadcasting updates
async def _send_update(subscribers, update):
    """Send *update* to every socket in *subscribers*, dropping failed ones."""
    for ws in subscribers:
        try:
            await ws.send_json(update)
        except Exception:
            # ``Exception`` rather than a bare except, so cancelling the
            # broadcaster task while it is suspended in a send still works.
            logger.warning(
                "Dropping WebSocket after failed broadcast send", exc_info=True
            )
            # Unregister the dead socket so it is not retried every cycle;
            # the membership check keeps this safe if it was already removed.
            if ws in manager.active_connections:
                manager.disconnect(ws)


async def broadcast_updates(interval: float = 5.0):
    """Broadcast periodic updates to subscribed clients, forever.

    Every *interval* seconds, clients subscribed to ``"chi"`` receive a
    ``chi_update`` message and clients subscribed to ``"parameters"`` receive
    a ``parameters_update`` message. The payload values are fixed literals.

    Args:
        interval: Seconds to sleep between rounds (defaults to 5).
    """
    while True:
        await asyncio.sleep(interval)

        if not manager.active_connections:
            continue

        chi_subscribers = manager.get_subscribers("chi")
        if chi_subscribers:
            await _send_update(chi_subscribers, {
                "type": "chi_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "global": 0.72,
                    "hilo_bay": 0.65,
                    "miyako": 0.91,
                },
            })

        # Looked up after the chi round so sockets dropped above are skipped.
        param_subscribers = manager.get_subscribers("parameters")
        if param_subscribers:
            await _send_update(param_subscribers, {
                "type": "parameters_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "wcc": 1.31 + 0.01,
                    "kpr": 1.44 + 0.02,
                    "hfsi": 0.63 - 0.01,
                },
            })
# Start background task on router initialization
# NOTE(review): no startup-hook registration for this function is visible in
# this view of the file — confirm it is wired up by the application.

# Strong reference to the broadcaster task. The event loop only keeps weak
# references to tasks, so a task whose handle is discarded can be
# garbage-collected before it finishes.
_broadcast_task = None


async def startup_event():
    """Start the periodic broadcaster once and keep a reference to its task.

    Calling this again while the broadcaster is still running is a no-op; if
    the previous task has finished, a new one is started.
    """
    global _broadcast_task
    if _broadcast_task is None or _broadcast_task.done():
        _broadcast_task = asyncio.create_task(broadcast_updates())