import os
import math
import folium
import folium.plugins
import xml.etree.ElementTree as ET
import heapq
from flask import Flask, render_template, request, jsonify, session
from werkzeug.utils import secure_filename
import importlib.util
import sys
import json
import copy
# Torch and image libraries
import torch
import torch.nn as nn
from torchvision import models
import torchvision.transforms as transforms
from PIL import Image
# Travel-time multiplier associated with each accident severity label.
SEVERITY_MULTIPLIERS = dict(none=1.0, minor=2.0, moderate=3.0, severe=4.0)

# Default map data (for Hugging Face deployment). Every slot starts as None.
DEFAULT_MAP_DATA = dict.fromkeys((
    'nodes', 'ways', 'cameras', 'meta',
    'osm_nodes', 'graph', 'txt_path', 'osm_path',
))

# In-memory cache for uploaded map data (survives across requests).
# Same slots as the default map plus a 'timestamp' entry.
UPLOADED_MAP_CACHE = dict.fromkeys((
    'nodes', 'ways', 'cameras', 'meta',
    'osm_nodes', 'graph', 'txt_path', 'osm_path',
    'timestamp',
))

# Global accident store (survives across requests).
GLOBAL_ACCIDENTS = dict()
# =====================================================
# Image preprocessing (MUST match training - ImageNet normalization)
# =====================================================
# Preprocessing pipeline for accident images: resize to 224x224, convert to a
# tensor, then normalize each channel with the ImageNet mean / std.
accident_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.485, 0.456, 0.406],  # ImageNet mean
            [0.229, 0.224, 0.225],  # ImageNet std
        ),
    ]
)
# Folder prefix that checkpoint-relative paths are appended to.
MODEL_DIR = "model/saved_models/"
# Based on your directory structure, load the appropriate model
def load_model(model_name):
    """Build a 4-class classifier and load its saved weights on the CPU.

    Args:
        model_name: One of "mobilenet", "resnet" or "efficientnet".

    Returns:
        The torchvision model with the checkpoint loaded, in eval mode.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    checkpoint_files = {
        "mobilenet": "mobilenet/version_2/mobilenet_v2_final.pth",
        "resnet": "resnet/version_2/resnet18_final.pth",
        "efficientnet": "efficientnet/version_2/efficientnet_b0_final.pth"
    }
    # Validate first: indexing the mapping with an unknown name would raise
    # KeyError and the intended ValueError would never be reached.
    if model_name not in checkpoint_files:
        raise ValueError("Unknown model type")

    path = MODEL_DIR + checkpoint_files[model_name]
    # NOTE(review): torch.load unpickles the file, so only trusted checkpoints
    # must ever be placed under MODEL_DIR.
    state = torch.load(path, map_location="cpu")  # loaded once for all branches

    if model_name == "resnet":
        model = models.resnet18(weights=None)
        # A Sequential(ReLU, Linear) head stores its weights under "fc.1.*";
        # a plain Linear head stores them under "fc.*".
        if "fc.1.weight" in state and "fc.0.weight" not in state:
            model.fc = nn.Sequential(
                nn.ReLU(),          # fc.0
                nn.Linear(512, 4)   # fc.1
            )
        else:
            model.fc = nn.Linear(512, 4)
    else:
        # MobileNetV2 and EfficientNet-B0 share the same 1280-feature head logic.
        if model_name == "mobilenet":
            model = models.mobilenet_v2(weights=None)
        else:
            model = models.efficientnet_b0(weights=None)
        # In the Sequential head below, Dropout (index 0) and ReLU (index 2)
        # own no parameters, so its checkpoint keys are "classifier.1.*" and
        # "classifier.3.*". Detecting on "classifier.3.weight" therefore matches
        # the head that is actually built; the previous "classifier.2.weight"
        # probe could never match a checkpoint saved from this head.
        if "classifier.3.weight" in state:
            model.classifier = nn.Sequential(
                nn.Dropout(0.2),
                nn.Linear(1280, 256),
                nn.ReLU(),
                nn.Linear(256, 4)
            )
        else:
            model.classifier[1] = nn.Linear(1280, 4)

    model.load_state_dict(state)
    model.eval()
    print(f"✅ Loaded model: {path}")
    return model
# =====================================================
# Load models once on server start
# =====================================================
# MobileNet and ResNet are loaded unguarded: a failure here propagates.
mobilenet_model = load_model("mobilenet")
resnet_model = load_model("resnet")
# EfficientNet is optional: stay at None when its checkpoint cannot be loaded.
efficientnet_model = None
try:
    efficientnet_model = load_model("efficientnet")
except Exception as e:
    print("Failed to load EfficientNet model:", e)
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def load_default_map_data():
    """Load the default map files into DEFAULT_MAP_DATA if both exist.

    Returns:
        True when both files were found and parsed, False when either file is
        missing or parsing raised (the failure is printed, not re-raised).
    """
    default_txt = 'heritage_assignment_15_time_asymmetric-1.txt'
    default_osm = 'map.osm'

    if not (os.path.exists(default_txt) and os.path.exists(default_osm)):
        print("ℹ️ No default map files found (optional). Users will upload their own files.")
        return False

    try:
        print(f"🗺️ Loading default map files: {default_txt}, {default_osm}")
        nodes, ways, cameras, meta = parse_map_data_file(default_txt)
        osm_nodes, graph = load_osm_graph(default_osm)
    except Exception as e:
        print(f"⚠️ Failed to load default map: {e}")
        return False

    # Publish everything in one step, only after both files parsed cleanly.
    DEFAULT_MAP_DATA.update({
        'nodes': nodes,
        'ways': ways,
        'cameras': cameras,
        'meta': meta,
        'osm_nodes': osm_nodes,
        'graph': graph,
        'txt_path': default_txt,
        'osm_path': default_osm,
    })
    print(f"✅ Default map loaded: {len(nodes)} nodes, {len(ways)} ways")
    return True
def get_current_map_data():
    """Return the active map data, trying each source in priority order.

    Order: uploaded-files cache, then the default map, then the file paths
    stored in the Flask session (re-parsed from disk).

    Returns:
        An 8-tuple ``(nodes, ways, cameras, meta, osm_nodes, graph, txt_path,
        osm_path)``. ``ways`` taken from an in-memory store is a deep copy of
        the stored list. All eight entries are None when no source is usable.
    """
    # Priority 1 and 2: in-memory stores (checked regardless of session state).
    in_memory_sources = (
        (UPLOADED_MAP_CACHE, "Using uploaded map cache"),
        (DEFAULT_MAP_DATA, "Using default map"),
    )
    for store, description in in_memory_sources:
        if store['nodes'] is None:
            continue
        # ALWAYS use the stored ways (which include accidents) - don't rely on session
        ways = copy.deepcopy(store['ways'])
        accidents_count = sum(1 for w in ways if w.get('accident', False))
        print(f"✅ {description} (accidents applied: {accidents_count})")
        return (
            store['nodes'],
            ways,
            store['cameras'],
            store['meta'],
            store['osm_nodes'],
            store['graph'],
            store['txt_path'],
            store['osm_path']
        )

    # Priority 3: try loading from file paths in session (last resort)
    txt_path = session.get('txt_path')
    osm_path = session.get('osm_path')
    if txt_path and osm_path and os.path.exists(txt_path) and os.path.exists(osm_path):
        try:
            print(f"⚠️ Loading from disk: {txt_path}, {osm_path}")
            nodes, ways, cameras, meta = parse_map_data_file(txt_path)
            osm_nodes, graph = load_osm_graph(osm_path)
            return nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path
        except Exception as e:
            print(f"❌ Error loading from disk: {e}")

    print("❌ No map data available")
    print(f" - Uploaded cache: {UPLOADED_MAP_CACHE['nodes'] is not None}")
    print(f" - Default map: {DEFAULT_MAP_DATA['nodes'] is not None}")
    print(f" - Session: {dict(session)}")
    return None, None, None, None, None, None, None, None
def load_func_from_path(path, func_name):
    """Dynamically load a function from a Python file.

    Args:
        path: Filesystem path of the Python source file.
        func_name: Name of the attribute to fetch from the executed module
            (also used as the module name for the import spec).

    Returns:
        The object bound to ``func_name`` in the executed module.

    Raises:
        ImportError: If no import spec/loader can be created for ``path``.
        AttributeError: If the module does not define ``func_name``.
    """
    spec = importlib.util.spec_from_file_location(func_name, path)
    # spec_from_file_location returns None for unsupported file types; fail
    # with a clear ImportError instead of an AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load a Python module from '{path}'")
    mod = importlib.util.module_from_spec(spec)

    # Temporarily expose the module's folder so its sibling imports resolve.
    module_dir = os.path.dirname(path)
    added = module_dir not in sys.path
    if added:
        sys.path.insert(0, module_dir)
    try:
        # Execute the module so the requested function becomes available
        spec.loader.exec_module(mod)
    finally:
        # Always clean up sys.path, even if the import fails
        if added:
            try:
                sys.path.remove(module_dir)
            except ValueError:
                pass
    return getattr(mod, func_name)
def split_csv_allow_commas(line, min_fields):
    """Split a CSV line on commas, keeping commas that sit inside parentheses.

    Args:
        line: The text line to split.
        min_fields: Minimum number of fields the line must yield.

    Returns:
        The list of fields, each stripped of surrounding whitespace.

    Raises:
        ValueError: If fewer than ``min_fields`` fields were parsed.
    """
    parts = []
    buf = []
    depth = 0  # current parenthesis nesting level
    for ch in line:
        if ch == ',' and depth == 0:
            # Top-level comma: close the current field
            parts.append("".join(buf).strip())
            buf = []
            continue
        if ch == '(':
            depth += 1
        elif ch == ')':
            # Floor at 0 so a stray ')' cannot push the depth negative
            depth = max(depth - 1, 0)
        buf.append(ch)
    # Flush the final field. It is emitted even when empty if a comma came
    # before it, so "a,b," yields a trailing "" just as "a,,b" yields an
    # interior one. An entirely empty line still yields no fields.
    if buf or parts:
        parts.append("".join(buf).strip())
    if len(parts) < min_fields:
        raise ValueError(f"Line '{line}' parsed into too few fields: {parts}")
    return parts
def haversine_km(lat1, lon1, lat2, lon2):
    """Return the haversine distance in kilometers between two points.

    Args:
        lat1, lon1: Latitude / longitude of the first point, in degrees.
        lat2, lon2: Latitude / longitude of the second point, in degrees.
    """
    R = 6371.0  # Earth radius in kilometers
    la1, la2 = math.radians(lat1), math.radians(lat2)
    dla = la2 - la1
    dlo = math.radians(lon2 - lon1)
    a = math.sin(dla / 2) ** 2 + math.cos(la1) * math.cos(la2) * math.sin(dlo / 2) ** 2
    # Rounding can push `a` a hair outside [0, 1] (e.g. near-antipodal points),
    # which would make sqrt(1 - a) raise a domain error.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def is_header(line):
    """Return True when the line is a bracketed section header such as [NODES].

    Surrounding whitespace is ignored, matching how ignore_line treats its
    input, so the result does not depend on the caller stripping first.
    """
    text = line.strip()
    return text.startswith("[") and text.endswith("]")
def ignore_line(line):
    """Return True for lines that carry no data: blank lines and # comments."""
    stripped = line.strip()
    if not stripped:
        return True
    return stripped.startswith("#")
