""" SUMO Traffic Simulation Interface ================================== Handles integration with SUMO for 2D traffic simulation. """ import os import sys import json import subprocess from pathlib import Path from typing import Dict, List, Optional, Tuple import xml.etree.ElementTree as ET # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) from config import ( SUMO_CONFIG, SUMO_NETWORKS_DIR, SUMO_OUTPUT_DIR, CASE_STUDY_LOCATION ) # Check for SUMO installation SUMO_HOME = os.environ.get("SUMO_HOME", "") if SUMO_HOME: sys.path.append(os.path.join(SUMO_HOME, "tools")) try: import traci TRACI_AVAILABLE = True except ImportError: TRACI_AVAILABLE = False print("Warning: traci not available. SUMO simulation will be limited.") try: import sumolib SUMOLIB_AVAILABLE = True except ImportError: SUMOLIB_AVAILABLE = False print("Warning: sumolib not available.") class SUMOSimulator: """ SUMO Traffic Simulator wrapper for accident reconstruction. """ def __init__(self, network_path: str = None): """ Initialize the SUMO simulator. Args: network_path: Path to SUMO network file (.net.xml) """ self.network_path = network_path self.simulation_running = False self.vehicles = {} self.collision_detected = False self.collision_data = None # Create directories SUMO_NETWORKS_DIR.mkdir(parents=True, exist_ok=True) SUMO_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) def create_network_from_osm( self, osm_file: str, output_prefix: str = "network" ) -> str: """ Convert OSM data to SUMO network format. Args: osm_file: Path to OSM file output_prefix: Prefix for output files Returns: Path to generated network file """ output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml" # Use netconvert if available netconvert_cmd = [ "netconvert", "--osm-files", osm_file, "--output-file", str(output_net), "--geometry.remove", "true", "--junctions.join", "true", "--tls.guess", "true", "--roundabouts.guess", "true" ] try: subprocess.run(netconvert_cmd, check=True, capture_output=True) print(f"Network created: {output_net}") self.network_path = str(output_net) return str(output_net) except (subprocess.CalledProcessError, FileNotFoundError) as e: print(f"netconvert failed: {e}") # Create a simple network manually return self.create_simple_network(output_prefix) def create_simple_network( self, output_prefix: str = "simple_network", location: Dict = None ) -> str: """ Create a simple SUMO network for a roundabout. Args: output_prefix: Prefix for output files location: Location dictionary with coordinates Returns: Path to generated network file """ if location is None: location = CASE_STUDY_LOCATION # Create nodes XML nodes_xml = self._create_nodes_xml(location) nodes_path = SUMO_NETWORKS_DIR / f"{output_prefix}.nod.xml" with open(nodes_path, 'w') as f: f.write(nodes_xml) # Create edges XML edges_xml = self._create_edges_xml() edges_path = SUMO_NETWORKS_DIR / f"{output_prefix}.edg.xml" with open(edges_path, 'w') as f: f.write(edges_xml) # Create connections XML connections_xml = self._create_connections_xml() connections_path = SUMO_NETWORKS_DIR / f"{output_prefix}.con.xml" with open(connections_path, 'w') as f: f.write(connections_xml) # Try to build network with netconvert output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml" try: netconvert_cmd = [ "netconvert", "--node-files", str(nodes_path), "--edge-files", str(edges_path), "--connection-files", str(connections_path), "--output-file", str(output_net) ] subprocess.run(netconvert_cmd, check=True, capture_output=True) self.network_path = str(output_net) return str(output_net) except (subprocess.CalledProcessError, FileNotFoundError): # Create a minimal network XML directly return self._create_minimal_network_xml(output_prefix) def _create_nodes_xml(self, location: Dict) -> str: """Create SUMO nodes XML for a roundabout.""" lat = location.get("latitude", 26.2285) lng = location.get("longitude", 50.5818) # Convert to local coordinates (simplified) # In a real implementation, use proper projection scale = 111000 # meters per degree (approximate) nodes = f''' ''' return nodes def _create_edges_xml(self) -> str: """Create SUMO edges XML for a roundabout.""" edges = ''' ''' return edges def _create_connections_xml(self) -> str: """Create SUMO connections XML.""" connections = ''' ''' return connections def _create_minimal_network_xml(self, output_prefix: str) -> str: """Create a minimal network XML directly (fallback).""" network_xml = ''' ''' output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml" with open(output_net, 'w') as f: f.write(network_xml) self.network_path = str(output_net) print(f"Created minimal network: {output_net}") return str(output_net) def create_route_file( self, vehicle_1_route: List[str], vehicle_2_route: List[str], vehicle_1_speed: float = 50, vehicle_2_speed: float = 50, output_prefix: str = "routes" ) -> str: """ Create a SUMO route file for two vehicles. Args: vehicle_1_route: List of edge IDs for vehicle 1 vehicle_2_route: List of edge IDs for vehicle 2 vehicle_1_speed: Speed of vehicle 1 in km/h vehicle_2_speed: Speed of vehicle 2 in km/h output_prefix: Prefix for output file Returns: Path to route file """ # Convert km/h to m/s v1_speed_ms = vehicle_1_speed / 3.6 v2_speed_ms = vehicle_2_speed / 3.6 routes_xml = f''' ''' routes_path = SUMO_NETWORKS_DIR / f"{output_prefix}.rou.xml" with open(routes_path, 'w') as f: f.write(routes_xml) print(f"Routes file created: {routes_path}") return str(routes_path) def create_config_file( self, network_file: str, route_file: str, output_prefix: str = "simulation" ) -> str: """ Create a SUMO configuration file. Args: network_file: Path to network file route_file: Path to route file output_prefix: Prefix for output files Returns: Path to configuration file """ config_xml = f''' ''' config_path = SUMO_NETWORKS_DIR / f"{output_prefix}.sumocfg" with open(config_path, 'w') as f: f.write(config_xml) print(f"Configuration file created: {config_path}") return str(config_path) def run_simulation( self, config_file: str, gui: bool = False ) -> Dict: """ Run a SUMO simulation. Args: config_file: Path to SUMO configuration file gui: Whether to run with GUI Returns: Dictionary containing simulation results """ if not TRACI_AVAILABLE: print("TraCI not available. Running simulation without real-time control.") return self._run_simulation_batch(config_file) results = { "steps": 0, "collision_detected": False, "collision_time": None, "collision_position": None, "vehicle_1_trajectory": [], "vehicle_2_trajectory": [], "vehicle_1_speeds": [], "vehicle_2_speeds": [] } try: # Start SUMO sumo_binary = "sumo-gui" if gui else "sumo" traci.start([sumo_binary, "-c", config_file]) step = 0 while traci.simulation.getMinExpectedNumber() > 0: traci.simulationStep() # Get vehicle positions if "vehicle_1" in traci.vehicle.getIDList(): pos = traci.vehicle.getPosition("vehicle_1") speed = traci.vehicle.getSpeed("vehicle_1") results["vehicle_1_trajectory"].append(pos) results["vehicle_1_speeds"].append(speed) if "vehicle_2" in traci.vehicle.getIDList(): pos = traci.vehicle.getPosition("vehicle_2") speed = traci.vehicle.getSpeed("vehicle_2") results["vehicle_2_trajectory"].append(pos) results["vehicle_2_speeds"].append(speed) # Check for collisions collisions = traci.simulation.getCollidingVehiclesIDList() if collisions: results["collision_detected"] = True results["collision_time"] = step * SUMO_CONFIG["step_length"] if results["vehicle_1_trajectory"]: results["collision_position"] = results["vehicle_1_trajectory"][-1] step += 1 results["steps"] = step traci.close() except Exception as e: print(f"Simulation error: {e}") if traci.isLoaded(): traci.close() return results def _run_simulation_batch(self, config_file: str) -> Dict: """Run simulation in batch mode without TraCI.""" try: result = subprocess.run( ["sumo", "-c", config_file], capture_output=True, text=True ) return { "steps": SUMO_CONFIG["simulation_duration"] / SUMO_CONFIG["step_length"], "collision_detected": "collision" in result.stdout.lower(), "stdout": result.stdout, "stderr": result.stderr } except FileNotFoundError: print("SUMO not found. Please install SUMO.") return {"error": "SUMO not installed"} def create_simulation_for_scenario( scenario: Dict, vehicle_1: Dict, vehicle_2: Dict, scenario_id: int = 1 ) -> Dict: """ Create and run a SUMO simulation for a specific accident scenario. Args: scenario: Scenario dictionary from AI analysis vehicle_1: Vehicle 1 data vehicle_2: Vehicle 2 data scenario_id: Unique identifier for this scenario Returns: Simulation results dictionary """ simulator = SUMOSimulator() # Create network network_path = simulator.create_simple_network(f"scenario_{scenario_id}") # Map directions to routes direction_routes = { "north": ["in_n", "r_n_e", "r_e_s", "out_s"], "south": ["in_s", "r_s_w", "r_w_n", "out_n"], "east": ["in_e", "r_e_s", "r_s_w", "out_w"], "west": ["in_w", "r_w_n", "r_n_e", "out_e"] } v1_direction = vehicle_1.get("direction", "north") v2_direction = vehicle_2.get("direction", "east") v1_route = direction_routes.get(v1_direction, direction_routes["north"]) v2_route = direction_routes.get(v2_direction, direction_routes["east"]) # Create route file route_path = simulator.create_route_file( vehicle_1_route=v1_route, vehicle_2_route=v2_route, vehicle_1_speed=vehicle_1.get("speed", 50), vehicle_2_speed=vehicle_2.get("speed", 50), output_prefix=f"scenario_{scenario_id}" ) # Create config file config_path = simulator.create_config_file( network_file=network_path, route_file=route_path, output_prefix=f"scenario_{scenario_id}" ) # Run simulation results = simulator.run_simulation(config_path, gui=False) return results if __name__ == "__main__": # Test simulation setup print("Testing SUMO simulation setup...") simulator = SUMOSimulator() # Create a test network network = simulator.create_simple_network("test_network") print(f"Network created: {network}") # Create test routes routes = simulator.create_route_file( vehicle_1_route=["in_n", "r_n_e", "r_e_s", "out_s"], vehicle_2_route=["in_e", "r_e_s", "r_s_w", "out_w"], vehicle_1_speed=50, vehicle_2_speed=60, output_prefix="test" ) # Create config config = simulator.create_config_file(network, routes, "test") print("\nSUMO setup complete!") print(f"Network: {network}") print(f"Routes: {routes}") print(f"Config: {config}")