Agent2Robot / mcp_server.py
sam133
Finalize the rest of the files
2fbee2e
raw
history blame
14.7 kB
"""
MCP Server for LLM-Agent-Designed Obstacle-Passing Vehicle System
This server exposes vehicle design, simulation, and evaluation tools via the Model Context Protocol.
Supports both robots and drones for crossing a 5cm obstacle.
"""
from mcp.server.fastmcp import FastMCP
import json
import time
import os
import imageio
from typing import Dict, Any, List, Optional, Literal
# Import our existing modules
import simulation_env_enhanced as simulation_env
import llm_interface
import evaluation
# Create the MCP server
mcp = FastMCP("Robot-Drone-Design-Server")
# Global configuration
MAX_ITERATIONS = 5
SIMULATION_DURATION_SEC = 10
OBSTACLE_FAR_EDGE_X = 0.8
@mcp.tool()
def get_obstacle_info() -> Dict[str, Any]:
    """Return details of the obstacle the vehicles must cross (delegates to the simulation env)."""
    obstacle_details = simulation_env.get_obstacle_info()
    return obstacle_details
@mcp.tool()
def design_vehicle(
    vehicle_type: Literal["robot", "drone"],
    specifications: Dict[str, Any],
    design_reasoning: str = ""
) -> Dict[str, Any]:
    """
    Design a robot or drone with specified parameters.

    Args:
        vehicle_type: Either "robot" or "drone"
        specifications: Vehicle parameters (depends on type)
        design_reasoning: Explanation of design choices

    Returns:
        On success, a dict with the validated specs and a status message;
        on any validation failure, {"error": <message>}.
    """
    # Table-driven validation: per vehicle type, the enum-valued parameters
    # with their allowed options, plus one bounded numeric parameter.
    # This replaces two near-identical branch-per-type validation blocks.
    validation_rules = {
        "robot": {
            "enums": {
                "wheel_type": ["small_high_grip", "large_smooth", "tracked_base"],
                "main_material": ["light_plastic", "sturdy_metal_alloy"],
            },
            "bounded": ("body_clearance_cm", 1, 10),
        },
        "drone": {
            "enums": {
                "propeller_size": ["small_agile", "medium", "large_stable"],
                "main_material": ["light_carbon_fiber", "sturdy_aluminum"],
            },
            "bounded": ("flight_height_cm", 10, 50),
        },
    }
    try:
        rules = validation_rules.get(vehicle_type)
        if rules is None:
            return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"}
        # Work on a copy so the caller's dict is never mutated.
        specs = specifications.copy()
        specs["vehicle_type"] = vehicle_type
        bounded_param, low, high = rules["bounded"]
        # Every enum parameter plus the bounded numeric one is required.
        for param in list(rules["enums"]) + [bounded_param]:
            if param not in specs:
                return {"error": f"Missing required parameter: {param}"}
        # Enum membership checks (message format matches the original per-type checks).
        for param, options in rules["enums"].items():
            if specs[param] not in options:
                return {"error": f"Invalid {param}. Must be one of: {options}"}
        # Range check; a non-comparable value (e.g. a string) raises and is
        # reported via the except branch below.
        if not (low <= specs[bounded_param] <= high):
            return {"error": f"{bounded_param} must be between {low} and {high}"}
        return {
            "success": True,
            "vehicle_type": vehicle_type,
            "specifications": specs,
            "design_reasoning": design_reasoning,
            "status": "Design validated and ready for simulation"
        }
    except Exception as e:
        return {"error": f"Design validation failed: {str(e)}"}
