agentic-traffic / data /generators /scenario_generator.py
Aditya2162's picture
Upload folder using huggingface_hub
9d09c45 verified
"""Scenario plan generation for demand intensity and network disturbances."""
from __future__ import annotations
import random
from collections import defaultdict
from .schemas import (
CityGraph,
DatasetGenerationConfig,
DemandIntensity,
DistrictData,
ScenarioPlan,
ScenarioType,
TripMix,
)
class ScenarioGenerator:
"""Generate scenario-specific modifiers, bottlenecks, and demand intensity."""
INTENSITY_MULTIPLIER: dict[DemandIntensity, float] = {
"normal": 1.0,
"moderate_rush": 1.45,
"heavy_rush": 2.1,
"overload": 2.9,
"accident_overload": 3.6,
}
SCENARIO_BASE_MULTIPLIER: dict[ScenarioType, float] = {
"normal": 1.15,
"morning_rush": 1.35,
"evening_rush": 1.35,
"accident": 1.85,
"construction": 1.65,
"event_spike": 1.75,
"district_overload": 1.80,
}
BASE_DEMAND_PER_INTERSECTION: dict[ScenarioType, int] = {
"normal": 42,
"morning_rush": 52,
"evening_rush": 52,
"accident": 60,
"construction": 56,
"event_spike": 62,
"district_overload": 64,
}
INTENSITY_ALLOWED_BY_SCENARIO: dict[ScenarioType, list[DemandIntensity]] = {
"normal": ["normal", "moderate_rush", "heavy_rush"],
"morning_rush": ["moderate_rush", "heavy_rush", "overload"],
"evening_rush": ["moderate_rush", "heavy_rush", "overload"],
"accident": ["heavy_rush", "overload", "accident_overload"],
"construction": ["moderate_rush", "heavy_rush", "overload", "accident_overload"],
"event_spike": ["moderate_rush", "heavy_rush", "overload", "accident_overload"],
"district_overload": ["moderate_rush", "heavy_rush", "overload", "accident_overload"],
}
def generate(
self,
city_graph: CityGraph,
district_data: DistrictData,
scenario_names: list[ScenarioType],
base_seed: int,
config: DatasetGenerationConfig,
) -> dict[str, ScenarioPlan]:
plans: dict[str, ScenarioPlan] = {}
for idx, name in enumerate(scenario_names):
seed = base_seed + (idx * 101)
rng = random.Random(seed)
intensity = self._sample_intensity(name, rng, config)
trip_multiplier = self._trip_multiplier(name, intensity, config)
trip_mix = self._trip_mix(name, intensity)
departure_windows = self._departure_windows(name, intensity)
base_demand = self.BASE_DEMAND_PER_INTERSECTION[name]
if name == "normal":
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
metadata={
"description": "Balanced baseline traffic with sampled intensity.",
"base_demand_per_intersection": base_demand,
},
)
elif name == "morning_rush":
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
metadata={
"description": "Strong residential outbound and work-district inbound pressure.",
"base_demand_per_intersection": base_demand,
},
)
elif name == "evening_rush":
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
metadata={
"description": "Strong work-district outbound and residential inbound pressure.",
"base_demand_per_intersection": base_demand,
},
)
elif name == "accident":
blocked, penalized = self._accident_impairments(
city_graph, district_data, intensity, rng
)
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
blocked_roads=blocked,
penalized_roads=penalized,
metadata={
"description": "Severe disruption on connector/arterial corridors.",
"base_demand_per_intersection": base_demand,
"accident_roads": sorted(blocked),
"bottleneck_penalties": {k: v for k, v in penalized.items() if k not in blocked},
},
)
elif name == "construction":
blocked, penalized = self._construction_impairments(
city_graph, district_data, intensity, rng
)
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
blocked_roads=blocked,
penalized_roads=penalized,
metadata={
"description": "Localized but severe construction bottlenecks.",
"base_demand_per_intersection": base_demand,
"construction_roads": sorted(blocked),
"bottleneck_penalties": {k: v for k, v in penalized.items() if k not in blocked},
},
)
elif name == "event_spike":
event_district = self._pick_high_pressure_district(district_data, rng)
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
event_district=event_district,
metadata={
"description": "Pre-event surge and outbound release around event district.",
"base_demand_per_intersection": base_demand,
"event_district": event_district,
},
)
elif name == "district_overload":
overload = self._pick_high_pressure_district(district_data, rng)
plans[name] = ScenarioPlan(
name=name,
intensity=intensity,
seed=seed,
trip_multiplier=trip_multiplier,
trip_mix=trip_mix,
departure_windows=departure_windows,
overload_district=overload,
metadata={
"description": "One district receives amplified production and attraction.",
"base_demand_per_intersection": base_demand,
"overload_district": overload,
},
)
else:
raise ValueError(f"Unsupported scenario: {name}")
return plans
def _sample_intensity(
self,
scenario: ScenarioType,
rng: random.Random,
config: DatasetGenerationConfig,
) -> DemandIntensity:
allowed = self.INTENSITY_ALLOWED_BY_SCENARIO[scenario]
candidates = [i for i in config.intensity_levels if i in allowed]
if not candidates:
candidates = allowed
scenario_bias: dict[ScenarioType, dict[DemandIntensity, float]] = {
"normal": {"normal": 1.35},
"morning_rush": {"heavy_rush": 1.4, "overload": 1.45},
"evening_rush": {"heavy_rush": 1.4, "overload": 1.45},
"accident": {"overload": 1.8, "accident_overload": 2.2},
"construction": {"heavy_rush": 1.3, "overload": 1.7, "accident_overload": 2.0},
"event_spike": {"heavy_rush": 1.35, "overload": 1.65, "accident_overload": 1.8},
"district_overload": {"heavy_rush": 1.35, "overload": 1.75, "accident_overload": 1.95},
}
bias = scenario_bias.get(scenario, {})
weights = [
max(0.0, config.intensity_distribution.get(i, 0.0)) * bias.get(i, 1.0)
for i in candidates
]
if sum(weights) <= 0.0:
weights = [1.0] * len(candidates)
return rng.choices(candidates, weights=weights, k=1)[0]
def _trip_multiplier(
self,
scenario: ScenarioType,
intensity: DemandIntensity,
config: DatasetGenerationConfig,
) -> float:
base = self.SCENARIO_BASE_MULTIPLIER[scenario]
intensity_scale = self.INTENSITY_MULTIPLIER[intensity]
per_scenario = config.scenario_demand_multipliers.get(scenario, 1.0)
return base * intensity_scale * config.global_demand_multiplier * per_scenario
def _trip_mix(
self,
scenario: ScenarioType,
intensity: DemandIntensity,
) -> TripMix:
base: dict[ScenarioType, tuple[float, float, float]] = {
"normal": (0.44, 0.34, 0.22),
"morning_rush": (0.34, 0.38, 0.28),
"evening_rush": (0.34, 0.38, 0.28),
"accident": (0.28, 0.40, 0.32),
"construction": (0.30, 0.40, 0.30),
"event_spike": (0.24, 0.40, 0.36),
"district_overload": (0.26, 0.42, 0.32),
}
intra, adjacent, long = base[scenario]
intensity_shift = {
"normal": 0.00,
"moderate_rush": 0.03,
"heavy_rush": 0.06,
"overload": 0.09,
"accident_overload": 0.12,
}[intensity]
intra = max(0.14, intra - intensity_shift)
adjacent = min(0.56, adjacent + (0.55 * intensity_shift))
long = min(0.44, long + (0.45 * intensity_shift))
norm = intra + adjacent + long
return TripMix(
intra_district=intra / norm,
adjacent_district=adjacent / norm,
long_distance=long / norm,
)
def _departure_windows(
self,
scenario: ScenarioType,
intensity: DemandIntensity,
) -> list[tuple[float, float, float]]:
compression = {
"normal": 1.00,
"moderate_rush": 0.82,
"heavy_rush": 0.62,
"overload": 0.46,
"accident_overload": 0.35,
}[intensity]
if scenario == "morning_rush":
peak_width = 0.34 * compression
peak_start = max(0.07, 0.26 - peak_width / 2.0)
peak_end = min(0.58, peak_start + peak_width)
return [(0.0, peak_start, 0.12), (peak_start, peak_end, 0.76), (peak_end, 1.0, 0.12)]
if scenario == "evening_rush":
peak_width = 0.34 * compression
peak_end = min(0.95, 0.74 + peak_width / 2.0)
peak_start = max(0.34, peak_end - peak_width)
return [(0.0, peak_start, 0.10), (peak_start, peak_end, 0.76), (peak_end, 1.0, 0.14)]
if scenario in {"event_spike", "district_overload"}:
peak_width = 0.42 * compression
peak_start = max(0.20, 0.52 - peak_width / 2.0)
peak_end = min(0.90, peak_start + peak_width)
return [(0.0, peak_start, 0.10), (peak_start, peak_end, 0.74), (peak_end, 1.0, 0.16)]
if scenario in {"accident", "construction"}:
peak_width = 0.45 * compression
peak_start = max(0.16, 0.48 - peak_width / 2.0)
peak_end = min(0.88, peak_start + peak_width)
return [(0.0, peak_start, 0.16), (peak_start, peak_end, 0.68), (peak_end, 1.0, 0.16)]
return [(0.0, 0.28, 0.22), (0.28, 0.72, 0.56), (0.72, 1.0, 0.22)]
def _road_importance_scores(
self,
city_graph: CityGraph,
district_data: DistrictData,
) -> dict[str, float]:
boundary_nodes = set(district_data.boundary_intersections)
scores: dict[str, float] = {}
for road_id, road in city_graph.directed_roads.items():
score = 1.0
if road_id in city_graph.arterial_roads:
score += 3.0
if road_id in district_data.inter_district_roads:
score += 2.5
if road.start_intersection in boundary_nodes or road.end_intersection in boundary_nodes:
score += 1.8
score += 0.30 * len(city_graph.adjacency[road.start_intersection])
score += 0.30 * len(city_graph.adjacency[road.end_intersection])
scores[road_id] = score
return scores
def _weighted_sample_without_replacement(
self,
candidates: list[str],
weights: dict[str, float],
k: int,
rng: random.Random,
) -> list[str]:
remaining = list(candidates)
picked: list[str] = []
k = min(k, len(remaining))
while remaining and len(picked) < k:
probs = [max(0.01, weights.get(c, 0.01)) for c in remaining]
choice = rng.choices(remaining, weights=probs, k=1)[0]
picked.append(choice)
remaining.remove(choice)
return picked
def _accident_impairments(
self,
city_graph: CityGraph,
district_data: DistrictData,
intensity: DemandIntensity,
rng: random.Random,
) -> tuple[set[str], dict[str, float]]:
importance = self._road_importance_scores(city_graph, district_data)
ranked = sorted(importance.keys(), key=lambda rid: importance[rid], reverse=True)
block_count = {
"normal": 1,
"moderate_rush": 2,
"heavy_rush": 2,
"overload": 3,
"accident_overload": 4,
}[intensity]
blocked = set(self._weighted_sample_without_replacement(ranked[:120], importance, block_count, rng))
severity = {
"normal": 7.0,
"moderate_rush": 8.5,
"heavy_rush": 10.5,
"overload": 12.5,
"accident_overload": 14.5,
}[intensity]
penalized: dict[str, float] = {rid: severity for rid in blocked}
# Expand impairment to adjacent connector roads to create spillback.
by_intersection: dict[str, set[str]] = defaultdict(set)
for road_id, road in city_graph.directed_roads.items():
by_intersection[road.start_intersection].add(road_id)
by_intersection[road.end_intersection].add(road_id)
for rid in blocked:
road = city_graph.directed_roads[rid]
nearby = by_intersection[road.start_intersection] | by_intersection[road.end_intersection]
for neighbor in nearby:
if neighbor in blocked:
continue
if neighbor in district_data.inter_district_roads or neighbor in city_graph.arterial_roads:
penalized[neighbor] = max(penalized.get(neighbor, 0.0), severity * 0.78)
return blocked, penalized
def _construction_impairments(
self,
city_graph: CityGraph,
district_data: DistrictData,
intensity: DemandIntensity,
rng: random.Random,
) -> tuple[set[str], dict[str, float]]:
candidate_district = self._pick_high_pressure_district(district_data, rng)
members = set(district_data.districts[candidate_district].intersections)
localized = [
rid
for rid, road in city_graph.directed_roads.items()
if road.start_intersection in members or road.end_intersection in members
]
if not localized:
localized = sorted(city_graph.directed_roads.keys())
importance = self._road_importance_scores(city_graph, district_data)
block_count = {
"normal": 1,
"moderate_rush": 1,
"heavy_rush": 2,
"overload": 3,
"accident_overload": 4,
}[intensity]
penalize_count = {
"normal": 10,
"moderate_rush": 16,
"heavy_rush": 22,
"overload": 30,
"accident_overload": 36,
}[intensity]
blocked = set(
self._weighted_sample_without_replacement(localized, importance, block_count, rng)
)
penalize_candidates = self._weighted_sample_without_replacement(
localized, importance, penalize_count, rng
)
severity = {
"normal": 5.0,
"moderate_rush": 6.5,
"heavy_rush": 8.5,
"overload": 10.5,
"accident_overload": 12.5,
}[intensity]
penalized: dict[str, float] = {}
for rid in penalize_candidates:
factor = severity
if rid in blocked:
factor = severity * 1.35
penalized[rid] = max(penalized.get(rid, 0.0), factor)
for rid in blocked:
penalized[rid] = max(penalized.get(rid, 0.0), severity * 1.55)
return blocked, penalized
def _pick_high_pressure_district(
self,
district_data: DistrictData,
rng: random.Random,
) -> str:
district_ids = sorted(district_data.districts.keys())
weights: list[float] = []
for did in district_ids:
district = district_data.districts[did]
w = 1.0 + 0.20 * len(district.neighbors) + 0.06 * len(district.boundary_intersections)
weights.append(w)
return rng.choices(district_ids, weights=weights, k=1)[0]