Spaces:
Sleeping
Sleeping
| """API routes.""" | |
| from datetime import datetime | |
| from pathlib import Path | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import FileResponse | |
| from .models import ChatMessage, DroneTelemetry | |
| from ..services.drone_agent import DroneAgent | |
| from ..core.coordinates import DroneKnowledgeBase | |
| from ..core.flight_manager import flight_manager | |
| router = APIRouter() | |
| drone_agent = DroneAgent() | |
async def chat_endpoint(message: ChatMessage):
    """Answer a chat message, short-circuiting bare greetings.

    A message that is exactly one of "hello", "hi", "hey" or "start"
    (case and surrounding whitespace ignored) gets the canned greeting from
    ``AIPersonality``. Every other message is forwarded to
    ``drone_agent.process_message`` and its result is returned unchanged.

    Any exception raised along the way is reported in-band as
    ``{"ai_response": "System error: ...", "error": True}`` rather than
    propagated.
    """
    # Every greeting word is at most five characters long, so a match on the
    # normalized text already implies the stripped message is that short.
    greeting_words = ("hello", "hi", "hey", "start")
    try:
        if message.message.lower().strip() not in greeting_words:
            return await drone_agent.process_message(message.message)

        # Imported on demand, inside the try, so that an import failure is
        # also reported through the in-band error payload.
        from ..core.ai_personality import AIPersonality

        greeter = AIPersonality()
        return {
            "user_message": message.message,
            "ai_response": greeter.get_greeting_message(),
            "is_greeting": True,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as exc:
        return {"ai_response": f"System error: {str(exc)}", "error": True}
async def download_mission(filename: str):
    """Send a generated ``.plan`` mission file from the ``downloads`` folder.

    Args:
        filename: Name of the mission file, relative to ``downloads``.

    Returns:
        A ``FileResponse`` streaming the requested file.

    Raises:
        HTTPException: 404 when the name does not resolve to an existing
            regular ``.plan`` file located inside ``downloads``.
    """
    downloads_dir = Path("downloads").resolve()
    file_path = (downloads_dir / filename).resolve()

    # ``filename`` is caller-controlled: after resolving ".." segments,
    # absolute paths and symlinks, the target must still live under the
    # downloads directory, otherwise any readable *.plan file on disk could
    # be fetched.
    inside_downloads = downloads_dir in file_path.parents

    if inside_downloads and file_path.suffix == '.plan' and file_path.is_file():
        return FileResponse(file_path, filename=file_path.name)
    raise HTTPException(status_code=404, detail="Mission file not found")
async def get_latest_mission():
    """Return the content of the most recently modified ``.plan`` file.

    The ``downloads`` directory (relative to the working directory) is
    scanned for ``*.plan`` files and the one with the newest modification
    time is read as UTF-8 text.

    Returns:
        A dict whose ``status`` is one of:

        * ``"no_missions"`` - the directory is missing or holds no plan file;
        * ``"success"`` - includes ``filename``, ``mission_plan`` (raw file
          text), ``file_size`` and ``created_at``;
        * ``"error"`` - any exception, with its text in ``message``.
    """
    try:
        downloads_dir = Path("downloads")
        if not downloads_dir.exists():
            return {
                "status": "no_missions",
                "message": "No missions generated yet. Create a mission first!",
                "timestamp": datetime.now().isoformat()
            }

        plan_files = list(downloads_dir.glob("*.plan"))
        if not plan_files:
            return {
                "status": "no_missions",
                "message": "No mission files found. Create a mission first!",
                "timestamp": datetime.now().isoformat()
            }

        # Rank by modification time: st_ctime is the metadata-change time on
        # Unix (a chmod or rename bumps it), and get_mission_waypoints also
        # ranks by st_mtime, so both endpoints now agree on "latest".
        latest_file = max(plan_files, key=lambda plan: plan.stat().st_mtime)

        mission_content = latest_file.read_text(encoding='utf-8')
        file_stat = latest_file.stat()  # stat once; reused for size and time

        return {
            "status": "success",
            "message": "Latest mission retrieved successfully",
            "filename": latest_file.name,
            "mission_plan": mission_content,
            "file_size": file_stat.st_size,
            # st_ctime is platform-dependent: creation time on Windows,
            # last metadata change on Unix.
            "created_at": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Error retrieving latest mission: {str(e)}",
            "timestamp": datetime.now().isoformat()
        }
async def receive_drone_telemetry(telemetry: DroneTelemetry):
    """Store a telemetry sample and report the resulting flight constraints.

    The model is converted to a plain dict and handed to
    ``drone_status.update_telemetry``. When that call reports failure an
    error payload is returned; otherwise the payload summarizes the values
    from ``drone_status.get_flight_constraints()``.
    """
    from ..core.drone_status import drone_status

    # The status tracker takes a plain dict, not the Pydantic model.
    accepted = drone_status.update_telemetry(telemetry.dict())

    if not accepted:
        return {
            "status": "error",
            "message": "Failed to process telemetry data",
            "telemetry_accepted": False,
            "timestamp": datetime.now().isoformat()
        }

    # Constraints are read only after the new sample has been stored.
    limits = drone_status.get_flight_constraints()
    safety_summary = {
        "ready_to_fly": limits["ready_to_fly"],
        "battery_status": f"{limits['battery_remaining']}% ({limits['voltage_status']})",
        "gps_quality": limits["gps_quality"],
        "safe_flight_time": f"{limits['safe_flight_time_minutes']} minutes"
    }
    return {
        "status": "success",
        "message": "Telemetry data received and processed",
        "telemetry_accepted": True,
        "flight_safety": safety_summary,
        "timestamp": datetime.now().isoformat()
    }
async def get_current_drone_status():
    """Report what ``drone_status`` currently holds (monitoring/debugging).

    The payload combines ``get_current_status()``,
    ``get_flight_constraints()``, ``is_telemetry_fresh()`` and the
    ``is_connected`` attribute, plus a response timestamp.
    """
    from ..core.drone_status import drone_status as tracker

    # Dict values are evaluated top to bottom, so the tracker is queried in
    # the order the keys appear.
    return {
        "status": "success",
        "drone_status": tracker.get_current_status(),
        "flight_constraints": tracker.get_flight_constraints(),
        "telemetry_fresh": tracker.is_telemetry_fresh(),
        "connected": tracker.is_connected,
        "timestamp": datetime.now().isoformat()
    }
async def serve_chatbot():
    """Return the chatbot page, ``chatbot.html``, as a file response."""
    # FileResponse is already imported at module level; the path is relative
    # to the process working directory.
    page = "chatbot.html"
    return FileResponse(page)
async def get_mission_waypoints(filename: str = None, latest: bool = True):
    """Get mission waypoints for map editing.

    Returns waypoints in a format suitable for map editing:
    [
        { lat: 16.010361, lng: 108.258056 },
        { lat: 16.012904326313578, lng: 108.25862196878661 },
        ...
    ]

    Args:
        filename: Specific mission file name inside ``downloads`` (optional).
        latest: Accepted for compatibility but not consulted: whenever
            ``filename`` is omitted, the most recently modified ``.plan``
            file is used.

    Returns:
        A dict with ``status`` ("success" or "error"), ``message`` and
        ``waypoints``; on success also ``filename`` and ``mission_info``.
        Failures are reported in-band with an empty ``waypoints`` list.
    """
    # Imported before the try so the ``except json.JSONDecodeError`` clause
    # can always resolve the name.
    import json

    waypoint_command = 16  # MAV_CMD_NAV_WAYPOINT

    try:
        downloads_dir = Path("downloads")
        if not downloads_dir.exists():
            return {
                "status": "error",
                "message": "No missions found. Generate a mission first!",
                "waypoints": []
            }

        # Find the mission file
        if filename:
            base_dir = downloads_dir.resolve()
            mission_file = (base_dir / filename).resolve()
            # ``filename`` is caller-controlled: refuse anything that, once
            # ".." segments and symlinks are resolved, escapes the downloads
            # directory. It is reported exactly like a missing file.
            if base_dir not in mission_file.parents or not mission_file.is_file():
                return {
                    "status": "error",
                    "message": f"Mission file {filename} not found",
                    "waypoints": []
                }
        else:
            # Get latest .plan file
            plan_files = list(downloads_dir.glob("*.plan"))
            if not plan_files:
                return {
                    "status": "error",
                    "message": "No mission files found. Generate a mission first!",
                    "waypoints": []
                }
            mission_file = max(plan_files, key=lambda f: f.stat().st_mtime)

        # Read and parse the mission file
        with open(mission_file, 'r', encoding='utf-8') as f:
            mission_data = json.load(f)

        mission_section = mission_data.get("mission", {})
        mission_items = mission_section.get("items", [])

        # Keep only plain waypoint items; other commands (takeoff, landing,
        # ...) are skipped.
        waypoints = []
        for item in mission_items:
            if item.get("command") != waypoint_command:
                continue
            params = item.get("params", [])
            if len(params) >= 7:  # lat, lon are params[4] and params[5]
                # Coordinates are passed through without rounding.
                waypoints.append({"lat": params[4], "lng": params[5]})

        return {
            "status": "success",
            "message": f"Retrieved {len(waypoints)} waypoints from {mission_file.name}",
            "filename": mission_file.name,
            "waypoints": waypoints,
            "mission_info": {
                "cruise_speed": mission_section.get("cruiseSpeed", 0),
                "firmware_type": mission_section.get("firmwareType", 0),
                "total_items": len(mission_items)
            }
        }
    except json.JSONDecodeError as e:
        return {
            "status": "error",
            "message": f"Invalid mission file format: {str(e)}",
            "waypoints": []
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to retrieve waypoints: {str(e)}",
            "waypoints": []
        }
def _normalize_waypoint(wp, default_altitude):
    """Convert one client waypoint to the internal ``{lat, lon, alt}`` shape.

    Raises:
        ValueError: if the waypoint is not a dict or lacks a latitude or a
            longitude (``lng`` or ``lon``).
    """
    if not isinstance(wp, dict):
        raise ValueError("each waypoint must be an object")
    if 'lat' not in wp:
        raise ValueError("waypoint is missing 'lat'")

    if 'lng' in wp:
        # User-edited format coming from the map
        longitude = wp['lng']
    elif 'lon' in wp:
        # Original format
        longitude = wp['lon']
    else:
        # Previously this case silently fell back to longitude 0; a waypoint
        # without a longitude is rejected instead of being invented.
        raise ValueError("waypoint is missing 'lng'/'lon'")

    return {
        'lat': wp['lat'],  # Keep exact precision
        'lon': longitude,  # Keep exact precision
        'alt': wp.get('alt', default_altitude)
    }


async def confirm_flight(mission_data: dict):
    """Confirm and start tracking a flight mission.

    Expects JSON with:
    - mission_id: str
    - drone_id: str
    - waypoints: list of {"lat": float, "lng": float} (user-edited coordinates)
      OR {"lat": float, "lon": float, "alt": float} (original format)
    - altitude: float (optional, defaults to 70m)

    The waypoints are normalized to ``{lat, lon, alt}`` and written back into
    ``mission_data`` before it is handed to ``flight_manager.confirm_flight``.

    Returns:
        Whatever ``flight_manager.confirm_flight`` returns.

    Raises:
        HTTPException: 400 when a waypoint lacks a latitude or longitude;
            500 for any other failure. An ``HTTPException`` raised further
            down is propagated unchanged.
    """
    try:
        default_altitude = mission_data.get('altitude', 70.0)
        try:
            normalized_waypoints = [
                _normalize_waypoint(wp, default_altitude)
                for wp in mission_data.get('waypoints', [])
            ]
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=f"Invalid waypoint: {exc}") from exc

        # Update mission data with normalized waypoints
        mission_data['waypoints'] = normalized_waypoints
        return await flight_manager.confirm_flight(mission_data)
    except HTTPException:
        # Keep the intended status code instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to confirm flight: {str(e)}") from e
