import os
import math
import folium
import folium.plugins
import xml.etree.ElementTree as ET
import heapq
from flask import Flask, render_template, request, jsonify, session
from werkzeug.utils import secure_filename
import importlib.util
import sys
import json
import copy

# Torch and image libraries
import torch
import torch.nn as nn
from torchvision import models
import torchvision.transforms as transforms
from PIL import Image

# Travel-time multiplier keyed by accident severity label.
SEVERITY_MULTIPLIERS = {
    "none": 1.0,
    "minor": 2.0,
    "moderate": 3.0,
    "severe": 4.0
}

# Global variables for default map data (for Hugging Face deployment)
DEFAULT_MAP_DATA = {
    'nodes': None,
    'ways': None,
    'cameras': None,
    'meta': None,
    'osm_nodes': None,
    'graph': None,
    'txt_path': None,
    'osm_path': None
}

# In-memory cache for uploaded map data (survives across requests)
UPLOADED_MAP_CACHE = {
    'nodes': None,
    'ways': None,
    'cameras': None,
    'meta': None,
    'osm_nodes': None,
    'graph': None,
    'txt_path': None,
    'osm_path': None,
    'timestamp': None
}

# Global accident store (survives across requests)
GLOBAL_ACCIDENTS = {}

# =====================================================
# Image preprocessing (MUST match training - ImageNet normalization)
# =====================================================
accident_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # ImageNet mean
        std=[0.229, 0.224, 0.225]    # ImageNet std
    )
])

MODEL_DIR = "model/saved_models/"


def _attach_classifier_head(model, state):
    """Give ``model.classifier`` the 4-class head that matches ``state``.

    Shared by the MobileNetV2 and EfficientNet-B0 branches of ``load_model``;
    both read and replace ``model.classifier`` the same way.
    """
    # In Sequential(Dropout, Linear, ReLU, Linear) only indices 1 and 3 own
    # parameters, so a checkpoint saved with that head carries
    # "classifier.3.weight". (Dropout and ReLU contribute no state_dict keys.)
    if "classifier.3.weight" in state:
        model.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(1280, 256),
            nn.ReLU(),
            nn.Linear(256, 4)
        )
    else:
        # Stock layout: swap only the final linear layer for a 4-way output.
        model.classifier[1] = nn.Linear(1280, 4)


# Based on your directory structure, load the appropriate model
def load_model(model_name):
    """Build the named network, load its saved weights and return it in eval mode.

    Args:
        model_name: One of ``"mobilenet"``, ``"resnet"`` or ``"efficientnet"``.

    Returns:
        The torchvision model with the checkpoint loaded and ``eval()`` applied.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
        Any exception raised by ``torch.load`` or ``load_state_dict`` (missing
        file, mismatched keys) propagates to the caller.
    """
    FILE_MAP = {
        "mobilenet": "mobilenet/version_2/mobilenet_v2_final.pth",
        "resnet": "resnet/version_2/resnet18_final.pth",
        "efficientnet": "efficientnet/version_2/efficientnet_b0_final.pth"
    }
    # Validate before the lookup so an unknown name yields the intended
    # ValueError instead of a bare KeyError.
    if model_name not in FILE_MAP:
        raise ValueError("Unknown model type")

    path = os.path.join(MODEL_DIR, FILE_MAP[model_name])

    # Load the saved state_dict first; its keys decide which head to build.
    # NOTE(review): torch.load unpickles the file - only load trusted
    # checkpoints (consider weights_only=True if the installed torch has it).
    state = torch.load(path, map_location="cpu")

    if model_name == "mobilenet":
        # MOBILENET V2
        model = models.mobilenet_v2(weights=None)
        _attach_classifier_head(model, state)
    elif model_name == "resnet":
        # RESNET18
        model = models.resnet18(weights=None)
        # Detect the (ReLU + Linear) head: only fc.1 has weights in that case.
        if "fc.1.weight" in state and "fc.0.weight" not in state:
            model.fc = nn.Sequential(
                nn.ReLU(),           # fc.0
                nn.Linear(512, 4)    # fc.1
            )
        else:
            model.fc = nn.Linear(512, 4)
    else:
        # EFFICIENTNET B0
        # EfficientNet uses classifier[1] normally, but if a Sequential head
        # was used in training → build it again.
        model = models.efficientnet_b0(weights=None)
        _attach_classifier_head(model, state)

    model.load_state_dict(state)
    model.eval()
    print(f"✅ Loaded model: {path}")
    return model


# =====================================================
# Load models once on server start
# =====================================================
mobilenet_model = load_model("mobilenet")
resnet_model = load_model("resnet")

# Load EfficientNet if available; fall back gracefully
try:
    efficientnet_model = load_model("efficientnet")
except Exception as e:
    print("Failed to load EfficientNet model:", e)
    efficientnet_model = None


# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def load_default_map_data():
    """Load default map files if they exist (optional - for convenience).

    Fills ``DEFAULT_MAP_DATA`` in place when both default files are present
    and parse successfully.

    Returns:
        True when the default map was loaded, False otherwise.
    """
    default_txt = 'heritage_assignment_15_time_asymmetric-1.txt'
    default_osm = 'map.osm'

    if not (os.path.exists(default_txt) and os.path.exists(default_osm)):
        print("ℹ️ No default map files found (optional). Users will upload their own files.")
        return False

    try:
        print(f"🗺️ Loading default map files: {default_txt}, {default_osm}")
        nodes, ways, cameras, meta = parse_map_data_file(default_txt)
        osm_nodes, graph = load_osm_graph(default_osm)
    except Exception as e:
        # Deliberately best-effort: the default map is optional.
        print(f"⚠️ Failed to load default map: {e}")
        return False

    DEFAULT_MAP_DATA.update({
        'nodes': nodes,
        'ways': ways,
        'cameras': cameras,
        'meta': meta,
        'osm_nodes': osm_nodes,
        'graph': graph,
        'txt_path': default_txt,
        'osm_path': default_osm
    })
    print(f"✅ Default map loaded: {len(nodes)} nodes, {len(ways)} ways")
    return True


def _snapshot_map_store(store, source_label):
    """Return the 8-tuple of map data held in ``store``, with ``ways`` deep-copied."""
    # ALWAYS use the stored ways (which include accidents) - don't rely on
    # the session. The deep copy keeps callers from mutating the store.
    ways = copy.deepcopy(store['ways'])
    accidents_count = sum(1 for w in ways if w.get('accident', False))
    print(f"✅ Using {source_label} (accidents applied: {accidents_count})")
    return (
        store['nodes'],
        ways,
        store['cameras'],
        store['meta'],
        store['osm_nodes'],
        store['graph'],
        store['txt_path'],
        store['osm_path']
    )


def get_current_map_data():
    """Get current map data - from cache (uploaded) or default map.

    Returns:
        ``(nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path)``.
        Every element is None when no source is available.
    """
    # Priority 1: Check uploaded files cache FIRST (regardless of session)
    if UPLOADED_MAP_CACHE['nodes'] is not None:
        return _snapshot_map_store(UPLOADED_MAP_CACHE, "uploaded map cache")

    # Priority 2: Use default map if available
    if DEFAULT_MAP_DATA['nodes'] is not None:
        return _snapshot_map_store(DEFAULT_MAP_DATA, "default map")

    # Priority 3: Try loading from file paths in session (last resort)
    txt_path = session.get('txt_path')
    osm_path = session.get('osm_path')

    if txt_path and osm_path and os.path.exists(txt_path) and os.path.exists(osm_path):
        try:
            print(f"⚠️ Loading from disk: {txt_path}, {osm_path}")
            nodes, ways, cameras, meta = parse_map_data_file(txt_path)
            osm_nodes, graph = load_osm_graph(osm_path)
            return nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path
        except Exception as e:
            # Fall through to the "no data" result below.
            print(f"❌ Error loading from disk: {e}")

    print("❌ No map data available")
    print(f" - Uploaded cache: {UPLOADED_MAP_CACHE['nodes'] is not None}")
    print(f" - Default map: {DEFAULT_MAP_DATA['nodes'] is not None}")
    print(f" - Session: {dict(session)}")
    return None, None, None, None, None, None, None, None


def load_func_from_path(path, func_name):
    """Dynamically load a function from a Python file.

    Args:
        path: Path of the Python source file to execute as a module.
        func_name: Attribute to fetch from the executed module (also used as
            the module name given to importlib).

    Returns:
        The attribute named ``func_name`` from the loaded module.

    Raises:
        ImportError: If importlib cannot build a loader for ``path``.
        AttributeError: If the module does not define ``func_name``.
    """
    module_dir = os.path.dirname(path)
    spec = importlib.util.spec_from_file_location(func_name, path)
    if spec is None or spec.loader is None:
        # spec_from_file_location returns None when it cannot pick a loader.
        raise ImportError(f"Cannot load a Python module from {path!r}")
    mod = importlib.util.module_from_spec(spec)

    added = False
    try:
        # Insert the module directory temporarily so the file's own sibling
        # imports resolve.
        if module_dir not in sys.path:
            sys.path.insert(0, module_dir)
            added = True
        # Execute the module so the requested function becomes available
        spec.loader.exec_module(mod)
    finally:
        if added:
            try:
                # Always clean up sys.path even if the import fails
                sys.path.remove(module_dir)
            except ValueError:
                pass

    return getattr(mod, func_name)


