| """Scenario plan generation for demand intensity and network disturbances.""" |
|
|
| from __future__ import annotations |
|
|
| import random |
| from collections import defaultdict |
|
|
| from .schemas import ( |
| CityGraph, |
| DatasetGenerationConfig, |
| DemandIntensity, |
| DistrictData, |
| ScenarioPlan, |
| ScenarioType, |
| TripMix, |
| ) |
|
|
|
|
| class ScenarioGenerator: |
| """Generate scenario-specific modifiers, bottlenecks, and demand intensity.""" |
|
|
| INTENSITY_MULTIPLIER: dict[DemandIntensity, float] = { |
| "normal": 1.0, |
| "moderate_rush": 1.45, |
| "heavy_rush": 2.1, |
| "overload": 2.9, |
| "accident_overload": 3.6, |
| } |
|
|
| SCENARIO_BASE_MULTIPLIER: dict[ScenarioType, float] = { |
| "normal": 1.15, |
| "morning_rush": 1.35, |
| "evening_rush": 1.35, |
| "accident": 1.85, |
| "construction": 1.65, |
| "event_spike": 1.75, |
| "district_overload": 1.80, |
| } |
|
|
| BASE_DEMAND_PER_INTERSECTION: dict[ScenarioType, int] = { |
| "normal": 42, |
| "morning_rush": 52, |
| "evening_rush": 52, |
| "accident": 60, |
| "construction": 56, |
| "event_spike": 62, |
| "district_overload": 64, |
| } |
|
|
| INTENSITY_ALLOWED_BY_SCENARIO: dict[ScenarioType, list[DemandIntensity]] = { |
| "normal": ["normal", "moderate_rush", "heavy_rush"], |
| "morning_rush": ["moderate_rush", "heavy_rush", "overload"], |
| "evening_rush": ["moderate_rush", "heavy_rush", "overload"], |
| "accident": ["heavy_rush", "overload", "accident_overload"], |
| "construction": ["moderate_rush", "heavy_rush", "overload", "accident_overload"], |
| "event_spike": ["moderate_rush", "heavy_rush", "overload", "accident_overload"], |
| "district_overload": ["moderate_rush", "heavy_rush", "overload", "accident_overload"], |
| } |
|
|
| def generate( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| scenario_names: list[ScenarioType], |
| base_seed: int, |
| config: DatasetGenerationConfig, |
| ) -> dict[str, ScenarioPlan]: |
| plans: dict[str, ScenarioPlan] = {} |
| for idx, name in enumerate(scenario_names): |
| seed = base_seed + (idx * 101) |
| rng = random.Random(seed) |
| intensity = self._sample_intensity(name, rng, config) |
| trip_multiplier = self._trip_multiplier(name, intensity, config) |
| trip_mix = self._trip_mix(name, intensity) |
| departure_windows = self._departure_windows(name, intensity) |
| base_demand = self.BASE_DEMAND_PER_INTERSECTION[name] |
|
|
| if name == "normal": |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| metadata={ |
| "description": "Balanced baseline traffic with sampled intensity.", |
| "base_demand_per_intersection": base_demand, |
| }, |
| ) |
| elif name == "morning_rush": |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| metadata={ |
| "description": "Strong residential outbound and work-district inbound pressure.", |
| "base_demand_per_intersection": base_demand, |
| }, |
| ) |
| elif name == "evening_rush": |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| metadata={ |
| "description": "Strong work-district outbound and residential inbound pressure.", |
| "base_demand_per_intersection": base_demand, |
| }, |
| ) |
| elif name == "accident": |
| blocked, penalized = self._accident_impairments( |
| city_graph, district_data, intensity, rng |
| ) |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| blocked_roads=blocked, |
| penalized_roads=penalized, |
| metadata={ |
| "description": "Severe disruption on connector/arterial corridors.", |
| "base_demand_per_intersection": base_demand, |
| "accident_roads": sorted(blocked), |
| "bottleneck_penalties": {k: v for k, v in penalized.items() if k not in blocked}, |
| }, |
| ) |
| elif name == "construction": |
| blocked, penalized = self._construction_impairments( |
| city_graph, district_data, intensity, rng |
| ) |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| blocked_roads=blocked, |
| penalized_roads=penalized, |
| metadata={ |
| "description": "Localized but severe construction bottlenecks.", |
| "base_demand_per_intersection": base_demand, |
| "construction_roads": sorted(blocked), |
| "bottleneck_penalties": {k: v for k, v in penalized.items() if k not in blocked}, |
| }, |
| ) |
| elif name == "event_spike": |
| event_district = self._pick_high_pressure_district(district_data, rng) |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| event_district=event_district, |
| metadata={ |
| "description": "Pre-event surge and outbound release around event district.", |
| "base_demand_per_intersection": base_demand, |
| "event_district": event_district, |
| }, |
| ) |
| elif name == "district_overload": |
| overload = self._pick_high_pressure_district(district_data, rng) |
| plans[name] = ScenarioPlan( |
| name=name, |
| intensity=intensity, |
| seed=seed, |
| trip_multiplier=trip_multiplier, |
| trip_mix=trip_mix, |
| departure_windows=departure_windows, |
| overload_district=overload, |
| metadata={ |
| "description": "One district receives amplified production and attraction.", |
| "base_demand_per_intersection": base_demand, |
| "overload_district": overload, |
| }, |
| ) |
| else: |
| raise ValueError(f"Unsupported scenario: {name}") |
| return plans |
|
|
| def _sample_intensity( |
| self, |
| scenario: ScenarioType, |
| rng: random.Random, |
| config: DatasetGenerationConfig, |
| ) -> DemandIntensity: |
| allowed = self.INTENSITY_ALLOWED_BY_SCENARIO[scenario] |
| candidates = [i for i in config.intensity_levels if i in allowed] |
| if not candidates: |
| candidates = allowed |
| scenario_bias: dict[ScenarioType, dict[DemandIntensity, float]] = { |
| "normal": {"normal": 1.35}, |
| "morning_rush": {"heavy_rush": 1.4, "overload": 1.45}, |
| "evening_rush": {"heavy_rush": 1.4, "overload": 1.45}, |
| "accident": {"overload": 1.8, "accident_overload": 2.2}, |
| "construction": {"heavy_rush": 1.3, "overload": 1.7, "accident_overload": 2.0}, |
| "event_spike": {"heavy_rush": 1.35, "overload": 1.65, "accident_overload": 1.8}, |
| "district_overload": {"heavy_rush": 1.35, "overload": 1.75, "accident_overload": 1.95}, |
| } |
| bias = scenario_bias.get(scenario, {}) |
| weights = [ |
| max(0.0, config.intensity_distribution.get(i, 0.0)) * bias.get(i, 1.0) |
| for i in candidates |
| ] |
| if sum(weights) <= 0.0: |
| weights = [1.0] * len(candidates) |
| return rng.choices(candidates, weights=weights, k=1)[0] |
|
|
| def _trip_multiplier( |
| self, |
| scenario: ScenarioType, |
| intensity: DemandIntensity, |
| config: DatasetGenerationConfig, |
| ) -> float: |
| base = self.SCENARIO_BASE_MULTIPLIER[scenario] |
| intensity_scale = self.INTENSITY_MULTIPLIER[intensity] |
| per_scenario = config.scenario_demand_multipliers.get(scenario, 1.0) |
| return base * intensity_scale * config.global_demand_multiplier * per_scenario |
|
|
| def _trip_mix( |
| self, |
| scenario: ScenarioType, |
| intensity: DemandIntensity, |
| ) -> TripMix: |
| base: dict[ScenarioType, tuple[float, float, float]] = { |
| "normal": (0.44, 0.34, 0.22), |
| "morning_rush": (0.34, 0.38, 0.28), |
| "evening_rush": (0.34, 0.38, 0.28), |
| "accident": (0.28, 0.40, 0.32), |
| "construction": (0.30, 0.40, 0.30), |
| "event_spike": (0.24, 0.40, 0.36), |
| "district_overload": (0.26, 0.42, 0.32), |
| } |
| intra, adjacent, long = base[scenario] |
| intensity_shift = { |
| "normal": 0.00, |
| "moderate_rush": 0.03, |
| "heavy_rush": 0.06, |
| "overload": 0.09, |
| "accident_overload": 0.12, |
| }[intensity] |
| intra = max(0.14, intra - intensity_shift) |
| adjacent = min(0.56, adjacent + (0.55 * intensity_shift)) |
| long = min(0.44, long + (0.45 * intensity_shift)) |
| norm = intra + adjacent + long |
| return TripMix( |
| intra_district=intra / norm, |
| adjacent_district=adjacent / norm, |
| long_distance=long / norm, |
| ) |
|
|
| def _departure_windows( |
| self, |
| scenario: ScenarioType, |
| intensity: DemandIntensity, |
| ) -> list[tuple[float, float, float]]: |
| compression = { |
| "normal": 1.00, |
| "moderate_rush": 0.82, |
| "heavy_rush": 0.62, |
| "overload": 0.46, |
| "accident_overload": 0.35, |
| }[intensity] |
|
|
| if scenario == "morning_rush": |
| peak_width = 0.34 * compression |
| peak_start = max(0.07, 0.26 - peak_width / 2.0) |
| peak_end = min(0.58, peak_start + peak_width) |
| return [(0.0, peak_start, 0.12), (peak_start, peak_end, 0.76), (peak_end, 1.0, 0.12)] |
| if scenario == "evening_rush": |
| peak_width = 0.34 * compression |
| peak_end = min(0.95, 0.74 + peak_width / 2.0) |
| peak_start = max(0.34, peak_end - peak_width) |
| return [(0.0, peak_start, 0.10), (peak_start, peak_end, 0.76), (peak_end, 1.0, 0.14)] |
| if scenario in {"event_spike", "district_overload"}: |
| peak_width = 0.42 * compression |
| peak_start = max(0.20, 0.52 - peak_width / 2.0) |
| peak_end = min(0.90, peak_start + peak_width) |
| return [(0.0, peak_start, 0.10), (peak_start, peak_end, 0.74), (peak_end, 1.0, 0.16)] |
| if scenario in {"accident", "construction"}: |
| peak_width = 0.45 * compression |
| peak_start = max(0.16, 0.48 - peak_width / 2.0) |
| peak_end = min(0.88, peak_start + peak_width) |
| return [(0.0, peak_start, 0.16), (peak_start, peak_end, 0.68), (peak_end, 1.0, 0.16)] |
| return [(0.0, 0.28, 0.22), (0.28, 0.72, 0.56), (0.72, 1.0, 0.22)] |
|
|
| def _road_importance_scores( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| ) -> dict[str, float]: |
| boundary_nodes = set(district_data.boundary_intersections) |
| scores: dict[str, float] = {} |
| for road_id, road in city_graph.directed_roads.items(): |
| score = 1.0 |
| if road_id in city_graph.arterial_roads: |
| score += 3.0 |
| if road_id in district_data.inter_district_roads: |
| score += 2.5 |
| if road.start_intersection in boundary_nodes or road.end_intersection in boundary_nodes: |
| score += 1.8 |
| score += 0.30 * len(city_graph.adjacency[road.start_intersection]) |
| score += 0.30 * len(city_graph.adjacency[road.end_intersection]) |
| scores[road_id] = score |
| return scores |
|
|
| def _weighted_sample_without_replacement( |
| self, |
| candidates: list[str], |
| weights: dict[str, float], |
| k: int, |
| rng: random.Random, |
| ) -> list[str]: |
| remaining = list(candidates) |
| picked: list[str] = [] |
| k = min(k, len(remaining)) |
| while remaining and len(picked) < k: |
| probs = [max(0.01, weights.get(c, 0.01)) for c in remaining] |
| choice = rng.choices(remaining, weights=probs, k=1)[0] |
| picked.append(choice) |
| remaining.remove(choice) |
| return picked |
|
|
| def _accident_impairments( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| intensity: DemandIntensity, |
| rng: random.Random, |
| ) -> tuple[set[str], dict[str, float]]: |
| importance = self._road_importance_scores(city_graph, district_data) |
| ranked = sorted(importance.keys(), key=lambda rid: importance[rid], reverse=True) |
| block_count = { |
| "normal": 1, |
| "moderate_rush": 2, |
| "heavy_rush": 2, |
| "overload": 3, |
| "accident_overload": 4, |
| }[intensity] |
| blocked = set(self._weighted_sample_without_replacement(ranked[:120], importance, block_count, rng)) |
| severity = { |
| "normal": 7.0, |
| "moderate_rush": 8.5, |
| "heavy_rush": 10.5, |
| "overload": 12.5, |
| "accident_overload": 14.5, |
| }[intensity] |
| penalized: dict[str, float] = {rid: severity for rid in blocked} |
|
|
| |
| by_intersection: dict[str, set[str]] = defaultdict(set) |
| for road_id, road in city_graph.directed_roads.items(): |
| by_intersection[road.start_intersection].add(road_id) |
| by_intersection[road.end_intersection].add(road_id) |
| for rid in blocked: |
| road = city_graph.directed_roads[rid] |
| nearby = by_intersection[road.start_intersection] | by_intersection[road.end_intersection] |
| for neighbor in nearby: |
| if neighbor in blocked: |
| continue |
| if neighbor in district_data.inter_district_roads or neighbor in city_graph.arterial_roads: |
| penalized[neighbor] = max(penalized.get(neighbor, 0.0), severity * 0.78) |
| return blocked, penalized |
|
|
| def _construction_impairments( |
| self, |
| city_graph: CityGraph, |
| district_data: DistrictData, |
| intensity: DemandIntensity, |
| rng: random.Random, |
| ) -> tuple[set[str], dict[str, float]]: |
| candidate_district = self._pick_high_pressure_district(district_data, rng) |
| members = set(district_data.districts[candidate_district].intersections) |
| localized = [ |
| rid |
| for rid, road in city_graph.directed_roads.items() |
| if road.start_intersection in members or road.end_intersection in members |
| ] |
| if not localized: |
| localized = sorted(city_graph.directed_roads.keys()) |
|
|
| importance = self._road_importance_scores(city_graph, district_data) |
| block_count = { |
| "normal": 1, |
| "moderate_rush": 1, |
| "heavy_rush": 2, |
| "overload": 3, |
| "accident_overload": 4, |
| }[intensity] |
| penalize_count = { |
| "normal": 10, |
| "moderate_rush": 16, |
| "heavy_rush": 22, |
| "overload": 30, |
| "accident_overload": 36, |
| }[intensity] |
| blocked = set( |
| self._weighted_sample_without_replacement(localized, importance, block_count, rng) |
| ) |
| penalize_candidates = self._weighted_sample_without_replacement( |
| localized, importance, penalize_count, rng |
| ) |
| severity = { |
| "normal": 5.0, |
| "moderate_rush": 6.5, |
| "heavy_rush": 8.5, |
| "overload": 10.5, |
| "accident_overload": 12.5, |
| }[intensity] |
| penalized: dict[str, float] = {} |
| for rid in penalize_candidates: |
| factor = severity |
| if rid in blocked: |
| factor = severity * 1.35 |
| penalized[rid] = max(penalized.get(rid, 0.0), factor) |
| for rid in blocked: |
| penalized[rid] = max(penalized.get(rid, 0.0), severity * 1.55) |
| return blocked, penalized |
|
|
| def _pick_high_pressure_district( |
| self, |
| district_data: DistrictData, |
| rng: random.Random, |
| ) -> str: |
| district_ids = sorted(district_data.districts.keys()) |
| weights: list[float] = [] |
| for did in district_ids: |
| district = district_data.districts[did] |
| w = 1.0 + 0.20 * len(district.neighbors) + 0.06 * len(district.boundary_intersections) |
| weights.append(w) |
| return rng.choices(district_ids, weights=weights, k=1)[0] |
|
|