import os import ssl import time import imageio import numpy as np from PIL import Image import json from datetime import datetime import simulation_env_enhanced as simulation_env import llm_interface_enhanced as llm_interface import evaluation # SSL workaround for Gradio issues try: import certifi os.environ['SSL_CERT_FILE'] = certifi.where() except ImportError: pass # Try to disable SSL verification as a workaround try: ssl._create_default_https_context = ssl._create_unverified_context except AttributeError: pass # Try to import Gradio with error handling GRADIO_AVAILABLE = False try: import gradio as gr GRADIO_AVAILABLE = True print("āœ“ Gradio imported successfully") except Exception as e: print(f"⚠ Gradio import failed: {e}") print("Will use console-based interface instead") GRADIO_AVAILABLE = False # Global configuration MAX_ITERATIONS = 5 SIMULATION_DURATION_SEC = 10 OBSTACLE_FAR_EDGE_X = 0.8 def run_design_and_simulation_iteration(llm_prompt_func, previous_attempt_data, current_iteration, vehicle_type="robot"): """Run one complete design and simulation iteration for robot or drone""" try: # Call LLM for vehicle design prompt = llm_prompt_func() llm_response_dict = llm_interface.call_llm_api(prompt) if not llm_response_dict: raise Exception("Failed to get valid LLM response") # Extract vehicle specs and validate vehicle_specs = llm_response_dict.get('robot_specs', {}) vehicle_specs["vehicle_type"] = vehicle_type # Add vehicle type design_reasoning = llm_response_dict.get('design_reasoning', 'No reasoning provided') # Setup PyBullet environment obstacle_id, plane_id = simulation_env.setup_pybullet_environment() # Create vehicle (robot or drone) if vehicle_type == "robot": vehicle_id, joint_indices, v_type = simulation_env.create_robot(vehicle_specs) vehicle_props = None elif vehicle_type == "drone": vehicle_id, joint_indices, v_type, vehicle_props = simulation_env.create_drone(vehicle_specs) else: raise ValueError(f"Unknown vehicle type: {vehicle_type}") # Run simulation loop current_sim_time = 0 frames_for_gif = [] final_pybullet_feedback = {} start_time = time.time() # Simulation loop simulation_steps = int(SIMULATION_DURATION_SEC * 240) for step in range(simulation_steps): # Run simulation step simulation_env.run_simulation_step( vehicle_id, joint_indices, {}, vehicle_type, vehicle_props if vehicle_type == "drone" else None ) current_sim_time = time.time() - start_time # Capture frame every 24 steps (10 fps for GIF) if step % 24 == 0: try: frame = simulation_env.capture_frame() frames_for_gif.append(frame) except: pass # Skip frame capture if it fails # Get current feedback pybullet_feedback_snapshot = simulation_env.get_simulation_feedback( vehicle_id, obstacle_id, start_time, current_sim_time, vehicle_type ) final_pybullet_feedback = pybullet_feedback_snapshot # Check for early exit conditions vehicle_x_pos = pybullet_feedback_snapshot['robot_position'][0] # Using robot_position for compatibility is_stable = pybullet_feedback_snapshot['is_robot_upright'] # Using is_robot_upright for compatibility # Exit if vehicle crossed well past obstacle or became unstable if vehicle_x_pos > OBSTACLE_FAR_EDGE_X + 0.1 or not is_stable: break # Exit if simulation time exceeded if current_sim_time > SIMULATION_DURATION_SEC: break # Evaluate results evaluation_results = evaluation.evaluate_simulation_outcome( final_pybullet_feedback, OBSTACLE_FAR_EDGE_X ) llm_feedback_string = evaluation.format_feedback_for_llm(evaluation_results) # Cleanup PyBullet simulation_env.reset_simulation() return llm_response_dict, evaluation_results, llm_feedback_string, frames_for_gif except Exception as e: print(f"Error in simulation iteration: {e}") # Cleanup on error try: simulation_env.reset_simulation() except: pass # Return error response error_response = { 'robot_design_iteration': current_iteration, 'design_reasoning': f'Error occurred: {str(e)}', 'robot_specs': {} } error_evaluation = { 'robot_crossed_obstacle': False, 'no_significant_collision_with_obstacle_during_pass': False, 'robot_remains_upright': False, 'overall_success': False, 'specific_failure_point': 'simulation_error', 'final_robot_x_position': 0.0 } error_feedback = f"Simulation failed with error: {str(e)}" return error_response, error_evaluation, error_feedback, [] def create_gif_from_frames(frames, filename): """Create GIF from simulation frames""" try: # Create outputs directory if it doesn't exist outputs_dir = "outputs" if not os.path.exists(outputs_dir): os.makedirs(outputs_dir) gif_path = os.path.join(outputs_dir, filename) # Convert PIL images to numpy arrays frame_arrays = [] for frame in frames: if isinstance(frame, Image.Image): frame_arrays.append(np.array(frame)) else: frame_arrays.append(frame) # Create GIF if frame_arrays: imageio.mimsave(gif_path, frame_arrays, fps=10, loop=0) return gif_path else: return None except Exception as e: print(f"Error creating GIF: {e}") return None def find_best_attempt(all_attempts_data): """Find the best attempt from all attempts""" if not all_attempts_data: return None # Sort by final position reached best_attempt = max(all_attempts_data, key=lambda x: x['evaluation_results']['final_robot_x_position']) return best_attempt def design_vehicle_for_obstacle(vehicle_type, user_task_description): """Main function for Gradio interface - design vehicle iteratively""" # Initialize iteration = 1 all_attempts_data = [] overall_process_log = [] vehicle_name = "robot" if vehicle_type == "robot" else "drone" overall_process_log.append(f"Starting {vehicle_name} design process for task: {user_task_description}") overall_process_log.append(f"Target: Cross 5cm high obstacle at x=0.75m") overall_process_log.append(f"Success criteria: Cross obstacle (x>0.8m), stay upright/stable, minimal collision") overall_process_log.append("") # Main iteration loop while iteration <= MAX_ITERATIONS: overall_process_log.append(f"--- Iteration {iteration} ---") # Yield current progress yield "\n".join(overall_process_log), None, None try: # Determine prompt function if iteration == 1: if vehicle_type == "robot": prompt_func = lambda: llm_interface.generate_initial_robot_design_prompt() else: # drone prompt_func = lambda: llm_interface.generate_initial_drone_design_prompt() prev_attempt = None else: last_attempt = all_attempts_data[-1] if vehicle_type == "robot": prompt_func = lambda: llm_interface.generate_iterative_robot_design_prompt( last_attempt, iteration ) else: # drone prompt_func = lambda: llm_interface.generate_iterative_drone_design_prompt( last_attempt, iteration ) prev_attempt = last_attempt # Run design and simulation iteration llm_design, eval_results, feedback_str, frames = run_design_and_simulation_iteration( prompt_func, prev_attempt, iteration, vehicle_type ) # Store attempt data current_attempt_data = { "llm_design": llm_design, "robot_specs": llm_design.get('robot_specs', {}), "design_reasoning": llm_design.get('design_reasoning', ''), "evaluation_results": eval_results, "feedback_from_simulation": feedback_str } all_attempts_data.append(current_attempt_data) # Update log overall_process_log.append(f"LLM Design ({iteration}): {llm_design.get('robot_specs')}") overall_process_log.append(f"Design Reasoning: {llm_design.get('design_reasoning', 'N/A')}") overall_process_log.append(f"Simulation Feedback: {feedback_str}") overall_process_log.append("") # Create GIF from frames gif_path = None if frames: gif_filename = f"{vehicle_name}_iteration_{iteration}.gif" gif_path = create_gif_from_frames(frames, gif_filename) # Check for success if eval_results.get('overall_success', False): overall_process_log.append(f"šŸŽ‰ SUCCESS! {vehicle_name.title()} passed the obstacle in {iteration} iterations.") overall_process_log.append(f"Final {vehicle_name} specs: {llm_design.get('robot_specs')}") # Return final results final_log = "\n".join(overall_process_log) final_specs = llm_design.get('robot_specs', {}) yield final_log, gif_path, final_specs return # Yield current progress with GIF yield "\n".join(overall_process_log), gif_path, llm_design.get('robot_specs', {}) except Exception as e: overall_process_log.append(f"Error in iteration {iteration}: {str(e)}") overall_process_log.append("") iteration += 1 # Max iterations reached without success overall_process_log.append(f"āŒ FAILURE: Max {MAX_ITERATIONS} iterations reached. Obstacle not passed.") # Find best attempt best_attempt = find_best_attempt(all_attempts_data) best_specs = best_attempt['robot_specs'] if best_attempt else "No valid designs generated" if best_attempt: overall_process_log.append(f"Best attempt reached x={best_attempt['evaluation_results']['final_robot_x_position']:.2f}m") overall_process_log.append(f"Best {vehicle_name} specs: {best_specs}") # Create final GIF from last attempt final_gif_path = None if all_attempts_data and 'frames' in locals(): final_gif_path = create_gif_from_frames(frames, f"final_{vehicle_name}_attempt.gif") final_log = "\n".join(overall_process_log) yield final_log, final_gif_path, best_specs def console_design_vehicle_for_obstacle(vehicle_type, user_task_description): """Console-based version of the vehicle design system""" vehicle_name = "robot" if vehicle_type == "robot" else "drone" print("=" * 60) print(f"LLM {vehicle_name.title()} Design System - Console Interface") print("=" * 60) print(f"Task: {user_task_description}") print(f"Target: Cross 5cm high obstacle at x=0.75m") print(f"Success criteria: Cross obstacle (x>0.8m), stay upright/stable, minimal collision") print("") # Initialize best design tracking iteration = 1 all_attempts_data = [] best_attempt_data = None best_iteration = 0 # Main iteration loop while iteration <= MAX_ITERATIONS: print(f"--- Iteration {iteration} ---") try: # Determine prompt function if iteration == 1: if vehicle_type == "robot": prompt_func = lambda: llm_interface.generate_initial_robot_design_prompt() else: # drone prompt_func = lambda: llm_interface.generate_initial_drone_design_prompt() prev_attempt = None else: last_attempt = all_attempts_data[-1] if vehicle_type == "robot": prompt_func = lambda: llm_interface.generate_iterative_robot_design_prompt( last_attempt, iteration ) else: # drone prompt_func = lambda: llm_interface.generate_iterative_drone_design_prompt( last_attempt, iteration ) prev_attempt = last_attempt # Run design and simulation iteration llm_design, eval_results, feedback_str, frames = run_design_and_simulation_iteration( prompt_func, prev_attempt, iteration, vehicle_type ) # Store attempt data current_attempt_data = { "llm_design": llm_design, "robot_specs": llm_design.get('robot_specs', {}), "design_reasoning": llm_design.get('design_reasoning', ''), "evaluation_results": eval_results, "feedback_from_simulation": feedback_str, "iteration": iteration } all_attempts_data.append(current_attempt_data) # Check if this is the best design so far if best_attempt_data is None or is_current_better( eval_results, iteration, best_attempt_data['evaluation_results'], best_iteration ): best_attempt_data = current_attempt_data best_iteration = iteration print(f"šŸ† New best design found in iteration {iteration}!") # Display results print(f"LLM Design ({iteration}): {llm_design.get('robot_specs')}") print(f"Design Reasoning: {llm_design.get('design_reasoning', 'N/A')}") print(f"Simulation Feedback: {feedback_str}") print("") # Create GIF from frames if frames: gif_filename = f"{vehicle_name}_iteration_{iteration}.gif" gif_path = create_gif_from_frames(frames, gif_filename) if gif_path: print(f"Simulation GIF saved: {gif_path}") # Check for success if eval_results.get('overall_success', False): print(f"šŸŽ‰ SUCCESS! {vehicle_name.title()} passed the obstacle in {iteration} iterations.") print(f"Final {vehicle_name} specs: {llm_design.get('robot_specs')}") # Save best design JSON json_file = save_best_design_json(best_attempt_data, vehicle_type, iteration) return True, llm_design.get('robot_specs') except Exception as e: print(f"Error in iteration {iteration}: {str(e)}") iteration += 1 # Max iterations reached without success print(f"āŒ FAILURE: Max {MAX_ITERATIONS} iterations reached. Obstacle not passed.") # Display best design summary if best_attempt_data: print(f"\nšŸ† BEST DESIGN SUMMARY (from iteration {best_iteration}):") print(f" • Final Position: {best_attempt_data['evaluation_results']['final_robot_x_position']:.3f}m") print(f" • Success: {best_attempt_data['evaluation_results']['overall_success']}") best_specs = best_attempt_data['robot_specs'] if vehicle_type == "robot": print(f" • Wheel Type: {best_specs.get('wheel_type', 'unknown')}") print(f" • Body Clearance: {best_specs.get('body_clearance_cm', 0)}cm") print(f" • Material: {best_specs.get('main_material', 'unknown')}") else: # drone print(f" • Propeller Size: {best_specs.get('propeller_size', 'unknown')}") print(f" • Flight Height: {best_specs.get('flight_height_cm', 0)}cm") print(f" • Material: {best_specs.get('main_material', 'unknown')}") print(f" • Design Reasoning: {best_attempt_data['design_reasoning']}") # Save best design JSON json_file = save_best_design_json(best_attempt_data, vehicle_type, MAX_ITERATIONS) return False, best_specs else: print("No valid designs generated") return False, "No valid designs generated" # Gradio Interface Setup def create_gradio_interface(): """Create and configure Gradio interface with vehicle type selection""" with gr.Blocks(title="LLM Vehicle Designer - Robots & Drones", theme=gr.themes.Soft()) as iface: gr.Markdown("# šŸ¤–šŸš LLM-Agent-Designed Obstacle-Passing Vehicle System") gr.Markdown("This system uses an LLM agent to iteratively design **robots** or **drones** that can pass a 5cm high obstacle.") with gr.Row(): with gr.Column(scale=2): vehicle_type = gr.Radio( choices=["robot", "drone"], label="Vehicle Type", value="robot", info="Choose between a ground robot or flying drone" ) task_input = gr.Textbox( label="Task Description", value="Design a vehicle that can pass the 5cm high obstacle", placeholder="Describe what you want the vehicle to accomplish...", lines=2 ) submit_btn = gr.Button("šŸš€ Start Vehicle Design Process", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown("### Process Info") gr.Markdown("- **Obstacle**: 5cm high, 50cm wide, 10cm deep") gr.Markdown("- **Success**: Vehicle crosses (x > 0.8m), stays stable") gr.Markdown("- **Max Iterations**: 5") gr.Markdown("- **Robot**: Wheels, clearance, materials") gr.Markdown("- **Drone**: Propellers, flight height, materials") with gr.Row(): with gr.Column(scale=2): process_log = gr.Textbox( label="šŸ”„ Design Process Log", lines=20, max_lines=30, show_copy_button=True ) with gr.Column(scale=1): simulation_gif = gr.Image( label="šŸŽ¬ Simulation Visualization", type="filepath" ) vehicle_specs = gr.JSON( label="šŸ”§ Final Vehicle Specifications" ) # Set up the interface interaction submit_btn.click( fn=design_vehicle_for_obstacle, inputs=[vehicle_type, task_input], outputs=[process_log, simulation_gif, vehicle_specs] ) gr.Markdown("---") gr.Markdown("### How it works:") gr.Markdown("1. **Vehicle Selection**: Choose robot (ground) or drone (flying)") gr.Markdown("2. **LLM Design**: AI agent proposes vehicle parameters") gr.Markdown("3. **Simulation**: Vehicle tested in PyBullet physics") gr.Markdown("4. **Evaluation**: Performance measured against success criteria") gr.Markdown("5. **Iteration**: Feedback used to improve design") gr.Markdown("6. **Success/Failure**: Process continues until success or max iterations") return iface def is_current_better(current_eval, current_iteration, best_eval, best_iteration): """ Determine if current design is better than best design using priority criteria: 1. Priority 1: Any design achieving overall_success == True 2. Priority 2: Among successful designs, fewer iterations preferred 3. Priority 3: Designs that crossed obstacle (even if failed other criteria) 4. Priority 4: Design with furthest final_robot_x_position 5. Priority 5: If tied, use last design """ # Priority 1: Success vs non-success current_success = current_eval.get('overall_success', False) best_success = best_eval.get('overall_success', False) if current_success and not best_success: return True elif best_success and not current_success: return False elif current_success and best_success: # Priority 2: Among successful, prefer fewer iterations return current_iteration < best_iteration # Priority 3: Obstacle crossing (even if overall failed) current_crossed = current_eval.get('robot_crossed_obstacle', False) best_crossed = best_eval.get('robot_crossed_obstacle', False) if current_crossed and not best_crossed: return True elif best_crossed and not current_crossed: return False # Priority 4: Furthest distance current_distance = current_eval.get('final_robot_x_position', 0.0) best_distance = best_eval.get('final_robot_x_position', 0.0) if current_distance > best_distance: return True elif current_distance < best_distance: return False # Priority 5: If tied, use last design (current) return True def format_best_design_performance(eval_results): """Format evaluation results for user-friendly display""" success_status = "āœ… SUCCESS" if eval_results.get('overall_success', False) else "āŒ FAILED" performance_text = f""" {success_status} šŸ“Š **Performance Metrics:** • Final Position: {eval_results.get('final_robot_x_position', 0.0):.3f}m • Crossed Obstacle: {'āœ… Yes' if eval_results.get('robot_crossed_obstacle', False) else 'āŒ No'} • Remained Upright: {'āœ… Yes' if eval_results.get('robot_remains_upright', False) else 'āŒ No'} • Clean Pass: {'āœ… Yes' if eval_results.get('no_significant_collision_with_obstacle_during_pass', False) else 'āŒ No'} šŸŽÆ **Success Criteria:** • Target: Cross obstacle (reach x > 0.8m) • Current: {eval_results.get('final_robot_x_position', 0.0):.3f}m • Status: {'ACHIEVED' if eval_results.get('final_robot_x_position', 0.0) > 0.8 else 'NOT ACHIEVED'} āš ļø **Failure Analysis:** {eval_results.get('specific_failure_point', 'No specific failure identified')} """ return performance_text.strip() def save_best_design_json(best_attempt_data, vehicle_type, iteration_count): """Save the best design to a comprehensive JSON file""" if not best_attempt_data: return None # Create timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Determine vehicle specifications key if vehicle_type == "robot": specs_key = "robot_specs" json_filename = f"best_robot_design_{timestamp}.json" else: specs_key = "robot_specs" # Using same key for compatibility json_filename = f"best_drone_design_{timestamp}.json" # Extract specifications vehicle_specs = best_attempt_data.get(specs_key, {}) # Create comprehensive JSON structure json_data = { "timestamp": datetime.now().isoformat(), "vehicle_type": vehicle_type, "total_iterations": iteration_count, "design_process_summary": { "success": best_attempt_data['evaluation_results'].get('overall_success', False), "best_iteration": "Final attempt", # Could be enhanced to track actual best iteration "total_attempts": iteration_count } } # Add vehicle-specific specifications if vehicle_type == "robot": json_data["robot_specifications"] = { "wheel_type": vehicle_specs.get('wheel_type', 'unknown'), "body_clearance_cm": vehicle_specs.get('body_clearance_cm', 0), "approach_sensor_enabled": vehicle_specs.get('approach_sensor_enabled', False), "main_material": vehicle_specs.get('main_material', 'unknown'), "vehicle_type": vehicle_specs.get('vehicle_type', 'robot') } else: # drone json_data["drone_specifications"] = { "propeller_size": vehicle_specs.get('propeller_size', 'unknown'), "flight_height_cm": vehicle_specs.get('flight_height_cm', 0), "stability_mode": vehicle_specs.get('stability_mode', 'unknown'), "main_material": vehicle_specs.get('main_material', 'unknown'), "vehicle_type": vehicle_specs.get('vehicle_type', 'drone') } # Add design reasoning json_data["design_reasoning"] = best_attempt_data.get('design_reasoning', 'No reasoning provided') # Add performance results eval_results = best_attempt_data['evaluation_results'] json_data["performance_results"] = { "overall_success": eval_results.get('overall_success', False), "robot_crossed_obstacle": eval_results.get('robot_crossed_obstacle', False), "robot_remains_upright": eval_results.get('robot_remains_upright', False), "no_significant_collision_with_obstacle_during_pass": eval_results.get('no_significant_collision_with_obstacle_during_pass', False), "final_robot_x_position": eval_results.get('final_robot_x_position', 0.0), "specific_failure_point": eval_results.get('specific_failure_point', 'none') } # Add success criteria breakdown json_data["success_criteria_analysis"] = { "target_distance": 0.8, "achieved_distance": eval_results.get('final_robot_x_position', 0.0), "distance_success": eval_results.get('final_robot_x_position', 0.0) > 0.8, "stability_success": eval_results.get('robot_remains_upright', False), "collision_success": eval_results.get('no_significant_collision_with_obstacle_during_pass', False), "overall_success": eval_results.get('overall_success', False) } # Add performance summary json_data["performance_summary"] = { "distance_traveled": f"{eval_results.get('final_robot_x_position', 0.0):.3f}m", "success_rate": "100%" if eval_results.get('overall_success', False) else "0%", "primary_failure": eval_results.get('specific_failure_point', 'unknown') if not eval_results.get('overall_success', False) else None } # Save JSON file try: with open(json_filename, 'w', encoding='utf-8') as f: json.dump(json_data, f, indent=2, ensure_ascii=False) print(f"šŸ“„ Best {vehicle_type} design saved to: {json_filename}") return json_filename except Exception as e: print(f"āŒ Error saving JSON file: {e}") return None if __name__ == "__main__": print("šŸ¤–šŸš LLM-Agent-Designed Vehicle System (Robots & Drones)") print("=" * 60) if GRADIO_AVAILABLE: print("Starting Gradio web interface...") try: # Create and launch Gradio interface interface = create_gradio_interface() interface.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True ) except Exception as e: print(f"Failed to start Gradio interface: {e}") print("Falling back to console interface...") GRADIO_AVAILABLE = False if not GRADIO_AVAILABLE: print("Using console interface...") print("Note: Simulation GIFs will be saved to the 'outputs' directory") print("") # Get user input vehicle_type = input("Choose vehicle type (robot/drone) [robot]: ").strip().lower() if vehicle_type not in ["robot", "drone"]: vehicle_type = "robot" task_description = input("Enter task description (or press Enter for default): ").strip() if not task_description: task_description = f"Design a {vehicle_type} that can pass the 5cm high obstacle" print("") # Run the design process success, final_specs = console_design_vehicle_for_obstacle(vehicle_type, task_description) print("\n" + "=" * 60) if success: print("šŸŽ‰ MISSION ACCOMPLISHED!") print(f"Final successful {vehicle_type} design: {final_specs}") else: print("āŒ Mission failed, but valuable data collected.") print(f"Best attempt specs: {final_specs}") print("\nCheck the 'outputs' directory for simulation GIFs.") print("=" * 60)