# car_web2/data/osm_extractor.py
# wesam0099: "Deploy REAL original app" (commit f158aab)
"""
OpenStreetMap Data Extractor
============================
Downloads and processes real road network data from OpenStreetMap
for use in accident reconstruction simulation.
"""
import os
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional
# osmnx is an optional dependency: when it cannot be imported, the flag below
# is False and the extraction functions in this module return fallback data.
try:
    import osmnx as ox
    OSMNX_AVAILABLE = True
except ImportError:
    OSMNX_AVAILABLE = False
    print("Warning: osmnx not installed. Install with: pip install osmnx")

# folium is optional as well; without it create_location_map() returns None.
try:
    import folium
    FOLIUM_AVAILABLE = True
except ImportError:
    FOLIUM_AVAILABLE = False

import sys
# Put the parent of this file's directory at the front of the import path so
# the `config` import below resolves (presumably config.py lives there -
# confirm against the project layout).
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import CASE_STUDY_LOCATION, ALTERNATIVE_LOCATIONS, OSM_DATA_DIR
def _edge_attribute(row, key, default):
    """
    Read an OSM tag from an edge row, substituting ``default`` for gaps.

    ``row.get(key, default)`` alone is not enough: when the column exists in
    the edge table but this particular edge carries no value, the lookup
    yields NaN (or None) instead of ``default``.
    """
    import math

    value = row.get(key, default)
    if value is None:
        return default
    if isinstance(value, float) and math.isnan(value):
        return default
    return value


def download_road_network(
    latitude: float,
    longitude: float,
    radius: int = 200,
    network_type: str = "drive",
    save_path: Optional[str] = None
) -> Dict:
    """
    Download road network from OpenStreetMap.

    Args:
        latitude: Center latitude
        longitude: Center longitude
        radius: Radius in meters
        network_type: Type of network ('drive', 'walk', 'bike', 'all')
        save_path: Optional path to save the network. The JSON is written to
            this path and a GraphML copy next to it (same name, ``.graphml``
            suffix).

    Returns:
        Dictionary with the keys ``center``, ``radius``, ``nodes``, ``edges``
        and ``bounds``. When osmnx is unavailable or the download/conversion
        fails, the result of ``create_fallback_network`` is returned instead.
        A failure while *saving* only prints a warning; the downloaded data
        is still returned.
    """
    if not OSMNX_AVAILABLE:
        print("osmnx not available. Using fallback data.")
        return create_fallback_network(latitude, longitude, radius)

    try:
        # Configure osmnx
        ox.settings.use_cache = True
        ox.settings.log_console = True

        # Download the road network
        print(f"Downloading road network for ({latitude}, {longitude})...")
        G = ox.graph_from_point(
            (latitude, longitude),
            dist=radius,
            network_type=network_type,
            simplify=True
        )

        # Get nodes and edges
        nodes, edges = ox.graph_to_gdfs(G)

        # Convert to dictionary format
        network_data = {
            "center": {"lat": latitude, "lng": longitude},
            "radius": radius,
            "nodes": [],
            "edges": [],
            "bounds": {
                # float() keeps the values plain Python floats for JSON.
                "north": float(nodes.geometry.y.max()),
                "south": float(nodes.geometry.y.min()),
                "east": float(nodes.geometry.x.max()),
                "west": float(nodes.geometry.x.min())
            }
        }

        # Process nodes
        for node_id, row in nodes.iterrows():
            network_data["nodes"].append({
                "id": str(node_id),
                "lat": row.geometry.y,
                "lng": row.geometry.x
            })

        # Process edges; the index starts with the (from, to) node ids.
        for edge_key, row in edges.iterrows():
            edge_data = {
                "from": str(edge_key[0]),
                "to": str(edge_key[1]),
                "length": _edge_attribute(row, "length", 0),
                "name": _edge_attribute(row, "name", "Unknown"),
                "highway": _edge_attribute(row, "highway", "unclassified"),
                "lanes": _edge_attribute(row, "lanes", 1),
                "maxspeed": _edge_attribute(row, "maxspeed", "50")
            }
            # Get geometry if available
            if hasattr(row.geometry, 'coords'):
                edge_data["geometry"] = list(row.geometry.coords)
            network_data["edges"].append(edge_data)
    except Exception as e:
        # Boundary with a third-party network call: any failure here means
        # there is no usable OSM data, so fall back to the synthetic network.
        print(f"Error downloading network: {e}")
        return create_fallback_network(latitude, longitude, radius)

    # Save if path provided. Kept outside the download try-block so that a
    # write problem cannot replace real OSM data with the fallback network.
    if save_path:
        try:
            target = Path(save_path)
            target.parent.mkdir(parents=True, exist_ok=True)
            with open(target, 'w', encoding="utf-8") as f:
                json.dump(network_data, f, indent=2)

            # Also save as GraphML for SUMO conversion
            graphml_path = target.with_suffix('.graphml')
            ox.save_graphml(G, graphml_path)

            print(f"Network saved to {target}")
            print(f"GraphML saved to {graphml_path}")
        except Exception as e:
            # Best effort: this function has never raised to its callers.
            print(f"Warning: could not save network data: {e}")

    return network_data