# ============================================================================
# FILE PARSING FUNCTIONS
# ============================================================================
def parse_map_data_file(path):
    """Parse the custom map data file format.

    The file is split into bracketed sections ([NODES], [WAYS], [CAMERAS],
    [META]); blank lines and lines starting with '#' are skipped. Data lines
    under any other section (or before the first header) are ignored.

    Args:
        path: Path of the text file to read.

    Returns:
        ``(nodes, ways, cameras, meta)`` where ``nodes`` maps node id to
        ``{"lat", "lon", "label"}``, ``ways`` is a list of way dicts,
        ``cameras`` maps the first field of each camera line to the second,
        and ``meta`` holds "start", "goals" and "accident_multiplier".

    Raises:
        ValueError: If a data line is malformed; the message names the file,
            line number and section.
    """
    section = None
    nodes = {}
    ways = []
    cameras = {}
    meta = {"start": None, "goals": [], "accident_multiplier": None}
    # utf-8-sig also reads plain UTF-8 but drops a leading BOM, which would
    # otherwise hide the first section header from is_header().
    with open(path, "r", encoding="utf-8-sig") as f:
        for line_no, raw in enumerate(f, start=1):
            line = raw.strip()
            if ignore_line(line):
                continue
            if is_header(line):
                # Switch parsing mode whenever a new header is encountered
                section = line.upper()
                continue
            try:
                if section == "[NODES]":
                    p = split_csv_allow_commas(line, 4)
                    nodes[p[0]] = {"lat": float(p[1]), "lon": float(p[2]), "label": p[3]}
                elif section == "[WAYS]":
                    p = split_csv_allow_commas(line, 6)
                    time_val = float(p[5])
                    ways.append({
                        "way_id": p[0],
                        "from": p[1],
                        "to": p[2],
                        "road_name": p[3],
                        "highway_type": p[4],
                        "time_min": time_val,
                        # Kept separately from time_min, which starts equal to it
                        "original_time": time_val,
                        "accident": False
                    })
                elif section == "[CAMERAS]":
                    p = split_csv_allow_commas(line, 2)
                    cameras[p[0]] = p[1]
                elif section == "[META]":
                    p = [x.strip() for x in line.split(",")]
                    key = p[0].upper()
                    if key == "START":
                        meta["start"] = p[1]
                    elif key == "GOAL":
                        # A later GOAL line replaces the goals of an earlier one
                        meta["goals"] = p[1:]
                    elif key == "ACCIDENT_MULTIPLIER":
                        meta["accident_multiplier"] = float(p[1])
            except (ValueError, IndexError) as err:
                # IndexError covers META lines that lack a value after the key
                raise ValueError(
                    f"{path}:{line_no}: cannot parse {section} line '{line}': {err}"
                ) from err
    return nodes, ways, cameras, meta
def load_osm_graph(osm_path):
    """Parse an OSM XML file and build an undirected road graph.

    Args:
        osm_path: Path of the .osm XML file.

    Returns:
        ``(osm_nodes, graph)`` where ``osm_nodes`` maps node id to
        ``(lat, lon)`` and ``graph`` maps node id to a list of
        ``(neighbor_id, distance_km)`` pairs. Only ways tagged "highway"
        contribute edges, and nodes without any edge are left out of ``graph``.
    """
    root = ET.parse(osm_path).getroot()

    # Extract all nodes that carry coordinates
    osm_nodes = {}
    for n in root.findall("node"):
        attrs = n.attrib
        if "lat" not in attrs or "lon" not in attrs:
            # Nodes without coordinates cannot be placed; skip instead of
            # aborting the whole parse with a KeyError.
            continue
        osm_nodes[attrs["id"]] = (float(attrs["lat"]), float(attrs["lon"]))

    # Build adjacency lists, one edge per consecutive node pair of each road
    graph = {nid: [] for nid in osm_nodes}
    for w in root.findall("way"):
        if not any(t.attrib.get("k") == "highway" for t in w.findall("tag")):
            # Only keep ways that represent roads
            continue
        nd_refs = [nd.attrib["ref"] for nd in w.findall("nd")]
        for a, b in zip(nd_refs, nd_refs[1:]):
            # Skip zero-length self-loops (repeated ref) and refs whose node
            # is unknown or had no coordinates.
            if a == b or a not in osm_nodes or b not in osm_nodes:
                continue
            lat1, lon1 = osm_nodes[a]
            lat2, lon2 = osm_nodes[b]
            # Use haversine distance to approximate segment cost
            dist = haversine_km(lat1, lon1, lat2, lon2)
            graph[a].append((b, dist))
            graph[b].append((a, dist))

    # Remove isolated nodes
    graph = {nid: nbrs for nid, nbrs in graph.items() if nbrs}
    return osm_nodes, graph
# ============================================================================
# GRAPH ALGORITHM FUNCTIONS
# ============================================================================
def k_nearest_graph_nodes(lat, lon, osm_nodes, graph, k=5):
    """Return the ids of the k graph nodes closest to the given coordinates.

    Closeness is the squared difference in raw latitude/longitude degrees
    (no square root, no spherical correction). Nodes that are not keys of
    ``graph`` are never returned.

    Args:
        lat, lon: Query coordinates in degrees.
        osm_nodes: Mapping of node id to ``(lat, lon)``.
        graph: Adjacency mapping; only its keys are consulted.
        k: Maximum number of ids to return.

    Returns:
        Up to ``k`` node ids, nearest first.
    """
    def squared_offset(item):
        _, (nlat, nlon) = item
        d_lat = lat - nlat
        d_lon = lon - nlon
        return d_lat * d_lat + d_lon * d_lon

    routable = (item for item in osm_nodes.items() if item[0] in graph)
    # nsmallest gives the same result as sorting and slicing the first k,
    # without ordering every node in the map.
    return [nid for nid, _ in heapq.nsmallest(k, routable, key=squared_offset)]
def dijkstra_path(graph, start_id, goal_id):
    """Return the cheapest node sequence from start to goal, or None.

    ``graph`` maps a node id to a list of ``(neighbor, weight)`` pairs.
    The search stops as soon as the goal is settled.
    """
    best = {start_id: 0.0}
    parent = {}
    settled = set()
    frontier = [(0.0, start_id)]

    while frontier:
        cost, node = heapq.heappop(frontier)
        if node in settled:
            # Stale queue entry for a node that was already finalized
            continue
        settled.add(node)
        if node == goal_id:
            break
        for neighbor, weight in graph.get(node, []):
            candidate = cost + weight
            known = best.get(neighbor)
            if known is None or candidate < known:
                best[neighbor] = candidate
                parent[neighbor] = node
                heapq.heappush(frontier, (candidate, neighbor))

    if goal_id not in best:
        return None

    # Walk the parent links back from the goal, then flip the order
    route = [goal_id]
    while route[-1] != start_id:
        step = parent.get(route[-1])
        if step is None:
            return None
        route.append(step)
    route.reverse()
    return route
def path_length_km(path, graph):
    """Sum the edge weights along consecutive nodes of ``path``.

    For each step the first matching edge in the adjacency list is used;
    a step with no matching edge adds nothing. Paths shorter than two
    nodes have length 0.0.
    """
    if not path or len(path) < 2:
        return 0.0
    total = 0.0
    for here, there in zip(path, path[1:]):
        weight = next((w for nbr, w in graph.get(here, []) if nbr == there), None)
        if weight is not None:
            total += weight
    return total
def build_edges_map(ways):
    """Index ways by their directed ``(from, to)`` pair.

    Each value holds the way's travel time and accident flag; when two ways
    share the same pair, the later one wins.
    """
    return {
        (entry["from"], entry["to"]): {
            "time": entry["time_min"],
            "accident": entry.get("accident", False),
        }
        for entry in ways
    }
# ============================================================================
# VISUALIZATION FUNCTIONS
# ============================================================================
def color_for_highway(hwy_type):
    """Map a highway type (case-insensitive) to its line color.

    Unknown, empty or None types fall back to white.
    """
    palette = {
        "primary": "deepskyblue",
        "secondary": "purple",
        "tertiary": "darkblue",
        "service": "slategray",
    }
    key = hwy_type.lower() if hwy_type else ""
    if key in palette:
        return palette[key]
    return "white"
def get_map_center(nodes):
    """Return the mean ``(lat, lon)`` of all nodes.

    Args:
        nodes: Mapping of node id to a dict with "lat" and "lon" entries.

    Raises:
        ValueError: If ``nodes`` is empty (there is no center to compute).
    """
    if not nodes:
        # Explicit error instead of an unexplained ZeroDivisionError
        raise ValueError("Cannot compute a map center: no nodes were provided")
    lats = [info["lat"] for info in nodes.values()]
    lons = [info["lon"] for info in nodes.values()]
    return sum(lats) / len(lats), sum(lons) / len(lons)
# Distance thresholds, in meters, read by extend_path_to_custom_nodes when it
# fits a rendered polyline to the custom start/end node coordinates.
GAP_TO_NODE_TOLERANCE_M = 2.5 # meters of acceptable gap between OSM snap and custom node
ROUTE_NODE_CLEARANCE_M = 4.0 # keep polylines slightly off the start/goal markers
NODE_OVERLAP_PRUNE_M = 0.75 # drop lat/lng points that sit almost exactly on top of a node
def register_arrow_segment(map_obj, coords, color, speed_mps=120):
    """Remember a polyline on the map object for later arrow animation.

    Segments with fewer than two coordinates are ignored. The list is kept
    on ``map_obj._arrow_segments`` and created on first use.
    """
    if coords and len(coords) >= 2:
        try:
            bucket = map_obj._arrow_segments
        except AttributeError:
            bucket = []
            map_obj._arrow_segments = bucket
        bucket.append(dict(coords=coords, color=color, speed=speed_mps))
