""" MCP Server for LLM-Agent-Designed Obstacle-Passing Vehicle System This server exposes vehicle design, simulation, and evaluation tools via the Model Context Protocol. Supports both robots and drones for crossing a 5cm obstacle. """ from mcp.server.fastmcp import FastMCP import json import time import os import imageio from typing import Dict, Any, List, Optional, Literal # Import our existing modules import simulation_env_enhanced as simulation_env import llm_interface import evaluation # Create the MCP server mcp = FastMCP("Robot-Drone-Design-Server") # Global configuration MAX_ITERATIONS = 5 SIMULATION_DURATION_SEC = 10 OBSTACLE_FAR_EDGE_X = 0.8 @mcp.tool() def get_obstacle_info() -> Dict[str, Any]: """Get information about the obstacle that vehicles need to cross""" return simulation_env.get_obstacle_info() @mcp.tool() def design_vehicle( vehicle_type: Literal["robot", "drone"], specifications: Dict[str, Any], design_reasoning: str = "" ) -> Dict[str, Any]: """ Design a robot or drone with specified parameters Args: vehicle_type: Either "robot" or "drone" specifications: Vehicle parameters (depends on type) design_reasoning: Explanation of design choices Returns: Design validation and summary """ try: # Add vehicle_type to specifications specs = specifications.copy() specs["vehicle_type"] = vehicle_type # Validate specifications based on vehicle type if vehicle_type == "robot": required_params = ["wheel_type", "body_clearance_cm", "main_material"] valid_wheel_types = ["small_high_grip", "large_smooth", "tracked_base"] valid_materials = ["light_plastic", "sturdy_metal_alloy"] # Validate parameters for param in required_params: if param not in specs: return {"error": f"Missing required parameter: {param}"} if specs["wheel_type"] not in valid_wheel_types: return {"error": f"Invalid wheel_type. Must be one of: {valid_wheel_types}"} if specs["main_material"] not in valid_materials: return {"error": f"Invalid main_material. Must be one of: {valid_materials}"} if not (1 <= specs["body_clearance_cm"] <= 10): return {"error": "body_clearance_cm must be between 1 and 10"} elif vehicle_type == "drone": required_params = ["propeller_size", "flight_height_cm", "main_material"] valid_propeller_sizes = ["small_agile", "medium", "large_stable"] valid_materials = ["light_carbon_fiber", "sturdy_aluminum"] # Validate parameters for param in required_params: if param not in specs: return {"error": f"Missing required parameter: {param}"} if specs["propeller_size"] not in valid_propeller_sizes: return {"error": f"Invalid propeller_size. Must be one of: {valid_propeller_sizes}"} if specs["main_material"] not in valid_materials: return {"error": f"Invalid main_material. Must be one of: {valid_materials}"} if not (10 <= specs["flight_height_cm"] <= 50): return {"error": "flight_height_cm must be between 10 and 50"} else: return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"} return { "success": True, "vehicle_type": vehicle_type, "specifications": specs, "design_reasoning": design_reasoning, "status": "Design validated and ready for simulation" } except Exception as e: return {"error": f"Design validation failed: {str(e)}"} @mcp.tool() def simulate_vehicle( vehicle_type: Literal["robot", "drone"], specifications: Dict[str, Any], simulation_duration: float = 10.0 ) -> Dict[str, Any]: """ Run physics simulation of a vehicle attempting to cross the obstacle Args: vehicle_type: Either "robot" or "drone" specifications: Vehicle parameters simulation_duration: How long to run simulation (seconds) Returns: Simulation results and performance data """ try: # Add vehicle_type to specifications specs = specifications.copy() specs["vehicle_type"] = vehicle_type # Setup PyBullet environment obstacle_id, plane_id = simulation_env.setup_pybullet_environment() # Create vehicle if vehicle_type == "robot": vehicle_id, joint_indices, v_type = simulation_env.create_robot(specs) vehicle_props = None elif vehicle_type == "drone": vehicle_id, joint_indices, v_type, vehicle_props = simulation_env.create_drone(specs) else: return {"error": f"Invalid vehicle_type: {vehicle_type}"} # Run simulation start_time = time.time() final_feedback = {} simulation_steps = int(simulation_duration * 240) # 240 Hz for step in range(simulation_steps): # Run simulation step simulation_env.run_simulation_step( vehicle_id, joint_indices, {}, vehicle_type, vehicle_props if vehicle_type == "drone" else None ) current_sim_time = time.time() - start_time # Get feedback every 60 steps (4 times per second) if step % 60 == 0: feedback = simulation_env.get_simulation_feedback( vehicle_id, obstacle_id, start_time, current_sim_time, vehicle_type ) final_feedback = feedback # Check for early exit vehicle_x_pos = feedback['robot_position'][0] # Using robot_position for compatibility is_stable = feedback['is_robot_upright'] # Using is_robot_upright for compatibility if vehicle_x_pos > OBSTACLE_FAR_EDGE_X + 0.1 or not is_stable: break if current_sim_time > simulation_duration: break # Cleanup simulation_env.reset_simulation() return { "success": True, "vehicle_type": vehicle_type, "specifications": specs, "simulation_feedback": final_feedback, "simulation_duration": current_sim_time } except Exception as e: # Cleanup on error try: simulation_env.reset_simulation() except: pass return {"error": f"Simulation failed: {str(e)}"} @mcp.tool() def evaluate_performance(simulation_feedback: Dict[str, Any]) -> Dict[str, Any]: """ Evaluate vehicle performance based on simulation feedback Args: simulation_feedback: Feedback from simulation Returns: Performance evaluation and success criteria analysis """ try: # Use existing evaluation logic eval_results = evaluation.evaluate_simulation_outcome( simulation_feedback, OBSTACLE_FAR_EDGE_X ) feedback_str = evaluation.format_feedback_for_llm(eval_results) return { "success": True, "evaluation_results": eval_results, "feedback_summary": feedback_str, "success_criteria": { "crossed_obstacle": eval_results["robot_crossed_obstacle"], "remained_stable": eval_results["robot_remains_upright"], "minimal_collision": eval_results["no_significant_collision_with_obstacle_during_pass"], "overall_success": eval_results["overall_success"] } } except Exception as e: return {"error": f"Evaluation failed: {str(e)}"} @mcp.tool() def iterative_design_process( vehicle_type: Literal["robot", "drone"], task_description: str = "Cross the 5cm obstacle", max_iterations: int = 5 ) -> Dict[str, Any]: """ Run complete iterative LLM-driven vehicle design process Args: vehicle_type: Either "robot" or "drone" task_description: Description of the task max_iterations: Maximum number of design iterations Returns: Complete design process results with all iterations """ try: all_attempts = [] iteration = 1 while iteration <= max_iterations: # Generate design prompt if iteration == 1: if vehicle_type == "robot": prompt = llm_interface.generate_initial_robot_design_prompt() else: # drone prompt = llm_interface.generate_initial_drone_design_prompt() else: last_attempt = all_attempts[-1] if vehicle_type == "robot": prompt = llm_interface.generate_iterative_robot_design_prompt( last_attempt, iteration ) else: # drone prompt = llm_interface.generate_iterative_drone_design_prompt( last_attempt, iteration ) # Get LLM design llm_response = llm_interface.call_llm_api(prompt) vehicle_specs = llm_response.get('robot_specs', {}) # robot_specs used for both design_reasoning = llm_response.get('design_reasoning', '') # Add vehicle type vehicle_specs["vehicle_type"] = vehicle_type # Run simulation sim_result = simulate_vehicle(vehicle_type, vehicle_specs, SIMULATION_DURATION_SEC) if sim_result.get("error"): all_attempts.append({ "iteration": iteration, "error": sim_result["error"], "specifications": vehicle_specs }) iteration += 1 continue # Evaluate performance eval_result = evaluate_performance(sim_result["simulation_feedback"]) # Store attempt attempt_data = { "iteration": iteration, "vehicle_type": vehicle_type, "specifications": vehicle_specs, "design_reasoning": design_reasoning, "simulation_result": sim_result, "evaluation_result": eval_result, "success": eval_result.get("evaluation_results", {}).get("overall_success", False) } all_attempts.append(attempt_data) # Check for success if attempt_data["success"]: return { "success": True, "vehicle_type": vehicle_type, "iterations_needed": iteration, "successful_design": vehicle_specs, "all_attempts": all_attempts, "final_evaluation": eval_result } iteration += 1 # Max iterations reached best_attempt = max(all_attempts, key=lambda x: x.get("evaluation_result", {}).get("evaluation_results", {}).get("final_robot_x_position", 0), default={}) return { "success": False, "vehicle_type": vehicle_type, "max_iterations_reached": True, "best_attempt": best_attempt, "all_attempts": all_attempts } except Exception as e: return {"error": f"Iterative design process failed: {str(e)}"} @mcp.tool() def get_design_parameters(vehicle_type: Literal["robot", "drone"]) -> Dict[str, Any]: """ Get available design parameters for robots or drones Args: vehicle_type: Either "robot" or "drone" Returns: Available parameters and their valid ranges/options """ if vehicle_type == "robot": return { "vehicle_type": "robot", "parameters": { "wheel_type": { "type": "enum", "options": ["small_high_grip", "large_smooth", "tracked_base"], "description": "Type of wheels for traction and obstacle climbing" }, "body_clearance_cm": { "type": "integer", "range": [1, 10], "description": "Ground clearance in centimeters (obstacle is 5cm high)" }, "approach_sensor_enabled": { "type": "boolean", "description": "Enable obstacle detection sensors (conceptual for MVP)" }, "main_material": { "type": "enum", "options": ["light_plastic", "sturdy_metal_alloy"], "description": "Material affects weight and durability" } } } elif vehicle_type == "drone": return { "vehicle_type": "drone", "parameters": { "propeller_size": { "type": "enum", "options": ["small_agile", "medium", "large_stable"], "description": "Propeller size affects thrust and maneuverability" }, "flight_height_cm": { "type": "integer", "range": [10, 50], "description": "Target flight height in centimeters (obstacle is 5cm high)" }, "stability_mode": { "type": "enum", "options": ["auto_hover", "manual_control"], "description": "Flight stability control mode" }, "main_material": { "type": "enum", "options": ["light_carbon_fiber", "sturdy_aluminum"], "description": "Material affects weight and flight characteristics" } } } else: return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"} # Run the MCP server if __name__ == "__main__": mcp.run()