agentic-traffic / data /generators /city_generator.py
Aditya2162's picture
Upload folder using huggingface_hub
9d09c45 verified
"""City-level orchestration: topology, districts, scenarios, flows, configs, validation."""
from __future__ import annotations
import random
from dataclasses import asdict
from pathlib import Path
from typing import Any
from .config_generator import ConfigGenerator
from .district_generator import DistrictGenerator
from .flow_generator import FlowGenerator
from .roadnet_generator import RoadnetGenerator
from .scenario_generator import ScenarioGenerator
from .schemas import DatasetGenerationConfig, TopologyType
from .utils import (
build_road_index,
build_roadlink_index,
clamp,
compute_scenario_diagnostics,
ensure_dir,
summarize_route_validation,
validate_district_contiguity,
validate_district_exit_capacity,
validate_inter_district_connectivity,
validate_unique_ids,
write_json,
)
class CityGenerator:
"""Generate one or many synthetic cities with scenario-specific CityFlow files."""
def __init__(self) -> None:
self.roadnet_generator = RoadnetGenerator()
self.district_generator = DistrictGenerator()
self.scenario_generator = ScenarioGenerator()
self.flow_generator = FlowGenerator()
self.config_generator = ConfigGenerator()
def generate_dataset(self, config: DatasetGenerationConfig) -> None:
ensure_dir(config.output_dir)
failures: list[tuple[str, str]] = []
for idx in range(config.num_cities):
city_id = f"city_{idx + 1:04d}"
city_seed = config.seed + idx * 10_003
try:
self.generate_city(
city_id=city_id,
output_dir=config.output_dir / city_id,
config=config,
city_seed=city_seed,
)
except Exception as exc:
failures.append((city_id, str(exc)))
if config.fail_fast:
raise
if failures:
details = "; ".join(
f"{city}: {message}" for city, message in failures[:5]
)
raise RuntimeError(
f"Dataset generation failed for {len(failures)} city/cities. {details}"
)
def generate_city(
self,
city_id: str,
output_dir: Path,
config: DatasetGenerationConfig,
city_seed: int,
) -> None:
ensure_dir(output_dir)
rng = random.Random(city_seed)
topology_pool: list[TopologyType] = list(config.topologies)
if not topology_pool:
raise ValueError("No topology families provided in configuration.")
attempts_per_topology = 10
ordered_topologies = topology_pool.copy()
rng.shuffle(ordered_topologies)
max_attempts = attempts_per_topology * len(ordered_topologies)
attempt_count = 0
city_graph = None
district_data = None
last_error: Exception | None = None
for topology in ordered_topologies:
for topology_attempt in range(attempts_per_topology):
attempt_count += 1
attempt_seed = city_seed + ((attempt_count - 1) * 1009)
target_intersections = clamp(
rng.randint(
config.min_districts * config.min_intersections_per_district,
config.max_districts * config.max_intersections_per_district,
),
low=config.min_districts * config.min_intersections_per_district,
high=config.max_districts * config.max_intersections_per_district + 36,
)
try:
city_graph = self.roadnet_generator.generate(
city_id=city_id,
seed=attempt_seed,
topology=topology,
target_intersections=target_intersections,
ring_diagonal_keep_prob=config.ring_diagonal_keep_prob,
ring_max_diagonal_fraction=config.ring_max_diagonal_fraction,
)
validate_unique_ids(city_graph)
max_districts = max(
config.min_districts,
min(
config.max_districts,
max(
2,
len(
[
nid
for nid in city_graph.intersections
if nid not in city_graph.gateway_intersections
]
)
// max(1, config.min_intersections_per_district),
),
),
)
if topology == "ring_road":
max_districts = min(max_districts, max(6, target_intersections // 10))
max_districts = max(config.min_districts, max_districts)
num_districts = rng.randint(config.min_districts, max_districts)
district_data = self.district_generator.generate(
city_graph=city_graph,
num_districts=num_districts,
seed=attempt_seed + 17,
)
validate_district_contiguity(city_graph, district_data)
validate_inter_district_connectivity(district_data)
min_exit_roads = 2 if topology == "ring_road" else 3
min_entry_roads = 2 if topology == "ring_road" else 3
min_neighbor_districts = 1 if topology == "ring_road" else 2
validate_district_exit_capacity(
district_data=district_data,
min_exit_roads=min_exit_roads,
min_entry_roads=min_entry_roads,
min_neighbor_districts=min_neighbor_districts,
)
print(
f"[INFO] {city_id} attempt={attempt_count} "
f"topology={topology} topology_try={topology_attempt + 1}/{attempts_per_topology} "
"generated successfully"
)
break
except Exception as exc:
message = str(exc)
print(
f"[WARN] {city_id} attempt={attempt_count} "
f"topology={topology} topology_try={topology_attempt + 1}/{attempts_per_topology} "
f"failed: {message}"
)
last_error = exc
city_graph = None
district_data = None
continue
if city_graph is not None and district_data is not None:
break
if city_graph is None or district_data is None:
raise ValueError(
f"Unable to produce a structurally valid city after {max_attempts} attempts: {last_error}"
)
roadnet_path = output_dir / "roadnet.json"
write_json(roadnet_path, city_graph.roadnet)
district_map = {
"intersection_to_district": district_data.intersection_to_district,
"district_neighbors": district_data.district_neighbors,
"boundary_intersections": district_data.boundary_intersections,
"gateway_intersections": sorted(city_graph.gateway_intersections),
"gateway_roads": sorted(city_graph.gateway_roads),
"districts": [
{
"id": d.id,
"type": d.district_type,
"intersections": d.intersections,
"neighbors": d.neighbors,
"boundary_intersections": d.boundary_intersections,
"entry_roads": d.entry_roads,
"exit_roads": d.exit_roads,
}
for d in district_data.districts.values()
],
}
write_json(output_dir / "district_map.json", district_map)
metadata = self._city_metadata(
city_id=city_id,
topology=topology,
city_seed=city_seed,
city_graph=city_graph,
district_data=district_data,
config=config,
)
write_json(output_dir / "metadata.json", metadata)
print(f"[INFO] {city_id} generated: topology={topology}, districts={len(district_data.districts)}")
scenario_plans = self.scenario_generator.generate(
city_graph=city_graph,
district_data=district_data,
scenario_names=config.scenarios,
base_seed=city_seed + 1000,
config=config,
)
self._generate_scenarios(
output_dir=output_dir,
city_graph=city_graph,
district_data=district_data,
scenario_plans=scenario_plans,
config=config,
roadnet_path=roadnet_path,
)
def _generate_scenarios(
self,
output_dir: Path,
city_graph: Any,
district_data: Any,
scenario_plans: dict[str, Any],
config: DatasetGenerationConfig,
roadnet_path: Path,
) -> None:
roads_by_id = build_road_index(city_graph.roadnet)
roadlinks_by_intersection = build_roadlink_index(city_graph.roadnet)
scenarios_dir = output_dir / "scenarios"
ensure_dir(scenarios_dir)
for scenario_name, plan in scenario_plans.items():
scenario_dir = scenarios_dir / scenario_name
ensure_dir(scenario_dir)
flows = self.flow_generator.generate(
city_graph=city_graph,
district_data=district_data,
scenario=plan,
simulation_steps=config.simulation_steps,
)
validation_summary = summarize_route_validation(
flow_entries=flows,
roads_by_id=roads_by_id,
roadlinks_by_intersection=roadlinks_by_intersection,
)
if validation_summary["invalid_routes"] > 0:
reasons = ", ".join(
f"{reason}={count}"
for reason, count in validation_summary["top_failure_reasons"]
)
raise ValueError(
f"{scenario_name} contains invalid routes after generation: {reasons}"
)
write_json(scenario_dir / "flow.json", flows)
diagnostics = compute_scenario_diagnostics(
flow_entries=flows,
city_graph=city_graph,
district_data=district_data,
)
config_payload = self.config_generator.generate(
simulation_steps=config.simulation_steps,
interval=config.interval,
seed=plan.seed,
save_replay=config.save_replay,
roadnet_file=roadnet_path,
flow_file=scenario_dir / "flow.json",
scenario_dir=scenario_dir,
)
write_json(scenario_dir / "config.json", config_payload)
write_json(
scenario_dir / "scenario_metadata.json",
{
"name": scenario_name,
"intensity": plan.intensity,
"seed": plan.seed,
"trip_multiplier": plan.trip_multiplier,
"trip_mix": asdict(plan.trip_mix),
"blocked_roads": sorted(plan.blocked_roads),
"penalized_roads": plan.penalized_roads,
"event_district": plan.event_district,
"overload_district": plan.overload_district,
"diagnostics": diagnostics,
"details": plan.metadata,
},
)
def _city_metadata(
self,
city_id: str,
topology: TopologyType,
city_seed: int,
city_graph: Any,
district_data: Any,
config: DatasetGenerationConfig,
) -> dict[str, Any]:
total_lanes = sum(
road.num_lanes for road in city_graph.directed_roads.values()
)
district_types = {
did: district.district_type
for did, district in district_data.districts.items()
}
return {
"city_id": city_id,
"topology": topology,
"seed": city_seed,
"counts": {
"intersections": len(city_graph.intersections),
"roads": len(city_graph.directed_roads),
"lanes": total_lanes,
"districts": len(district_data.districts),
},
"district_types": district_types,
"district_adjacency_graph": district_data.district_neighbors,
"inter_district_connector_roads": district_data.inter_district_roads,
"arterial_roads": sorted(city_graph.arterial_roads),
"gateway_intersections": sorted(city_graph.gateway_intersections),
"gateway_connector_roads": sorted(city_graph.gateway_roads),
"generation_parameters": {
"min_districts": config.min_districts,
"max_districts": config.max_districts,
"min_intersections_per_district": config.min_intersections_per_district,
"max_intersections_per_district": config.max_intersections_per_district,
"simulation_steps": config.simulation_steps,
"interval": config.interval,
"intensity_levels": config.intensity_levels,
"global_demand_multiplier": config.global_demand_multiplier,
"intensity_distribution": config.intensity_distribution,
"scenario_demand_multipliers": config.scenario_demand_multipliers,
"ring_diagonal_keep_prob": config.ring_diagonal_keep_prob,
"ring_max_diagonal_fraction": config.ring_max_diagonal_fraction,
},
}