def inject_arrow_animation(map_obj, base_speed_mps=120):
"""Inject a client-side script that animates arrows along stored segments.

Reads the segments previously stored on ``map_obj._arrow_segments`` and adds
one HTML element to the map root. Does nothing when no segment is stored.
"""
segments = getattr(map_obj, "_arrow_segments", [])
if not segments:
# Nothing was registered on this map, so no script is added
return
# Configuration serialized to JSON and substituted into the script template
payload = {
"mapName": map_obj.get_name(),
"segments": segments,
"baseSpeed": base_speed_mps
}
# NOTE(review): this template literal contains only a newline, so the
# "__ARROW_CONFIG__" placeholder replaced below never occurs in it - confirm
# the script body was not lost from this file.
arrow_script = """
"""
map_obj.get_root().html.add_child(
folium.Element(arrow_script.replace("__ARROW_CONFIG__", json.dumps(payload)))
)
# Route/search playback injection
def inject_route_playback(map_obj, config_payload):
"""Add client-side route/search step playback controls to a Folium map.

``config_payload`` is serialized with json.dumps and substituted for the
"__CONFIG__" placeholder of the script template; a falsy payload is a no-op.
"""
if not config_payload:
return
# NOTE(review): this template literal contains only a newline, so the
# "__CONFIG__" placeholder never occurs in it - confirm the script body was
# not lost from this file.
script_template = """
"""
animation_script = script_template.replace("__CONFIG__", json.dumps(config_payload))
map_obj.get_root().html.add_child(folium.Element(animation_script))
def create_polyline(locations, color, weight, dash_array, opacity, tooltip_text, popup_html, map_obj, add_arrow=True):
    """Add a styled Folium polyline to the map and return it.

    When ``add_arrow`` is true and the line has at least two points, the
    same coordinates are also registered for client-side arrow animation.
    """
    style = dict(
        color=color,
        weight=weight,
        dash_array=dash_array,
        opacity=opacity,
        tooltip=tooltip_text,
    )
    popup = folium.Popup(popup_html, max_width=250)
    polyline = folium.PolyLine(locations=locations, popup=popup, **style)
    added = polyline.add_to(map_obj)
    wants_arrow = add_arrow and len(locations) >= 2
    if wants_arrow:
        register_arrow_segment(map_obj, locations, color)
    return added