async def pause_flight(request_data: dict):
    """Pause an active flight mission.

    Expects JSON with:
    - mission_id: str
    - drone_id: str

    Returns:
        Whatever ``flight_manager.pause_flight`` returns.

    Raises:
        HTTPException: 400 when ``mission_id`` or ``drone_id`` is missing or
            empty; 500 when the flight manager call fails.
    """
    mission_id = request_data.get('mission_id')
    drone_id = request_data.get('drone_id')

    # Validated outside the try block: the 400 used to be raised inside it,
    # where ``except Exception`` caught it and re-raised it as a 500.
    if not mission_id or not drone_id:
        raise HTTPException(status_code=400, detail="mission_id and drone_id are required")

    try:
        return await flight_manager.pause_flight(mission_id, drone_id)
    except HTTPException:
        # Preserve a status code chosen further down the stack.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to pause flight: {str(e)}") from e
async def cancel_flight(request_data: dict):
    """Cancel an active flight mission.

    Expects JSON with:
    - mission_id: str
    - drone_id: str

    Returns:
        Whatever ``flight_manager.cancel_flight`` returns.

    Raises:
        HTTPException: 400 when ``mission_id`` or ``drone_id`` is missing or
            empty; 500 when the flight manager call fails.
    """
    mission_id = request_data.get('mission_id')
    drone_id = request_data.get('drone_id')

    # Validated outside the try block: the 400 used to be raised inside it,
    # where ``except Exception`` caught it and re-raised it as a 500.
    if not mission_id or not drone_id:
        raise HTTPException(status_code=400, detail="mission_id and drone_id are required")

    try:
        return await flight_manager.cancel_flight(mission_id, drone_id)
    except HTTPException:
        # Preserve a status code chosen further down the stack.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to cancel flight: {str(e)}") from e
