Spaces:
Sleeping
Sleeping
| """AI integration for mission analysis.""" | |
| import os | |
| import json | |
| import re | |
| import aiohttp | |
| from typing import Dict, List, Any, Optional | |
| from .knowledge_loader import KnowledgeBaseLoader | |
| from .ai_personality import AIPersonality, ConversationState, UserExperience | |
| from .geocoding import GeocodingService | |
| class Gemini2FlashAI: | |
| """Gemini 2.0 Flash AI integration with fallback.""" | |
| def __init__(self): | |
| api_keys_str = os.getenv('GEMINI_API_KEYS', '') | |
| self.api_keys = [key.strip() for key in api_keys_str.split(',') if key.strip()] | |
| self.enabled = bool(self.api_keys) | |
| self.api_endpoint = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent" | |
| self.current_key_index = 0 | |
| # Enhanced API key rotation system | |
| self.key_status = {} # Track status of each key | |
| self.key_last_used = {} # Track last usage time for each key | |
| self.key_cooldown_until = {} # Track cooldown periods for rate limited keys | |
| self.initialize_key_tracking() | |
| # Load knowledge base for enhanced context | |
| self.knowledge_loader = KnowledgeBaseLoader() | |
| self.knowledge_base = self.knowledge_loader.load_complete_knowledge_base() | |
| # Initialize AI personality for user guidance | |
| self.personality = AIPersonality() | |
| # Initialize geocoding service (reads GOONG_API_KEY from env) | |
| self.geocoding = GeocodingService() | |
| # Note: chat_history will be provided by mission_planner context | |
| def initialize_key_tracking(self): | |
| """Initialize tracking for all API keys.""" | |
| from datetime import datetime | |
| current_time = datetime.now() | |
| for i, key in enumerate(self.api_keys): | |
| self.key_status[key] = 'available' # available, rate_limited, cooldown | |
| self.key_last_used[key] = current_time | |
| self.key_cooldown_until[key] = None | |
| def get_next_available_key(self) -> str: | |
| """Get the next available API key with intelligent rotation.""" | |
| if not self.api_keys: | |
| return "" | |
| from datetime import datetime, timedelta | |
| current_time = datetime.now() | |
| # Try each key to find one that's available | |
| for attempt in range(len(self.api_keys)): | |
| candidate_key = self.api_keys[self.current_key_index] | |
| self.current_key_index = (self.current_key_index + 1) % len(self.api_keys) | |
| # Check if key is available | |
| if self.key_status[candidate_key] == 'available': | |
| self.key_last_used[candidate_key] = current_time | |
| return candidate_key | |
| # Check if rate limited key is ready for retry (after cooldown) | |
| elif self.key_status[candidate_key] == 'rate_limited': | |
| if self.key_cooldown_until[candidate_key] and current_time >= self.key_cooldown_until[candidate_key]: | |
| print(f"🔄 API key {candidate_key[:20]}... cooldown period ended, retrying") | |
| self.key_status[candidate_key] = 'available' | |
| self.key_cooldown_until[candidate_key] = None | |
| self.key_last_used[candidate_key] = current_time | |
| return candidate_key | |
| # Check if key is in cooldown | |
| elif self.key_status[candidate_key] == 'cooldown': | |
| if self.key_cooldown_until[candidate_key] and current_time >= self.key_cooldown_until[candidate_key]: | |
| print(f"🔄 API key {candidate_key[:20]}... cooldown period ended, making available") | |
| self.key_status[candidate_key] = 'available' | |
| self.key_last_used[candidate_key] = current_time | |
| return candidate_key | |
| # If all keys are unavailable, return the first one anyway (better than no response) | |
| print(f"⚠️ All API keys are rate limited, using first key as fallback") | |
| return self.api_keys[0] if self.api_keys else "" | |
| def mark_key_rate_limited(self, api_key: str): | |
| """Mark an API key as rate limited and set cooldown.""" | |
| from datetime import datetime, timedelta | |
| if api_key in self.key_status: | |
| self.key_status[api_key] = 'rate_limited' | |
| # Set cooldown for 60 seconds (adjust as needed based on API limits) | |
| self.key_cooldown_until[api_key] = datetime.now() + timedelta(seconds=60) | |
| print(f"🚫 API key {api_key[:20]}... marked as rate limited, cooldown until {self.key_cooldown_until[api_key]}") | |
| def mark_key_error(self, api_key: str): | |
| """Mark an API key as having an error and set shorter cooldown.""" | |
| from datetime import datetime, timedelta | |
| if api_key in self.key_status: | |
| self.key_status[api_key] = 'cooldown' | |
| # Set shorter cooldown for general errors (30 seconds) | |
| self.key_cooldown_until[api_key] = datetime.now() + timedelta(seconds=30) | |
| print(f"⚠️ API key {api_key[:20]}... marked for cooldown until {self.key_cooldown_until[api_key]}") | |
| def mark_key_success(self, api_key: str): | |
| """Mark an API key as successful and available.""" | |
| if api_key in self.key_status: | |
| self.key_status[api_key] = 'available' | |
| self.key_cooldown_until[api_key] = None | |
| print(f"✅ API key {api_key[:20]}... marked as available after successful use") | |
| def get_current_api_key(self) -> str: | |
| """Get API key with intelligent rotation and fallback.""" | |
| return self.get_next_available_key() | |
| async def analyze_conversation(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: | |
| """🧠 GEMINI-FIRST CONVERSATION ANALYSIS: Intelligent, personalized responses.""" | |
| # Always use Gemini for conversation analysis if available | |
| if self.enabled: | |
| return await self._gemini_conversation_analysis(user_message, chat_history) | |
| else: | |
| # Fallback to local personality system | |
| return await self._local_conversation_analysis(user_message, chat_history) | |
| async def _gemini_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: | |
| """🧠 Gemini-powered conversation analysis for intelligent responses.""" | |
| # Determine user experience and conversation state using local tools | |
| user_experience = self.personality.analyze_user_experience(user_message, chat_history) | |
| conversation_state = self.personality.determine_conversation_state(user_message, chat_history) | |
| # Check if this is a mission request or needs guidance | |
| if conversation_state in [ConversationState.READY_TO_GENERATE]: | |
| return await self.analyze_mission_request(user_message) | |
| elif conversation_state == ConversationState.DRONE_STATUS_REQUEST: | |
| # Handle drone status requests | |
| return await self._handle_drone_status_request() | |
| elif conversation_state == ConversationState.GENERAL_QUERY: | |
| # Use Gemini for intelligent general query responses | |
| return await self._gemini_general_response(user_message, chat_history, user_experience) | |
| else: | |
| # Use Gemini for intelligent guidance responses | |
| return await self._gemini_guidance_response(user_message, chat_history, conversation_state, user_experience) | |
| async def _local_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]: | |
| """Local fallback conversation analysis.""" | |
| # Determine user experience and conversation state | |
| user_experience = self.personality.analyze_user_experience(user_message, chat_history) | |
| conversation_state = self.personality.determine_conversation_state(user_message, chat_history) | |
| # Check if this is a mission request or needs guidance | |
| if conversation_state in [ConversationState.READY_TO_GENERATE]: | |
| return await self.analyze_mission_request(user_message) | |
| elif conversation_state == ConversationState.DRONE_STATUS_REQUEST: | |
| # Handle drone status requests | |
| return await self._handle_drone_status_request() | |
| elif conversation_state == ConversationState.GENERAL_QUERY: | |
| # Handle general queries | |
| guidance_response = self.personality.get_guidance_response( | |
| conversation_state, user_experience, {}, chat_history | |
| ) | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': conversation_state.value, | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': guidance_response, | |
| 'needs_mission_generation': False, | |
| 'is_general_query': True | |
| } | |
| else: | |
| # Provide user guidance | |
| context = { | |
| 'location': self._extract_location_from_history(chat_history), | |
| 'mission_type': self._extract_mission_type_from_history(chat_history) | |
| } | |
| guidance_response = self.personality.get_guidance_response( | |
| conversation_state, user_experience, context, chat_history | |
| ) | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': conversation_state.value, | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': guidance_response, | |
| 'needs_mission_generation': False | |
| } | |
| async def _gemini_general_response(self, user_message: str, chat_history: List[Dict], user_experience) -> Dict[str, Any]: | |
| """🧠 Gemini-powered general query responses with flight awareness.""" | |
| # Check for active flights to provide flight-aware responses | |
| flight_context = await self._get_flight_context() | |
| # Prepare chat context | |
| recent_context = "" | |
| if chat_history: | |
| recent_messages = chat_history[-3:] | |
| context_parts = [] | |
| for msg in recent_messages: | |
| if 'user' in msg: | |
| context_parts.append(f"User: {msg['user']}") | |
| if 'ai' in msg: | |
| context_parts.append(f"Assistant: {msg['ai'][:100]}...") | |
| recent_context = "\n".join(context_parts) | |
| prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning. | |
| The user is asking a general question or greeting. Respond naturally and helpfully. | |
| USER EXPERIENCE LEVEL: {user_experience.value} | |
| USER MESSAGE: "{user_message}" | |
| RECENT CHAT CONTEXT: | |
| {recent_context} | |
| {flight_context} | |
| Provide a friendly, personalized response that: | |
| - Shows you understand their message | |
| - If there are active flights, proactively offer flight management options | |
| - Maintains a conversational, friendly tone | |
| - Offers helpful guidance about drone missions | |
| - During active flights, suggest options like: "Do you want to check flight status?", "Pause the current mission?", "Update the flight plan?", "Have the drone return home?" | |
| - For greetings, include current drone status if available | |
| Keep the response conversational and engaging, around 100-200 words.""" | |
| try: | |
| api_key = self.get_current_api_key() | |
| headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} | |
| payload = { | |
| "contents": [{"parts": [{"text": prompt}]}], | |
| "generationConfig": { | |
| "temperature": 0.7, # More creative for conversation | |
| "topK": 40, | |
| "topP": 0.95, | |
| "maxOutputTokens": 1024 | |
| } | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(self.api_endpoint, json=payload, headers=headers) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| gemini_response = result['candidates'][0]['content']['parts'][0]['text'] | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': 'general_query', | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': gemini_response, | |
| 'needs_mission_generation': False, | |
| 'is_general_query': True, | |
| 'gemini_powered': True | |
| } | |
| except Exception as e: | |
| print(f"🚨 Gemini general response failed: {e}") | |
| # Fallback to local response | |
| guidance_response = self.personality.get_guidance_response( | |
| ConversationState.GENERAL_QUERY, user_experience, {}, chat_history | |
| ) | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': 'general_query', | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': guidance_response, | |
| 'needs_mission_generation': False, | |
| 'is_general_query': True | |
| } | |
| async def _gemini_guidance_response(self, user_message: str, chat_history: List[Dict], | |
| conversation_state, user_experience) -> Dict[str, Any]: | |
| """🧠 Gemini-powered guidance responses.""" | |
| # Extract context | |
| location = self._extract_location_from_history(chat_history) | |
| mission_type = self._extract_mission_type_from_history(chat_history) | |
| # Prepare chat context | |
| recent_context = "" | |
| if chat_history: | |
| recent_messages = chat_history[-2:] | |
| context_parts = [] | |
| for msg in recent_messages: | |
| if 'user' in msg: | |
| context_parts.append(f"User: {msg['user']}") | |
| recent_context = "\n".join(context_parts) | |
| prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning. | |
| The user needs guidance to complete their drone mission request. | |
| USER MESSAGE: "{user_message}" | |
| CONVERSATION STATE: {conversation_state.value} | |
| USER EXPERIENCE: {user_experience.value} | |
| KNOWN LOCATION: {location} | |
| KNOWN MISSION TYPE: {mission_type} | |
| RECENT CONTEXT: | |
| {recent_context} | |
| Based on the conversation state, provide helpful, personalized guidance: | |
| - If they need location: Ask for coordinates or location description with examples | |
| - If they need mission type: Explain mission types and ask what they want to accomplish | |
| - If they need details: Ask for specific requirements like altitude, area size, etc. | |
| - Be encouraging and educational for beginners | |
| - Be concise and technical for experts | |
| Provide a friendly, helpful response that guides them to the next step.""" | |
| try: | |
| api_key = self.get_current_api_key() | |
| headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} | |
| payload = { | |
| "contents": [{"parts": [{"text": prompt}]}], | |
| "generationConfig": { | |
| "temperature": 0.6, | |
| "topK": 40, | |
| "topP": 0.95, | |
| "maxOutputTokens": 1024 | |
| } | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(self.api_endpoint, json=payload, headers=headers) as response: | |
| if response.status == 200: | |
| result = await response.json() | |
| gemini_response = result['candidates'][0]['content']['parts'][0]['text'] | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': conversation_state.value, | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': gemini_response, | |
| 'needs_mission_generation': False, | |
| 'gemini_powered': True | |
| } | |
| except Exception as e: | |
| print(f"🚨 Gemini guidance response failed: {e}") | |
| # Fallback to local response | |
| context = {'location': location, 'mission_type': mission_type} | |
| guidance_response = self.personality.get_guidance_response( | |
| conversation_state, user_experience, context, chat_history | |
| ) | |
| return { | |
| 'is_guidance': True, | |
| 'conversation_state': conversation_state.value, | |
| 'user_experience': user_experience.value, | |
| 'guidance_response': guidance_response, | |
| 'needs_mission_generation': False | |
| } | |
| async def _handle_drone_status_request(self) -> Dict[str, Any]: | |
| """Handle drone status requests by calling the drone status endpoint.""" | |
| try: | |
| from ..core.drone_status import drone_status | |
| current_status = drone_status.get_current_status() | |
| constraints = drone_status.get_flight_constraints() | |
| # Format the drone status response | |
| status_message = self._format_drone_status_response(current_status, constraints) | |
| return { | |
| 'is_guidance': False, | |
| 'is_drone_status': True, | |
| 'conversation_state': 'drone_status_request', | |
| 'user_experience': 'unknown', | |
| 'drone_status_response': status_message, | |
| 'needs_mission_generation': False, | |
| 'drone_status': current_status, | |
| 'flight_constraints': constraints | |
| } | |
| except Exception as e: | |
| return { | |
| 'is_guidance': False, | |
| 'is_drone_status': True, | |
| 'conversation_state': 'drone_status_request', | |
| 'error': f"Error retrieving drone status: {str(e)}", | |
| 'drone_status_response': f"❌ Error retrieving drone status: {str(e)}", | |
| 'needs_mission_generation': False | |
| } | |
| def _format_drone_status_response(self, current_status: Dict, constraints: Dict) -> str: | |
| """Format drone status information into a user-friendly message.""" | |
| message_parts = [] | |
| message_parts.append("🚁 **Drone Status Report**") | |
| message_parts.append("") | |
| # Basic status information | |
| battery = current_status.get('batteryRemaining', 0) | |
| satellites = current_status.get('satellite', 0) | |
| voltage = current_status.get('voltage', 0.0) | |
| message_parts.append(f"🔋 **Battery:** {battery}% ({voltage:.1f}V)") | |
| message_parts.append(f"📡 **GPS:** {satellites} satellites") | |
| message_parts.append(f"🛩️ **Mode:** {current_status.get('mode', 'UNKNOWN')}") | |
| message_parts.append(f"📍 **Location:** {current_status.get('latitude', 0):.4f}, {current_status.get('longitude', 0):.4f}") | |
| message_parts.append(f"🛩️ **Altitude:** {current_status.get('altitude', 0):.0f}m") | |
| message_parts.append("") | |
| # Flight constraints | |
| message_parts.append("**Flight Constraints:**") | |
| message_parts.append(f"• Safe Flight Time: {constraints.get('safe_flight_time_minutes', 0)} minutes") | |
| message_parts.append(f"• Max Distance: {constraints.get('max_safe_distance_meters', 0)}m") | |
| message_parts.append(f"• Optimal Altitude: {constraints.get('optimal_altitude_range', (30, 80))[0]}-{constraints.get('optimal_altitude_range', (30, 80))[1]}m") | |
| message_parts.append(f"• Ready to Fly: {'✅ Yes' if constraints.get('ready_to_fly', False) else '❌ No'}") | |
| message_parts.append("") | |
| if not constraints.get('ready_to_fly', False): | |
| message_parts.append("⚠️ **Not ready to fly:** Check battery level, GPS connection, and safety constraints.") | |
| message_parts.append("") | |
| message_parts.append("For mission planning, the system will optimize altitude and safety parameters based on this status.") | |
| # Include raw telemetry hint for full status requests | |
| if current_status.get('data_source') == 'hardware_telemetry': | |
| message_parts.append("") | |
| message_parts.append("Raw telemetry available via /drone/status") | |
| return "\n".join(message_parts) | |
| async def analyze_mission_request(self, user_message: str) -> Dict[str, Any]: | |
| """🧠 GEMINI-FIRST ANALYSIS: Always use Gemini as primary AI brain with tool support.""" | |
| # ALWAYS use Gemini if available - it's the smart brain that coordinates everything | |
| if self.enabled: | |
| print(f"🧠 GEMINI-FIRST: Using Gemini 2.0 Flash as primary AI brain") | |
| return await self._gemini_first_analysis(user_message) | |
| else: | |
| print(f"⚠️ Gemini not available, using local fallback. API keys: {len(self.api_keys)}") | |
| return await self._fallback_analysis(user_message, []) | |
| async def _gemini_first_analysis(self, user_message: str) -> Dict[str, Any]: | |
| """🧠 Gemini-first analysis with intelligent tool integration.""" | |
| print(f"🔍 DEBUG: Starting Gemini analysis for: {user_message[:50]}...") | |
| # Pre-extract supporting data for Gemini | |
| explicit_coords = self._extract_coordinates(user_message) | |
| is_location_query = self._is_location_based_query(user_message) | |
| print(f"🔍 DEBUG: Explicit coords: {explicit_coords}, Is location query: {is_location_query}") | |
| # Prepare comprehensive context for Gemini | |
| support_data = { | |
| "explicit_coordinates": explicit_coords, | |
| "has_location_query": is_location_query, | |
| "geocoding_available": True, | |
| "drone_status_available": True | |
| } | |
| print(f"🔍 DEBUG: Support data prepared: {support_data}") | |
| # Add geocoding if needed | |
| geocoding_info = "" | |
| if is_location_query and not explicit_coords: | |
| print(f"🔍 DEBUG: Attempting geocoding for location query") | |
| try: | |
| geocoding_result = await self.geocoding.parse_location_with_distance(user_message) | |
| if geocoding_result: | |
| support_data["geocoded_location"] = geocoding_result | |
| geocoding_info = f""" | |
| GEOCODING RESULT: | |
| - Location: {geocoding_result['location_name']} | |
| - Coordinates: {geocoding_result['coordinates']} | |
| - Distance: {geocoding_result.get('distance_meters', 'N/A')}m""" | |
| print(f"🔍 DEBUG: Geocoding successful: {geocoding_result}") | |
| else: | |
| print(f"❌ DEBUG: Geocoding failed - no result") | |
| except Exception as e: | |
| geocoding_info = f"Geocoding failed: {e}" | |
| print(f"❌ DEBUG: Geocoding exception: {e}") | |
| else: | |
| print(f"🔍 DEBUG: Skipping geocoding - has explicit coords or not location query") | |
| # Enhanced prompt for Gemini with professional AI-driven analysis | |
| prompt = f"""You are DroneBot, a professional AI drone mission specialist with comprehensive knowledge of aviation, photogrammetry, and mission planning. You must create intelligent, optimized missions based on real-world requirements. | |
| COMPREHENSIVE KNOWLEDGE BASE: | |
| {self.knowledge_base} | |
| USER REQUEST: "{user_message}" | |
| ENVIRONMENTAL DATA: | |
| - Coordinates: {explicit_coords} | |
| - Location query: {is_location_query} | |
| {geocoding_info} | |
| PROFESSIONAL MISSION ANALYSIS REQUIREMENTS: | |
| You must analyze the request using professional drone operations knowledge: | |
| 1. MISSION TYPE INTELLIGENCE: | |
| - Survey: Photogrammetry requirements, GSD calculation, overlap optimization | |
| - Patrol: Security patterns, visibility requirements, threat assessment | |
| - Photography: Lighting analysis, composition angles, equipment optimization | |
| - Inspection: Structure analysis, safety protocols, detailed examination patterns | |
| - Go_straight: Efficiency optimization, obstacle avoidance, direct routing | |
| 2. ENVIRONMENTAL ANALYSIS: | |
| - Calculate optimal altitude based on mission requirements and safety | |
| - Analyze terrain complexity and obstacle clearance needs | |
| - Consider weather impact on flight parameters | |
| - Assess battery requirements for mission completion | |
| 3. PROFESSIONAL FLIGHT PLANNING: | |
| - Generate waypoint patterns based on industry standards | |
| - Optimize for data quality, safety, and efficiency | |
| - Calculate proper overlap for survey missions (80% forward, 60% side) | |
| - Design approach angles for inspection missions | |
| - Plan hold times based on equipment requirements | |
| 4. SAFETY INTEGRATION: | |
| - Assess flight risks and mitigation strategies | |
| - Calculate battery reserves and return-to-home requirements | |
| - Ensure regulatory compliance and airspace considerations | |
| - Plan emergency procedures and abort criteria | |
| As a professional AI mission planner, provide a comprehensive JSON response: | |
| {{ | |
| "mission_type": "survey|patrol|photography|inspection|go_straight|simple_flight", | |
| "confidence": 0.95, | |
| "coordinates": [latitude_6_decimal, longitude_6_decimal], | |
| "altitude": optimized_altitude_meters, | |
| "direction": "north|south|east|west|northeast|northwest|southeast|southwest|null", | |
| "distance": calculated_distance_meters, | |
| "reasoning": "Professional analysis explaining mission optimization, safety considerations, and technical decisions", | |
| "mission_parameters": {{ | |
| "ground_sample_distance": gsd_cm_per_pixel, | |
| "forward_overlap": overlap_percentage, | |
| "side_overlap": overlap_percentage, | |
| "flight_speed": optimized_speed_ms, | |
| "hold_time": required_hold_time_seconds, | |
| "camera_trigger_distance": trigger_distance_meters, | |
| "battery_consumption_estimate": estimated_wh_usage, | |
| "flight_time_estimate": estimated_minutes, | |
| "data_volume_estimate": estimated_gb, | |
| "weather_considerations": ["wind_impact", "visibility_requirements"], | |
| "equipment_requirements": ["camera_specs", "gimbal_settings"] | |
| }}, | |
| "waypoint_strategy": "Detailed explanation of flight pattern optimization based on professional standards", | |
| "safety_considerations": [ | |
| "Comprehensive safety analysis", | |
| "Risk mitigation strategies", | |
| "Emergency procedures", | |
| "Regulatory compliance checks" | |
| ], | |
| "professional_insights": {{ | |
| "mission_complexity": "low|medium|high", | |
| "skill_level_required": "beginner|intermediate|expert", | |
| "environmental_challenges": ["challenge1", "challenge2"], | |
| "optimization_opportunities": ["improvement1", "improvement2"], | |
| "quality_assurance": "Data quality predictions and recommendations" | |
| }}, | |
| "personality_response": "Friendly, professional message explaining the mission plan with educational insights" | |
| }} | |
| IMPORTANT: | |
| - Be friendly, understanding, and personalized in your reasoning | |
| - Use the support tools data (coordinates, geocoding) intelligently | |
| - If coordinates are missing, include helpful guidance in personality_response | |
| - Provide detailed, human-like reasoning that shows you understand the user's intent | |
| - Make safety recommendations based on mission type | |
| Respond ONLY with valid JSON.""" | |
| try: | |
| api_key = self.get_current_api_key() | |
| if not api_key: | |
| print("❌ No API key available for Gemini") | |
| return await self._fallback_analysis(user_message, []) | |
| print(f"🔍 DEBUG: Using API key: {api_key[:20]}...") | |
| headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key} | |
| payload = { | |
| "contents": [{"parts": [{"text": prompt}]}], | |
| "generationConfig": { | |
| "temperature": 0.4, # Slightly more creative for personality | |
| "topK": 40, | |
| "topP": 0.95, | |
| "maxOutputTokens": 3072 | |
| } | |
| } | |
| print(f"🔍 DEBUG: Making Gemini API call...") | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(self.api_endpoint, json=payload, headers=headers) as response: | |
| print(f"🔍 DEBUG: Gemini API response status: {response.status}") | |
| # Handle rate limiting by marking key and trying next available key | |
| if response.status == 429: | |
| print(f"🚫 API key {api_key[:20]}... got rate limited (429)") | |
| self.mark_key_rate_limited(api_key) | |
| # Try with next available key before falling back to local AI | |
| next_key = self.get_next_available_key() | |
| if next_key and next_key != api_key: | |
| print(f"🔄 Trying with next available key: {next_key[:20]}...") | |
| retry_headers = headers.copy() | |
| retry_headers['x-goog-api-key'] = next_key | |
| async with aiohttp.ClientSession() as retry_session: | |
| async with retry_session.post(self.api_endpoint, json=payload, headers=retry_headers) as retry_response: | |
| print(f"🔄 Retry response status: {retry_response.status}") | |
| if retry_response.status == 200: | |
| result = await retry_response.json() | |
| print(f"🔄 Retry successful! Processing with new key") | |
| # Continue with the successful result and process it | |
| print(f"🔄 Retry result keys: {list(result.keys()) if isinstance(result, dict) else 'not dict'}") | |
| # Process the successful retry result | |
| if not result or 'candidates' not in result or not result['candidates']: | |
| print("❌ Invalid retry response structure") | |
| return await self._fallback_analysis(user_message, []) | |
| candidate = result['candidates'][0] | |
| if 'content' not in candidate or 'parts' not in candidate['content'] or not candidate['content']['parts']: | |
| print("❌ Missing content in retry response") | |
| return await self._fallback_analysis(user_message, []) | |
| content = candidate['content']['parts'][0]['text'] | |
| print(f"🔄 Retry content: {content[:200]}...") | |
| # Clean JSON | |
| content = content.strip() | |
| if content.startswith('```json'): | |
| content = content[7:-3] | |
| elif content.startswith('```'): | |
| content = content[3:-3] | |
| try: | |
| analysis = json.loads(content) | |
| if analysis is None: | |
| print("❌ Retry returned null JSON") | |
| return await self._fallback_analysis(user_message, []) | |
| except json.JSONDecodeError as e: | |
| print(f"❌ Invalid JSON from retry: {e}") | |
| return await self._fallback_analysis(user_message, []) | |
| # Enhance with support tool data | |
| if explicit_coords and not analysis.get('coordinates'): | |
| analysis['coordinates'] = explicit_coords | |
| elif 'geocoded_location' in support_data and not analysis.get('coordinates'): | |
| analysis['coordinates'] = list(support_data['geocoded_location']['coordinates']) | |
| analysis['gemini_enhanced'] = True | |
| analysis['model'] = 'gemini-2.0-flash' | |
| analysis['support_tools_used'] = list(support_data.keys()) | |
| # Mark the successful key as available for future use | |
| self.mark_key_success(next_key) | |
| print(f"🔄 Retry analysis successful: {analysis}") | |
| return analysis | |
| elif retry_response.status == 429: | |
| print(f"🚫 Next key also rate limited, falling back to local AI") | |
| self.mark_key_rate_limited(next_key) | |
| return await self._fallback_analysis(user_message, []) | |
| else: | |
| print(f"❌ Retry failed with status {retry_response.status}, falling back to local AI") | |
| self.mark_key_error(next_key) | |
| return await self._fallback_analysis(user_message, []) | |
| else: | |
| print(f"⚠️ No other keys available, falling back to local AI") | |
| return await self._fallback_analysis(user_message, []) | |
| elif response.status == 200: | |
| result = await response.json() | |
| print(f"🔍 DEBUG: Gemini API response received, keys: {list(result.keys()) if isinstance(result, dict) else 'not dict'}") | |
| print(f"🔍 DEBUG: Full result: {result}") | |
| # Debug: Check if result structure is correct | |
| if not result or 'candidates' not in result or not result['candidates']: | |
| print("❌ Invalid Gemini API response structure") | |
| return await self._fallback_analysis(user_message, []) | |
| candidate = result['candidates'][0] | |
| print(f"🔍 DEBUG: Candidate keys: {list(candidate.keys()) if isinstance(candidate, dict) else 'not dict'}") | |
| if 'content' not in candidate or 'parts' not in candidate['content'] or not candidate['content']['parts']: | |
| print("❌ Missing content in Gemini response") | |
| return await self._fallback_analysis(user_message, []) | |
| content = candidate['content']['parts'][0]['text'] | |
| print(f"🔍 DEBUG: Raw content from Gemini: {content[:200]}...") | |
| # Clean JSON | |
| content = content.strip() | |
| if content.startswith('```json'): | |
| content = content[7:-3] | |
| elif content.startswith('```'): | |
| content = content[3:-3] | |
| print(f"🔍 DEBUG: Cleaned content: {content[:200]}...") | |
| # Debug: Check if content is valid JSON | |
| try: | |
| analysis = json.loads(content) | |
| print(f"🔍 DEBUG: Parsed analysis: {analysis}") | |
| if analysis is None: | |
| print("❌ Gemini returned null JSON") | |
| return await self._fallback_analysis(user_message, []) | |
| except json.JSONDecodeError as e: | |
| print(f"❌ Invalid JSON from Gemini: {e}") | |
| print(f"Content: {content[:200]}...") | |
| return await self._fallback_analysis(user_message, []) | |
| # Enhance with support tool data | |
| if explicit_coords and not analysis.get('coordinates'): | |
| analysis['coordinates'] = explicit_coords | |
| elif 'geocoded_location' in support_data and not analysis.get('coordinates'): | |
| analysis['coordinates'] = list(support_data['geocoded_location']['coordinates']) | |
| analysis['gemini_enhanced'] = True | |
| analysis['model'] = 'gemini-2.0-flash' | |
| analysis['support_tools_used'] = list(support_data.keys()) | |
| # Mark the successful key as available for future use | |
| self.mark_key_success(api_key) | |
| print(f"🔍 DEBUG: Final analysis: {analysis}") | |
| return analysis | |
| elif response.status in [400, 401, 403]: | |
| print(f"❌ Gemini API error ({response.status}), marking key for cooldown") | |
| self.mark_key_error(api_key) | |
| return await self._fallback_analysis(user_message, []) | |
| else: | |
| print(f"❌ Gemini API returned status {response.status}, marking key for cooldown") | |
| self.mark_key_error(api_key) | |
| return await self._fallback_analysis(user_message, []) | |
| except Exception as e: | |
| print(f"🚨 Gemini API failed: {e}") | |
| return await self._fallback_analysis(user_message, []) | |
| async def _fallback_analysis(self, user_message: str, chat_history: List[Dict] = None) -> Dict[str, Any]: | |
| """Local AI fallback analysis with context awareness.""" | |
| if chat_history is None: | |
| chat_history = [] | |
| mission_type = self._extract_mission_type(user_message) | |
| coordinates = await self._get_coordinates_from_context(user_message, chat_history) | |
| return { | |
| 'mission_type': mission_type, | |
| 'confidence': 0.75, | |
| 'coordinates': coordinates, | |
| 'altitude': self._extract_altitude(user_message), | |
| 'direction': self._extract_direction(user_message), | |
| 'distance': self._extract_distance(user_message), | |
| 'reasoning': f"Smart local AI: Detected '{mission_type}' mission with intelligent context analysis", | |
| 'mission_parameters': self._get_mission_parameters(mission_type), | |
| 'waypoint_strategy': self._get_waypoint_strategy(mission_type), | |
| 'safety_considerations': self._get_safety_considerations(mission_type), | |
| 'gemini_enhanced': False, | |
| 'model': 'local-fallback' | |
| } | |
| def _extract_mission_type(self, message: str) -> str: | |
| """Extract mission type from message with smart intent recognition.""" | |
| message_lower = message.lower() | |
| # Priority-based mission type detection to avoid keyword conflicts | |
| # Priority 1: Specific mission type keywords (highest priority) | |
| if any(word in message_lower for word in ['survey', 'map', 'mapping', 'grid', 'coverage', 'scan', 'scanning']): | |
| return 'survey' | |
| elif any(word in message_lower for word in ['patrol', 'perimeter', 'guard', 'security', 'monitor', 'watch', 'circle', 'secure']): | |
| return 'patrol' | |
| elif any(word in message_lower for word in ['photo', 'photography', 'picture', 'capture', 'take photos', 'aerial photos', 'aerial', 'shoot']): | |
| return 'photography' | |
| elif any(word in message_lower for word in ['inspect', 'inspection', 'examine', 'check', 'analyze', 'detail', 'structure']): | |
| return 'inspection' | |
| # Priority 2: Directional movement keywords (medium priority) | |
| elif any(word in message_lower for word in ['go straight', 'straight line', 'fly straight', 'move straight', 'straight flight', 'direct flight']): | |
| return 'go_straight' | |
| elif any(word in message_lower for word in ['go south', 'go north', 'go east', 'go west', 'fly to', 'move to']): | |
| return 'go_straight' | |
| # Priority 3: General context keywords (lowest priority) | |
| mission_keywords = { | |
| 'survey': ['field', 'farm', 'land', 'area coverage'], | |
| 'patrol': ['route', 'boundary', 'surveillance'], | |
| 'photography': ['building', 'structure', 'landscape'], | |
| 'inspection': ['power line', 'bridge', 'roof', 'infrastructure'], | |
| 'go_straight': ['straight distance', 'linear path', 'direct route'] | |
| } | |
| for mission_type, keywords in mission_keywords.items(): | |
| if any(keyword in message_lower for keyword in keywords): | |
| return mission_type | |
| return 'simple_flight' | |
| def _extract_coordinates(self, message: str) -> Optional[List[float]]: | |
| """Extract coordinates from message with strict validation to prevent false positives.""" | |
| # STRICT coordinate patterns - must be explicitly GPS coordinates | |
| patterns = [ | |
| # With explicit coordinate keywords (most reliable) | |
| r'(?:coordinates?|coords)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', | |
| r'(?:location|position)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', | |
| r'at\s+(?:coordinates?|coords)?\s*([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)', | |
| # Latitude/longitude explicitly mentioned | |
| r'lat[itude]*[:\s]+([-+]?\d+\.?\d*)[,\s]+lon[gitude]*[:\s]+([-+]?\d+\.?\d*)', | |
| # Decimal coordinates (must have decimal points for both numbers) | |
| r'(\d+\.\d+)[,\s]+(\d+\.\d+)(?!\s*[a-zA-Z])', # Not followed by letters | |
| # Standard coordinate format with "at" keyword | |
| r'at\s+(\d{1,2}\.\d{3,})[,\s]+(\d{2,3}\.\d{3,})', | |
| # Simple pattern but ONLY for reasonable coordinate ranges | |
| r'(?:^|\s|at\s+)((?:[1-9]\d?|1[0-7]\d)\.?\d*)[,\s]+((?:[1-9]\d{1,2}|[1-2]\d{2})\.?\d*)(?=\s|$)', | |
| ] | |
| for pattern in patterns: | |
| matches = re.findall(pattern, message, re.IGNORECASE) | |
| if matches: | |
| try: | |
| # Handle different pattern formats | |
| if len(matches[0]) == 2: | |
| lat, lon = float(matches[0][0]), float(matches[0][1]) | |
| else: | |
| # For patterns with more groups, take first two | |
| lat, lon = float(matches[0][0]), float(matches[0][1]) | |
| # STRICT validation for GPS coordinates | |
| if (-90 <= lat <= 90 and -180 <= lon <= 180 and | |
| # Additional checks to prevent false positives | |
| abs(lat) > 0.1 and abs(lon) > 0.1 and # Not too close to 0,0 | |
| not (lat < 5 and lon < 5)): # Avoid small numbers that might be measurements | |
| return [lat, lon] | |
| except (ValueError, IndexError): | |
| continue | |
| return None | |
| def _extract_altitude(self, message: str) -> int: | |
| """Extract altitude from message.""" | |
| altitude_patterns = [r'(\d+)\s*m(?:eter)?s?\b', r'altitude\s+(\d+)'] | |
| for pattern in altitude_patterns: | |
| matches = re.findall(pattern, message.lower()) | |
| if matches: | |
| return int(matches[0]) | |
| return 80 | |
| def _extract_direction(self, message: str) -> str: | |
| """Extract direction from message.""" | |
| directions = ['north', 'south', 'east', 'west', 'northeast', 'northwest', 'southeast', 'southwest'] | |
| message_lower = message.lower() | |
| for direction in directions: | |
| if direction in message_lower: | |
| return direction | |
| return 'north' | |
| def _extract_distance(self, message: str) -> int: | |
| """Extract distance from message.""" | |
| distance_patterns = [r'(\d+)\s*m(?:eter)?s?', r'(\d+)\s*km'] | |
| for pattern in distance_patterns: | |
| matches = re.findall(pattern, message.lower()) | |
| if matches: | |
| distance = int(matches[0]) | |
| if 'km' in pattern: | |
| distance *= 1000 | |
| return distance | |
| return 100 | |
| def _get_mission_parameters(self, mission_type: str) -> Dict[str, Any]: | |
| """Get mission-specific parameters.""" | |
| parameters = { | |
| 'survey': {'grid_size': 4, 'overlap': 0.7, 'spacing': 200}, | |
| 'patrol': {'perimeter_points': 8, 'radius': 300, 'hold_time': 2.0}, | |
| 'inspection': {'detail_points': 6, 'precision': 'high', 'slow_speed': True}, | |
| 'photography': {'photo_positions': 5, 'hold_time': 3.0, 'angles': 'multiple'}, | |
| 'go_straight': {'waypoints': 3, 'linear': True, 'direct_path': True}, | |
| 'simple_flight': {'waypoints': 4, 'pattern': 'basic', 'rectangular': True} | |
| } | |
| return parameters.get(mission_type, {}) | |
| def _get_waypoint_strategy(self, mission_type: str) -> str: | |
| """Get waypoint strategy description.""" | |
| strategies = { | |
| 'survey': 'Grid pattern with optimal coverage and 70% overlap for mapping', | |
| 'patrol': 'Perimeter pattern with 8 strategic points for surveillance', | |
| 'inspection': 'Linear pattern with high precision waypoints for detailed examination', | |
| 'photography': 'Strategic positioning with multiple angles and hold times', | |
| 'go_straight': 'Direct linear path with evenly spaced waypoints', | |
| 'simple_flight': 'Basic rectangular pattern for general flight operations' | |
| } | |
| return strategies.get(mission_type, 'Standard waypoint pattern optimized for mission type') | |
| def _get_safety_considerations(self, mission_type: str) -> List[str]: | |
| """Get safety considerations for mission type.""" | |
| base_safety = [ | |
| "Maintain 30% battery reserve minimum", | |
| "Check weather conditions before flight", | |
| "Ensure GPS lock before takeoff", | |
| "Verify no-fly zone compliance" | |
| ] | |
| specific_safety = { | |
| 'survey': ["Maintain consistent altitude for mapping accuracy", "Monitor for aircraft in survey area"], | |
| 'patrol': ["Be aware of security personnel", "Maintain communication during perimeter flight"], | |
| 'inspection': ["Maintain safe distance from structures", "Use slow speeds for detailed observation"], | |
| 'photography': ["Consider lighting conditions", "Plan for sun angle optimization"], | |
| 'go_straight': ["Check for obstacles along flight path", "Ensure clear landing zone"], | |
| 'simple_flight': ["Standard safety protocols apply", "Monitor for other aircraft"] | |
| } | |
| return base_safety + specific_safety.get(mission_type, []) | |
| def _extract_location_from_history(self, chat_history: List[Dict]) -> str: | |
| """Extract location information from chat history.""" | |
| for message in reversed(chat_history[-5:]): # Check last 5 messages | |
| user_msg = message.get('user', '') | |
| coords = self._extract_coordinates(user_msg) | |
| if coords: | |
| return f"{coords[0]}, {coords[1]}" | |
| return "not specified" | |
| async def _get_coordinates_from_context(self, message: str, chat_history: List[Dict]) -> Optional[List[float]]: | |
| """Get coordinates from current message, chat history, or geocoding service.""" | |
| # First try current message | |
| coords = self._extract_coordinates(message) | |
| if coords: | |
| return coords | |
| # If this is a location-based query, prioritize geocoding over chat history | |
| if self._is_location_based_query(message): | |
| try: | |
| geocoded_coords = await self.geocoding.geocode_location(message) | |
| if geocoded_coords: | |
| return list(geocoded_coords) | |
| except Exception as e: | |
| print(f"Geocoding failed: {e}") | |
| # Then try chat history as fallback | |
| for msg in reversed(chat_history[-3:]): | |
| user_msg = msg.get('user', '') | |
| coords = self._extract_coordinates(user_msg) | |
| if coords: | |
| return coords | |
| return None | |
| def _is_location_based_query(self, message: str) -> bool: | |
| """Check if the message is a location-based query that needs geocoding.""" | |
| message_lower = message.lower() | |
| # SKIP geocoding if user provided explicit coordinates | |
| if self._extract_coordinates(message): | |
| return False | |
| # Check for location indicators that require geocoding | |
| location_indicators = [ | |
| 'around', 'near', 'in', 'district', 'province', 'city', 'town', | |
| 'street', 'road', 'avenue', 'park', 'university', 'school', | |
| 'hospital', 'airport', 'station', 'mall', 'market', 'center', 'from' | |
| ] | |
| has_location_words = any(f' {word} ' in f' {message_lower} ' for word in location_indicators) | |
| # Check for Vietnamese place name patterns | |
| vietnamese_places = ['đà nẵng', 'hải châu', 'sơn trà', 'ngũ hành sơn', 'liên chiểu', 'hội an', 'huế', 'quảng nam', 'doi cung', 'đội cung', 'hai chau'] | |
| has_vietnamese_place = any(place in message_lower for place in vietnamese_places) | |
| # Check for distance patterns like "100m around" | |
| has_distance_pattern = bool(re.search(r'\d+\s*m\s+(around|from)', message_lower)) | |
| # Must have either location words, Vietnamese places, or distance pattern | |
| # AND must have mission-related words like "scan", "survey", etc. | |
| mission_words = ['scan', 'survey', 'fly', 'mission', 'help me', 'go', 'straight'] | |
| has_mission_intent = any(f' {word} ' in f' {message_lower} ' for word in mission_words) | |
| return (has_location_words or has_vietnamese_place or has_distance_pattern) and has_mission_intent | |
| def _extract_mission_type_from_history(self, chat_history: List[Dict]) -> str: | |
| """Extract mission type from chat history.""" | |
| for message in reversed(chat_history[-5:]): | |
| user_msg = message.get('user', '') | |
| mission_type = self._extract_mission_type(user_msg) | |
| if mission_type != 'simple_flight': | |
| return mission_type | |
| return "not specified" | |
| async def _get_flight_context(self) -> str: | |
| """Get current flight context for AI responses.""" | |
| try: | |
| from ..core.flight_manager import flight_manager | |
| # Get active missions | |
| active_missions = flight_manager.active_missions | |
| if not active_missions: | |
| return "FLIGHT STATUS: No active flights currently." | |
| flight_summaries = [] | |
| for mission_id, mission in active_missions.items(): | |
| # Get telemetry for this drone | |
| telemetry = flight_manager.drone_telemetry.get(mission.drone_id) | |
| completed_waypoints = sum(1 for wp in mission.waypoints if wp.completed) | |
| total_waypoints = len(mission.waypoints) | |
| progress = (completed_waypoints / total_waypoints) * 100 if total_waypoints > 0 else 0 | |
| flight_info = f""" | |
| ACTIVE FLIGHT: | |
| - Mission: {mission_id} ({mission.state.value}) | |
| - Drone: {mission.drone_id} | |
| - Progress: {completed_waypoints}/{total_waypoints} waypoints ({progress:.1f}%)""" | |
| if telemetry: | |
| flight_info += f""" | |
| - Battery: {telemetry.battery}% | |
| - Location: {telemetry.lat:.4f}, {telemetry.lon:.4f} | |
| - Altitude: {telemetry.alt:.1f}m""" | |
| flight_summaries.append(flight_info) | |
| context = "\n".join(flight_summaries) | |
| return f"CURRENT FLIGHT STATUS:\n{context}\n\nIMPORTANT: User may want to check status, pause, update, or cancel the active flight(s)." | |
| except Exception as e: | |
| return f"FLIGHT STATUS: Error retrieving flight info ({str(e)})" | |
| class ChatHistory: | |
| """Enhanced chat history for conversational AI.""" | |
| def __init__(self, max_messages: int = 10): | |
| self.messages = [] | |
| self.max_messages = max_messages | |
| self.user_context = { | |
| 'experience_level': None, | |
| 'current_mission': None, | |
| 'preferences': {}, | |
| 'last_coordinates': None | |
| } | |
| def add_message(self, user_message: str, ai_response: str, metadata: Dict = None): | |
| """Add message pair to history with metadata.""" | |
| from datetime import datetime | |
| if metadata is None: | |
| metadata = {} | |
| self.messages.append({ | |
| 'user': user_message, | |
| 'ai': ai_response, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'metadata': metadata | |
| }) | |
| # Update user context | |
| self._update_user_context(user_message, metadata) | |
| if len(self.messages) > self.max_messages: | |
| self.messages = self.messages[-self.max_messages:] | |
| def _update_user_context(self, user_message: str, metadata: Dict): | |
| """Update user context based on conversation.""" | |
| message_lower = user_message.lower() | |
| # Update experience level | |
| if any(term in message_lower for term in ['new to drones', 'beginner', 'first time']): | |
| self.user_context['experience_level'] = 'beginner' | |
| elif any(term in message_lower for term in ['mavlink', 'qgc', 'frame', 'coordinate']): | |
| self.user_context['experience_level'] = 'expert' | |
| # Extract and store coordinates | |
| import re | |
| coord_pattern = r'([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)' | |
| matches = re.findall(coord_pattern, user_message) | |
| if matches: | |
| try: | |
| lat, lon = float(matches[0][0]), float(matches[0][1]) | |
| if -90 <= lat <= 90 and -180 <= lon <= 180: | |
| self.user_context['last_coordinates'] = (lat, lon) | |
| except ValueError: | |
| pass | |
| def get_context(self) -> str: | |
| """Get intelligent conversation context.""" | |
| if not self.messages: | |
| return "New conversation" | |
| context_parts = ["Recent conversation:"] | |
| for msg in self.messages[-3:]: | |
| context_parts.append(f"User: {msg['user']}") | |
| context_parts.append(f"AI: {msg['ai'][:100]}...") | |
| # Add user context | |
| if self.user_context['experience_level']: | |
| context_parts.append(f"User experience: {self.user_context['experience_level']}") | |
| if self.user_context['last_coordinates']: | |
| lat, lon = self.user_context['last_coordinates'] | |
| context_parts.append(f"Last known coordinates: {lat:.6f}, {lon:.6f}") | |
| return "\n".join(context_parts) | |
| def get_user_experience(self) -> str: | |
| """Get user experience level.""" | |
| return self.user_context.get('experience_level', 'unknown') | |
| def has_coordinates(self) -> bool: | |
| """Check if user has provided coordinates.""" | |
| return self.user_context['last_coordinates'] is not None | |
| def get_last_coordinates(self) -> Optional[tuple]: | |
| """Get last known coordinates.""" | |
| return self.user_context['last_coordinates'] | |