"""WebSocket server for real-time flight notifications.""" import asyncio import json import logging import websockets from typing import Set, Dict, Any from websockets import WebSocketServerProtocol from .flight_manager import flight_manager class FlightNotificationServer: """WebSocket server for real-time flight notifications.""" def __init__(self, host: str = "localhost", port: int = 8001): self.host = host self.port = port self.clients: Set[WebSocketServerProtocol] = set() self.server = None # Register with flight manager for notifications flight_manager.notification_callbacks.append(self.send_notification) async def register_client(self, websocket: WebSocketServerProtocol) -> None: """Register a new WebSocket client.""" self.clients.add(websocket) print(f"📱 Client connected. Total clients: {len(self.clients)}") # Send welcome message with current flight status await self.send_to_client(websocket, { 'type': 'welcome', 'message': 'Connected to DroneAgent flight notifications', 'active_flights': len(flight_manager.active_missions), 'timestamp': asyncio.get_event_loop().time() }) async def unregister_client(self, websocket: WebSocketServerProtocol) -> None: """Unregister a WebSocket client.""" self.clients.discard(websocket) print(f"📱 Client disconnected. Total clients: {len(self.clients)}") async def send_to_client(self, websocket: WebSocketServerProtocol, message: Dict[str, Any]) -> None: """Send message to a specific client.""" try: await websocket.send(json.dumps(message)) except Exception as e: print(f"❌ Error sending to client: {e}") await self.unregister_client(websocket) async def send_notification(self, notification: Dict[str, Any]) -> None: """Send notification to all connected clients.""" if not self.clients: return # Add notification metadata notification['server_timestamp'] = asyncio.get_event_loop().time() disconnected_clients = set() for client in self.clients: try: await client.send(json.dumps(notification)) except Exception as e: print(f"❌ Error sending notification to client: {e}") disconnected_clients.add(client) # Remove disconnected clients for client in disconnected_clients: await self.unregister_client(client) print(f"📢 Sent {notification['type']} notification to {len(self.clients)} clients") async def handle_client_message(self, websocket: WebSocketServerProtocol, message: str) -> None: """Handle incoming messages from clients.""" try: data = json.loads(message) message_type = data.get('type') if message_type == 'subscribe_flight': # Subscribe to specific flight notifications flight_id = data.get('flight_id') response = { 'type': 'subscription_confirmed', 'flight_id': flight_id, 'message': f'Subscribed to flight {flight_id} notifications' } await self.send_to_client(websocket, response) elif message_type == 'get_status': # Send current flight status status = await self.get_flight_status_summary() await self.send_to_client(websocket, status) elif message_type == 'ping': # Heartbeat/ping response await self.send_to_client(websocket, {'type': 'pong'}) except json.JSONDecodeError: await self.send_to_client(websocket, { 'type': 'error', 'message': 'Invalid JSON message' }) except Exception as e: await self.send_to_client(websocket, { 'type': 'error', 'message': f'Error processing message: {str(e)}' }) async def get_flight_status_summary(self) -> Dict[str, Any]: """Get summary of all active flights.""" active_flights = {} for mission_id, mission in flight_manager.active_missions.items(): telemetry = flight_manager.drone_telemetry.get(mission.drone_id) completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed) total_waypoints = len(mission.waypoints) progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0 flight_info = { 'mission_id': mission_id, 'drone_id': mission.drone_id, 'state': mission.state.value, 'progress_percent': progress, 'waypoints_completed': completed_waypoints, 'waypoints_total': total_waypoints, 'created_at': mission.created_at } if telemetry: flight_info.update({ 'battery': telemetry.battery, 'lat': telemetry.lat, 'lon': telemetry.lon, 'alt': telemetry.alt, 'speed': telemetry.speed, 'mode': telemetry.mode }) active_flights[mission_id] = flight_info return { 'type': 'flight_status_summary', 'active_flights': active_flights, 'total_flights': len(active_flights), 'timestamp': asyncio.get_event_loop().time() } async def handle_client(self, websocket: WebSocketServerProtocol, path: str) -> None: """Handle WebSocket client connection.""" await self.register_client(websocket) try: async for message in websocket: await self.handle_client_message(websocket, message) except Exception as e: print(f"❌ Client handler error: {e}") finally: await self.unregister_client(websocket) async def start_server(self) -> None: """Start the WebSocket server.""" print(f"🚀 Starting WebSocket notification server on {self.host}:{self.port}") self.server = await websockets.serve( self.handle_client, self.host, self.port, ping_interval=30, # Send ping every 30 seconds ping_timeout=10, # Wait 10 seconds for pong close_timeout=10 # Wait 10 seconds for close ) print(f"✅ WebSocket server running on ws://{self.host}:{self.port}") async def stop_server(self) -> None: """Stop the WebSocket server.""" if self.server: self.server.close() await self.server.wait_closed() print("🛑 WebSocket server stopped") async def broadcast_system_message(self, message: str, message_type: str = "system") -> None: """Broadcast a system message to all clients.""" notification = { 'type': message_type, 'message': message, 'timestamp': asyncio.get_event_loop().time() } await self.send_notification(notification) # Global WebSocket server instance websocket_server = FlightNotificationServer() async def start_websocket_server(): """Start the WebSocket server as a background task.""" await websocket_server.start_server() async def stop_websocket_server(): """Stop the WebSocket server.""" await websocket_server.stop_server()