| """Flow generation with district-aware O-D pressure and turn-feasible routing.""" |
|
|
| from __future__ import annotations |
|
|
| import heapq |
| import math |
| import random |
| from collections import defaultdict |
| from typing import Any |
|
|
| from .schemas import CityGraph, DistrictData, ScenarioPlan |
| from .utils import ( |
| build_road_index, |
| build_roadlink_index, |
| choose_weighted, |
| summarize_route_validation, |
| validate_route_with_reasons, |
| ) |
|
|
|
|
| class FlowGenerator: |
| """Create high-pressure CityFlow flow entries for each scenario.""" |
|
|
| DISTRICT_BASE_PRODUCTION = { |
| "residential": 1.70, |
| "commercial": 0.85, |
| "industrial": 0.95, |
| "mixed": 1.15, |
| } |
| DISTRICT_BASE_ATTRACTION = { |
| "residential": 0.90, |
| "commercial": 1.85, |
| "industrial": 1.55, |
| "mixed": 1.25, |
| } |
|
|
| INTENSITY_SCALE = { |
| "normal": 1.0, |
| "moderate_rush": 1.22, |
| "heavy_rush": 1.45, |
| "overload": 1.75, |
| "accident_overload": 2.0, |
| } |
|
|
| def generate( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| scenario: ScenarioPlan, |
| simulation_steps: int, |
| ) -> list[dict[str, Any]]: |
| rng = random.Random(scenario.seed) |
| district_ids = sorted(district_data.districts.keys()) |
| if len(district_ids) < 2: |
| raise ValueError("Flow generation requires at least 2 districts.") |
|
|
| base_density = int(scenario.metadata.get("base_demand_per_intersection", 36)) |
| trip_total = int( |
| len(city_graph.intersections) |
| * base_density |
| * scenario.trip_multiplier |
| ) |
| trip_total = max(260, min(42000, trip_total)) |
|
|
| routing_state = self._build_routing_state(city_graph, scenario) |
| district_features = self._build_district_features(city_graph, district_data) |
| connector_counts = self._build_connector_counts(city_graph, district_data) |
| production, attraction = self._district_weights( |
| city_graph=city_graph, |
| district_data=district_data, |
| district_features=district_features, |
| scenario=scenario, |
| ) |
| gateway_context = self._build_gateway_context(city_graph, district_data) |
| external_share = self._external_trip_share(scenario, has_gateways=bool(gateway_context["gateways"])) |
|
|
| flows: list[dict[str, Any]] = [] |
| max_global_sampling_attempts = max(5000, trip_total * 8) |
| global_attempts = 0 |
| while len(flows) < trip_total and global_attempts < max_global_sampling_attempts: |
| global_attempts += 1 |
| route = None |
| max_attempts = 45 |
| for _ in range(max_attempts): |
| external_mode = self._sample_external_mode( |
| rng=rng, |
| external_share=external_share, |
| ) |
| category = self._sample_trip_category(rng, scenario.trip_mix) |
| origin_node: str |
| destination_node: str |
| origin_district: str |
| destination_district: str |
|
|
| if external_mode == "inbound": |
| gateway = self._sample_gateway_for_inbound( |
| rng=rng, |
| gateway_context=gateway_context, |
| attraction_weights=attraction, |
| ) |
| if gateway is None: |
| continue |
| destination_district = self._sample_origin_district( |
| rng=rng, |
| district_ids=district_ids, |
| weights=attraction, |
| ) |
| origin_node = gateway |
| destination_node = self._sample_intersection_in_district( |
| rng=rng, |
| district_data=district_data, |
| district_id=destination_district, |
| favor_boundary=False, |
| excluded_nodes=gateway_context["anchor_nodes"], |
| ) |
| elif external_mode == "outbound": |
| origin_district = self._sample_origin_district( |
| rng=rng, |
| district_ids=district_ids, |
| weights=production, |
| ) |
| gateway = self._sample_gateway_for_outbound( |
| rng=rng, |
| origin_district=origin_district, |
| gateway_context=gateway_context, |
| connector_counts=connector_counts, |
| ) |
| if gateway is None: |
| continue |
| origin_node = self._sample_intersection_in_district( |
| rng=rng, |
| district_data=district_data, |
| district_id=origin_district, |
| favor_boundary=True, |
| excluded_nodes=gateway_context["anchor_nodes"], |
| ) |
| destination_node = gateway |
| else: |
| origin_district = self._sample_origin_district( |
| rng=rng, |
| district_ids=district_ids, |
| weights=production, |
| ) |
| destination_district = self._sample_destination_district( |
| rng=rng, |
| city_graph=city_graph, |
| origin_district=origin_district, |
| category=category, |
| district_data=district_data, |
| district_features=district_features, |
| district_ids=district_ids, |
| attraction_weights=attraction, |
| connector_counts=connector_counts, |
| ) |
| origin_node = self._sample_intersection_in_district( |
| rng=rng, |
| district_data=district_data, |
| district_id=origin_district, |
| favor_boundary=(category != "intra"), |
| excluded_nodes=set(), |
| ) |
| destination_node = self._sample_intersection_in_district( |
| rng=rng, |
| district_data=district_data, |
| district_id=destination_district, |
| favor_boundary=(category == "long"), |
| excluded_nodes=set(), |
| ) |
| if origin_node == destination_node: |
| continue |
|
|
| route = self._find_turn_feasible_route( |
| start_intersection=origin_node, |
| end_intersection=destination_node, |
| road_lookup=routing_state["road_lookup"], |
| start_roads_by_intersection=routing_state["start_roads_by_intersection"], |
| transitions=routing_state["transitions"], |
| road_cost=routing_state["road_cost"], |
| ) |
| if not route: |
| continue |
| reasons = validate_route_with_reasons( |
| route=route, |
| roads_by_id=routing_state["road_lookup"], |
| roadlinks_by_intersection=routing_state["roadlinks_by_intersection"], |
| ) |
| if reasons: |
| route = None |
| continue |
| break |
|
|
| if route: |
| start_time = self._sample_departure(rng, scenario, simulation_steps) |
| flows.append( |
| { |
| "vehicle": { |
| "length": 5.0, |
| "width": 2.0, |
| "maxPosAcc": 2.2, |
| "maxNegAcc": 4.6, |
| "usualPosAcc": 2.0, |
| "usualNegAcc": 4.2, |
| "minGap": 2.2, |
| "maxSpeed": 13.89, |
| "headwayTime": 1.4, |
| }, |
| "route": route, |
| "interval": 1.0, |
| "startTime": start_time, |
| "endTime": start_time, |
| } |
| ) |
|
|
| completion_ratio = len(flows) / max(1, trip_total) |
| min_completion_ratio = ( |
| 0.55 if scenario.name in {"accident", "construction"} else 0.70 |
| ) |
| if completion_ratio < min_completion_ratio: |
| raise ValueError( |
| f"Scenario {scenario.name} produced too few valid flows " |
| f"({len(flows)}/{trip_total}, completion={completion_ratio:.3f})." |
| ) |
|
|
| if not flows: |
| raise ValueError(f"Scenario {scenario.name} produced no valid flows.") |
| summary = summarize_route_validation( |
| flow_entries=flows, |
| roads_by_id=routing_state["road_lookup"], |
| roadlinks_by_intersection=routing_state["roadlinks_by_intersection"], |
| ) |
| if summary["invalid_routes"] > 0: |
| reasons = ", ".join( |
| f"{reason}={count}" for reason, count in summary["top_failure_reasons"] |
| ) |
| raise ValueError( |
| f"Scenario {scenario.name} has invalid routes after regeneration: {reasons}" |
| ) |
| return flows |
|
|
| def _build_gateway_context( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| ) -> dict[str, Any]: |
| assignment = district_data.intersection_to_district |
| gateways = sorted(city_graph.gateway_intersections) |
| anchors_by_gateway: dict[str, str] = {} |
| district_by_gateway: dict[str, str] = {} |
| gateways_by_district: dict[str, list[str]] = defaultdict(list) |
|
|
| for gateway in gateways: |
| neighbors = [ |
| n for n in city_graph.adjacency.get(gateway, set()) if n in assignment |
| ] |
| if not neighbors: |
| continue |
| anchor = sorted(neighbors)[0] |
| district_id = assignment[anchor] |
| anchors_by_gateway[gateway] = anchor |
| district_by_gateway[gateway] = district_id |
| gateways_by_district[district_id].append(gateway) |
|
|
| return { |
| "gateways": sorted(anchors_by_gateway.keys()), |
| "anchors_by_gateway": anchors_by_gateway, |
| "district_by_gateway": district_by_gateway, |
| "gateways_by_district": gateways_by_district, |
| "anchor_nodes": set(anchors_by_gateway.values()), |
| } |
|
|
| def _external_trip_share( |
| self, |
| scenario: ScenarioPlan, |
| has_gateways: bool, |
| ) -> float: |
| if not has_gateways: |
| return 0.0 |
| base = { |
| "normal": 0.12, |
| "morning_rush": 0.16, |
| "evening_rush": 0.16, |
| "accident": 0.20, |
| "construction": 0.18, |
| "event_spike": 0.18, |
| "district_overload": 0.19, |
| }[scenario.name] |
| intensity_boost = { |
| "normal": 0.00, |
| "moderate_rush": 0.02, |
| "heavy_rush": 0.04, |
| "overload": 0.07, |
| "accident_overload": 0.10, |
| }.get(scenario.intensity, 0.0) |
| return min(0.42, base + intensity_boost) |
|
|
| def _sample_external_mode( |
| self, |
| rng: random.Random, |
| external_share: float, |
| ) -> str: |
| if external_share <= 0.0: |
| return "none" |
| if rng.random() >= external_share: |
| return "none" |
| return "inbound" if rng.random() < 0.5 else "outbound" |
|
|
| def _sample_gateway_for_inbound( |
| self, |
| rng: random.Random, |
| gateway_context: dict[str, Any], |
| attraction_weights: dict[str, float], |
| ) -> str | None: |
| gateways = gateway_context["gateways"] |
| if not gateways: |
| return None |
| district_by_gateway = gateway_context["district_by_gateway"] |
| weights: list[float] = [] |
| for gateway in gateways: |
| district_id = district_by_gateway[gateway] |
| weights.append(1.0 + attraction_weights.get(district_id, 1.0)) |
| return choose_weighted(rng, gateways, weights) |
|
|
| def _sample_gateway_for_outbound( |
| self, |
| rng: random.Random, |
| origin_district: str, |
| gateway_context: dict[str, Any], |
| connector_counts: dict[tuple[str, str], int], |
| ) -> str | None: |
| gateways = gateway_context["gateways"] |
| if not gateways: |
| return None |
| district_by_gateway = gateway_context["district_by_gateway"] |
| weights: list[float] = [] |
| for gateway in gateways: |
| gateway_district = district_by_gateway[gateway] |
| connector_bonus = 1.0 + connector_counts.get( |
| (origin_district, gateway_district), 0 |
| ) |
| same_district_bonus = 2.0 if gateway_district == origin_district else 1.0 |
| weights.append(connector_bonus * same_district_bonus) |
| return choose_weighted(rng, gateways, weights) |
|
|
| def _build_routing_state( |
| self, |
| city_graph: CityGraph, |
| scenario: ScenarioPlan, |
| ) -> dict[str, Any]: |
| road_lookup = build_road_index(city_graph.roadnet) |
| roadlinks_by_intersection = build_roadlink_index(city_graph.roadnet) |
| start_roads_by_intersection: dict[str, list[str]] = defaultdict(list) |
| road_cost: dict[str, float] = {} |
| available_roads: set[str] = set() |
|
|
| for road in city_graph.directed_roads.values(): |
| if road.id in scenario.blocked_roads: |
| continue |
| cost = max(1.0, road.length / max(road.speed_limit, 1.0)) |
| if road.id in scenario.penalized_roads: |
| cost *= scenario.penalized_roads[road.id] |
| road_cost[road.id] = cost |
| available_roads.add(road.id) |
| start_roads_by_intersection[road.start_intersection].append(road.id) |
|
|
| transitions: dict[str, list[str]] = defaultdict(list) |
| for pairs in roadlinks_by_intersection.values(): |
| for start_road, end_road in pairs: |
| if start_road not in available_roads or end_road not in available_roads: |
| continue |
| transitions[start_road].append(end_road) |
| return { |
| "road_lookup": road_lookup, |
| "roadlinks_by_intersection": roadlinks_by_intersection, |
| "start_roads_by_intersection": start_roads_by_intersection, |
| "transitions": transitions, |
| "road_cost": road_cost, |
| } |
|
|
| def _find_turn_feasible_route( |
| self, |
| start_intersection: str, |
| end_intersection: str, |
| road_lookup: dict[str, dict[str, Any]], |
| start_roads_by_intersection: dict[str, list[str]], |
| transitions: dict[str, list[str]], |
| road_cost: dict[str, float], |
| ) -> list[str] | None: |
| if start_intersection == end_intersection: |
| return None |
| start_roads = start_roads_by_intersection.get(start_intersection, []) |
| if not start_roads: |
| return None |
|
|
| queue: list[tuple[float, str]] = [] |
| dist: dict[str, float] = {} |
| prev: dict[str, str | None] = {} |
|
|
| for road_id in start_roads: |
| if road_id not in road_cost: |
| continue |
| cost = road_cost[road_id] |
| dist[road_id] = cost |
| prev[road_id] = None |
| heapq.heappush(queue, (cost, road_id)) |
|
|
| best_terminal: str | None = None |
| while queue: |
| current_cost, current_road = heapq.heappop(queue) |
| if current_cost > dist.get(current_road, float("inf")): |
| continue |
|
|
| current_end = road_lookup[current_road]["endIntersection"] |
| if current_end == end_intersection: |
| best_terminal = current_road |
| break |
|
|
| for next_road in transitions.get(current_road, []): |
| next_cost = current_cost + road_cost[next_road] |
| if next_cost < dist.get(next_road, float("inf")): |
| dist[next_road] = next_cost |
| prev[next_road] = current_road |
| heapq.heappush(queue, (next_cost, next_road)) |
|
|
| if best_terminal is None: |
| return None |
|
|
| route: list[str] = [] |
| cursor: str | None = best_terminal |
| while cursor is not None: |
| route.append(cursor) |
| cursor = prev[cursor] |
| route.reverse() |
| return route |
|
|
| def _build_district_features( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| ) -> dict[str, dict[str, float]]: |
| features: dict[str, dict[str, float]] = {} |
| for did, district in district_data.districts.items(): |
| members = district.intersections |
| size = len(members) |
| cx = sum(city_graph.intersections[n][0] for n in members) / max(1, size) |
| cy = sum(city_graph.intersections[n][1] for n in members) / max(1, size) |
| features[did] = { |
| "size": float(size), |
| "neighbors": float(len(district.neighbors)), |
| "exits": float(len(district.exit_roads)), |
| "boundary": float(len(district.boundary_intersections)), |
| "cx": cx, |
| "cy": cy, |
| } |
| return features |
|
|
| def _build_connector_counts( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| ) -> dict[tuple[str, str], int]: |
| connector_counts: dict[tuple[str, str], int] = defaultdict(int) |
| assignment = district_data.intersection_to_district |
| for a, neighbors in city_graph.adjacency.items(): |
| if a not in assignment: |
| continue |
| da = assignment[a] |
| for b in neighbors: |
| if b not in assignment: |
| continue |
| db = assignment[b] |
| if da == db: |
| continue |
| connector_counts[(da, db)] += 1 |
| return connector_counts |
|
|
| def _district_weights( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| district_features: dict[str, dict[str, float]], |
| scenario: ScenarioPlan, |
| ) -> tuple[dict[str, float], dict[str, float]]: |
| production: dict[str, float] = {} |
| attraction: dict[str, float] = {} |
| intensity = self.INTENSITY_SCALE.get(scenario.intensity, 1.0) |
|
|
| for did, district in district_data.districts.items(): |
| feature = district_features[did] |
| size_factor = max(0.85, min(2.1, math.sqrt(feature["size"]) / 2.0)) |
| connector_factor = 1.0 + min(1.6, feature["exits"] / 7.0) |
| base_prod = self.DISTRICT_BASE_PRODUCTION[district.district_type] |
| base_attr = self.DISTRICT_BASE_ATTRACTION[district.district_type] |
| production[did] = base_prod * size_factor * (0.60 + 0.40 * connector_factor) |
| attraction[did] = base_attr * size_factor * (0.62 + 0.38 * connector_factor) |
|
|
| if scenario.name == "morning_rush": |
| self._scale_by_type(district_data, production, "residential", 3.0 * intensity) |
| self._scale_by_type(district_data, production, "mixed", 1.3 * intensity) |
| self._scale_by_type(district_data, attraction, "commercial", 3.2 * intensity) |
| self._scale_by_type(district_data, attraction, "industrial", 2.8 * intensity) |
| self._scale_by_type(district_data, attraction, "residential", 0.58) |
| elif scenario.name == "evening_rush": |
| self._scale_by_type(district_data, production, "commercial", 3.1 * intensity) |
| self._scale_by_type(district_data, production, "industrial", 2.7 * intensity) |
| self._scale_by_type(district_data, attraction, "residential", 3.0 * intensity) |
| self._scale_by_type(district_data, attraction, "commercial", 0.62) |
| elif scenario.name == "event_spike" and scenario.event_district: |
| attraction[scenario.event_district] *= 3.8 * intensity |
| production[scenario.event_district] *= 1.9 * intensity |
| elif scenario.name == "district_overload" and scenario.overload_district: |
| production[scenario.overload_district] *= 3.2 * intensity |
| attraction[scenario.overload_district] *= 3.0 * intensity |
|
|
| if scenario.name in {"accident", "construction"}: |
| impacted = self._impacted_districts(city_graph, district_data, scenario) |
| for did in impacted: |
| attraction[did] *= 1.6 * intensity |
| production[did] *= 1.45 * intensity |
| for neighbor in district_data.district_neighbors.get(did, []): |
| attraction[neighbor] *= 1.18 |
| production[neighbor] *= 1.16 |
|
|
| return production, attraction |
|
|
| def _impacted_districts( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| scenario: ScenarioPlan, |
| ) -> set[str]: |
| impacted: set[str] = set() |
| assignment = district_data.intersection_to_district |
| for road_id in set(scenario.blocked_roads) | set(scenario.penalized_roads.keys()): |
| road = city_graph.directed_roads.get(road_id) |
| if road is None: |
| continue |
| if road.start_intersection in assignment: |
| impacted.add(assignment[road.start_intersection]) |
| if road.end_intersection in assignment: |
| impacted.add(assignment[road.end_intersection]) |
| return impacted |
|
|
| def _scale_by_type( |
| self, |
| district_data: DistrictData, |
| weights: dict[str, float], |
| district_type: str, |
| factor: float, |
| ) -> None: |
| for did, district in district_data.districts.items(): |
| if district.district_type == district_type: |
| weights[did] *= factor |
|
|
| def _sample_trip_category(self, rng: random.Random, trip_mix: Any) -> str: |
| labels = ["intra", "adjacent", "long"] |
| weights = [ |
| trip_mix.intra_district, |
| trip_mix.adjacent_district, |
| trip_mix.long_distance, |
| ] |
| return choose_weighted(rng, labels, weights) |
|
|
| def _sample_origin_district( |
| self, |
| rng: random.Random, |
| district_ids: list[str], |
| weights: dict[str, float], |
| ) -> str: |
| values = district_ids |
| scalar = [weights[d] for d in district_ids] |
| return choose_weighted(rng, values, scalar) |
|
|
| def _sample_destination_district( |
| self, |
| rng: random.Random, |
| city_graph: CityGraph, |
| origin_district: str, |
| category: str, |
| district_data: DistrictData, |
| district_features: dict[str, dict[str, float]], |
| district_ids: list[str], |
| attraction_weights: dict[str, float], |
| connector_counts: dict[tuple[str, str], int], |
| ) -> str: |
| if category == "intra": |
| return origin_district |
|
|
| if category == "adjacent": |
| neighbors = district_data.district_neighbors.get(origin_district, []) |
| if neighbors: |
| weights = [] |
| for neighbor in neighbors: |
| connector = 1.0 + connector_counts.get((origin_district, neighbor), 0) |
| weights.append(attraction_weights[neighbor] * connector) |
| return choose_weighted(rng, neighbors, weights) |
|
|
| origin_feature = district_features[origin_district] |
| candidates = [d for d in district_ids if d != origin_district] |
| if category == "long": |
| candidates = [ |
| did |
| for did in candidates |
| if did not in district_data.district_neighbors.get(origin_district, []) |
| ] or [d for d in district_ids if d != origin_district] |
|
|
| weights: list[float] = [] |
| for candidate in candidates: |
| feature = district_features[candidate] |
| dx = feature["cx"] - origin_feature["cx"] |
| dy = feature["cy"] - origin_feature["cy"] |
| distance = math.hypot(dx, dy) |
| normalized_distance = max(1.0, distance / 260.0) |
| corridor_bonus = 1.0 + min( |
| 1.5, |
| ( |
| feature["exits"] + feature["neighbors"] |
| ) / 10.0, |
| ) |
| if category == "long": |
| weight = attraction_weights[candidate] * normalized_distance * corridor_bonus |
| else: |
| weight = attraction_weights[candidate] * (0.85 + 0.15 * corridor_bonus) |
| weights.append(weight) |
| return choose_weighted(rng, candidates, weights) |
|
|
| def _sample_intersection_in_district( |
| self, |
| rng: random.Random, |
| district_data: DistrictData, |
| district_id: str, |
| favor_boundary: bool, |
| excluded_nodes: set[str], |
| ) -> str: |
| district = district_data.districts[district_id] |
| values = [node for node in district.intersections if node not in excluded_nodes] |
| if not values: |
| values = district.intersections |
| boundary = set(district.boundary_intersections) |
| weights: list[float] = [] |
| for node in values: |
| if node in boundary: |
| weights.append(2.3 if favor_boundary else 1.15) |
| else: |
| weights.append(0.95 if favor_boundary else 1.2) |
| return choose_weighted(rng, values, weights) |
|
|
| def _sample_departure( |
| self, |
| rng: random.Random, |
| scenario: ScenarioPlan, |
| simulation_steps: int, |
| ) -> int: |
| windows = scenario.departure_windows |
| labels = list(range(len(windows))) |
| weights = [w for _, _, w in windows] |
| selected = choose_weighted(rng, [str(i) for i in labels], weights) |
| window = windows[int(selected)] |
| start = int(window[0] * simulation_steps) |
| end = int(window[1] * simulation_steps) |
| if end <= start: |
| end = start + 1 |
| return rng.randint(start, max(start, end - 1)) |
|
|