Spaces:
Sleeping
Sleeping
| """Advanced Flight Management Agent with Real-time Telemetry and Mission Control.""" | |
| import asyncio | |
| import json | |
| import math | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Any, Optional, Callable | |
| from enum import Enum | |
| import websockets | |
| from dataclasses import dataclass, asdict | |
| class FlightState(Enum): | |
| """Flight states for mission tracking.""" | |
| IDLE = "idle" | |
| CONFIRMED = "confirmed" | |
| ACTIVE = "active" | |
| PAUSED = "paused" | |
| COMPLETED = "completed" | |
| CANCELLED = "cancelled" | |
| EMERGENCY = "emergency" | |
| class DroneMode(Enum): | |
| """Drone flight modes.""" | |
| AUTO = "AUTO" | |
| LOITER = "LOITER" | |
| RTL = "RTL" | |
| GUIDED = "GUIDED" | |
| LAND = "LAND" | |
| class Waypoint: | |
| """Mission waypoint definition.""" | |
| id: int | |
| lat: float | |
| lon: float | |
| alt: float | |
| hold_time: float = 0.0 | |
| accept_radius: float = 2.0 | |
| completed: bool = False | |
| completion_time: Optional[str] = None | |
| class FlightMission: | |
| """Complete flight mission definition.""" | |
| mission_id: str | |
| drone_id: str | |
| waypoints: List[Waypoint] | |
| created_at: str | |
| confirmed_at: Optional[str] = None | |
| started_at: Optional[str] = None | |
| completed_at: Optional[str] = None | |
| state: FlightState = FlightState.IDLE | |
| current_waypoint_index: int = 0 | |
| total_distance: float = 0.0 | |
| estimated_duration: int = 0 # minutes | |
| pause_timer: Optional[threading.Timer] = None | |
| class TelemetryData: | |
| """Real-time drone telemetry.""" | |
| drone_id: str | |
| timestamp: str | |
| lat: float | |
| lon: float | |
| alt: float | |
| battery: int | |
| mode: str | |
| speed: float | |
| heading: float | |
| satellites: int | |
| voltage: float | |
| distance_to_waypoint: float | |
| current_waypoint: int | |
| mission_progress: float # 0-100% | |
| class FlightManager: | |
| """Advanced Flight Management Agent with AI reasoning and real-time control.""" | |
| def __init__(self): | |
| self.active_missions: Dict[str, FlightMission] = {} | |
| self.drone_telemetry: Dict[str, TelemetryData] = {} | |
| self.tracking_threads: Dict[str, threading.Thread] = {} | |
| self.websocket_clients: List[websockets.WebSocketServerProtocol] = [] | |
| self.notification_callbacks: List[Callable] = [] | |
| self.ai_reasoning_enabled = True | |
| # Flight safety parameters | |
| self.min_battery_level = 25 # % | |
| self.max_flight_time = 30 # minutes | |
| self.waypoint_completion_radius = 5.0 # meters | |
| async def confirm_flight(self, mission_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Confirm and start tracking a flight mission.""" | |
| try: | |
| mission_id = mission_data.get('mission_id') | |
| drone_id = mission_data.get('drone_id') | |
| waypoints_data = mission_data.get('waypoints', []) | |
| # Create waypoints | |
| waypoints = [] | |
| for i, wp_data in enumerate(waypoints_data): | |
| waypoint = Waypoint( | |
| id=i, | |
| lat=wp_data['lat'], | |
| lon=wp_data['lon'], | |
| alt=wp_data['alt'], | |
| hold_time=wp_data.get('hold_time', 0.0), | |
| accept_radius=wp_data.get('accept_radius', 2.0) | |
| ) | |
| waypoints.append(waypoint) | |
| # Create mission | |
| mission = FlightMission( | |
| mission_id=mission_id, | |
| drone_id=drone_id, | |
| waypoints=waypoints, | |
| created_at=datetime.now().isoformat(), | |
| confirmed_at=datetime.now().isoformat(), | |
| state=FlightState.CONFIRMED | |
| ) | |
| self.active_missions[mission_id] = mission | |
| # Start tracking thread | |
| tracking_thread = threading.Thread( | |
| target=self._track_mission, | |
| args=(mission_id,), | |
| daemon=True | |
| ) | |
| tracking_thread.start() | |
| self.tracking_threads[mission_id] = tracking_thread | |
| # Send initial notification | |
| await self._send_notification({ | |
| 'type': 'flight_confirmed', | |
| 'mission_id': mission_id, | |
| 'drone_id': drone_id, | |
| 'message': f'Flight mission {mission_id} confirmed and tracking started', | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return { | |
| 'status': 'confirmed', | |
| 'tracking_id': mission_id, | |
| 'message': f'Flight mission {mission_id} confirmed and tracking started' | |
| } | |
| except Exception as e: | |
| return { | |
| 'status': 'error', | |
| 'message': f'Failed to confirm flight: {str(e)}' | |
| } | |
| async def pause_flight(self, mission_id: str, drone_id: str) -> Dict[str, Any]: | |
| """Pause an active flight mission.""" | |
| try: | |
| mission = self.active_missions.get(mission_id) | |
| if not mission: | |
| return {'status': 'error', 'message': 'Mission not found'} | |
| # Update mission state | |
| mission.state = FlightState.PAUSED | |
| # Send LOITER command to drone | |
| await self._send_drone_command(drone_id, { | |
| 'command': 'SET_MODE', | |
| 'mode': DroneMode.LOITER.value, | |
| 'reason': 'User pause request' | |
| }) | |
| # Calculate timeout based on battery and conditions | |
| current_telemetry = self.drone_telemetry.get(drone_id) | |
| timeout_seconds = self._calculate_pause_timeout(current_telemetry) | |
| # Start pause timer | |
| pause_timer = threading.Timer( | |
| timeout_seconds, | |
| self._handle_pause_timeout, | |
| args=(mission_id, drone_id) | |
| ) | |
| pause_timer.daemon = True | |
| pause_timer.start() | |
| # Store timer reference for cleanup | |
| mission.pause_timer = pause_timer | |
| await self._send_notification({ | |
| 'type': 'flight_paused', | |
| 'mission_id': mission_id, | |
| 'drone_id': drone_id, | |
| 'message': f'Flight paused. Drone in LOITER mode. Timeout: {timeout_seconds}s', | |
| 'timeout_seconds': timeout_seconds, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return { | |
| 'status': 'paused', | |
| 'timeout_seconds': timeout_seconds, | |
| 'message': f'Flight paused successfully. Timeout: {timeout_seconds}s' | |
| } | |
| except Exception as e: | |
| return { | |
| 'status': 'error', | |
| 'message': f'Failed to pause flight: {str(e)}' | |
| } | |
| async def cancel_flight(self, mission_id: str, drone_id: str) -> Dict[str, Any]: | |
| """Cancel an active flight mission.""" | |
| try: | |
| mission = self.active_missions.get(mission_id) | |
| if not mission: | |
| return {'status': 'error', 'message': 'Mission not found'} | |
| # Cancel any active pause timer | |
| if hasattr(mission, 'pause_timer') and mission.pause_timer: | |
| mission.pause_timer.cancel() | |
| # Update mission state | |
| mission.state = FlightState.CANCELLED | |
| mission.completed_at = datetime.now().isoformat() | |
| # Send RTL command to drone | |
| await self._send_drone_command(drone_id, { | |
| 'command': 'SET_MODE', | |
| 'mode': DroneMode.RTL.value, | |
| 'reason': 'Mission cancelled by user' | |
| }) | |
| # Stop tracking | |
| if mission_id in self.tracking_threads: | |
| # Note: Thread will check mission state and stop naturally | |
| pass | |
| await self._send_notification({ | |
| 'type': 'flight_cancelled', | |
| 'mission_id': mission_id, | |
| 'drone_id': drone_id, | |
| 'message': f'Flight cancelled. Drone returning to launch.', | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return { | |
| 'status': 'cancelled', | |
| 'message': 'Flight cancelled successfully. Drone returning to launch.' | |
| } | |
| except Exception as e: | |
| return { | |
| 'status': 'error', | |
| 'message': f'Failed to cancel flight: {str(e)}' | |
| } | |
| async def update_mission(self, mission_id: str, new_waypoints: List[Dict], reason: str = "User request") -> Dict[str, Any]: | |
| """Dynamically update mission waypoints during flight.""" | |
| try: | |
| mission = self.active_missions.get(mission_id) | |
| if not mission: | |
| return {'status': 'error', 'message': 'Mission not found'} | |
| # Create new waypoints | |
| updated_waypoints = [] | |
| for i, wp_data in enumerate(new_waypoints): | |
| waypoint = Waypoint( | |
| id=len(mission.waypoints) + i, | |
| lat=wp_data['lat'], | |
| lon=wp_data['lon'], | |
| alt=wp_data['alt'], | |
| hold_time=wp_data.get('hold_time', 0.0), | |
| accept_radius=wp_data.get('accept_radius', 2.0) | |
| ) | |
| updated_waypoints.append(waypoint) | |
| # Safety check with AI reasoning | |
| safety_check = await self._ai_safety_check(mission.drone_id, updated_waypoints, reason) | |
| if not safety_check['safe']: | |
| return { | |
| 'status': 'unsafe', | |
| 'message': safety_check['reason'], | |
| 'recommendations': safety_check.get('recommendations', []) | |
| } | |
| # Update mission | |
| mission.waypoints.extend(updated_waypoints) | |
| # Upload new mission to drone | |
| await self._upload_mission_to_drone(mission.drone_id, updated_waypoints) | |
| # Cancel any active pause timer and resume AUTO mode if paused | |
| if hasattr(mission, 'pause_timer') and mission.pause_timer: | |
| mission.pause_timer.cancel() | |
| mission.pause_timer = None | |
| if mission.state == FlightState.PAUSED: | |
| await self._send_drone_command(mission.drone_id, { | |
| 'command': 'SET_MODE', | |
| 'mode': DroneMode.AUTO.value, | |
| 'reason': 'Mission updated, resuming' | |
| }) | |
| mission.state = FlightState.ACTIVE | |
| await self._send_notification({ | |
| 'type': 'mission_updated', | |
| 'mission_id': mission_id, | |
| 'drone_id': mission.drone_id, | |
| 'message': f'Mission updated with {len(updated_waypoints)} new waypoints. {reason}', | |
| 'new_waypoints_count': len(updated_waypoints), | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return { | |
| 'status': 'updated', | |
| 'message': f'Mission updated successfully with {len(updated_waypoints)} new waypoints', | |
| 'safety_reasoning': safety_check['reasoning'] | |
| } | |
| except Exception as e: | |
| return { | |
| 'status': 'error', | |
| 'message': f'Failed to update mission: {str(e)}' | |
| } | |
| def update_telemetry(self, telemetry_data: Dict[str, Any]) -> None: | |
| """Update drone telemetry data and trigger AI reasoning.""" | |
| try: | |
| drone_id = telemetry_data.get('drone_id') | |
| if not drone_id: | |
| return | |
| # Create telemetry object | |
| telemetry = TelemetryData( | |
| drone_id=drone_id, | |
| timestamp=datetime.now().isoformat(), | |
| lat=telemetry_data.get('latitude', 0.0), | |
| lon=telemetry_data.get('longitude', 0.0), | |
| alt=telemetry_data.get('altitude', 0.0), | |
| battery=telemetry_data.get('batteryRemaining', 0), | |
| mode=telemetry_data.get('mode', 'UNKNOWN'), | |
| speed=telemetry_data.get('groundSpeed', 0.0), | |
| heading=telemetry_data.get('heading', 0.0), | |
| satellites=telemetry_data.get('satellite', 0), | |
| voltage=telemetry_data.get('voltage', 0.0), | |
| distance_to_waypoint=telemetry_data.get('distance_to_waypoint', 0.0), | |
| current_waypoint=telemetry_data.get('current_waypoint', 0), | |
| mission_progress=telemetry_data.get('mission_progress', 0.0) | |
| ) | |
| # Store telemetry | |
| self.drone_telemetry[drone_id] = telemetry | |
| # Trigger AI reasoning and notifications | |
| asyncio.create_task(self._process_telemetry_with_ai(telemetry)) | |
| except Exception as e: | |
| print(f"Error updating telemetry: {e}") | |
| async def get_intelligent_status_report(self, drone_id: str, user_context: str = "") -> str: | |
| """Generate AI-powered status report with reasoning.""" | |
| try: | |
| telemetry = self.drone_telemetry.get(drone_id) | |
| if not telemetry: | |
| return "❌ No telemetry data available for drone status report." | |
| # Find active mission for this drone | |
| active_mission = None | |
| for mission in self.active_missions.values(): | |
| if mission.drone_id == drone_id and mission.state in [FlightState.ACTIVE, FlightState.PAUSED]: | |
| active_mission = mission | |
| break | |
| # Generate AI reasoning | |
| if self.ai_reasoning_enabled: | |
| return await self._generate_ai_status_report(telemetry, active_mission, user_context) | |
| else: | |
| return self._generate_basic_status_report(telemetry, active_mission) | |
| except Exception as e: | |
| return f"❌ Error generating status report: {str(e)}" | |
| def _track_mission(self, mission_id: str) -> None: | |
| """Background mission tracking thread.""" | |
| print(f"🎯 Started tracking mission: {mission_id}") | |
| # Create a new event loop for this thread | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| loop.run_until_complete(self._mission_tracking_loop(mission_id)) | |
| finally: | |
| loop.close() | |
| async def _mission_tracking_loop(self, mission_id: str) -> None: | |
| """Async mission tracking loop.""" | |
| while True: | |
| try: | |
| mission = self.active_missions.get(mission_id) | |
| if not mission or mission.state in [FlightState.COMPLETED, FlightState.CANCELLED]: | |
| print(f"✅ Mission tracking ended: {mission_id}") | |
| break | |
| # Check for waypoint completion | |
| self._check_waypoint_completion(mission) | |
| # Check mission safety | |
| self._check_mission_safety(mission) | |
| await asyncio.sleep(2) # Check every 2 seconds | |
| except Exception as e: | |
| print(f"❌ Error in mission tracking: {e}") | |
| await asyncio.sleep(5) | |
| def _check_waypoint_completion(self, mission: FlightMission) -> None: | |
| """Check if current waypoint is completed.""" | |
| telemetry = self.drone_telemetry.get(mission.drone_id) | |
| if not telemetry: | |
| return | |
| current_wp_index = mission.current_waypoint_index | |
| if current_wp_index >= len(mission.waypoints): | |
| # Mission completed | |
| mission.state = FlightState.COMPLETED | |
| mission.completed_at = datetime.now().isoformat() | |
| # Schedule notification safely | |
| self._schedule_notification({ | |
| 'type': 'mission_completed', | |
| 'mission_id': mission.mission_id, | |
| 'drone_id': mission.drone_id, | |
| 'message': 'Mission completed successfully! All waypoints reached.', | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| return | |
| current_waypoint = mission.waypoints[current_wp_index] | |
| if current_waypoint.completed: | |
| return | |
| # Calculate distance to waypoint | |
| distance = self._calculate_distance( | |
| telemetry.lat, telemetry.lon, | |
| current_waypoint.lat, current_waypoint.lon | |
| ) | |
| # Check if waypoint is reached | |
| if distance <= self.waypoint_completion_radius: | |
| current_waypoint.completed = True | |
| current_waypoint.completion_time = datetime.now().isoformat() | |
| mission.current_waypoint_index += 1 | |
| # Schedule waypoint notification safely | |
| self._schedule_waypoint_notification(mission, current_waypoint, telemetry) | |
| async def _send_waypoint_notification(self, mission: FlightMission, waypoint: Waypoint, telemetry: TelemetryData) -> None: | |
| """Send intelligent waypoint completion notification.""" | |
| # Generate AI reasoning for waypoint completion | |
| reasoning = await self._generate_waypoint_reasoning(mission, waypoint, telemetry) | |
| notification = { | |
| 'type': 'waypoint_completed', | |
| 'mission_id': mission.mission_id, | |
| 'drone_id': mission.drone_id, | |
| 'waypoint_id': waypoint.id, | |
| 'waypoint_position': {'lat': waypoint.lat, 'lon': waypoint.lon, 'alt': waypoint.alt}, | |
| 'completion_time': waypoint.completion_time, | |
| 'progress': f"{waypoint.id + 1}/{len(mission.waypoints)}", | |
| 'battery_remaining': telemetry.battery, | |
| 'reasoning': reasoning, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| await self._send_notification(notification) | |
| async def _generate_waypoint_reasoning(self, mission: FlightMission, waypoint: Waypoint, telemetry: TelemetryData) -> str: | |
| """Generate AI reasoning for waypoint completion.""" | |
| remaining_waypoints = len(mission.waypoints) - waypoint.id - 1 | |
| progress_percent = ((waypoint.id + 1) / len(mission.waypoints)) * 100 | |
| # Estimate time to completion | |
| avg_speed = max(telemetry.speed, 5.0) # Minimum 5 m/s assumption | |
| remaining_distance = self._estimate_remaining_distance(mission, waypoint.id + 1) | |
| estimated_time_minutes = (remaining_distance / avg_speed) / 60 | |
| if remaining_waypoints == 0: | |
| return f"🎉 Mission completed! Drone has successfully reached all {len(mission.waypoints)} waypoints. Battery remaining: {telemetry.battery}%. Ready for return to launch." | |
| elif telemetry.battery < self.min_battery_level: | |
| return f"⚠️ Waypoint {waypoint.id + 1} reached, but battery critically low at {telemetry.battery}%. Recommend immediate return to launch. {remaining_waypoints} waypoints remaining." | |
| elif remaining_waypoints <= 2: | |
| return f"✅ Waypoint {waypoint.id + 1} completed! Almost done - only {remaining_waypoints} waypoints left. Progress: {progress_percent:.1f}%. Battery: {telemetry.battery}%. Estimated completion: {estimated_time_minutes:.1f} minutes." | |
| else: | |
| return f"✅ Waypoint {waypoint.id + 1} completed successfully! Progress: {progress_percent:.1f}% ({remaining_waypoints} waypoints remaining). Battery: {telemetry.battery}%. On track for completion in ~{estimated_time_minutes:.1f} minutes." | |
| def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: | |
| """Calculate distance between two GPS coordinates in meters.""" | |
| # Haversine formula | |
| R = 6371000 # Earth's radius in meters | |
| lat1_rad = math.radians(lat1) | |
| lat2_rad = math.radians(lat2) | |
| delta_lat = math.radians(lat2 - lat1) | |
| delta_lon = math.radians(lon2 - lon1) | |
| a = (math.sin(delta_lat / 2) ** 2 + | |
| math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2) | |
| c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) | |
| return R * c | |
| def _estimate_remaining_distance(self, mission: FlightMission, start_waypoint_index: int) -> float: | |
| """Estimate remaining mission distance.""" | |
| if start_waypoint_index >= len(mission.waypoints): | |
| return 0.0 | |
| total_distance = 0.0 | |
| for i in range(start_waypoint_index, len(mission.waypoints) - 1): | |
| wp1 = mission.waypoints[i] | |
| wp2 = mission.waypoints[i + 1] | |
| total_distance += self._calculate_distance(wp1.lat, wp1.lon, wp2.lat, wp2.lon) | |
| return total_distance | |
| async def _send_notification(self, notification: Dict[str, Any]) -> None: | |
| """Send notification to all connected clients.""" | |
| try: | |
| message = json.dumps(notification) | |
| # Send to WebSocket clients | |
| if self.websocket_clients: | |
| disconnected_clients = [] | |
| for client in self.websocket_clients: | |
| try: | |
| await client.send(message) | |
| except: | |
| disconnected_clients.append(client) | |
| # Remove disconnected clients | |
| for client in disconnected_clients: | |
| self.websocket_clients.remove(client) | |
| # Call notification callbacks | |
| for callback in self.notification_callbacks: | |
| try: | |
| await callback(notification) | |
| except Exception as e: | |
| print(f"Notification callback error: {e}") | |
| print(f"📢 Notification sent: {notification['type']}") | |
| except Exception as e: | |
| print(f"❌ Error sending notification: {e}") | |
| async def _send_drone_command(self, drone_id: str, command: Dict[str, Any]) -> bool: | |
| """Send command to drone (integrate with MAVLink or drone API).""" | |
| try: | |
| # TODO: Integrate with actual drone API/MAVLink | |
| print(f"🚁 Sending command to drone {drone_id}: {command}") | |
| # Simulate command sending | |
| await asyncio.sleep(0.1) | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error sending drone command: {e}") | |
| return False | |
| async def _upload_mission_to_drone(self, drone_id: str, waypoints: List[Waypoint]) -> bool: | |
| """Upload mission waypoints to drone.""" | |
| try: | |
| # TODO: Integrate with actual drone mission upload | |
| print(f"📤 Uploading {len(waypoints)} waypoints to drone {drone_id}") | |
| # Simulate mission upload | |
| await asyncio.sleep(0.5) | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error uploading mission: {e}") | |
| return False | |
| def _calculate_pause_timeout(self, telemetry: Optional[TelemetryData]) -> int: | |
| """Calculate appropriate pause timeout based on drone conditions.""" | |
| if not telemetry: | |
| return 300 # Default 5 minutes | |
| # Calculate based on battery level | |
| if telemetry.battery > 70: | |
| return 600 # 10 minutes | |
| elif telemetry.battery > 40: | |
| return 300 # 5 minutes | |
| elif telemetry.battery > 25: | |
| return 120 # 2 minutes | |
| else: | |
| return 60 # 1 minute - critical battery | |
| def _handle_pause_timeout(self, mission_id: str, drone_id: str) -> None: | |
| """Handle pause timeout - automatically return to launch.""" | |
| # Create a new event loop for this timeout operation | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| loop.run_until_complete(self._execute_pause_timeout(mission_id, drone_id)) | |
| finally: | |
| loop.close() | |
| async def _execute_pause_timeout(self, mission_id: str, drone_id: str) -> None: | |
| """Execute pause timeout actions.""" | |
| mission = self.active_missions.get(mission_id) | |
| if not mission or mission.state != FlightState.PAUSED: | |
| return | |
| print(f"⏰ Pause timeout executing for mission {mission_id}") | |
| # Send RTL command | |
| await self._send_drone_command(drone_id, { | |
| 'command': 'SET_MODE', | |
| 'mode': DroneMode.RTL.value, | |
| 'reason': 'Pause timeout exceeded' | |
| }) | |
| mission.state = FlightState.CANCELLED | |
| mission.completed_at = datetime.now().isoformat() | |
| await self._send_notification({ | |
| 'type': 'pause_timeout', | |
| 'mission_id': mission_id, | |
| 'drone_id': drone_id, | |
| 'message': 'Pause timeout exceeded. Drone returning to launch automatically.', | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| async def _ai_safety_check(self, drone_id: str, waypoints: List[Waypoint], reason: str) -> Dict[str, Any]: | |
| """AI-powered safety check for mission updates.""" | |
| telemetry = self.drone_telemetry.get(drone_id) | |
| if not telemetry: | |
| return { | |
| 'safe': False, | |
| 'reason': 'No telemetry data available for safety check', | |
| 'reasoning': 'Cannot verify drone status without telemetry data' | |
| } | |
| # Check battery level | |
| if telemetry.battery < self.min_battery_level: | |
| return { | |
| 'safe': False, | |
| 'reason': f'Battery too low: {telemetry.battery}%', | |
| 'reasoning': f'Current battery level {telemetry.battery}% is below minimum safe level {self.min_battery_level}%', | |
| 'recommendations': ['Return to launch immediately', 'Charge battery before continuing mission'] | |
| } | |
| # Check waypoint distances | |
| total_distance = 0.0 | |
| if waypoints: | |
| # Distance from current position to first new waypoint | |
| total_distance += self._calculate_distance( | |
| telemetry.lat, telemetry.lon, | |
| waypoints[0].lat, waypoints[0].lon | |
| ) | |
| # Distance between waypoints | |
| for i in range(len(waypoints) - 1): | |
| total_distance += self._calculate_distance( | |
| waypoints[i].lat, waypoints[i].lon, | |
| waypoints[i + 1].lat, waypoints[i + 1].lon | |
| ) | |
| # Estimate flight time and battery usage | |
| avg_speed = max(telemetry.speed, 8.0) # m/s | |
| estimated_time_minutes = (total_distance / avg_speed) / 60 | |
| estimated_battery_usage = estimated_time_minutes * 2 # ~2% per minute assumption | |
| if telemetry.battery - estimated_battery_usage < self.min_battery_level: | |
| return { | |
| 'safe': False, | |
| 'reason': f'Insufficient battery for mission: {estimated_battery_usage:.1f}% needed, {telemetry.battery - self.min_battery_level}% available', | |
| 'reasoning': f'Mission requires ~{estimated_time_minutes:.1f} minutes ({estimated_battery_usage:.1f}% battery). Current: {telemetry.battery}%, minimum reserve: {self.min_battery_level}%', | |
| 'recommendations': ['Reduce mission distance', 'Return to launch and recharge', 'Split mission into multiple flights'] | |
| } | |
| return { | |
| 'safe': True, | |
| 'reason': 'Mission update passed safety checks', | |
| 'reasoning': f'Battery sufficient ({telemetry.battery}% > {estimated_battery_usage:.1f}% needed), distance reasonable ({total_distance:.0f}m), estimated time {estimated_time_minutes:.1f} minutes' | |
| } | |
| async def _process_telemetry_with_ai(self, telemetry: TelemetryData) -> None: | |
| """Process telemetry data with AI reasoning for proactive notifications.""" | |
| try: | |
| # Check for critical conditions | |
| critical_notifications = [] | |
| # Battery warnings (only if battery data is available) | |
| if telemetry.battery is not None: | |
| if telemetry.battery <= 20: | |
| critical_notifications.append({ | |
| 'type': 'critical_battery', | |
| 'severity': 'high', | |
| 'message': f'Critical battery warning: {telemetry.battery}% remaining. Immediate action required.', | |
| 'recommendation': 'Return to launch immediately' | |
| }) | |
| elif telemetry.battery <= 30: | |
| critical_notifications.append({ | |
| 'type': 'low_battery', | |
| 'severity': 'medium', | |
| 'message': f'Low battery warning: {telemetry.battery}% remaining. Consider returning soon.', | |
| 'recommendation': 'Plan return to launch' | |
| }) | |
| # GPS quality warnings | |
| if telemetry.satellites is not None and telemetry.satellites < 6: | |
| critical_notifications.append({ | |
| 'type': 'poor_gps', | |
| 'severity': 'medium', | |
| 'message': f'Poor GPS signal: Only {telemetry.satellites} satellites. Navigation accuracy reduced.', | |
| 'recommendation': 'Monitor position closely, consider pausing mission' | |
| }) | |
| # Send critical notifications | |
| for notification in critical_notifications: | |
| notification.update({ | |
| 'drone_id': telemetry.drone_id, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| await self._send_notification(notification) | |
| except Exception as e: | |
| print(f"❌ Error processing telemetry with AI: {e}") | |
| async def _generate_ai_status_report(self, telemetry: TelemetryData, mission: Optional[FlightMission], user_context: str) -> str: | |
| """Generate AI-powered intelligent status report.""" | |
| try: | |
| # Import Gemini AI for intelligent reasoning | |
| from .ai import Gemini2FlashAI | |
| ai = Gemini2FlashAI() | |
| if not ai.enabled: | |
| return self._generate_basic_status_report(telemetry, mission) | |
| # Prepare context for AI | |
| mission_context = "" | |
| if mission: | |
| completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed) | |
| total_waypoints = len(mission.waypoints) | |
| progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0 | |
| mission_context = f""" | |
| ACTIVE MISSION: | |
| - Mission ID: {mission.mission_id} | |
| - State: {mission.state.value} | |
| - Progress: {completed_waypoints}/{total_waypoints} waypoints ({progress:.1f}%) | |
| - Current waypoint: {mission.current_waypoint_index} | |
| - Mission type: Based on waypoint pattern""" | |
| prompt = f"""You are DroneBot, providing an intelligent drone status report. | |
| Analyze the telemetry data and provide insights, not just raw numbers. | |
| DRONE TELEMETRY: | |
| - Battery: {telemetry.battery}% ({telemetry.voltage:.1f}V) | |
| - Position: {telemetry.lat:.6f}, {telemetry.lon:.6f} at {telemetry.alt:.1f}m | |
| - Speed: {telemetry.speed:.1f} m/s, Heading: {telemetry.heading:.0f}° | |
| - GPS: {telemetry.satellites} satellites | |
| - Mode: {telemetry.mode} | |
| - Distance to waypoint: {telemetry.distance_to_waypoint:.1f}m | |
| {mission_context} | |
| USER CONTEXT: {user_context} | |
| Provide a concise, insightful status report that: | |
| - Analyzes the drone's condition and performance | |
| - Explains what the drone is currently doing | |
| - Identifies any concerns or recommendations | |
| - Shows understanding of the mission progress | |
| - Is conversational and easy to understand | |
| Focus on insights, not just data. Be helpful and proactive.""" | |
| try: | |
| api_key = ai.get_current_api_key() | |
| headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} | |
| payload = { | |
| "contents": [{"parts": [{"text": prompt}]}], | |
| "generationConfig": { | |
| "temperature": 0.6, | |
| "topK": 40, | |
| "topP": 0.95, | |
| "maxOutputTokens": 1024 | |
| } | |
| } | |
| import aiohttp | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(ai.api_endpoint, json=payload, headers=headers) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| ai_report = result['candidates'][0]['content']['parts'][0]['text'] | |
| return f"🧠 **AI Status Analysis:**\n\n{ai_report}" | |
| except Exception as e: | |
| print(f"AI status report failed: {e}") | |
| # Fallback to basic report | |
| return self._generate_basic_status_report(telemetry, mission) | |
| except Exception as e: | |
| return f"❌ Error generating AI status report: {str(e)}" | |
| def _generate_basic_status_report(self, telemetry: TelemetryData, mission: Optional[FlightMission]) -> str: | |
| """Generate basic status report without AI.""" | |
| status_parts = [] | |
| status_parts.append("🚁 **Drone Status Report**") | |
| status_parts.append("") | |
| # Battery status with emoji | |
| battery_emoji = "🔋" if telemetry.battery > 50 else "🪫" if telemetry.battery > 25 else "🔴" | |
| status_parts.append(f"{battery_emoji} **Battery:** {telemetry.battery}% ({telemetry.voltage:.1f}V)") | |
| # Location and altitude | |
| status_parts.append(f"📍 **Position:** {telemetry.lat:.6f}, {telemetry.lon:.6f}") | |
| status_parts.append(f"🛩️ **Altitude:** {telemetry.alt:.1f}m") | |
| status_parts.append(f"🧭 **Heading:** {telemetry.heading:.0f}° at {telemetry.speed:.1f} m/s") | |
| # GPS and mode | |
| gps_emoji = "📡" if telemetry.satellites >= 8 else "📶" if telemetry.satellites >= 6 else "⚠️" | |
| status_parts.append(f"{gps_emoji} **GPS:** {telemetry.satellites} satellites") | |
| status_parts.append(f"🎯 **Mode:** {telemetry.mode}") | |
| # Mission progress | |
| if mission: | |
| completed = sum(1 for wp in mission.waypoints if wp.completed) | |
| total = len(mission.waypoints) | |
| progress = (completed / total) * 100 if total > 0 else 0 | |
| status_parts.append("") | |
| status_parts.append(f"🎯 **Mission:** {mission.state.value.title()}") | |
| status_parts.append(f"📊 **Progress:** {completed}/{total} waypoints ({progress:.1f}%)") | |
| if telemetry.distance_to_waypoint > 0: | |
| status_parts.append(f"🎯 **Next waypoint:** {telemetry.distance_to_waypoint:.1f}m away") | |
| return "\n".join(status_parts) | |
| def _schedule_notification(self, notification: Dict[str, Any]) -> None: | |
| """Schedule notification safely from synchronous context.""" | |
| try: | |
| # Try to get running loop | |
| loop = asyncio.get_running_loop() | |
| # Create task in the running loop | |
| loop.create_task(self._send_notification(notification)) | |
| except RuntimeError: | |
| # No running loop, schedule for later | |
| print(f"📢 Notification queued: {notification['type']}") | |
| def _schedule_waypoint_notification(self, mission: FlightMission, waypoint: Waypoint, telemetry: TelemetryData) -> None: | |
| """Schedule waypoint notification safely from synchronous context.""" | |
| try: | |
| loop = asyncio.get_running_loop() | |
| loop.create_task(self._send_waypoint_notification(mission, waypoint, telemetry)) | |
| except RuntimeError: | |
| print(f"📢 Waypoint notification queued: {waypoint.id}") | |
| def _check_mission_safety(self, mission: FlightMission) -> None: | |
| """Check mission safety conditions.""" | |
| telemetry = self.drone_telemetry.get(mission.drone_id) | |
| if not telemetry: | |
| return | |
| # Check for emergency conditions | |
| emergency_conditions = [] | |
| if telemetry.battery is not None and telemetry.battery <= 15: | |
| emergency_conditions.append("Critical battery level") | |
| if telemetry.satellites is not None and telemetry.satellites < 4: | |
| emergency_conditions.append("GPS signal lost") | |
| if telemetry.voltage is not None and telemetry.voltage < 10.0: | |
| emergency_conditions.append("Critical voltage level") | |
| if emergency_conditions: | |
| mission.state = FlightState.EMERGENCY | |
| self._schedule_emergency_handling(mission, emergency_conditions) | |
| def _schedule_emergency_handling(self, mission: FlightMission, conditions: List[str]) -> None: | |
| """Schedule emergency handling safely from synchronous context.""" | |
| try: | |
| loop = asyncio.get_running_loop() | |
| loop.create_task(self._handle_emergency(mission, conditions)) | |
| except RuntimeError: | |
| print(f"🚨 Emergency queued: {', '.join(conditions)}") | |
| async def _handle_emergency(self, mission: FlightMission, conditions: List[str]) -> None: | |
| """Handle emergency conditions.""" | |
| # Send emergency RTL command | |
| await self._send_drone_command(mission.drone_id, { | |
| 'command': 'SET_MODE', | |
| 'mode': DroneMode.RTL.value, | |
| 'reason': f'Emergency: {", ".join(conditions)}' | |
| }) | |
| # Send emergency notification | |
| await self._send_notification({ | |
| 'type': 'emergency', | |
| 'mission_id': mission.mission_id, | |
| 'drone_id': mission.drone_id, | |
| 'conditions': conditions, | |
| 'message': f'EMERGENCY: {", ".join(conditions)}. Drone commanded to return immediately.', | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| # Global flight manager instance | |
| flight_manager = FlightManager() | |