| """Utilities for deterministic generation, IO, graph operations, and validation.""" |
|
|
| from __future__ import annotations |
|
|
| import heapq |
| import json |
| import math |
| import random |
| from collections import Counter, defaultdict, deque |
| from pathlib import Path |
| from typing import Any, Iterable |
|
|
| from .schemas import CityGraph, DistrictData |
|
|
|
|
| def ensure_dir(path: Path) -> None: |
| path.mkdir(parents=True, exist_ok=True) |
|
|
|
|
| def write_json(path: Path, payload: Any) -> None: |
| ensure_dir(path.parent) |
| path.write_text(json.dumps(payload, indent=2), encoding="utf-8") |
|
|
|
|
| def euclidean(a: tuple[float, float], b: tuple[float, float]) -> float: |
| return math.hypot(a[0] - b[0], a[1] - b[1]) |
|
|
|
|
| def clamp(value: int, low: int, high: int) -> int: |
| return max(low, min(high, value)) |
|
|
|
|
| def choose_weighted(rng: random.Random, values: list[str], weights: list[float]) -> str: |
| total = sum(weights) |
| if total <= 0: |
| return values[rng.randrange(len(values))] |
| cutoff = rng.random() * total |
| cursor = 0.0 |
| for value, weight in zip(values, weights): |
| cursor += weight |
| if cursor >= cutoff: |
| return value |
| return values[-1] |
|
|
|
|
| def connected_components(nodes: Iterable[str], adjacency: dict[str, set[str]]) -> list[set[str]]: |
| pending = set(nodes) |
| components: list[set[str]] = [] |
| while pending: |
| root = pending.pop() |
| comp = {root} |
| queue = deque([root]) |
| while queue: |
| cur = queue.popleft() |
| for nxt in adjacency[cur]: |
| if nxt in pending: |
| pending.remove(nxt) |
| comp.add(nxt) |
| queue.append(nxt) |
| components.append(comp) |
| return components |
|
|
|
|
| def dijkstra_shortest_path( |
| start: str, |
| end: str, |
| graph: dict[str, list[tuple[str, float, str]]], |
| ) -> list[str] | None: |
| """Return road-id path from start intersection to end intersection.""" |
| if start == end: |
| return [] |
| queue: list[tuple[float, str]] = [(0.0, start)] |
| dist: dict[str, float] = {start: 0.0} |
| prev: dict[str, tuple[str, str]] = {} |
|
|
| while queue: |
| cost, node = heapq.heappop(queue) |
| if node == end: |
| break |
| if cost > dist[node]: |
| continue |
| for nxt, edge_cost, road_id in graph.get(node, []): |
| candidate = cost + edge_cost |
| if candidate < dist.get(nxt, float("inf")): |
| dist[nxt] = candidate |
| prev[nxt] = (node, road_id) |
| heapq.heappush(queue, (candidate, nxt)) |
|
|
| if end not in prev: |
| return None |
|
|
| route: list[str] = [] |
| cursor = end |
| while cursor != start: |
| parent, road_id = prev[cursor] |
| route.append(road_id) |
| cursor = parent |
| route.reverse() |
| return route |
|
|
|
|
| def validate_unique_ids(city_graph: CityGraph) -> None: |
| intersection_ids = set(city_graph.intersections.keys()) |
| if len(intersection_ids) != len(city_graph.intersections): |
| raise ValueError("Duplicate intersection IDs found.") |
| road_ids = set(city_graph.directed_roads.keys()) |
| if len(road_ids) != len(city_graph.directed_roads): |
| raise ValueError("Duplicate road IDs found.") |
|
|
|
|
| def validate_district_contiguity(city_graph: CityGraph, district_data: DistrictData) -> None: |
| by_district: dict[str, set[str]] = defaultdict(set) |
| for intersection_id, district_id in district_data.intersection_to_district.items(): |
| by_district[district_id].add(intersection_id) |
|
|
| for district_id, members in by_district.items(): |
| if not members: |
| raise ValueError(f"District {district_id} has no intersections.") |
| components = connected_components(members, city_graph.adjacency) |
| if len(components) != 1: |
| raise ValueError(f"District {district_id} is not contiguous.") |
|
|
|
|
| def validate_inter_district_connectivity(district_data: DistrictData) -> None: |
| connected = sum(1 for roads in district_data.district_neighbors.values() if roads) |
| if connected == 0: |
| raise ValueError("No inter-district connections found.") |
|
|
|
|
| def validate_district_exit_capacity( |
| district_data: DistrictData, |
| min_exit_roads: int = 2, |
| min_entry_roads: int = 2, |
| min_neighbor_districts: int = 1, |
| ) -> None: |
| underconnected_exit: list[str] = [] |
| underconnected_entry: list[str] = [] |
| underconnected_neighbors: list[str] = [] |
| for district_id, district in district_data.districts.items(): |
| if len(district.exit_roads) < min_exit_roads: |
| underconnected_exit.append( |
| f"{district_id}:{len(district.exit_roads)}" |
| ) |
| if len(district.entry_roads) < min_entry_roads: |
| underconnected_entry.append( |
| f"{district_id}:{len(district.entry_roads)}" |
| ) |
| if len(district.neighbors) < min_neighbor_districts: |
| underconnected_neighbors.append( |
| f"{district_id}:{len(district.neighbors)}" |
| ) |
| if underconnected_exit or underconnected_entry or underconnected_neighbors: |
| parts: list[str] = [] |
| if underconnected_exit: |
| parts.append( |
| f"exit<{min_exit_roads}: " + ", ".join(underconnected_exit[:8]) |
| ) |
| if underconnected_entry: |
| parts.append( |
| f"entry<{min_entry_roads}: " + ", ".join(underconnected_entry[:8]) |
| ) |
| if underconnected_neighbors: |
| parts.append( |
| f"neighbors<{min_neighbor_districts}: " |
| + ", ".join(underconnected_neighbors[:8]) |
| ) |
| raise ValueError( |
| "District external connectivity too low: " + " | ".join(parts) |
| ) |
|
|
|
|
| def validate_routes( |
| flow_entries: list[dict[str, Any]], |
| roads_by_id: dict[str, Any], |
| ) -> None: |
| if not flow_entries: |
| raise ValueError("Scenario flow is empty.") |
| for idx, vehicle in enumerate(flow_entries): |
| route = vehicle.get("route", []) |
| if not route: |
| raise ValueError(f"Flow entry {idx} has empty route.") |
| for road_id in route: |
| if road_id not in roads_by_id: |
| raise ValueError(f"Flow entry {idx} references missing road {road_id}.") |
| for left, right in zip(route, route[1:]): |
| left_end = roads_by_id[left]["endIntersection"] |
| right_start = roads_by_id[right]["startIntersection"] |
| if left_end != right_start: |
| raise ValueError( |
| f"Invalid route transition: {left} -> {right} in entry {idx}." |
| ) |
|
|
|
|
| def build_road_index(roadnet: dict[str, Any]) -> dict[str, dict[str, Any]]: |
| return {road["id"]: road for road in roadnet.get("roads", [])} |
|
|
|
|
| def build_roadlink_index( |
| roadnet: dict[str, Any], |
| ) -> dict[str, set[tuple[str, str]]]: |
| roadlinks_by_intersection: dict[str, set[tuple[str, str]]] = defaultdict(set) |
| for intersection in roadnet.get("intersections", []): |
| if intersection.get("virtual", False): |
| |
| continue |
| iid = intersection["id"] |
| for road_link in intersection.get("roadLinks", []): |
| pair = (road_link["startRoad"], road_link["endRoad"]) |
| roadlinks_by_intersection[iid].add(pair) |
| return roadlinks_by_intersection |
|
|
|
|
| def validate_route_with_reasons( |
| route: list[str], |
| roads_by_id: dict[str, dict[str, Any]], |
| roadlinks_by_intersection: dict[str, set[tuple[str, str]]], |
| ) -> list[str]: |
| reasons: list[str] = [] |
| if not route: |
| return ["empty_route"] |
| if len(route) < 2: |
| return ["route_too_short"] |
|
|
| for rid in route: |
| if rid not in roads_by_id: |
| reasons.append(f"missing_road:{rid}") |
| return reasons |
|
|
| for left, right in zip(route, route[1:]): |
| left_road = roads_by_id[left] |
| right_road = roads_by_id[right] |
| shared_intersection = left_road["endIntersection"] |
| if shared_intersection != right_road["startIntersection"]: |
| reasons.append("mismatched_intersection_transition") |
| continue |
| if (left, right) not in roadlinks_by_intersection.get(shared_intersection, set()): |
| reasons.append("missing_roadlink_transition") |
| return reasons |
|
|
|
|
| def summarize_route_validation( |
| flow_entries: list[dict[str, Any]], |
| roads_by_id: dict[str, dict[str, Any]], |
| roadlinks_by_intersection: dict[str, set[tuple[str, str]]], |
| ) -> dict[str, Any]: |
| reason_counter: Counter[str] = Counter() |
| total = len(flow_entries) |
| valid = 0 |
| invalid = 0 |
| for flow in flow_entries: |
| reasons = validate_route_with_reasons( |
| route=flow.get("route", []), |
| roads_by_id=roads_by_id, |
| roadlinks_by_intersection=roadlinks_by_intersection, |
| ) |
| if reasons: |
| invalid += 1 |
| reason_counter.update(reasons) |
| else: |
| valid += 1 |
| return { |
| "total_routes": total, |
| "valid_routes": valid, |
| "invalid_routes": invalid, |
| "top_failure_reasons": reason_counter.most_common(10), |
| } |
|
|
|
|
| def compute_scenario_diagnostics( |
| flow_entries: list[dict[str, Any]], |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| ) -> dict[str, Any]: |
| roads_by_id = build_road_index(city_graph.roadnet) |
| assignment = district_data.intersection_to_district |
| road_usage: Counter[str] = Counter() |
| origin_counter: Counter[str] = Counter() |
| destination_counter: Counter[str] = Counter() |
| corridor_counter: Counter[str] = Counter() |
| external_origin = 0 |
| external_destination = 0 |
|
|
| for flow in flow_entries: |
| route = flow.get("route", []) |
| if not route: |
| continue |
| first = roads_by_id.get(route[0]) |
| last = roads_by_id.get(route[-1]) |
| if first: |
| origin_key = assignment.get(first["startIntersection"], "external") |
| origin_counter[origin_key] += 1 |
| if origin_key == "external": |
| external_origin += 1 |
| if last: |
| destination_key = assignment.get(last["endIntersection"], "external") |
| destination_counter[destination_key] += 1 |
| if destination_key == "external": |
| external_destination += 1 |
| for road_id in route: |
| road_usage[road_id] += 1 |
| road = roads_by_id[road_id] |
| ds = assignment.get(road["startIntersection"], "external") |
| de = assignment.get(road["endIntersection"], "external") |
| if ds != de: |
| corridor_counter[f"{ds}->{de}"] += 1 |
|
|
| total_routes = len(flow_entries) |
| total_road_traversals = sum(road_usage.values()) |
| total_roads = len(roads_by_id) |
| used_roads = len(road_usage) |
| unused_roads = total_roads - used_roads |
| boundary_roads = set(district_data.inter_district_roads) |
| gateway_roads = set(city_graph.gateway_roads) |
| boundary_usage = sum( |
| count for road_id, count in road_usage.items() if road_id in boundary_roads |
| ) |
| gateway_usage = sum( |
| count for road_id, count in road_usage.items() if road_id in gateway_roads |
| ) |
| top_road_usage = road_usage.most_common(15) |
| top_corridors = corridor_counter.most_common(10) |
| total_lanes = sum(road.num_lanes for road in city_graph.directed_roads.values()) |
| demand_per_lane = total_routes / max(1.0, float(total_lanes)) |
| avg_route_len = total_road_traversals / max(1.0, float(total_routes)) |
| concentration = ( |
| sum(count for _, count in top_road_usage[:10]) / max(1.0, float(total_road_traversals)) |
| ) |
| boundary_share = boundary_usage / max(1.0, float(total_road_traversals)) |
| congestion_score = min( |
| 100.0, |
| 1.9 * demand_per_lane |
| + 2.3 * avg_route_len |
| + 46.0 * concentration |
| + 34.0 * boundary_share, |
| ) |
| if congestion_score < 30.0: |
| congestion_level = "manageable" |
| elif congestion_score < 52.0: |
| congestion_level = "moderate" |
| elif congestion_score < 72.0: |
| congestion_level = "heavy" |
| else: |
| congestion_level = "extreme" |
|
|
| return { |
| "total_vehicles": total_routes, |
| "vehicles_by_origin_district": dict(origin_counter), |
| "vehicles_by_destination_district": dict(destination_counter), |
| "vehicles_from_external": external_origin, |
| "vehicles_to_external": external_destination, |
| "roads_used": used_roads, |
| "unused_roads": unused_roads, |
| "boundary_road_usage": boundary_usage, |
| "gateway_road_usage": gateway_usage, |
| "boundary_road_share": round(boundary_share, 4), |
| "top_used_roads": [ |
| { |
| "road_id": road_id, |
| "traversals": count, |
| "is_arterial": road_id in city_graph.arterial_roads, |
| "is_inter_district": road_id in boundary_roads, |
| "is_gateway": road_id in gateway_roads, |
| } |
| for road_id, count in top_road_usage |
| ], |
| "top_used_corridors": [ |
| {"corridor": corridor, "traversals": count} |
| for corridor, count in top_corridors |
| ], |
| "estimated_congestion_intensity": { |
| "score": round(congestion_score, 3), |
| "level": congestion_level, |
| "demand_per_lane": round(demand_per_lane, 3), |
| "avg_route_length": round(avg_route_len, 3), |
| "road_usage_concentration": round(concentration, 4), |
| }, |
| } |
|
|