"""Main DroneAgent service.""" import os import asyncio from datetime import datetime from typing import Dict, Any from .mission_planner import MissionPlanner class DroneAgent: """Complete AI system with all features integrated.""" def __init__(self): self.mission_planner = MissionPlanner() async def process_message(self, message: str) -> Dict[str, Any]: """Process user message with conversational AI system.""" try: # First, analyze conversation for guidance or mission generation conversation_analysis = await self.mission_planner.gemini_ai.analyze_conversation( message, self.mission_planner.chat_history.messages ) # Check if this is a drone status request if conversation_analysis.get('is_drone_status', False): response = self._create_drone_status_response(message, conversation_analysis) self.mission_planner.chat_history.add_message( message, response['ai_response'], {'type': 'drone_status', 'state': conversation_analysis.get('conversation_state')} ) return response # Check if this is guidance or mission generation if conversation_analysis.get('is_guidance', False): response = self._create_guidance_response(message, conversation_analysis) self.mission_planner.chat_history.add_message( message, response['ai_response'], {'type': 'guidance', 'state': conversation_analysis.get('conversation_state')} ) return response # This is a mission generation request analysis = await self.mission_planner.analyze_user_request(message) filename = await self.mission_planner.create_mission(analysis) # Determine user experience for response formatting user_experience = self.mission_planner.chat_history.get_user_experience() response = self._create_response(message, analysis, filename, user_experience) self.mission_planner.chat_history.add_message( message, response['ai_response'], {'type': 'mission_generated', 'filename': filename} ) # QGC auto-import functionality removed as requested return response except ValueError as e: return self._handle_coordinate_error(message, str(e)) except Exception as e: return self._handle_error(message, str(e)) def _create_guidance_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]: """Create user guidance response.""" return { "user_message": message, "ai_response": conversation_analysis.get('guidance_response', 'How can I help you?'), "is_guidance": True, "conversation_state": conversation_analysis.get('conversation_state'), "user_experience": conversation_analysis.get('user_experience'), "needs_mission_generation": False, "timestamp": datetime.now().isoformat() } def _create_drone_status_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]: """Create drone status response.""" return { "user_message": message, "ai_response": conversation_analysis.get('drone_status_response', 'Drone status not available'), "is_drone_status": True, "conversation_state": conversation_analysis.get('conversation_state'), "drone_status": conversation_analysis.get('drone_status', {}), "flight_constraints": conversation_analysis.get('flight_constraints', {}), "needs_mission_generation": False, "timestamp": datetime.now().isoformat() } def _create_response(self, message: str, analysis, filename: str, user_experience: str = "unknown") -> Dict[str, Any]: """Create complete AI response with detailed information.""" # Enhanced AI response with Gemini personality ai_response = f"""βœ… **Mission Ready!** 🎯 **{analysis.mission_type.value.title()} Mission Created** πŸ“ Location: {analysis.coordinates[0]:.4f}, {analysis.coordinates[1]:.4f} πŸ›©οΈ Altitude: {analysis.altitude}m | Waypoints: {analysis.waypoint_count} πŸ“ File: `{filename}` 🧠 **Why this works:** {analysis.reasoning} βœ… Ready to fly safely!""" # Add personality response if available (from Gemini) if hasattr(analysis, 'personality_response') and analysis.personality_response: ai_response += f"\n\nπŸ’¬ **DroneBot says:** {analysis.personality_response}" return { "user_message": message, "ai_response": ai_response, "operator_mindset": f"{'Gemini 2.0 Flash' if analysis.gemini_enhanced else 'Local AI'} - {analysis.mission_type.value.title()} Specialist", "confidence_level": analysis.confidence, "mission_analysis": { "mission_type": analysis.mission_type.value, "coordinates": list(analysis.coordinates), "altitude": analysis.altitude, "waypoint_count": analysis.waypoint_count, "ai_reasoning": analysis.reasoning, "mavlink_commands": ["TAKEOFF(22)", "WAYPOINT(16)", "DO_LAND_START(203)", "RTL(20)"], "coordinate_frames": "GLOBAL_RELATIVE_ALT(3) for navigation, MISSION(2) for landing", "gemini_enhanced": analysis.gemini_enhanced, "model_used": analysis.model_used }, "safety_briefing": [ "Mission uses perfect coordinate frames (eliminates takeoff errors)", f"Altitude optimized for {analysis.mission_type.value} operations", "Complete landing sequence with DO_LAND_START included", "Mission file ready for manual import into QGroundControl", f"Generated by {analysis.model_used} AI model" ], "technical_notes": [ f"{'Gemini 2.0 Flash' if analysis.gemini_enhanced else 'Local AI'} reasoning with complete knowledge base", "Perfect MAVLink command sequences", "Coordinate frame consistency maintained throughout", "Mission-specific pattern optimization applied" ], "reasoning_chain": [analysis.reasoning] if isinstance(analysis.reasoning, str) else analysis.reasoning, "downloadable_files": [f"downloads/{filename}"], "professional_grade": True, "ai_intelligence": True, "gemini_enhanced": analysis.gemini_enhanced, "model_used": analysis.model_used, "timestamp": datetime.now().isoformat() } def _handle_coordinate_error(self, message: str, error: str) -> Dict[str, Any]: """Handle coordinate requirement errors with friendly guidance.""" user_experience = self.mission_planner.chat_history.get_user_experience() if user_experience == "beginner": ai_response = """😊 **I'd love to help you create a drone mission!** πŸ—ΊοΈ **I need to know where you want to fly.** Here's how to tell me: **Option 1 - Get coordinates from Google Maps:** 1. Go to Google Maps 2. Right-click on your location 3. Copy the numbers (like "16.047, 108.206") 4. Tell me: "Create a survey mission at 16.047, 108.206" **Option 2 - Describe the location:** β€’ "Survey my farm in [City Name]" β€’ "Patrol around my house" β€’ "Photography flight at the park" **Examples that work perfectly:** β€’ "Go straight 100m from 16.047, 108.206" β€’ "Survey mission at 16.047, 108.206" β€’ "Patrol route at 16.047, 108.206 at 80m altitude" Just tell me where you want to fly! 🚁""" else: ai_response = f"""πŸ“ **Location Required** Please provide coordinates in lat,lon format: β€’ "Go straight 100m from 16.047, 108.206" β€’ "Survey mission at 16.047, 108.206" β€’ "Patrol route at 16.047, 108.206 at 80m altitude" Error: {error}""" return { "user_message": message, "ai_response": ai_response, "status": "need_coordinates", "error": error, "is_guidance": True } def _handle_error(self, message: str, error: str) -> Dict[str, Any]: """Handle general errors.""" return { "user_message": message, "ai_response": f"""System error occurred: {error} Please try again with proper coordinates format.""", "error": True, "error_details": error }