EUDORA / backend /routing /graph_builder.py
ThEyAtH's picture
eudora backend
0e4403d
"""
graph_builder.py
Builds or loads the Indore road network graph.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
SPEED OPTIMISATION: PICKLE INSTEAD OF GRAPHML
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
WHY GRAPHML IS SLOW:
GraphML is an XML text format. Every node ID, coordinate, edge
attribute (length, base_time, traffic_factor...) is stored as a
plain string like "0.034721". On load, Python has to:
1. Parse 95MB of XML character by character
2. Convert every value from string β†’ float (thousands of edges)
3. Reconstruct the NetworkX graph object in memory
On a typical server this takes 15–25 seconds.
WHY PICKLE IS FAST:
Python's pickle format stores the graph's in-memory binary
representation directly. On load it just:
1. Reads the binary file into memory
2. Deserialises the already-typed Python objects
The same graph loads in 1–3 seconds β€” roughly 10x faster.
TRADEOFF:
Pickle files are not human-readable and are Python-version
specific. We keep the .graphml as a portable backup. The .pkl
is purely a runtime performance cache.
HOW IT WORKS:
- First run: loads/downloads graphml, saves BOTH graphml + pkl
- All subsequent runs: loads pkl directly, skips graphml entirely
- To force a rebuild: delete indore.pkl (graphml stays intact)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
IMPACT ON ROUTE RESULTS:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Zero impact. The graph topology, edge weights, node positions,
and all attributes are byte-for-byte identical between graphml
and pickle. Pickle is purely a serialisation format change β€”
the graph data itself is unchanged.
"""
import os
import pickle
import logging
import osmnx as ox
logger = logging.getLogger(__name__)
# ── Fallback speeds (km/h) ────────────────────────────────────────────────────
ROAD_SPEEDS_KMPH = {
"motorway": 65.0,
"motorway_link": 50.0,
"trunk": 45.0,
"trunk_link": 35.0,
"primary": 25.0,
"primary_link": 20.0,
"secondary": 20.0,
"secondary_link": 18.0,
"tertiary": 18.0,
"tertiary_link": 15.0,
"residential": 14.0,
"living_street": 10.0,
"service": 10.0,
"unclassified": 16.0,
}
DEFAULT_SPEED_KMPH = 18.0
ROAD_TRAFFIC_VOLUME = {
"motorway": 0.7,
"motorway_link": 0.6,
"trunk": 0.9,
"trunk_link": 0.8,
"primary": 1.8,
"primary_link": 1.5,
"secondary": 1.5,
"secondary_link": 1.3,
"tertiary": 1.1,
"tertiary_link": 1.0,
"residential": 0.6,
"living_street": 0.4,
"service": 0.4,
"unclassified": 0.8,
}
DEFAULT_TRAFFIC_VOLUME = 0.9
MAJOR_ROAD_TYPES = {
"motorway", "motorway_link",
"trunk", "trunk_link",
"primary", "primary_link",
"secondary", "secondary_link",
}
def _road_speed(data: dict) -> float:
road_type = data.get("highway", "")
if isinstance(road_type, list):
road_type = road_type[0]
return ROAD_SPEEDS_KMPH.get(road_type, DEFAULT_SPEED_KMPH)
def _compute_edge_times(G):
for u, v, k, data in G.edges(keys=True, data=True):
length_m = float(data.get("length") or 0)
length_km = length_m / 1000.0
data["length"] = length_m
speed = _road_speed(data)
data["base_time"] = round((length_km / max(speed, 1.0)) * 60.0, 6)
if "traffic_factor" not in data or data.get("traffic_factor") == "":
data["traffic_factor"] = 1.0
else:
try:
data["traffic_factor"] = float(data["traffic_factor"])
except (ValueError, TypeError):
data["traffic_factor"] = 1.0
road_type = data.get("highway", "")
if isinstance(road_type, list):
road_type = road_type[0]
data["road_penalty"] = 0.0 if road_type in MAJOR_ROAD_TYPES else 0.8
data["time_with_behavior"] = round(
data["base_time"] + data["road_penalty"], 6
)
return G
def sanitize_loaded_graph(G):
float_fields = [
"length", "base_time", "traffic_factor", "road_penalty",
"time_with_behavior", "signal_delay", "time_with_signal",
"live_time", "pollution_delay", "pollution_exposure",
"congestion_ratio",
]
for u, v, k, data in G.edges(keys=True, data=True):
for key in float_fields:
if key in data:
try:
data[key] = float(data[key])
except (ValueError, TypeError):
data.pop(key, None)
G = _compute_edge_times(G)
return G
def prepare_graph(G):
return _compute_edge_times(G)
def build_graph(
place_name="Indore, Madhya Pradesh, India",
save=True,
load_if_exists=True,
filepath="indore.graphml",
):
# ── Derive pickle path from graphml path ──────────────────────────────────
# e.g. "indore.graphml" β†’ "indore.pkl"
pickle_path = os.path.splitext(filepath)[0] + ".pkl"
# ── 1. Try pickle first (fastest) ────────────────────────────────────────
if load_if_exists and os.path.exists(pickle_path):
logger.info(f"[Graph] Loading from pickle: {pickle_path}")
try:
with open(pickle_path, "rb") as f:
G = pickle.load(f)
# sanitize still runs to apply current speed table
# but skips the XML parsing entirely
G = sanitize_loaded_graph(G)
logger.info(f"[Graph] Loaded from pickle. Nodes: {len(G.nodes)} Edges: {len(G.edges)}")
return G
except Exception as e:
logger.warning(f"[Graph] Pickle load failed ({e}), falling back to graphml...")
# ── 2. Try graphml (slower, but portable) ────────────────────────────────
if load_if_exists and os.path.exists(filepath):
logger.info(f"[Graph] Loading from graphml: {filepath}")
G = ox.load_graphml(filepath)
G = sanitize_loaded_graph(G)
logger.info(f"[Graph] Loaded from graphml. Nodes: {len(G.nodes)} Edges: {len(G.edges)}")
# Save pickle now so next startup is fast
if save:
logger.info(f"[Graph] Saving pickle for fast future loads: {pickle_path}")
with open(pickle_path, "wb") as f:
pickle.dump(G, f, protocol=5)
return G
# ── 3. Download fresh from OSM ────────────────────────────────────────────
logger.info(f"[Graph] Downloading road network for {place_name}...")
G = ox.graph_from_place(place_name, network_type="drive", simplify=True)
logger.info(f"[Graph] Download complete. Nodes: {len(G.nodes)} Edges: {len(G.edges)}")
G = prepare_graph(G)
if save:
ox.save_graphml(G, filepath)
logger.info(f"[Graph] Saved graphml: {filepath}")
with open(pickle_path, "wb") as f:
pickle.dump(G, f, protocol=5)
logger.info(f"[Graph] Saved pickle: {pickle_path}")
return G
if __name__ == "__main__":
G = build_graph()
sample = next(
(d for u, v, d in G.edges(data=True)
if d.get("highway") == "primary" and d.get("length", 0) > 100), None
)
if sample:
km = sample["length"] / 1000
mins = sample["base_time"]
spd = (km / mins) * 60
print(f"Primary edge check: {km:.3f}km β†’ {mins:.2f}min β†’ {spd:.1f}km/h")