""" OpenStreetMap Data Extractor ============================ Downloads and processes real road network data from OpenStreetMap for use in accident reconstruction simulation. """ import os import json from pathlib import Path from typing import Dict, List, Tuple, Optional try: import osmnx as ox OSMNX_AVAILABLE = True except ImportError: OSMNX_AVAILABLE = False print("Warning: osmnx not installed. Install with: pip install osmnx") try: import folium FOLIUM_AVAILABLE = True except ImportError: FOLIUM_AVAILABLE = False import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from config import CASE_STUDY_LOCATION, ALTERNATIVE_LOCATIONS, OSM_DATA_DIR def download_road_network( latitude: float, longitude: float, radius: int = 200, network_type: str = "drive", save_path: Optional[str] = None ) -> Dict: """ Download road network from OpenStreetMap. Args: latitude: Center latitude longitude: Center longitude radius: Radius in meters network_type: Type of network ('drive', 'walk', 'bike', 'all') save_path: Optional path to save the network Returns: Dictionary containing network data """ if not OSMNX_AVAILABLE: print("osmnx not available. Using fallback data.") return create_fallback_network(latitude, longitude, radius) try: # Configure osmnx ox.settings.use_cache = True ox.settings.log_console = True # Download the road network print(f"Downloading road network for ({latitude}, {longitude})...") G = ox.graph_from_point( (latitude, longitude), dist=radius, network_type=network_type, simplify=True ) # Get nodes and edges nodes, edges = ox.graph_to_gdfs(G) # Convert to dictionary format network_data = { "center": {"lat": latitude, "lng": longitude}, "radius": radius, "nodes": [], "edges": [], "bounds": { "north": nodes.geometry.y.max(), "south": nodes.geometry.y.min(), "east": nodes.geometry.x.max(), "west": nodes.geometry.x.min() } } # Process nodes for idx, row in nodes.iterrows(): network_data["nodes"].append({ "id": str(idx), "lat": row.geometry.y, "lng": row.geometry.x }) # Process edges for idx, row in edges.iterrows(): edge_data = { "from": str(idx[0]), "to": str(idx[1]), "length": row.get("length", 0), "name": row.get("name", "Unknown"), "highway": row.get("highway", "unclassified"), "lanes": row.get("lanes", 1), "maxspeed": row.get("maxspeed", "50") } # Get geometry if available if hasattr(row.geometry, 'coords'): edge_data["geometry"] = list(row.geometry.coords) network_data["edges"].append(edge_data) # Save if path provided if save_path: save_path = Path(save_path) save_path.parent.mkdir(parents=True, exist_ok=True) with open(save_path, 'w') as f: json.dump(network_data, f, indent=2) # Also save as GraphML for SUMO conversion graphml_path = save_path.with_suffix('.graphml') ox.save_graphml(G, graphml_path) print(f"Network saved to {save_path}") print(f"GraphML saved to {graphml_path}") return network_data except Exception as e: print(f"Error downloading network: {e}") return create_fallback_network(latitude, longitude, radius) def create_fallback_network( latitude: float, longitude: float, radius: int = 200 ) -> Dict: """ Create a fallback network when OSM download fails. Creates a simple roundabout structure. """ import math # Create a simple roundabout with 4 approaches network_data = { "center": {"lat": latitude, "lng": longitude}, "radius": radius, "nodes": [], "edges": [], "bounds": { "north": latitude + 0.002, "south": latitude - 0.002, "east": longitude + 0.002, "west": longitude - 0.002 } } # Create roundabout nodes (8 points in a circle) roundabout_radius = 0.0003 # Approximately 30 meters num_points = 8 for i in range(num_points): angle = (2 * math.pi * i) / num_points lat = latitude + roundabout_radius * math.cos(angle) lng = longitude + roundabout_radius * math.sin(angle) network_data["nodes"].append({ "id": f"r{i}", "lat": lat, "lng": lng, "type": "roundabout" }) # Create approach nodes (4 directions) approach_distance = 0.001 # Approximately 100 meters approaches = [ ("north", latitude + approach_distance, longitude), ("south", latitude - approach_distance, longitude), ("east", latitude, longitude + approach_distance), ("west", latitude, longitude - approach_distance) ] for name, lat, lng in approaches: network_data["nodes"].append({ "id": f"a_{name}", "lat": lat, "lng": lng, "type": "approach" }) # Create roundabout edges (circular) for i in range(num_points): next_i = (i + 1) % num_points network_data["edges"].append({ "from": f"r{i}", "to": f"r{next_i}", "length": 20, "name": "Roundabout", "highway": "primary", "lanes": 2 }) # Connect approaches to roundabout approach_connections = [ ("a_north", "r0", "r6"), ("a_east", "r2", "r0"), ("a_south", "r4", "r2"), ("a_west", "r6", "r4") ] for approach, entry, exit in approach_connections: # Entry edge network_data["edges"].append({ "from": approach, "to": entry, "length": 80, "name": "Approach Road", "highway": "primary", "lanes": 2 }) # Exit edge network_data["edges"].append({ "from": exit, "to": approach, "length": 80, "name": "Exit Road", "highway": "primary", "lanes": 2 }) return network_data def extract_intersection_data( latitude: float, longitude: float, radius: int = 100 ) -> Dict: """ Extract intersection-specific data from OSM. """ if not OSMNX_AVAILABLE: return create_fallback_intersection(latitude, longitude) try: # Get features around the point tags = {"highway": True} gdf = ox.features_from_point((latitude, longitude), tags, dist=radius) intersection_data = { "center": {"lat": latitude, "lng": longitude}, "features": [] } for idx, row in gdf.iterrows(): feature = { "type": row.get("highway", "unknown"), "name": row.get("name", "Unknown") } if hasattr(row.geometry, 'centroid'): feature["lat"] = row.geometry.centroid.y feature["lng"] = row.geometry.centroid.x intersection_data["features"].append(feature) return intersection_data except Exception as e: print(f"Error extracting intersection: {e}") return create_fallback_intersection(latitude, longitude) def create_fallback_intersection(latitude: float, longitude: float) -> Dict: """Create fallback intersection data.""" return { "center": {"lat": latitude, "lng": longitude}, "type": "roundabout", "approaches": 4, "lanes": 2, "features": [ {"type": "primary", "name": "Main Road North-South"}, {"type": "primary", "name": "Main Road East-West"} ] } def create_location_map( location: Dict, network_data: Optional[Dict] = None, save_path: Optional[str] = None ) -> 'folium.Map': """ Create an interactive Folium map for the location. """ if not FOLIUM_AVAILABLE: print("Folium not available") return None # Create base map m = folium.Map( location=[location["latitude"], location["longitude"]], zoom_start=17, tiles="OpenStreetMap" ) # Add center marker folium.Marker( location=[location["latitude"], location["longitude"]], popup=location.get("name", "Location"), icon=folium.Icon(color="red", icon="info-sign") ).add_to(m) # Add radius circle folium.Circle( location=[location["latitude"], location["longitude"]], radius=location.get("radius_meters", 200), color="blue", fill=True, fill_opacity=0.1 ).add_to(m) # Add network data if available if network_data: # Add nodes for node in network_data.get("nodes", []): folium.CircleMarker( location=[node["lat"], node["lng"]], radius=3, color="green", fill=True ).add_to(m) # Add edges for edge in network_data.get("edges", []): if "geometry" in edge: # Use actual geometry coords = [(c[1], c[0]) for c in edge["geometry"]] folium.PolyLine( locations=coords, color="gray", weight=2 ).add_to(m) # Save if path provided if save_path: m.save(save_path) print(f"Map saved to {save_path}") return m def download_all_locations(): """Download network data for all configured locations.""" # Create output directory OSM_DATA_DIR.mkdir(parents=True, exist_ok=True) # Download primary location print("\n" + "="*50) print("Downloading primary location...") print("="*50) primary_data = download_road_network( latitude=CASE_STUDY_LOCATION["latitude"], longitude=CASE_STUDY_LOCATION["longitude"], radius=CASE_STUDY_LOCATION["radius_meters"], save_path=OSM_DATA_DIR / "primary_location.json" ) # Create map for primary location create_location_map( CASE_STUDY_LOCATION, primary_data, save_path=str(OSM_DATA_DIR / "primary_location_map.html") ) # Download alternative locations for loc_key, loc_data in ALTERNATIVE_LOCATIONS.items(): print(f"\n" + "="*50) print(f"Downloading {loc_data['name']}...") print("="*50) network_data = download_road_network( latitude=loc_data["latitude"], longitude=loc_data["longitude"], radius=loc_data["radius_meters"], save_path=OSM_DATA_DIR / f"{loc_key}.json" ) create_location_map( { "latitude": loc_data["latitude"], "longitude": loc_data["longitude"], "radius_meters": loc_data["radius_meters"], "name": loc_data["name"] }, network_data, save_path=str(OSM_DATA_DIR / f"{loc_key}_map.html") ) print("\n" + "="*50) print("All locations downloaded successfully!") print("="*50) if __name__ == "__main__": download_all_locations()