| | """ |
| | OpenStreetMap Data Extractor |
| | ============================ |
| | Downloads and processes real road network data from OpenStreetMap |
| | for use in accident reconstruction simulation. |
| | """ |
| |
|
| | import os |
| | import json |
| | from pathlib import Path |
| | from typing import Dict, List, Tuple, Optional |
| |
|
| | try: |
| | import osmnx as ox |
| | OSMNX_AVAILABLE = True |
| | except ImportError: |
| | OSMNX_AVAILABLE = False |
| | print("Warning: osmnx not installed. Install with: pip install osmnx") |
| |
|
| | try: |
| | import folium |
| | FOLIUM_AVAILABLE = True |
| | except ImportError: |
| | FOLIUM_AVAILABLE = False |
| |
|
| | import sys |
| | sys.path.insert(0, str(Path(__file__).parent.parent)) |
| |
|
| | from config import CASE_STUDY_LOCATION, ALTERNATIVE_LOCATIONS, OSM_DATA_DIR |
| |
|
| |
|
def _edge_attr(row, key: str, default):
    """Return ``row[key]``, or *default* when the value is absent or missing.

    ``row.get(key, default)`` on its own only covers a column that does not
    exist at all. When the column exists but this row has no value for it,
    pandas stores NaN (or None), which would otherwise leak into the output
    and be written by ``json.dump`` as the non-standard token ``NaN``.
    """
    import math

    value = row.get(key, default)
    if value is None:
        return default
    # Only floats can be NaN; lists/strings (e.g. merged OSM tags) pass through.
    if isinstance(value, float) and math.isnan(value):
        return default
    return value


def download_road_network(
    latitude: float,
    longitude: float,
    radius: int = 200,
    network_type: str = "drive",
    save_path: Optional[str] = None
) -> Dict:
    """
    Download road network from OpenStreetMap.

    Args:
        latitude: Center latitude
        longitude: Center longitude
        radius: Radius in meters
        network_type: Type of network ('drive', 'walk', 'bike', 'all')
        save_path: Optional path to save the network. The JSON dump is written
            to this path and a GraphML copy is written next to it with the
            suffix replaced by ``.graphml``.

    Returns:
        Dictionary with the keys ``center``, ``radius``, ``nodes``, ``edges``
        and ``bounds``. When osmnx is not installed, or the download or
        conversion raises, the result of ``create_fallback_network`` is
        returned instead. A failure while *saving* is reported but does not
        discard the downloaded data.
    """
    if not OSMNX_AVAILABLE:
        print("osmnx not available. Using fallback data.")
        return create_fallback_network(latitude, longitude, radius)

    try:
        ox.settings.use_cache = True
        ox.settings.log_console = True

        print(f"Downloading road network for ({latitude}, {longitude})...")
        G = ox.graph_from_point(
            (latitude, longitude),
            dist=radius,
            network_type=network_type,
            simplify=True
        )

        nodes, edges = ox.graph_to_gdfs(G)

        network_data = {
            "center": {"lat": latitude, "lng": longitude},
            "radius": radius,
            "nodes": [],
            "edges": [],
            "bounds": {
                "north": nodes.geometry.y.max(),
                "south": nodes.geometry.y.min(),
                "east": nodes.geometry.x.max(),
                "west": nodes.geometry.x.min()
            }
        }

        for idx, row in nodes.iterrows():
            network_data["nodes"].append({
                "id": str(idx),
                "lat": row.geometry.y,
                "lng": row.geometry.x
            })

        for idx, row in edges.iterrows():
            # The edge index is a tuple whose first two items are the
            # endpoint node ids.
            edge_data = {
                "from": str(idx[0]),
                "to": str(idx[1]),
                "length": _edge_attr(row, "length", 0),
                "name": _edge_attr(row, "name", "Unknown"),
                "highway": _edge_attr(row, "highway", "unclassified"),
                "lanes": _edge_attr(row, "lanes", 1),
                "maxspeed": _edge_attr(row, "maxspeed", "50")
            }

            if hasattr(row.geometry, 'coords'):
                # Kept in the geometry's native (x, y) order, i.e. (lng, lat).
                edge_data["geometry"] = list(row.geometry.coords)

            network_data["edges"].append(edge_data)

    except Exception as e:
        print(f"Error downloading network: {e}")
        return create_fallback_network(latitude, longitude, radius)

    if save_path:
        # Saving has its own error boundary: a failed write must not throw
        # away a successful download and silently substitute fallback data.
        try:
            save_path = Path(save_path)
            save_path.parent.mkdir(parents=True, exist_ok=True)

            with open(save_path, 'w') as f:
                json.dump(network_data, f, indent=2)

            graphml_path = save_path.with_suffix('.graphml')
            ox.save_graphml(G, graphml_path)

            print(f"Network saved to {save_path}")
            print(f"GraphML saved to {graphml_path}")
        except Exception as e:
            print(f"Error saving network: {e}")

    return network_data