def create_fallback_network(
    latitude: float,
    longitude: float,
    radius: int = 200
) -> Dict:
    """
    Build a synthetic road network for use when no OSM data is available.

    The network is a roundabout made of 8 ring nodes (``r0``..``r7``) around
    the given center plus 4 approach nodes (``a_north``, ``a_south``,
    ``a_east``, ``a_west``). ``radius`` is only recorded in the result; the
    geometry and the bounds use fixed offsets in degrees.

    Returns:
        Dictionary with the keys ``center``, ``radius``, ``nodes``, ``edges``
        and ``bounds``.
    """
    import math

    ring_size = 8
    ring_offset = 0.0003  # Approximately 30 meters
    arm_offset = 0.001  # Approximately 100 meters

    def road(start, end, length, name):
        # Every synthetic road is a two-lane primary road.
        return {
            "from": start,
            "to": end,
            "length": length,
            "name": name,
            "highway": "primary",
            "lanes": 2,
        }

    # Ring nodes, evenly spaced on a circle starting at the northern point.
    node_list = []
    for k in range(ring_size):
        theta = (2 * math.pi * k) / ring_size
        node_list.append({
            "id": f"r{k}",
            "lat": latitude + ring_offset * math.cos(theta),
            "lng": longitude + ring_offset * math.sin(theta),
            "type": "roundabout",
        })

    # One approach node per compass direction.
    arms = (
        ("north", latitude + arm_offset, longitude),
        ("south", latitude - arm_offset, longitude),
        ("east", latitude, longitude + arm_offset),
        ("west", latitude, longitude - arm_offset),
    )
    node_list.extend(
        {"id": f"a_{direction}", "lat": arm_lat, "lng": arm_lng, "type": "approach"}
        for direction, arm_lat, arm_lng in arms
    )

    # Ring edges: each ring node leads to the next, the last wraps to r0.
    edge_list = [
        road(f"r{k}", f"r{(k + 1) % ring_size}", 20, "Roundabout")
        for k in range(ring_size)
    ]

    # Each approach feeds one ring node and is fed by another.
    links = (
        ("a_north", "r0", "r6"),
        ("a_east", "r2", "r0"),
        ("a_south", "r4", "r2"),
        ("a_west", "r6", "r4"),
    )
    for arm_id, entry_node, exit_node in links:
        edge_list.append(road(arm_id, entry_node, 80, "Approach Road"))
        edge_list.append(road(exit_node, arm_id, 80, "Exit Road"))

    return {
        "center": {"lat": latitude, "lng": longitude},
        "radius": radius,
        "nodes": node_list,
        "edges": edge_list,
        "bounds": {
            "north": latitude + 0.002,
            "south": latitude - 0.002,
            "east": longitude + 0.002,
            "west": longitude - 0.002,
        },
    }
def _feature_tag(row, key, default):
    """
    Read an OSM tag from a feature row, substituting ``default`` for gaps.

    A plain ``row.get(key, default)`` yields NaN (or None) rather than
    ``default`` when the column exists but this feature has no value for it.
    """
    import math

    value = row.get(key, default)
    if value is None:
        return default
    if isinstance(value, float) and math.isnan(value):
        return default
    return value


def extract_intersection_data(
    latitude: float,
    longitude: float,
    radius: int = 100
) -> Dict:
    """
    Extract intersection-specific data from OSM.

    Args:
        latitude: Center latitude
        longitude: Center longitude
        radius: Search distance around the center, passed to osmnx as ``dist``

    Returns:
        Dictionary with ``center`` and a ``features`` list. Each feature has
        ``type`` (its ``highway`` tag, "unknown" if unset) and ``name``
        ("Unknown" if unset), plus ``lat``/``lng`` of the geometry centroid
        when a geometry is present. When osmnx is unavailable or the query
        fails, the result of ``create_fallback_intersection`` is returned.
    """
    if not OSMNX_AVAILABLE:
        return create_fallback_intersection(latitude, longitude)

    try:
        # Get features around the point
        tags = {"highway": True}
        gdf = ox.features_from_point((latitude, longitude), tags, dist=radius)

        intersection_data = {
            "center": {"lat": latitude, "lng": longitude},
            "features": []
        }

        for _, row in gdf.iterrows():
            feature = {
                "type": _feature_tag(row, "highway", "unknown"),
                "name": _feature_tag(row, "name", "Unknown")
            }
            geometry = row.geometry
            # The attribute check also skips rows whose geometry is None.
            if hasattr(geometry, 'centroid'):
                feature["lat"] = geometry.centroid.y
                feature["lng"] = geometry.centroid.x
            intersection_data["features"].append(feature)

        return intersection_data
    except Exception as e:
        # Boundary with a third-party network call: report and fall back.
        print(f"Error extracting intersection: {e}")
        return create_fallback_intersection(latitude, longitude)
