Agent2Robot / main_orchestrator.py
sam133
οΏ½ Implement generator-based real-time UI updates: Add main_orchestrator.py with iterative design process, live status updates, current/final specs display, simulation generation, and JSON download capability
dc221da
raw
history blame
11.4 kB
"""
Main Orchestrator for Agent2Robot
Provides generator-based design process with real-time UI updates
"""
import json
import time
import tempfile
import os
from typing import Dict, List, Any, Generator, Tuple
from design_tools import VehicleDesigner
import gradio as gr
class DesignOrchestrator:
"""Main orchestrator for the Agent2Robot design process"""
def __init__(self):
self.designer = VehicleDesigner()
self.MAX_ITERATIONS = 3
self.best_design_so_far = None
self.best_design_score = 0
def process_design_request(self, vehicle_type: str, design_requirements: str) -> Generator[Dict[str, Any], None, None]:
"""
Generator function that yields real-time updates during the design process
Args:
vehicle_type: Type of vehicle to design
design_requirements: User requirements and specifications
Yields:
Dictionary with updates for each Gradio component
"""
# Initialize process log
running_log = ""
current_iteration = 0
try:
# === INITIAL YIELD - START PROCESSING ===
running_log = """πŸš€ Agent2Robot Design Process Starting...
═══════════════════════════════════════════════════════
🎯 INITIALIZATION PHASE:
βœ… MCP Server Connection: Established
βœ… Vehicle Type: """ + vehicle_type + """
βœ… Requirements Analysis: Initiated
πŸ“‹ Processing Pipeline:
β€’ Step 1: Requirements interpretation ⏳
β€’ Step 2: Iterative design generation
β€’ Step 3: MCP validation & optimization
β€’ Step 4: Performance simulation
β€’ Step 5: Final report generation
πŸ”„ Starting iterative design process..."""
yield {
"process_log": running_log,
"current_specs": None,
"final_specs": None,
"simulation_video": None,
"status": "⏳ Processing your design request...",
}
time.sleep(1) # Brief pause for UX
# === DESIGN ITERATION LOOP ===
for iteration in range(1, self.MAX_ITERATIONS + 1):
current_iteration = iteration
# Update log for current iteration
running_log += f"""
πŸ”„ ITERATION {iteration}/{self.MAX_ITERATIONS}:
═══════════════════════════════════════════════════════
πŸ“Š Calling MCP Server for design generation..."""
yield {
"process_log": running_log,
"status": f"πŸ”„ Iteration {iteration}/{self.MAX_ITERATIONS} - Generating design...",
}
time.sleep(0.5) # Simulate processing time
# === LLM/MCP DESIGN GENERATION ===
try:
# Generate design using MCP
design_data = self.designer.mcp_client.generate_design(vehicle_type, design_requirements)
# Enhance the design
enhanced_design = self.designer._enhance_design(design_data)
enhanced_design["iteration"] = iteration
# Update log with LLM response
running_log += f"""
βœ… MCP Design Generated Successfully!
🎯 Design Specifications:
β€’ Vehicle Type: {enhanced_design['vehicle_type']}
β€’ Optimization Score: {enhanced_design['optimization_score']}%
β€’ Generated Features: {len(enhanced_design['generated_features'])} key features
β€’ Status: {enhanced_design['status']}
πŸ”§ Key Features Generated:
{chr(10).join(f' β€’ {feature}' for feature in enhanced_design['generated_features'][:3])}..."""
yield {
"process_log": running_log,
"current_specs": json.dumps(enhanced_design, indent=2),
"status": f"πŸ”„ Iteration {iteration}/{self.MAX_ITERATIONS} - Running simulation...",
}
time.sleep(0.5)
except Exception as e:
running_log += f"""
❌ Error in design generation: {str(e)}
πŸ”„ Attempting fallback design process..."""
yield {
"process_log": running_log,
"status": f"⚠️ Iteration {iteration}/{self.MAX_ITERATIONS} - Handling error...",
}
continue
# === SIMULATION & EVALUATION ===
try:
running_log += f"""
🎬 Running MCP Physics Simulation...
β€’ Initializing simulation environment
β€’ Loading vehicle parameters
β€’ Running behavior analysis"""
yield {
"process_log": running_log,
"status": f"🎬 Iteration {iteration}/{self.MAX_ITERATIONS} - Simulating physics...",
}
time.sleep(0.8) # Simulate longer simulation time
# Validate design
validation_result = self.designer.mcp_client.validate_design(enhanced_design)
enhanced_design["validation"] = validation_result
# Simulate evaluation scoring
simulation_success = validation_result.get('valid', True)
performance_score = enhanced_design['optimization_score']
running_log += f"""
βœ… Simulation Complete!
πŸ“Š Evaluation Results:
β€’ Simulation Success: {simulation_success}
β€’ Performance Score: {performance_score}%
β€’ Validation: {validation_result.get('confidence', 0.95)*100:.1f}% confidence
β€’ Status: {"PASSED" if simulation_success else "NEEDS IMPROVEMENT"}"""
# Track best design
if performance_score > self.best_design_score:
self.best_design_so_far = enhanced_design.copy()
self.best_design_score = performance_score
running_log += f"""
πŸ† NEW BEST DESIGN FOUND!
β€’ Score: {performance_score}% (Previous best: {self.best_design_score}%)
β€’ Design saved as current champion"""
yield {
"process_log": running_log,
"current_specs": json.dumps(enhanced_design, indent=2),
"status": f"βœ… Iteration {iteration}/{self.MAX_ITERATIONS} - Evaluation complete",
}
# Check if we have a good enough design
if performance_score >= 95:
running_log += f"""
🎯 EXCELLENT DESIGN ACHIEVED!
Performance threshold met. Proceeding to finalization..."""
break
except Exception as e:
running_log += f"""
❌ Simulation error: {str(e)}
πŸ”„ Continuing with next iteration..."""
yield {
"process_log": running_log,
"status": f"⚠️ Iteration {iteration}/{self.MAX_ITERATIONS} - Simulation error",
}
time.sleep(0.3) # Brief pause between iterations
# === FINAL PROCESSING ===
if self.best_design_so_far is None:
raise Exception("No successful designs generated")
running_log += f"""
🏁 DESIGN PROCESS COMPLETE!
═══════════════════════════════════════════════════════
βœ… Total Iterations: {current_iteration}
πŸ† Best Design Score: {self.best_design_score}%
🎯 Generating final outputs..."""
yield {
"process_log": running_log,
"status": "🎬 Generating final simulation and reports...",
}
time.sleep(1)
# === GENERATE FINAL OUTPUTS ===
# 1. Generate final simulation
simulation_info = self.designer.mcp_client.generate_simulation_video(self.best_design_so_far)
# 2. Generate final report
final_report = self.designer.format_design_report(self.best_design_so_far)
# 3. Create downloadable JSON file
json_filepath = self._create_download_json(self.best_design_so_far)
# 4. Final success message
final_status = "βœ… Design Process Completed Successfully!"
# === FINAL YIELD WITH ALL RESULTS ===
yield {
"process_log": final_report, # Replace log with final report
"current_specs": None, # Clear iterative display
"final_specs": json.dumps(self.best_design_so_far, indent=2),
"simulation_video": simulation_info,
"status": final_status,
"download_file": json_filepath,
}
except Exception as e:
# === ERROR HANDLING ===
error_log = f"""
❌ DESIGN PROCESS FAILED
═══════════════════════════════════════════════════════
Error: {str(e)}
πŸ”„ Attempted Iterations: {current_iteration}
πŸ“Š Process Status: Failed during {'initialization' if current_iteration == 0 else f'iteration {current_iteration}'}
πŸ› οΈ Troubleshooting:
β€’ Check MCP server connection
β€’ Verify input parameters
β€’ Review error logs above
πŸ”„ Please try again with different parameters or contact support."""
yield {
"process_log": error_log,
"current_specs": None,
"final_specs": None,
"simulation_video": None,
"status": "❌ Design process failed. Please try again.",
}
def _create_download_json(self, design_data: Dict[str, Any]) -> str:
"""Create a temporary JSON file for download"""
try:
# Create a temporary file
temp_dir = tempfile.gettempdir()
filename = f"agent2robot_design_{design_data.get('design_id', 'unknown')}.json"
filepath = os.path.join(temp_dir, filename)
# Write the design data to file
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(design_data, f, indent=2, ensure_ascii=False)
return filepath
except Exception as e:
print(f"Error creating download file: {e}")
return None
# Create global instance
orchestrator = DesignOrchestrator()
# Export the main function for use in app.py
def process_design_request(vehicle_type: str, design_requirements: str):
"""Main entry point for the design process"""
return orchestrator.process_design_request(vehicle_type, design_requirements)