|
|
import os |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import geopandas as gpd |
|
|
import osmnx as ox |
|
|
import folium |
|
|
from shapely.geometry import LineString, MultiLineString, Point |
|
|
from shapely import ops |
|
|
from sklearn.neighbors import BallTree |
|
|
from typing import Tuple, List |
|
|
from .utils import get_coordinates_from_network, sort_gps_by_greedy_path, add_weather_to_df |
|
|
from .mock_predictor import MockTrafficPredictor |
|
|
from geopy.distance import geodesic |
|
|
import re |
|
|
import math |
|
|
from datetime import datetime |
|
|
import sys |
|
|
import os |
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
from model_v3.predict_road import RoadPredictor |
|
|
|
|
|
|
|
|
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
|
|
MODEL_PATH = os.path.join(PROJECT_ROOT, "model_v3", "final_lstm.pt") |
|
|
ENCODER_PATH = os.path.join(PROJECT_ROOT, "model_v3", "final_encoder.pkl") |
|
|
|
|
|
|
|
|
if not os.path.exists(MODEL_PATH): |
|
|
raise FileNotFoundError(f"Model file not found at: {MODEL_PATH}") |
|
|
if not os.path.exists(ENCODER_PATH): |
|
|
raise FileNotFoundError(f"Encoder file not found at: {ENCODER_PATH}") |
|
|
|
|
|
DIST_THRESHOLD_METERS_MAX = 1200 |
|
|
DIST_THRESHOLD_METERS_MIN = 10 |
|
|
|
|
|
|
|
|
class RoadMapManager: |
|
|
|
|
|
def __init__(self, city: str,bbox: Tuple[float,float,float,float], base_data_dir: str = "data"): |
|
|
self.city = city |
|
|
self.bbox = bbox |
|
|
self.base_data_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", base_data_dir)) |
|
|
self.city_path = os.path.join(self.base_data_dir, self.city) |
|
|
self.coordinates_path = os.path.join(self.city_path, 'coordinates') |
|
|
self.roads_path = os.path.join(self.city_path, 'roads') |
|
|
self.road_network_path = os.path.join(self.city_path, 'maps') |
|
|
self.visualizations_path = os.path.join(self.city_path, 'visualizations') |
|
|
|
|
|
self.roads = dict() |
|
|
|
|
|
self._validate_structure() |
|
|
self._load_road_network(self.bbox) |
|
|
|
|
|
|
|
|
def _validate_structure(self): |
|
|
for path in [self.coordinates_path, self.roads_path, self.road_network_path, self.visualizations_path]: |
|
|
os.makedirs(path, exist_ok=True) |
|
|
|
|
|
@staticmethod |
|
|
def split_road_name_direction(road_name: str) -> Tuple[str, str]: |
|
|
parts = road_name.split() |
|
|
return " ".join(parts[:-1]), parts[-1] |
|
|
|
|
|
def set_roads(self, roads: List[str]): |
|
|
os.makedirs(self.coordinates_path, exist_ok=True) |
|
|
|
|
|
for road in roads: |
|
|
road_name, direction = self.split_road_name_direction(road) |
|
|
file_name = f"{road_name} {direction}.csv" |
|
|
file_path = os.path.join(self.coordinates_path,file_name) |
|
|
|
|
|
if os.path.exists(file_path): |
|
|
print(f"DataFrame for {road_name} - {direction} already exists") |
|
|
df = pd.read_csv(file_path) |
|
|
else: |
|
|
print(f"Downloading DataFrame for {road_name} - {direction}") |
|
|
df = get_coordinates_from_network(self.road_network, road_name, direction) |
|
|
df.to_csv(file_path, index=False) |
|
|
|
|
|
self.roads[(road_name,direction)] = df |
|
|
|
|
|
def get_roads(self): |
|
|
""" |
|
|
Used for testing |
|
|
""" |
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
print(f"road name: {road_name} - {direction}") |
|
|
print(df.head()) |
|
|
print("\n" + "-"*40 + "\n") |
|
|
|
|
|
def _load_road_network(self, bbox: Tuple[float,float,float,float]): |
|
|
network_filename = f"{self.city.replace(' ', '_')}_network.graphml" |
|
|
network_path = os.path.join(self.road_network_path, network_filename) |
|
|
|
|
|
if os.path.exists(network_path): |
|
|
print("map already exists") |
|
|
self.road_network = ox.load_graphml(network_path) |
|
|
else: |
|
|
print("Downloading map") |
|
|
self.road_network = ox.graph_from_bbox( |
|
|
bbox=bbox, |
|
|
network_type='drive' |
|
|
) |
|
|
|
|
|
self.road_network = ox.bearing.add_edge_bearings(self.road_network) |
|
|
|
|
|
ox.save_graphml(self.road_network, filepath=network_path) |
|
|
|
|
|
def apply_prediction_data(self, predict_time: datetime | None = None): |
|
|
""" |
|
|
Needed data to predict: |
|
|
Gather data about weather in current day in time for each point |
|
|
Speed limit - from the road network |
|
|
Road name, Direction - key of the coordinates dict |
|
|
Cooridnate from the coordinate df |
|
|
Lanes - either from coordinate if given else from road network |
|
|
Time - input from user |
|
|
""" |
|
|
predictions = {} |
|
|
road_predictor = RoadPredictor(MODEL_PATH, ENCODER_PATH) |
|
|
for (road_name, direction) in self.roads.keys(): |
|
|
road_under = road_name.replace(" ", "_") |
|
|
df = pd.read_csv(os.path.join(self.roads_path, f"{road_under}_{direction.lower()}.csv.gz"), compression='gzip') |
|
|
predictions[(road_name, direction)] = road_predictor.predict_road_speeds(df, road_name, direction, predict_time) |
|
|
|
|
|
|
|
|
self._map_predictions_to_roads(predictions) |
|
|
|
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
print(df.head()) |
|
|
df = sort_gps_by_greedy_path(df) |
|
|
self.roads[(road_name, direction)] = df |
|
|
|
|
|
def _map_predictions_to_roads(self, predictions: dict): |
|
|
""" |
|
|
Map predicted speeds to the closest points in self.roads coordinates. |
|
|
|
|
|
Args: |
|
|
predictions: Dictionary with (road_name, direction) keys and prediction DataFrames as values |
|
|
""" |
|
|
from sklearn.neighbors import BallTree |
|
|
|
|
|
for (road_name, direction), road_df in self.roads.items(): |
|
|
if (road_name, direction) not in predictions: |
|
|
print(f"No predictions found for {road_name} {direction}") |
|
|
continue |
|
|
|
|
|
pred_df = predictions[(road_name, direction)] |
|
|
|
|
|
if pred_df.empty: |
|
|
print(f"Empty predictions for {road_name} {direction}") |
|
|
continue |
|
|
|
|
|
|
|
|
road_coords = road_df[['Latitude', 'Longitude']].values |
|
|
|
|
|
|
|
|
pred_coords = pred_df[['Latitude', 'Longitude']].values |
|
|
|
|
|
|
|
|
|
|
|
road_coords_rad = np.radians(road_coords) |
|
|
pred_coords_rad = np.radians(pred_coords) |
|
|
|
|
|
tree = BallTree(pred_coords_rad, metric='haversine') |
|
|
|
|
|
|
|
|
distances, indices = tree.query(road_coords_rad, k=1) |
|
|
|
|
|
|
|
|
distances_meters = distances.flatten() * 6371000 |
|
|
|
|
|
|
|
|
closest_pred_speeds = pred_df.iloc[indices.flatten()]['predicted_speed'].values |
|
|
|
|
|
|
|
|
if 'real_speed' in pred_df.columns: |
|
|
closest_real_speeds = pred_df.iloc[indices.flatten()]['real_speed'].values |
|
|
road_df['real_speed'] = closest_real_speeds |
|
|
else: |
|
|
road_df['real_speed'] = None |
|
|
|
|
|
|
|
|
road_df['predicted_speed'] = closest_pred_speeds |
|
|
road_df['prediction_distance_m'] = distances_meters |
|
|
|
|
|
|
|
|
road_df['speed'] = road_df['predicted_speed'] |
|
|
|
|
|
|
|
|
max_distance_threshold = 1000 |
|
|
far_points = distances_meters > max_distance_threshold |
|
|
|
|
|
if far_points.any(): |
|
|
print(f"Warning: {far_points.sum()} points in {road_name} {direction} are >{max_distance_threshold}m from predictions") |
|
|
|
|
|
road_df.loc[far_points, 'predicted_speed'] = road_df.loc[~far_points, 'predicted_speed'].mean() |
|
|
road_df.loc[far_points, 'speed'] = road_df.loc[~far_points, 'speed'].mean() |
|
|
|
|
|
print(f"Mapped predictions for {road_name} {direction}: " |
|
|
f"{len(road_df)} points, avg distance: {distances_meters.mean():.1f}m") |
|
|
|
|
|
def get_prediction_statistics(self) -> dict: |
|
|
""" |
|
|
Get statistics about the prediction mapping for all roads. |
|
|
|
|
|
Returns: |
|
|
Dictionary with statistics for each road |
|
|
""" |
|
|
stats = {} |
|
|
|
|
|
for (road_name, direction), road_df in self.roads.items(): |
|
|
if 'predicted_speed' not in road_df.columns: |
|
|
continue |
|
|
|
|
|
stats[(road_name, direction)] = { |
|
|
'total_points': len(road_df), |
|
|
'avg_predicted_speed': road_df['predicted_speed'].mean(), |
|
|
'min_predicted_speed': road_df['predicted_speed'].min(), |
|
|
'max_predicted_speed': road_df['predicted_speed'].max(), |
|
|
'avg_distance_to_prediction': road_df.get('prediction_distance_m', pd.Series([0])).mean(), |
|
|
'max_distance_to_prediction': road_df.get('prediction_distance_m', pd.Series([0])).max(), |
|
|
'points_with_predictions': road_df['predicted_speed'].notna().sum() |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def print_prediction_summary(self): |
|
|
"""Print a summary of prediction statistics for all roads.""" |
|
|
stats = self.get_prediction_statistics() |
|
|
|
|
|
if not stats: |
|
|
print("No prediction statistics available. Run apply_prediction_data() first.") |
|
|
return |
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("PREDICTION MAPPING SUMMARY") |
|
|
print("="*80) |
|
|
|
|
|
for (road_name, direction), stat in stats.items(): |
|
|
print(f"\n{road_name} {direction}:") |
|
|
print(f" Points: {stat['points_with_predictions']}/{stat['total_points']}") |
|
|
print(f" Speed: {stat['avg_predicted_speed']:.1f} mph (range: {stat['min_predicted_speed']:.1f}-{stat['max_predicted_speed']:.1f})") |
|
|
print(f" Avg distance to prediction: {stat['avg_distance_to_prediction']:.1f}m") |
|
|
print(f" Max distance to prediction: {stat['max_distance_to_prediction']:.1f}m") |
|
|
|
|
|
|
|
|
|
|
|
def draw_map(self): |
|
|
|
|
|
def get_color(speed, max_speed): |
|
|
if speed >= 0.85 * max_speed: |
|
|
return '#00FF00' |
|
|
elif speed >= 0.55 * max_speed: |
|
|
return '#FFA500' |
|
|
else: |
|
|
return '#FF0000' |
|
|
|
|
|
center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
|
|
center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
|
|
|
m = folium.Map( |
|
|
location=[center_lat, center_lon], |
|
|
zoom_start=13, |
|
|
tiles='CartoDB dark_matter' |
|
|
) |
|
|
|
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
for i in range(len(df) - 1): |
|
|
lat1, lon1, speed1 = df.loc[i, ['Latitude', 'Longitude', 'speed']] |
|
|
lat2, lon2, speed2 = df.loc[i+1, ['Latitude', 'Longitude', 'speed']] |
|
|
raw_speed = df.loc[i, 'maxspeed'] |
|
|
match = re.search(r'\d+', str(raw_speed)) |
|
|
if match: |
|
|
max_speed = float(match.group()) |
|
|
else: |
|
|
max_speed = 60 |
|
|
|
|
|
dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
|
|
if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
|
|
continue |
|
|
|
|
|
avg_speed = (speed1 + speed2) / 2 |
|
|
color = get_color(avg_speed,max_speed) |
|
|
|
|
|
folium.PolyLine( |
|
|
locations=[(lat1, lon1), (lat2, lon2)], |
|
|
color=color, |
|
|
weight=1, |
|
|
opacity=0.9 |
|
|
).add_to(m) |
|
|
|
|
|
output_path = os.path.join(self.visualizations_path, "sorted_path.html") |
|
|
m.save(output_path) |
|
|
print("Saved map with distance filtering to 'sorted_path.html'") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def draw_map_offset(self): |
|
|
|
|
|
def get_color(speed, max_speed): |
|
|
if speed >= 0.85 * max_speed: |
|
|
return '#00FF00' |
|
|
elif speed >= 0.55 * max_speed: |
|
|
return '#FFA500' |
|
|
else: |
|
|
return '#FF0000' |
|
|
|
|
|
def get_maxspeed(raw_speed): |
|
|
match = re.search(r'\d+', str(raw_speed)) |
|
|
return float(match.group()) if match else 60 |
|
|
|
|
|
def apply_offset(lat, lon, bearing, direction): |
|
|
"""Offset lat/lon a little perpendicular to bearing, based on direction.""" |
|
|
offset_meters = -600 if direction.lower() in ["north", "east"] else 600 |
|
|
|
|
|
|
|
|
angle_rad = math.radians((bearing + 90) % 360) |
|
|
delta_lat = offset_meters * math.cos(angle_rad) / 111111 |
|
|
delta_lon = offset_meters * math.sin(angle_rad) / (111111 * math.cos(math.radians(lat))) |
|
|
|
|
|
return lat + delta_lat, lon + delta_lon |
|
|
|
|
|
|
|
|
center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
|
|
center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
|
|
|
m = folium.Map( |
|
|
location=[center_lat, center_lon], |
|
|
zoom_start=13, |
|
|
tiles='CartoDB dark_matter' |
|
|
) |
|
|
|
|
|
|
|
|
road_groups = {} |
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
road_groups.setdefault(road_name, {})[direction] = df |
|
|
|
|
|
for road_name, direction_map in road_groups.items(): |
|
|
for direction, df in direction_map.items(): |
|
|
for i in range(len(df) - 1): |
|
|
lat1, lon1, speed1 = df.loc[i, ['Latitude', 'Longitude', 'speed']] |
|
|
lat2, lon2, speed2 = df.loc[i + 1, ['Latitude', 'Longitude', 'speed']] |
|
|
raw_speed = df.loc[i, 'maxspeed'] |
|
|
max_speed = get_maxspeed(raw_speed) |
|
|
bearing = df.loc[i, 'bearing'] if 'bearing' in df.columns else 0 |
|
|
|
|
|
dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
|
|
if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
|
|
continue |
|
|
|
|
|
avg_speed = (speed1 + speed2) / 2 |
|
|
color = get_color(avg_speed, max_speed) |
|
|
|
|
|
|
|
|
has_opposite = len(direction_map) > 1 |
|
|
if has_opposite: |
|
|
lat1, lon1 = apply_offset(lat1, lon1, bearing, direction) |
|
|
lat2, lon2 = apply_offset(lat2, lon2, bearing, direction) |
|
|
|
|
|
folium.PolyLine( |
|
|
locations=[(lat1, lon1), (lat2, lon2)], |
|
|
color=color, |
|
|
weight=2, |
|
|
opacity=0.95 |
|
|
).add_to(m) |
|
|
|
|
|
output_path = os.path.join(self.visualizations_path, "direction_offset_map.html") |
|
|
m.save(output_path) |
|
|
print("✅ Saved map with directional offsets to 'direction_offset_map.html'") |
|
|
return m |
|
|
|
|
|
def draw_map_with_real_speed(self): |
|
|
""" |
|
|
Draw map using real speed data instead of predicted speed. |
|
|
""" |
|
|
def get_color(speed, max_speed): |
|
|
if speed >= 0.85 * max_speed: |
|
|
return '#00FF00' |
|
|
elif speed >= 0.55 * max_speed: |
|
|
return '#FFA500' |
|
|
else: |
|
|
return '#FF0000' |
|
|
|
|
|
def get_maxspeed(raw_speed): |
|
|
match = re.search(r'\d+', str(raw_speed)) |
|
|
return float(match.group()) if match else 60 |
|
|
|
|
|
def apply_offset(lat, lon, bearing, direction): |
|
|
"""Offset lat/lon a little perpendicular to bearing, based on direction.""" |
|
|
offset_meters = -600 if direction.lower() in ["north", "east"] else 600 |
|
|
|
|
|
|
|
|
angle_rad = math.radians((bearing + 90) % 360) |
|
|
delta_lat = offset_meters * math.cos(angle_rad) / 111111 |
|
|
delta_lon = offset_meters * math.sin(angle_rad) / (111111 * math.cos(math.radians(lat))) |
|
|
|
|
|
return lat + delta_lat, lon + delta_lon |
|
|
|
|
|
|
|
|
center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
|
|
center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
|
|
|
m = folium.Map( |
|
|
location=[center_lat, center_lon], |
|
|
zoom_start=13, |
|
|
tiles='CartoDB dark_matter' |
|
|
) |
|
|
|
|
|
|
|
|
road_groups = {} |
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
road_groups.setdefault(road_name, {})[direction] = df |
|
|
|
|
|
for road_name, direction_map in road_groups.items(): |
|
|
for direction, df in direction_map.items(): |
|
|
for i in range(len(df) - 1): |
|
|
lat1, lon1 = df.loc[i, ['Latitude', 'Longitude']] |
|
|
lat2, lon2 = df.loc[i + 1, ['Latitude', 'Longitude']] |
|
|
|
|
|
|
|
|
if 'real_speed' in df.columns and pd.notna(df.loc[i, 'real_speed']): |
|
|
speed1 = df.loc[i, 'real_speed'] |
|
|
speed2 = df.loc[i + 1, 'real_speed'] if i + 1 < len(df) and pd.notna(df.loc[i + 1, 'real_speed']) else speed1 |
|
|
else: |
|
|
speed1 = df.loc[i, 'speed'] |
|
|
speed2 = df.loc[i + 1, 'speed'] |
|
|
|
|
|
raw_speed = df.loc[i, 'maxspeed'] |
|
|
max_speed = get_maxspeed(raw_speed) |
|
|
bearing = df.loc[i, 'bearing'] if 'bearing' in df.columns else 0 |
|
|
|
|
|
dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
|
|
if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
|
|
continue |
|
|
|
|
|
avg_speed = (speed1 + speed2) / 2 |
|
|
color = get_color(avg_speed, max_speed) |
|
|
|
|
|
|
|
|
has_opposite = len(direction_map) > 1 |
|
|
if has_opposite: |
|
|
lat1, lon1 = apply_offset(lat1, lon1, bearing, direction) |
|
|
lat2, lon2 = apply_offset(lat2, lon2, bearing, direction) |
|
|
|
|
|
folium.PolyLine( |
|
|
locations=[(lat1, lon1), (lat2, lon2)], |
|
|
color=color, |
|
|
weight=2, |
|
|
opacity=0.95 |
|
|
).add_to(m) |
|
|
|
|
|
output_path = os.path.join(self.visualizations_path, "real_speed_map.html") |
|
|
m.save(output_path) |
|
|
print("✅ Saved map with real speed data to 'real_speed_map.html'") |
|
|
return m |
|
|
|
|
|
def draw_side_by_side_maps(self): |
|
|
""" |
|
|
Create side-by-side maps showing both predicted and real speeds. |
|
|
Returns a tuple of (predicted_map, real_map) for use in Streamlit. |
|
|
""" |
|
|
|
|
|
predicted_map = self.draw_map_offset() |
|
|
|
|
|
|
|
|
real_map = self.draw_map_with_real_speed() |
|
|
|
|
|
return predicted_map, real_map |
|
|
|
|
|
|
|
|
""" |
|
|
mock_predictor = MockTrafficPredictor({ |
|
|
'I 405 North': 'moderate', |
|
|
'I 405 South': 'free', |
|
|
'US 101 North': 'busy', |
|
|
'US 101 South': 'free', |
|
|
'I 5 North': 'busy', |
|
|
'I 5 South': 'free', |
|
|
'I 10 East': 'moderate', |
|
|
'I 10 West': 'moderate', |
|
|
'I 110 North': 'busy', |
|
|
'I 110 South': 'busy', |
|
|
'CA 110 North': 'busy', |
|
|
'CA 110 South': 'busy', |
|
|
'CA 170 North': 'moderate', |
|
|
'CA 170 South': 'free', |
|
|
'CA 118 East': 'free', |
|
|
'CA 118 West': 'free', |
|
|
'CA 134 East': 'moderate', |
|
|
'CA 134 West': 'free', |
|
|
'CA 2 North': 'moderate', |
|
|
'CA 2 South': 'moderate', |
|
|
'I 605 North': 'busy', |
|
|
'I 605': 'free', |
|
|
'I 210 East' : 'free', |
|
|
'I 210 West' : 'busy' |
|
|
}) |
|
|
|
|
|
if predict_time is None: |
|
|
predict_time = datetime.now() |
|
|
|
|
|
for (road_name, direction), df in self.roads.items(): |
|
|
#self.roads[(road_name, direction)] = add_weather_to_df(self.roads[(road_name, direction)], time = predict_time) |
|
|
print(f"Mocking for {road_name} - {direction}") |
|
|
df = mock_predictor.predict(df) |
|
|
print(df.head()) |
|
|
df = sort_gps_by_greedy_path(df) |
|
|
self.roads[(road_name, direction)] = df |
|
|
|
|
|
""" |