Spaces:
Sleeping
Sleeping
| """WebSocket server for real-time flight notifications.""" | |
| import asyncio | |
| import json | |
| import logging | |
| import websockets | |
| from typing import Set, Dict, Any | |
| from websockets import WebSocketServerProtocol | |
| from .flight_manager import flight_manager | |
class FlightNotificationServer:
    """WebSocket server that pushes real-time flight notifications to clients.

    On construction the instance appends its ``send_notification`` coroutine
    to ``flight_manager.notification_callbacks``. Connected clients are kept
    in ``self.clients`` and every notification is delivered to each of them
    as a JSON text frame.

    Annotations naming ``WebSocketServerProtocol`` are quoted so they act as
    type-checker hints only and are never evaluated when the class is defined.
    """

    def __init__(self, host: str = "localhost", port: int = 8001):
        """Store the bind address and register with the flight manager.

        Args:
            host: Host name or address passed to ``websockets.serve``.
            port: TCP port passed to ``websockets.serve``.
        """
        self.host = host
        self.port = port
        # Connections that currently receive broadcasts.
        self.clients: Set["WebSocketServerProtocol"] = set()
        # Object returned by ``websockets.serve``; ``None`` until started.
        self.server = None
        # Register with flight manager for notifications
        flight_manager.notification_callbacks.append(self.send_notification)

    @staticmethod
    def _loop_time() -> float:
        """Return the running event loop's clock reading.

        NOTE(review): ``loop.time()`` is a monotonic clock, not wall-clock
        time, so the 'timestamp' fields built from it are only comparable
        with each other inside this process -- confirm clients expect that.
        """
        return asyncio.get_running_loop().time()

    async def register_client(self, websocket: "WebSocketServerProtocol") -> None:
        """Add *websocket* to the client set and send it a welcome message."""
        self.clients.add(websocket)
        print(f"📱 Client connected. Total clients: {len(self.clients)}")
        # Send welcome message with current flight status
        await self.send_to_client(websocket, {
            'type': 'welcome',
            'message': 'Connected to DroneAgent flight notifications',
            'active_flights': len(flight_manager.active_missions),
            'timestamp': self._loop_time(),
        })

    async def unregister_client(self, websocket: "WebSocketServerProtocol") -> None:
        """Remove *websocket* from the client set (no error if it is absent)."""
        self.clients.discard(websocket)
        print(f"📱 Client disconnected. Total clients: {len(self.clients)}")

    async def send_to_client(self, websocket: "WebSocketServerProtocol", message: Dict[str, Any]) -> None:
        """Send *message* as JSON to one client.

        A payload that cannot be serialized is reported and dropped; the
        client stays registered because its connection is not at fault.
        Any error raised by the send itself is reported and the client is
        unregistered. This method does not raise for either failure.
        """
        try:
            payload = json.dumps(message)
        except (TypeError, ValueError) as e:
            print(f"❌ Error serializing message for client: {e}")
            return
        try:
            await websocket.send(payload)
        except Exception as e:
            print(f"❌ Error sending to client: {e}")
            await self.unregister_client(websocket)

    async def send_notification(self, notification: Dict[str, Any]) -> None:
        """Send *notification* to all connected clients.

        The caller's dict is left untouched: a shallow copy receives the
        ``server_timestamp`` key and is serialized once for all clients.
        Clients whose send fails are unregistered afterwards. Returns
        immediately when no client is connected.
        """
        if not self.clients:
            return
        # Work on a copy so other holders of the same dict never see our metadata.
        outgoing = dict(notification)
        outgoing['server_timestamp'] = self._loop_time()
        try:
            payload = json.dumps(outgoing)
        except (TypeError, ValueError) as e:
            # A bad payload is not a connection failure; keep every client.
            print(f"❌ Error serializing notification: {e}")
            return
        disconnected_clients = set()
        # Iterate a snapshot: the set can change while we are suspended in send().
        for client in tuple(self.clients):
            if client not in self.clients:
                # Unregistered by another coroutine since the snapshot was taken.
                continue
            try:
                await client.send(payload)
            except Exception as e:
                print(f"❌ Error sending notification to client: {e}")
                disconnected_clients.add(client)
        # Remove disconnected clients
        for client in disconnected_clients:
            await self.unregister_client(client)
        print(f"📢 Sent {outgoing.get('type')} notification to {len(self.clients)} clients")

    async def handle_client_message(self, websocket: "WebSocketServerProtocol", message: str) -> None:
        """Parse one incoming client message and reply to it.

        Recognized ``type`` values are ``subscribe_flight``, ``get_status``
        and ``ping``; any other type is ignored without a reply. Malformed
        JSON, JSON that is not an object, and unexpected errors each produce
        an ``error`` reply instead of raising.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                # Valid JSON, but only objects carry a 'type' field.
                await self.send_to_client(websocket, {
                    'type': 'error',
                    'message': 'Message must be a JSON object'
                })
                return
            message_type = data.get('type')
            if message_type == 'subscribe_flight':
                # Only an acknowledgement is sent here; this method stores
                # no per-flight subscription state.
                flight_id = data.get('flight_id')
                response = {
                    'type': 'subscription_confirmed',
                    'flight_id': flight_id,
                    'message': f'Subscribed to flight {flight_id} notifications'
                }
                await self.send_to_client(websocket, response)
            elif message_type == 'get_status':
                # Send current flight status
                status = await self.get_flight_status_summary()
                await self.send_to_client(websocket, status)
            elif message_type == 'ping':
                # Heartbeat/ping response
                await self.send_to_client(websocket, {'type': 'pong'})
        except json.JSONDecodeError:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': 'Invalid JSON message'
            })
        except Exception as e:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': f'Error processing message: {str(e)}'
            })

    async def get_flight_status_summary(self) -> Dict[str, Any]:
        """Build a summary of every mission in ``flight_manager.active_missions``.

        Returns:
            A dict with ``type`` 'flight_status_summary', a mapping from
            mission id to per-flight details, the flight count and a
            timestamp. Telemetry fields are included only when
            ``flight_manager.drone_telemetry`` has a truthy entry for the
            mission's drone.
        """
        active_flights = {}
        for mission_id, mission in flight_manager.active_missions.items():
            telemetry = flight_manager.drone_telemetry.get(mission.drone_id)
            completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed)
            total_waypoints = len(mission.waypoints)
            # Guard the division: a mission without waypoints reports 0 progress.
            progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0
            flight_info = {
                'mission_id': mission_id,
                'drone_id': mission.drone_id,
                'state': mission.state.value,
                'progress_percent': progress,
                'waypoints_completed': completed_waypoints,
                'waypoints_total': total_waypoints,
                # NOTE(review): must be JSON-serializable to reach clients -- confirm its type.
                'created_at': mission.created_at
            }
            if telemetry:
                flight_info.update({
                    'battery': telemetry.battery,
                    'lat': telemetry.lat,
                    'lon': telemetry.lon,
                    'alt': telemetry.alt,
                    'speed': telemetry.speed,
                    'mode': telemetry.mode
                })
            active_flights[mission_id] = flight_info
        return {
            'type': 'flight_status_summary',
            'active_flights': active_flights,
            'total_flights': len(active_flights),
            'timestamp': self._loop_time()
        }

    async def handle_client(self, websocket: "WebSocketServerProtocol", path: str = "") -> None:
        """Serve one client connection until it closes.

        Args:
            websocket: The connection handed over by the server.
            path: Request path; unused here. It has a default so the handler
                also works when it is called with the connection only.
        """
        await self.register_client(websocket)
        try:
            async for message in websocket:
                await self.handle_client_message(websocket, message)
        except Exception as e:
            print(f"❌ Client handler error: {e}")
        finally:
            # Always drop the client, whether the loop ended cleanly or not.
            await self.unregister_client(websocket)

    async def start_server(self) -> None:
        """Start listening on ``self.host``:``self.port`` and keep the handle in ``self.server``."""
        print(f"🚀 Starting WebSocket notification server on {self.host}:{self.port}")
        self.server = await websockets.serve(
            self.handle_client,
            self.host,
            self.port,
            ping_interval=30,  # Send ping every 30 seconds
            ping_timeout=10,   # Wait 10 seconds for pong
            close_timeout=10   # Wait 10 seconds for close
        )
        print(f"✅ WebSocket server running on ws://{self.host}:{self.port}")

    async def stop_server(self) -> None:
        """Close the server and wait for it to finish; does nothing if never started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            print("🛑 WebSocket server stopped")

    async def broadcast_system_message(self, message: str, message_type: str = "system") -> None:
        """Broadcast a text message to all clients.

        Args:
            message: Text placed in the notification's ``message`` field.
            message_type: Value of the notification's ``type`` field.
        """
        notification = {
            'type': message_type,
            'message': message,
            'timestamp': self._loop_time()
        }
        await self.send_notification(notification)
# Module-level server instance used by the start/stop helpers in this module.
# Creating it runs FlightNotificationServer.__init__ with the default host and port.
websocket_server = FlightNotificationServer()
async def start_websocket_server():
    """Await ``start_server()`` on the module-level ``websocket_server`` instance."""
    shared_server = websocket_server
    await shared_server.start_server()
async def stop_websocket_server():
    """Await ``stop_server()`` on the module-level ``websocket_server`` instance."""
    shared_server = websocket_server
    await shared_server.stop_server()