zok213
Here's a summary of the changes:
6740714
"""AI integration for mission analysis."""
import os
import json
import re
import aiohttp
from typing import Dict, List, Any, Optional
from .knowledge_loader import KnowledgeBaseLoader
from .ai_personality import AIPersonality, ConversationState, UserExperience
from .geocoding import GeocodingService
class Gemini2FlashAI:
"""Gemini 2.0 Flash AI integration with fallback."""
def __init__(self):
    """Configure API access, key-rotation bookkeeping and helper services.

    API keys are read from the ``GEMINI_API_KEYS`` environment variable as
    a comma-separated list; blank entries are ignored. The integration is
    flagged ``enabled`` only when at least one key is present.
    """
    raw_keys = os.getenv('GEMINI_API_KEYS', '')
    trimmed_keys = (piece.strip() for piece in raw_keys.split(','))
    self.api_keys = [piece for piece in trimmed_keys if piece]
    self.enabled = bool(self.api_keys)
    self.api_endpoint = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent"
    self.current_key_index = 0

    # Per-key rotation state; populated by initialize_key_tracking().
    self.key_status = {}          # key -> status string
    self.key_last_used = {}       # key -> datetime the key was last handed out
    self.key_cooldown_until = {}  # key -> datetime the cooldown ends, or None
    self.initialize_key_tracking()

    # Knowledge base text that is embedded in mission-analysis prompts.
    self.knowledge_loader = KnowledgeBaseLoader()
    self.knowledge_base = self.knowledge_loader.load_complete_knowledge_base()

    # Local personality engine used for guidance and as a fallback.
    self.personality = AIPersonality()

    # Geocoding helper (original note: reads GOONG_API_KEY from env).
    self.geocoding = GeocodingService()
    # Note: chat_history will be provided by mission_planner context
def initialize_key_tracking(self):
    """Give every configured API key a fresh rotation record.

    Each key is marked ``'available'`` (the other statuses used by this
    class are ``'rate_limited'`` and ``'cooldown'``), stamped with the
    current time as its last use, and given no cooldown deadline.
    """
    from datetime import datetime

    # One shared timestamp so all keys start with the same "last used" value.
    started_at = datetime.now()
    self.key_status.update(dict.fromkeys(self.api_keys, 'available'))
    self.key_last_used.update(dict.fromkeys(self.api_keys, started_at))
    self.key_cooldown_until.update(dict.fromkeys(self.api_keys, None))
def get_next_available_key(self) -> str:
    """Return the next usable API key, rotating round-robin.

    Starting at ``current_key_index``, each key is inspected at most once.
    A key is handed out when it is ``'available'``, or when it is
    ``'rate_limited'`` / ``'cooldown'`` and its cooldown deadline has
    passed; in the latter case the key is reset to ``'available'`` and its
    deadline cleared. The rotation index advances past every inspected key.

    Returns:
        The chosen key; the first configured key when every key is still
        cooling down (a best-effort answer beats none); or ``""`` when no
        keys are configured.
    """
    if not self.api_keys:
        return ""
    from datetime import datetime

    current_time = datetime.now()
    # Log suffix per recoverable status (kept identical to the historical output).
    recovery_notes = {
        'rate_limited': "cooldown period ended, retrying",
        'cooldown': "cooldown period ended, making available",
    }
    key_count = len(self.api_keys)
    for _ in range(key_count):
        candidate_key = self.api_keys[self.current_key_index]
        self.current_key_index = (self.current_key_index + 1) % key_count
        status = self.key_status[candidate_key]

        if status != 'available':
            if status not in recovery_notes:
                continue
            ready_at = self.key_cooldown_until[candidate_key]
            if not ready_at or current_time < ready_at:
                continue
            print(f"🔄 API key {candidate_key[:20]}... {recovery_notes[status]}")
            self.key_status[candidate_key] = 'available'
            # Clear the deadline for both recoverable states so no stale
            # timestamp is left behind on an available key.
            self.key_cooldown_until[candidate_key] = None

        self.key_last_used[candidate_key] = current_time
        return candidate_key

    # If all keys are unavailable, return the first one anyway (better than no response)
    print("⚠️ All API keys are rate limited, using first key as fallback")
    return self.api_keys[0]
def mark_key_rate_limited(self, api_key: str):
    """Flag a tracked key as rate limited and start a 60-second cooldown.

    Keys that are not being tracked are ignored.
    """
    from datetime import datetime, timedelta

    if api_key not in self.key_status:
        return
    # 60 seconds; adjust as needed based on the API's actual limits.
    resume_at = datetime.now() + timedelta(seconds=60)
    self.key_status[api_key] = 'rate_limited'
    self.key_cooldown_until[api_key] = resume_at
    print(f"🚫 API key {api_key[:20]}... marked as rate limited, cooldown until {resume_at}")
def mark_key_error(self, api_key: str):
    """Put a tracked key into a 30-second cooldown after a general error.

    Keys that are not being tracked are ignored.
    """
    from datetime import datetime, timedelta

    if api_key not in self.key_status:
        return
    # Shorter pause than the rate-limit case: 30 seconds.
    resume_at = datetime.now() + timedelta(seconds=30)
    self.key_status[api_key] = 'cooldown'
    self.key_cooldown_until[api_key] = resume_at
    print(f"⚠️ API key {api_key[:20]}... marked for cooldown until {resume_at}")
def mark_key_success(self, api_key: str):
    """Record a successful call: the tracked key becomes available again.

    Any pending cooldown deadline is cleared. Untracked keys are ignored.
    """
    if api_key not in self.key_status:
        return
    self.key_status[api_key] = 'available'
    self.key_cooldown_until[api_key] = None
    print(f"✅ API key {api_key[:20]}... marked as available after successful use")
def get_current_api_key(self) -> str:
    """Return the key to use for the next request.

    Thin alias for :meth:`get_next_available_key`, so every call advances
    the rotation.
    """
    selected_key = self.get_next_available_key()
    return selected_key
async def analyze_conversation(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]:
    """Analyse a chat turn, preferring Gemini when it is enabled.

    Delegates to the Gemini-backed analysis when ``self.enabled`` is true,
    otherwise to the local personality-based analysis, and returns that
    handler's result unchanged.
    """
    handler = (
        self._gemini_conversation_analysis
        if self.enabled
        else self._local_conversation_analysis
    )
    return await handler(user_message, chat_history)