| |
|
| |
|
def create_fallback_network(
    latitude: float,
    longitude: float,
    radius: int = 200
) -> Dict:
    """
    Build a synthetic roundabout network for use when OSM data is unavailable.

    The layout is an 8-node ring around the given center plus four approach
    nodes (north, south, east, west). Ring nodes are chained into a closed
    loop, and every approach gets one edge into the ring and one edge out.

    Args:
        latitude: Center latitude
        longitude: Center longitude
        radius: Stored verbatim under the ``radius`` key; it does not affect
            the generated geometry.

    Returns:
        Dictionary with the keys ``center``, ``radius``, ``nodes``, ``edges``
        and ``bounds``.
    """
    import math

    ring_size = 8
    ring_offset = 0.0003   # offset of ring nodes from the center, in degrees
    arm_offset = 0.001     # offset of approach nodes from the center, in degrees

    # Ring nodes r0..r7; r0 lies due north of the center (cos(0) == 1).
    ring_nodes = []
    for k in range(ring_size):
        theta = (2 * math.pi * k) / ring_size
        ring_nodes.append({
            "id": f"r{k}",
            "lat": latitude + ring_offset * math.cos(theta),
            "lng": longitude + ring_offset * math.sin(theta),
            "type": "roundabout"
        })

    arm_positions = (
        ("north", latitude + arm_offset, longitude),
        ("south", latitude - arm_offset, longitude),
        ("east", latitude, longitude + arm_offset),
        ("west", latitude, longitude - arm_offset),
    )
    arm_nodes = [
        {"id": f"a_{label}", "lat": arm_lat, "lng": arm_lng, "type": "approach"}
        for label, arm_lat, arm_lng in arm_positions
    ]

    # Closed loop: each ring node links to its successor, r7 wraps to r0.
    ring_edges = [
        {
            "from": f"r{k}",
            "to": f"r{(k + 1) % ring_size}",
            "length": 20,
            "name": "Roundabout",
            "highway": "primary",
            "lanes": 2
        }
        for k in range(ring_size)
    ]

    # (approach node, ring node entered, ring node exited from)
    arm_links = (
        ("a_north", "r0", "r6"),
        ("a_east", "r2", "r0"),
        ("a_south", "r4", "r2"),
        ("a_west", "r6", "r4"),
    )
    arm_edges = []
    for arm_id, entry_id, exit_id in arm_links:
        arm_edges.extend((
            {
                "from": arm_id,
                "to": entry_id,
                "length": 80,
                "name": "Approach Road",
                "highway": "primary",
                "lanes": 2
            },
            {
                "from": exit_id,
                "to": arm_id,
                "length": 80,
                "name": "Exit Road",
                "highway": "primary",
                "lanes": 2
            },
        ))

    return {
        "center": {"lat": latitude, "lng": longitude},
        "radius": radius,
        "nodes": ring_nodes + arm_nodes,
        "edges": ring_edges + arm_edges,
        "bounds": {
            "north": latitude + 0.002,
            "south": latitude - 0.002,
            "east": longitude + 0.002,
            "west": longitude - 0.002
        }
    }
