DroneAgent / src /api /routes.py
zok213
Here's a summary of the changes:
6740714
"""API routes."""
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse
from .models import ChatMessage, DroneTelemetry
from ..services.drone_agent import DroneAgent
from ..core.coordinates import DroneKnowledgeBase
from ..core.flight_manager import flight_manager
router = APIRouter()
drone_agent = DroneAgent()
@router.post("/chat")
async def chat_endpoint(message: ChatMessage):
"""Main chat endpoint with conversational AI system."""
try:
# Only show greeting for very first interaction or explicit help requests
if message.message.lower().strip() in ['hello', 'hi', 'hey', 'start'] and len(message.message.strip()) <= 5:
from ..core.ai_personality import AIPersonality
personality = AIPersonality()
return {
"user_message": message.message,
"ai_response": personality.get_greeting_message(),
"is_greeting": True,
"timestamp": datetime.now().isoformat()
}
result = await drone_agent.process_message(message.message)
return result
except Exception as e:
return {"ai_response": f"System error: {str(e)}", "error": True}
@router.get("/download/{filename}")
async def download_mission(filename: str):
"""Download mission file."""
file_path = Path("downloads") / filename
if file_path.exists() and file_path.suffix == '.plan':
return FileResponse(file_path, filename=filename)
else:
raise HTTPException(status_code=404, detail="Mission file not found")
@router.get("/api/latest-mission")
async def get_latest_mission():
"""Get the latest generated mission file."""
try:
# Find the most recent .plan file in downloads
from pathlib import Path
import os
downloads_dir = Path("downloads")
if not downloads_dir.exists():
return {
"status": "no_missions",
"message": "No missions generated yet. Create a mission first!",
"timestamp": datetime.now().isoformat()
}
plan_files = list(downloads_dir.glob("*.plan"))
if not plan_files:
return {
"status": "no_missions",
"message": "No mission files found. Create a mission first!",
"timestamp": datetime.now().isoformat()
}
# Get the most recent file
latest_file = max(plan_files, key=os.path.getctime)
# Read and return the mission content
with open(latest_file, 'r', encoding='utf-8') as f:
mission_content = f.read()
return {
"status": "success",
"message": "Latest mission retrieved successfully",
"filename": latest_file.name,
"mission_plan": mission_content,
"file_size": latest_file.stat().st_size,
"created_at": datetime.fromtimestamp(latest_file.stat().st_ctime).isoformat(),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "error",
"message": f"Error retrieving latest mission: {str(e)}",
"timestamp": datetime.now().isoformat()
}
@router.post("/drone/telemetry")
async def receive_drone_telemetry(telemetry: DroneTelemetry):
"""Receive real-time drone telemetry data from hardware.
This endpoint receives telemetry data from the drone hardware/flight controller.
The data is used for mission planning optimization and safety constraints.
"""
from ..core.drone_status import drone_status
# Convert Pydantic model to dict
telemetry_data = telemetry.dict()
# Update the drone status with real telemetry
success = drone_status.update_telemetry(telemetry_data)
if success:
# Calculate current flight constraints based on real data
constraints = drone_status.get_flight_constraints()
return {
"status": "success",
"message": "Telemetry data received and processed",
"telemetry_accepted": True,
"flight_safety": {
"ready_to_fly": constraints["ready_to_fly"],
"battery_status": f"{constraints['battery_remaining']}% ({constraints['voltage_status']})",
"gps_quality": constraints["gps_quality"],
"safe_flight_time": f"{constraints['safe_flight_time_minutes']} minutes"
},
"timestamp": datetime.now().isoformat()
}
else:
return {
"status": "error",
"message": "Failed to process telemetry data",
"telemetry_accepted": False,
"timestamp": datetime.now().isoformat()
}
@router.get("/drone/status")
async def get_current_drone_status():
"""Get current drone status (for monitoring/debugging purposes).
Returns the last received telemetry data and current flight constraints.
"""
from ..core.drone_status import drone_status
current_status = drone_status.get_current_status()
constraints = drone_status.get_flight_constraints()
return {
"status": "success",
"drone_status": current_status,
"flight_constraints": constraints,
"telemetry_fresh": drone_status.is_telemetry_fresh(),
"connected": drone_status.is_connected,
"timestamp": datetime.now().isoformat()
}
@router.get("/chatbot.html")
async def serve_chatbot():
"""Serve the chatbot HTML interface."""
from fastapi.responses import FileResponse
return FileResponse("chatbot.html")
@router.get("/mission/waypoints")
async def get_mission_waypoints(filename: str = None, latest: bool = True):
"""Get mission waypoints for map editing.
Returns waypoints in format suitable for map editing:
[
{ lat: 16.010361, lng: 108.258056 },
{ lat: 16.012904326313578, lng: 108.25862196878661 },
...
]
Args:
filename: Specific mission file name (optional)
latest: Get latest mission if filename not provided
"""
try:
from pathlib import Path
import json
downloads_dir = Path("downloads")
if not downloads_dir.exists():
return {
"status": "error",
"message": "No missions found. Generate a mission first!",
"waypoints": []
}
# Find the mission file
if filename:
mission_file = downloads_dir / filename
if not mission_file.exists():
return {
"status": "error",
"message": f"Mission file {filename} not found",
"waypoints": []
}
else:
# Get latest .plan file
plan_files = list(downloads_dir.glob("*.plan"))
if not plan_files:
return {
"status": "error",
"message": "No mission files found. Generate a mission first!",
"waypoints": []
}
mission_file = max(plan_files, key=lambda f: f.stat().st_mtime)
# Read and parse the mission file
with open(mission_file, 'r', encoding='utf-8') as f:
mission_data = json.load(f)
# Extract waypoints from mission items (skip takeoff and landing)
waypoints = []
mission_items = mission_data.get("mission", {}).get("items", [])
for item in mission_items:
# Only include waypoint items (command 16 = MAV_CMD_NAV_WAYPOINT)
if item.get("command") == 16: # MAV_CMD_NAV_WAYPOINT
params = item.get("params", [])
if len(params) >= 7: # lat, lon are params[4] and [5]
# Return exact coordinates without rounding
waypoint = {
"lat": params[4], # Exact latitude
"lng": params[5] # Exact longitude (using lng as requested)
}
waypoints.append(waypoint)
return {
"status": "success",
"message": f"Retrieved {len(waypoints)} waypoints from {mission_file.name}",
"filename": mission_file.name,
"waypoints": waypoints,
"mission_info": {
"cruise_speed": mission_data.get("mission", {}).get("cruiseSpeed", 0),
"firmware_type": mission_data.get("mission", {}).get("firmwareType", 0),
"total_items": len(mission_items)
}
}
except json.JSONDecodeError as e:
return {
"status": "error",
"message": f"Invalid mission file format: {str(e)}",
"waypoints": []
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to retrieve waypoints: {str(e)}",
"waypoints": []
}
@router.post("/flight/confirm")
async def confirm_flight(mission_data: dict):
"""Confirm and start tracking a flight mission.
Expects JSON with:
- mission_id: str
- drone_id: str
- waypoints: list of {"lat": float, "lng": float} (user-edited coordinates)
OR {"lat": float, "lon": float, "alt": float} (original format)
- altitude: float (optional, defaults to 70m)
"""
try:
# Handle both formats: {lat, lng} and {lat, lon}
waypoints_data = mission_data.get('waypoints', [])
# Normalize waypoints to internal format
normalized_waypoints = []
for wp in waypoints_data:
# Support both {lat, lng} (user-edited) and {lat, lon} (original) formats
if 'lng' in wp:
# User-edited format from map
normalized_waypoints.append({
'lat': wp['lat'], # Keep exact precision
'lon': wp['lng'], # Note: using lng as longitude
'alt': wp.get('alt', mission_data.get('altitude', 70.0))
})
elif 'lon' in wp:
# Original format
normalized_waypoints.append({
'lat': wp['lat'], # Keep exact precision
'lon': wp['lon'], # Keep exact precision
'alt': wp.get('alt', mission_data.get('altitude', 70.0))
})
else:
# Minimal format, add default altitude
normalized_waypoints.append({
'lat': wp['lat'],
'lon': wp.get('lng', wp.get('lon', 0)),
'alt': mission_data.get('altitude', 70.0)
})
# Update mission data with normalized waypoints
mission_data['waypoints'] = normalized_waypoints
result = await flight_manager.confirm_flight(mission_data)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to confirm flight: {str(e)}")
@router.post("/flight/pause")
async def pause_flight(request_data: dict):
"""Pause an active flight mission.
Expects JSON with:
- mission_id: str
- drone_id: str
"""
try:
mission_id = request_data.get('mission_id')
drone_id = request_data.get('drone_id')
if not mission_id or not drone_id:
raise HTTPException(status_code=400, detail="mission_id and drone_id are required")
result = await flight_manager.pause_flight(mission_id, drone_id)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to pause flight: {str(e)}")
@router.post("/flight/cancel")
async def cancel_flight(request_data: dict):
"""Cancel an active flight mission.
Expects JSON with:
- mission_id: str
- drone_id: str
"""
try:
mission_id = request_data.get('mission_id')
drone_id = request_data.get('drone_id')
if not mission_id or not drone_id:
raise HTTPException(status_code=400, detail="mission_id and drone_id are required")
result = await flight_manager.cancel_flight(mission_id, drone_id)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to cancel flight: {str(e)}")
@router.post("/flight/update")
async def update_mission(request_data: dict):
"""Update mission waypoints during flight.
Expects JSON with:
- mission_id: str
- waypoints: list of {"lat": float, "lon": float, "alt": float}
- reason: str (optional)
"""
try:
mission_id = request_data.get('mission_id')
waypoints = request_data.get('waypoints', [])
reason = request_data.get('reason', 'User request')
if not mission_id:
raise HTTPException(status_code=400, detail="mission_id is required")
result = await flight_manager.update_mission(mission_id, waypoints, reason)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update mission: {str(e)}")
@router.get("/flight/status/{drone_id}")
async def get_flight_status(drone_id: str, context: str = ""):
"""Get intelligent flight status report for a drone.
Args:
drone_id: ID of the drone
context: Optional user context for personalized response
"""
try:
status_report = await flight_manager.get_intelligent_status_report(drone_id, context)
return {
"status": "success",
"drone_id": drone_id,
"status_report": status_report,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get flight status: {str(e)}")
@router.get("/flight/missions")
async def get_active_missions():
"""Get all active flight missions."""
try:
missions = {}
for mission_id, mission in flight_manager.active_missions.items():
missions[mission_id] = {
"mission_id": mission.mission_id,
"drone_id": mission.drone_id,
"state": mission.state.value,
"waypoints_total": len(mission.waypoints),
"waypoints_completed": sum(1 for wp in mission.waypoints if wp.completed),
"progress_percent": (sum(1 for wp in mission.waypoints if wp.completed) / len(mission.waypoints)) * 100 if mission.waypoints else 0,
"created_at": mission.created_at,
"confirmed_at": mission.confirmed_at,
"started_at": mission.started_at
}
return {
"status": "success",
"active_missions": missions,
"total_missions": len(missions),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get missions: {str(e)}")
@router.post("/flight/telemetry")
async def update_flight_telemetry(telemetry_data: dict):
"""Update flight telemetry data (enhanced version of drone telemetry).
This endpoint receives real-time telemetry during flight and triggers
intelligent notifications and mission tracking.
"""
try:
# Update flight manager with telemetry
flight_manager.update_telemetry(telemetry_data)
# Also update the original drone status for compatibility
from ..core.drone_status import drone_status
drone_status.update_telemetry(telemetry_data)
return {
"status": "success",
"message": "Flight telemetry updated successfully",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update flight telemetry: {str(e)}")