Spaces:
Configuration error
Configuration error
| """ | |
| Interactive drone chat interface - similar to Claude Code environment. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import threading | |
| from typing import List, Dict, Any, Optional | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.text import Text | |
| from rich.markdown import Markdown | |
| from rich.layout import Layout | |
| from rich.live import Live | |
| from rich.spinner import Spinner | |
| from rich.table import Table | |
| from rich.align import Align | |
| from prompt_toolkit import prompt | |
| from prompt_toolkit.history import InMemoryHistory | |
| from prompt_toolkit.auto_suggest import AutoSuggestFromHistory | |
| from prompt_toolkit.completion import WordCompleter | |
| from prompt_toolkit.styles import Style | |
| from prompt_toolkit.shortcuts import confirm | |
| import re | |
| from .config import ModelConfig | |
| from .llm_interface import LLMInterface | |
| from .drone_tools import DroneToolsManager | |
class DroneChatInterface:
    """Interactive chat interface for drone control with AI.

    The interface reads commands from standard input, handles a small set of
    built-in commands locally (``/help``, ``/status``, ``/connect`` ...) and
    forwards everything else to the configured LLM.  Python code blocks found
    in the model's reply are executed against the drone tool functions.
    """

    # One fenced markdown block: group 1 is the language tag (possibly empty),
    # group 2 is the body.  Every fence is consumed -- whatever its tag -- so
    # that opening and closing fences always pair up correctly.
    _CODE_FENCE_RE = re.compile(r'```[ \t]*([\w+#.-]*)[^\n`]*\n(.*?)```', re.DOTALL)

    # Fence tags whose body is treated as executable drone code.  Untagged
    # fences are included because they have always been executed.
    _RUNNABLE_FENCE_TAGS = frozenset({'', 'python', 'python3', 'py'})

    def __init__(self, model_config: "ModelConfig", connection_string: Optional[str] = None):
        """Create the interface and its LLM / drone-tool components.

        Args:
            model_config: Model configuration passed to ``LLMInterface``.
            connection_string: Drone connection string; when omitted or empty
                ``"udp:127.0.0.1:14550"`` is used.
        """
        self.console = Console()
        self.model_config = model_config
        self.connection_string = connection_string or "udp:127.0.0.1:14550"

        # Initialize components
        self.llm = LLMInterface(model_config)
        self.drone_tools = DroneToolsManager(self.connection_string)

        # Chat state
        self.chat_history: List[Dict[str, str]] = []
        self.session_active = True

        # Prompt components.  NOTE(review): _get_user_input() currently reads
        # with plain input(), so these are created but not used by this class.
        self.history = InMemoryHistory()
        self.completer = WordCompleter([
            'connect', 'takeoff', 'land', 'fly', 'goto', 'mission', 'status',
            'battery', 'location', 'return', 'home', 'help', 'quit', 'exit',
            'emergency', 'stop', 'altitude', 'waypoint', 'navigate'
        ])
        self.style = Style.from_dict({
            'prompt': '#00ff00 bold',
            'input': '#ffffff',
            'completion-menu.completion': 'bg:#333333 #ffffff',
            'completion-menu.completion.current': 'bg:#00ff00 #000000 bold',
        })

        # Status tracking
        self.last_status_update = time.time()
        self.status_thread = None
        self.status_running = False

    @staticmethod
    def _escape_markup(value) -> str:
        """Return ``str(value)`` escaped so Rich prints it literally."""
        # Imported locally so this helper does not depend on a file-level import.
        from rich.markup import escape
        return escape(str(value))

    def start(self):
        """Start the interactive chat session.

        Shows the welcome panel, starts the background status thread and then
        loops: read a line, skip it if blank, try the built-in commands, and
        otherwise send it to the AI.  ``_cleanup()`` always runs on the way out.
        """
        self._show_welcome()
        self._start_status_monitor()
        try:
            while self.session_active:
                try:
                    user_input = self._get_user_input()
                    if not user_input.strip():
                        continue
                    # Built-in commands are handled locally and never reach the AI.
                    if self._handle_special_commands(user_input):
                        continue
                    self._process_message(user_input)
                except (KeyboardInterrupt, EOFError):
                    # Interrupt raised while a message was being processed.
                    self._handle_exit()
                    break
        finally:
            self._cleanup()

    def _show_welcome(self):
        """Show welcome message and status."""
        welcome_panel = Panel(
            f"""[bold green]π DEEPDRONE CHAT INTERFACE[/bold green]