async def _gemini_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]:
    """Route a chat turn to the matching Gemini-backed handler.

    The local personality engine classifies the user's experience level
    and the conversation state; the state then selects the handler:
    mission analysis, drone status, general query, or guidance.
    """
    experience = self.personality.analyze_user_experience(user_message, chat_history)
    state = self.personality.determine_conversation_state(user_message, chat_history)

    # Ready to generate: run the full mission analysis.
    if state in (ConversationState.READY_TO_GENERATE,):
        return await self.analyze_mission_request(user_message)

    # Status request: answered without the language model.
    if state == ConversationState.DRONE_STATUS_REQUEST:
        return await self._handle_drone_status_request()

    # Greeting / general question.
    if state == ConversationState.GENERAL_QUERY:
        return await self._gemini_general_response(user_message, chat_history, experience)

    # Anything else: guide the user toward a complete mission request.
    return await self._gemini_guidance_response(user_message, chat_history, state, experience)
async def _local_conversation_analysis(self, user_message: str, chat_history: List[Dict]) -> Dict[str, Any]:
    """Analyse a chat turn using only the local personality engine.

    Mission-ready and drone-status states are delegated to their handlers.
    Every other state produces a guidance payload; general queries receive
    an empty context and an extra ``is_general_query`` flag, while the
    remaining states receive the location and mission type recovered from
    the chat history.
    """
    experience = self.personality.analyze_user_experience(user_message, chat_history)
    state = self.personality.determine_conversation_state(user_message, chat_history)

    if state in (ConversationState.READY_TO_GENERATE,):
        return await self.analyze_mission_request(user_message)
    if state == ConversationState.DRONE_STATUS_REQUEST:
        return await self._handle_drone_status_request()

    general_query = state == ConversationState.GENERAL_QUERY
    if general_query:
        guidance_context = {}
    else:
        guidance_context = {
            'location': self._extract_location_from_history(chat_history),
            'mission_type': self._extract_mission_type_from_history(chat_history)
        }

    guidance_text = self.personality.get_guidance_response(
        state, experience, guidance_context, chat_history
    )
    reply = {
        'is_guidance': True,
        'conversation_state': state.value,
        'user_experience': experience.value,
        'guidance_response': guidance_text,
        'needs_mission_generation': False
    }
    if general_query:
        reply['is_general_query'] = True
    return reply
async def _gemini_general_response(self, user_message: str, chat_history: List[Dict], user_experience) -> Dict[str, Any]:
    """Answer a greeting or general question via Gemini, with a local fallback.

    The prompt includes the user's experience level, up to the last three
    chat entries (assistant text truncated to 100 characters) and whatever
    ``_get_flight_context()`` returns.

    The outcome of the HTTP call is reported to the key-rotation state
    (success, rate limit, or error) so a failing key is rested instead of
    being handed out again immediately. Any failure falls back to the
    local personality response.

    Args:
        user_message: The user's latest message.
        chat_history: Prior turns; entries may carry ``'user'`` / ``'ai'`` keys.
        user_experience: Experience enum member; only ``.value`` is read here.

    Returns:
        A guidance payload. ``gemini_powered`` is present (and true) only
        when the text came from Gemini.
    """
    # Check for active flights to provide flight-aware responses
    flight_context = await self._get_flight_context()

    # Prepare chat context
    recent_context = ""
    if chat_history:
        context_parts = []
        for msg in chat_history[-3:]:
            if 'user' in msg:
                context_parts.append(f"User: {msg['user']}")
            if 'ai' in msg:
                context_parts.append(f"Assistant: {msg['ai'][:100]}...")
        recent_context = "\n".join(context_parts)

    prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning.
The user is asking a general question or greeting. Respond naturally and helpfully.
USER EXPERIENCE LEVEL: {user_experience.value}
USER MESSAGE: "{user_message}"
RECENT CHAT CONTEXT:
{recent_context}
{flight_context}
Provide a friendly, personalized response that:
- Shows you understand their message
- If there are active flights, proactively offer flight management options
- Maintains a conversational, friendly tone
- Offers helpful guidance about drone missions
- During active flights, suggest options like: "Do you want to check flight status?", "Pause the current mission?", "Update the flight plan?", "Have the drone return home?"
- For greetings, include current drone status if available
Keep the response conversational and engaging, around 100-200 words."""

    try:
        api_key = self.get_current_api_key()
        if api_key:
            headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "temperature": 0.7,  # More creative for conversation
                    "topK": 40,
                    "topP": 0.95,
                    "maxOutputTokens": 1024
                }
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(self.api_endpoint, json=payload, headers=headers) as response:
                    status = response.status
                    result = await response.json() if status == 200 else None

            if status == 200:
                # A malformed body raises here and is handled by the except below.
                gemini_response = result['candidates'][0]['content']['parts'][0]['text']
                self.mark_key_success(api_key)
                return {
                    'is_guidance': True,
                    'conversation_state': 'general_query',
                    'user_experience': user_experience.value,
                    'guidance_response': gemini_response,
                    'needs_mission_generation': False,
                    'is_general_query': True,
                    'gemini_powered': True
                }

            # Report the failure so the rotation rests this key.
            if status == 429:
                print("🚫 Gemini general response rate limited (429), using local response")
                self.mark_key_rate_limited(api_key)
            else:
                print(f"❌ Gemini general response returned status {status}, using local response")
                self.mark_key_error(api_key)
        else:
            print("⚠️ No Gemini API key available, using local response")
    except Exception as e:
        print(f"🚨 Gemini general response failed: {e}")

    # Fallback to local response
    guidance_response = self.personality.get_guidance_response(
        ConversationState.GENERAL_QUERY, user_experience, {}, chat_history
    )
    return {
        'is_guidance': True,
        'conversation_state': 'general_query',
        'user_experience': user_experience.value,
        'guidance_response': guidance_response,
        'needs_mission_generation': False,
        'is_general_query': True
    }
async def _gemini_guidance_response(self, user_message: str, chat_history: List[Dict],
                                    conversation_state, user_experience) -> Dict[str, Any]:
    """Ask Gemini for next-step guidance, with a local fallback.

    The prompt carries the conversation state, the user's experience
    level, the location and mission type recovered from the chat history,
    and the user text of the last two chat entries.

    The outcome of the HTTP call is reported to the key-rotation state
    (success, rate limit, or error). When no key is available, or on any
    failure, the local personality engine supplies the guidance text.

    Args:
        user_message: The user's latest message.
        chat_history: Prior turns; entries may carry a ``'user'`` key.
        conversation_state: State enum member; only ``.value`` is read here.
        user_experience: Experience enum member; only ``.value`` is read here.

    Returns:
        A guidance payload. ``gemini_powered`` is present (and true) only
        when the text came from Gemini.
    """
    # Extract context
    location = self._extract_location_from_history(chat_history)
    mission_type = self._extract_mission_type_from_history(chat_history)

    # Prepare chat context
    recent_context = ""
    if chat_history:
        context_parts = [f"User: {msg['user']}" for msg in chat_history[-2:] if 'user' in msg]
        recent_context = "\n".join(context_parts)

    prompt = f"""You are DroneBot, a friendly and intelligent AI assistant for drone mission planning.