def split_csv_allow_commas(line, min_fields):
    """Parse CSV line while respecting commas inside parentheses.

    Args:
        line: The raw text line to split.
        min_fields: Minimum number of fields the line must yield.

    Returns:
        List of stripped field strings. A trailing empty field (nothing after
        the last top-level comma) is not emitted.

    Raises:
        ValueError: If fewer than ``min_fields`` fields were parsed.
    """
    parts = []
    buf = []
    depth = 0

    for ch in line:
        if ch == ',' and depth == 0:
            # Split on commas only when we are not inside parentheses
            parts.append("".join(buf).strip())
            buf = []
            continue
        if ch == '(':
            depth += 1
        elif ch == ')':
            # Floor at 0 so a stray ")" cannot disable later splits
            depth = max(depth - 1, 0)
        buf.append(ch)

    if buf:
        # Flush the final buffer after iterating through the line
        parts.append("".join(buf).strip())

    if len(parts) < min_fields:
        raise ValueError(f"Line '{line}' parsed into too few fields: {parts}")
    return parts


def haversine_km(lat1, lon1, lat2, lon2):
    """Calculate distance between two coordinates in kilometers.

    Inputs are latitude/longitude in degrees (they are converted with
    ``math.radians``); the result uses an Earth radius of 6371 km.
    """
    R = 6371.0
    la1, la2 = math.radians(lat1), math.radians(lat2)
    dla = la2 - la1
    dlo = math.radians(lon2 - lon1)
    a = math.sin(dla / 2) ** 2 + math.cos(la1) * math.cos(la2) * math.sin(dlo / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] (near-antipodal points),
    # which would make sqrt(1 - a) raise; clamp before taking roots.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def is_header(line):
    """Check if line is a section header (bracketed, e.g. ``[NODES]``)."""
    return line.startswith("[") and line.endswith("]")


def ignore_line(line):
    """Check if line should be ignored (blank, or a ``#`` comment)."""
    stripped = line.strip()
    return (not stripped) or stripped.startswith("#")


# ============================================================================
# FILE PARSING FUNCTIONS
# ============================================================================

def parse_map_data_file(path):
    """Parse the custom map data file format.

    The file is read section by section; recognised headers are ``[NODES]``,
    ``[WAYS]``, ``[CAMERAS]`` and ``[META]`` (matched case-insensitively).

    Returns:
        ``(nodes, ways, cameras, meta)`` where ``nodes`` maps id to
        ``{"lat", "lon", "label"}``, ``ways`` is a list of edge dicts,
        ``cameras`` maps the first field of each camera line to the second,
        and ``meta`` holds ``start``, ``goals`` and ``accident_multiplier``.

    Raises:
        ValueError: If a line has too few fields or a number fails to parse.
    """
    # Track which section we are reading so each line is interpreted correctly
    section = None
    nodes = {}
    ways = []
    cameras = {}
    meta = {"start": None, "goals": [], "accident_multiplier": None}

    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if ignore_line(line):
                continue
            if is_header(line):
                # Switch parsing mode whenever a new header is encountered
                section = line.upper()
                continue

            if section == "[NODES]":
                p = split_csv_allow_commas(line, 4)
                nid, lat, lon, label = p[0], float(p[1]), float(p[2]), p[3]
                nodes[nid] = {"lat": lat, "lon": lon, "label": label}

            elif section == "[WAYS]":
                p = split_csv_allow_commas(line, 6)
                time_val = float(p[5])
                ways.append({
                    "way_id": p[0],
                    "from": p[1],
                    "to": p[2],
                    "road_name": p[3],
                    "highway_type": p[4],
                    "time_min": time_val,
                    # Kept alongside time_min so the unmodified value stays available
                    "original_time": time_val,
                    "accident": False
                })

            elif section == "[CAMERAS]":
                p = split_csv_allow_commas(line, 2)
                cameras[p[0]] = p[1]

            elif section == "[META]":
                p = [x.strip() for x in line.split(",")]
                key = p[0].upper()
                if key == "START":
                    meta["start"] = p[1]
                elif key == "GOAL":
                    meta["goals"] = p[1:]
                elif key == "ACCIDENT_MULTIPLIER":
                    meta["accident_multiplier"] = float(p[1])

    return nodes, ways, cameras, meta


def load_osm_graph(osm_path):
    """Parse OSM file and build graph structure.

    Returns:
        ``(osm_nodes, graph)``: ``osm_nodes`` maps every node id to
        ``(lat, lon)``; ``graph`` maps node ids that lie on at least one
        ``highway``-tagged way to a list of ``(neighbour_id, distance_km)``
        pairs, with each segment added in both directions.
    """
    tree = ET.parse(osm_path)
    root = tree.getroot()

    # Extract all nodes
    osm_nodes = {}
    for n in root.findall("node"):
        osm_nodes[n.attrib["id"]] = (float(n.attrib["lat"]), float(n.attrib["lon"]))

    # Build adjacency graph
    graph = {nid: [] for nid in osm_nodes}

    for w in root.findall("way"):
        nd_refs = [nd.attrib["ref"] for nd in w.findall("nd")]
        tags = {t.attrib.get("k"): t.attrib.get("v") for t in w.findall("tag")}
        if "highway" not in tags:
            # Only keep ways that represent roads
            continue

        for a, b in zip(nd_refs, nd_refs[1:]):
            if a in osm_nodes and b in osm_nodes:
                lat1, lon1 = osm_nodes[a]
                lat2, lon2 = osm_nodes[b]
                # Use haversine distance to approximate segment cost
                dist = haversine_km(lat1, lon1, lat2, lon2)
                graph[a].append((b, dist))
                graph[b].append((a, dist))

    # Remove isolated nodes (no road segment touches them)
    graph = {nid: nbrs for nid, nbrs in graph.items() if nbrs}

    return osm_nodes, graph
# ============================================================================
# GRAPH ALGORITHM FUNCTIONS
# ============================================================================

def k_nearest_graph_nodes(lat, lon, osm_nodes, graph, k=5):
    """Find k nearest nodes in the graph to given coordinates.

    Args:
        lat, lon: Query coordinates.
        osm_nodes: Mapping of node id to ``(lat, lon)``.
        graph: Adjacency mapping; only ids present here are considered.
        k: Maximum number of node ids to return.

    Returns:
        Up to ``k`` node ids ordered by increasing squared coordinate
        difference from ``(lat, lon)``; ties keep ``osm_nodes`` order.
    """
    # Rank by squared difference: the ordering is the same as with the root
    # applied, so sqrt is never needed.
    candidates = (
        ((lat - nlat) ** 2 + (lon - nlon) ** 2, nid)
        for nid, (nlat, nlon) in osm_nodes.items()
        if nid in graph
    )
    # nsmallest keeps only k items in memory instead of sorting every
    # candidate; it is documented as equivalent to sorted(..., key=key)[:k].
    nearest = heapq.nsmallest(k, candidates, key=lambda item: item[0])
    return [nid for _, nid in nearest]


def dijkstra_path(graph, start_id, goal_id):
    """Find shortest path using Dijkstra's algorithm.

    Args:
        graph: Mapping of node id to a list of ``(neighbour_id, weight)``.
        start_id: Node to start from.
        goal_id: Node to reach.

    Returns:
        List of node ids from ``start_id`` to ``goal_id`` inclusive, or None
        when the goal cannot be reached.
    """
    dist = {start_id: 0.0}
    prev = {}
    pq = [(0.0, start_id)]
    visited = set()

    while pq:
        cur_d, cur = heapq.heappop(pq)
        if cur in visited:
            # Stale queue entry: this node was already settled more cheaply.
            continue
        visited.add(cur)
        if cur == goal_id:
            break

        for nbr, w in graph.get(cur, []):
            # Relax neighboring nodes when a shorter distance is discovered
            nd = cur_d + w
            if nbr not in dist or nd < dist[nbr]:
                dist[nbr] = nd
                prev[nbr] = cur
                heapq.heappush(pq, (nd, nbr))

    if goal_id not in dist:
        return None

    # Reconstruct path by walking predecessor links back from the goal
    path = [goal_id]
    node = goal_id
    while node != start_id:
        node = prev.get(node)
        if node is None:
            return None
        path.append(node)

    path.reverse()
    return path


def path_length_km(path, graph):
    """Calculate total length of a path in kilometers.

    Sums, for each consecutive pair in ``path``, the weight of the first
    matching edge found in ``graph``. Pairs with no edge contribute 0.
    Returns 0.0 for an empty or single-node path.
    """
    if not path or len(path) < 2:
        return 0.0

    total = 0.0
    for a, b in zip(path[:-1], path[1:]):
        total += next((w for nbr, w in graph.get(a, []) if nbr == b), 0.0)
    return total


def build_edges_map(ways):
    """Build edge dictionary from ways list.

    Returns:
        Mapping of ``(from, to)`` to ``{"time": ..., "accident": ...}``; when
        several ways share the same directed pair the last one wins.
    """
    return {
        (way["from"], way["to"]): {
            "time": way["time_min"],
            "accident": way.get("accident", False)
        }
        for way in ways
    }