def extend_path_to_custom_nodes(latlngs, start_coord, end_coord, tolerance_m=GAP_TO_NODE_TOLERANCE_M):
"""Ensure rendered polylines reach the custom nodes but stop just outside the markers.

Args:
latlngs: Sequence of (lat, lon) points of the rendered path; may be empty.
start_coord: (lat, lon) of the custom start node, or None.
end_coord: (lat, lon) of the custom end node, or None.
tolerance_m: Gap in meters above which a connector point is added.

Returns:
A list of (lat, lon) points. An empty ``latlngs`` returns
``[start_coord, end_coord]`` unchanged.
"""
if not latlngs:
return [start_coord, end_coord]
# Work on a copy so the caller's list is not mutated
extended = list(latlngs)
def needs_connector(coord_a, coord_b):
# True when both coordinates exist and are further apart than tolerance_m
if coord_a is None or coord_b is None:
return False
# Compare geodesic distance and determine if a short connector is needed
return haversine_km(coord_a[0], coord_a[1], coord_b[0], coord_b[1]) * 1000 > tolerance_m
# Step 1: bridge gaps between the path ends and the custom nodes
if needs_connector(start_coord, extended[0]):
extended.insert(0, start_coord)
if needs_connector(extended[-1], end_coord):
extended.append(end_coord)
def prune_endpoint(points, node_coord, from_start=True):
# Pop points (in place) from one end while they lie within
# NODE_OVERLAP_PRUNE_M meters of the node
if not node_coord:
return
while points:
idx = 0 if from_start else -1
target = points[idx]
dist_m = haversine_km(node_coord[0], node_coord[1], target[0], target[1]) * 1000
if dist_m <= NODE_OVERLAP_PRUNE_M:
points.pop(idx)
else:
break
def offset_from_node(node_coord, anchor_coord):
# Return a point moved from the node towards the anchor by about
# ROUTE_NODE_CLEARANCE_M meters, capped at 45% of the node-anchor span;
# None when either coordinate is missing or they coincide
if not node_coord or not anchor_coord:
return None
dist_m = haversine_km(node_coord[0], node_coord[1], anchor_coord[0], anchor_coord[1]) * 1000
if dist_m <= 1e-6:
return None
ratio = min(ROUTE_NODE_CLEARANCE_M / dist_m, 0.45)
if ratio <= 0:
return None
return (
node_coord[0] + (anchor_coord[0] - node_coord[0]) * ratio,
node_coord[1] + (anchor_coord[1] - node_coord[1]) * ratio
)
# Step 2: remove points sitting on top of either node (this also removes the
# connectors added in step 1, since they equal the node coordinates)
prune_endpoint(extended, start_coord, from_start=True)
prune_endpoint(extended, end_coord, from_start=False)
# Step 3: if pruning removed everything, rebuild a two-point line between
# the offset positions of both nodes
if not extended and start_coord and end_coord:
start_point = offset_from_node(start_coord, end_coord)
end_point = offset_from_node(end_coord, start_coord)
if start_point and end_point:
extended = [start_point, end_point]
if not extended:
if start_coord and end_coord:
return [start_coord, end_coord]
return extended
# Step 4: add the offset end caps so the line stops just short of the markers
anchor_start = extended[0] if extended else end_coord
start_point = offset_from_node(start_coord, anchor_start)
if start_point:
extended.insert(0, start_point)
anchor_end = extended[-1] if extended else start_coord
end_point = offset_from_node(end_coord, anchor_end)
if end_point:
extended.append(end_point)
return extended
def visualize_simple_graph(nodes, ways, cameras, meta, out_html, solution_path=None, other_paths=None, exploration_steps=None):
"""Generate simplified graph visualization with plain background and arrow-headed paths.

Draws every way as a straight line between its two custom nodes, draws all
nodes as circle markers, optionally injects route playback for
``solution_path``, and saves the Folium map to ``out_html``.

Args:
nodes: Mapping of node id to a dict with "lat", "lon" and "label".
ways: List of way dicts ("way_id", "from", "to", "road_name", "time_min",
optional "accident" / "severity").
cameras: Container of way ids; membership marks a way as a camera road.
meta: Dict read for "start" and "goals".
out_html: Output path passed to the map's save().
solution_path: Optional ordered list of node ids of the selected route.
other_paths: Optional list of alternate routes (lists, or dicts with "path").
exploration_steps: Optional search steps forwarded to the playback config.
"""
center_lat, center_lon = get_map_center(nodes)
exploration_steps = exploration_steps or []
# Create map with CartoDB Positron (light, minimal background)
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=16,
tiles='CartoDB positron',
attr='Simple Graph View'
)
# Add custom CSS to make background even lighter/whiter
# NOTE(review): this literal contains only a newline - confirm the CSS was
# not lost from this file.
custom_css = """
"""
m.get_root().html.add_child(folium.Element(custom_css))
# Build solution edges set
solution_edges = set()
if solution_path and len(solution_path) > 1:
for i in range(len(solution_path) - 1):
solution_edges.add((solution_path[i], solution_path[i+1]))
solution_nodes_set = set(solution_path or [])
# Build other solution edges
# NOTE(review): other_solution_edges is filled here but not read again in
# this function.
other_solution_edges = set()
if other_paths:
for entry in other_paths:
path = entry.get("path") if isinstance(entry, dict) else entry
if not path or len(path) < 2:
continue
for i in range(len(path) - 1):
edge = (path[i], path[i + 1])
if edge not in solution_edges:
other_solution_edges.add(edge)
# Directed (from, to) -> way lookup, used below to find the way behind each
# step of the solution path
edge_lookup = {(w["from"], w["to"]): w for w in ways}
# Folium layer names of the solution overlay segments, in route order
solution_segment_layers = []
# Draw all edges as simple lines with arrows
for w in ways:
way_id = w["way_id"]
u, v = w["from"], w["to"]
rn = w["road_name"]
time_min = w["time_min"]
start_coord = (nodes[u]["lat"], nodes[u]["lon"])
end_coord = (nodes[v]["lat"], nodes[v]["lon"])
# Determine edge type (but don't highlight solution paths)
is_camera = way_id in cameras
is_accident = w.get("accident", False)
# Style based on edge type - accidents get priority coloring
if is_accident:
# Get severity to determine color
severity = w.get("severity", "severe").lower()
# Map severity to colors: yellow=minor, orange=moderate, red=severe
severity_colors = {
"minor": "#FFC107", # Yellow
"moderate": "#FF9800", # Orange
"severe": "#F44336" # Red
}
color = severity_colors.get(severity, "#F44336") # Default to red
weight = 5
opacity = 1.0
elif is_camera:
color = '#ef4444' # Red for camera roads
weight = 3
opacity = 0.92
else:
color = '#64748b' # Slate gray for all regular roads
weight = 2.6
opacity = 0.78
# Create simple line
line = folium.PolyLine(
locations=[start_coord, end_coord],
color=color,
weight=weight,
opacity=opacity,
tooltip=f"{rn} ({time_min} min) | {u} -> {v}"
)
line.add_to(m)
# Record the same segment for the client-side arrow animation
register_arrow_segment(m, [start_coord, end_coord], color)
# Add overlay polylines for the selected solution path so we can animate step-by-step playback
if solution_path and len(solution_path) > 1:
for idx in range(len(solution_path) - 1):
u = solution_path[idx]
v = solution_path[idx + 1]
w = edge_lookup.get((u, v))
if w:
start_coord = (nodes[u]["lat"], nodes[u]["lon"])
end_coord = (nodes[v]["lat"], nodes[v]["lon"])
else:
# No way for this step: look the nodes up defensively and skip the step
# when any coordinate is missing
start_coord = (nodes.get(u, {}).get("lat"), nodes.get(u, {}).get("lon"))
end_coord = (nodes.get(v, {}).get("lat"), nodes.get(v, {}).get("lon"))
if None in (*start_coord, *end_coord):
continue
# Overlay starts fully transparent (opacity 0.0)
seg = folium.PolyLine(
locations=[start_coord, end_coord],
color="#2563eb",
weight=5,
opacity=0.0,
tooltip=f"Step {idx + 1}: {u} -> {v}"
).add_to(m)
solution_segment_layers.append(seg.get_name())
# Draw nodes
start_node = meta.get("start")
goal_nodes = set(meta.get("goals", []))
# node id -> marker layer name, marker layer name -> base style, node id -> label;
# all three are handed to the playback config
node_marker_names = {}
node_marker_styles = {}
node_metadata = {}
for nid, info in nodes.items():
lat, lon = info["lat"], info["lon"]
label = info["label"]
# Determine node style - only highlight start and goal
if nid == start_node:
color = '#10b981'
fill_color = '#d1fae5'
radius = 10
icon_text = 'S'
elif nid in goal_nodes:
color = '#f59e0b'
fill_color = '#fef3c7'
radius = 10
icon_text = 'G'
else:
# All other nodes are uniform
color = '#64748b'
fill_color = '#f1f5f9'
radius = 6
icon_text = ''
# Intermediate nodes of the solution path get a blue accent
if nid in solution_nodes_set and nid not in goal_nodes and nid != start_node:
color = '#2563eb'
fill_color = '#e0ecff'
radius = max(radius, 7)
# Create circle marker for node
# NOTE(review): the popup literal below is split across two lines in this
# file - presumably markup between the lines was lost; confirm.
marker = folium.CircleMarker(
location=(lat, lon),
radius=radius,
color=color,
fill=True,
fillColor=fill_color,
fillOpacity=0.9,
weight=2,
popup=f"{label}
ID: {nid}",
tooltip=label
).add_to(m)
marker_name = marker.get_name()
node_marker_names[str(nid)] = marker_name
node_marker_styles[marker_name] = {
"color": color,
"fillColor": fill_color,
"radius": radius,
"weight": 2,
"fillOpacity": 0.9,
"opacity": 1.0
}
node_metadata[str(nid)] = {"label": label}
# Add text label for start/goal nodes
if icon_text:
folium.Marker(
location=(lat, lon),
icon=folium.DivIcon(
html=f'
{icon_text}
'
)
).add_to(m)
# Inject step-by-step playback controls for the simplified map
ordered_node_markers = []
if solution_path:
for nid in solution_path:
marker_name = node_marker_names.get(str(nid))
if marker_name:
if nid == start_node:
role = "start"
elif nid in goal_nodes:
role = "goal"
else:
role = "normal"
ordered_node_markers.append({
"id": nid,
"name": marker_name,
"label": nodes[nid]["label"],
"role": role
})
# Playback is only injected when at least one route node has a marker
if ordered_node_markers:
config_payload = {
"routeSegments": solution_segment_layers,
"routeNodes": ordered_node_markers,
"markerStyles": node_marker_styles,
"nodeMarkerLookup": node_marker_names,
"nodeMetadata": node_metadata,
"searchSteps": exploration_steps,
"routeStepDelay": 850,
"searchStepDelay": 1050
}
inject_route_playback(m, config_payload)
# Add legend
legend_html = """
Simple Graph View
One-way edge
Camera road
Start node (S)
Goal node (G)
Regular nodes
Two-way edges show two arrows on the line.
"""
m.get_root().html.add_child(folium.Element(legend_html))
inject_arrow_animation(m)
m.save(out_html)
def visualize_with_roads(nodes, ways, cameras, meta, osm_nodes, graph,
out_html, solution_path=None, other_paths=None, exploration_steps=None, k_snap=8):
"""Generate interactive Folium map with road visualization.

Each way is drawn along the shortest OSM path found between the snap
candidates of its two custom nodes (straight line when none is found),
styled by its role, then node markers, playback script and legend are
added and the map is saved to ``out_html``.

Args:
nodes: Mapping of node id to a dict with "lat", "lon" and "label".
ways: List of way dicts ("way_id", "from", "to", "road_name",
"highway_type", "time_min", optional "accident" / "severity").
cameras: Container of way ids; membership marks a way as a camera road.
meta: Dict read for "start", "goals" and "accident_multiplier".
osm_nodes: Mapping of OSM node id to (lat, lon).
graph: OSM adjacency mapping of node id to (neighbor, distance) pairs.
out_html: Output path passed to the map's save().
solution_path: Optional ordered list of node ids of the selected route.
other_paths: Optional alternate routes (lists, or dicts with "path"/"id").
exploration_steps: Optional search steps forwarded to the playback config.
k_snap: Number of nearest OSM nodes tried per custom node.
"""
center_lat, center_lon = get_map_center(nodes)
# Base map uses OpenStreetMap tiles around the dataset centroid
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=16,
tiles="OpenStreetMap"
)
# Convenience set for quick membership tests while styling nodes
solution_nodes_set = set(solution_path or [])
exploration_steps = exploration_steps or []
# Pre-compute snap candidates for all nodes
snap_candidates = {}
for nid, info in nodes.items():
# Snap each custom node to multiple nearby OSM nodes for routing
snap_candidates[nid] = k_nearest_graph_nodes(
info["lat"], info["lon"], osm_nodes, graph, k=k_snap
)
# Build solution edges set
solution_edges = set()
if solution_path and len(solution_path) > 1:
for i in range(len(solution_path) - 1):
# Store each directed edge belonging to the selected solution
solution_edges.add((solution_path[i], solution_path[i+1]))
# Build other solution edges and route mapping
# Track edges for alternate solutions plus associated Folium layers
other_solution_edges = set()
edge_route_map = {}
solution_edge_layers = {}
if other_paths:
for entry in other_paths:
path = entry.get("path") if isinstance(entry, dict) else entry
route_id = entry.get("id") if isinstance(entry, dict) else None
if not path or len(path) < 2:
continue
for i in range(len(path) - 1):
edge = (path[i], path[i + 1])
if edge not in solution_edges:
other_solution_edges.add(edge)
if route_id is not None:
# Track which optional route uses a given edge
edge_route_map.setdefault(edge, set()).add(route_id)
# Keep JS references to dynamic polylines for interactivity hooks
route_polyline_refs = {}
solution_line_names = []
# Directed (from, to) -> way_id lookup; used below to detect whether the
# opposite direction of a way also exists
reverse_edges = {}
for w in ways:
reverse_edges[(w["from"], w["to"])] = w["way_id"]
# Draw all ways/roads
for w in ways:
way_id = w["way_id"]
u, v = w["from"], w["to"]
rn = w["road_name"]
hwy = w["highway_type"]
time_min = w["time_min"]
start_coord = (nodes[u]["lat"], nodes[u]["lon"])
end_coord = (nodes[v]["lat"], nodes[v]["lon"])
# Find best OSM path between nodes: every pair of snap candidates is
# routed with Dijkstra and the shortest resulting path is kept
best_path = None
best_len = None
for u_osm in snap_candidates[u]:
for v_osm in snap_candidates[v]:
path = dijkstra_path(graph, u_osm, v_osm)
if path is None:
continue
plen = path_length_km(path, graph)
if best_len is None or plen < best_len:
best_len = plen
best_path = path
# Build coordinates list
if best_path is None:
# Fall back to drawing a straight line if OSM snapping fails
latlngs = [start_coord, end_coord]
else:
latlngs = [(osm_nodes[nid][0], osm_nodes[nid][1]) for nid in best_path]
# Fit the line ends to the custom node coordinates
latlngs = extend_path_to_custom_nodes(latlngs, start_coord, end_coord)
# Check if there's a reverse edge (parallel road)
has_reverse = (v, u) in reverse_edges
# Offset parallel roads slightly to make them visually distinct and clickable
if has_reverse and len(latlngs) >= 2:
# Apply perpendicular offset to separate parallel roads
offset_distance = 0.00003 # ~3 meters offset
offset_latlngs = []
for i, coord in enumerate(latlngs):
if i == 0 and i + 1 < len(latlngs):
# First point - use direction to next point
lat1, lon1 = coord
lat2, lon2 = latlngs[i + 1]
elif i > 0:
# Other points - use direction from previous point
lat2, lon2 = coord
lat1, lon1 = latlngs[i - 1]
else:
offset_latlngs.append(coord)
continue
# Calculate perpendicular offset (rotate direction by 90 degrees)
dlat = lat2 - lat1
dlon = lon2 - lon1
length = (dlat**2 + dlon**2)**0.5
if length > 0:
# Perpendicular vector (rotate 90 degrees right)
perp_lat = -dlon / length * offset_distance
perp_lon = dlat / length * offset_distance
offset_latlngs.append((coord[0] + perp_lat, coord[1] + perp_lon))
else:
offset_latlngs.append(coord)
latlngs = offset_latlngs
# Determine edge type and styling
is_solution = (u, v) in solution_edges
route_ids_for_edge = list(edge_route_map.get((u, v), []))
is_other_solution = (u, v) in other_solution_edges
is_camera = way_id in cameras
# These booleans determine styling (color/weight/dashed) for each polyline
tooltip_text = f"{rn} ({time_min} min)"
# NOTE(review): the popup literals below are split across lines in this
# file - presumably markup between the lines was lost; confirm.
popup_html = (
f"{rn}
"
f"way_id: {way_id}
"
f"type: {hwy}
"
f"time: {time_min} min
"
)
if is_camera:
popup_html += "CAMERA MONITORED
Accident x time multiplied
"
if is_solution:
popup_html += "* SELECTED SOLUTION *"
elif is_other_solution:
popup_html += "* OTHER SOLUTION *"
# Create appropriate polyline
polyline_for_events = None
# Check if this road has an accident (draw accident color first if needed)
has_accident = w.get("accident", False)
severity = w.get("severity", "none").lower() if has_accident else "none"
# Only draw accident color if severity is not 'none'
if has_accident and severity != "none":
severity_colors = {
"minor": "#FFC107", # Yellow
"moderate": "#FF9800", # Orange
"severe": "#F44336" # Red
}
accident_color = severity_colors.get(severity, "#F44336")
# Draw accident color as base layer (will be under blue if it's a solution path)
folium.PolyLine(
locations=latlngs,
color=accident_color,
weight=7,
opacity=1.0,
tooltip=tooltip_text + f" (ACCIDENT: {severity.upper()})"
).add_to(m)
# 1. SELECTED SOLUTION (blue) - DRAW ON TOP
if is_solution:
polyline_for_events = create_polyline(
latlngs, "#1f6feb", 5, None, 0.85, tooltip_text, popup_html, m
)
layer_name = polyline_for_events.get_name()
solution_line_names.append(layer_name)
solution_edge_layers[(u, v)] = layer_name
# 2. OTHER SOLUTIONS (wide dark line under a lighter line)
elif is_other_solution:
folium.PolyLine(
locations=latlngs, color="#1c4fd8", weight=7,
dash_array=None, opacity=0.9
).add_to(m)
polyline_for_events = create_polyline(
latlngs, "#e4f0ff", 5, None, 0.95, tooltip_text, popup_html, m
)
# 3. ACCIDENT ROADS (only if not already drawn above)
elif has_accident:
# Already drawn as base layer above, just create event handler
polyline_for_events = create_polyline(
latlngs, "transparent", 1, None, 0,
tooltip_text + f" (ACCIDENT: {severity.upper()})",
popup_html,
m
)
# 4. CAMERA ROADS
elif is_camera:
polyline_for_events = create_polyline(
latlngs, "crimson", 3, "8,4", 0.8,
tooltip_text, popup_html, m
)
# 5. NORMAL ROADS
else:
weight = 3 if hwy in ["primary", "secondary"] else 2
polyline_for_events = create_polyline(
latlngs, color_for_highway(hwy), weight, None, 0.25,
tooltip_text, popup_html, m
)
# Remember which layer belongs to which alternate route id
if route_ids_for_edge:
layer_name = polyline_for_events.get_name()
for rid in route_ids_for_edge:
route_polyline_refs.setdefault(rid, []).append(layer_name)
# Add interactivity scripts for route selection
# NOTE(review): the script_lines literal below is incomplete in this file
# (unbalanced brackets) - the script content appears to have been lost.
if route_polyline_refs or solution_line_names:
script_lines = [""
])
m.get_root().html.add_child(folium.Element("\n".join(script_lines)))
# Draw node markers
start_node = meta.get("start")
goal_nodes = set(meta.get("goals", []))
# Store references to folium markers for later animation/click handling
node_marker_names = {}
node_marker_styles = {}
node_metadata = {}
for nid, info in nodes.items():
lat, lon, label = info["lat"], info["lon"], info["label"]
# Determine node styling
if nid == start_node:
fill_color = "green"
role = "START"
elif nid in goal_nodes:
fill_color = "#ffd447"
role = "GOAL"
else:
fill_color = "white"
role = "Node"
if nid in solution_nodes_set:
border_color = "#0ea5e9"
border_weight = 2
else:
border_color = "black"
border_weight = 1
if nid == start_node or nid in goal_nodes:
border_weight = 1
circle_marker = folium.CircleMarker(
location=(lat, lon),
radius=8,
color=border_color,
weight=border_weight,
fill=True,
fill_color=fill_color,
fill_opacity=0.9,
opacity=0.9,
popup=folium.Popup(
f"Node {nid}: {label}
"
f"lat: {lat:.6f}
lon: {lon:.6f}
{role}",
max_width=250
),
tooltip=f"{nid}: {label} ({role})"
).add_to(m)
marker_name = circle_marker.get_name()
node_marker_names[str(nid)] = marker_name
# Save styling info so animation can temporarily modify the markers
node_marker_styles[marker_name] = {
"color": border_color,
"fillColor": fill_color,
"radius": 8,
"weight": border_weight,
"fillOpacity": 0.9,
"opacity": 1.0
}
node_metadata[str(nid)] = {"label": label}
# Add label using HTML so node id and label stay visible
folium.map.Marker(
[lat, lon],
icon=folium.DivIcon(
html=f"""
{nid}: {label}
""",
icon_size=(0, 0),
icon_anchor=(-10, 10)
)
).add_to(m)
# Add path animation for solution
if solution_path:
ordered_segment_layers = []
for i in range(len(solution_path) - 1):
layer_name = solution_edge_layers.get((solution_path[i], solution_path[i + 1]))
if layer_name:
# Preserve the segment order so the animation highlights edges sequentially
ordered_segment_layers.append(layer_name)
ordered_node_markers = []
for nid in solution_path:
marker_name = node_marker_names.get(str(nid))
if marker_name:
if nid == start_node:
role = "start"
elif nid in goal_nodes:
role = "goal"
else:
role = "normal"
ordered_node_markers.append({
"id": nid,
"name": marker_name,
"label": nodes[nid]["label"],
"role": role
})
if ordered_node_markers:
config_payload = {
"routeSegments": ordered_segment_layers,
"routeNodes": ordered_node_markers,
"markerStyles": node_marker_styles,
"nodeMarkerLookup": node_marker_names,
"nodeMetadata": node_metadata,
"searchSteps": exploration_steps,
"routeStepDelay": 900,
"searchStepDelay": 1100
}
# NOTE(review): this template literal contains only a newline, so the
# "__CONFIG__" placeholder never occurs in it - confirm the script body was
# not lost from this file.
script_template = """
"""
animation_script = script_template.replace("__CONFIG__", json.dumps(config_payload))
m.get_root().html.add_child(folium.Element(animation_script))
# Add legend
acc_mult = meta.get("accident_multiplier", 1)
# Provide a custom legend so the user understands each styling cue
legend_html = f"""
Kuching Heritage Graph
β¬ Selected solution
β¬ Other solution paths
β¬ Camera road (accident x time x {acc_mult})
β¬ Primary road
β¬ Secondary road
β¬ Tertiary road
β¬ Service / alley
"""
m.get_root().html.add_child(folium.Element(legend_html))
inject_arrow_animation(m)
m.save(out_html)
# ============================================================================
# FLASK APP SETUP
# ============================================================================
BASE_DIR = os.path.dirname(__file__)
MAP_DATA_DIR = os.path.join(BASE_DIR, 'algorithms')
# Algorithm key -> (source file inside MAP_DATA_DIR, function name to load)
ALGORITHM_CONFIG = {
    'astar': ('aStar.py', 'aStar'),
    'beam': ('beam.py', 'beam_search_lds'),
    'dfs': ('dfs.py', 'dfs'),
    'gbfs': ('gbfs.py', 'gbfs'),
    'iddfs': ('iddfs.py', 'iddfs'),
    'bfs': ('bfs.py', 'bfs')
}
# Algorithm key -> loaded callable, or None when loading failed
algorithms_impl = {}
for name, (filename, func_name) in ALGORITHM_CONFIG.items():
    try:
        # Attempt to import each algorithm strategy from the algorithms folder
        algorithms_impl[name] = load_func_from_path(
            os.path.join(MAP_DATA_DIR, filename), func_name
        )
    except Exception as exc:
        # Missing/invalid implementations are still recorded as None for
        # graceful fallback, but the reason is reported instead of being
        # swallowed silently.
        print(f"⚠️ Could not load algorithm '{name}' from {filename}: {exc}")
        algorithms_impl[name] = None
