"""AI integration for mission analysis.""" import os import json import re import aiohttp from typing import Dict, List, Any, Optional from .knowledge_loader import KnowledgeBaseLoader from .ai_personality import AIPersonality, ConversationState, UserExperience from .geocoding import GeocodingService class Gemini2FlashAI: """Gemini 2.0 Flash AI integration with fallback.""" def __init__(self): api_keys_str = os.getenv('GEMINI_API_KEYS', '') self.api_keys = [key.strip() for key in api_keys_str.split(',') if key.strip()] self.enabled = bool(self.api_keys) self.api_endpoint = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent" self.current_key_index = 0 # Enhanced API key rotation system self.key_status = {} # Track status of each key self.key_last_used = {} # Track last usage time for each key self.key_cooldown_until = {} # Track cooldown periods for rate limited keys self.initialize_key_tracking() # Load knowledge base for enhanced context self.knowledge_loader = KnowledgeBaseLoader() self.knowledge_base = self.knowledge_loader.load_complete_knowledge_base() # Initialize AI personality for user guidance self.personality = AIPersonality() # Initialize geocoding service (reads GOONG_API_KEY from env) self.geocoding = GeocodingService() # Note: chat_history will be provided by mission_planner context def initialize_key_tracking(self): """Initialize tracking for all API keys.""" from datetime import datetime current_time = datetime.now() for i, key in enumerate(self.api_keys): self.key_status[key] = 'available' # available, rate_limited, cooldown self.key_last_used[key] = current_time self.key_cooldown_until[key] = None def get_next_available_key(self) -> str: """Get the next available API key with intelligent rotation.""" if not self.api_keys: return "" from datetime import datetime, timedelta current_time = datetime.now() # Try each key to find one that's available for attempt in range(len(self.api_keys)): candidate_key = self.api_keys[self.current_key_index] self.current_key_index = (self.current_key_index + 1) % len(self.api_keys) # Check if key is available if self.key_status[candidate_key] == 'available': self.key_last_used[candidate_key] = current_time return candidate_key # Check if rate limited key is ready for retry (after cooldown) elif self.key_status[candidate_key] == 'rate_limited': if self.key_cooldown_until[candidate_key] and current_time >= self.key_cooldown_until[candidate_key]: print(f"πŸ”„ API key {candidate_key[:20]}... cooldown period ended, retrying") self.key_status[candidate_key] = 'available' self.key_cooldown_until[candidate_key] = None self.key_last_used[candidate_key] = current_time return candidate_key # Check if key is in cooldown elif self.key_status[candidate_key] == 'cooldown': if self.key_cooldown_until[candidate_key] and current_time >= self.key_cooldown_until[candidate_key]: print(f"πŸ”„ API key {candidate_key[:20]}... cooldown period ended, making available") self.key_status[candidate_key] = 'available' self.key_last_used[candidate_key] = current_time return candidate_key # If all keys are unavailable, return the first one anyway (better than no response) print(f"⚠️ All API keys are rate limited, using first key as fallback") return self.api_keys[0] if self.api_keys else "" def mark_key_rate_limited(self, api_key: str): """Mark an API key as rate limited and set cooldown.""" from datetime import datetime, timedelta if api_key in self.key_status: self.key_status[api_key] = 'rate_limited' # Set cooldown for 60 seconds (adjust as needed based on API limits) self.key_cooldown_until[api_key] = datetime.now() + timedelta(seconds=60) print(f"🚫 API key {api_key[:20]}... marked as rate limited, cooldown until {self.key_cooldown_until[api_key]}") def mark_key_error(self, api_key: str): """Mark an API key as having an error and set shorter cooldown.""" from datetime import datetime, timedelta if api_key in self.key_status: self.key_status[api_key] = 'cooldown' # Set shorter cooldown for general errors (30 seconds) self.key_cooldown_until[api_key] = datetime.now() + timedelta(seconds=30) print(f"⚠️ API key {api_key[:20]}... marked for cooldown until {self.key_cooldown_until[api_key]}") def mark_key_success(self, api_key: str): """Mark an API key as successful and available.""" if api_key in self.key_status: self.key_status[api_key] = 'available' self.key_cooldown_until[api_key] = None print(f"βœ… API key {api_key[:20]}... marked as available after successful use") def get_current_api_key(self) -> str: """Get API key with intelligent rotation and fallback.""" return self.get_next_available_key() async def analyze_conversation(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: """🧠 GEMINI-FIRST CONVERSATION ANALYSIS: Intelligent, personalized responses.""" # Always use Gemini for conversation analysis if available if self.enabled: return await self._gemini_conversation_analysis(user_message, chat_history) else: # Fallback to local personality system return await self._local_conversation_analysis(user_message, chat_history) async def _gemini_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: """🧠 Gemini-powered conversation analysis for intelligent responses.""" # Determine user experience and conversation state using local tools user_experience = self.personality.analyze_user_experience(user_message, chat_history) conversation_state = self.personality.determine_conversation_state(user_message, chat_history) # Check if this is a mission request or needs guidance if conversation_state in [ConversationState.READY_TO_GENERATE]: return await self.analyze_mission_request(user_message) elif conversation_state == ConversationState.DRONE_STATUS_REQUEST: # Handle drone status requests return await self._handle_drone_status_request() elif conversation_state == ConversationState.GENERAL_QUERY: # Use Gemini for intelligent general query responses return await self._gemini_general_response(user_message, chat_history, user_experience) else: # Use Gemini for intelligent guidance responses return await self._gemini_guidance_response(user_message, chat_history, conversation_state, user_experience) async def _local_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: """Local fallback conversation analysis.""" # Determine user experience and conversation state user_experience = self.personality.analyze_user_experience(user_message, chat_history) conversation_state = self.personality.determine_conversation_state(user_message, chat_history) # Check if this is a mission request or needs guidance if conversation_state in [ConversationState.READY_TO_GENERATE]: return await self.analyze_mission_request(user_message) elif conversation_state == ConversationState.DRONE_STATUS_REQUEST: # Handle drone status requests return await self._handle_drone_status_request() elif conversation_state == ConversationState.GENERAL_QUERY: # Handle general queries guidance_response = self.personality.get_guidance_response( conversation_state, user_experience, {}, chat_history ) return { 'is_guidance': True, 'conversation_state': conversation_state.value, 'user_experience': user_experience.value, 'guidance_response': guidance_response, 'needs_mission_generation': False, 'is_general_query': True } else: # Provide user guidance context = { 'location': self._extract_location_from_history(chat_history), 'mission_type': self._extract_mission_type_from_history(chat_history) } guidance_response = self.personality.get_guidance_response( conversation_state, user_experience, context, chat_history ) return { 'is_guidance': True, 'conversation_state': conversation_state.value, 'user_experience': user_experience.value, 'guidance_response': guidance_response, 'needs_mission_generation': False } async def _gemini_general_response(self, user_message: str, chat_history: List[Dict], user_experience) -> Dict[str, Any]: """🧠 Gemini-powered general query responses with flight awareness.""" # Check for active flights to provide flight-aware responses flight_context = await self._get_flight_context() # Prepare chat context recent_context = "" if chat_history: recent_messages = chat_history[-3:] context_parts = [] for msg in recent_messages: if 'user' in msg: context_parts.append(f"User: {msg['user']}") if 'ai' in msg: context_parts.append(f"Assistant: {msg['ai'][:100]}...") recent_context = "\n".join(context_parts) prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning. The user is asking a general question or greeting. Respond naturally and helpfully. USER EXPERIENCE LEVEL: {user_experience.value} USER MESSAGE: "{user_message}" RECENT CHAT CONTEXT: {recent_context} {flight_context} Provide a friendly, personalized response that: - Shows you understand their message - If there are active flights, proactively offer flight management options - Maintains a conversational, friendly tone - Offers helpful guidance about drone missions - During active flights, suggest options like: "Do you want to check flight status?", "Pause the current mission?", "Update the flight plan?", "Have the drone return home?" - For greetings, include current drone status if available Keep the response conversational and engaging, around 100-200 words.""" try: api_key = self.get_current_api_key() headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": { "temperature": 0.7, # More creative for conversation "topK": 40, "topP": 0.95, "maxOutputTokens": 1024 } } async with aiohttp.ClientSession() as session: async with session.post(self.api_endpoint, json=payload, headers=headers) as response: if response.status == 200: result = await response.json() gemini_response = result['candidates'][0]['content']['parts'][0]['text'] return { 'is_guidance': True, 'conversation_state': 'general_query', 'user_experience': user_experience.value, 'guidance_response': gemini_response, 'needs_mission_generation': False, 'is_general_query': True, 'gemini_powered': True } except Exception as e: print(f"🚨 Gemini general response failed: {e}") # Fallback to local response guidance_response = self.personality.get_guidance_response( ConversationState.GENERAL_QUERY, user_experience, {}, chat_history ) return { 'is_guidance': True, 'conversation_state': 'general_query', 'user_experience': user_experience.value, 'guidance_response': guidance_response, 'needs_mission_generation': False, 'is_general_query': True } async def _gemini_guidance_response(self, user_message: str, chat_history: List[Dict], conversation_state, user_experience) -> Dict[str, Any]: """🧠 Gemini-powered guidance responses.""" # Extract context location = self._extract_location_from_history(chat_history) mission_type = self._extract_mission_type_from_history(chat_history) # Prepare chat context recent_context = "" if chat_history: recent_messages = chat_history[-2:] context_parts = [] for msg in recent_messages: if 'user' in msg: context_parts.append(f"User: {msg['user']}") recent_context = "\n".join(context_parts) prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning. The user needs guidance to complete their drone mission request. USER MESSAGE: "{user_message}" CONVERSATION STATE: {conversation_state.value} USER EXPERIENCE: {user_experience.value} KNOWN LOCATION: {location} KNOWN MISSION TYPE: {mission_type} RECENT CONTEXT: {recent_context} Based on the conversation state, provide helpful, personalized guidance: - If they need location: Ask for coordinates or location description with examples - If they need mission type: Explain mission types and ask what they want to accomplish - If they need details: Ask for specific requirements like altitude, area size, etc. - Be encouraging and educational for beginners - Be concise and technical for experts Provide a friendly, helpful response that guides them to the next step.""" try: api_key = self.get_current_api_key() headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": { "temperature": 0.6, "topK": 40, "topP": 0.95, "maxOutputTokens": 1024 } } async with aiohttp.ClientSession() as session: async with session.post(self.api_endpoint, json=payload, headers=headers) as response: if response.status == 200: result = await response.json() gemini_response = result['candidates'][0]['content']['parts'][0]['text'] return { 'is_guidance': True, 'conversation_state': conversation_state.value, 'user_experience': user_experience.value, 'guidance_response': gemini_response, 'needs_mission_generation': False, 'gemini_powered': True } except Exception as e: print(f"🚨 Gemini guidance response failed: {e}") # Fallback to local response context = {'location': location, 'mission_type': mission_type} guidance_response = self.personality.get_guidance_response( conversation_state, user_experience, context, chat_history ) return { 'is_guidance': True, 'conversation_state': conversation_state.value, 'user_experience': user_experience.value, 'guidance_response': guidance_response, 'needs_mission_generation': False } async def _handle_drone_status_request(self) -> Dict[str, Any]: """Handle drone status requests by calling the drone status endpoint.""" try: from ..core.drone_status import drone_status current_status = drone_status.get_current_status() constraints = drone_status.get_flight_constraints() # Format the drone status response status_message = self._format_drone_status_response(current_status, constraints) return { 'is_guidance': False, 'is_drone_status': True, 'conversation_state': 'drone_status_request', 'user_experience': 'unknown', 'drone_status_response': status_message, 'needs_mission_generation': False, 'drone_status': current_status, 'flight_constraints': constraints } except Exception as e: return { 'is_guidance': False, 'is_drone_status': True, 'conversation_state': 'drone_status_request', 'error': f"Error retrieving drone status: {str(e)}", 'drone_status_response': f"❌ Error retrieving drone status: {str(e)}", 'needs_mission_generation': False } def _format_drone_status_response(self, current_status: Dict, constraints: Dict) -> str: """Format drone status information into a user-friendly message.""" message_parts = [] message_parts.append("🚁 **Drone Status Report**") message_parts.append("") # Basic status information battery = current_status.get('batteryRemaining', 0) satellites = current_status.get('satellite', 0) voltage = current_status.get('voltage', 0.0) message_parts.append(f"πŸ”‹ **Battery:** {battery}% ({voltage:.1f}V)") message_parts.append(f"πŸ“‘ **GPS:** {satellites} satellites") message_parts.append(f"πŸ›©οΈ **Mode:** {current_status.get('mode', 'UNKNOWN')}") message_parts.append(f"πŸ“ **Location:** {current_status.get('latitude', 0):.4f}, {current_status.get('longitude', 0):.4f}") message_parts.append(f"πŸ›©οΈ **Altitude:** {current_status.get('altitude', 0):.0f}m") message_parts.append("") # Flight constraints message_parts.append("**Flight Constraints:**") message_parts.append(f"β€’ Safe Flight Time: {constraints.get('safe_flight_time_minutes', 0)} minutes") message_parts.append(f"β€’ Max Distance: {constraints.get('max_safe_distance_meters', 0)}m") message_parts.append(f"β€’ Optimal Altitude: {constraints.get('optimal_altitude_range', (30, 80))[0]}-{constraints.get('optimal_altitude_range', (30, 80))[1]}m") message_parts.append(f"β€’ Ready to Fly: {'βœ… Yes' if constraints.get('ready_to_fly', False) else '❌ No'}") message_parts.append("") if not constraints.get('ready_to_fly', False): message_parts.append("⚠️ **Not ready to fly:** Check battery level, GPS connection, and safety constraints.") message_parts.append("") message_parts.append("For mission planning, the system will optimize altitude and safety parameters based on this status.") # Include raw telemetry hint for full status requests if current_status.get('data_source') == 'hardware_telemetry': message_parts.append("") message_parts.append("Raw telemetry available via /drone/status") return "\n".join(message_parts) async def analyze_mission_request(self, user_message: str) -> Dict[str, Any]: """🧠 GEMINI-FIRST ANALYSIS: Always use Gemini as primary AI brain with tool support.""" # ALWAYS use Gemini if available - it's the smart brain that coordinates everything if self.enabled: print(f"🧠 GEMINI-FIRST: Using Gemini 2.0 Flash as primary AI brain") return await self._gemini_first_analysis(user_message) else: print(f"⚠️ Gemini not available, using local fallback. API keys: {len(self.api_keys)}") return await self._fallback_analysis(user_message, []) async def _gemini_first_analysis(self, user_message: str) -> Dict[str, Any]: """🧠 Gemini-first analysis with intelligent tool integration.""" print(f"πŸ” DEBUG: Starting Gemini analysis for: {user_message[:50]}...") # Pre-extract supporting data for Gemini explicit_coords = self._extract_coordinates(user_message) is_location_query = self._is_location_based_query(user_message) print(f"πŸ” DEBUG: Explicit coords: {explicit_coords}, Is location query: {is_location_query}") # Prepare comprehensive context for Gemini support_data = { "explicit_coordinates": explicit_coords, "has_location_query": is_location_query, "geocoding_available": True, "drone_status_available": True } print(f"πŸ” DEBUG: Support data prepared: {support_data}") # Add geocoding if needed geocoding_info = "" if is_location_query and not explicit_coords: print(f"πŸ” DEBUG: Attempting geocoding for location query") try: geocoding_result = await self.geocoding.parse_location_with_distance(user_message) if geocoding_result: support_data["geocoded_location"] = geocoding_result geocoding_info = f""" GEOCODING RESULT: - Location: {geocoding_result['location_name']} - Coordinates: {geocoding_result['coordinates']} - Distance: {geocoding_result.get('distance_meters', 'N/A')}m""" print(f"πŸ” DEBUG: Geocoding successful: {geocoding_result}") else: print(f"❌ DEBUG: Geocoding failed - no result") except Exception as e: geocoding_info = f"Geocoding failed: {e}" print(f"❌ DEBUG: Geocoding exception: {e}") else: print(f"πŸ” DEBUG: Skipping geocoding - has explicit coords or not location query") # Enhanced prompt for Gemini with professional AI-driven analysis prompt = f"""You are DroneBot, a professional AI drone mission specialist with comprehensive knowledge of aviation, photogrammetry, and mission planning. You must create intelligent, optimized missions based on real-world requirements. COMPREHENSIVE KNOWLEDGE BASE: {self.knowledge_base} USER REQUEST: "{user_message}" ENVIRONMENTAL DATA: - Coordinates: {explicit_coords} - Location query: {is_location_query} {geocoding_info} PROFESSIONAL MISSION ANALYSIS REQUIREMENTS: You must analyze the request using professional drone operations knowledge: 1. MISSION TYPE INTELLIGENCE: - Survey: Photogrammetry requirements, GSD calculation, overlap optimization - Patrol: Security patterns, visibility requirements, threat assessment - Photography: Lighting analysis, composition angles, equipment optimization - Inspection: Structure analysis, safety protocols, detailed examination patterns - Go_straight: Efficiency optimization, obstacle avoidance, direct routing 2. ENVIRONMENTAL ANALYSIS: - Calculate optimal altitude based on mission requirements and safety - Analyze terrain complexity and obstacle clearance needs - Consider weather impact on flight parameters - Assess battery requirements for mission completion 3. PROFESSIONAL FLIGHT PLANNING: - Generate waypoint patterns based on industry standards - Optimize for data quality, safety, and efficiency - Calculate proper overlap for survey missions (80% forward, 60% side) - Design approach angles for inspection missions - Plan hold times based on equipment requirements 4. SAFETY INTEGRATION: - Assess flight risks and mitigation strategies - Calculate battery reserves and return-to-home requirements - Ensure regulatory compliance and airspace considerations - Plan emergency procedures and abort criteria As a professional AI mission planner, provide a comprehensive JSON response: {{ "mission_type": "survey|patrol|photography|inspection|go_straight|simple_flight", "confidence": 0.95, "coordinates": [latitude_6_decimal, longitude_6_decimal], "altitude": optimized_altitude_meters, "direction": "north|south|east|west|northeast|northwest|southeast|southwest|null", "distance": calculated_distance_meters, "reasoning": "Professional analysis explaining mission optimization, safety considerations, and technical decisions", "mission_parameters": {{ "ground_sample_distance": gsd_cm_per_pixel, "forward_overlap": overlap_percentage, "side_overlap": overlap_percentage, "flight_speed": optimized_speed_ms, "hold_time": required_hold_time_seconds, "camera_trigger_distance": trigger_distance_meters, "battery_consumption_estimate": estimated_wh_usage, "flight_time_estimate": estimated_minutes, "data_volume_estimate": estimated_gb, "weather_considerations": ["wind_impact", "visibility_requirements"], "equipment_requirements": ["camera_specs", "gimbal_settings"] }}, "waypoint_strategy": "Detailed explanation of flight pattern optimization based on professional standards", "safety_considerations": [ "Comprehensive safety analysis", "Risk mitigation strategies", "Emergency procedures", "Regulatory compliance checks" ], "professional_insights": {{ "mission_complexity": "low|medium|high", "skill_level_required": "beginner|intermediate|expert", "environmental_challenges": ["challenge1", "challenge2"], "optimization_opportunities": ["improvement1", "improvement2"], "quality_assurance": "Data quality predictions and recommendations" }}, "personality_response": "Friendly, professional message explaining the mission plan with educational insights" }} IMPORTANT: - Be friendly, understanding, and personalized in your reasoning - Use the support tools data (coordinates, geocoding) intelligently - If coordinates are missing, include helpful guidance in personality_response - Provide detailed, human-like reasoning that shows you understand the user's intent - Make safety recommendations based on mission type Respond ONLY with valid JSON.""" try: api_key = self.get_current_api_key() if not api_key: print("❌ No API key available for Gemini") return await self._fallback_analysis(user_message, []) print(f"πŸ” DEBUG: Using API key: {api_key[:20]}...") headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} payload = { "contents": [{"parts": [{"text": prompt}]}], "generationConfig": { "temperature": 0.4, # Slightly more creative for personality "topK": 40, "topP": 0.95, "maxOutputTokens": 3072 } } print(f"πŸ” DEBUG: Making Gemini API call...") async with aiohttp.ClientSession() as session: async with session.post(self.api_endpoint, json=payload, headers=headers) as response: print(f"πŸ” DEBUG: Gemini API response status: {response.status}") # Handle rate limiting by marking key and trying next available key if response.status == 429: print(f"🚫 API key {api_key[:20]}... got rate limited (429)") self.mark_key_rate_limited(api_key) # Try with next available key before falling back to local AI next_key = self.get_next_available_key() if next_key and next_key != api_key: print(f"πŸ”„ Trying with next available key: {next_key[:20]}...") retry_headers = headers.copy() retry_headers['x-goog-api-key'] = next_key async with aiohttp.ClientSession() as retry_session: async with retry_session.post(self.api_endpoint, json=payload, headers=retry_headers) as retry_response: print(f"πŸ”„ Retry response status: {retry_response.status}") if retry_response.status == 200: result = await retry_response.json() print(f"πŸ”„ Retry successful! Processing with new key") # Continue with the successful result and process it print(f"πŸ”„ Retry result keys: {list(result.keys()) if isinstance(result, dict) else 'not dict'}") # Process the successful retry result if not result or 'candidates' not in result or not result['candidates']: print("❌ Invalid retry response structure") return await self._fallback_analysis(user_message, []) candidate = result['candidates'][0] if 'content' not in candidate or 'parts' not in candidate['content'] or not candidate['content']['parts']: print("❌ Missing content in retry response") return await self._fallback_analysis(user_message, []) content = candidate['content']['parts'][0]['text'] print(f"πŸ”„ Retry content: {content[:200]}...") # Clean JSON content = content.strip() if content.startswith('```json'): content = content[7:-3] elif content.startswith('```'): content = content[3:-3] try: analysis = json.loads(content) if analysis is None: print("❌ Retry returned null JSON") return await self._fallback_analysis(user_message, []) except json.JSONDecodeError as e: print(f"❌ Invalid JSON from retry: {e}") return await self._fallback_analysis(user_message, []) # Enhance with support tool data if explicit_coords and not analysis.get('coordinates'): analysis['coordinates'] = explicit_coords elif 'geocoded_location' in support_data and not analysis.get('coordinates'): analysis['coordinates'] = list(support_data['geocoded_location']['coordinates']) analysis['gemini_enhanced'] = True analysis['model'] = 'gemini-2.0-flash' analysis['support_tools_used'] = list(support_data.keys()) # Mark the successful key as available for future use self.mark_key_success(next_key) print(f"πŸ”„ Retry analysis successful: {analysis}") return analysis elif retry_response.status == 429: print(f"🚫 Next key also rate limited, falling back to local AI") self.mark_key_rate_limited(next_key) return await self._fallback_analysis(user_message, []) else: print(f"❌ Retry failed with status {retry_response.status}, falling back to local AI") self.mark_key_error(next_key) return await self._fallback_analysis(user_message, []) else: print(f"⚠️ No other keys available, falling back to local AI") return await self._fallback_analysis(user_message, []) elif response.status == 200: result = await response.json() print(f"πŸ” DEBUG: Gemini API response received, keys: {list(result.keys()) if isinstance(result, dict) else 'not dict'}") print(f"πŸ” DEBUG: Full result: {result}") # Debug: Check if result structure is correct if not result or 'candidates' not in result or not result['candidates']: print("❌ Invalid Gemini API response structure") return await self._fallback_analysis(user_message, []) candidate = result['candidates'][0] print(f"πŸ” DEBUG: Candidate keys: {list(candidate.keys()) if isinstance(candidate, dict) else 'not dict'}") if 'content' not in candidate or 'parts' not in candidate['content'] or not candidate['content']['parts']: print("❌ Missing content in Gemini response") return await self._fallback_analysis(user_message, []) content = candidate['content']['parts'][0]['text'] print(f"πŸ” DEBUG: Raw content from Gemini: {content[:200]}...") # Clean JSON content = content.strip() if content.startswith('```json'): content = content[7:-3] elif content.startswith('```'): content = content[3:-3] print(f"πŸ” DEBUG: Cleaned content: {content[:200]}...") # Debug: Check if content is valid JSON try: analysis = json.loads(content) print(f"πŸ” DEBUG: Parsed analysis: {analysis}") if analysis is None: print("❌ Gemini returned null JSON") return await self._fallback_analysis(user_message, []) except json.JSONDecodeError as e: print(f"❌ Invalid JSON from Gemini: {e}") print(f"Content: {content[:200]}...") return await self._fallback_analysis(user_message, []) # Enhance with support tool data if explicit_coords and not analysis.get('coordinates'): analysis['coordinates'] = explicit_coords elif 'geocoded_location' in support_data and not analysis.get('coordinates'): analysis['coordinates'] = list(support_data['geocoded_location']['coordinates']) analysis['gemini_enhanced'] = True analysis['model'] = 'gemini-2.0-flash' analysis['support_tools_used'] = list(support_data.keys()) # Mark the successful key as available for future use self.mark_key_success(api_key) print(f"πŸ” DEBUG: Final analysis: {analysis}") return analysis elif response.status in [400, 401, 403]: print(f"❌ Gemini API error ({response.status}), marking key for cooldown") self.mark_key_error(api_key) return await self._fallback_analysis(user_message, []) else: print(f"❌ Gemini API returned status {response.status}, marking key for cooldown") self.mark_key_error(api_key) return await self._fallback_analysis(user_message, []) except Exception as e: print(f"🚨 Gemini API failed: {e}") return await self._fallback_analysis(user_message, []) async def _fallback_analysis(self, user_message: str, chat_history: List[Dict] = None) -> Dict[str, Any]: """Local AI fallback analysis with context awareness.""" if chat_history is None: chat_history = [] mission_type = self._extract_mission_type(user_message) coordinates = await self._get_coordinates_from_context(user_message, chat_history) return { 'mission_type': mission_type, 'confidence': 0.75, 'coordinates': coordinates, 'altitude': self._extract_altitude(user_message), 'direction': self._extract_direction(user_message), 'distance': self._extract_distance(user_message), 'reasoning': f"Smart local AI: Detected '{mission_type}' mission with intelligent context analysis", 'mission_parameters': self._get_mission_parameters(mission_type), 'waypoint_strategy': self._get_waypoint_strategy(mission_type), 'safety_considerations': self._get_safety_considerations(mission_type), 'gemini_enhanced': False, 'model': 'local-fallback' } def _extract_mission_type(self, message: str) -> str: """Extract mission type from message with smart intent recognition.""" message_lower = message.lower() # Priority-based mission type detection to avoid keyword conflicts # Priority 1: Specific mission type keywords (highest priority) if any(word in message_lower for word in ['survey', 'map', 'mapping', 'grid', 'coverage', 'scan', 'scanning']): return 'survey' elif any(word in message_lower for word in ['patrol', 'perimeter', 'guard', 'security', 'monitor', 'watch', 'circle', 'secure']): return 'patrol' elif any(word in message_lower for word in ['photo', 'photography', 'picture', 'capture', 'take photos', 'aerial photos', 'aerial', 'shoot']): return 'photography' elif any(word in message_lower for word in ['inspect', 'inspection', 'examine', 'check', 'analyze', 'detail', 'structure']): return 'inspection' # Priority 2: Directional movement keywords (medium priority) elif any(word in message_lower for word in ['go straight', 'straight line', 'fly straight', 'move straight', 'straight flight', 'direct flight']): return 'go_straight' elif any(word in message_lower for word in ['go south', 'go north', 'go east', 'go west', 'fly to', 'move to']): return 'go_straight' # Priority 3: General context keywords (lowest priority) mission_keywords = { 'survey': ['field', 'farm', 'land', 'area coverage'], 'patrol': ['route', 'boundary', 'surveillance'], 'photography': ['building', 'structure', 'landscape'], 'inspection': ['power line', 'bridge', 'roof', 'infrastructure'], 'go_straight': ['straight distance', 'linear path', 'direct route'] } for mission_type, keywords in mission_keywords.items(): if any(keyword in message_lower for keyword in keywords): return mission_type return 'simple_flight' def _extract_coordinates(self, message: str) -> Optional[List[float]]: """Extract coordinates from message with strict validation to prevent false positives.""" # STRICT coordinate patterns - must be explicitly GPS coordinates patterns = [ # With explicit coordinate keywords (most reliable) r'(?:coordinates?|coords)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', r'(?:location|position)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', r'at\s+(?:coordinates?|coords)?\s*([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', # Latitude/longitude explicitly mentioned r'lat[itude]*[:\s]+([-+]?\d+\.?\d*)[,\s]+lon[gitude]*[:\s]+([-+]?\d+\.?\d*)', # Decimal coordinates (must have decimal points for both numbers) r'(\d+\.\d+)[,\s]+(\d+\.\d+)(?!\s*[a-zA-Z])', # Not followed by letters # Standard coordinate format with "at" keyword r'at\s+(\d{1,2}\.\d{3,})[,\s]+(\d{2,3}\.\d{3,})', # Simple pattern but ONLY for reasonable coordinate ranges r'(?:^|\s|at\s+)((?:[1-9]\d?|1[0-7]\d)\.?\d*)[,\s]+((?:[1-9]\d{1,2}|[1-2]\d{2})\.?\d*)(?=\s|$)', ] for pattern in patterns: matches = re.findall(pattern, message, re.IGNORECASE) if matches: try: # Handle different pattern formats if len(matches[0]) == 2: lat, lon = float(matches[0][0]), float(matches[0][1]) else: # For patterns with more groups, take first two lat, lon = float(matches[0][0]), float(matches[0][1]) # STRICT validation for GPS coordinates if (-90 <= lat <= 90 and -180 <= lon <= 180 and # Additional checks to prevent false positives abs(lat) > 0.1 and abs(lon) > 0.1 and # Not too close to 0,0 not (lat < 5 and lon < 5)): # Avoid small numbers that might be measurements return [lat, lon] except (ValueError, IndexError): continue return None def _extract_altitude(self, message: str) -> int: """Extract altitude from message.""" altitude_patterns = [r'(\d+)\s*m(?:eter)?s?\b', r'altitude\s+(\d+)'] for pattern in altitude_patterns: matches = re.findall(pattern, message.lower()) if matches: return int(matches[0]) return 80 def _extract_direction(self, message: str) -> str: """Extract direction from message.""" directions = ['north', 'south', 'east', 'west', 'northeast', 'northwest', 'southeast', 'southwest'] message_lower = message.lower() for direction in directions: if direction in message_lower: return direction return 'north' def _extract_distance(self, message: str) -> int: """Extract distance from message.""" distance_patterns = [r'(\d+)\s*m(?:eter)?s?', r'(\d+)\s*km'] for pattern in distance_patterns: matches = re.findall(pattern, message.lower()) if matches: distance = int(matches[0]) if 'km' in pattern: distance *= 1000 return distance return 100 def _get_mission_parameters(self, mission_type: str) -> Dict[str, Any]: """Get mission-specific parameters.""" parameters = { 'survey': {'grid_size': 4, 'overlap': 0.7, 'spacing': 200}, 'patrol': {'perimeter_points': 8, 'radius': 300, 'hold_time': 2.0}, 'inspection': {'detail_points': 6, 'precision': 'high', 'slow_speed': True}, 'photography': {'photo_positions': 5, 'hold_time': 3.0, 'angles': 'multiple'}, 'go_straight': {'waypoints': 3, 'linear': True, 'direct_path': True}, 'simple_flight': {'waypoints': 4, 'pattern': 'basic', 'rectangular': True} } return parameters.get(mission_type, {}) def _get_waypoint_strategy(self, mission_type: str) -> str: """Get waypoint strategy description.""" strategies = { 'survey': 'Grid pattern with optimal coverage and 70% overlap for mapping', 'patrol': 'Perimeter pattern with 8 strategic points for surveillance', 'inspection': 'Linear pattern with high precision waypoints for detailed examination', 'photography': 'Strategic positioning with multiple angles and hold times', 'go_straight': 'Direct linear path with evenly spaced waypoints', 'simple_flight': 'Basic rectangular pattern for general flight operations' } return strategies.get(mission_type, 'Standard waypoint pattern optimized for mission type') def _get_safety_considerations(self, mission_type: str) -> List[str]: """Get safety considerations for mission type.""" base_safety = [ "Maintain 30% battery reserve minimum", "Check weather conditions before flight", "Ensure GPS lock before takeoff", "Verify no-fly zone compliance" ] specific_safety = { 'survey': ["Maintain consistent altitude for mapping accuracy", "Monitor for aircraft in survey area"], 'patrol': ["Be aware of security personnel", "Maintain communication during perimeter flight"], 'inspection': ["Maintain safe distance from structures", "Use slow speeds for detailed observation"], 'photography': ["Consider lighting conditions", "Plan for sun angle optimization"], 'go_straight': ["Check for obstacles along flight path", "Ensure clear landing zone"], 'simple_flight': ["Standard safety protocols apply", "Monitor for other aircraft"] } return base_safety + specific_safety.get(mission_type, []) def _extract_location_from_history(self, chat_history: List[Dict]) -> str: """Extract location information from chat history.""" for message in reversed(chat_history[-5:]): # Check last 5 messages user_msg = message.get('user', '') coords = self._extract_coordinates(user_msg) if coords: return f"{coords[0]}, {coords[1]}" return "not specified" async def _get_coordinates_from_context(self, message: str, chat_history: List[Dict]) -> Optional[List[float]]: """Get coordinates from current message, chat history, or geocoding service.""" # First try current message coords = self._extract_coordinates(message) if coords: return coords # If this is a location-based query, prioritize geocoding over chat history if self._is_location_based_query(message): try: geocoded_coords = await self.geocoding.geocode_location(message) if geocoded_coords: return list(geocoded_coords) except Exception as e: print(f"Geocoding failed: {e}") # Then try chat history as fallback for msg in reversed(chat_history[-3:]): user_msg = msg.get('user', '') coords = self._extract_coordinates(user_msg) if coords: return coords return None def _is_location_based_query(self, message: str) -> bool: """Check if the message is a location-based query that needs geocoding.""" message_lower = message.lower() # SKIP geocoding if user provided explicit coordinates if self._extract_coordinates(message): return False # Check for location indicators that require geocoding location_indicators = [ 'around', 'near', 'in', 'district', 'province', 'city', 'town', 'street', 'road', 'avenue', 'park', 'university', 'school', 'hospital', 'airport', 'station', 'mall', 'market', 'center', 'from' ] has_location_words = any(f' {word} ' in f' {message_lower} ' for word in location_indicators) # Check for Vietnamese place name patterns vietnamese_places = ['Δ‘Γ  nαΊ΅ng', 'hαΊ£i chΓ’u', 'sΖ‘n trΓ ', 'ngΕ© hΓ nh sΖ‘n', 'liΓͺn chiểu', 'hα»™i an', 'huαΊΏ', 'quαΊ£ng nam', 'doi cung', 'Δ‘α»™i cung', 'hai chau'] has_vietnamese_place = any(place in message_lower for place in vietnamese_places) # Check for distance patterns like "100m around" has_distance_pattern = bool(re.search(r'\d+\s*m\s+(around|from)', message_lower)) # Must have either location words, Vietnamese places, or distance pattern # AND must have mission-related words like "scan", "survey", etc. mission_words = ['scan', 'survey', 'fly', 'mission', 'help me', 'go', 'straight'] has_mission_intent = any(f' {word} ' in f' {message_lower} ' for word in mission_words) return (has_location_words or has_vietnamese_place or has_distance_pattern) and has_mission_intent def _extract_mission_type_from_history(self, chat_history: List[Dict]) -> str: """Extract mission type from chat history.""" for message in reversed(chat_history[-5:]): user_msg = message.get('user', '') mission_type = self._extract_mission_type(user_msg) if mission_type != 'simple_flight': return mission_type return "not specified" async def _get_flight_context(self) -> str: """Get current flight context for AI responses.""" try: from ..core.flight_manager import flight_manager # Get active missions active_missions = flight_manager.active_missions if not active_missions: return "FLIGHT STATUS: No active flights currently." flight_summaries = [] for mission_id, mission in active_missions.items(): # Get telemetry for this drone telemetry = flight_manager.drone_telemetry.get(mission.drone_id) completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed) total_waypoints = len(mission.waypoints) progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0 flight_info = f""" ACTIVE FLIGHT: - Mission: {mission_id} ({mission.state.value}) - Drone: {mission.drone_id} - Progress: {completed_waypoints}/{total_waypoints} waypoints ({progress:.1f}%)""" if telemetry: flight_info += f""" - Battery: {telemetry.battery}% - Location: {telemetry.lat:.4f}, {telemetry.lon:.4f} - Altitude: {telemetry.alt:.1f}m""" flight_summaries.append(flight_info) context = "\n".join(flight_summaries) return f"CURRENT FLIGHT STATUS:\n{context}\n\nIMPORTANT: User may want to check status, pause, update, or cancel the active flight(s)." except Exception as e: return f"FLIGHT STATUS: Error retrieving flight info ({str(e)})" class ChatHistory: """Enhanced chat history for conversational AI.""" def __init__(self, max_messages: int = 10): self.messages = [] self.max_messages = max_messages self.user_context = { 'experience_level': None, 'current_mission': None, 'preferences': {}, 'last_coordinates': None } def add_message(self, user_message: str, ai_response: str, metadata: Dict = None): """Add message pair to history with metadata.""" from datetime import datetime if metadata is None: metadata = {} self.messages.append({ 'user': user_message, 'ai': ai_response, 'timestamp': datetime.now().isoformat(), 'metadata': metadata }) # Update user context self._update_user_context(user_message, metadata) if len(self.messages) > self.max_messages: self.messages = self.messages[-self.max_messages:] def _update_user_context(self, user_message: str, metadata: Dict): """Update user context based on conversation.""" message_lower = user_message.lower() # Update experience level if any(term in message_lower for term in ['new to drones', 'beginner', 'first time']): self.user_context['experience_level'] = 'beginner' elif any(term in message_lower for term in ['mavlink', 'qgc', 'frame', 'coordinate']): self.user_context['experience_level'] = 'expert' # Extract and store coordinates import re coord_pattern = r'([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)' matches = re.findall(coord_pattern, user_message) if matches: try: lat, lon = float(matches[0][0]), float(matches[0][1]) if -90 <= lat <= 90 and -180 <= lon <= 180: self.user_context['last_coordinates'] = (lat, lon) except ValueError: pass def get_context(self) -> str: """Get intelligent conversation context.""" if not self.messages: return "New conversation" context_parts = ["Recent conversation:"] for msg in self.messages[-3:]: context_parts.append(f"User: {msg['user']}") context_parts.append(f"AI: {msg['ai'][:100]}...") # Add user context if self.user_context['experience_level']: context_parts.append(f"User experience: {self.user_context['experience_level']}") if self.user_context['last_coordinates']: lat, lon = self.user_context['last_coordinates'] context_parts.append(f"Last known coordinates: {lat:.6f}, {lon:.6f}") return "\n".join(context_parts) def get_user_experience(self) -> str: """Get user experience level.""" return self.user_context.get('experience_level', 'unknown') def has_coordinates(self) -> bool: """Check if user has provided coordinates.""" return self.user_context['last_coordinates'] is not None def get_last_coordinates(self) -> Optional[tuple]: """Get last known coordinates.""" return self.user_context['last_coordinates']