@mcp.tool()
def simulate_vehicle(
    vehicle_type: Literal["robot", "drone"],
    specifications: Dict[str, Any],
    simulation_duration: float = 10.0
) -> Dict[str, Any]:
    """
    Run physics simulation of a vehicle attempting to cross the obstacle.

    Args:
        vehicle_type: Either "robot" or "drone"
        specifications: Vehicle parameters
        simulation_duration: How long to run simulation (seconds)

    Returns:
        On success, a dict with the last sampled simulation feedback and the
        elapsed wall-clock time; on failure, {"error": <message>}.
    """
    try:
        # Work on a copy so the caller's dict is never mutated.
        specs = specifications.copy()
        specs["vehicle_type"] = vehicle_type
        # Setup PyBullet environment (obstacle + ground plane).
        obstacle_id, plane_id = simulation_env.setup_pybullet_environment()
        # Create the vehicle body; drones carry extra physical properties.
        if vehicle_type == "robot":
            vehicle_id, joint_indices, v_type = simulation_env.create_robot(specs)
            vehicle_props = None
        elif vehicle_type == "drone":
            vehicle_id, joint_indices, v_type, vehicle_props = simulation_env.create_drone(specs)
        else:
            # BUGFIX: tear down the environment we just created before bailing
            # out, so an invalid type doesn't leak a live simulation.
            simulation_env.reset_simulation()
            return {"error": f"Invalid vehicle_type: {vehicle_type}"}
        start_time = time.time()
        final_feedback = {}
        # BUGFIX: initialize so the value is defined even when the step loop
        # never executes (e.g. simulation_duration <= 0 gives zero steps).
        current_sim_time = 0.0
        simulation_steps = int(simulation_duration * 240)  # physics stepped at 240 Hz
        for step in range(simulation_steps):
            simulation_env.run_simulation_step(
                vehicle_id, joint_indices, {}, vehicle_type,
                vehicle_props if vehicle_type == "drone" else None
            )
            current_sim_time = time.time() - start_time
            # Sample feedback every 60 steps (4 times per second at 240 Hz).
            if step % 60 == 0:
                feedback = simulation_env.get_simulation_feedback(
                    vehicle_id, obstacle_id, start_time, current_sim_time, vehicle_type
                )
                final_feedback = feedback
                # Early exit once the vehicle clears the far edge or tips over.
                vehicle_x_pos = feedback['robot_position'][0]  # 'robot_*' keys are used for both types
                is_stable = feedback['is_robot_upright']
                if vehicle_x_pos > OBSTACLE_FAR_EDGE_X + 0.1 or not is_stable:
                    break
            # Wall-clock budget guard (NOTE(review): this compares wall-clock
            # time against the requested duration, while the step count above
            # assumes simulated time — presumably intentional; confirm).
            if current_sim_time > simulation_duration:
                break
        # Cleanup the environment for the next run.
        simulation_env.reset_simulation()
        return {
            "success": True,
            "vehicle_type": vehicle_type,
            "specifications": specs,
            "simulation_feedback": final_feedback,
            "simulation_duration": current_sim_time
        }
    except Exception as e:
        # BUGFIX: narrowed the bare `except:` — best-effort cleanup must not
        # swallow KeyboardInterrupt/SystemExit or mask the original error.
        try:
            simulation_env.reset_simulation()
        except Exception:
            pass
        return {"error": f"Simulation failed: {str(e)}"}
@mcp.tool()
def evaluate_performance(simulation_feedback: Dict[str, Any]) -> Dict[str, Any]:
    """
    Evaluate vehicle performance based on simulation feedback.

    Args:
        simulation_feedback: Feedback dict produced by the simulation

    Returns:
        On success, the raw evaluation results, an LLM-ready summary string
        and a condensed success-criteria dict; on failure, {"error": <message>}.
    """
    try:
        # Delegate scoring to the shared evaluation module.
        results = evaluation.evaluate_simulation_outcome(
            simulation_feedback, OBSTACLE_FAR_EDGE_X
        )
        summary = evaluation.format_feedback_for_llm(results)
        # Condense the individual pass/fail flags for quick inspection.
        criteria = {
            "crossed_obstacle": results["robot_crossed_obstacle"],
            "remained_stable": results["robot_remains_upright"],
            "minimal_collision": results["no_significant_collision_with_obstacle_during_pass"],
            "overall_success": results["overall_success"],
        }
        return {
            "success": True,
            "evaluation_results": results,
            "feedback_summary": summary,
            "success_criteria": criteria,
        }
    except Exception as e:
        return {"error": f"Evaluation failed: {str(e)}"}