============================================================================ # VISUALIZATION FUNCTIONS # ============================================================================ def color_for_highway(hwy_type): """Return color code for highway type""" h = (hwy_type or "").lower() colors = { "primary": "deepskyblue", "secondary": "purple", "tertiary": "darkblue", "service": "slategray" } # Default to white for unknown highway types return colors.get(h, "white") def get_map_center(nodes): """Calculate center point of all nodes""" lats = [info["lat"] for info in nodes.values()] lons = [info["lon"] for info in nodes.values()] return sum(lats)/len(lats), sum(lons)/len(lons) GAP_TO_NODE_TOLERANCE_M = 2.5 # meters of acceptable gap between OSM snap and custom node ROUTE_NODE_CLEARANCE_M = 4.0 # keep polylines slightly off the start/goal markers NODE_OVERLAP_PRUNE_M = 0.75 # drop lat/lng points that sit almost exactly on top of a node def register_arrow_segment(map_obj, coords, color, speed_mps=120): """Record a polyline for animated arrow rendering on the client.""" if not coords or len(coords) < 2: return if not hasattr(map_obj, "_arrow_segments"): map_obj._arrow_segments = [] map_obj._arrow_segments.append({ "coords": coords, "color": color, "speed": speed_mps }) def inject_arrow_animation(map_obj, base_speed_mps=120): """Inject a client-side script that animates arrows along stored segments.""" segments = getattr(map_obj, "_arrow_segments", []) if not segments: return payload = { "mapName": map_obj.get_name(), "segments": segments, "baseSpeed": base_speed_mps } arrow_script = """ """ map_obj.get_root().html.add_child( folium.Element(arrow_script.replace("__ARROW_CONFIG__", json.dumps(payload))) ) # Route/search playback injection def inject_route_playback(map_obj, config_payload): """Add client-side route/search step playback controls to a Folium map.""" if not config_payload: return script_template = """ """ animation_script = script_template.replace("__CONFIG__", 
json.dumps(config_payload)) map_obj.get_root().html.add_child(folium.Element(animation_script)) def create_polyline(locations, color, weight, dash_array, opacity, tooltip_text, popup_html, map_obj, add_arrow=True): """Helper function to create polylines with consistent styling and optional arrows""" # Centralized folium polyline creation ensures consistent style arguments line = folium.PolyLine( locations=locations, color=color, weight=weight, dash_array=dash_array, opacity=opacity, tooltip=tooltip_text, popup=folium.Popup(popup_html, max_width=250) ).add_to(map_obj) # Capture this segment for client-side arrow animation if add_arrow and len(locations) >= 2: register_arrow_segment(map_obj, locations, color) return line def extend_path_to_custom_nodes(latlngs, start_coord, end_coord, tolerance_m=GAP_TO_NODE_TOLERANCE_M): """Ensure rendered polylines reach the custom nodes but stop just outside the markers.""" if not latlngs: return [start_coord, end_coord] extended = list(latlngs) def needs_connector(coord_a, coord_b): if coord_a is None or coord_b is None: return False # Compare geodesic distance and determine if a short connector is needed return haversine_km(coord_a[0], coord_a[1], coord_b[0], coord_b[1]) * 1000 > tolerance_m if needs_connector(start_coord, extended[0]): extended.insert(0, start_coord) if needs_connector(extended[-1], end_coord): extended.append(end_coord) def prune_endpoint(points, node_coord, from_start=True): if not node_coord: return while points: idx = 0 if from_start else -1 target = points[idx] dist_m = haversine_km(node_coord[0], node_coord[1], target[0], target[1]) * 1000 if dist_m <= NODE_OVERLAP_PRUNE_M: points.pop(idx) else: break def offset_from_node(node_coord, anchor_coord): if not node_coord or not anchor_coord: return None dist_m = haversine_km(node_coord[0], node_coord[1], anchor_coord[0], anchor_coord[1]) * 1000 if dist_m <= 1e-6: return None ratio = min(ROUTE_NODE_CLEARANCE_M / dist_m, 0.45) if ratio <= 0: return None return ( 
node_coord[0] + (anchor_coord[0] - node_coord[0]) * ratio, node_coord[1] + (anchor_coord[1] - node_coord[1]) * ratio ) prune_endpoint(extended, start_coord, from_start=True) prune_endpoint(extended, end_coord, from_start=False) if not extended and start_coord and end_coord: start_point = offset_from_node(start_coord, end_coord) end_point = offset_from_node(end_coord, start_coord) if start_point and end_point: extended = [start_point, end_point] if not extended: if start_coord and end_coord: return [start_coord, end_coord] return extended anchor_start = extended[0] if extended else end_coord start_point = offset_from_node(start_coord, anchor_start) if start_point: extended.insert(0, start_point) anchor_end = extended[-1] if extended else start_coord end_point = offset_from_node(end_coord, anchor_end) if end_point: extended.append(end_point) return extended def visualize_simple_graph(nodes, ways, cameras, meta, out_html, solution_path=None, other_paths=None, exploration_steps=None): """Generate simplified graph visualization with plain background and arrow-headed paths""" center_lat, center_lon = get_map_center(nodes) exploration_steps = exploration_steps or [] # Create map with CartoDB Positron (light, minimal background) m = folium.Map( location=[center_lat, center_lon], zoom_start=16, tiles='CartoDB positron', attr='Simple Graph View' ) # Add custom CSS to make background even lighter/whiter custom_css = """ """ m.get_root().html.add_child(folium.Element(custom_css)) # Build solution edges set solution_edges = set() if solution_path and len(solution_path) > 1: for i in range(len(solution_path) - 1): solution_edges.add((solution_path[i], solution_path[i+1])) solution_nodes_set = set(solution_path or []) # Build other solution edges other_solution_edges = set() if other_paths: for entry in other_paths: path = entry.get("path") if isinstance(entry, dict) else entry if not path or len(path) < 2: continue for i in range(len(path) - 1): edge = (path[i], path[i + 1]) if 
edge not in solution_edges: other_solution_edges.add(edge) # Quick lookup for reverse edges so we can position paired arrows without overlap edge_lookup = {(w["from"], w["to"]): w for w in ways} solution_segment_layers = [] # Draw all edges as simple lines with arrows for w in ways: way_id = w["way_id"] u, v = w["from"], w["to"] rn = w["road_name"] time_min = w["time_min"] start_coord = (nodes[u]["lat"], nodes[u]["lon"]) end_coord = (nodes[v]["lat"], nodes[v]["lon"]) # Determine edge type (but don't highlight solution paths) is_camera = way_id in cameras is_accident = w.get("accident", False) # Style based on edge type - accidents get priority coloring if is_accident: # Get severity to determine color severity = w.get("severity", "severe").lower() # Map severity to colors: yellow=minor, orange=moderate, red=severe severity_colors = { "minor": "#FFC107", # Yellow "moderate": "#FF9800", # Orange "severe": "#F44336" # Red } color = severity_colors.get(severity, "#F44336") # Default to red weight = 5 opacity = 1.0 elif is_camera: color = '#ef4444' # Red for camera roads weight = 3 opacity = 0.92 else: color = '#64748b' # Slate gray for all regular roads weight = 2.6 opacity = 0.78 # Create simple line line = folium.PolyLine( locations=[start_coord, end_coord], color=color, weight=weight, opacity=opacity, tooltip=f"{rn} ({time_min} min) | {u} -> {v}" ) line.add_to(m) register_arrow_segment(m, [start_coord, end_coord], color) # Add overlay polylines for the selected solution path so we can animate step-by-step playback if solution_path and len(solution_path) > 1: for idx in range(len(solution_path) - 1): u = solution_path[idx] v = solution_path[idx + 1] w = edge_lookup.get((u, v)) if w: start_coord = (nodes[u]["lat"], nodes[u]["lon"]) end_coord = (nodes[v]["lat"], nodes[v]["lon"]) else: start_coord = (nodes.get(u, {}).get("lat"), nodes.get(u, {}).get("lon")) end_coord = (nodes.get(v, {}).get("lat"), nodes.get(v, {}).get("lon")) if None in (*start_coord, *end_coord): 
continue seg = folium.PolyLine( locations=[start_coord, end_coord], color="#2563eb", weight=5, opacity=0.0, tooltip=f"Step {idx + 1}: {u} -> {v}" ).add_to(m) solution_segment_layers.append(seg.get_name()) # Draw nodes start_node = meta.get("start") goal_nodes = set(meta.get("goals", [])) node_marker_names = {} node_marker_styles = {} node_metadata = {} for nid, info in nodes.items(): lat, lon = info["lat"], info["lon"] label = info["label"] # Determine node style - only highlight start and goal if nid == start_node: color = '#10b981' fill_color = '#d1fae5' radius = 10 icon_text = 'S' elif nid in goal_nodes: color = '#f59e0b' fill_color = '#fef3c7' radius = 10 icon_text = 'G' else: # All other nodes are uniform color = '#64748b' fill_color = '#f1f5f9' radius = 6 icon_text = '' if nid in solution_nodes_set and nid not in goal_nodes and nid != start_node: color = '#2563eb' fill_color = '#e0ecff' radius = max(radius, 7) # Create circle marker for node marker = folium.CircleMarker( location=(lat, lon), radius=radius, color=color, fill=True, fillColor=fill_color, fillOpacity=0.9, weight=2, popup=f"{label}
ID: {nid}", tooltip=label ).add_to(m) marker_name = marker.get_name() node_marker_names[str(nid)] = marker_name node_marker_styles[marker_name] = { "color": color, "fillColor": fill_color, "radius": radius, "weight": 2, "fillOpacity": 0.9, "opacity": 1.0 } node_metadata[str(nid)] = {"label": label} # Add text label for start/goal nodes if icon_text: folium.Marker( location=(lat, lon), icon=folium.DivIcon( html=f'
{icon_text}
' ) ).add_to(m) # Inject step-by-step playback controls for the simplified map ordered_node_markers = [] if solution_path: for nid in solution_path: marker_name = node_marker_names.get(str(nid)) if marker_name: if nid == start_node: role = "start" elif nid in goal_nodes: role = "goal" else: role = "normal" ordered_node_markers.append({ "id": nid, "name": marker_name, "label": nodes[nid]["label"], "role": role }) if ordered_node_markers: config_payload = { "routeSegments": solution_segment_layers, "routeNodes": ordered_node_markers, "markerStyles": node_marker_styles, "nodeMarkerLookup": node_marker_names, "nodeMetadata": node_metadata, "searchSteps": exploration_steps, "routeStepDelay": 850, "searchStepDelay": 1050 } inject_route_playback(m, config_payload) # Add legend legend_html = """
Simple Graph View
One-way edge
Camera road
Start node (S)
Goal node (G)
Regular nodes
Two-way edges show two arrows on the line.
""" m.get_root().html.add_child(folium.Element(legend_html)) inject_arrow_animation(m) m.save(out_html) def visualize_with_roads(nodes, ways, cameras, meta, osm_nodes, graph, out_html, solution_path=None, other_paths=None, exploration_steps=None, k_snap=8): """Generate interactive Folium map with road visualization""" center_lat, center_lon = get_map_center(nodes) # Base map uses OpenStreetMap tiles around the dataset centroid m = folium.Map( location=[center_lat, center_lon], zoom_start=16, tiles="OpenStreetMap" ) # Convenience set for quick membership tests while styling nodes solution_nodes_set = set(solution_path or []) exploration_steps = exploration_steps or [] # Pre-compute snap candidates for all nodes snap_candidates = {} for nid, info in nodes.items(): # Snap each custom node to multiple nearby OSM nodes for routing snap_candidates[nid] = k_nearest_graph_nodes( info["lat"], info["lon"], osm_nodes, graph, k=k_snap ) # Build solution edges set solution_edges = set() if solution_path and len(solution_path) > 1: for i in range(len(solution_path) - 1): # Store each directed edge belonging to the selected solution solution_edges.add((solution_path[i], solution_path[i+1])) # Build other solution edges and route mapping # Track edges for alternate solutions plus associated Folium layers other_solution_edges = set() edge_route_map = {} solution_edge_layers = {} if other_paths: for entry in other_paths: path = entry.get("path") if isinstance(entry, dict) else entry route_id = entry.get("id") if isinstance(entry, dict) else None if not path or len(path) < 2: continue for i in range(len(path) - 1): edge = (path[i], path[i + 1]) if edge not in solution_edges: other_solution_edges.add(edge) if route_id is not None: # Track which optional route uses a given edge edge_route_map.setdefault(edge, set()).add(route_id) # Keep JS references to dynamic polylines for interactivity hooks route_polyline_refs = {} solution_line_names = [] # Build a set of reverse edges to detect 
parallel roads reverse_edges = {} for w in ways: reverse_edges[(w["from"], w["to"])] = w["way_id"] # Draw all ways/roads for w in ways: way_id = w["way_id"] u, v = w["from"], w["to"] rn = w["road_name"] hwy = w["highway_type"] time_min = w["time_min"] start_coord = (nodes[u]["lat"], nodes[u]["lon"]) end_coord = (nodes[v]["lat"], nodes[v]["lon"]) # Find best OSM path between nodes best_path = None best_len = None for u_osm in snap_candidates[u]: for v_osm in snap_candidates[v]: path = dijkstra_path(graph, u_osm, v_osm) if path is None: continue plen = path_length_km(path, graph) if best_len is None or plen < best_len: best_len = plen best_path = path # Build coordinates list if best_path is None: # Fall back to drawing a straight line if OSM snapping fails latlngs = [start_coord, end_coord] else: latlngs = [(osm_nodes[nid][0], osm_nodes[nid][1]) for nid in best_path] # Always append the true node coordinates so the line reaches the endpoints latlngs = extend_path_to_custom_nodes(latlngs, start_coord, end_coord) # Check if there's a reverse edge (parallel road) has_reverse = (v, u) in reverse_edges # Offset parallel roads slightly to make them visually distinct and clickable if has_reverse and len(latlngs) >= 2: # Apply perpendicular offset to separate parallel roads offset_distance = 0.00003 # ~3 meters offset offset_latlngs = [] for i, coord in enumerate(latlngs): if i == 0 and i + 1 < len(latlngs): # First point - use direction to next point lat1, lon1 = coord lat2, lon2 = latlngs[i + 1] elif i > 0: # Other points - use direction from previous point lat2, lon2 = coord lat1, lon1 = latlngs[i - 1] else: offset_latlngs.append(coord) continue # Calculate perpendicular offset (rotate direction by 90 degrees) dlat = lat2 - lat1 dlon = lon2 - lon1 length = (dlat**2 + dlon**2)**0.5 if length > 0: # Perpendicular vector (rotate 90Β° right) perp_lat = -dlon / length * offset_distance perp_lon = dlat / length * offset_distance offset_latlngs.append((coord[0] + perp_lat, 
coord[1] + perp_lon)) else: offset_latlngs.append(coord) latlngs = offset_latlngs # Determine edge type and styling is_solution = (u, v) in solution_edges route_ids_for_edge = list(edge_route_map.get((u, v), [])) is_other_solution = (u, v) in other_solution_edges is_camera = way_id in cameras # These booleans determine styling (color/weight/dashed) for each polyline tooltip_text = f"{rn} ({time_min} min)" popup_html = ( f"{rn}
" f"way_id: {way_id}
" f"type: {hwy}
" f"time: {time_min} min
" ) if is_camera: popup_html += "CAMERA MONITORED
Accident x time multiplied
" if is_solution: popup_html += "* SELECTED SOLUTION *" elif is_other_solution: popup_html += "* OTHER SOLUTION *" # Create appropriate polyline # Create appropriate polyline polyline_for_events = None # Check if this road has an accident (draw accident color first if needed) has_accident = w.get("accident", False) severity = w.get("severity", "none").lower() if has_accident else "none" # Only draw accident color if severity is not 'none' if has_accident and severity != "none": severity_colors = { "minor": "#FFC107", # Yellow "moderate": "#FF9800", # Orange "severe": "#F44336" # Red } accident_color = severity_colors.get(severity, "#F44336") # Draw accident color as base layer (will be under blue if it's a solution path) folium.PolyLine( locations=latlngs, color=accident_color, weight=7, opacity=1.0, tooltip=tooltip_text + f" (ACCIDENT: {severity.upper()})" ).add_to(m) # πŸ”΅ 1. SELECTED SOLUTION (blue) - DRAW ON TOP if is_solution: polyline_for_events = create_polyline( latlngs, "#1f6feb", 5, None, 0.85, tooltip_text, popup_html, m ) layer_name = polyline_for_events.get_name() solution_line_names.append(layer_name) solution_edge_layers[(u, v)] = layer_name # πŸ”Ή 2. OTHER SOLUTIONS elif is_other_solution: folium.PolyLine( locations=latlngs, color="#1c4fd8", weight=7, dash_array=None, opacity=0.9 ).add_to(m) polyline_for_events = create_polyline( latlngs, "#e4f0ff", 5, None, 0.95, tooltip_text, popup_html, m ) # πŸ”₯ 3. ACCIDENT ROADS (only if not already drawn above) elif has_accident: # Already drawn as base layer above, just create event handler polyline_for_events = create_polyline( latlngs, "transparent", 1, None, 0, tooltip_text + f" (ACCIDENT: {severity.upper()})", popup_html, m ) # πŸ“Έ 4. CAMERA ROADS elif is_camera: polyline_for_events = create_polyline( latlngs, "crimson", 3, "8,4", 0.8, tooltip_text, popup_html, m ) # πŸ›£ 5. 
NORMAL ROADS else: weight = 3 if hwy in ["primary", "secondary"] else 2 polyline_for_events = create_polyline( latlngs, color_for_highway(hwy), weight, None, 0.25, tooltip_text, popup_html, m ) # Attach click events if route_ids_for_edge: layer_name = polyline_for_events.get_name() for rid in route_ids_for_edge: route_polyline_refs.setdefault(rid, []).append(layer_name) # Add interactivity scripts for route selection if route_polyline_refs or solution_line_names: script_lines = ["" ]) m.get_root().html.add_child(folium.Element("\n".join(script_lines))) # Draw node markers start_node = meta.get("start") goal_nodes = set(meta.get("goals", [])) # Store references to folium markers for later animation/click handling node_marker_names = {} node_marker_styles = {} node_metadata = {} for nid, info in nodes.items(): lat, lon, label = info["lat"], info["lon"], info["label"] # Determine node styling if nid == start_node: fill_color = "green" role = "START" elif nid in goal_nodes: fill_color = "#ffd447" role = "GOAL" else: fill_color = "white" role = "Node" if nid in solution_nodes_set: border_color = "#0ea5e9" border_weight = 2 else: border_color = "black" border_weight = 1 if nid == start_node or nid in goal_nodes: border_weight = 1 circle_marker = folium.CircleMarker( location=(lat, lon), radius=8, color=border_color, weight=border_weight, fill=True, fill_color=fill_color, fill_opacity=0.9, opacity=0.9, popup=folium.Popup( f"Node {nid}: {label}
" f"lat: {lat:.6f}
lon: {lon:.6f}
{role}", max_width=250 ), tooltip=f"{nid}: {label} ({role})" ).add_to(m) marker_name = circle_marker.get_name() node_marker_names[str(nid)] = marker_name node_marker_styles[marker_name] = { "color": border_color, "fillColor": fill_color, "radius": 8, "weight": border_weight, "fillOpacity": 0.9, "opacity": 1.0 } node_metadata[str(nid)] = {"label": label} # Save styling info so animation can temporarily modify the markers # Add label using HTML so node id and label stay visible folium.map.Marker( [lat, lon], icon=folium.DivIcon( html=f"""
{nid}: {label}
""", icon_size=(0, 0), icon_anchor=(-10, 10) ) ).add_to(m) # Add path animation for solution if solution_path: ordered_segment_layers = [] for i in range(len(solution_path) - 1): layer_name = solution_edge_layers.get((solution_path[i], solution_path[i + 1])) if layer_name: # Preserve the segment order so the animation highlights edges sequentially ordered_segment_layers.append(layer_name) ordered_node_markers = [] for nid in solution_path: marker_name = node_marker_names.get(str(nid)) if marker_name: if nid == start_node: role = "start" elif nid in goal_nodes: role = "goal" else: role = "normal" ordered_node_markers.append({ "id": nid, "name": marker_name, "label": nodes[nid]["label"], "role": role }) if ordered_node_markers: config_payload = { "routeSegments": ordered_segment_layers, "routeNodes": ordered_node_markers, "markerStyles": node_marker_styles, "nodeMarkerLookup": node_marker_names, "nodeMetadata": node_metadata, "searchSteps": exploration_steps, "routeStepDelay": 900, "searchStepDelay": 1100 } script_template = """ """ animation_script = script_template.replace("__CONFIG__", json.dumps(config_payload)) m.get_root().html.add_child(folium.Element(animation_script)) # Add legend acc_mult = meta.get("accident_multiplier", 1) # Provide a custom legend so the user understands each styling cue legend_html = f"""
Kuching Heritage Graph
β–¬ Selected solution
β–¬ Other solution paths
β–¬ Camera road (accident x time x {acc_mult})
β–¬ Primary road
β–¬ Secondary road
β–¬ Tertiary road
β–¬ Service / alley
""" m.get_root().html.add_child(folium.Element(legend_html)) inject_arrow_animation(m) m.save(out_html) # ============================================================================ # FLASK APP SETUP # ============================================================================ BASE_DIR = os.path.dirname(__file__) MAP_DATA_DIR = os.path.join(BASE_DIR, 'algorithms') # Load all algorithms dynamically ALGORITHM_CONFIG = { 'astar': ('aStar.py', 'aStar'), 'beam': ('beam.py', 'beam_search_lds'), 'dfs': ('dfs.py', 'dfs'), 'gbfs': ('gbfs.py', 'gbfs'), 'iddfs': ('iddfs.py', 'iddfs'), 'bfs': ('bfs.py', 'bfs') } algorithms_impl = {} for name, (filename, func_name) in ALGORITHM_CONFIG.items(): try: # Attempt to import each algorithm strategy from the algorithms folder algorithms_impl[name] = load_func_from_path( os.path.join(MAP_DATA_DIR, filename), func_name ) except Exception: # Missing/invalid implementations are recorded as None for graceful fallback algorithms_impl[name] = None app = Flask(__name__) app.secret_key = 'your-secret-key-here-change-in-production' app.config['UPLOAD_FOLDER'] = 'uploads' app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 app.config['ALLOWED_EXTENSIONS'] = {'txt', 'osm'} app.config['SESSION_TYPE'] = 'filesystem' app.config['SESSION_PERMANENT'] = False os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs('static', exist_ok=True) def allowed_file(filename): """Check if file extension is allowed""" # Ensure uploads only include supported text or OSM inputs return '.' 
def allowed_file(filename):
    """Return True when *filename* carries one of the allowed upload extensions.

    Only the text after the last dot is inspected, case-insensitively, and it
    is compared against ``app.config['ALLOWED_EXTENSIONS']``.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']