async def update_mission(request_data: dict):
    """Update mission waypoints during flight.

    Expects JSON with:
    - mission_id: str
    - waypoints: list of {"lat": float, "lon": float, "alt": float}
    - reason: str (optional, defaults to "User request")

    Returns:
        Whatever ``flight_manager.update_mission`` returns.

    Raises:
        HTTPException: 400 when ``mission_id`` is missing or empty; 500 when
            the flight manager call fails.
    """
    mission_id = request_data.get('mission_id')
    waypoints = request_data.get('waypoints', [])
    reason = request_data.get('reason', 'User request')

    # Validated outside the try block: the 400 used to be raised inside it,
    # where ``except Exception`` caught it and re-raised it as a 500.
    if not mission_id:
        raise HTTPException(status_code=400, detail="mission_id is required")

    try:
        return await flight_manager.update_mission(mission_id, waypoints, reason)
    except HTTPException:
        # Preserve a status code chosen further down the stack.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update mission: {str(e)}") from e
async def get_flight_status(drone_id: str, context: str = ""):
    """Fetch the flight manager's status report for one drone.

    Args:
        drone_id: ID of the drone.
        context: Optional user context, forwarded unchanged to
            ``flight_manager.get_intelligent_status_report``.

    Raises:
        HTTPException: 500 wrapping any exception raised while building the
            response.
    """
    try:
        report = await flight_manager.get_intelligent_status_report(drone_id, context)
        payload = {
            "status": "success",
            "drone_id": drone_id,
            "status_report": report,
            "timestamp": datetime.now().isoformat()
        }
        return payload
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get flight status: {str(exc)}")
async def get_active_missions():
    """Summarize every mission in ``flight_manager.active_missions``.

    Each entry carries the mission's ids, state value, waypoint totals, a
    completion percentage and its three lifecycle timestamps.

    Raises:
        HTTPException: 500 wrapping any exception raised while building the
            summary.
    """
    try:
        summaries = {}
        for mission_key, mission in flight_manager.active_missions.items():
            # Count finished waypoints once and reuse it for both the raw
            # count and the percentage.
            done_count = sum(1 for wp in mission.waypoints if wp.completed)
            if mission.waypoints:
                percent_done = (done_count / len(mission.waypoints)) * 100
            else:
                percent_done = 0  # no waypoints: avoid dividing by zero

            summaries[mission_key] = {
                "mission_id": mission.mission_id,
                "drone_id": mission.drone_id,
                "state": mission.state.value,
                "waypoints_total": len(mission.waypoints),
                "waypoints_completed": done_count,
                "progress_percent": percent_done,
                "created_at": mission.created_at,
                "confirmed_at": mission.confirmed_at,
                "started_at": mission.started_at
            }

        return {
            "status": "success",
            "active_missions": summaries,
            "total_missions": len(summaries),
            "timestamp": datetime.now().isoformat()
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get missions: {str(exc)}")
async def update_flight_telemetry(telemetry_data: dict):
    """Push a telemetry dict to the flight manager and to ``drone_status``.

    The flight manager is updated first; the same dict is then passed to
    ``drone_status.update_telemetry`` (kept for compatibility), whose return
    value is not inspected.

    Raises:
        HTTPException: 500 wrapping any exception raised by either update.
    """
    try:
        flight_manager.update_telemetry(telemetry_data)

        # Imported only after the flight manager update has run.
        from ..core.drone_status import drone_status as legacy_status
        legacy_status.update_telemetry(telemetry_data)

        acknowledgement = {
            "status": "success",
            "message": "Flight telemetry updated successfully",
            "timestamp": datetime.now().isoformat()
        }
        return acknowledgement
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to update flight telemetry: {str(exc)}")