Spaces:
Sleeping
Sleeping
| """Main DroneAgent service.""" | |
| import os | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| from .mission_planner import MissionPlanner | |
class DroneAgent:
    """Conversational entry point that turns user messages into responses.

    Each incoming message is first classified by the mission planner's
    conversation analyzer. Depending on that classification the agent
    answers with a drone-status report, a guidance reply, or generates a
    mission file and describes it. All AI and mission work is delegated to
    ``self.mission_planner``; this class only routes and formats.
    """

    # Example prompts shown to the user whenever coordinates are missing.
    _EXAMPLE_PROMPTS = (
        '• "Go straight 100m from 16.047, 108.206"',
        '• "Survey mission at 16.047, 108.206"',
        '• "Patrol route at 16.047, 108.206 at 80m altitude"',
    )

    def __init__(self):
        """Create the agent together with its own ``MissionPlanner``."""
        self.mission_planner = MissionPlanner()

    async def process_message(self, message: str) -> Dict[str, Any]:
        """Route one user message and return the response payload.

        Args:
            message: Raw text typed by the user.

        Returns:
            A response dictionary. Its exact keys depend on the branch taken
            (drone status, guidance, generated mission, or error), but it
            always contains ``user_message`` and ``ai_response``.

        This method does not raise for failures inside the ``try`` body:
        ``ValueError`` is reported as a "coordinates needed" reply and any
        other ``Exception`` as a generic error reply.
        """
        planner = self.mission_planner
        try:
            # Classify the message before deciding whether a mission is needed.
            conversation_analysis = await planner.gemini_ai.analyze_conversation(
                message, planner.chat_history.messages
            )

            if conversation_analysis.get('is_drone_status', False):
                response = self._create_drone_status_response(message, conversation_analysis)
                self._record_exchange(
                    message,
                    response,
                    {'type': 'drone_status', 'state': conversation_analysis.get('conversation_state')},
                )
                return response

            if conversation_analysis.get('is_guidance', False):
                response = self._create_guidance_response(message, conversation_analysis)
                self._record_exchange(
                    message,
                    response,
                    {'type': 'guidance', 'state': conversation_analysis.get('conversation_state')},
                )
                return response

            # Neither status nor guidance: treat it as a mission request.
            analysis = await planner.analyze_user_request(message)
            filename = await planner.create_mission(analysis)
            user_experience = planner.chat_history.get_user_experience()
            response = self._create_response(message, analysis, filename, user_experience)
            self._record_exchange(
                message,
                response,
                {'type': 'mission_generated', 'filename': filename},
            )
            # QGC auto-import functionality removed as requested
            return response
        except ValueError as exc:
            return self._handle_coordinate_error(message, str(exc))
        except Exception as exc:
            # Keep the traceback for operators; the user only sees str(exc).
            # Imported here so the class does not rely on a module-level import.
            import logging

            logging.getLogger(__name__).exception("Unhandled error while processing message")
            return self._handle_error(message, str(exc))

    def _record_exchange(self, message: str, response: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        """Append the user message and the AI reply text to the chat history."""
        self.mission_planner.chat_history.add_message(message, response['ai_response'], metadata)

    @staticmethod
    def _now_iso() -> str:
        """Return the current local time as an ISO-8601 string."""
        return datetime.now().isoformat()

    def _create_guidance_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]:
        """Build the reply for a message classified as needing guidance.

        Falls back to ``'How can I help you?'`` when the analysis carries no
        ``guidance_response``.
        """
        return {
            "user_message": message,
            "ai_response": conversation_analysis.get('guidance_response', 'How can I help you?'),
            "is_guidance": True,
            "conversation_state": conversation_analysis.get('conversation_state'),
            "user_experience": conversation_analysis.get('user_experience'),
            "needs_mission_generation": False,
            "timestamp": self._now_iso(),
        }

    def _create_drone_status_response(self, message: str, conversation_analysis: Dict) -> Dict[str, Any]:
        """Build the reply for a message classified as a drone status request.

        Missing ``drone_status`` / ``flight_constraints`` entries default to
        empty dictionaries; a missing status text defaults to
        ``'Drone status not available'``.
        """
        return {
            "user_message": message,
            "ai_response": conversation_analysis.get('drone_status_response', 'Drone status not available'),
            "is_drone_status": True,
            "conversation_state": conversation_analysis.get('conversation_state'),
            "drone_status": conversation_analysis.get('drone_status', {}),
            "flight_constraints": conversation_analysis.get('flight_constraints', {}),
            "needs_mission_generation": False,
            "timestamp": self._now_iso(),
        }

    def _create_response(self, message: str, analysis, filename: str, user_experience: str = "unknown") -> Dict[str, Any]:
        """Build the detailed reply describing a freshly generated mission.

        Args:
            message: The user's original text.
            analysis: Mission analysis object; this method reads its
                ``mission_type.value``, ``coordinates``, ``altitude``,
                ``waypoint_count``, ``reasoning``, ``gemini_enhanced``,
                ``model_used``, ``confidence`` and, if present,
                ``personality_response`` attributes.
            filename: Name of the generated mission file.
            user_experience: Accepted for interface compatibility; it is not
                currently used when formatting the reply.

        Returns:
            The mission response dictionary.
        """
        mission_kind = analysis.mission_type.value
        mission_title = mission_kind.title()
        engine_label = 'Gemini 2.0 Flash' if analysis.gemini_enhanced else 'Local AI'

        ai_response = "\n".join([
            "✅ **Mission Ready!**",
            f"🎯 **{mission_title} Mission Created**",
            f"📍 Location: {analysis.coordinates[0]:.4f}, {analysis.coordinates[1]:.4f}",
            f"🛩️ Altitude: {analysis.altitude}m | Waypoints: {analysis.waypoint_count}",
            f"📁 File: `{filename}`",
            f"🧠 **Why this works:** {analysis.reasoning}",
            "✅ Ready to fly safely!",
        ])

        # The personality text is optional and may be absent from the analysis.
        personality = getattr(analysis, 'personality_response', None)
        if personality:
            ai_response += f"\n\n💬 **DroneBot says:** {personality}"

        # A single reasoning string is wrapped so the chain is always list-like.
        if isinstance(analysis.reasoning, str):
            reasoning_chain = [analysis.reasoning]
        else:
            reasoning_chain = analysis.reasoning

        return {
            "user_message": message,
            "ai_response": ai_response,
            "operator_mindset": f"{engine_label} - {mission_title} Specialist",
            "confidence_level": analysis.confidence,
            "mission_analysis": {
                "mission_type": mission_kind,
                "coordinates": list(analysis.coordinates),
                "altitude": analysis.altitude,
                "waypoint_count": analysis.waypoint_count,
                "ai_reasoning": analysis.reasoning,
                "mavlink_commands": ["TAKEOFF(22)", "WAYPOINT(16)", "DO_LAND_START(203)", "RTL(20)"],
                "coordinate_frames": "GLOBAL_RELATIVE_ALT(3) for navigation, MISSION(2) for landing",
                "gemini_enhanced": analysis.gemini_enhanced,
                "model_used": analysis.model_used,
            },
            "safety_briefing": [
                "Mission uses perfect coordinate frames (eliminates takeoff errors)",
                f"Altitude optimized for {mission_kind} operations",
                "Complete landing sequence with DO_LAND_START included",
                "Mission file ready for manual import into QGroundControl",
                f"Generated by {analysis.model_used} AI model",
            ],
            "technical_notes": [
                f"{engine_label} reasoning with complete knowledge base",
                "Perfect MAVLink command sequences",
                "Coordinate frame consistency maintained throughout",
                "Mission-specific pattern optimization applied",
            ],
            "reasoning_chain": reasoning_chain,
            "downloadable_files": [f"downloads/{filename}"],
            "professional_grade": True,
            "ai_intelligence": True,
            "gemini_enhanced": analysis.gemini_enhanced,
            "model_used": analysis.model_used,
            "timestamp": self._now_iso(),
        }

    def _handle_coordinate_error(self, message: str, error: str) -> Dict[str, Any]:
        """Turn a ``ValueError`` from processing into a "where to fly?" reply.

        Users whose chat history reports the ``"beginner"`` experience level
        get a step-by-step explanation; everyone else gets a short format
        reminder that includes the error text.

        Args:
            message: The user's original text.
            error: String form of the exception that was caught.

        Returns:
            A guidance-style response with ``status`` set to
            ``"need_coordinates"``.
        """
        user_experience = self.mission_planner.chat_history.get_user_experience()
        if user_experience == "beginner":
            ai_response = "\n".join([
                "😊 **I'd love to help you create a drone mission!**",
                "🗺️ **I need to know where you want to fly.** Here's how to tell me:",
                "**Option 1 - Get coordinates from Google Maps:**",
                "1. Go to Google Maps",
                "2. Right-click on your location",
                '3. Copy the numbers (like "16.047, 108.206")',
                '4. Tell me: "Create a survey mission at 16.047, 108.206"',
                "**Option 2 - Describe the location:**",
                '• "Survey my farm in [City Name]"',
                '• "Patrol around my house"',
                '• "Photography flight at the park"',
                "**Examples that work perfectly:**",
                *self._EXAMPLE_PROMPTS,
                "Just tell me where you want to fly! 🚁",
            ])
        else:
            ai_response = "\n".join([
                "📍 **Location Required**",
                "Please provide coordinates in lat,lon format:",
                *self._EXAMPLE_PROMPTS,
                f"Error: {error}",
            ])
        return {
            "user_message": message,
            "ai_response": ai_response,
            "status": "need_coordinates",
            "error": error,
            "is_guidance": True,
            # Added so error replies carry a timestamp like every other reply.
            "timestamp": self._now_iso(),
        }

    def _handle_error(self, message: str, error: str) -> Dict[str, Any]:
        """Build the generic error reply for any non-``ValueError`` failure.

        Args:
            message: The user's original text.
            error: String form of the exception that was caught.
        """
        return {
            "user_message": message,
            "ai_response": f"System error occurred: {error}\nPlease try again with proper coordinates format.",
            "error": True,
            "error_details": error,
            # Added so error replies carry a timestamp like every other reply.
            "timestamp": self._now_iso(),
        }