# ============================================================================
# ROUTE HELPERS
# ============================================================================

def _json_payload():
    """Return the request body as a dict, or ``{}`` when it is missing/invalid.

    ``silent=True`` stops Flask from raising on a non-JSON body, so each route
    can answer with its own JSON validation error instead of a generic 500.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _list_nodes_and_ways(nodes, ways):
    """Build the node and way listings returned to the frontend."""
    nodes_list = [{'id': nid, 'label': info['label']} for nid, info in nodes.items()]
    ways_list = [
        {
            "way_id": w["way_id"],
            "from": w["from"],
            "to": w["to"],
            "road_name": w["road_name"],
            "highway_type": w["highway_type"],
        }
        for w in ways
    ]
    return nodes_list, ways_list


def _render_base_maps(nodes, ways, cameras, meta, osm_nodes, graph):
    """Write the two base maps (no highlighted path) into ``static/``."""
    base_map_path = os.path.join('static', 'heritage_map_base.html')
    base_simple_map_path = os.path.join('static', 'heritage_map_simple_base.html')
    visualize_with_roads(nodes, ways, cameras, meta, osm_nodes, graph,
                         base_map_path, solution_path=None)
    visualize_simple_graph(nodes, ways, cameras, meta, base_simple_map_path,
                           solution_path=None, other_paths=None)


def _render_accident_maps(nodes, ways, cameras, osm_nodes, graph):
    """Write the detailed and simplified accident overview maps into ``static/``."""
    accident_map_output = os.path.join('static', 'heritage_map_accidents.html')
    accident_map_simple = os.path.join('static', 'heritage_map_simple_accidents.html')
    visualize_with_roads(
        nodes, ways, cameras, {"start": None, "goals": []},
        osm_nodes, graph, accident_map_output,
        solution_path=None, exploration_steps=[]
    )
    visualize_simple_graph(
        nodes, ways, cameras, {"start": None, "goals": []},
        accident_map_simple,
        solution_path=None, other_paths=None, exploration_steps=[]
    )


def _path_cost(path, edges_map):
    """Sum the ``"time"`` of every consecutive edge of *path* found in *edges_map*."""
    return sum(
        edges_map[edge]["time"]
        for edge in zip(path, path[1:])
        if edge in edges_map
    )


def _index_ways_by_id(ways):
    """Map ``str(way_id)`` to its way dict; the first occurrence of an id wins."""
    index = {}
    for w in ways:
        index.setdefault(str(w["way_id"]), w)
    return index


def _accident_summary():
    """Return the frontend-facing view of every entry in GLOBAL_ACCIDENTS."""
    return {
        way_id: {
            "road_name": info["road_name"],
            "severity": info["severity"],
            "from_node": info.get("from_node", "?"),
            "to_node": info.get("to_node", "?"),
        }
        for way_id, info in GLOBAL_ACCIDENTS.items()
    }


# ============================================================================
# FLASK ROUTES
# ============================================================================

@app.route('/')
def index():
    """Serve the main page."""
    return render_template('index.html')


@app.route('/test_accidents')
def test_accidents():
    """Test page for accident system"""
    return render_template('test_accidents.html')


@app.route('/get_default_nodes', methods=['GET'])
def get_default_nodes():
    """Return the default map's nodes/ways and regenerate the base maps.

    Responds 404 when no default map is loaded and 500 (with the error text)
    on any unexpected failure. The session is cleared and flagged as using the
    default map.
    """
    try:
        if DEFAULT_MAP_DATA['nodes'] is None:
            return jsonify({'error': 'Default map not available'}), 404

        nodes = DEFAULT_MAP_DATA['nodes']
        # Deep copy so nothing done with the list here touches the stored ways.
        ways = copy.deepcopy(DEFAULT_MAP_DATA['ways'])
        cameras = DEFAULT_MAP_DATA['cameras']
        meta = DEFAULT_MAP_DATA['meta']
        osm_nodes = DEFAULT_MAP_DATA['osm_nodes']
        graph = DEFAULT_MAP_DATA['graph']

        # Clear any previous session data and mark as using default
        session.clear()
        session['using_default_map'] = True
        session['has_uploaded_files'] = False
        session.modified = True

        nodes_list, ways_list = _list_nodes_and_ways(nodes, ways)
        _render_base_maps(nodes, ways, cameras, meta, osm_nodes, graph)

        return jsonify({
            'success': True,
            'nodes': nodes_list,
            'ways': ways_list,
            'map_url': "/static/heritage_map_base.html",
            'map_url_simple': "/static/heritage_map_simple_base.html",
            'using_default': True
        })
    except Exception as e:
        print(f"❌ Error in get_default_nodes: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500


@app.route('/upload_files', methods=['POST'])
def upload_files():
    """Store an uploaded .txt/.osm pair, parse it, and return its nodes/ways.

    Expects multipart fields ``txt_file`` and ``osm_file`` plus an optional
    ``algorithm`` form value (default ``'bfs'``). Responds 400 on missing,
    unnamed or disallowed files and 500 on any parsing/rendering failure.
    """
    try:
        if 'txt_file' not in request.files or 'osm_file' not in request.files:
            return jsonify({'error': 'Both .txt and .osm files are required'}), 400

        txt_file = request.files['txt_file']
        osm_file = request.files['osm_file']
        algorithm = request.form.get('algorithm', 'bfs')

        if txt_file.filename == '' or osm_file.filename == '':
            return jsonify({'error': 'Please select both files'}), 400
        if not (allowed_file(txt_file.filename) and allowed_file(osm_file.filename)):
            return jsonify({'error': 'Invalid file type'}), 400

        # Sanitize filenames and save to the upload folder
        txt_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(txt_file.filename))
        osm_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(osm_file.filename))
        txt_file.save(txt_path)
        osm_file.save(osm_path)

        nodes, ways, cameras, meta = parse_map_data_file(txt_path)
        osm_nodes, graph = load_osm_graph(osm_path)

        # Store in global cache (survives across requests)
        import time
        UPLOADED_MAP_CACHE.update({
            'nodes': nodes,
            'ways': copy.deepcopy(ways),
            'cameras': cameras,
            'meta': meta,
            'osm_nodes': osm_nodes,
            'graph': graph,
            'txt_path': txt_path,
            'osm_path': osm_path,
            'timestamp': time.time(),
        })

        # Store paths in session for later use
        session['txt_path'] = txt_path
        session['osm_path'] = osm_path
        session['algorithm'] = algorithm
        session['using_default_map'] = False
        session['has_uploaded_files'] = True
        session['nodes_count'] = len(nodes)
        session['ways_count'] = len(ways)

        # A new upload invalidates accident data and the previous route choice.
        for stale_key in ("modified_ways", "edge_accidents", "origin", "destination"):
            session.pop(stale_key, None)
        session.modified = True

        nodes_list, ways_list = _list_nodes_and_ways(nodes, ways)

        # Generate base maps (without highlighted paths) so users see the area
        # right after upload.
        _render_base_maps(nodes, ways, cameras, meta, osm_nodes, graph)

        return jsonify({
            'success': True,
            'nodes': nodes_list,
            'ways': ways_list,
            'map_url': "/static/heritage_map_base.html",
            'map_url_simple': "/static/heritage_map_simple_base.html"
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/find_routes', methods=['POST'])
def find_routes():
    """Find a primary route plus up to four alternatives between two nodes.

    JSON body: ``origin``, ``destination`` and optional ``algorithm`` (falls
    back to the session value, then ``'bfs'``). Alternatives are searched by
    re-running the algorithm with one edge of the primary path removed at a
    time. One detailed and one simplified HTML map is written per route.
    Responds 400 on invalid input/unavailable algorithm, 404 when no path is
    found, 500 on unexpected errors.
    """
    try:
        data = _json_payload()
        origin = data.get('origin')
        destination = data.get('destination')
        # `or` also covers an explicit JSON null, which previously crashed on .lower().
        algorithm = str(data.get('algorithm') or session.get('algorithm', 'bfs'))

        if not origin or not destination:
            # Input validation before expensive parsing work
            return jsonify({'error': 'Origin and destination are required'}), 400

        # Save for later (used by accident system + ALL algorithms comparison)
        session['origin'] = origin
        session['destination'] = destination

        # Get current map data (uploaded or default)
        nodes, ways, cameras, meta, osm_nodes, graph, _txt_path, _osm_path = get_current_map_data()
        if nodes is None:
            return jsonify({'error': 'Please upload files first'}), 400

        # Report which accidents (global store) are in effect for this search.
        edge_accidents = GLOBAL_ACCIDENTS
        if edge_accidents:
            print(f"βœ… Route finding with {len(edge_accidents)} active accident(s)")
            for info in edge_accidents.values():
                print(f" 🚨 {info['road_name']}: {info['severity']} ({info['multiplier']}x)")
        else:
            print(f"ℹ️ Route finding with no accidents")

        edges_map = build_edges_map(ways)
        # Algorithms expect nodes mapped to coordinate tuples, not metadata dicts
        algo_nodes = {nid: (info["lat"], info["lon"]) for nid, info in nodes.items()}

        algo_func = algorithms_impl.get(algorithm.lower())
        if algo_func is None:
            return jsonify({'error': f'Algorithm "{algorithm}" is not available or failed to load'}), 400

        result = algo_func(algo_nodes, edges_map, origin, [destination], track=False, visualise=True)
        if result[0] is None or not result[2]:
            return jsonify({'error': f'No path found using {algorithm} algorithm'}), 404

        nodes_created = result[1]
        path = result[2]
        # The fourth element (exploration trace) is optional.
        exploration_steps = result[3] if len(result) >= 4 else []

        # The primary path from the selected algorithm is always route 0.
        route_definitions = [{'id': 0, 'path': path}]
        routes = [{
            'id': 0,
            'path': path,
            'cost': round(_path_cost(path, edges_map), 2),
            'length': len(path),
            'path_str': ' β†’ '.join(str(nid) for nid in path),
            'primary': True
        }]
        found_paths = {tuple(path)}

        # Find alternatives by blocking one edge of the primary path at a time.
        alt_id = 1
        max_alternatives = 4
        for edge_to_block in zip(path, path[1:]):
            if alt_id > max_alternatives:
                break

            modified_edges = edges_map.copy()
            modified_edges.pop(edge_to_block, None)

            try:
                alt_result = algo_func(algo_nodes, modified_edges, origin, [destination],
                                       track=False, visualise=False)
            except Exception as exc:
                # Best effort: a failed attempt must not abort the primary result.
                print(f"Alternative route search failed with edge {edge_to_block} blocked: {exc}")
                continue

            if alt_result[0] is None or not alt_result[2]:
                continue
            alt_path = alt_result[2]
            if tuple(alt_path) in found_paths:
                continue  # Only keep genuinely different paths

            route_definitions.append({'id': alt_id, 'path': alt_path})
            routes.append({
                'id': alt_id,
                'path': alt_path,
                'cost': round(_path_cost(alt_path, edges_map), 2),
                'length': len(alt_path),
                'path_str': ' β†’ '.join(str(nid) for nid in alt_path),
                'primary': False
            })
            found_paths.add(tuple(alt_path))
            alt_id += 1

        # Generate individual maps for each route (both OSM and simplified views)
        display_meta = {
            "start": origin,
            "goals": [destination],
            "accident_multiplier": meta.get("accident_multiplier", 1)
        }
        for route_def in route_definitions:
            map_output = os.path.join('static', f'heritage_map_{route_def["id"]}.html')
            simple_output = os.path.join('static', f'heritage_map_simple_{route_def["id"]}.html')
            try:
                visualize_with_roads(
                    nodes, ways, cameras, display_meta, osm_nodes, graph,
                    map_output, route_def['path'],
                    other_paths=None, exploration_steps=exploration_steps
                )
                visualize_simple_graph(
                    nodes, ways, cameras, display_meta,
                    simple_output, route_def['path'],
                    other_paths=None, exploration_steps=exploration_steps
                )
            except Exception as e:
                # Continue with other routes even if one fails
                print(f"Error generating maps for route {route_def['id']}: {str(e)}")

        # Primary route first, then the alternatives ordered by cost.
        primary_route = [r for r in routes if r.get('primary', False)]
        alternative_routes = sorted(
            (r for r in routes if not r.get('primary', False)),
            key=lambda r: r['cost']
        )
        sorted_routes = primary_route + alternative_routes

        return jsonify({
            'success': True,
            'routes': sorted_routes,
            'algorithm': algorithm.upper(),
            'stats': {
                'nodes': len(nodes),
                'ways': len(ways),
                'routes_found': len(sorted_routes),
                'nodes_created': nodes_created
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ========================================================================
# ACCIDENT IMAGE CLASSIFICATION API
# ========================================================================

@app.route('/predict_accident', methods=['POST'])
def predict_accident():
    """Classify the severity shown in an uploaded accident image.

    Form fields: ``model`` (``mobilenet`` / ``resnet`` / ``efficientnet``,
    default ``resnet``) and the file ``image``. Returns the predicted label,
    its confidence and the softmax probability of every class.
    """
    model_name = request.form.get("model", "resnet")

    if model_name == "mobilenet":
        model = mobilenet_model
    elif model_name == "resnet":
        model = resnet_model
    elif model_name == "efficientnet":
        if efficientnet_model is None:
            # NOTE: returned with the default 200 status, as before.
            return jsonify({"success": False, "error": "EfficientNet model not available"})
        model = efficientnet_model
    else:
        return jsonify({"success": False, "error": "Invalid model"}), 400

    if "image" not in request.files:
        return jsonify({"success": False, "error": "No image uploaded"}), 400

    img_file = request.files["image"]
    try:
        img = Image.open(img_file).convert("RGB")
    except (OSError, ValueError) as exc:
        # Pillow reports unreadable/corrupt files as OSError subclasses; answer
        # with JSON like every other validation branch instead of an HTML 500.
        return jsonify({"success": False, "error": f"Invalid image: {exc}"}), 400

    # Preprocess into a single-item batch
    x = accident_transform(img).unsqueeze(0)

    with torch.no_grad():
        logits = model(x)
        probs = torch.softmax(logits, dim=1)
        confidence, label_idx = torch.max(probs, dim=1)

    CLASS_NAMES = ["none", "minor", "moderate", "severe"]
    predicted_severity = CLASS_NAMES[int(label_idx.item())]
    all_probs = {
        name: float(probs[0][i].item())
        for i, name in enumerate(CLASS_NAMES)
    }

    return jsonify({
        "success": True,
        "label": predicted_severity,
        "confidence": float(confidence.item()),
        "all_probabilities": all_probs
    })


@app.route('/reset_accident', methods=['POST'])
def reset_accident():
    """Remove every accident and restore travel times from the map files."""
    try:
        GLOBAL_ACCIDENTS.clear()
        print("βœ… Cleared all accidents from GLOBAL_ACCIDENTS")

        for key in ("edge_accidents", "modified_ways", "default_map_modified_ways"):
            session.pop(key, None)
        session.modified = True

        # Re-parse each map file that still exists to get unmodified ways.
        uploaded_txt = UPLOADED_MAP_CACHE['txt_path']
        if uploaded_txt and os.path.exists(uploaded_txt):
            _, original_ways, _, _ = parse_map_data_file(uploaded_txt)
            UPLOADED_MAP_CACHE['ways'] = original_ways
            print("βœ… Reset UPLOADED_MAP_CACHE ways to original")

        default_txt = DEFAULT_MAP_DATA['txt_path']
        if default_txt and os.path.exists(default_txt):
            _, original_ways, _, _ = parse_map_data_file(default_txt)
            DEFAULT_MAP_DATA['ways'] = original_ways
            print("βœ… Reset DEFAULT_MAP_DATA ways to original")

        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


@app.route('/clear_cache', methods=['POST'])
def clear_cache():
    """Clear uploaded map cache, all accidents, and the session."""
    try:
        # Keys are kept (set to None) so later lookups still find them.
        for key in UPLOADED_MAP_CACHE:
            UPLOADED_MAP_CACHE[key] = None
        GLOBAL_ACCIDENTS.clear()
        session.clear()
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


@app.route('/debug_session', methods=['GET'])
def debug_session():
    """Debug endpoint to check session and cache state"""
    # NOTE(review): this dumps the whole session to any caller; consider
    # disabling or protecting it outside development.
    global_accidents_list = [
        {
            "road_id": road_id,
            "road_name": info.get("road_name", "Unknown"),
            "severity": info.get("severity", "unknown"),
            "multiplier": info.get("multiplier", 1.0)
        }
        for road_id, info in GLOBAL_ACCIDENTS.items()
    ]
    return jsonify({
        "session": dict(session),
        "global_accidents": dict(GLOBAL_ACCIDENTS),
        "global_accidents_count": len(GLOBAL_ACCIDENTS),
        "global_accidents_list": global_accidents_list,
        "has_uploaded_cache": UPLOADED_MAP_CACHE['nodes'] is not None,
        "has_default_map": DEFAULT_MAP_DATA['nodes'] is not None,
        "uploaded_cache_timestamp": UPLOADED_MAP_CACHE.get('timestamp'),
        "uploaded_cache_txt_path": UPLOADED_MAP_CACHE.get('txt_path'),
        "default_map_txt_path": DEFAULT_MAP_DATA.get('txt_path')
    })


@app.route('/remove_accident', methods=['POST'])
def remove_accident():
    """Remove the accident on one road and rebuild ways from the map file.

    JSON body: ``road_id``. The ways are re-parsed from the active map's .txt
    file, the remaining accidents are re-applied, the cache/session are
    updated and the accident maps are regenerated. Responds 400 when the id is
    missing, has no accident, or no map is loaded; 500 on unexpected errors.
    """
    try:
        road_id = _json_payload().get("road_id")
        if not road_id:
            return jsonify({"success": False, "error": "Missing road_id"}), 400

        # Use string key for consistency with apply_accident
        road_id_str = str(road_id)
        if road_id_str not in GLOBAL_ACCIDENTS:
            return jsonify({"success": False, "error": f"No accident found for road {road_id_str}"}), 400

        removed_info = GLOBAL_ACCIDENTS.pop(road_id_str)
        print(f"πŸ—‘οΈ Removing accident from road {road_id_str}: {removed_info['road_name']}")
        print(f" Remaining accidents in GLOBAL_ACCIDENTS: {len(GLOBAL_ACCIDENTS)}")

        session["edge_accidents"] = dict(GLOBAL_ACCIDENTS)
        session.modified = True

        # Pick the active store: an uploaded map takes precedence over the default.
        if UPLOADED_MAP_CACHE['nodes'] is not None:
            store, store_name = UPLOADED_MAP_CACHE, "UPLOADED_MAP_CACHE"
        elif DEFAULT_MAP_DATA['nodes'] is not None:
            store, store_name = DEFAULT_MAP_DATA, "DEFAULT_MAP_DATA"
        else:
            return jsonify({"success": False, "error": "No map data available"}), 400

        # Re-parse the file (not the cache) to get unmodified travel times.
        nodes, ways, cameras, _meta = parse_map_data_file(store['txt_path'])
        osm_nodes = store['osm_nodes']
        graph = store['graph']

        print(f"πŸ”„ Rebuilding ways after removing accident from road {road_id}")

        for w in ways:
            w.setdefault("original_time", w["time_min"])

        # Reapply remaining accidents from GLOBAL_ACCIDENTS
        print(f"\nπŸ”§ Reapplying {len(GLOBAL_ACCIDENTS)} remaining accident(s)...")
        ways_by_id = _index_ways_by_id(ways)
        for way_id, accident_info in GLOBAL_ACCIDENTS.items():
            w = ways_by_id.get(str(way_id))
            if w is None:
                print(f"⚠️ Way {way_id} not found during reapplication")
                continue
            multiplier = accident_info["multiplier"]
            w["time_min"] = w["original_time"] * multiplier
            w["accident"] = True
            w["severity"] = accident_info["severity"]
            print(f"βœ… Reapplied {accident_info['severity']} ({multiplier}x) to way {way_id}: {w.get('road_name', 'Unknown')}")

        if GLOBAL_ACCIDENTS:
            session["modified_ways"] = ways
            store['ways'] = copy.deepcopy(ways)
            print(f"βœ… Updated {store_name} with remaining accidents")
        else:
            # No accidents left - clear everything and restore original ways
            session.pop("modified_ways", None)
            session.pop("default_map_modified_ways", None)
            _, original_ways, _, _ = parse_map_data_file(store['txt_path'])
            store['ways'] = original_ways
            print(f"βœ… Reset {store_name} to original (no accidents)")
        session.modified = True

        # Always regenerate the maps so they show the current state.
        try:
            _render_accident_maps(nodes, ways, cameras, osm_nodes, graph)
            print(f"βœ… Regenerated map with {len(GLOBAL_ACCIDENTS)} remaining accident(s)")
        except Exception as e:
            print(f"⚠️ Error regenerating accident map: {str(e)}")

        import time
        timestamp = int(time.time())  # cache-buster for the regenerated maps

        return jsonify({
            "success": True,
            "removed_road": removed_info["road_name"],
            "accidents": _accident_summary(),
            "map_url": f"/static/heritage_map_accidents.html?t={timestamp}",
            # Added for parity with /apply_accident, which also reports this URL.
            "map_simple_url": f"/static/heritage_map_simple_accidents.html?t={timestamp}",
            "has_accidents": len(GLOBAL_ACCIDENTS) > 0
        })
    except Exception as e:
        print(f"❌ Error removing accident: {str(e)}")
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ========================================================================
# APPLY ACCIDENT IMPACT TO SPECIFIC EDGES IN ROUTES
# ========================================================================

@app.route('/apply_accident', methods=['POST'])
def apply_accident():
    """Apply an accident severity multiplier to a specific way (road).

    JSON body: ``road_id`` and ``severity``. The accident is recorded in
    GLOBAL_ACCIDENTS (keyed by ``str(road_id)``), every way's ``time_min`` is
    recomputed from its original time, the session and active map cache are
    updated, and the accident maps are regenerated. Multiple accidents on
    different roads are supported; re-applying to a road overrides it.
    """
    try:
        print("\nπŸ”₯ APPLY ACCIDENT TO WAY")

        data = _json_payload()
        road_id = data.get("road_id")
        severity = data.get("severity")
        if not road_id or not severity:
            return jsonify({"success": False, "error": "Missing road_id or severity"}), 400

        nodes, ways, cameras, _meta, osm_nodes, graph, _txt_path, _osm_path = get_current_map_data()
        if nodes is None:
            print("❌ No map data available!")
            return jsonify({"success": False, "error": "No map data available. Please upload map files or refresh the page."}), 400

        # Accidents are always keyed by the string form of the way id.
        road_key = str(road_id)
        is_override = road_key in GLOBAL_ACCIDENTS

        ways_by_id = _index_ways_by_id(ways)
        target_way = ways_by_id.get(road_key)
        if target_way is not None:
            road_name = target_way.get("road_name", "Unknown")
            from_node = target_way.get("from", "?")
            to_node = target_way.get("to", "?")
        else:
            road_name, from_node, to_node = "Unknown", "?", "?"

        multiplier = SEVERITY_MULTIPLIERS.get(str(severity).lower(), 1.0)
        GLOBAL_ACCIDENTS[road_key] = {
            "severity": severity,
            "multiplier": multiplier,
            "road_name": road_name,
            "from_node": from_node,
            "to_node": to_node
        }

        # Also update session for backwards compatibility
        session["edge_accidents"] = dict(GLOBAL_ACCIDENTS)
        session.modified = True

        # The multiplier is read from the local value: indexing the store with
        # the raw (possibly numeric) road_id raised KeyError and turned a
        # successful request into a 500.
        print(f"βœ… {'Overridden' if is_override else 'Applied'} accident: {road_id} ({road_name}) = {severity} (multiplier: {multiplier})")
        print(f" Total accidents in GLOBAL_ACCIDENTS: {len(GLOBAL_ACCIDENTS)}")
        print(f" All accidents: {list(GLOBAL_ACCIDENTS.keys())}")

        # Reset every way to its original time before re-applying accidents.
        for w in ways:
            w.setdefault("original_time", w["time_min"])
            w["time_min"] = w["original_time"]
            w["accident"] = False

        print(f"\nπŸ”§ Applying {len(GLOBAL_ACCIDENTS)} accident(s) to ways...")
        for way_id, accident_info in GLOBAL_ACCIDENTS.items():
            w = ways_by_id.get(str(way_id))
            if w is None:
                print(f"⚠️ Way {way_id} not found in ways list!")
                continue
            way_multiplier = accident_info["multiplier"]
            original_time = w["original_time"]
            new_time = original_time * way_multiplier
            w["time_min"] = new_time
            w["accident"] = True
            w["severity"] = accident_info["severity"]
            print(f"🚨 Applied {accident_info['severity']} ({way_multiplier}x) to way {way_id}: {w.get('road_name', 'Unknown')}")
            print(f" ⏱️ {original_time:.2f} min β†’ {new_time:.2f} min")

        # Store modified ways in BOTH session AND cache
        session["modified_ways"] = ways
        session.modified = True

        # Update the active cache so all routes see the accidents.
        if UPLOADED_MAP_CACHE['nodes'] is not None:
            UPLOADED_MAP_CACHE['ways'] = copy.deepcopy(ways)
            print(f"βœ… Updated UPLOADED_MAP_CACHE with accidents")
        elif DEFAULT_MAP_DATA['nodes'] is not None:
            DEFAULT_MAP_DATA['ways'] = copy.deepcopy(ways)
            print(f"βœ… Updated DEFAULT_MAP_DATA with accidents")

        print(f"πŸ“¦ Total accidents stored: {len(GLOBAL_ACCIDENTS)}")

        # Generate accident visualization maps immediately (best effort).
        try:
            _render_accident_maps(nodes, ways, cameras, osm_nodes, graph)
            print(f"βœ… Generated accident visualization maps")
        except Exception as e:
            print(f"⚠️ Error generating accident maps: {str(e)}")
            import traceback
            traceback.print_exc()

        return jsonify({
            "success": True,
            "road_id": road_id,
            "road_name": road_name,
            "severity": severity,
            "is_override": is_override,
            "accidents": _accident_summary(),
            "map_url": "/static/heritage_map_accidents.html",
            "map_simple_url": "/static/heritage_map_simple_accidents.html"
        })
    except Exception as e:
        print(f"❌ Error in apply_accident: {str(e)}")
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "error": f"Server error: {str(e)}"
        }), 500


# ========================================================================
# ALL ALGORITHMS COMPARISON (WITH EDGE ACCIDENT SUPPORT)
# ========================================================================

@app.route('/find_all_algorithms', methods=['POST'])
def find_all_algorithms():
    """Run every loaded algorithm for one origin/destination and compare them.

    ``origin``/``destination`` come from the JSON body, falling back to the
    session. Ways stored in ``session["modified_ways"]`` (accidents applied)
    take precedence over the current map's ways. An algorithm that fails or
    finds no path is skipped. Without an origin or destination the response is
    a successful empty list.
    """
    data = _json_payload()
    origin = data.get("origin") or session.get("origin")
    destination = data.get("destination") or session.get("destination")

    print("STEP 4 ORIGIN =", origin)
    print("STEP 4 DESTINATION =", destination)

    if not origin or not destination:
        return jsonify({"success": True, "routes": []})

    nodes, ways, cameras, meta, osm_nodes, graph, _txt_path, _osm_path = get_current_map_data()
    if nodes is None:
        return jsonify({"success": False, "error": "No map data available"}), 400

    # If modified ways exist (accident applied), use them
    if "modified_ways" in session:
        ways = session["modified_ways"]
        print(f"βœ… Using modified ways with {len(GLOBAL_ACCIDENTS)} accidents applied")

    edges_map = build_edges_map(ways)
    algo_nodes = {nid: (info["lat"], info["lon"]) for nid, info in nodes.items()}
    display_meta = {
        "start": origin,
        "goals": [destination],
        "accident_multiplier": meta.get("accident_multiplier", 1)
    }

    results = []
    for algo_name, func in algorithms_impl.items():
        if func is None:
            continue  # implementation failed to load at startup
        try:
            # Run with tracking so the exploration panel can be used
            result = func(algo_nodes, edges_map, origin, [destination], track=True, visualise=True)
            if result is None or len(result) < 3:
                continue
            path = result[2]
            if not path:
                continue
            exploration_steps = result[3] if len(result) >= 4 else []

            # Save each algorithm map
            out_html = f"static/algomap_{algo_name}.html"
            simple_out_html = f"static/algomap_simple_{algo_name}.html"
            visualize_with_roads(
                nodes, ways, cameras, dict(display_meta),
                osm_nodes, graph, out_html,
                solution_path=path, exploration_steps=exploration_steps
            )
            visualize_simple_graph(
                nodes, ways, cameras, dict(display_meta),
                simple_out_html,
                solution_path=path, other_paths=None, exploration_steps=exploration_steps
            )

            results.append({
                "algorithm": algo_name.upper(),
                "path": path,
                "cost": round(_path_cost(path, edges_map), 2),
                "length": len(path),
                # str() keeps non-string node ids from raising in join(), which
                # silently dropped the algorithm from the comparison.
                "path_str": " β†’ ".join(str(nid) for nid in path),
                "map_url": f"/static/algomap_{algo_name}.html",
                "map_url_simple": f"/static/algomap_simple_{algo_name}.html"
            })
        except Exception as e:
            print("ERROR in algo", algo_name, ":", e)
            continue

    return jsonify({"success": True, "routes": results})


# ========================================================================
# CLEAR ACCIDENT STATE (FOR NEW ROUTE SELECTION)
# ========================================================================

@app.route('/clear_route_state', methods=['POST'])
def clear_route_state():
    """Drop the session's accident/modified-ways entries for a new route selection.

    Only the session keys are removed; GLOBAL_ACCIDENTS and the map caches are
    not touched here.
    """
    try:
        session.pop("edge_accidents", None)
        session.pop("modified_ways", None)
        session.modified = True
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


if __name__ == '__main__':
    # Load default map data on startup (for Hugging Face deployment)
    load_default_map_data()

    # Get port from environment variable (Hugging Face uses 7860)
    port = int(os.environ.get('PORT', 7860))

    # Check if running in production (Hugging Face Spaces)
    is_production = os.environ.get('SPACE_ID') is not None

    if not is_production:
        # Only open browser in local development
        import threading
        import time
        import webbrowser

        def _open_browser_after_delay(url, delay=1.0):
            """Open *url* after *delay* seconds; failures are deliberately ignored."""
            try:
                time.sleep(delay)
                webbrowser.open_new(url)
            except Exception:
                pass

        url = f'http://127.0.0.1:{port}/'
        threading.Thread(target=_open_browser_after_delay, args=(url, 1.0), daemon=True).start()

    # Run Flask - debug mode off in production.
    # NOTE(review): outside production this enables the Werkzeug debugger while
    # listening on 0.0.0.0, which exposes it to the local network.
    app.run(debug=not is_production, host='0.0.0.0', port=port, use_reloader=False)