| from __future__ import annotations |
|
|
| import json |
| import math |
| from pathlib import Path |
|
|
| from env.intersection_config import ( |
| DEFAULT_DISTRICT_TYPE, |
| DISTRICT_TYPE_TO_INDEX, |
| DistrictConfig, |
| IntersectionConfig, |
| PhaseConfig, |
| ) |
|
|
|
|
def load_json(path: str | Path) -> dict:
    """Read the file at ``path`` and return its parsed JSON content.

    The file is always decoded as UTF-8 instead of the platform's locale
    encoding, so files containing non-ASCII text load the same way on every
    machine.

    Args:
        path: Location of the JSON file, as a string or ``Path``.

    Returns:
        The value produced by ``json.loads`` for the file's text.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If the text is not valid JSON.
    """
    return json.loads(Path(path).read_text(encoding="utf-8"))
|
|
|
|
def clamp(value: float, min_value: float, max_value: float) -> float:
    """Restrict ``value`` to the range bounded by ``min_value`` and ``max_value``.

    The upper bound is applied first and the lower bound second, so when the
    bounds are inverted (``min_value > max_value``) the result is
    ``min_value``.
    """
    # Cap from above: keep the value only when it is strictly below the cap.
    capped = value if value < max_value else max_value
    # Then raise to the floor unless the capped value is strictly above it.
    return capped if capped > min_value else min_value
|
|
|
|
def normalize_scalar(value: float, scale: float) -> float:
    """Return ``value`` divided by ``scale`` as a float.

    A ``scale`` that is zero or negative disables scaling: the value is
    converted to ``float`` and returned without any division.
    """
    skip_scaling = scale <= 0
    numerator = float(value)
    if skip_scaling:
        return numerator
    return numerator / float(scale)
|
|
|
|
def lane_ids_for_road(road: dict) -> tuple[str, ...]:
    """Return one lane identifier per entry in ``road["lanes"]``.

    Each identifier has the form ``"<road id>_<lane position>"`` where the
    position is the zero-based index of the lane within the road.
    """
    lane_ids: list[str] = []
    for position, _lane in enumerate(road["lanes"]):
        lane_ids.append(f"{road['id']}_{position}")
    return tuple(lane_ids)
|
|
|
|
def _build_green_phases(
    lightphases: list[dict],
    road_links: list[dict],
) -> list[PhaseConfig]:
    """Build a ``PhaseConfig`` for every light phase that opens a road link.

    Phases whose ``availableRoadLinks`` list is missing or empty are skipped.
    For the remaining phases, the lanes served are collected from the
    ``laneLinks`` of each referenced road link and stored as sorted tuples of
    ``"<road id>_<lane index>"`` strings.

    Raises:
        IndexError: If a phase references a road-link index that does not
            exist in ``road_links``.
        KeyError: If a referenced road link or lane link lacks a required key.
    """
    green_phases: list[PhaseConfig] = []
    for engine_phase_index, phase in enumerate(lightphases):
        available_road_links = tuple(phase.get("availableRoadLinks", []))
        if not available_road_links:
            continue

        served_incoming: set[str] = set()
        served_outgoing: set[str] = set()
        for road_link_index in available_road_links:
            road_link = road_links[road_link_index]
            start_road = road_link["startRoad"]
            end_road = road_link["endRoad"]
            for lane_link in road_link.get("laneLinks", []):
                served_incoming.add(
                    f"{start_road}_{int(lane_link['startLaneIndex'])}"
                )
                served_outgoing.add(
                    f"{end_road}_{int(lane_link['endLaneIndex'])}"
                )

        green_phases.append(
            PhaseConfig(
                # Index of the phase in the full ``lightphases`` list, not in
                # the filtered list of green phases.
                engine_phase_index=engine_phase_index,
                available_road_links=available_road_links,
                incoming_lanes_served=tuple(sorted(served_incoming)),
                outgoing_lanes_served=tuple(sorted(served_outgoing)),
            )
        )
    return green_phases