[bold]AI Model:[/bold] {self.model_config.name} ({self.model_config.provider})
[bold]Model ID:[/bold] {self.model_config.model_id}
[bold]Drone Connection:[/bold] {self.connection_string}
[bold]Status:[/bold] Ready for commands
[bold cyan]π― Available Commands:[/bold cyan]
β’ Natural language: "Connect to drone and take off to 30 meters"
β’ Status commands: "Show battery status", "What's my location?"
β’ Mission commands: "Fly in a square pattern", "Return home"
β’ Help: Type 'help' for more information
β’ Exit: Type 'quit' or 'exit' to end session
[dim]Type your commands below. The AI will interpret and execute drone operations.[/dim]""",
            title="[bold green]DeepDrone Control Center[/bold green]",
            border_style="bright_green",
            padding=(1, 2)
        )
        self.console.print(welcome_panel)
        self.console.print()

    def _get_user_input(self) -> str:
        """Read one line from the user.

        Returns:
            The line typed by the user, or ``'/quit'`` when input is
            interrupted (Ctrl-C) or exhausted (EOF).
        """
        try:
            # Use simple input() for better compatibility
            return input("π DeepDrone> ")
        except (KeyboardInterrupt, EOFError):
            return '/quit'

    def _handle_special_commands(self, command: str) -> bool:
        """Handle built-in commands.

        Command names are matched case-insensitively.  The argument of
        ``/connect`` is taken from the text as typed, because connection
        strings (serial device paths, host names) can be case-sensitive.

        Args:
            command: Raw line entered by the user.

        Returns:
            True if the line was a built-in command and has been handled,
            False if it should be passed on to the AI.
        """
        raw = command.strip()
        cmd = raw.lower()

        if cmd in ('/quit', '/exit', 'quit', 'exit'):
            self.session_active = False
            return True
        if cmd in ('/help', 'help'):
            self._show_help()
            return True
        if cmd in ('/status', 'status'):
            self._show_detailed_status()
            return True
        if cmd in ('/clear', 'clear'):
            self.console.clear()
            self._show_welcome()
            return True
        if cmd.startswith('/connect'):
            # Split the original text, not the lower-cased copy, so the
            # connection string keeps its case.
            parts = raw.split()
            if len(parts) > 1:
                self.connection_string = parts[1]
                self._connect_drone_direct()
            else:
                self.console.print("[red]Usage: /connect <connection_string>[/red]")
            return True
        if cmd in ('/disconnect', 'disconnect'):
            self._disconnect_drone_direct()
            return True
        if cmd in ('/emergency', 'emergency'):
            self._emergency_stop()
            return True
        if cmd in ('/ollama', 'ollama'):
            self._show_ollama_status()
            return True
        return False

    def _process_message(self, user_message: str):
        """Send a user message to the AI and act on the reply.

        The message is echoed, the last 10 history entries plus a fresh system
        prompt are sent to the LLM, and the reply is rendered and its code
        blocks executed by ``_process_ai_response``.

        Args:
            user_message: Text typed by the user.
        """
        self.chat_history.append({"role": "user", "content": user_message})

        # Wrapped in Text so that brackets typed by the user are shown
        # literally instead of being parsed as Rich markup.
        self.console.print(Panel(
            Text(user_message),
            title="[bold blue]You[/bold blue]",
            border_style="blue",
            padding=(0, 1)
        ))

        try:
            with Live(
                Spinner("dots", text="[green]π€ AI is analyzing your request...[/green]"),
                console=self.console,
                transient=True
            ):
                messages = [{"role": "system", "content": self._create_drone_system_prompt()}]
                messages.extend(self.chat_history[-10:])  # Last 10 messages for context
                ai_response = self.llm.chat(messages)
        except Exception as e:
            # Drop the unanswered request: leaving it in the history would
            # replay it to the model with the next message, and the model's
            # code is executed automatically.
            self.chat_history.pop()
            self.console.print(f"[red]β Error processing request: {self._escape_markup(e)}[/red]")
            return

        # Record the reply before acting on it so the history reflects what
        # the model said even if rendering fails part-way through.
        self.chat_history.append({"role": "assistant", "content": ai_response})
        try:
            self._process_ai_response(ai_response)
        except Exception as e:
            self.console.print(f"[red]β Error processing request: {self._escape_markup(e)}[/red]")

    def _create_drone_system_prompt(self) -> str:
        """Build the system prompt sent with every LLM request.

        The prompt embeds the current connection state, the connection string
        and the mission flag, and lists the functions that executed code
        blocks may call.
        """
        return f"""You are DeepDrone AI, an advanced drone control assistant. You can control real drones through Python code.