The user needs guidance to complete their drone mission request.
USER MESSAGE: "{user_message}"
CONVERSATION STATE: {conversation_state.value}
USER EXPERIENCE: {user_experience.value}
KNOWN LOCATION: {location}
KNOWN MISSION TYPE: {mission_type}
RECENT CONTEXT:
{recent_context}
Based on the conversation state, provide helpful, personalized guidance:
- If they need location: Ask for coordinates or location description with examples
- If they need mission type: Explain mission types and ask what they want to accomplish
- If they need details: Ask for specific requirements like altitude, area size, etc.
- Be encouraging and educational for beginners
- Be concise and technical for experts
Provide a friendly, helpful response that guides them to the next step."""

    try:
        api_key = self.get_current_api_key()
        if api_key:
            headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}
            payload = {
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "temperature": 0.6,
                    "topK": 40,
                    "topP": 0.95,
                    "maxOutputTokens": 1024
                }
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(self.api_endpoint, json=payload, headers=headers) as response:
                    status = response.status
                    result = await response.json() if status == 200 else None

            if status == 200:
                # A malformed body raises here and is handled by the except below.
                gemini_response = result['candidates'][0]['content']['parts'][0]['text']
                self.mark_key_success(api_key)
                return {
                    'is_guidance': True,
                    'conversation_state': conversation_state.value,
                    'user_experience': user_experience.value,
                    'guidance_response': gemini_response,
                    'needs_mission_generation': False,
                    'gemini_powered': True
                }

            # Report the failure so the rotation rests this key.
            if status == 429:
                print("🚫 Gemini guidance response rate limited (429), using local guidance")
                self.mark_key_rate_limited(api_key)
            else:
                print(f"❌ Gemini guidance response returned status {status}, using local guidance")
                self.mark_key_error(api_key)
        else:
            print("⚠️ No Gemini API key available, using local guidance")
    except Exception as e:
        print(f"🚨 Gemini guidance response failed: {e}")

    # Fallback to local response
    context = {'location': location, 'mission_type': mission_type}
    guidance_response = self.personality.get_guidance_response(
        conversation_state, user_experience, context, chat_history
    )
    return {
        'is_guidance': True,
        'conversation_state': conversation_state.value,
        'user_experience': user_experience.value,
        'guidance_response': guidance_response,
        'needs_mission_generation': False
    }
async def _handle_drone_status_request(self) -> Dict[str, Any]:
    """Build the reply for a "what is the drone's status?" request.

    Reads the current status and flight constraints from the
    ``drone_status`` object in ``..core.drone_status`` and formats them
    into a user-facing message.

    Returns:
        On success, a payload with the formatted message plus the raw
        status and constraints. On any failure (including a failed
        import), a payload with ``error`` and an error message in
        ``drone_status_response``. Both shapes carry ``user_experience``
        so callers can read it unconditionally.
    """
    try:
        # Imported lazily; an import failure is reported like any other error.
        from ..core.drone_status import drone_status
        current_status = drone_status.get_current_status()
        constraints = drone_status.get_flight_constraints()
        # Format the drone status response
        status_message = self._format_drone_status_response(current_status, constraints)
        return {
            'is_guidance': False,
            'is_drone_status': True,
            'conversation_state': 'drone_status_request',
            'user_experience': 'unknown',
            'drone_status_response': status_message,
            'needs_mission_generation': False,
            'drone_status': current_status,
            'flight_constraints': constraints
        }
    except Exception as e:
        failure = f"Error retrieving drone status: {str(e)}"
        return {
            'is_guidance': False,
            'is_drone_status': True,
            'conversation_state': 'drone_status_request',
            # Same key as the success payload, so both shapes agree.
            'user_experience': 'unknown',
            'error': failure,
            'drone_status_response': f"❌ {failure}",
            'needs_mission_generation': False
        }
def _format_drone_status_response(self, current_status: Dict, constraints: Dict) -> str:
    """Render drone telemetry and flight constraints as a chat message.

    Missing fields fall back to zero-like defaults (``'UNKNOWN'`` for the
    mode, ``(30, 80)`` for the optimal altitude range). A warning is added
    when ``ready_to_fly`` is falsy, and a pointer to ``/drone/status`` is
    appended when ``data_source`` is ``'hardware_telemetry'``.

    Returns:
        The message lines joined with newlines.
    """
    battery = current_status.get('batteryRemaining', 0)
    satellites = current_status.get('satellite', 0)
    voltage = current_status.get('voltage', 0.0)

    # Basic status information
    report = [
        "🚁 **Drone Status Report**",
        "",
        f"🔋 **Battery:** {battery}% ({voltage:.1f}V)",
        f"📡 **GPS:** {satellites} satellites",
        f"🛩️ **Mode:** {current_status.get('mode', 'UNKNOWN')}",
        f"📍 **Location:** {current_status.get('latitude', 0):.4f}, {current_status.get('longitude', 0):.4f}",
        f"🛩️ **Altitude:** {current_status.get('altitude', 0):.0f}m",
        "",
    ]

    # Flight constraints
    altitude_range = constraints.get('optimal_altitude_range', (30, 80))
    ready = constraints.get('ready_to_fly', False)
    if ready:
        ready_label = '✅ Yes'
    else:
        ready_label = '❌ No'
    report += [
        "**Flight Constraints:**",
        f"• Safe Flight Time: {constraints.get('safe_flight_time_minutes', 0)} minutes",
        f"• Max Distance: {constraints.get('max_safe_distance_meters', 0)}m",
        f"• Optimal Altitude: {altitude_range[0]}-{altitude_range[1]}m",
        f"• Ready to Fly: {ready_label}",
        "",
    ]

    if not ready:
        report += [
            "⚠️ **Not ready to fly:** Check battery level, GPS connection, and safety constraints.",
            "",
        ]

    report.append("For mission planning, the system will optimize altitude and safety parameters based on this status.")

    # Include raw telemetry hint for full status requests
    if current_status.get('data_source') == 'hardware_telemetry':
        report += ["", "Raw telemetry available via /drone/status"]

    return "\n".join(report)
async def analyze_mission_request(self, user_message: str) -> Dict[str, Any]:
    """Analyse a mission request, using Gemini whenever it is enabled.

    With Gemini disabled, the local fallback analysis runs with an empty
    chat history.
    """
    if not self.enabled:
        print(f"⚠️ Gemini not available, using local fallback. API keys: {len(self.api_keys)}")
        return await self._fallback_analysis(user_message, [])

    # Gemini is the primary analyser whenever API keys are configured.
    print("🧠 GEMINI-FIRST: Using Gemini 2.0 Flash as primary AI brain")
    return await self._gemini_first_analysis(user_message)
async def _gemini_first_analysis(self, user_message: str) -> Dict[str, Any]:
    """Analyse a mission request with Gemini, retrying once on a rate limit.

    Steps:
      1. Pre-extract explicit coordinates and, for location-style requests
         without coordinates, try to geocode the message.
      2. Send the knowledge base, the request and that data to Gemini and
         ask for a JSON mission plan.
      3. On HTTP 429, mark the key rate limited and retry once with the
         next key from the rotation (if it is a different key).
      4. Parse the JSON answer, fill in missing coordinates from the
         pre-extracted data and tag the result as Gemini-produced.

    Every failure path (no key, non-200 status, unusable body, exception)
    returns the local fallback analysis instead.

    Args:
        user_message: The user's mission request text.

    Returns:
        The analysis dict, with ``gemini_enhanced``, ``model`` and
        ``support_tools_used`` added when Gemini produced it.
    """
    print(f"🔍 DEBUG: Starting Gemini analysis for: {user_message[:50]}...")

    # Pre-extract supporting data for Gemini
    explicit_coords = self._extract_coordinates(user_message)
    is_location_query = self._is_location_based_query(user_message)
    print(f"🔍 DEBUG: Explicit coords: {explicit_coords}, Is location query: {is_location_query}")

    # Prepare comprehensive context for Gemini
    support_data = {
        "explicit_coordinates": explicit_coords,
        "has_location_query": is_location_query,
        "geocoding_available": True,
        "drone_status_available": True
    }
    print(f"🔍 DEBUG: Support data prepared: {support_data}")

    # Add geocoding if needed
    geocoding_info = ""
    if is_location_query and not explicit_coords:
        print("🔍 DEBUG: Attempting geocoding for location query")
        try:
            geocoding_result = await self.geocoding.parse_location_with_distance(user_message)
            if geocoding_result:
                support_data["geocoded_location"] = geocoding_result
                geocoding_info = f"""
