Spaces:
Configuration error
Configuration error
| """ | |
| Drone tools manager for terminal interface. | |
| Adapts the existing drone control functionality for the terminal application. | |
| """ | |
| import time | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from .drone_control import DroneController | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class DroneToolsManager: | |
| """Manages drone operations for the terminal interface.""" | |
| def __init__(self, connection_string: Optional[str] = None): | |
| self.controller = DroneController(connection_string) | |
| self.connected = False | |
| self.mission_in_progress = False | |
| # Status tracking | |
| self.status = "STANDBY" | |
| self.phase = "" | |
| self.log_entries = [] | |
| def connect_drone(self, connection_string: str, timeout: int = 30) -> bool: | |
| """Connect to a drone.""" | |
| try: | |
| self._update_status("CONNECTING", f"Connecting to {connection_string}") | |
| success = self.controller.connect_to_drone(connection_string, timeout) | |
| if success: | |
| self.connected = True | |
| self._update_status("CONNECTED", "Successfully connected to drone") | |
| # Get initial status | |
| location = self.get_location() | |
| battery = self.get_battery() | |
| logger.info(f"Connected to drone. Location: {location}, Battery: {battery}") | |
| return True | |
| else: | |
| self._update_status("ERROR", "Failed to connect to drone") | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Connection error: {str(e)}") | |
| logger.error(f"Connection error: {e}") | |
| return False | |
| def disconnect_drone(self): | |
| """Disconnect from the drone.""" | |
| try: | |
| self._update_status("DISCONNECTING", "Disconnecting from drone") | |
| self.controller.disconnect() | |
| self.connected = False | |
| self.mission_in_progress = False | |
| self._update_status("STANDBY", "Disconnected from drone") | |
| logger.info("Disconnected from drone") | |
| except Exception as e: | |
| logger.error(f"Disconnect error: {e}") | |
| def takeoff(self, altitude: float) -> bool: | |
| """Take off to specified altitude.""" | |
| if not self._ensure_connected(): | |
| return False | |
| try: | |
| self._update_status("TAKING OFF", f"Taking off to {altitude} meters") | |
| self.mission_in_progress = True | |
| success = self.controller.arm_and_takeoff(altitude) | |
| if success: | |
| self._update_status("AIRBORNE", f"Reached altitude of {altitude} meters") | |
| return True | |
| else: | |
| self._update_status("ERROR", "Takeoff failed") | |
| self.mission_in_progress = False | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Takeoff error: {str(e)}") | |
| self.mission_in_progress = False | |
| logger.error(f"Takeoff error: {e}") | |
| return False | |
| def land(self) -> bool: | |
| """Land the drone.""" | |
| if not self._ensure_connected(): | |
| return False | |
| try: | |
| self._update_status("LANDING", "Landing drone") | |
| success = self.controller.land() | |
| if success: | |
| self._update_status("LANDED", "Drone has landed") | |
| self.mission_in_progress = False | |
| return True | |
| else: | |
| self._update_status("ERROR", "Landing failed") | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Landing error: {str(e)}") | |
| logger.error(f"Landing error: {e}") | |
| return False | |
| def return_home(self) -> bool: | |
| """Return to launch/home location.""" | |
| if not self._ensure_connected(): | |
| return False | |
| try: | |
| self._update_status("RETURNING", "Returning to launch point") | |
| success = self.controller.return_to_launch() | |
| if success: | |
| self._update_status("RETURNING", "Drone is returning to launch point") | |
| return True | |
| else: | |
| self._update_status("ERROR", "Return to home failed") | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Return error: {str(e)}") | |
| logger.error(f"Return error: {e}") | |
| return False | |
| def fly_to(self, latitude: float, longitude: float, altitude: float) -> bool: | |
| """Fly to specific GPS coordinates.""" | |
| if not self._ensure_connected(): | |
| return False | |
| try: | |
| self._update_status( | |
| "NAVIGATING", | |
| f"Flying to lat:{latitude:.4f}, lon:{longitude:.4f}, alt:{altitude}m" | |
| ) | |
| success = self.controller.goto_location(latitude, longitude, altitude) | |
| if success: | |
| return True | |
| else: | |
| self._update_status("ERROR", "Navigation failed") | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Navigation error: {str(e)}") | |
| logger.error(f"Navigation error: {e}") | |
| return False | |
| def get_location(self) -> Dict[str, Any]: | |
| """Get current GPS location.""" | |
| if not self._ensure_connected(): | |
| return {"error": "Not connected to drone"} | |
| try: | |
| return self.controller.get_current_location() | |
| except Exception as e: | |
| logger.error(f"Location error: {e}") | |
| return {"error": str(e)} | |
| def get_battery(self) -> Dict[str, Any]: | |
| """Get battery status.""" | |
| if not self._ensure_connected(): | |
| return {"error": "Not connected to drone"} | |
| try: | |
| return self.controller.get_battery_status() | |
| except Exception as e: | |
| logger.error(f"Battery error: {e}") | |
| return {"error": str(e)} | |
| def execute_mission(self, waypoints: List[Dict[str, float]]) -> bool: | |
| """Execute a mission with multiple waypoints.""" | |
| if not self._ensure_connected(): | |
| return False | |
| if not waypoints: | |
| logger.error("No waypoints provided") | |
| return False | |
| try: | |
| self._update_status("MISSION", f"Starting mission with {len(waypoints)} waypoints") | |
| self.mission_in_progress = True | |
| # Upload mission | |
| upload_success = self.controller.upload_mission(waypoints) | |
| if not upload_success: | |
| self._update_status("ERROR", "Failed to upload mission") | |
| self.mission_in_progress = False | |
| return False | |
| # Execute mission | |
| execute_success = self.controller.execute_mission() | |
| if execute_success: | |
| self._update_status("EXECUTING", "Mission execution started") | |
| # Simulate mission progress updates | |
| for i, wp in enumerate(waypoints, 1): | |
| self._update_status( | |
| "EXECUTING", | |
| f"Waypoint {i}/{len(waypoints)}: {wp['lat']:.4f}, {wp['lon']:.4f}" | |
| ) | |
| time.sleep(1) # Brief pause between waypoints | |
| self._update_status("MISSION COMPLETE", "All waypoints reached") | |
| return True | |
| else: | |
| self._update_status("ERROR", "Failed to start mission execution") | |
| self.mission_in_progress = False | |
| return False | |
| except Exception as e: | |
| self._update_status("ERROR", f"Mission error: {str(e)}") | |
| self.mission_in_progress = False | |
| logger.error(f"Mission error: {e}") | |
| return False | |
| def get_status(self) -> Dict[str, Any]: | |
| """Get current drone and system status.""" | |
| status_info = { | |
| "connected": self.connected, | |
| "mission_in_progress": self.mission_in_progress, | |
| "status": self.status, | |
| "phase": self.phase, | |
| "log_entries": self.log_entries[-5:], # Last 5 entries | |
| } | |
| if self.connected: | |
| try: | |
| status_info["location"] = self.get_location() | |
| status_info["battery"] = self.get_battery() | |
| if hasattr(self.controller, 'vehicle') and self.controller.vehicle: | |
| status_info["mode"] = str(self.controller.vehicle.mode.name) | |
| status_info["armed"] = self.controller.vehicle.armed | |
| status_info["system_status"] = str(self.controller.vehicle.system_status.state) | |
| except Exception as e: | |
| status_info["telemetry_error"] = str(e) | |
| return status_info | |
| def is_connected(self) -> bool: | |
| """Check if connected to drone.""" | |
| return self.connected | |
| def emergency_stop(self): | |
| """Emergency stop - immediately land or RTL.""" | |
| if not self.connected: | |
| return | |
| try: | |
| self._update_status("EMERGENCY", "Emergency stop initiated") | |
| # Try RTL first, then land | |
| if not self.return_home(): | |
| self.land() | |
| self.mission_in_progress = False | |
| except Exception as e: | |
| logger.error(f"Emergency stop error: {e}") | |
| def set_airspeed(self, speed: float) -> bool: | |
| """Set target airspeed.""" | |
| if not self._ensure_connected(): | |
| return False | |
| try: | |
| return self.controller.set_airspeed(speed) | |
| except Exception as e: | |
| logger.error(f"Airspeed error: {e}") | |
| return False | |
| def get_telemetry(self) -> Dict[str, Any]: | |
| """Get comprehensive telemetry data.""" | |
| if not self._ensure_connected(): | |
| return {"error": "Not connected to drone"} | |
| try: | |
| telemetry = {} | |
| # Basic info | |
| telemetry["location"] = self.get_location() | |
| telemetry["battery"] = self.get_battery() | |
| # Vehicle-specific data if available | |
| if hasattr(self.controller, 'vehicle') and self.controller.vehicle: | |
| vehicle = self.controller.vehicle | |
| telemetry.update({ | |
| "mode": str(vehicle.mode.name), | |
| "armed": vehicle.armed, | |
| "system_status": str(vehicle.system_status.state), | |
| "airspeed": vehicle.airspeed, | |
| "groundspeed": vehicle.groundspeed, | |
| "heading": vehicle.heading, | |
| }) | |
| # GPS info | |
| if vehicle.gps_0: | |
| telemetry["gps"] = { | |
| "fix_type": vehicle.gps_0.fix_type, | |
| "satellites_visible": vehicle.gps_0.satellites_visible, | |
| "eph": vehicle.gps_0.eph, | |
| "epv": vehicle.gps_0.epv, | |
| } | |
| return telemetry | |
| except Exception as e: | |
| logger.error(f"Telemetry error: {e}") | |
| return {"error": str(e)} | |
| def _ensure_connected(self) -> bool: | |
| """Ensure drone is connected.""" | |
| if not self.connected: | |
| logger.error("Not connected to drone") | |
| return False | |
| return True | |
| def _update_status(self, status: str, phase: str = ""): | |
| """Update system status and log entry.""" | |
| self.status = status | |
| self.phase = phase | |
| # Create log entry | |
| timestamp = time.strftime("%H:%M:%S") | |
| log_entry = f"[{timestamp}] {status}: {phase}" if phase else f"[{timestamp}] {status}" | |
| self.log_entries.append(log_entry) | |
| # Keep only last 50 entries | |
| if len(self.log_entries) > 50: | |
| self.log_entries = self.log_entries[-50:] | |
| logger.info(log_entry) |