File size: 7,901 Bytes
8579cdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""WebSocket server for real-time flight notifications."""
import asyncio
import json
import logging
import websockets
from typing import Set, Dict, Any
from websockets import WebSocketServerProtocol

from .flight_manager import flight_manager


class FlightNotificationServer:
    """WebSocket server that pushes flight notifications to connected clients.

    On construction the instance appends its ``send_notification`` coroutine
    to ``flight_manager.notification_callbacks`` (presumably invoked by the
    flight manager for every notification -- confirm against flight_manager).
    Connected sockets are tracked in ``self.clients``.

    NOTE: every ``timestamp`` / ``server_timestamp`` value emitted by this
    class comes from the event loop's monotonic clock (``loop.time()``), not
    from wall-clock time, so it is only meaningful relative to other values
    produced by the same running loop.
    """

    def __init__(self, host: str = "localhost", port: int = 8001):
        """Store the bind address and register with the flight manager.

        Args:
            host: Interface the server binds to when started.
            port: TCP port the server binds to when started.
        """
        self.host = host
        self.port = port
        self.clients: Set[WebSocketServerProtocol] = set()
        self.server = None

        # Register with flight manager for notifications
        flight_manager.notification_callbacks.append(self.send_notification)

    @staticmethod
    def _loop_time() -> float:
        """Return the current event-loop (monotonic) time."""
        return asyncio.get_event_loop().time()

    async def register_client(self, websocket: WebSocketServerProtocol) -> None:
        """Track a newly connected client and send it a welcome message.

        Args:
            websocket: The connection to add to ``self.clients``.
        """
        self.clients.add(websocket)
        print(f"📱 Client connected. Total clients: {len(self.clients)}")

        # Send welcome message with current flight status
        await self.send_to_client(websocket, {
            'type': 'welcome',
            'message': 'Connected to DroneAgent flight notifications',
            'active_flights': len(flight_manager.active_missions),
            'timestamp': self._loop_time()
        })

    async def unregister_client(self, websocket: WebSocketServerProtocol) -> None:
        """Stop tracking a client; safe to call more than once per socket.

        Args:
            websocket: The connection to remove from ``self.clients``.
        """
        # A socket can be unregistered twice (once after a failed send and
        # again from handle_client's ``finally``); only log the first time.
        if websocket not in self.clients:
            return
        self.clients.discard(websocket)
        print(f"📱 Client disconnected. Total clients: {len(self.clients)}")

    async def send_to_client(self, websocket: WebSocketServerProtocol, message: Dict[str, Any]) -> None:
        """Send one JSON-encoded message to a single client.

        Any failure (serialization or transport) is printed and the client is
        unregistered; the exception is not propagated.

        Args:
            websocket: Destination connection.
            message: JSON-serializable payload.
        """
        try:
            await websocket.send(json.dumps(message))
        except Exception as e:
            print(f"❌ Error sending to client: {e}")
            await self.unregister_client(websocket)

    async def send_notification(self, notification: Dict[str, Any]) -> None:
        """Broadcast a notification to every connected client.

        A ``server_timestamp`` field is added to a copy of ``notification``
        (the caller's dict is left untouched). Clients whose send fails are
        unregistered. If the payload cannot be JSON-encoded the error is
        printed and nothing is sent; no client is dropped for that reason.

        Args:
            notification: JSON-serializable payload; its ``'type'`` value, if
                present, is used in the log line.
        """
        if not self.clients:
            return

        # Copy so that other holders of the same dict do not see our metadata.
        payload = dict(notification)
        payload['server_timestamp'] = self._loop_time()

        # Encode once, outside the per-client loop: a serialization error is
        # a payload problem and must not be mistaken for a dead connection.
        try:
            encoded = json.dumps(payload)
        except (TypeError, ValueError) as e:
            print(f"❌ Error serializing notification: {e}")
            return

        disconnected_clients = set()
        # Iterate over a snapshot: clients may connect or disconnect while we
        # are suspended in ``send``, which would otherwise raise
        # "Set changed size during iteration".
        for client in list(self.clients):
            try:
                await client.send(encoded)
            except Exception as e:
                print(f"❌ Error sending notification to client: {e}")
                disconnected_clients.add(client)

        # Remove disconnected clients
        for client in disconnected_clients:
            await self.unregister_client(client)

        print(f"📢 Sent {payload.get('type', 'unknown')} notification to {len(self.clients)} clients")

    async def handle_client_message(self, websocket: WebSocketServerProtocol, message: str) -> None:
        """Dispatch one incoming client message by its ``'type'`` field.

        Handled types are ``subscribe_flight``, ``get_status`` and ``ping``;
        any other type is ignored. Malformed input produces an ``error``
        message back to the client instead of raising.

        Args:
            websocket: The connection the message arrived on.
            message: Raw JSON text received from the client.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                # Valid JSON but not an object (e.g. a list or a number).
                await self.send_to_client(websocket, {
                    'type': 'error',
                    'message': 'Message must be a JSON object'
                })
                return
            message_type = data.get('type')

            if message_type == 'subscribe_flight':
                # Only acknowledges the request; no per-flight subscription
                # state is stored by this class.
                flight_id = data.get('flight_id')
                response = {
                    'type': 'subscription_confirmed',
                    'flight_id': flight_id,
                    'message': f'Subscribed to flight {flight_id} notifications'
                }
                await self.send_to_client(websocket, response)

            elif message_type == 'get_status':
                # Send current flight status
                status = await self.get_flight_status_summary()
                await self.send_to_client(websocket, status)

            elif message_type == 'ping':
                # Heartbeat/ping response
                await self.send_to_client(websocket, {'type': 'pong'})

        except json.JSONDecodeError:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': 'Invalid JSON message'
            })
        except Exception as e:
            await self.send_to_client(websocket, {
                'type': 'error',
                'message': f'Error processing message: {str(e)}'
            })

    async def get_flight_status_summary(self) -> Dict[str, Any]:
        """Build a summary of every mission in ``flight_manager.active_missions``.

        Returns:
            A ``flight_status_summary`` message whose ``active_flights`` maps
            mission id to per-mission info (state, waypoint progress and, when
            telemetry exists for the drone, battery/position/speed/mode).
        """
        active_flights = {}
        for mission_id, mission in flight_manager.active_missions.items():
            telemetry = flight_manager.drone_telemetry.get(mission.drone_id)

            completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed)
            total_waypoints = len(mission.waypoints)
            # Guard against missions without waypoints (avoid division by zero).
            progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0

            flight_info = {
                'mission_id': mission_id,
                'drone_id': mission.drone_id,
                'state': mission.state.value,
                'progress_percent': progress,
                'waypoints_completed': completed_waypoints,
                'waypoints_total': total_waypoints,
                # NOTE(review): must be JSON-serializable for the summary to
                # be sendable -- confirm the type of Mission.created_at.
                'created_at': mission.created_at
            }

            if telemetry:
                flight_info.update({
                    'battery': telemetry.battery,
                    'lat': telemetry.lat,
                    'lon': telemetry.lon,
                    'alt': telemetry.alt,
                    'speed': telemetry.speed,
                    'mode': telemetry.mode
                })

            active_flights[mission_id] = flight_info

        return {
            'type': 'flight_status_summary',
            'active_flights': active_flights,
            'total_flights': len(active_flights),
            'timestamp': self._loop_time()
        }

    async def handle_client(self, websocket: WebSocketServerProtocol, path: str = "") -> None:
        """Serve one client connection for its whole lifetime.

        Registers the client, dispatches every received message, and always
        unregisters the client when the connection ends.

        Args:
            websocket: The accepted connection.
            path: Request path; unused. It is optional because newer
                ``websockets`` releases call the handler with the connection
                only, while older ones also pass the path.
        """
        await self.register_client(websocket)
        try:
            async for message in websocket:
                await self.handle_client_message(websocket, message)
        except Exception as e:
            print(f"❌ Client handler error: {e}")
        finally:
            await self.unregister_client(websocket)

    async def start_server(self) -> None:
        """Start listening on ``self.host``/``self.port``.

        The resulting server object is stored in ``self.server``.
        """
        print(f"🚀 Starting WebSocket notification server on {self.host}:{self.port}")

        self.server = await websockets.serve(
            self.handle_client,
            self.host,
            self.port,
            ping_interval=30,  # Send ping every 30 seconds
            ping_timeout=10,   # Wait 10 seconds for pong
            close_timeout=10   # Wait 10 seconds for close
        )

        print(f"✅ WebSocket server running on ws://{self.host}:{self.port}")

    async def stop_server(self) -> None:
        """Stop the server if it is running; otherwise do nothing."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            # Forget the closed server so a repeated stop is a clean no-op.
            self.server = None
            print("🛑 WebSocket server stopped")

    async def broadcast_system_message(self, message: str, message_type: str = "system") -> None:
        """Broadcast a plain text system message to all clients.

        Args:
            message: Text placed in the notification's ``'message'`` field.
            message_type: Value of the notification's ``'type'`` field.
        """
        notification = {
            'type': message_type,
            'message': message,
            'timestamp': self._loop_time()
        }
        await self.send_notification(notification)


# Global WebSocket server instance, created at import time with the default
# host/port ("localhost", 8001). Constructing it also appends its
# send_notification callback to flight_manager.notification_callbacks
# (see FlightNotificationServer.__init__); it does not start listening yet.
websocket_server = FlightNotificationServer()


async def start_websocket_server():
    """Bring up the shared module-level notification server.

    Awaits ``start_server()`` on the global ``websocket_server`` instance and
    returns once that call has completed.
    """
    shared_server = websocket_server
    await shared_server.start_server()


async def stop_websocket_server():
    """Shut down the shared module-level notification server.

    Awaits ``stop_server()`` on the global ``websocket_server`` instance.
    """
    shared_server = websocket_server
    await shared_server.stop_server()