app = Flask(__name__)
# The session-signing key is taken from the SECRET_KEY environment variable.
# The literal fallback is a public placeholder kept only so existing setups
# keep starting; it must be overridden in any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here-change-in-production')
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50 MiB request body limit
app.config['ALLOWED_EXTENSIONS'] = {'txt', 'osm'}
# NOTE(review): SESSION_TYPE / SESSION_PERMANENT look like Flask-Session
# settings; no such extension is initialized in the visible code - confirm.
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SESSION_PERMANENT'] = False
# Make sure the folders written to at runtime exist
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs('static', exist_ok=True)
def allowed_file(filename):
    """Return True when *filename* carries an extension listed in ALLOWED_EXTENSIONS."""
    # A name without a dot has no extension to validate
    if '.' not in filename:
        return False
    # Compare the text after the last dot, case-insensitively
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
# ============================================================================
# FLASK ROUTES
# ============================================================================
@app.route('/')
def index():
    """Render the index.html template for the root URL."""
    landing_template = 'index.html'
    return render_template(landing_template)
@app.route('/test_accidents')
def test_accidents():
    """Render the test page for the accident system."""
    accident_test_template = 'test_accidents.html'
    return render_template(accident_test_template)
@app.route('/get_default_nodes', methods=['GET'])
def get_default_nodes():
    """Return the nodes and ways of the default map without requiring an upload.

    Clears the session, marks it as using the default map, regenerates the two
    base map HTML files under ``static/`` and responds with the node list, the
    way list and the map URLs.

    Returns:
        JSON with ``success``, ``nodes``, ``ways``, ``map_url``,
        ``map_url_simple`` and ``using_default``; a 404 JSON error when the
        default map is not loaded; a 500 JSON error on any other failure.
    """
    try:
        if DEFAULT_MAP_DATA['nodes'] is None:
            return jsonify({'error': 'Default map not available'}), 404

        nodes = DEFAULT_MAP_DATA['nodes']
        # Work on a copy so this request cannot mutate the shared default ways
        ways = copy.deepcopy(DEFAULT_MAP_DATA['ways'])
        cameras = DEFAULT_MAP_DATA['cameras']
        meta = DEFAULT_MAP_DATA['meta']
        osm_nodes = DEFAULT_MAP_DATA['osm_nodes']
        graph = DEFAULT_MAP_DATA['graph']

        # Clear any previous session data and mark as using default
        session.clear()
        session['using_default_map'] = True
        session['has_uploaded_files'] = False
        session.modified = True

        # List of available nodes
        nodes_list = [{'id': nid, 'label': info['label']} for nid, info in nodes.items()]

        # List of ways for the accident system
        ways_list = [{
            "way_id": w["way_id"],
            "from": w["from"],
            "to": w["to"],
            "road_name": w["road_name"],
            "highway_type": w["highway_type"]
        } for w in ways]

        # Generate base maps (no highlighted path)
        base_map_path = os.path.join('static', 'heritage_map_base.html')
        base_simple_map_path = os.path.join('static', 'heritage_map_simple_base.html')
        visualize_with_roads(nodes, ways, cameras, meta, osm_nodes, graph, base_map_path, solution_path=None)
        visualize_simple_graph(nodes, ways, cameras, meta, base_simple_map_path, solution_path=None, other_paths=None)

        return jsonify({
            'success': True,
            'nodes': nodes_list,
            'ways': ways_list,
            'map_url': "/static/heritage_map_base.html",
            'map_url_simple': "/static/heritage_map_simple_base.html",
            'using_default': True
        })
    except Exception as e:
        # Log marker restored from its mis-encoded form
        print(f"❌ Error in get_default_nodes: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@app.route('/upload_files', methods=['POST'])
def upload_files():
    """Accept a .txt map file and an .osm file, cache the parsed map, return its nodes.

    Expects multipart form fields ``txt_file`` and ``osm_file`` and an optional
    ``algorithm`` form value (default ``'bfs'``). The files are saved under the
    upload folder, parsed, stored in ``UPLOADED_MAP_CACHE``, and the base maps
    are regenerated under ``static/``.

    Returns:
        JSON with ``success``, ``nodes``, ``ways``, ``map_url`` and
        ``map_url_simple``; a 400 JSON error for missing/invalid files; a 500
        JSON error on any other failure.
    """
    try:
        if 'txt_file' not in request.files or 'osm_file' not in request.files:
            return jsonify({'error': 'Both .txt and .osm files are required'}), 400

        txt_file = request.files['txt_file']
        osm_file = request.files['osm_file']
        algorithm = request.form.get('algorithm', 'bfs')

        if txt_file.filename == '' or osm_file.filename == '':
            return jsonify({'error': 'Please select both files'}), 400
        if not (allowed_file(txt_file.filename) and allowed_file(osm_file.filename)):
            return jsonify({'error': 'Invalid file type'}), 400

        # Sanitize filenames and save to the upload folder
        txt_filename = secure_filename(txt_file.filename)
        osm_filename = secure_filename(osm_file.filename)
        txt_path = os.path.join(app.config['UPLOAD_FOLDER'], txt_filename)
        osm_path = os.path.join(app.config['UPLOAD_FOLDER'], osm_filename)
        txt_file.save(txt_path)
        osm_file.save(osm_path)

        # Parse the uploaded files
        nodes, ways, cameras, meta = parse_map_data_file(txt_path)
        osm_nodes, graph = load_osm_graph(osm_path)

        # Store in global cache (survives across requests)
        import time
        UPLOADED_MAP_CACHE['nodes'] = nodes
        UPLOADED_MAP_CACHE['ways'] = copy.deepcopy(ways)
        UPLOADED_MAP_CACHE['cameras'] = cameras
        UPLOADED_MAP_CACHE['meta'] = meta
        UPLOADED_MAP_CACHE['osm_nodes'] = osm_nodes
        UPLOADED_MAP_CACHE['graph'] = graph
        UPLOADED_MAP_CACHE['txt_path'] = txt_path
        UPLOADED_MAP_CACHE['osm_path'] = osm_path
        UPLOADED_MAP_CACHE['timestamp'] = time.time()

        # Store paths in session for later use
        session['txt_path'] = txt_path
        session['osm_path'] = osm_path
        session['algorithm'] = algorithm
        session['using_default_map'] = False
        session['has_uploaded_files'] = True
        session['nodes_count'] = len(nodes)
        session['ways_count'] = len(ways)

        # Clear all previous state when new files are uploaded
        session.pop("modified_ways", None)   # RESET modified accident data
        session.pop("edge_accidents", None)  # RESET accident records
        session.pop("origin", None)          # RESET origin
        session.pop("destination", None)     # RESET destination
        # The global accident store belongs to the previously loaded map; leaving
        # it populated would carry those accidents over to the new map.
        GLOBAL_ACCIDENTS.clear()
        session.modified = True

        # List of available nodes
        nodes_list = [{'id': nid, 'label': info['label']} for nid, info in nodes.items()]

        # List of ways for the accident system
        ways_list = [{
            "way_id": w["way_id"],
            "from": w["from"],
            "to": w["to"],
            "road_name": w["road_name"],
            "highway_type": w["highway_type"]
        } for w in ways]

        # Generate base maps (without highlighted paths) so users see the area right after upload
        base_map_path = os.path.join('static', 'heritage_map_base.html')
        base_simple_map_path = os.path.join('static', 'heritage_map_simple_base.html')
        visualize_with_roads(nodes, ways, cameras, meta, osm_nodes, graph, base_map_path, solution_path=None)
        visualize_simple_graph(nodes, ways, cameras, meta, base_simple_map_path, solution_path=None, other_paths=None)

        return jsonify({
            'success': True,
            'nodes': nodes_list,
            'ways': ways_list,  # consumed by the accident UI
            'map_url': "/static/heritage_map_base.html",
            'map_url_simple': "/static/heritage_map_simple_base.html"
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def _sum_path_time(path, edges_map):
    """Sum the "time" of each consecutive edge of *path* that exists in *edges_map*."""
    return sum(
        edges_map[edge]["time"]
        for edge in zip(path, path[1:])
        if edge in edges_map
    )


def _build_route_entry(route_id, path, edges_map, primary):
    """Build the JSON-serialisable summary dict returned for one route."""
    return {
        'id': route_id,
        'path': path,
        'cost': round(_sum_path_time(path, edges_map), 2),
        'length': len(path),
        # Node ids are stringified so non-string ids can be joined
        'path_str': ' → '.join(str(nid) for nid in path),
        'primary': primary,
    }


@app.route('/find_routes', methods=['POST'])
def find_routes():
    """Find a primary route and up to four alternatives between two nodes.

    Expects a JSON body with ``origin`` and ``destination`` and an optional
    ``algorithm`` (falls back to the session's algorithm, then ``'bfs'``).
    Alternatives are searched by removing one edge of the primary path at a
    time and re-running the same algorithm. One detailed and one simplified
    HTML map is written under ``static/`` for every route found.

    Returns:
        JSON with ``success``, ``routes`` (primary first, alternatives sorted
        by cost), ``algorithm`` and ``stats``; a 400 JSON error for missing
        input, missing map data or an unavailable algorithm; a 404 JSON error
        when no path exists; a 500 JSON error on any other failure.
    """
    try:
        # silent=True: a missing or malformed JSON body becomes {} and is
        # answered with the 400 below rather than raising into the 500 handler.
        data = request.get_json(silent=True) or {}

        # Pull the requested origin/destination and allow client to override algorithm
        origin = data.get('origin')
        destination = data.get('destination')
        # A null/empty algorithm falls back to the session value instead of
        # crashing on .lower() further down.
        algorithm = str(data.get('algorithm') or session.get('algorithm', 'bfs'))

        if not origin or not destination:
            # Input validation before expensive parsing work
            return jsonify({'error': 'Origin and destination are required'}), 400

        # Save for later (used by accident system + ALL algorithms comparison)
        session['origin'] = origin
        session['destination'] = destination

        # Get current map data (uploaded or default)
        nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path = get_current_map_data()
        if nodes is None:
            return jsonify({'error': 'Please upload files first'}), 400

        # Report which accidents are active (global store)
        if GLOBAL_ACCIDENTS:
            print(f"✅ Route finding with {len(GLOBAL_ACCIDENTS)} active accident(s)")
            for info in GLOBAL_ACCIDENTS.values():
                print(f"   🚨 {info['road_name']}: {info['severity']} ({info['multiplier']}x)")
        else:
            print("ℹ️ Route finding with no accidents")

        # Build edges map for pathfinding
        edges_map = build_edges_map(ways)
        # Algorithms expect nodes mapped to coordinate tuples, not metadata dicts
        algo_nodes = {nid: (info["lat"], info["lon"]) for nid, info in nodes.items()}

        # Get the selected algorithm function
        algo_func = algorithms_impl.get(algorithm.lower())
        if algo_func is None:
            return jsonify({'error': f'Algorithm "{algorithm}" is not available or failed to load'}), 400

        # Run the selected algorithm with step tracking
        result = algo_func(algo_nodes, edges_map, origin, [destination], track=False, visualise=True)
        if result[0] is None or not result[2]:  # goal not reached or empty path
            return jsonify({'error': f'No path found using {algorithm} algorithm'}), 404

        nodes_created, path = result[1], result[2]
        # The exploration trace is only present when the result has a 4th element
        exploration_steps = result[3] if len(result) >= 4 else []

        # Route 0 is the primary path from the selected algorithm
        route_definitions = [{'id': 0, 'path': path}]
        routes = [_build_route_entry(0, path, edges_map, True)]
        found_paths = {tuple(path)}

        # Find alternatives by blocking one edge of the primary path at a time
        alt_id = 1
        max_alternatives = 4
        for edge_to_block in zip(path, path[1:]):
            if alt_id > max_alternatives:
                break
            if edge_to_block not in edges_map:
                # Nothing to block for this hop
                continue
            # Shallow copy is enough: only the key set is changed
            modified_edges = edges_map.copy()
            del modified_edges[edge_to_block]
            try:
                alt_result = algo_func(algo_nodes, modified_edges, origin, [destination], track=False, visualise=False)
                if alt_result[0] is not None and alt_result[2]:
                    alt_path = alt_result[2]
                    # Only add if it's a different path
                    if tuple(alt_path) not in found_paths:
                        route_definitions.append({'id': alt_id, 'path': alt_path})
                        routes.append(_build_route_entry(alt_id, alt_path, edges_map, False))
                        found_paths.add(tuple(alt_path))
                        alt_id += 1
            except Exception as exc:
                # Best-effort: a failed alternative search must not fail the request
                print(f"Alternative route search failed without edge {edge_to_block}: {exc}")

        # Generate individual maps for each route (both OSM and simplified views)
        display_meta = {
            "start": origin,
            "goals": [destination],
            "accident_multiplier": meta.get("accident_multiplier", 1)
        }
        for route_def in route_definitions:
            # Each result gets its own HTML map with the chosen path highlighted
            map_output = os.path.join('static', f'heritage_map_{route_def["id"]}.html')
            simple_output = os.path.join('static', f'heritage_map_simple_{route_def["id"]}.html')
            try:
                # Generate OSM view
                visualize_with_roads(
                    nodes, ways, cameras, display_meta, osm_nodes, graph,
                    map_output, route_def['path'], other_paths=None,
                    exploration_steps=exploration_steps
                )
                # Generate simplified graph view
                visualize_simple_graph(
                    nodes, ways, cameras, display_meta,
                    simple_output, route_def['path'], other_paths=None,
                    exploration_steps=exploration_steps
                )
            except Exception as e:
                # Continue with other routes even if one fails
                print(f"Error generating maps for route {route_def['id']}: {str(e)}")

        # Primary route stays first; alternatives follow in ascending cost order
        sorted_routes = routes[:1] + sorted(routes[1:], key=lambda r: r['cost'])

        return jsonify({
            'success': True,
            'routes': sorted_routes,
            'algorithm': algorithm.upper(),
            'stats': {
                'nodes': len(nodes),
                'ways': len(ways),
                'routes_found': len(sorted_routes),
                'nodes_created': nodes_created
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ========================================================================
# ACCIDENT IMAGE CLASSIFICATION API
# ========================================================================
@app.route('/predict_accident', methods=['POST'])
def predict_accident():
    """Classify the severity shown in an uploaded accident image.

    Expects a multipart form with an ``image`` file and an optional ``model``
    value (``mobilenet``, ``resnet`` or ``efficientnet``; default ``resnet``).

    Returns:
        JSON with ``success``, ``label``, ``confidence`` and
        ``all_probabilities``; a JSON error with ``success: False`` when the
        model name is invalid, the model is unavailable, no image was sent, or
        the image cannot be decoded.
    """
    model_name = request.form.get("model", "resnet")

    # Select model
    if model_name == "mobilenet":
        model = mobilenet_model
    elif model_name == "resnet":
        model = resnet_model
    elif model_name == "efficientnet":
        if efficientnet_model is None:
            return jsonify({"success": False, "error": "EfficientNet model not available"})
        model = efficientnet_model
    else:
        return jsonify({"success": False, "error": "Invalid model"}), 400

    # Load image
    if "image" not in request.files:
        return jsonify({"success": False, "error": "No image uploaded"}), 400
    img_file = request.files["image"]
    try:
        img = Image.open(img_file).convert("RGB")
    except (OSError, ValueError) as exc:
        # A corrupt or non-image upload is a client error, not a server crash
        return jsonify({"success": False, "error": f"Invalid image: {exc}"}), 400

    # Preprocess: add a leading batch dimension for the model
    x = accident_transform(img).unsqueeze(0)

    # Predict without tracking gradients
    with torch.no_grad():
        logits = model(x)
        probs = torch.softmax(logits, dim=1)
        confidence, label_idx = torch.max(probs, dim=1)

    # NOTE(review): assumes the model's output index order matches this list -- confirm against training
    CLASS_NAMES = ["none", "minor", "moderate", "severe"]
    predicted_severity = CLASS_NAMES[int(label_idx.item())]
    confidence_val = float(confidence.item())

    # Probability for every class, keyed by class name
    all_probs = {
        class_name: float(probs[0][i].item())
        for i, class_name in enumerate(CLASS_NAMES)
    }

    return jsonify({
        "success": True,
        "label": predicted_severity,
        "confidence": confidence_val,
        "all_probabilities": all_probs
    })
@app.route('/reset_accident', methods=['POST'])
def reset_accident():
    """Remove every accident and restore the cached ways from their source files.

    Clears ``GLOBAL_ACCIDENTS`` and the accident-related session keys, then
    re-parses the .txt file of the uploaded and default caches (when the file
    path is set and exists) to replace their ``ways``.

    Returns:
        JSON ``{"success": True}``, or a 500 JSON error on failure.
    """
    try:
        # Clear global accidents
        GLOBAL_ACCIDENTS.clear()
        print("✅ Cleared all accidents from GLOBAL_ACCIDENTS")

        # Clear session
        for stale_key in ("edge_accidents", "modified_ways", "default_map_modified_ways"):
            session.pop(stale_key, None)
        session.modified = True

        # Reset the ways of both caches to what their source file contains
        for cache, cache_name in (
            (UPLOADED_MAP_CACHE, "UPLOADED_MAP_CACHE"),
            (DEFAULT_MAP_DATA, "DEFAULT_MAP_DATA"),
        ):
            source_txt = cache['txt_path']
            if source_txt and os.path.exists(source_txt):
                _, original_ways, _, _ = parse_map_data_file(source_txt)
                cache['ways'] = original_ways
                print(f"✅ Reset {cache_name} ways to original")

        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/clear_cache', methods=['POST'])
def clear_cache():
    """Empty the uploaded-map cache, the global accident store and the session."""
    try:
        # Set every existing cache slot back to None in one in-place update
        UPLOADED_MAP_CACHE.update(dict.fromkeys(UPLOADED_MAP_CACHE))
        # Drop all recorded accidents
        GLOBAL_ACCIDENTS.clear()
        # Forget everything stored for this client
        session.clear()
        return jsonify({"success": True})
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
@app.route('/debug_session', methods=['GET'])
def debug_session():
    """Report the current session contents, accident store and cache status as JSON."""
    # One summary row per recorded accident, with defaults for missing fields
    accident_rows = [
        {
            "road_id": rid,
            "road_name": details.get("road_name", "Unknown"),
            "severity": details.get("severity", "unknown"),
            "multiplier": details.get("multiplier", 1.0)
        }
        for rid, details in GLOBAL_ACCIDENTS.items()
    ]

    snapshot = {
        "session": dict(session),
        "global_accidents": dict(GLOBAL_ACCIDENTS),
        "global_accidents_count": len(GLOBAL_ACCIDENTS),
        "global_accidents_list": accident_rows,
        "has_uploaded_cache": UPLOADED_MAP_CACHE['nodes'] is not None,
        "has_default_map": DEFAULT_MAP_DATA['nodes'] is not None,
        "uploaded_cache_timestamp": UPLOADED_MAP_CACHE.get('timestamp'),
        "uploaded_cache_txt_path": UPLOADED_MAP_CACHE.get('txt_path'),
        "default_map_txt_path": DEFAULT_MAP_DATA.get('txt_path')
    }
    return jsonify(snapshot)
@app.route('/remove_accident', methods=['POST'])
def remove_accident():
    """Remove the accident recorded for one road and rebuild the affected map data.

    Expects a JSON body with ``road_id``. The accident is removed from
    ``GLOBAL_ACCIDENTS``, the ways are re-parsed from the active map's .txt
    file, the remaining accidents are re-applied, the active cache is updated
    and the accident maps under ``static/`` are regenerated.

    Returns:
        JSON with ``success``, ``removed_road``, ``accidents``, ``map_url``,
        ``map_simple_url`` and ``has_accidents``; a 400 JSON error when
        ``road_id`` is missing, has no accident, or no map is loaded; a 500
        JSON error on any other failure.
    """
    try:
        # silent=True: a missing/malformed body is reported as "Missing road_id"
        data = request.get_json(silent=True) or {}
        road_id = data.get("road_id")
        if not road_id:
            return jsonify({"success": False, "error": "Missing road_id"}), 400

        # Use string key for consistency
        road_id_str = str(road_id)

        # Check global accidents store
        if road_id_str not in GLOBAL_ACCIDENTS:
            return jsonify({"success": False, "error": f"No accident found for road {road_id_str}"}), 400

        # Remove the accident from global store
        removed_info = GLOBAL_ACCIDENTS.pop(road_id_str)
        print(f"🗑️ Removing accident from road {road_id_str}: {removed_info['road_name']}")
        print(f"   Remaining accidents in GLOBAL_ACCIDENTS: {len(GLOBAL_ACCIDENTS)}")

        # Update session
        session["edge_accidents"] = dict(GLOBAL_ACCIDENTS)
        session.modified = True

        # Pick the active map: an uploaded map takes precedence over the default
        if UPLOADED_MAP_CACHE['nodes'] is not None:
            active_cache, cache_name = UPLOADED_MAP_CACHE, "UPLOADED_MAP_CACHE"
        elif DEFAULT_MAP_DATA['nodes'] is not None:
            active_cache, cache_name = DEFAULT_MAP_DATA, "DEFAULT_MAP_DATA"
        else:
            return jsonify({"success": False, "error": "No map data available"}), 400

        # Get a fresh copy of the original ways by re-parsing the file (not the cache)
        nodes, ways, cameras, meta = parse_map_data_file(active_cache['txt_path'])
        osm_nodes = active_cache['osm_nodes']
        graph = active_cache['graph']

        print(f"🔄 Rebuilding ways after removing accident from road {road_id}")

        # Initialize original times
        for w in ways:
            if "original_time" not in w:
                w["original_time"] = w["time_min"]

        # Reapply remaining accidents from GLOBAL_ACCIDENTS
        print(f"\n🔧 Reapplying {len(GLOBAL_ACCIDENTS)} remaining accident(s)...")
        for way_id, accident_info in GLOBAL_ACCIDENTS.items():
            multiplier = accident_info["multiplier"]
            # First way whose id matches (compared as strings), if any
            target = next((w for w in ways if str(w["way_id"]) == str(way_id)), None)
            if target is None:
                print(f"⚠️ Way {way_id} not found during reapplication")
                continue
            target["time_min"] = target["original_time"] * multiplier
            target["accident"] = True
            target["severity"] = accident_info["severity"]
            print(f"✅ Reapplied {accident_info['severity']} ({multiplier}x) to way {way_id}: {target.get('road_name', 'Unknown')}")

        # Update cache and session
        if GLOBAL_ACCIDENTS:
            session["modified_ways"] = ways
            active_cache['ways'] = copy.deepcopy(ways)
            print(f"✅ Updated {cache_name} with remaining accidents")
        else:
            # No accidents left - clear everything and restore original ways
            session.pop("modified_ways", None)
            session.pop("default_map_modified_ways", None)
            _, original_ways, _, _ = parse_map_data_file(active_cache['txt_path'])
            active_cache['ways'] = original_ways
            print(f"✅ Reset {cache_name} to original (no accidents)")
        session.modified = True

        # Always regenerate map to show current state (with or without remaining accidents)
        accident_map_output = os.path.join('static', 'heritage_map_accidents.html')
        accident_map_simple = os.path.join('static', 'heritage_map_simple_accidents.html')
        try:
            # Use the ways we just rebuilt
            visualize_with_roads(
                nodes, ways, cameras,
                {"start": None, "goals": []},
                osm_nodes, graph,
                accident_map_output,
                solution_path=None,
                exploration_steps=[]
            )
            visualize_simple_graph(
                nodes, ways, cameras,
                {"start": None, "goals": []},
                accident_map_simple,
                solution_path=None,
                other_paths=None,
                exploration_steps=[]
            )
            print(f"✅ Regenerated map with {len(GLOBAL_ACCIDENTS)} remaining accident(s)")
        except Exception as e:
            # Map rendering is best-effort; the accident has already been removed
            print(f"⚠️ Error regenerating accident map: {str(e)}")

        # Return updated accident list from global store
        all_accidents = {
            way_id: {
                "road_name": accident_info["road_name"],
                "severity": accident_info["severity"],
                "from_node": accident_info.get("from_node", "?"),
                "to_node": accident_info.get("to_node", "?")
            }
            for way_id, accident_info in GLOBAL_ACCIDENTS.items()
        }

        # Timestamp query string makes each response URL unique
        import time
        timestamp = int(time.time())

        return jsonify({
            "success": True,
            "removed_road": removed_info["road_name"],
            "accidents": all_accidents,
            "map_url": f"/static/heritage_map_accidents.html?t={timestamp}",
            "map_simple_url": f"/static/heritage_map_simple_accidents.html?t={timestamp}",
            "has_accidents": len(GLOBAL_ACCIDENTS) > 0
        })
    except Exception as e:
        print(f"❌ Error removing accident: {str(e)}")
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500
# ========================================================================
# APPLY ACCIDENT IMPACT TO SPECIFIC EDGES IN ROUTES
# ========================================================================
@app.route('/apply_accident', methods=['POST'])
def apply_accident():
    """Apply an accident severity multiplier to one way (road).

    Expects a JSON body with ``road_id`` and ``severity``. The accident is
    recorded in ``GLOBAL_ACCIDENTS`` under ``str(road_id)`` (replacing any
    earlier accident on that road), every way's travel time is reset and all
    recorded accidents are re-applied, the active map cache is updated, and the
    accident maps under ``static/`` are regenerated.

    Returns:
        JSON with ``success``, ``road_id``, ``road_name``, ``severity``,
        ``is_override``, ``accidents``, ``map_url`` and ``map_simple_url``; a
        400 JSON error for missing input or missing map data; a 500 JSON error
        on any other failure.
    """
    try:
        print("\n💥 APPLY ACCIDENT TO WAY")

        # Get road_id and severity from request
        data = request.get_json(silent=True) or {}
        road_id = data.get("road_id")
        severity = data.get("severity")
        if not road_id or not severity:
            return jsonify({"success": False, "error": "Missing road_id or severity"}), 400

        # --- LOAD DATA ---
        nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path = get_current_map_data()
        if nodes is None:
            print("❌ No map data available!")
            return jsonify({"success": False, "error": "No map data available. Please upload map files or refresh the page."}), 400

        # Accidents are keyed by the string form of the way id
        road_key = str(road_id)

        # Check if this road already has an accident
        is_override = road_key in GLOBAL_ACCIDENTS

        # Get road name and nodes for response
        road_name = "Unknown"
        from_node = "?"
        to_node = "?"
        for w in ways:
            if str(w["way_id"]) == road_key:
                road_name = w.get("road_name", "Unknown")
                from_node = w.get("from", "?")
                to_node = w.get("to", "?")
                break

        # Store this accident in the global store (persists across requests).
        # str() guards against a non-string severity; unknown labels fall back to 1.0x.
        multiplier = SEVERITY_MULTIPLIERS.get(str(severity).lower(), 1.0)
        GLOBAL_ACCIDENTS[road_key] = {
            "severity": severity,
            "multiplier": multiplier,
            "road_name": road_name,
            "from_node": from_node,
            "to_node": to_node
        }

        # Also update session for backwards compatibility
        session["edge_accidents"] = dict(GLOBAL_ACCIDENTS)
        session.modified = True

        # The multiplier is taken from the local value: looking it up with the
        # raw road_id raised KeyError whenever the client sent a numeric id.
        print(f"✅ {'Overridden' if is_override else 'Applied'} accident: {road_id} ({road_name}) = {severity} (multiplier: {multiplier})")
        print(f"   Total accidents in GLOBAL_ACCIDENTS: {len(GLOBAL_ACCIDENTS)}")
        print(f"   All accidents: {list(GLOBAL_ACCIDENTS.keys())}")

        # ==========================================================
        # Apply accidents to ways for later route calculation
        # ==========================================================
        for w in ways:
            # Store original time if not already stored
            if "original_time" not in w:
                w["original_time"] = w["time_min"]
            # Reset to original
            w["time_min"] = w["original_time"]
            w["accident"] = False

        # Apply accidents to specific ways (roads)
        print(f"\n🔧 Applying {len(GLOBAL_ACCIDENTS)} accident(s) to ways...")
        for way_id, accident_info in GLOBAL_ACCIDENTS.items():
            way_multiplier = accident_info["multiplier"]
            # First way whose id matches (compared as strings), if any
            target = next((w for w in ways if str(w["way_id"]) == str(way_id)), None)
            if target is None:
                print(f"⚠️ Way {way_id} not found in ways list!")
                continue
            original_time = target["original_time"]
            new_time = original_time * way_multiplier
            target["time_min"] = new_time
            target["accident"] = True
            target["severity"] = accident_info["severity"]
            print(f"🚨 Applied {accident_info['severity']} ({way_multiplier}x) to way {way_id}: {target.get('road_name', 'Unknown')}")
            print(f"   ⏱️ {original_time:.2f} min → {new_time:.2f} min")

        # Store modified ways in BOTH session AND cache
        session["modified_ways"] = ways
        session.modified = True

        # Update the cache with modified ways so all routes see the accidents
        if UPLOADED_MAP_CACHE['nodes'] is not None:
            UPLOADED_MAP_CACHE['ways'] = copy.deepcopy(ways)
            print("✅ Updated UPLOADED_MAP_CACHE with accidents")
        elif DEFAULT_MAP_DATA['nodes'] is not None:
            # For default map, update the DEFAULT_MAP_DATA directly
            DEFAULT_MAP_DATA['ways'] = copy.deepcopy(ways)
            print("✅ Updated DEFAULT_MAP_DATA with accidents")
        print(f"📦 Total accidents stored: {len(GLOBAL_ACCIDENTS)}")

        # ==========================================================
        # Generate accident visualization map immediately
        # ==========================================================
        accident_map_output = os.path.join('static', 'heritage_map_accidents.html')
        accident_map_simple = os.path.join('static', 'heritage_map_simple_accidents.html')
        try:
            # Create detailed map with accident roads highlighted
            visualize_with_roads(
                nodes, ways, cameras,
                {"start": None, "goals": []},
                osm_nodes, graph,
                accident_map_output,
                solution_path=None,
                exploration_steps=[]
            )
            # Create simple map with accident roads highlighted
            visualize_simple_graph(
                nodes, ways, cameras,
                {"start": None, "goals": []},
                accident_map_simple,
                solution_path=None,
                other_paths=None,
                exploration_steps=[]
            )
            print("✅ Generated accident visualization maps")
        except Exception as e:
            # Map rendering is best-effort; the accident itself is already stored
            print(f"⚠️ Error generating accident maps: {str(e)}")
            import traceback
            traceback.print_exc()

        # ==========================================================
        # Return all accidents for frontend to display
        # ==========================================================
        all_accidents = {
            way_id: {
                "road_name": accident_info["road_name"],
                "severity": accident_info["severity"],
                "from_node": accident_info.get("from_node", "?"),
                "to_node": accident_info.get("to_node", "?")
            }
            for way_id, accident_info in GLOBAL_ACCIDENTS.items()
        }

        return jsonify({
            "success": True,
            "road_id": road_id,
            "road_name": road_name,
            "severity": severity,
            "is_override": is_override,
            "accidents": all_accidents,
            "map_url": "/static/heritage_map_accidents.html",
            "map_simple_url": "/static/heritage_map_simple_accidents.html"
        })
    except Exception as e:
        print(f"❌ Error in apply_accident: {str(e)}")
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "error": f"Server error: {str(e)}"
        }), 500
# ========================================================================
# ALL ALGORITHMS COMPARISON (WITH EDGE ACCIDENT SUPPORT)
# ========================================================================
@app.route('/find_all_algorithms', methods=['POST'])
def find_all_algorithms():
    """Run every loaded algorithm on the same origin/destination and compare results.

    ``origin`` and ``destination`` are read from the JSON body, falling back to
    the values stored in the session. When the session holds ``modified_ways``
    those ways are used instead of the current map's ways. A detailed and a
    simplified HTML map is written under ``static/`` for each algorithm that
    returns a path.

    Returns:
        JSON with ``success`` and ``routes`` (one entry per algorithm that
        found a path; empty when origin or destination is unknown); a 400 JSON
        error when no map data is available.
    """
    # Try to get from request body first, fallback to session.
    # silent=True keeps a missing/non-JSON body from raising before the fallback.
    data = request.get_json(silent=True) or {}
    origin = data.get("origin") or session.get("origin")
    destination = data.get("destination") or session.get("destination")
    print("STEP 4 ORIGIN =", origin)
    print("STEP 4 DESTINATION =", destination)
    if not origin or not destination:
        return jsonify({"success": True, "routes": []})

    nodes, ways, cameras, meta, osm_nodes, graph, txt_path, osm_path = get_current_map_data()
    if nodes is None:
        return jsonify({"success": False, "error": "No map data available"}), 400

    # If modified ways exist (accident applied), use them
    if "modified_ways" in session:
        ways = session["modified_ways"]
        print(f"✅ Using modified ways with {len(GLOBAL_ACCIDENTS)} accidents applied")

    edges_map = build_edges_map(ways)
    # Algorithms are given nodes mapped to (lat, lon) tuples
    algo_nodes = {nid: (info["lat"], info["lon"]) for nid, info in nodes.items()}
    # Same display metadata for both map flavours of every algorithm
    display_meta = {
        "start": origin,
        "goals": [destination],
        "accident_multiplier": meta.get("accident_multiplier", 1)
    }

    results = []
    for algo_name, func in algorithms_impl.items():
        if func is None:
            # Implementation failed to load; nothing to compare
            continue
        try:
            # Run algorithm with tracking so the exploration panel can be used
            result = func(algo_nodes, edges_map, origin, [destination],
                          track=True, visualise=True)
            if result is None or len(result) < 3:
                continue
            path = result[2]
            exploration_steps = result[3] if len(result) >= 4 else []
            if not path:
                continue

            # Sum the time of each consecutive edge present in the edge map
            total_cost = sum(
                edges_map[edge]["time"]
                for edge in zip(path, path[1:])
                if edge in edges_map
            )

            # Save each algorithm map
            out_html = f"static/algomap_{algo_name}.html"
            simple_out_html = f"static/algomap_simple_{algo_name}.html"
            visualize_with_roads(
                nodes, ways, cameras,
                display_meta,
                osm_nodes, graph,
                out_html,
                solution_path=path,
                exploration_steps=exploration_steps
            )
            visualize_simple_graph(
                nodes, ways, cameras,
                display_meta,
                simple_out_html,
                solution_path=path,
                other_paths=None,
                exploration_steps=exploration_steps
            )

            results.append({
                "algorithm": algo_name.upper(),
                "path": path,
                "cost": round(total_cost, 2),
                "length": len(path),
                # Stringify ids so non-string node ids cannot raise TypeError in join
                "path_str": " → ".join(str(nid) for nid in path),
                "map_url": f"/static/algomap_{algo_name}.html",
                "map_url_simple": f"/static/algomap_simple_{algo_name}.html"
            })
        except Exception as e:
            # One failing algorithm must not abort the comparison
            print("ERROR in algo", algo_name, ":", e)
            continue

    return jsonify({"success": True, "routes": results})
# ========================================================================
# CLEAR ACCIDENT STATE (FOR NEW ROUTE SELECTION)
# ========================================================================
@app.route('/clear_route_state', methods=['POST'])
def clear_route_state():
    """Drop the session's accident records and modified ways."""
    try:
        # Remove each key if present; absent keys are ignored
        for stale_key in ("edge_accidents", "modified_ways"):
            session.pop(stale_key, None)
        session.modified = True
        return jsonify({"success": True})
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return jsonify(failure), 500
if __name__ == '__main__':
    # Populate the default map before the server starts accepting requests
    load_default_map_data()

    # Listening port comes from PORT, defaulting to 7860
    port = int(os.environ.get('PORT', 7860))

    # The presence of SPACE_ID is treated as "running in production"
    is_production = 'SPACE_ID' in os.environ

    if not is_production:
        # Local development only: open the app in a browser shortly after startup
        import threading
        import webbrowser
        import time

        def _open_browser_after_delay(url, delay=1.0):
            """Wait *delay* seconds, then open *url*; any failure is ignored."""
            try:
                time.sleep(delay)
                webbrowser.open_new(url)
            except Exception:
                pass

        url = f'http://127.0.0.1:{port}/'
        # Daemon thread so it never keeps the process alive on shutdown
        opener = threading.Thread(
            target=_open_browser_after_delay,
            args=(url, 1.0),
            daemon=True,
        )
        opener.start()

    # Run Flask - debug mode off in production
    app.run(debug=not is_production, host='0.0.0.0', port=port, use_reloader=False)