Spaces:
No application file
No application file
| """ | |
| MCP Server for LLM-Agent-Designed Obstacle-Passing Vehicle System | |
| This server exposes vehicle design, simulation, and evaluation tools via the Model Context Protocol. | |
| Supports both robots and drones for crossing a 5cm obstacle. | |
| """ | |
| from mcp.server.fastmcp import FastMCP | |
| import json | |
| import time | |
| import os | |
| import imageio | |
| from typing import Dict, Any, List, Optional, Literal | |
| # Import our existing modules | |
| import simulation_env_enhanced as simulation_env | |
| import llm_interface | |
| import evaluation | |
| # Create the MCP server | |
| mcp = FastMCP("Robot-Drone-Design-Server") | |
| # Global configuration | |
| MAX_ITERATIONS = 5 | |
| SIMULATION_DURATION_SEC = 10 | |
| OBSTACLE_FAR_EDGE_X = 0.8 | |
| def get_obstacle_info() -> Dict[str, Any]: | |
| """Get information about the obstacle that vehicles need to cross""" | |
| return simulation_env.get_obstacle_info() | |
| def design_vehicle( | |
| vehicle_type: Literal["robot", "drone"], | |
| specifications: Dict[str, Any], | |
| design_reasoning: str = "" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Design a robot or drone with specified parameters | |
| Args: | |
| vehicle_type: Either "robot" or "drone" | |
| specifications: Vehicle parameters (depends on type) | |
| design_reasoning: Explanation of design choices | |
| Returns: | |
| Design validation and summary | |
| """ | |
| try: | |
| # Add vehicle_type to specifications | |
| specs = specifications.copy() | |
| specs["vehicle_type"] = vehicle_type | |
| # Validate specifications based on vehicle type | |
| if vehicle_type == "robot": | |
| required_params = ["wheel_type", "body_clearance_cm", "main_material"] | |
| valid_wheel_types = ["small_high_grip", "large_smooth", "tracked_base"] | |
| valid_materials = ["light_plastic", "sturdy_metal_alloy"] | |
| # Validate parameters | |
| for param in required_params: | |
| if param not in specs: | |
| return {"error": f"Missing required parameter: {param}"} | |
| if specs["wheel_type"] not in valid_wheel_types: | |
| return {"error": f"Invalid wheel_type. Must be one of: {valid_wheel_types}"} | |
| if specs["main_material"] not in valid_materials: | |
| return {"error": f"Invalid main_material. Must be one of: {valid_materials}"} | |
| if not (1 <= specs["body_clearance_cm"] <= 10): | |
| return {"error": "body_clearance_cm must be between 1 and 10"} | |
| elif vehicle_type == "drone": | |
| required_params = ["propeller_size", "flight_height_cm", "main_material"] | |
| valid_propeller_sizes = ["small_agile", "medium", "large_stable"] | |
| valid_materials = ["light_carbon_fiber", "sturdy_aluminum"] | |
| # Validate parameters | |
| for param in required_params: | |
| if param not in specs: | |
| return {"error": f"Missing required parameter: {param}"} | |
| if specs["propeller_size"] not in valid_propeller_sizes: | |
| return {"error": f"Invalid propeller_size. Must be one of: {valid_propeller_sizes}"} | |
| if specs["main_material"] not in valid_materials: | |
| return {"error": f"Invalid main_material. Must be one of: {valid_materials}"} | |
| if not (10 <= specs["flight_height_cm"] <= 50): | |
| return {"error": "flight_height_cm must be between 10 and 50"} | |
| else: | |
| return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"} | |
| return { | |
| "success": True, | |
| "vehicle_type": vehicle_type, | |
| "specifications": specs, | |
| "design_reasoning": design_reasoning, | |
| "status": "Design validated and ready for simulation" | |
| } | |
| except Exception as e: | |
| return {"error": f"Design validation failed: {str(e)}"} | |
| def simulate_vehicle( | |
| vehicle_type: Literal["robot", "drone"], | |
| specifications: Dict[str, Any], | |
| simulation_duration: float = 10.0 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run physics simulation of a vehicle attempting to cross the obstacle | |
| Args: | |
| vehicle_type: Either "robot" or "drone" | |
| specifications: Vehicle parameters | |
| simulation_duration: How long to run simulation (seconds) | |
| Returns: | |
| Simulation results and performance data | |
| """ | |
| try: | |
| # Add vehicle_type to specifications | |
| specs = specifications.copy() | |
| specs["vehicle_type"] = vehicle_type | |
| # Setup PyBullet environment | |
| obstacle_id, plane_id = simulation_env.setup_pybullet_environment() | |
| # Create vehicle | |
| if vehicle_type == "robot": | |
| vehicle_id, joint_indices, v_type = simulation_env.create_robot(specs) | |
| vehicle_props = None | |
| elif vehicle_type == "drone": | |
| vehicle_id, joint_indices, v_type, vehicle_props = simulation_env.create_drone(specs) | |
| else: | |
| return {"error": f"Invalid vehicle_type: {vehicle_type}"} | |
| # Run simulation | |
| start_time = time.time() | |
| final_feedback = {} | |
| simulation_steps = int(simulation_duration * 240) # 240 Hz | |
| for step in range(simulation_steps): | |
| # Run simulation step | |
| simulation_env.run_simulation_step( | |
| vehicle_id, joint_indices, {}, vehicle_type, | |
| vehicle_props if vehicle_type == "drone" else None | |
| ) | |
| current_sim_time = time.time() - start_time | |
| # Get feedback every 60 steps (4 times per second) | |
| if step % 60 == 0: | |
| feedback = simulation_env.get_simulation_feedback( | |
| vehicle_id, obstacle_id, start_time, current_sim_time, vehicle_type | |
| ) | |
| final_feedback = feedback | |
| # Check for early exit | |
| vehicle_x_pos = feedback['robot_position'][0] # Using robot_position for compatibility | |
| is_stable = feedback['is_robot_upright'] # Using is_robot_upright for compatibility | |
| if vehicle_x_pos > OBSTACLE_FAR_EDGE_X + 0.1 or not is_stable: | |
| break | |
| if current_sim_time > simulation_duration: | |
| break | |
| # Cleanup | |
| simulation_env.reset_simulation() | |
| return { | |
| "success": True, | |
| "vehicle_type": vehicle_type, | |
| "specifications": specs, | |
| "simulation_feedback": final_feedback, | |
| "simulation_duration": current_sim_time | |
| } | |
| except Exception as e: | |
| # Cleanup on error | |
| try: | |
| simulation_env.reset_simulation() | |
| except: | |
| pass | |
| return {"error": f"Simulation failed: {str(e)}"} | |
| def evaluate_performance(simulation_feedback: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Evaluate vehicle performance based on simulation feedback | |
| Args: | |
| simulation_feedback: Feedback from simulation | |
| Returns: | |
| Performance evaluation and success criteria analysis | |
| """ | |
| try: | |
| # Use existing evaluation logic | |
| eval_results = evaluation.evaluate_simulation_outcome( | |
| simulation_feedback, OBSTACLE_FAR_EDGE_X | |
| ) | |
| feedback_str = evaluation.format_feedback_for_llm(eval_results) | |
| return { | |
| "success": True, | |
| "evaluation_results": eval_results, | |
| "feedback_summary": feedback_str, | |
| "success_criteria": { | |
| "crossed_obstacle": eval_results["robot_crossed_obstacle"], | |
| "remained_stable": eval_results["robot_remains_upright"], | |
| "minimal_collision": eval_results["no_significant_collision_with_obstacle_during_pass"], | |
| "overall_success": eval_results["overall_success"] | |
| } | |
| } | |
| except Exception as e: | |
| return {"error": f"Evaluation failed: {str(e)}"} | |
| def iterative_design_process( | |
| vehicle_type: Literal["robot", "drone"], | |
| task_description: str = "Cross the 5cm obstacle", | |
| max_iterations: int = 5 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run complete iterative LLM-driven vehicle design process | |
| Args: | |
| vehicle_type: Either "robot" or "drone" | |
| task_description: Description of the task | |
| max_iterations: Maximum number of design iterations | |
| Returns: | |
| Complete design process results with all iterations | |
| """ | |
| try: | |
| all_attempts = [] | |
| iteration = 1 | |
| while iteration <= max_iterations: | |
| # Generate design prompt | |
| if iteration == 1: | |
| if vehicle_type == "robot": | |
| prompt = llm_interface.generate_initial_robot_design_prompt() | |
| else: # drone | |
| prompt = llm_interface.generate_initial_drone_design_prompt() | |
| else: | |
| last_attempt = all_attempts[-1] | |
| if vehicle_type == "robot": | |
| prompt = llm_interface.generate_iterative_robot_design_prompt( | |
| last_attempt, iteration | |
| ) | |
| else: # drone | |
| prompt = llm_interface.generate_iterative_drone_design_prompt( | |
| last_attempt, iteration | |
| ) | |
| # Get LLM design | |
| llm_response = llm_interface.call_llm_api(prompt) | |
| vehicle_specs = llm_response.get('robot_specs', {}) # robot_specs used for both | |
| design_reasoning = llm_response.get('design_reasoning', '') | |
| # Add vehicle type | |
| vehicle_specs["vehicle_type"] = vehicle_type | |
| # Run simulation | |
| sim_result = simulate_vehicle(vehicle_type, vehicle_specs, SIMULATION_DURATION_SEC) | |
| if sim_result.get("error"): | |
| all_attempts.append({ | |
| "iteration": iteration, | |
| "error": sim_result["error"], | |
| "specifications": vehicle_specs | |
| }) | |
| iteration += 1 | |
| continue | |
| # Evaluate performance | |
| eval_result = evaluate_performance(sim_result["simulation_feedback"]) | |
| # Store attempt | |
| attempt_data = { | |
| "iteration": iteration, | |
| "vehicle_type": vehicle_type, | |
| "specifications": vehicle_specs, | |
| "design_reasoning": design_reasoning, | |
| "simulation_result": sim_result, | |
| "evaluation_result": eval_result, | |
| "success": eval_result.get("evaluation_results", {}).get("overall_success", False) | |
| } | |
| all_attempts.append(attempt_data) | |
| # Check for success | |
| if attempt_data["success"]: | |
| return { | |
| "success": True, | |
| "vehicle_type": vehicle_type, | |
| "iterations_needed": iteration, | |
| "successful_design": vehicle_specs, | |
| "all_attempts": all_attempts, | |
| "final_evaluation": eval_result | |
| } | |
| iteration += 1 | |
| # Max iterations reached | |
| best_attempt = max(all_attempts, | |
| key=lambda x: x.get("evaluation_result", {}).get("evaluation_results", {}).get("final_robot_x_position", 0), | |
| default={}) | |
| return { | |
| "success": False, | |
| "vehicle_type": vehicle_type, | |
| "max_iterations_reached": True, | |
| "best_attempt": best_attempt, | |
| "all_attempts": all_attempts | |
| } | |
| except Exception as e: | |
| return {"error": f"Iterative design process failed: {str(e)}"} | |
| def get_design_parameters(vehicle_type: Literal["robot", "drone"]) -> Dict[str, Any]: | |
| """ | |
| Get available design parameters for robots or drones | |
| Args: | |
| vehicle_type: Either "robot" or "drone" | |
| Returns: | |
| Available parameters and their valid ranges/options | |
| """ | |
| if vehicle_type == "robot": | |
| return { | |
| "vehicle_type": "robot", | |
| "parameters": { | |
| "wheel_type": { | |
| "type": "enum", | |
| "options": ["small_high_grip", "large_smooth", "tracked_base"], | |
| "description": "Type of wheels for traction and obstacle climbing" | |
| }, | |
| "body_clearance_cm": { | |
| "type": "integer", | |
| "range": [1, 10], | |
| "description": "Ground clearance in centimeters (obstacle is 5cm high)" | |
| }, | |
| "approach_sensor_enabled": { | |
| "type": "boolean", | |
| "description": "Enable obstacle detection sensors (conceptual for MVP)" | |
| }, | |
| "main_material": { | |
| "type": "enum", | |
| "options": ["light_plastic", "sturdy_metal_alloy"], | |
| "description": "Material affects weight and durability" | |
| } | |
| } | |
| } | |
| elif vehicle_type == "drone": | |
| return { | |
| "vehicle_type": "drone", | |
| "parameters": { | |
| "propeller_size": { | |
| "type": "enum", | |
| "options": ["small_agile", "medium", "large_stable"], | |
| "description": "Propeller size affects thrust and maneuverability" | |
| }, | |
| "flight_height_cm": { | |
| "type": "integer", | |
| "range": [10, 50], | |
| "description": "Target flight height in centimeters (obstacle is 5cm high)" | |
| }, | |
| "stability_mode": { | |
| "type": "enum", | |
| "options": ["auto_hover", "manual_control"], | |
| "description": "Flight stability control mode" | |
| }, | |
| "main_material": { | |
| "type": "enum", | |
| "options": ["light_carbon_fiber", "sturdy_aluminum"], | |
| "description": "Material affects weight and flight characteristics" | |
| } | |
| } | |
| } | |
| else: | |
| return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"} | |
| # Run the MCP server | |
| if __name__ == "__main__": | |
| mcp.run() |