def build_topology(
    roadnet_path: str | Path,
    district_map_path: str | Path | None = None,
    metadata_path: str | Path | None = None,
) -> tuple[dict[str, IntersectionConfig], dict[str, DistrictConfig]]:
    """Build intersection and district configurations from a roadnet file.

    The roadnet JSON must provide ``"roads"`` and ``"intersections"`` lists.
    Intersections are left out of the result when they are flagged
    ``"virtual"`` or when they have fewer than two light phases with a
    non-empty ``availableRoadLinks`` list.

    Args:
        roadnet_path: JSON file describing roads and intersections.
        district_map_path: Optional JSON file with an
            ``"intersection_to_district"`` mapping and a
            ``"district_neighbors"`` mapping. When omitted, every
            intersection is assigned to the district ``"unknown"``.
        metadata_path: Optional JSON file with a ``"district_types"`` mapping
            from district id to district type. Missing or unrecognised types
            fall back to ``DEFAULT_DISTRICT_TYPE``.

    Returns:
        A pair ``(intersections, districts)``. ``intersections`` maps each
        retained intersection id to its ``IntersectionConfig``; ``districts``
        maps each district id that owns at least one retained intersection to
        its ``DistrictConfig``.

    Raises:
        KeyError: If a required key is missing from the roadnet data.
        IndexError: If a light phase references a road link that does not
            exist on its intersection.
    """
    roadnet = load_json(roadnet_path)
    district_map = load_json(district_map_path) if district_map_path else {}
    metadata = load_json(metadata_path) if metadata_path else {}

    intersection_to_district = district_map.get("intersection_to_district", {})
    district_neighbors = district_map.get("district_neighbors", {})
    district_types = metadata.get("district_types", {})

    # Index roads by the intersection they end at / start from so each
    # intersection can look up its incoming and outgoing roads directly.
    road_lookup_by_end: dict[str, list[dict]] = {}
    road_lookup_by_start: dict[str, list[dict]] = {}
    for road in roadnet["roads"]:
        road_lookup_by_end.setdefault(road["endIntersection"], []).append(road)
        road_lookup_by_start.setdefault(road["startIntersection"], []).append(road)

    intersections: dict[str, IntersectionConfig] = {}
    district_to_intersections: dict[str, list[str]] = {}

    for intersection in roadnet["intersections"]:
        if intersection.get("virtual", False):
            continue

        intersection_id = intersection["id"]
        district_id = intersection_to_district.get(intersection_id, "unknown")
        incoming_roads = _sort_roads_around_intersection(
            intersection=intersection,
            roads=road_lookup_by_end.get(intersection_id, []),
            incoming=True,
        )
        outgoing_roads = _sort_roads_around_intersection(
            intersection=intersection,
            roads=road_lookup_by_start.get(intersection_id, []),
            incoming=False,
        )

        # Lane order follows the angular road order computed above.
        incoming_lanes = tuple(
            lane_id
            for road in incoming_roads
            for lane_id in lane_ids_for_road(road)
        )
        outgoing_lanes = tuple(
            lane_id
            for road in outgoing_roads
            for lane_id in lane_ids_for_road(road)
        )

        lightphases = intersection.get("trafficLight", {}).get("lightphases", [])
        road_links = intersection.get("roadLinks", [])
        green_phases = _build_green_phases(lightphases, road_links)

        # Intersections with fewer than two green phases are dropped
        # (presumably because there is nothing to choose between -- confirm).
        if len(green_phases) < 2:
            continue

        district_type = _normalize_district_type(
            district_types.get(district_id, DEFAULT_DISTRICT_TYPE)
        )
        intersections[intersection_id] = IntersectionConfig(
            intersection_id=intersection_id,
            district_id=district_id,
            district_type=district_type,
            district_type_index=DISTRICT_TYPE_TO_INDEX[district_type],
            incoming_lanes=incoming_lanes,
            outgoing_lanes=outgoing_lanes,
            is_boundary=_is_boundary_intersection(
                intersection_id=intersection_id,
                district_id=district_id,
                incoming_roads=incoming_roads,
                outgoing_roads=outgoing_roads,
                intersection_to_district=intersection_to_district,
            ),
            green_phases=tuple(green_phases),
            # Covers every light phase, including those without road links.
            all_phase_indices=tuple(range(len(lightphases))),
            # The guard above guarantees at least two green phases, so the
            # first one always exists.
            initial_engine_phase_index=green_phases[0].engine_phase_index,
        )
        district_to_intersections.setdefault(district_id, []).append(intersection_id)

    districts: dict[str, DistrictConfig] = {}
    for district_id, intersection_ids in district_to_intersections.items():
        district_type = _normalize_district_type(
            district_types.get(district_id, DEFAULT_DISTRICT_TYPE)
        )
        districts[district_id] = DistrictConfig(
            district_id=district_id,
            district_type=district_type,
            district_type_index=DISTRICT_TYPE_TO_INDEX[district_type],
            intersection_ids=tuple(sorted(intersection_ids)),
            neighbor_districts=tuple(sorted(district_neighbors.get(district_id, []))),
        )

    return intersections, districts
