"""Utilities for deterministic generation, IO, graph operations, and validation."""
from __future__ import annotations
import heapq
import json
import math
import random
from collections import Counter, defaultdict, deque
from pathlib import Path
from typing import Any, Iterable
from .schemas import CityGraph, DistrictData
def ensure_dir(path: Path) -> None:
    """Create ``path`` as a directory, including any missing parent directories.

    A directory that already exists is accepted silently instead of raising.
    """
    path.mkdir(exist_ok=True, parents=True)
def write_json(path: Path, payload: Any) -> None:
    """Serialize ``payload`` as 2-space-indented JSON and write it to ``path``.

    The parent directory of ``path`` is created first if it does not exist.
    The text is fully serialized before the file is opened, so a payload that
    cannot be encoded never leaves a partially written file behind.
    """
    ensure_dir(path.parent)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
def euclidean(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Return the straight-line distance between the 2D points ``a`` and ``b``."""
    delta_x = a[0] - b[0]
    delta_y = a[1] - b[1]
    return math.hypot(delta_x, delta_y)
def clamp(value: int, low: int, high: int) -> int:
    """Limit ``value`` to the inclusive range ``[low, high]``.

    The upper bound is applied first and the lower bound second, so when the
    bounds are inverted (``low > high``) the result is ``low``.
    """
    capped = value if value < high else high
    return capped if capped > low else low
def choose_weighted(rng: random.Random, values: list[str], weights: list[float]) -> str:
    """Pick one entry of ``values`` with probability proportional to its weight.

    Exactly one draw is taken from ``rng`` per call (``random()`` when the
    weights have positive total mass, ``randrange()`` otherwise), so results
    stay reproducible for a seeded generator.

    Args:
        rng: Random source; only ``random()`` and ``randrange()`` are used.
        values: Candidate values.
        weights: Weight for each value, paired positionally with ``values``
            via ``zip`` (surplus entries on either side are not paired).

    Returns:
        The selected value. When the weights sum to zero or less, a uniformly
        chosen value is returned instead.
    """
    total = sum(weights)
    if total <= 0:
        # No usable weight mass: fall back to a uniform pick.
        return values[rng.randrange(len(values))]

    cutoff = rng.random() * total
    cursor = 0.0
    # Used only if rounding keeps the running sum from ever exceeding the cutoff.
    fallback = values[-1]
    for value, weight in zip(values, weights):
        cursor += weight
        # Strict comparison: each value owns the half-open interval
        # [previous cursor, cursor), so a zero-weight value owns nothing and
        # cannot win even when the draw is exactly 0.0.
        if cursor > cutoff:
            return value
        if weight > 0:
            fallback = value
    return fallback
def connected_components(nodes: Iterable[str], adjacency: dict[str, set[str]]) -> list[set[str]]:
    """Group ``nodes`` into sets of nodes reachable from one another via ``adjacency``.

    Only members of ``nodes`` take part: neighbours listed in ``adjacency``
    that are not in ``nodes`` are ignored. A node with no ``adjacency`` entry
    is treated as having no neighbours and forms a component of its own.

    Args:
        nodes: Nodes to partition; duplicates are collapsed.
        adjacency: Mapping from a node to its neighbouring nodes. It is only
            read with ``get``, so a ``defaultdict`` is never grown as a side
            effect.

    Returns:
        One set per component. Components appear in the order in which their
        first member occurs in ``nodes``, so an ordered input yields a
        reproducible result.
    """
    ordered = list(dict.fromkeys(nodes))
    pending = set(ordered)
    components: list[set[str]] = []
    for root in ordered:
        if root not in pending:
            # Already absorbed into an earlier component.
            continue
        pending.remove(root)
        component = {root}
        queue = deque([root])
        while queue:
            current = queue.popleft()
            for neighbor in adjacency.get(current, ()):
                if neighbor in pending:
                    pending.remove(neighbor)
                    component.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    return components
def dijkstra_shortest_path(
    start: str,
    end: str,
    graph: dict[str, list[tuple[str, float, str]]],
) -> list[str] | None:
    """Find the cheapest route between two intersections as a list of road ids.

    Args:
        start: Intersection where the route begins.
        end: Intersection where the route ends.
        graph: Maps an intersection to its outgoing edges, each given as
            ``(next_intersection, edge_cost, road_id)``. Intersections without
            an entry are treated as having no outgoing edges. Edge costs are
            assumed to be non-negative, as Dijkstra's algorithm requires.

    Returns:
        Road ids in travel order, ``[]`` when ``start == end``, or ``None``
        when ``end`` cannot be reached.
    """
    if start == end:
        return []

    unreached = float("inf")
    best_cost: dict[str, float] = {start: 0.0}
    came_from: dict[str, tuple[str, str]] = {}
    frontier: list[tuple[float, str]] = [(0.0, start)]

    while frontier:
        cost_so_far, current = heapq.heappop(frontier)
        if current == end:
            break
        if cost_so_far > best_cost[current]:
            # Stale heap entry: a cheaper way to this node was found later.
            continue
        for neighbor, edge_cost, road_id in graph.get(current, []):
            new_cost = cost_so_far + edge_cost
            if new_cost < best_cost.get(neighbor, unreached):
                best_cost[neighbor] = new_cost
                came_from[neighbor] = (current, road_id)
                heapq.heappush(frontier, (new_cost, neighbor))

    if end not in came_from:
        return None

    # Walk the predecessor chain backwards, then flip it into travel order.
    backwards: list[str] = []
    node = end
    while node != start:
        node, road_id = came_from[node]
        backwards.append(road_id)
    return backwards[::-1]
def _has_duplicate_ids(entries: Iterable[Any]) -> bool:
    """Return True when two dict records in ``entries`` share the same ``"id"``."""
    ids = [entry["id"] for entry in entries if isinstance(entry, dict) and "id" in entry]
    return len(set(ids)) != len(ids)


def validate_unique_ids(city_graph: CityGraph) -> None:
    """Raise if the city graph contains duplicate intersection or road ids.

    Two sources are checked:

    * ``city_graph.intersections`` and ``city_graph.directed_roads``. For a
      plain ``dict`` these comparisons can never fail because keys are unique
      by construction; they are kept as a guard for other mapping-like
      containers.
    * The ``"intersections"`` and ``"roads"`` record lists of
      ``city_graph.roadnet``, when that attribute exists and is a ``dict``.
      These are lists of records carrying an ``"id"`` field, so duplicates
      are actually possible there.

    Raises:
        ValueError: If any duplicate id is found.
    """
    if len(set(city_graph.intersections.keys())) != len(city_graph.intersections):
        raise ValueError("Duplicate intersection IDs found.")
    if len(set(city_graph.directed_roads.keys())) != len(city_graph.directed_roads):
        raise ValueError("Duplicate road IDs found.")

    roadnet = getattr(city_graph, "roadnet", None)
    if not isinstance(roadnet, dict):
        # Nothing list-shaped to inspect (roadnet missing or not populated).
        return
    if _has_duplicate_ids(roadnet.get("intersections") or []):
        raise ValueError("Duplicate intersection IDs found.")
    if _has_duplicate_ids(roadnet.get("roads") or []):
        raise ValueError("Duplicate road IDs found.")
def validate_district_contiguity(city_graph: CityGraph, district_data: DistrictData) -> None:
    """Raise unless every district is non-empty and forms one connected component.

    Intersections are grouped by the district they are assigned to in
    ``district_data.intersection_to_district``. The groups are seeded with
    every id in ``district_data.districts`` (when that attribute is present),
    so a declared district that received no intersections is reported instead
    of being skipped silently.

    Args:
        city_graph: Supplies ``adjacency``, the intersection neighbour mapping
            used for the connectivity check.
        district_data: Supplies the intersection-to-district assignment and
            the declared districts.

    Raises:
        ValueError: If a district has no intersections, or if its
            intersections split into more than one connected component.
    """
    declared = getattr(district_data, "districts", None) or {}
    by_district: dict[str, set[str]] = {district_id: set() for district_id in declared}
    for intersection_id, district_id in district_data.intersection_to_district.items():
        by_district.setdefault(district_id, set()).add(intersection_id)

    # Report empty districts first: contiguity is meaningless without members.
    for district_id, members in by_district.items():
        if not members:
            raise ValueError(f"District {district_id} has no intersections.")

    for district_id, members in by_district.items():
        components = connected_components(members, city_graph.adjacency)
        if len(components) != 1:
            raise ValueError(f"District {district_id} is not contiguous.")
def validate_inter_district_connectivity(district_data: DistrictData) -> None:
    """Raise unless at least one district has a non-empty neighbour entry.

    Raises:
        ValueError: If every value in ``district_data.district_neighbors`` is
            empty, or the mapping itself has no entries.
    """
    neighbor_entries = district_data.district_neighbors.values()
    if not any(neighbor_entries):
        raise ValueError("No inter-district connections found.")
def validate_district_exit_capacity(
    district_data: DistrictData,
    min_exit_roads: int = 2,
    min_entry_roads: int = 2,
    min_neighbor_districts: int = 1,
) -> None:
    """Raise if any district has too few exit roads, entry roads, or neighbours.

    Args:
        district_data: Supplies ``districts``, a mapping of district id to an
            object exposing ``exit_roads``, ``entry_roads`` and ``neighbors``.
        min_exit_roads: Minimum required ``len(district.exit_roads)``.
        min_entry_roads: Minimum required ``len(district.entry_roads)``.
        min_neighbor_districts: Minimum required ``len(district.neighbors)``.

    Raises:
        ValueError: If any district falls short. The message lists, for each
            violated requirement, at most the first eight offenders as
            ``district_id:actual_count``.
    """
    # (attribute measured, required minimum, label used in the error text)
    requirements = (
        ("exit_roads", min_exit_roads, "exit"),
        ("entry_roads", min_entry_roads, "entry"),
        ("neighbors", min_neighbor_districts, "neighbors"),
    )
    shortfalls: dict[str, list[str]] = {label: [] for _, _, label in requirements}
    for district_id, district in district_data.districts.items():
        for attribute, minimum, label in requirements:
            size = len(getattr(district, attribute))
            if size < minimum:
                shortfalls[label].append(f"{district_id}:{size}")

    parts = [
        f"{label}<{minimum}: " + ", ".join(shortfalls[label][:8])
        for _, minimum, label in requirements
        if shortfalls[label]
    ]
    if parts:
        raise ValueError(
            "District external connectivity too low: " + " | ".join(parts)
        )
def validate_routes(
    flow_entries: list[dict[str, Any]],
    roads_by_id: dict[str, Any],
) -> None:
    """Raise on the first flow entry whose route is empty, unknown, or disconnected.

    Args:
        flow_entries: Flow records; each is expected to carry a ``"route"``
            list of road ids.
        roads_by_id: Road records keyed by road id, each providing
            ``"startIntersection"`` and ``"endIntersection"``.

    Raises:
        ValueError: If there are no flow entries, a route is empty, a route
            names a road missing from ``roads_by_id``, or two consecutive
            roads do not meet at the same intersection.
    """
    if not flow_entries:
        raise ValueError("Scenario flow is empty.")

    for position, entry in enumerate(flow_entries):
        hops = entry.get("route", [])
        if not hops:
            raise ValueError(f"Flow entry {position} has empty route.")

        # Every road must be known before its transitions can be inspected.
        for hop in hops:
            if hop not in roads_by_id:
                raise ValueError(f"Flow entry {position} references missing road {hop}.")

        for current, following in zip(hops, hops[1:]):
            arrives_at = roads_by_id[current]["endIntersection"]
            departs_from = roads_by_id[following]["startIntersection"]
            if arrives_at == departs_from:
                continue
            raise ValueError(
                f"Invalid route transition: {current} -> {following} in entry {position}."
            )
def build_road_index(roadnet: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Index the records under ``roadnet["roads"]`` by their ``"id"``.

    A missing ``"roads"`` key yields an empty index. If two records share an
    id, the later one replaces the earlier one.
    """
    index: dict[str, dict[str, Any]] = {}
    for record in roadnet.get("roads", []):
        index[record["id"]] = record
    return index
def build_roadlink_index(
    roadnet: dict[str, Any],
) -> dict[str, set[tuple[str, str]]]:
    """Collect the ``(startRoad, endRoad)`` links declared at each intersection.

    Returns:
        A ``defaultdict`` keyed by intersection id. Virtual intersections are
        left out, and an intersection that declares no ``roadLinks`` gets no
        entry.
    """
    index: dict[str, set[tuple[str, str]]] = defaultdict(set)
    # CityFlow ignores roadLinks on virtual intersections.
    real_intersections = (
        node
        for node in roadnet.get("intersections", [])
        if not node.get("virtual", False)
    )
    for node in real_intersections:
        node_id = node["id"]
        for link in node.get("roadLinks", []):
            index[node_id].add((link["startRoad"], link["endRoad"]))
    return index
def validate_route_with_reasons(
    route: list[str],
    roads_by_id: dict[str, dict[str, Any]],
    roadlinks_by_intersection: dict[str, set[tuple[str, str]]],
) -> list[str]:
    """Explain why ``route`` is invalid; an empty list means it is valid.

    Checks run in stages and stop at the first stage that fails:

    1. An empty route yields ``["empty_route"]``.
    2. A route with fewer than two roads yields ``["route_too_short"]``.
    3. The first road id missing from ``roads_by_id`` yields
       ``["missing_road:<id>"]``.
    4. Each consecutive pair adds ``"mismatched_intersection_transition"``
       when the roads do not meet at one intersection, or otherwise
       ``"missing_roadlink_transition"`` when that intersection declares no
       link for the pair. The same reason may appear once per offending pair.
    """
    if not route:
        return ["empty_route"]
    if len(route) < 2:
        return ["route_too_short"]

    for road_id in route:
        if road_id in roads_by_id:
            continue
        return [f"missing_road:{road_id}"]

    problems: list[str] = []
    for current, following in zip(route, route[1:]):
        current_road = roads_by_id[current]
        following_road = roads_by_id[following]
        junction = current_road["endIntersection"]
        if junction != following_road["startIntersection"]:
            problems.append("mismatched_intersection_transition")
        elif (current, following) not in roadlinks_by_intersection.get(junction, set()):
            problems.append("missing_roadlink_transition")
    return problems
def summarize_route_validation(
    flow_entries: list[dict[str, Any]],
    roads_by_id: dict[str, dict[str, Any]],
    roadlinks_by_intersection: dict[str, set[tuple[str, str]]],
) -> dict[str, Any]:
    """Validate every flow's route and summarize the outcome.

    Returns:
        A dict with ``"total_routes"``, ``"valid_routes"``,
        ``"invalid_routes"`` and ``"top_failure_reasons"`` (the ten most
        frequent reasons as ``(reason, count)`` pairs). A reason is counted
        once per occurrence, so a single route can contribute the same reason
        several times.
    """
    failure_counts: Counter[str] = Counter()
    valid_count = 0
    for entry in flow_entries:
        problems = validate_route_with_reasons(
            route=entry.get("route", []),
            roads_by_id=roads_by_id,
            roadlinks_by_intersection=roadlinks_by_intersection,
        )
        if not problems:
            valid_count += 1
            continue
        failure_counts.update(problems)

    total = len(flow_entries)
    return {
        "total_routes": total,
        "valid_routes": valid_count,
        # Every route is either valid or invalid, so the rest are invalid.
        "invalid_routes": total - valid_count,
        "top_failure_reasons": failure_counts.most_common(10),
    }
def _congestion_level(score: float) -> str:
    """Map a congestion score onto its qualitative label."""
    if score < 30.0:
        return "manageable"
    if score < 52.0:
        return "moderate"
    if score < 72.0:
        return "heavy"
    return "extreme"


def compute_scenario_diagnostics(
    flow_entries: list[dict[str, Any]],
    city_graph: CityGraph,
    district_data: DistrictData,
) -> dict[str, Any]:
    """Summarize how a scenario's routes load the road network.

    Flow entries with an empty route are ignored for usage counting but still
    count towards ``"total_vehicles"``. Road ids that are not part of
    ``city_graph.roadnet`` are skipped everywhere, so a stale route degrades
    the statistics instead of aborting the whole report.

    Args:
        flow_entries: Flow records, each expected to carry a ``"route"`` list
            of road ids.
        city_graph: Supplies ``roadnet``, ``directed_roads`` (objects with
            ``num_lanes``), ``gateway_roads`` and ``arterial_roads``.
        district_data: Supplies ``intersection_to_district`` and
            ``inter_district_roads``. Intersections without an assignment are
            reported under the pseudo-district ``"external"``.

    Returns:
        A dict with origin/destination tallies, road and corridor usage
        rankings, boundary/gateway usage, and an
        ``"estimated_congestion_intensity"`` section holding a heuristic score
        capped at 100 together with its inputs.
    """
    roads_by_id = build_road_index(city_graph.roadnet)
    assignment = district_data.intersection_to_district

    road_usage: Counter[str] = Counter()
    origin_counter: Counter[str] = Counter()
    destination_counter: Counter[str] = Counter()
    corridor_counter: Counter[str] = Counter()
    external_origin = 0
    external_destination = 0

    for flow in flow_entries:
        route = flow.get("route", [])
        if not route:
            continue

        first = roads_by_id.get(route[0])
        last = roads_by_id.get(route[-1])
        if first:
            origin_key = assignment.get(first["startIntersection"], "external")
            origin_counter[origin_key] += 1
            if origin_key == "external":
                external_origin += 1
        if last:
            destination_key = assignment.get(last["endIntersection"], "external")
            destination_counter[destination_key] += 1
            if destination_key == "external":
                external_destination += 1

        for road_id in route:
            road = roads_by_id.get(road_id)
            if not road:
                # Unknown road: tolerated like the first/last lookups above.
                # Skipping it keeps "roads_used" within the known road count.
                continue
            road_usage[road_id] += 1
            ds = assignment.get(road["startIntersection"], "external")
            de = assignment.get(road["endIntersection"], "external")
            if ds != de:
                # The road crosses a district boundary.
                corridor_counter[f"{ds}->{de}"] += 1

    total_routes = len(flow_entries)
    total_road_traversals = sum(road_usage.values())
    total_roads = len(roads_by_id)
    used_roads = len(road_usage)
    unused_roads = total_roads - used_roads

    boundary_roads = set(district_data.inter_district_roads)
    gateway_roads = set(city_graph.gateway_roads)
    arterial_roads = set(city_graph.arterial_roads)
    boundary_usage = sum(
        count for road_id, count in road_usage.items() if road_id in boundary_roads
    )
    gateway_usage = sum(
        count for road_id, count in road_usage.items() if road_id in gateway_roads
    )
    top_road_usage = road_usage.most_common(15)
    top_corridors = corridor_counter.most_common(10)

    # Denominators are floored at 1.0 so empty inputs produce 0 instead of
    # a division by zero.
    total_lanes = sum(road.num_lanes for road in city_graph.directed_roads.values())
    demand_per_lane = total_routes / max(1.0, float(total_lanes))
    avg_route_len = total_road_traversals / max(1.0, float(total_routes))
    concentration = (
        sum(count for _, count in top_road_usage[:10])
        / max(1.0, float(total_road_traversals))
    )
    boundary_share = boundary_usage / max(1.0, float(total_road_traversals))

    # Heuristic weighted sum of the four indicators, capped at 100.
    congestion_score = min(
        100.0,
        1.9 * demand_per_lane
        + 2.3 * avg_route_len
        + 46.0 * concentration
        + 34.0 * boundary_share,
    )
    congestion_level = _congestion_level(congestion_score)

    return {
        "total_vehicles": total_routes,
        "vehicles_by_origin_district": dict(origin_counter),
        "vehicles_by_destination_district": dict(destination_counter),
        "vehicles_from_external": external_origin,
        "vehicles_to_external": external_destination,
        "roads_used": used_roads,
        "unused_roads": unused_roads,
        "boundary_road_usage": boundary_usage,
        "gateway_road_usage": gateway_usage,
        "boundary_road_share": round(boundary_share, 4),
        "top_used_roads": [
            {
                "road_id": road_id,
                "traversals": count,
                "is_arterial": road_id in arterial_roads,
                "is_inter_district": road_id in boundary_roads,
                "is_gateway": road_id in gateway_roads,
            }
            for road_id, count in top_road_usage
        ],
        "top_used_corridors": [
            {"corridor": corridor, "traversals": count}
            for corridor, count in top_corridors
        ],
        "estimated_congestion_intensity": {
            "score": round(congestion_score, 3),
            "level": congestion_level,
            "demand_per_lane": round(demand_per_lane, 3),
            "avg_route_length": round(avg_route_len, 3),
            "road_usage_concentration": round(concentration, 4),
        },
    }
|