GEOCODING RESULT:
- Location: {geocoding_result['location_name']}
- Coordinates: {geocoding_result['coordinates']}
- Distance: {geocoding_result.get('distance_meters', 'N/A')}m"""
                print(f"🔍 DEBUG: Geocoding successful: {geocoding_result}")
            else:
                print("❌ DEBUG: Geocoding failed - no result")
        except Exception as e:
            geocoding_info = f"Geocoding failed: {e}"
            print(f"❌ DEBUG: Geocoding exception: {e}")
    else:
        print("🔍 DEBUG: Skipping geocoding - has explicit coords or not location query")

    # Enhanced prompt for Gemini with professional AI-driven analysis
    prompt = f"""You are DroneBot, a professional AI drone mission specialist with comprehensive knowledge of aviation, photogrammetry, and mission planning. You must create intelligent, optimized missions based on real-world requirements.
COMPREHENSIVE KNOWLEDGE BASE:
{self.knowledge_base}
USER REQUEST: "{user_message}"
ENVIRONMENTAL DATA:
- Coordinates: {explicit_coords}
- Location query: {is_location_query}
{geocoding_info}
PROFESSIONAL MISSION ANALYSIS REQUIREMENTS:
You must analyze the request using professional drone operations knowledge:
1. MISSION TYPE INTELLIGENCE:
- Survey: Photogrammetry requirements, GSD calculation, overlap optimization
- Patrol: Security patterns, visibility requirements, threat assessment
- Photography: Lighting analysis, composition angles, equipment optimization
- Inspection: Structure analysis, safety protocols, detailed examination patterns
- Go_straight: Efficiency optimization, obstacle avoidance, direct routing
2. ENVIRONMENTAL ANALYSIS:
- Calculate optimal altitude based on mission requirements and safety
- Analyze terrain complexity and obstacle clearance needs
- Consider weather impact on flight parameters
- Assess battery requirements for mission completion
3. PROFESSIONAL FLIGHT PLANNING:
- Generate waypoint patterns based on industry standards
- Optimize for data quality, safety, and efficiency
- Calculate proper overlap for survey missions (80% forward, 60% side)
- Design approach angles for inspection missions
- Plan hold times based on equipment requirements
4. SAFETY INTEGRATION:
- Assess flight risks and mitigation strategies
- Calculate battery reserves and return-to-home requirements
- Ensure regulatory compliance and airspace considerations
- Plan emergency procedures and abort criteria
As a professional AI mission planner, provide a comprehensive JSON response:
{{
"mission_type": "survey|patrol|photography|inspection|go_straight|simple_flight",
"confidence": 0.95,
"coordinates": [latitude_6_decimal, longitude_6_decimal],
"altitude": optimized_altitude_meters,
"direction": "north|south|east|west|northeast|northwest|southeast|southwest|null",
"distance": calculated_distance_meters,
"reasoning": "Professional analysis explaining mission optimization, safety considerations, and technical decisions",
"mission_parameters": {{
"ground_sample_distance": gsd_cm_per_pixel,
"forward_overlap": overlap_percentage,
"side_overlap": overlap_percentage,
"flight_speed": optimized_speed_ms,
"hold_time": required_hold_time_seconds,
"camera_trigger_distance": trigger_distance_meters,
"battery_consumption_estimate": estimated_wh_usage,
"flight_time_estimate": estimated_minutes,
"data_volume_estimate": estimated_gb,
"weather_considerations": ["wind_impact", "visibility_requirements"],
"equipment_requirements": ["camera_specs", "gimbal_settings"]
}},
"waypoint_strategy": "Detailed explanation of flight pattern optimization based on professional standards",
"safety_considerations": [
"Comprehensive safety analysis",
"Risk mitigation strategies",
"Emergency procedures",
"Regulatory compliance checks"
],
"professional_insights": {{
"mission_complexity": "low|medium|high",
"skill_level_required": "beginner|intermediate|expert",
"environmental_challenges": ["challenge1", "challenge2"],
"optimization_opportunities": ["improvement1", "improvement2"],
"quality_assurance": "Data quality predictions and recommendations"
}},
"personality_response": "Friendly, professional message explaining the mission plan with educational insights"
}}
IMPORTANT:
- Be friendly, understanding, and personalized in your reasoning
- Use the support tools data (coordinates, geocoding) intelligently
- If coordinates are missing, include helpful guidance in personality_response
- Provide detailed, human-like reasoning that shows you understand the user's intent
- Make safety recommendations based on mission type
Respond ONLY with valid JSON."""

    def build_analysis(result, origin):
        """Turn a 200-response body into the enriched analysis dict, or None if unusable.

        ``origin`` is ``"Gemini"`` for the first attempt and ``"retry"``
        for the second; it only affects log wording.
        """
        api_label = "Gemini API" if origin == "Gemini" else origin
        print(f"🔍 DEBUG: {api_label} response received, keys: {list(result.keys()) if isinstance(result, dict) else 'not dict'}")
        if not result or 'candidates' not in result or not result['candidates']:
            print(f"❌ Invalid {api_label} response structure")
            return None
        candidate = result['candidates'][0]
        if 'content' not in candidate or 'parts' not in candidate['content'] or not candidate['content']['parts']:
            print(f"❌ Missing content in {origin} response")
            return None
        content = candidate['content']['parts'][0]['text'].strip()
        print(f"🔍 DEBUG: Raw content from {origin}: {content[:200]}...")

        # Strip a Markdown code fence. The closing fence is removed only if
        # it is really there, so a reply without one keeps its final
        # characters (a blind [:-3] slice would cut into the JSON).
        if content.startswith('```'):
            content = content[3:]
            if content.startswith('json'):
                content = content[4:]
            if content.endswith('```'):
                content = content[:-3]
            content = content.strip()

        try:
            analysis = json.loads(content)
        except json.JSONDecodeError as e:
            print(f"❌ Invalid JSON from {origin}: {e}")
            print(f"Content: {content[:200]}...")
            return None
        if analysis is None:
            print(f"❌ {origin.capitalize()} returned null JSON")
            return None
        if not isinstance(analysis, dict):
            # The mission plan must be a JSON object; anything else is unusable.
            print(f"❌ {origin.capitalize()} returned non-object JSON")
            return None

        # Enhance with support tool data
        if not analysis.get('coordinates'):
            if explicit_coords:
                analysis['coordinates'] = explicit_coords
            elif 'geocoded_location' in support_data:
                analysis['coordinates'] = list(support_data['geocoded_location']['coordinates'])
        analysis['gemini_enhanced'] = True
        analysis['model'] = 'gemini-2.0-flash'
        analysis['support_tools_used'] = list(support_data.keys())
        return analysis

    try:
        api_key = self.get_current_api_key()
        if not api_key:
            print("❌ No API key available for Gemini")
            return await self._fallback_analysis(user_message, [])
        print(f"🔍 DEBUG: Using API key: {api_key[:20]}...")

        payload = {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {
                "temperature": 0.4,  # Slightly more creative for personality
                "topK": 40,
                "topP": 0.95,
                "maxOutputTokens": 3072
            }
        }

        async def post_once(session, key):
            """POST the prompt with ``key``; return (status, JSON body or None)."""
            headers = {'Content-Type': 'application/json', 'x-goog-api-key': key}
            async with session.post(self.api_endpoint, json=payload, headers=headers) as response:
                body = await response.json() if response.status == 200 else None
                return response.status, body

        print("🔍 DEBUG: Making Gemini API call...")
        origin = "Gemini"
        async with aiohttp.ClientSession() as session:
            status, result = await post_once(session, api_key)
            print(f"🔍 DEBUG: Gemini API response status: {status}")

            # Handle rate limiting by marking key and trying next available key
            if status == 429:
                print(f"🚫 API key {api_key[:20]}... got rate limited (429)")
                self.mark_key_rate_limited(api_key)
                next_key = self.get_next_available_key()
                if not next_key or next_key == api_key:
                    print("⚠️ No other keys available, falling back to local AI")
                    return await self._fallback_analysis(user_message, [])

                print(f"🔄 Trying with next available key: {next_key[:20]}...")
                api_key, origin = next_key, "retry"
                status, result = await post_once(session, api_key)
                print(f"🔄 Retry response status: {status}")
                if status == 429:
                    print("🚫 Next key also rate limited, falling back to local AI")
                    self.mark_key_rate_limited(api_key)
                    return await self._fallback_analysis(user_message, [])
                if status != 200:
                    print(f"❌ Retry failed with status {status}, falling back to local AI")
                    self.mark_key_error(api_key)
                    return await self._fallback_analysis(user_message, [])

        if status != 200:
            if status in (400, 401, 403):
                print(f"❌ Gemini API error ({status}), marking key for cooldown")
            else:
                print(f"❌ Gemini API returned status {status}, marking key for cooldown")
            self.mark_key_error(api_key)
            return await self._fallback_analysis(user_message, [])

        analysis = build_analysis(result, origin)
        if analysis is None:
            return await self._fallback_analysis(user_message, [])

        # Mark the successful key as available for future use
        self.mark_key_success(api_key)
        print(f"🔍 DEBUG: Final analysis: {analysis}")
        return analysis
    except Exception as e:
        print(f"🚨 Gemini API failed: {e}")
        return await self._fallback_analysis(user_message, [])
async def _fallback_analysis(self, user_message: str, chat_history: List[Dict] = None) -> Dict[str, Any]:
    """Produce a mission analysis without Gemini, using local extractors.

    The mission type, altitude, direction and distance come from the
    message text; coordinates come from the message and chat history. The
    result is tagged ``model='local-fallback'`` with a fixed confidence of
    0.75.

    Args:
        user_message: The user's mission request text.
        chat_history: Prior turns; ``None`` is treated as an empty list.
    """
    history = [] if chat_history is None else chat_history
    detected_type = self._extract_mission_type(user_message)
    located_at = await self._get_coordinates_from_context(user_message, history)

    analysis = {
        'mission_type': detected_type,
        'confidence': 0.75,
        'coordinates': located_at,
    }
    analysis['altitude'] = self._extract_altitude(user_message)
    analysis['direction'] = self._extract_direction(user_message)
    analysis['distance'] = self._extract_distance(user_message)
    analysis['reasoning'] = f"Smart local AI: Detected '{detected_type}' mission with intelligent context analysis"
    analysis['mission_parameters'] = self._get_mission_parameters(detected_type)
    analysis['waypoint_strategy'] = self._get_waypoint_strategy(detected_type)
    analysis['safety_considerations'] = self._get_safety_considerations(detected_type)
    analysis['gemini_enhanced'] = False
    analysis['model'] = 'local-fallback'
    return analysis
def _extract_mission_type(self, message: str) -> str:
    """Classify a message into a mission type by keyword lookup.

    Matching is a case-insensitive substring test. Rules are tried in
    priority order and the first rule with any matching keyword wins:
    explicit mission words first, then movement phrases, then general
    context words. Returns ``'simple_flight'`` when nothing matches.
    """
    text = message.lower()

    # (mission type, keywords) in descending priority.
    rules = (
        # Priority 1: specific mission type keywords
        ('survey', ('survey', 'map', 'mapping', 'grid', 'coverage', 'scan', 'scanning')),
        ('patrol', ('patrol', 'perimeter', 'guard', 'security', 'monitor', 'watch', 'circle', 'secure')),
        ('photography', ('photo', 'photography', 'picture', 'capture', 'take photos', 'aerial photos', 'aerial', 'shoot')),
        ('inspection', ('inspect', 'inspection', 'examine', 'check', 'analyze', 'detail', 'structure')),
        # Priority 2: directional movement phrases
        ('go_straight', ('go straight', 'straight line', 'fly straight', 'move straight', 'straight flight', 'direct flight')),
        ('go_straight', ('go south', 'go north', 'go east', 'go west', 'fly to', 'move to')),
        # Priority 3: general context keywords
        ('survey', ('field', 'farm', 'land', 'area coverage')),
        ('patrol', ('route', 'boundary', 'surveillance')),
        ('photography', ('building', 'structure', 'landscape')),
        ('inspection', ('power line', 'bridge', 'roof', 'infrastructure')),
        ('go_straight', ('straight distance', 'linear path', 'direct route')),
    )
    for mission, keywords in rules:
        for keyword in keywords:
            if keyword in text:
                return mission
    return 'simple_flight'
def _extract_coordinates(self, message: str) -> Optional[List[float]]:
"""Extract coordinates from message with strict validation to prevent false positives."""
# STRICT coordinate patterns - must be explicitly GPS coordinates
patterns = [
# With explicit coordinate keywords (most reliable)
r'(?:coordinates?|coords)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)',
r'(?:location|position)\s+([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)',
r'at\s+(?:coordinates?|coords)?\s*([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)',
# Latitude/longitude explicitly mentioned
r'lat[itude]*[:\s]+([-+]?\d+\.?\d*)[,\s]+lon[gitude]*[:\s]+([-+]?\d+\.?\d*)',
# Decimal coordinates (must have decimal points for both numbers)
r'(\d+\.\d+)[,\s]+(\d+\.\d+)(?!\s*[a-zA-Z])', # Not followed by letters
# Standard coordinate format with "at" keyword
r'at\s+(\d{1,2}\.\d{3,})[,\s]+(\d{2,3}\.\d{3,})',
# Simple pattern but ONLY for reasonable coordinate ranges
r'(?:^|\s|at\s+)((?:[1-9]\d?|1[0-7]\d)\.?\d*)[,\s]+((?:[1-9]\d{1,2}|[1-2]\d{2})\.?\d*)(?=\s|$)',
]
for pattern in patterns:
matches = re.findall(pattern, message, re.IGNORECASE)
if matches:
try:
# Handle different pattern formats
if len(matches[0]) == 2:
lat, lon = float(matches[0][0]), float(matches[0][1])
else:
# For patterns with more groups, take first two
lat, lon = float(matches[0][0]), float(matches[0][1])
# STRICT validation for GPS coordinates
if (-90 <= lat <= 90 and -180 <= lon <= 180 and
# Additional checks to prevent false positives
abs(lat) > 0.1 and abs(lon) > 0.1 and # Not too close to 0,0
not (lat < 5 and lon < 5)): # Avoid small numbers that might be measurements
return [lat, lon]
except (ValueError, IndexError):
continue
return None
def _extract_altitude(self, message: str) -> int:
"""Extract altitude from message."""
altitude_patterns = [r'(\d+)\s*m(?:eter)?s?\b', r'altitude\s+(\d+)']
for pattern in altitude_patterns:
matches = re.findall(pattern, message.lower())
if matches:
return int(matches[0])
return 80
def _extract_direction(self, message: str) -> str:
"""Extract direction from message."""
directions = ['north', 'south', 'east', 'west', 'northeast', 'northwest', 'southeast', 'southwest']
message_lower = message.lower()
for direction in directions:
if direction in message_lower:
return direction
return 'north'
def _extract_distance(self, message: str) -> int:
"""Extract distance from message."""
distance_patterns = [r'(\d+)\s*m(?:eter)?s?', r'(\d+)\s*km']
for pattern in distance_patterns:
matches = re.findall(pattern, message.lower())
if matches:
distance = int(matches[0])
if 'km' in pattern:
distance *= 1000
return distance
return 100
def _get_mission_parameters(self, mission_type: str) -> Dict[str, Any]:
"""Get mission-specific parameters."""
parameters = {
'survey': {'grid_size': 4, 'overlap': 0.7, 'spacing': 200},
'patrol': {'perimeter_points': 8, 'radius': 300, 'hold_time': 2.0},
'inspection': {'detail_points': 6, 'precision': 'high', 'slow_speed': True},
'photography': {'photo_positions': 5, 'hold_time': 3.0, 'angles': 'multiple'},
'go_straight': {'waypoints': 3, 'linear': True, 'direct_path': True},
'simple_flight': {'waypoints': 4, 'pattern': 'basic', 'rectangular': True}
}
return parameters.get(mission_type, {})
def _get_waypoint_strategy(self, mission_type: str) -> str:
"""Get waypoint strategy description."""
strategies = {
'survey': 'Grid pattern with optimal coverage and 70% overlap for mapping',
'patrol': 'Perimeter pattern with 8 strategic points for surveillance',
'inspection': 'Linear pattern with high precision waypoints for detailed examination',
'photography': 'Strategic positioning with multiple angles and hold times',
'go_straight': 'Direct linear path with evenly spaced waypoints',
'simple_flight': 'Basic rectangular pattern for general flight operations'
}
return strategies.get(mission_type, 'Standard waypoint pattern optimized for mission type')
def _get_safety_considerations(self, mission_type: str) -> List[str]:
"""Get safety considerations for mission type."""
base_safety = [
"Maintain 30% battery reserve minimum",
"Check weather conditions before flight",
"Ensure GPS lock before takeoff",
"Verify no-fly zone compliance"
]
specific_safety = {
'survey': ["Maintain consistent altitude for mapping accuracy", "Monitor for aircraft in survey area"],
'patrol': ["Be aware of security personnel", "Maintain communication during perimeter flight"],
'inspection': ["Maintain safe distance from structures", "Use slow speeds for detailed observation"],
'photography': ["Consider lighting conditions", "Plan for sun angle optimization"],
'go_straight': ["Check for obstacles along flight path", "Ensure clear landing zone"],
'simple_flight': ["Standard safety protocols apply", "Monitor for other aircraft"]
}
return base_safety + specific_safety.get(mission_type, [])
def _extract_location_from_history(self, chat_history: List[Dict]) -> str:
"""Extract location information from chat history."""
for message in reversed(chat_history[-5:]): # Check last 5 messages
user_msg = message.get('user', '')
coords = self._extract_coordinates(user_msg)
if coords:
return f"{coords[0]}, {coords[1]}"
return "not specified"
async def _get_coordinates_from_context(self, message: str, chat_history: List[Dict]) -> Optional[List[float]]:
    """Resolve coordinates for a request from the best available source.

    Sources are consulted in this order:
      1. explicit coordinates in ``message``;
      2. the geocoding service, when the message looks like a location query;
      3. explicit coordinates in the last three history entries, newest first.

    Args:
        message: Current user message.
        chat_history: History entries; each is read via ``.get('user', '')``.

    Returns:
        The coordinates found, or ``None`` when every source comes up empty.
        A geocoding failure is printed and treated as "no result".
    """
    # 1. Coordinates typed directly into the current message.
    direct_hit = self._extract_coordinates(message)
    if direct_hit:
        return direct_hit

    # 2. Named places are geocoded before falling back to older messages.
    wants_geocoding = self._is_location_based_query(message)
    if wants_geocoding:
        try:
            resolved = await self.geocoding.geocode_location(message)
            if resolved:
                return list(resolved)
        except Exception as e:
            print(f"Geocoding failed: {e}")

    # 3. Fallback: the most recent coordinates from the conversation.
    for entry in reversed(chat_history[-3:]):
        from_history = self._extract_coordinates(entry.get('user', ''))
        if from_history:
            return from_history
    return None
def _is_location_based_query(self, message: str) -> bool:
    """Decide whether a message names a place that should be geocoded.

    A message qualifies when it carries mission intent AND at least one
    location signal (a location word, a known Vietnamese place name, or a
    "<N>m around/from" distance phrase). Messages that already contain
    explicit coordinates never qualify.

    Args:
        message: Raw user message.

    Returns:
        True when the message should be sent to the geocoding service.
    """
    lowered = message.lower()

    # Explicit coordinates make geocoding unnecessary.
    if self._extract_coordinates(message):
        return False

    # Padding both sides with spaces turns the substring test into a
    # space-delimited whole-word test.
    padded = f' {lowered} '

    def contains_word(vocabulary) -> bool:
        return any(f' {term} ' in padded for term in vocabulary)

    # Mission intent is mandatory, so check it first.
    mission_terms = ['scan', 'survey', 'fly', 'mission', 'help me', 'go', 'straight']
    if not contains_word(mission_terms):
        return False

    location_terms = [
        'around', 'near', 'in', 'district', 'province', 'city', 'town',
        'street', 'road', 'avenue', 'park', 'university', 'school',
        'hospital', 'airport', 'station', 'mall', 'market', 'center', 'from'
    ]
    if contains_word(location_terms):
        return True

    # Known Vietnamese place names are matched as plain substrings.
    known_places = ['đà nẵng', 'hải châu', 'sơn trà', 'ngũ hành sơn', 'liên chiểu', 'hội an', 'huế', 'quảng nam', 'doi cung', 'đội cung', 'hai chau']
    if any(place in lowered for place in known_places):
        return True

    # Distance phrases such as "100m around".
    return bool(re.search(r'\d+\s*m\s+(around|from)', lowered))
def _extract_mission_type_from_history(self, chat_history: List[Dict]) -> str:
"""Extract mission type from chat history."""
for message in reversed(chat_history[-5:]):
user_msg = message.get('user', '')
mission_type = self._extract_mission_type(user_msg)
if mission_type != 'simple_flight':
return mission_type
return "not specified"
async def _get_flight_context(self) -> str:
    """Summarise the currently active flights as prompt context.

    Reads ``active_missions`` and ``drone_telemetry`` from the shared
    ``flight_manager`` and renders one text section per mission (state,
    drone, waypoint progress, plus battery/position/altitude when
    telemetry is available for the drone).

    Returns:
        A status string. When there are no active missions, or when
        anything raises while collecting the data, a short
        "FLIGHT STATUS: ..." message is returned instead of raising.
    """
    try:
        # Imported here rather than at module level, as in the original code.
        from ..core.flight_manager import flight_manager

        missions = flight_manager.active_missions
        if not missions:
            return "FLIGHT STATUS: No active flights currently."

        sections = []
        for mission_id, mission in missions.items():
            # Telemetry is looked up per drone and may be missing.
            telemetry = flight_manager.drone_telemetry.get(mission.drone_id)

            completed_waypoints = len([wp for wp in mission.waypoints if wp.completed])
            total_waypoints = len(mission.waypoints)
            if total_waypoints > 0:
                progress = (completed_waypoints / total_waypoints) * 100
            else:
                progress = 0

            section = f"""