| |
|
| |
|
def _feature_attr(row, key: str, default):
    """Return ``row[key]``, or *default* when the value is absent or missing.

    ``row.get(key, default)`` on its own only covers a column that does not
    exist at all. When the column exists but this row has no value for it,
    pandas stores NaN (or None), so that case is mapped to *default* as well.
    """
    import math

    value = row.get(key, default)
    if value is None:
        return default
    if isinstance(value, float) and math.isnan(value):
        return default
    return value


def extract_intersection_data(
    latitude: float,
    longitude: float,
    radius: int = 100
) -> Dict:
    """
    Extract intersection-specific data from OSM.

    Queries every feature carrying a ``highway`` tag within ``radius`` of the
    given point and reduces each one to its highway type, its name and, when
    the geometry exposes one, the centroid position.

    Args:
        latitude: Center latitude
        longitude: Center longitude
        radius: Value passed to osmnx as ``dist``

    Returns:
        Dictionary with ``center`` and a ``features`` list. When osmnx is not
        installed, or the query raises, the result of
        ``create_fallback_intersection`` is returned instead.
    """
    if not OSMNX_AVAILABLE:
        return create_fallback_intersection(latitude, longitude)

    try:
        tags = {"highway": True}
        gdf = ox.features_from_point((latitude, longitude), tags, dist=radius)

        intersection_data = {
            "center": {"lat": latitude, "lng": longitude},
            "features": []
        }

        for _, row in gdf.iterrows():
            feature = {
                # Untagged/unnamed features fall back to the defaults rather
                # than carrying NaN into the result.
                "type": _feature_attr(row, "highway", "unknown"),
                "name": _feature_attr(row, "name", "Unknown")
            }
            if hasattr(row.geometry, 'centroid'):
                feature["lat"] = row.geometry.centroid.y
                feature["lng"] = row.geometry.centroid.x

            intersection_data["features"].append(feature)

        return intersection_data

    except Exception as e:
        print(f"Error extracting intersection: {e}")
        return create_fallback_intersection(latitude, longitude)
| |
|
| |
|
def create_fallback_intersection(latitude: float, longitude: float) -> Dict:
    """
    Build placeholder intersection data centered on the given coordinates.

    The result describes a two-lane roundabout with four approaches and two
    named primary roads; only ``center`` depends on the arguments.
    """
    center = {"lat": latitude, "lng": longitude}
    road_names = ("Main Road North-South", "Main Road East-West")
    features = [{"type": "primary", "name": road} for road in road_names]

    fallback = {"center": center}
    fallback["type"] = "roundabout"
    fallback["approaches"] = 4
    fallback["lanes"] = 2
    fallback["features"] = features
    return fallback