Current drone status:
- Connected: {'Yes' if self.drone_tools.is_connected() else 'No'}
- Connection: {self.connection_string}
- Mission active: {'Yes' if self.drone_tools.mission_in_progress else 'No'}
Available drone functions (use these in Python code blocks):
- connect_drone(connection_string): Connect to drone
- takeoff(altitude): Take off to specified altitude in meters
- land(): Land the drone
- return_home(): Return to launch point
- fly_to(lat, lon, alt): Fly to GPS coordinates
- get_location(): Get current GPS position
- get_battery(): Get battery status
- execute_mission(waypoints): Execute mission with waypoints list
- disconnect_drone(): Disconnect from drone
When user asks for drone operations:
1. Explain what you'll do
2. Provide Python code in ```python code blocks
3. The code will be executed automatically
4. Provide status updates
Example response:
"I'll connect to the drone and take off to 30 meters altitude.
```python
# Connect to the drone
connect_drone('{self.connection_string}')
# Take off to 30 meters
takeoff(30)
# Get status
location = get_location()
battery = get_battery()
print(f"Location: {{location}}")
print(f"Battery: {{battery}}")
```
The drone should now be airborne at 30 meters altitude."
Always prioritize safety and explain each operation clearly."""

    def _process_ai_response(self, response: str):
        """Render the AI reply and execute the code blocks it contains.

        Each block is executed in order; a failure in one block is reported
        and does not stop the remaining blocks.

        Args:
            response: Full text returned by the LLM.
        """
        # Render as markdown only when the text appears to contain markdown.
        if any(marker in response for marker in ['**', '*', '```', '#', '-', '1.']):
            content = Markdown(response)
        else:
            content = Text(response)
        self.console.print(Panel(
            content,
            title="[bold green]π€ DeepDrone AI[/bold green]",
            border_style="green",
            padding=(0, 1)
        ))

        code_blocks = self._extract_code_blocks(response)
        if not code_blocks:
            return

        self.console.print(Panel(
            "[yellow]π§ Executing drone operations...[/yellow]",
            border_style="yellow"
        ))
        for i, code in enumerate(code_blocks, 1):
            self.console.print(f"[dim]Executing code block {i}...[/dim]")
            try:
                result = self._execute_drone_code(code)
                if result:
                    # Program output is arbitrary text: escape it so brackets
                    # are not interpreted as Rich markup.
                    self.console.print(Panel(
                        f"[green]β Execution Result:[/green]\n{self._escape_markup(result)}",
                        border_style="green"
                    ))
            except Exception as e:
                self.console.print(Panel(
                    f"[red]β Execution Error:[/red]\n{self._escape_markup(e)}",
                    border_style="red"
                ))

    def _extract_code_blocks(self, text: str) -> List[str]:
        """Extract runnable Python code blocks from markdown text.

        Every fenced block is parsed, but only those that are untagged or
        tagged ``python`` / ``python3`` / ``py`` (case-insensitive) are
        returned.  Parsing the other fences too keeps a block in another
        language from being mis-paired with the following prose.

        Args:
            text: Markdown text, typically an LLM reply.

        Returns:
            The stripped, non-empty bodies of the runnable blocks, in order.
        """
        blocks = []
        for tag, body in self._CODE_FENCE_RE.findall(text):
            if tag.lower() not in self._RUNNABLE_FENCE_TAGS:
                continue
            body = body.strip()
            if body:
                blocks.append(body)
        return blocks

    def _execute_drone_code(self, code: str) -> str:
        """Execute a code block with only the drone functions in scope.

        SECURITY NOTE: this runs model-generated code with ``exec``.  The
        reduced ``__builtins__`` mapping limits which names the code can
        reach directly, but it is not a security sandbox; the code can still
        call every drone function listed below.

        Args:
            code: Python source to run.

        Returns:
            Everything the code printed, one ``print`` call per line, or
            ``"Code executed successfully"`` if it printed nothing.

        Raises:
            Exception: Whatever the executed code raises, including
                ``SyntaxError`` for invalid source.
        """
        output_lines: List[str] = []

        def capture_print(*args, sep=' ', **_kwargs):
            # Collect output instead of writing to stdout.  ``sep`` is
            # honoured; other print() keywords (end, file, flush) are ignored.
            joiner = ' ' if sep is None else sep
            output_lines.append(joiner.join(str(arg) for arg in args))

        safe_globals = {
            '__builtins__': {
                'print': capture_print,
                'len': len,
                'str': str,
                'int': int,
                'float': float,
                'dict': dict,
                'list': list,
                'range': range,
            },
            'connect_drone': self.drone_tools.connect_drone,
            'disconnect_drone': self.drone_tools.disconnect_drone,
            'takeoff': self.drone_tools.takeoff,
            'land': self.drone_tools.land,
            'return_home': self.drone_tools.return_home,
            'fly_to': self.drone_tools.fly_to,
            'get_location': self.drone_tools.get_location,
            'get_battery': self.drone_tools.get_battery,
            'execute_mission': self.drone_tools.execute_mission,
            'time': time,
        }

        exec(code, safe_globals)
        return '\n'.join(output_lines) if output_lines else "Code executed successfully"

    def _show_help(self):
        """Show help information."""
        help_text = """[bold cyan]π DeepDrone Help[/bold cyan]
[bold]Natural Language Commands:[/bold]
β’ "Connect to the drone simulator"
β’ "Take off to 30 meters altitude"
β’ "Fly to coordinates 37.7749, -122.4194"
β’ "Show me the current battery status"
β’ "Execute a square flight pattern"
β’ "Return home and land safely"
[bold]Direct Commands:[/bold]
β’ [cyan]/status[/cyan] - Show detailed system status
β’ [cyan]/connect <connection>[/cyan] - Connect to specific drone
β’ [cyan]/disconnect[/cyan] - Disconnect from drone
β’ [cyan]/emergency[/cyan] - Emergency stop
β’ [cyan]/ollama[/cyan] - Show Ollama status and models
β’ [cyan]/clear[/cyan] - Clear screen
β’ [cyan]/help[/cyan] - Show this help
β’ [cyan]/quit[/cyan] - Exit DeepDrone
[bold]Example Session:[/bold]
[dim]π DeepDrone> Connect to simulator and take off to 20 meters
π€ AI: I'll connect to the simulator and take off to 20 meters...
π DeepDrone> Fly in a circle with 50 meter radius
π€ AI: I'll create a circular flight pattern...[/dim]
[bold]Tips:[/bold]
β’ Use arrow keys to navigate command history
β’ Tab completion available for common commands
β’ AI understands natural language - be conversational!
β’ Always prioritize safety in flight operations"""
        self.console.print(Panel(
            help_text,
            title="[bold]DeepDrone Help[/bold]",
            border_style="cyan",
            padding=(1, 2)
        ))

    def _show_detailed_status(self):
        """Show detailed system and drone status.

        Prints a table built from ``drone_tools.get_status()`` followed by at
        most the five most recent log entries, when the status has any.
        """
        status = self.drone_tools.get_status()
        connected = status["connected"]

        table = Table(title="System Status", show_header=True, header_style="bold magenta")
        table.add_column("Component", style="cyan", width=20)
        table.add_column("Status", style="white")
        table.add_column("Details", style="yellow")

        # AI Model status
        table.add_row("AI Model", "β Online", f"{self.model_config.name} ({self.model_config.provider})")

        # Drone connection
        conn_status = "β Connected" if connected else "β Disconnected"
        table.add_row("Drone Connection", conn_status, self.connection_string)

        # Mission status
        mission_status = "π Active" if status["mission_in_progress"] else "βΈοΈ Standby"
        table.add_row("Mission", mission_status, status.get("phase", "N/A"))

        # Current status
        table.add_row("System Status", status["status"], "")

        if connected:
            location = status.get("location", {})
            battery = status.get("battery", {})
            # A truthy "error" entry means that reading is skipped.
            if not location.get("error"):
                lat = location.get("latitude", "N/A")
                lon = location.get("longitude", "N/A")
                alt = location.get("altitude", "N/A")
                table.add_row("Location", "π GPS Lock", f"Lat: {lat}, Lon: {lon}, Alt: {alt}m")
            if not battery.get("error"):
                voltage = battery.get("voltage", "N/A")
                level = battery.get("level", "N/A")
                table.add_row("Battery", "π Monitoring", f"Voltage: {voltage}V, Level: {level}%")

        self.console.print(table)

        # Show recent log entries
        if status.get("log_entries"):
            self.console.print("\n[bold]Recent Activity:[/bold]")
            for entry in status["log_entries"][-5:]:
                self.console.print(f"[dim]{entry}[/dim]")

    def _connect_drone_direct(self):
        """Connect to the drone at ``self.connection_string`` and report the result."""
        with Live(
            Spinner("dots", text=f"Connecting to {self.connection_string}..."),
            console=self.console,
            transient=True
        ):
            success = self.drone_tools.connect_drone(self.connection_string)

        # The spinner is gone once the ``with`` block has exited.
        if success:
            self.console.print(f"[green]β Connected to drone at {self.connection_string}[/green]")
        else:
            self.console.print(f"[red]β Failed to connect to {self.connection_string}[/red]")

    def _disconnect_drone_direct(self):
        """Disconnect from the drone if a connection is active."""
        if self.drone_tools.is_connected():
            self.drone_tools.disconnect_drone()
            self.console.print("[yellow]π‘ Disconnected from drone[/yellow]")
        else:
            self.console.print("[yellow]No active drone connection[/yellow]")

    def _emergency_stop(self):
        """Trigger the drone tools' emergency stop if a connection is active."""
        if self.drone_tools.is_connected():
            self.console.print("[red]π¨ EMERGENCY STOP INITIATED[/red]")
            self.drone_tools.emergency_stop()
        else:
            self.console.print("[yellow]No active drone connection for emergency stop[/yellow]")

    def _start_status_monitor(self):
        """Start the background status thread (daemon, so it never blocks exit)."""
        self.status_running = True
        self.status_thread = threading.Thread(target=self._status_monitor_loop, daemon=True)
        self.status_thread.start()

    def _status_monitor_loop(self):
        """Background loop that wakes every 5 seconds while the session is active.

        The loop currently does no work between sleeps; it is a hook for a
        future status-bar update.
        """
        while self.status_running and self.session_active:
            try:
                time.sleep(5)  # Update every 5 seconds
                # Could update status bar here if needed
            except Exception:
                break

    def _handle_exit(self):
        """Announce shutdown, offer to disconnect, and end the session loop."""
        self.console.print("\n[yellow]π Shutting down DeepDrone...[/yellow]")
        if self.drone_tools.is_connected():
            from rich.prompt import Confirm
            # NOTE(review): _cleanup() disconnects unconditionally afterwards,
            # so answering "no" here only defers the disconnect.
            if Confirm.ask("Disconnect from drone before exit?", default=True):
                self.drone_tools.disconnect_drone()
        self.session_active = False

    def _show_ollama_status(self):
        """Show whether Ollama is reachable and which models it lists."""
        try:
            import ollama
        except ImportError:
            self.console.print("[red]β Ollama package not installed[/red]")
            self.console.print("Install with: [cyan]pip install ollama[/cyan]")
            return

        try:
            # Check connection
            models_response = ollama.list()
            available_models = models_response.models if hasattr(models_response, 'models') else []
            self.console.print("[bold green]β Ollama Status: Connected[/bold green]\n")
            if available_models:
                self.console.print(f"[bold]π Available Models ({len(available_models)}):[/bold]")
                for model in available_models:
                    name = model.model
                    size = model.size if hasattr(model, 'size') else 0
                    # Bytes to GiB; a missing or zero size is shown as 0.0 GB.
                    size_gb = size / (1024**3) if size else 0
                    modified = str(model.modified_at)[:19] if hasattr(model, 'modified_at') else 'Unknown'
                    self.console.print(f" β’ [green]{name}[/green] ([blue]{size_gb:.1f} GB[/blue]) - {modified}")
            else:
                self.console.print("[yellow]π No models installed[/yellow]")
                self.console.print("\nπ‘ Install a model with:")
                self.console.print(" [cyan]ollama pull llama3.1[/cyan]")
                self.console.print(" [cyan]ollama pull codestral[/cyan]")
                self.console.print(" [cyan]ollama pull qwen2.5-coder[/cyan]")
            self.console.print(f"\n[dim]Current model: {self.model_config.model_id}[/dim]")
        except Exception as e:
            # Any failure while querying Ollama is reported as "not connected".
            self.console.print("[red]β Ollama Status: Not connected[/red]")
            self.console.print(f"[red]Error: {self._escape_markup(e)}[/red]\n")
            self.console.print("π‘ Make sure Ollama is running:")
            self.console.print(" [cyan]ollama serve[/cyan]\n")
            self.console.print("π₯ Download Ollama from:")
            self.console.print(" [cyan]https://ollama.com/download[/cyan]")

    def _cleanup(self):
        """Stop the status thread, disconnect if still connected, and say goodbye."""
        self.status_running = False
        if self.drone_tools.is_connected():
            self.drone_tools.disconnect_drone()
        self.console.print("[green]π DeepDrone session ended. Fly safe![/green]")