|
|
|
|
def _sort_roads_around_intersection(
    intersection: dict,
    roads: list[dict],
    incoming: bool,
) -> list[dict]:
    """Return ``roads`` ordered by angle around the intersection's point.

    For each road a reference point is taken from its ``"points"`` list: the
    first point when ``incoming`` is true, the last point otherwise. Roads are
    sorted by the ``atan2`` angle of that point relative to the intersection,
    with the road id as tie-breaker. Roads without points get angle ``0.0``.
    The input list is not modified.
    """
    origin = intersection["point"]
    origin_x = float(origin["x"])
    origin_y = float(origin["y"])
    # Position of the reference point within a road's point list.
    reference_index = 0 if incoming else -1

    def sort_key(road: dict) -> tuple[float, str]:
        polyline = road.get("points", [])
        if polyline:
            anchor = polyline[reference_index]
            delta_x = float(anchor["x"]) - origin_x
            delta_y = float(anchor["y"]) - origin_y
            return (math.atan2(delta_y, delta_x), road["id"])
        return (0.0, road["id"])

    return sorted(roads, key=sort_key)
|
|
|
|
def _normalize_district_type(value: str) -> str:
    """Return ``value`` as a recognised district type.

    The value is stringified, stripped of surrounding whitespace and
    lower-cased. If the result is not a key of ``DISTRICT_TYPE_TO_INDEX``,
    ``DEFAULT_DISTRICT_TYPE`` is returned instead.
    """
    candidate = str(value).strip().lower()
    return candidate if candidate in DISTRICT_TYPE_TO_INDEX else DEFAULT_DISTRICT_TYPE
|
|
|
|
def _is_boundary_intersection(
    intersection_id: str,
    district_id: str,
    incoming_roads: list[dict],
    outgoing_roads: list[dict],
    intersection_to_district: dict[str, str],
) -> bool:
    """Tell whether the intersection touches a different district.

    Neighbours are the start intersections of ``incoming_roads`` and the end
    intersections of ``outgoing_roads``, excluding the intersection itself.
    The result is true when at least one neighbour has an entry in
    ``intersection_to_district`` that differs from ``district_id``;
    neighbours without an entry are ignored.
    """
    neighbor_ids: set[str] = set()
    for road in incoming_roads:
        neighbor_ids.add(road["startIntersection"])
    for road in outgoing_roads:
        neighbor_ids.add(road["endIntersection"])
    # A self-loop must not count as a neighbour.
    neighbor_ids.discard(intersection_id)

    mapped_districts = (
        intersection_to_district.get(neighbor_id) for neighbor_id in neighbor_ids
    )
    return any(
        mapped is not None and mapped != district_id
        for mapped in mapped_districts
    )
|
|