DroneAgent / src /services /drone_agent.py
zok213
Here's a summary of the changes:
6740714
"""Main DroneAgent service."""
import os
import asyncio
from datetime import datetime
from typing import Dict, Any
from .mission_planner import MissionPlanner
class DroneAgent:
"""Complete AI system with all features integrated."""
def __init__(self):
self.mission_planner = MissionPlanner()
async def process_message(self, message: str) -> Dict[str, Any]:
"""Process user message with conversational AI system."""
try:
# First, analyze conversation for guidance or mission generation
conversation_analysis = await self.mission_planner.gemini_ai.analyze_conversation(
message, self.mission_planner.chat_history.messages
)
# Check if this is a drone status request
if conversation_analysis.get('is_drone_status', False):
response = self._create_drone_status_response(message, conversation_analysis)
self.mission_planner.chat_history.add_message(
message, response['ai_response'],
{'type': 'drone_status', 'state': conversation_analysis.get('conversation_state')}
)
return response
# Check if this is guidance or mission generation
if conversation_analysis.get('is_guidance', False):
response = self._create_guidance_response(message, conversation_analysis)
self.mission_planner.chat_history.add_message(
message, response['ai_response'],
{'type': 'guidance', 'state': conversation_analysis.get('conversation_state')}
)
return response
# This is a mission generation request
analysis = await self.mission_planner.analyze_user_request(message)
filename = await self.mission_planner.create_mission(analysis)
# Determine user experience for response formatting
user_experience = self.mission_planner.chat_history.get_user_experience()
response = self._create_response(message, analysis, filename, user_experience)
self.mission_planner.chat_history.add_message(
message, response['ai_response'],
{'type': 'mission_generated', 'filename': filename}
)
# QGC auto-import functionality removed as requested
return response
except ValueError as e:
return self._handle_coordinate_error(message, str(e))
except Exception as e:
return self._handle_error(message, str(e))
def _create_guidance_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]:
"""Create user guidance response."""
return {
"user_message": message,
"ai_response": conversation_analysis.get('guidance_response', 'How can I help you?'),
"is_guidance": True,
"conversation_state": conversation_analysis.get('conversation_state'),
"user_experience": conversation_analysis.get('user_experience'),
"needs_mission_generation": False,
"timestamp": datetime.now().isoformat()
}
def _create_drone_status_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]:
"""Create drone status response."""
return {
"user_message": message,
"ai_response": conversation_analysis.get('drone_status_response', 'Drone status not available'),
"is_drone_status": True,
"conversation_state": conversation_analysis.get('conversation_state'),
"drone_status": conversation_analysis.get('drone_status', {}),
"flight_constraints": conversation_analysis.get('flight_constraints', {}),
"needs_mission_generation": False,
"timestamp": datetime.now().isoformat()
}
def _create_response(self, message: str, analysis, filename: str, user_experience: str = "unknown") -> Dict[str, Any]:
"""Create complete AI response with detailed information."""
# Enhanced AI response with Gemini personality
ai_response = f"""✅ **Mission Ready!**
🎯 **{analysis.mission_type.value.title()} Mission Created**
📍 Location: {analysis.coordinates[0]:.4f}, {analysis.coordinates[1]:.4f}
🛩️ Altitude: {analysis.altitude}m | Waypoints: {analysis.waypoint_count}
📁 File: `{filename}`
🧠 **Why this works:** {analysis.reasoning}
✅ Ready to fly safely!"""
# Add personality response if available (from Gemini)
if hasattr(analysis, 'personality_response') and analysis.personality_response:
ai_response += f"\n\n💬 **DroneBot says:** {analysis.personality_response}"
return {
"user_message": message,
"ai_response": ai_response,
"operator_mindset": f"{'Gemini 2.0 Flash' if analysis.gemini_enhanced else 'Local AI'} - {analysis.mission_type.value.title()} Specialist",
"confidence_level": analysis.confidence,
"mission_analysis": {
"mission_type": analysis.mission_type.value,
"coordinates": list(analysis.coordinates),
"altitude": analysis.altitude,
"waypoint_count": analysis.waypoint_count,
"ai_reasoning": analysis.reasoning,
"mavlink_commands": ["TAKEOFF(22)", "WAYPOINT(16)", "DO_LAND_START(203)", "RTL(20)"],
"coordinate_frames": "GLOBAL_RELATIVE_ALT(3) for navigation, MISSION(2) for landing",
"gemini_enhanced": analysis.gemini_enhanced,
"model_used": analysis.model_used
},
"safety_briefing": [
"Mission uses perfect coordinate frames (eliminates takeoff errors)",
f"Altitude optimized for {analysis.mission_type.value} operations",
"Complete landing sequence with DO_LAND_START included",
"Mission file ready for manual import into QGroundControl",
f"Generated by {analysis.model_used} AI model"
],
"technical_notes": [
f"{'Gemini 2.0 Flash' if analysis.gemini_enhanced else 'Local AI'} reasoning with complete knowledge base",
"Perfect MAVLink command sequences",
"Coordinate frame consistency maintained throughout",
"Mission-specific pattern optimization applied"
],
"reasoning_chain": [analysis.reasoning] if isinstance(analysis.reasoning, str) else analysis.reasoning,
"downloadable_files": [f"downloads/{filename}"],
"professional_grade": True,
"ai_intelligence": True,
"gemini_enhanced": analysis.gemini_enhanced,
"model_used": analysis.model_used,
"timestamp": datetime.now().isoformat()
}
def _handle_coordinate_error(self, message: str, error: str) -> Dict[str, Any]:
"""Handle coordinate requirement errors with friendly guidance."""
user_experience = self.mission_planner.chat_history.get_user_experience()
if user_experience == "beginner":
ai_response = """😊 **I'd love to help you create a drone mission!**
🗺️ **I need to know where you want to fly.** Here's how to tell me:
**Option 1 - Get coordinates from Google Maps:**
1. Go to Google Maps
2. Right-click on your location
3. Copy the numbers (like "16.047, 108.206")
4. Tell me: "Create a survey mission at 16.047, 108.206"
**Option 2 - Describe the location:**
• "Survey my farm in [City Name]"
• "Patrol around my house"
• "Photography flight at the park"
**Examples that work perfectly:**
• "Go straight 100m from 16.047, 108.206"
• "Survey mission at 16.047, 108.206"
• "Patrol route at 16.047, 108.206 at 80m altitude"
Just tell me where you want to fly! 🚁"""
else:
ai_response = f"""📍 **Location Required**
Please provide coordinates in lat,lon format:
• "Go straight 100m from 16.047, 108.206"
• "Survey mission at 16.047, 108.206"
• "Patrol route at 16.047, 108.206 at 80m altitude"
Error: {error}"""
return {
"user_message": message,
"ai_response": ai_response,
"status": "need_coordinates",
"error": error,
"is_guidance": True
}
def _handle_error(self, message: str, error: str) -> Dict[str, Any]:
"""Handle general errors."""
return {
"user_message": message,
"ai_response": f"""System error occurred: {error}
Please try again with proper coordinates format.""",
"error": True,
"error_details": error
}