ACTIVE FLIGHT:
- Mission: {mission_id} ({mission.state.value})
- Drone: {mission.drone_id}
- Progress: {completed_waypoints}/{total_waypoints} waypoints ({progress:.1f}%)"""
            if telemetry:
                section += f"""
- Battery: {telemetry.battery}%
- Location: {telemetry.lat:.4f}, {telemetry.lon:.4f}
- Altitude: {telemetry.alt:.1f}m"""
            sections.append(section)

        context = "\n".join(sections)
        return f"CURRENT FLIGHT STATUS:\n{context}\n\nIMPORTANT: User may want to check status, pause, update, or cancel the active flight(s)."
    except Exception as e:
        return f"FLIGHT STATUS: Error retrieving flight info ({str(e)})"
class ChatHistory:
    """Rolling window of user/AI message pairs plus lightweight user context.

    ``messages`` holds at most ``max_messages`` entries (oldest dropped
    first). ``user_context`` tracks facts inferred from the user's text:
    experience level and the last coordinates they mentioned.
    """

    def __init__(self, max_messages: int = 10):
        """Create an empty history keeping at most ``max_messages`` pairs."""
        self.messages = []
        self.max_messages = max_messages
        self.user_context = {
            'experience_level': None,
            'current_mission': None,
            'preferences': {},
            'last_coordinates': None
        }

    def add_message(self, user_message: str, ai_response: str, metadata: Optional[Dict] = None):
        """Append a message pair, refresh user context and trim old entries.

        Args:
            user_message: Text the user sent.
            ai_response: Text the AI replied with.
            metadata: Optional extra data stored with the entry; a fresh
                empty dict is used when omitted.
        """
        from datetime import datetime
        if metadata is None:
            metadata = {}
        self.messages.append({
            'user': user_message,
            'ai': ai_response,
            'timestamp': datetime.now().isoformat(),
            'metadata': metadata
        })
        # Update user context
        self._update_user_context(user_message, metadata)
        # Trim by overflow count instead of ``[-max_messages:]``: with
        # max_messages == 0 the latter is ``[-0:]``, i.e. the whole list,
        # so nothing would ever be dropped.
        overflow = len(self.messages) - max(self.max_messages, 0)
        if overflow > 0:
            self.messages = self.messages[overflow:]

    def _update_user_context(self, user_message: str, metadata: Dict):
        """Update inferred user context from one user message.

        Sets the experience level from keyword hints and records the first
        numeric pair in the message that is a legal latitude/longitude.
        ``metadata`` is accepted for interface compatibility and not read.
        """
        message_lower = user_message.lower()
        # Update experience level
        if any(term in message_lower for term in ['new to drones', 'beginner', 'first time']):
            self.user_context['experience_level'] = 'beginner'
        elif any(term in message_lower for term in ['mavlink', 'qgc', 'frame', 'coordinate']):
            self.user_context['experience_level'] = 'expert'
        # Extract and store coordinates. Every numeric pair is checked, not
        # just the first, so an out-of-range pair earlier in the message
        # (e.g. "100 200") does not hide a valid pair that follows it.
        coord_pattern = r'([-+]?\d+\.?\d*)[,\s]+([-+]?\d+\.?\d*)'
        for lat_text, lon_text in re.findall(coord_pattern, user_message):
            try:
                lat, lon = float(lat_text), float(lon_text)
            except ValueError:
                continue
            if -90 <= lat <= 90 and -180 <= lon <= 180:
                self.user_context['last_coordinates'] = (lat, lon)
                break

    def get_context(self) -> str:
        """Render the last three exchanges and known user context as text.

        Returns:
            "New conversation" when the history is empty; otherwise a
            newline-joined summary. AI replies are cut to 100 characters
            and always suffixed with "...".
        """
        if not self.messages:
            return "New conversation"
        context_parts = ["Recent conversation:"]
        for msg in self.messages[-3:]:
            context_parts.append(f"User: {msg['user']}")
            context_parts.append(f"AI: {msg['ai'][:100]}...")
        # Add user context
        if self.user_context['experience_level']:
            context_parts.append(f"User experience: {self.user_context['experience_level']}")
        if self.user_context['last_coordinates']:
            lat, lon = self.user_context['last_coordinates']
            context_parts.append(f"Last known coordinates: {lat:.6f}, {lon:.6f}")
        return "\n".join(context_parts)

    def get_user_experience(self) -> str:
        """Return the inferred experience level, or 'unknown' if not yet set.

        The key always exists in ``user_context`` (initialised to ``None``),
        so a ``dict.get`` default alone would never apply; ``or`` supplies
        the fallback.
        """
        return self.user_context.get('experience_level') or 'unknown'

    def has_coordinates(self) -> bool:
        """Return True when the user has provided coordinates."""
        return self.user_context['last_coordinates'] is not None

    def get_last_coordinates(self) -> Optional[tuple]:
        """Return the last known ``(lat, lon)`` tuple, or ``None``."""
        return self.user_context['last_coordinates']