@mcp.tool()
def iterative_design_process(
    vehicle_type: Literal["robot", "drone"],
    task_description: str = "Cross the 5cm obstacle",
    max_iterations: int = 5
) -> Dict[str, Any]:
    """
    Run complete iterative LLM-driven vehicle design process.

    Each iteration asks the LLM for a design, simulates it, evaluates the
    result, and feeds the outcome back into the next prompt until success
    or max_iterations is reached.

    Args:
        vehicle_type: Either "robot" or "drone"
        task_description: Description of the task
        max_iterations: Maximum number of design iterations

    Returns:
        On success, the winning design plus every attempt; after exhausting
        iterations, the best attempt (by furthest x position) plus every
        attempt; on unexpected failure, {"error": <message>}.
    """
    def _build_prompt(iteration: int, previous_attempt: Optional[Dict[str, Any]]) -> str:
        # First iteration uses the "initial" prompt; later iterations feed
        # the previous attempt back in. Factored out of the loop to remove
        # the duplicated branch-per-type selection logic.
        if previous_attempt is None:
            if vehicle_type == "robot":
                return llm_interface.generate_initial_robot_design_prompt()
            return llm_interface.generate_initial_drone_design_prompt()
        if vehicle_type == "robot":
            return llm_interface.generate_iterative_robot_design_prompt(previous_attempt, iteration)
        return llm_interface.generate_iterative_drone_design_prompt(previous_attempt, iteration)

    try:
        all_attempts: List[Dict[str, Any]] = []
        for iteration in range(1, max_iterations + 1):
            previous = all_attempts[-1] if all_attempts and iteration > 1 else None
            prompt = _build_prompt(iteration, previous)
            # Ask the LLM for a design.
            llm_response = llm_interface.call_llm_api(prompt)
            vehicle_specs = llm_response.get('robot_specs', {})  # 'robot_specs' key is used for both types
            design_reasoning = llm_response.get('design_reasoning', '')
            vehicle_specs["vehicle_type"] = vehicle_type
            # Simulate the proposed design.
            sim_result = simulate_vehicle(vehicle_type, vehicle_specs, SIMULATION_DURATION_SEC)
            if sim_result.get("error"):
                # Record the failed attempt and move on to the next iteration.
                all_attempts.append({
                    "iteration": iteration,
                    "error": sim_result["error"],
                    "specifications": vehicle_specs
                })
                continue
            # Score the simulation outcome.
            eval_result = evaluate_performance(sim_result["simulation_feedback"])
            attempt_data = {
                "iteration": iteration,
                "vehicle_type": vehicle_type,
                "specifications": vehicle_specs,
                "design_reasoning": design_reasoning,
                "simulation_result": sim_result,
                "evaluation_result": eval_result,
                "success": eval_result.get("evaluation_results", {}).get("overall_success", False)
            }
            all_attempts.append(attempt_data)
            if attempt_data["success"]:
                return {
                    "success": True,
                    "vehicle_type": vehicle_type,
                    "iterations_needed": iteration,
                    "successful_design": vehicle_specs,
                    "all_attempts": all_attempts,
                    "final_evaluation": eval_result
                }
        # Max iterations reached: report the attempt that travelled furthest.
        best_attempt = max(
            all_attempts,
            key=lambda x: x.get("evaluation_result", {}).get("evaluation_results", {}).get("final_robot_x_position", 0),
            default={}
        )
        return {
            "success": False,
            "vehicle_type": vehicle_type,
            "max_iterations_reached": True,
            "best_attempt": best_attempt,
            "all_attempts": all_attempts
        }
    except Exception as e:
        return {"error": f"Iterative design process failed: {str(e)}"}
@mcp.tool()
def get_design_parameters(vehicle_type: Literal["robot", "drone"]) -> Dict[str, Any]:
"""
Get available design parameters for robots or drones
Args:
vehicle_type: Either "robot" or "drone"
Returns:
Available parameters and their valid ranges/options
"""
if vehicle_type == "robot":
return {
"vehicle_type": "robot",
"parameters": {
"wheel_type": {
"type": "enum",
"options": ["small_high_grip", "large_smooth", "tracked_base"],
"description": "Type of wheels for traction and obstacle climbing"
},
"body_clearance_cm": {
"type": "integer",
"range": [1, 10],
"description": "Ground clearance in centimeters (obstacle is 5cm high)"
},
"approach_sensor_enabled": {
"type": "boolean",
"description": "Enable obstacle detection sensors (conceptual for MVP)"
},
"main_material": {
"type": "enum",
"options": ["light_plastic", "sturdy_metal_alloy"],
"description": "Material affects weight and durability"
}
}
}
elif vehicle_type == "drone":
return {
"vehicle_type": "drone",
"parameters": {
"propeller_size": {
"type": "enum",
"options": ["small_agile", "medium", "large_stable"],
"description": "Propeller size affects thrust and maneuverability"
},
"flight_height_cm": {
"type": "integer",
"range": [10, 50],
"description": "Target flight height in centimeters (obstacle is 5cm high)"
},
"stability_mode": {
"type": "enum",
"options": ["auto_hover", "manual_control"],
"description": "Flight stability control mode"
},
"main_material": {
"type": "enum",
"options": ["light_carbon_fiber", "sturdy_aluminum"],
"description": "Material affects weight and flight characteristics"
}
}
}
else:
return {"error": f"Invalid vehicle_type: {vehicle_type}. Must be 'robot' or 'drone'"}
# Run the MCP server when executed as a script; the tools defined above are
# already registered on `mcp` via the @mcp.tool() decorators.
if __name__ == "__main__":
    mcp.run()