| |
|
| |
|
def create_location_map(
    location: Dict,
    network_data: Optional[Dict] = None,
    save_path: Optional[str] = None
) -> Optional['folium.Map']:
    """
    Create an interactive Folium map for the location.

    Args:
        location: Dictionary with required keys ``latitude`` and ``longitude``
            and optional keys ``name`` (marker popup, default "Location") and
            ``radius_meters`` (circle radius, default 200).
        network_data: Optional network dictionary with ``nodes`` and ``edges``
            lists. Nodes are drawn as green circle markers. Edges are drawn as
            gray lines, following their ``geometry`` when present and
            otherwise as a straight segment between their ``from`` and ``to``
            nodes.
        save_path: Optional path the rendered HTML map is saved to.

    Returns:
        The map object, or ``None`` when folium is not installed.
    """
    if not FOLIUM_AVAILABLE:
        print("Folium not available")
        return None

    center_lat = location["latitude"]
    center_lng = location["longitude"]

    m = folium.Map(
        location=[center_lat, center_lng],
        zoom_start=17,
        tiles="OpenStreetMap"
    )

    folium.Marker(
        location=[center_lat, center_lng],
        popup=location.get("name", "Location"),
        icon=folium.Icon(color="red", icon="info-sign")
    ).add_to(m)

    folium.Circle(
        location=[center_lat, center_lng],
        radius=location.get("radius_meters", 200),
        color="blue",
        fill=True,
        fill_opacity=0.1
    ).add_to(m)

    if network_data:
        # Remember where each node sits so geometry-less edges can still be
        # drawn between their endpoints.
        node_positions = {}

        for node in network_data.get("nodes", []):
            position = (node["lat"], node["lng"])
            if "id" in node:
                node_positions[node["id"]] = position

            folium.CircleMarker(
                location=[node["lat"], node["lng"]],
                radius=3,
                color="green",
                fill=True
            ).add_to(m)

        for edge in network_data.get("edges", []):
            if "geometry" in edge:
                # Stored coordinates are (lng, lat); folium expects (lat, lng).
                coords = [(c[1], c[0]) for c in edge["geometry"]]
            else:
                # Edges built by create_fallback_network carry no geometry;
                # without this branch they would never appear on the map.
                start = node_positions.get(edge.get("from"))
                end = node_positions.get(edge.get("to"))
                if start is None or end is None:
                    continue
                coords = [start, end]

            folium.PolyLine(
                locations=coords,
                color="gray",
                weight=2
            ).add_to(m)

    if save_path:
        m.save(save_path)
        print(f"Map saved to {save_path}")

    return m
| |
|
| |
|
def download_all_locations():
    """
    Download network data and render a map for every configured location.

    Handles ``CASE_STUDY_LOCATION`` first, then each entry of
    ``ALTERNATIVE_LOCATIONS``. For every location the network is requested
    with ``OSM_DATA_DIR/<key>.json`` as its save path and an HTML map is
    written to ``OSM_DATA_DIR/<key>_map.html``; the primary location uses the
    key ``primary_location``.
    """
    separator = "=" * 50

    # The output directory must exist before any file is written into it.
    OSM_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # --- primary case-study location ---
    print("\n" + separator)
    print("Downloading primary location...")
    print(separator)

    primary_data = download_road_network(
        latitude=CASE_STUDY_LOCATION["latitude"],
        longitude=CASE_STUDY_LOCATION["longitude"],
        radius=CASE_STUDY_LOCATION["radius_meters"],
        save_path=OSM_DATA_DIR / "primary_location.json"
    )
    create_location_map(
        CASE_STUDY_LOCATION,
        primary_data,
        save_path=str(OSM_DATA_DIR / "primary_location_map.html")
    )

    # --- alternative locations ---
    for loc_key, loc_data in ALTERNATIVE_LOCATIONS.items():
        print("\n" + separator)
        print(f"Downloading {loc_data['name']}...")
        print(separator)

        lat = loc_data["latitude"]
        lng = loc_data["longitude"]
        radius_m = loc_data["radius_meters"]

        network_data = download_road_network(
            latitude=lat,
            longitude=lng,
            radius=radius_m,
            save_path=OSM_DATA_DIR / f"{loc_key}.json"
        )

        # Only the fields the map needs are forwarded.
        map_location = {
            "latitude": lat,
            "longitude": lng,
            "radius_meters": radius_m,
            "name": loc_data["name"]
        }
        create_location_map(
            map_location,
            network_data,
            save_path=str(OSM_DATA_DIR / f"{loc_key}_map.html")
        )

    print("\n" + separator)
    print("All locations downloaded successfully!")
    print(separator)
| |
|
| |
|
if __name__ == "__main__":
    # Executed as a script: fetch every configured location and build its map.
    download_all_locations()
| |
|