# DroneAgent / src / core / websocket_server.py
# zok213
# Initial commit
# 8579cdc
"""WebSocket server for real-time flight notifications."""
import asyncio
import json
import logging
import websockets
from typing import Set, Dict, Any
from websockets import WebSocketServerProtocol
from .flight_manager import flight_manager
class FlightNotificationServer:
    """WebSocket server that pushes flight notifications to connected clients.

    On construction the instance appends :meth:`send_notification` to
    ``flight_manager.notification_callbacks``. Every connected client
    receives every notification: a ``subscribe_flight`` request is only
    acknowledged, no per-flight filter is stored.
    """

    def __init__(self, host: str = "localhost", port: int = 8001):
        """Store the bind address and hook into the flight manager.

        Args:
            host: Host name or address passed to ``websockets.serve``.
            port: TCP port passed to ``websockets.serve``.
        """
        self.host = host
        self.port = port
        # Connections that are currently registered for broadcasts.
        self.clients: Set[WebSocketServerProtocol] = set()
        # Object returned by websockets.serve(); None until start_server().
        self.server = None
        # Register with flight manager for notifications
        flight_manager.notification_callbacks.append(self.send_notification)

    @staticmethod
    def _loop_time() -> float:
        """Return the running event loop's clock reading.

        NOTE(review): this is the loop's monotonic clock, not wall-clock
        time, so the 'timestamp' values sent to clients are only comparable
        with each other within one server process. Kept unchanged for wire
        compatibility.
        """
        return asyncio.get_running_loop().time()

    async def register_client(self, websocket: WebSocketServerProtocol) -> None:
        """Add ``websocket`` to the broadcast set and send it a welcome message."""
        self.clients.add(websocket)
        print(f"📱 Client connected. Total clients: {len(self.clients)}")
        # Send welcome message with current flight status
        await self.send_to_client(websocket, {
            'type': 'welcome',
            'message': 'Connected to DroneAgent flight notifications',
            'active_flights': len(flight_manager.active_missions),
            'timestamp': self._loop_time(),
        })

    async def unregister_client(self, websocket: WebSocketServerProtocol) -> None:
        """Remove ``websocket`` from the broadcast set (no-op if absent)."""
        if websocket not in self.clients:
            # Already removed, e.g. a failed send unregistered it before the
            # connection handler's ``finally`` ran; skip the duplicate log.
            return
        self.clients.discard(websocket)
        print(f"📱 Client disconnected. Total clients: {len(self.clients)}")

    async def send_to_client(self, websocket: WebSocketServerProtocol, message: Dict[str, Any]) -> None:
        """Serialize ``message`` as JSON and send it to one client.

        A send failure unregisters the client. A serialization failure is
        logged and the client is kept, because the connection is not at fault.
        Exceptions are not propagated to the caller.
        """
        try:
            payload = json.dumps(message)
        except (TypeError, ValueError) as e:
            print(f"❌ Error serializing message: {e}")
            return
        try:
            await websocket.send(payload)
        except Exception as e:
            print(f"❌ Error sending to client: {e}")
            await self.unregister_client(websocket)

    async def send_notification(self, notification: Dict[str, Any]) -> None:
        """Send ``notification`` to all connected clients.

        A ``server_timestamp`` field is added to the outgoing copy; the
        caller's dict is left untouched. Clients whose send raises are
        unregistered afterwards.
        """
        if not self.clients:
            return
        # Add notification metadata on a copy: the same dict may be handed to
        # other callbacks by the caller.
        outgoing = dict(notification)
        outgoing['server_timestamp'] = self._loop_time()
        # Serialize once, outside the per-client try, so a bad payload is not
        # mistaken for every client having disconnected.
        try:
            payload = json.dumps(outgoing)
        except (TypeError, ValueError) as e:
            print(f"❌ Error serializing notification: {e}")
            return
        disconnected_clients = set()
        delivered = 0
        # Iterate over a snapshot: each await below yields to the event loop,
        # during which clients may register or unregister and resize the set.
        for client in tuple(self.clients):
            try:
                await client.send(payload)
            except Exception as e:
                print(f"❌ Error sending notification to client: {e}")
                disconnected_clients.add(client)
            else:
                delivered += 1
        # Remove disconnected clients
        for client in disconnected_clients:
            await self.unregister_client(client)
        print(f"📢 Sent {outgoing.get('type', 'unknown')} notification to {delivered} clients")

    async def handle_client_message(self, websocket: WebSocketServerProtocol, message: str) -> None:
        """Handle one incoming message from a client.

        Recognized ``type`` values are ``subscribe_flight``, ``get_status``
        and ``ping``; any other type is ignored without a reply. Malformed
        input is answered with a ``{'type': 'error', ...}`` message.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                # Valid JSON but not an object (e.g. a list or a number).
                await self.send_to_client(websocket, {
                    'type': 'error',
                    'message': 'Message must be a JSON object',
                })
                return
            message_type = data.get('type')
            if message_type == 'subscribe_flight':
                # Acknowledge the request; no subscription state is kept.
                flight_id = data.get('flight_id')
                response = {
                    'type': 'subscription_confirmed',
                    'flight_id': flight_id,
                    'message': f'Subscribed to flight {flight_id} notifications',
                }
                await self.send_to_client(websocket, response)
            elif message_type == 'get_status':
                # Send current flight status
                status = await self.get_flight_status_summary()
                await self.send_to_client(websocket, status)
            elif message_type == 'ping':
                # Heartbeat/ping response
                await self.send_to_client(websocket, {'type': 'pong'})
        except json.JSONDecodeError:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': 'Invalid JSON message',
            })
        except Exception as e:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': f'Error processing message: {str(e)}',
            })

    async def get_flight_status_summary(self) -> Dict[str, Any]:
        """Build a summary of every mission in ``flight_manager.active_missions``.

        Returns:
            A dict with keys ``type`` ('flight_status_summary'),
            ``active_flights`` (per-mission info keyed by mission id),
            ``total_flights`` and ``timestamp``.
        """
        active_flights = {}
        for mission_id, mission in flight_manager.active_missions.items():
            telemetry = flight_manager.drone_telemetry.get(mission.drone_id)
            completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed)
            total_waypoints = len(mission.waypoints)
            # Guard against missions without waypoints (avoid division by zero).
            progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0
            flight_info = {
                'mission_id': mission_id,
                'drone_id': mission.drone_id,
                'state': mission.state.value,
                'progress_percent': progress,
                'waypoints_completed': completed_waypoints,
                'waypoints_total': total_waypoints,
                # NOTE(review): forwarded as-is; must be JSON-serializable for
                # the summary to be delivered - confirm its type.
                'created_at': mission.created_at,
            }
            if telemetry:
                flight_info.update({
                    'battery': telemetry.battery,
                    'lat': telemetry.lat,
                    'lon': telemetry.lon,
                    'alt': telemetry.alt,
                    'speed': telemetry.speed,
                    'mode': telemetry.mode,
                })
            active_flights[mission_id] = flight_info
        return {
            'type': 'flight_status_summary',
            'active_flights': active_flights,
            'total_flights': len(active_flights),
            'timestamp': self._loop_time(),
        }

    async def handle_client(self, websocket: WebSocketServerProtocol, path: str = "") -> None:
        """Handle one client connection from registration to disconnect.

        Args:
            websocket: The connection to serve.
            path: Unused. Optional so the handler works whether the
                websockets library calls it with ``(websocket, path)`` or
                with ``(websocket)`` only.
        """
        try:
            # Registration is inside the try so a failure there still reaches
            # the cleanup in ``finally``.
            await self.register_client(websocket)
            async for message in websocket:
                await self.handle_client_message(websocket, message)
        except Exception as e:
            print(f"❌ Client handler error: {e}")
        finally:
            await self.unregister_client(websocket)

    async def start_server(self) -> None:
        """Start the WebSocket server on ``self.host``:``self.port``."""
        print(f"🚀 Starting WebSocket notification server on {self.host}:{self.port}")
        self.server = await websockets.serve(
            self.handle_client,
            self.host,
            self.port,
            ping_interval=30,  # Send ping every 30 seconds
            ping_timeout=10,  # Wait 10 seconds for pong
            close_timeout=10,  # Wait 10 seconds for close
        )
        print(f"✅ WebSocket server running on ws://{self.host}:{self.port}")

    async def stop_server(self) -> None:
        """Stop the WebSocket server; does nothing if it was never started."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            print("🛑 WebSocket server stopped")

    async def broadcast_system_message(self, message: str, message_type: str = "system") -> None:
        """Broadcast a system message to all clients.

        Args:
            message: Text placed in the notification's ``message`` field.
            message_type: Value of the notification's ``type`` field.
        """
        notification = {
            'type': message_type,
            'message': message,
            'timestamp': self._loop_time(),
        }
        await self.send_notification(notification)
# Global WebSocket server instance, created at import time with the default
# host and port ("localhost", 8001). Constructing it also appends its
# send_notification method to flight_manager.notification_callbacks.
websocket_server = FlightNotificationServer()
async def start_websocket_server():
    """Start the module-level ``websocket_server`` instance.

    Awaits ``start_server()`` on the shared instance and returns when that
    call returns; no separate task is created here.
    """
    shared_server = websocket_server
    await shared_server.start_server()
async def stop_websocket_server():
    """Stop the module-level ``websocket_server`` instance.

    Delegates to ``stop_server()`` on the shared instance.
    """
    shared_server = websocket_server
    await shared_server.stop_server()