def create_fallback_intersection(latitude: float, longitude: float) -> Dict:
    """
    Return placeholder intersection data centered on the given point.

    The placeholder describes a roundabout with 4 approaches and 2 lanes,
    crossed by two primary roads.
    """
    road_names = ("Main Road North-South", "Main Road East-West")
    features = [{"type": "primary", "name": road_name} for road_name in road_names]

    intersection = {"center": {"lat": latitude, "lng": longitude}}
    intersection["type"] = "roundabout"
    intersection["approaches"] = 4
    intersection["lanes"] = 2
    intersection["features"] = features
    return intersection
def create_location_map(
    location: Dict,
    network_data: Optional[Dict] = None,
    save_path: Optional[str] = None
) -> Optional['folium.Map']:
    """
    Create an interactive Folium map for the location.

    Args:
        location: Dictionary with ``latitude`` and ``longitude``; ``name``
            (marker popup, default "Location") and ``radius_meters``
            (circle radius, default 200) are optional.
        network_data: Optional network dictionary with ``nodes`` and
            ``edges`` lists. Nodes are drawn as green markers. Edges are
            drawn as gray lines: along their ``geometry`` when present,
            otherwise as a straight line between their ``from`` and ``to``
            nodes (edges whose endpoints are unknown are skipped).
        save_path: Optional path of the HTML file to write.

    Returns:
        The Folium map, or None when folium is not installed.
    """
    if not FOLIUM_AVAILABLE:
        print("Folium not available")
        return None

    center = [location["latitude"], location["longitude"]]

    # Create base map
    m = folium.Map(
        location=center,
        zoom_start=17,
        tiles="OpenStreetMap"
    )

    # Add center marker
    folium.Marker(
        location=center,
        popup=location.get("name", "Location"),
        icon=folium.Icon(color="red", icon="info-sign")
    ).add_to(m)

    # Add radius circle
    folium.Circle(
        location=center,
        radius=location.get("radius_meters", 200),
        color="blue",
        fill=True,
        fill_opacity=0.1
    ).add_to(m)

    # Add network data if available
    if network_data:
        # Add nodes, remembering each position for edges without geometry.
        node_positions = {}
        for node in network_data.get("nodes", []):
            position = [node["lat"], node["lng"]]
            node_id = node.get("id")
            if node_id is not None:
                node_positions[node_id] = position
            folium.CircleMarker(
                location=position,
                radius=3,
                color="green",
                fill=True
            ).add_to(m)

        # Add edges
        for edge in network_data.get("edges", []):
            if "geometry" in edge:
                # Use actual geometry; stored as (lng, lat), folium wants
                # (lat, lng).
                coords = [(c[1], c[0]) for c in edge["geometry"]]
            else:
                # No geometry recorded: connect the endpoints directly.
                start = node_positions.get(edge.get("from"))
                end = node_positions.get(edge.get("to"))
                if start is None or end is None:
                    continue
                coords = [start, end]
            folium.PolyLine(
                locations=coords,
                color="gray",
                weight=2
            ).add_to(m)

    # Save if path provided
    if save_path:
        m.save(save_path)
        print(f"Map saved to {save_path}")

    return m
def download_all_locations():
    """
    Download the network and render a map for every configured location.

    Handles CASE_STUDY_LOCATION first, then each entry of
    ALTERNATIVE_LOCATIONS. For every location a JSON network file and an
    HTML map are written into OSM_DATA_DIR.
    """
    separator = "=" * 50
    map_fields = ("latitude", "longitude", "radius_meters", "name")

    # The output directory must exist before anything is written into it.
    OSM_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Primary (case study) location
    print("\n" + separator)
    print("Downloading primary location...")
    print(separator)

    primary_network = download_road_network(
        CASE_STUDY_LOCATION["latitude"],
        CASE_STUDY_LOCATION["longitude"],
        CASE_STUDY_LOCATION["radius_meters"],
        save_path=OSM_DATA_DIR / "primary_location.json"
    )
    create_location_map(
        location=CASE_STUDY_LOCATION,
        network_data=primary_network,
        save_path=str(OSM_DATA_DIR / "primary_location_map.html")
    )

    # Alternative locations, one JSON file and one map per key
    for loc_key, loc_data in ALTERNATIVE_LOCATIONS.items():
        print("\n" + separator)
        print(f"Downloading {loc_data['name']}...")
        print(separator)

        alt_network = download_road_network(
            loc_data["latitude"],
            loc_data["longitude"],
            loc_data["radius_meters"],
            save_path=OSM_DATA_DIR / f"{loc_key}.json"
        )
        create_location_map(
            location={field: loc_data[field] for field in map_fields},
            network_data=alt_network,
            save_path=str(OSM_DATA_DIR / f"{loc_key}_map.html")
        )

    print("\n" + separator)
    print("All locations downloaded successfully!")
    print(separator)
# Script entry point: when this module is run directly (not imported),
# download the data for every configured location.
if __name__ == "__main__":
    download_all_locations()