File size: 7,828 Bytes
5893134
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from __future__ import annotations

import json
import math
from pathlib import Path

from env.intersection_config import (
    DEFAULT_DISTRICT_TYPE,
    DISTRICT_TYPE_TO_INDEX,
    DistrictConfig,
    IntersectionConfig,
    PhaseConfig,
)


def load_json(path: str | Path) -> dict:
    return json.loads(Path(path).read_text())


def clamp(value: float, min_value: float, max_value: float) -> float:
    return max(min_value, min(max_value, value))


def normalize_scalar(value: float, scale: float) -> float:
    if scale <= 0:
        return float(value)
    return float(value) / float(scale)


def lane_ids_for_road(road: dict) -> tuple[str, ...]:
    return tuple(f"{road['id']}_{lane_index}" for lane_index, _ in enumerate(road["lanes"]))


def build_topology(

    roadnet_path: str | Path,

    district_map_path: str | Path | None = None,

    metadata_path: str | Path | None = None,

) -> tuple[dict[str, IntersectionConfig], dict[str, DistrictConfig]]:
    roadnet = load_json(roadnet_path)
    district_map = load_json(district_map_path) if district_map_path else {}
    metadata = load_json(metadata_path) if metadata_path else {}

    intersection_to_district = district_map.get("intersection_to_district", {})
    district_neighbors = district_map.get("district_neighbors", {})
    district_types = metadata.get("district_types", {})

    roads = {road["id"]: road for road in roadnet["roads"]}
    road_lookup_by_end: dict[str, list[dict]] = {}
    road_lookup_by_start: dict[str, list[dict]] = {}
    for road in roadnet["roads"]:
        road_lookup_by_end.setdefault(road["endIntersection"], []).append(road)
        road_lookup_by_start.setdefault(road["startIntersection"], []).append(road)

    intersections: dict[str, IntersectionConfig] = {}
    district_to_intersections: dict[str, list[str]] = {}

    for intersection in roadnet["intersections"]:
        if intersection.get("virtual", False):
            continue

        intersection_id = intersection["id"]
        district_id = intersection_to_district.get(intersection_id, "unknown")
        incoming_roads = _sort_roads_around_intersection(
            intersection=intersection,
            roads=road_lookup_by_end.get(intersection_id, []),
            incoming=True,
        )
        outgoing_roads = _sort_roads_around_intersection(
            intersection=intersection,
            roads=road_lookup_by_start.get(intersection_id, []),
            incoming=False,
        )

        incoming_lanes = tuple(
            lane_id
            for road in incoming_roads
            for lane_id in lane_ids_for_road(road)
        )
        outgoing_lanes = tuple(
            lane_id
            for road in outgoing_roads
            for lane_id in lane_ids_for_road(road)
        )

        green_phases: list[PhaseConfig] = []
        lightphases = intersection.get("trafficLight", {}).get("lightphases", [])
        road_links = intersection.get("roadLinks", [])
        for engine_phase_index, phase in enumerate(lightphases):
            available_road_links = tuple(phase.get("availableRoadLinks", []))
            if not available_road_links:
                continue

            served_incoming: set[str] = set()
            served_outgoing: set[str] = set()
            for road_link_index in available_road_links:
                road_link = road_links[road_link_index]
                start_road = road_link["startRoad"]
                end_road = road_link["endRoad"]
                for lane_link in road_link.get("laneLinks", []):
                    served_incoming.add(
                        f"{start_road}_{int(lane_link['startLaneIndex'])}"
                    )
                    served_outgoing.add(
                        f"{end_road}_{int(lane_link['endLaneIndex'])}"
                    )

            green_phases.append(
                PhaseConfig(
                    engine_phase_index=engine_phase_index,
                    available_road_links=available_road_links,
                    incoming_lanes_served=tuple(sorted(served_incoming)),
                    outgoing_lanes_served=tuple(sorted(served_outgoing)),
                )
            )

        if len(green_phases) < 2:
            continue

        district_type = _normalize_district_type(
            district_types.get(district_id, DEFAULT_DISTRICT_TYPE)
        )
        initial_phase_index = (
            green_phases[0].engine_phase_index
            if green_phases
            else 0
        )
        intersections[intersection_id] = IntersectionConfig(
            intersection_id=intersection_id,
            district_id=district_id,
            district_type=district_type,
            district_type_index=DISTRICT_TYPE_TO_INDEX[district_type],
            incoming_lanes=incoming_lanes,
            outgoing_lanes=outgoing_lanes,
            is_boundary=_is_boundary_intersection(
                intersection_id=intersection_id,
                district_id=district_id,
                incoming_roads=incoming_roads,
                outgoing_roads=outgoing_roads,
                intersection_to_district=intersection_to_district,
            ),
            green_phases=tuple(green_phases),
            all_phase_indices=tuple(range(len(lightphases))),
            initial_engine_phase_index=initial_phase_index,
        )
        district_to_intersections.setdefault(district_id, []).append(intersection_id)

    districts: dict[str, DistrictConfig] = {}
    for district_id, intersection_ids in district_to_intersections.items():
        district_type = _normalize_district_type(
            district_types.get(district_id, DEFAULT_DISTRICT_TYPE)
        )
        districts[district_id] = DistrictConfig(
            district_id=district_id,
            district_type=district_type,
            district_type_index=DISTRICT_TYPE_TO_INDEX[district_type],
            intersection_ids=tuple(sorted(intersection_ids)),
            neighbor_districts=tuple(sorted(district_neighbors.get(district_id, []))),
        )

    return intersections, districts


def _sort_roads_around_intersection(

    intersection: dict,

    roads: list[dict],

    incoming: bool,

) -> list[dict]:
    center_x = float(intersection["point"]["x"])
    center_y = float(intersection["point"]["y"])

    def angle_for_road(road: dict) -> tuple[float, str]:
        points = road.get("points", [])
        if not points:
            return (0.0, road["id"])

        reference_point = points[0] if incoming else points[-1]
        dx = float(reference_point["x"]) - center_x
        dy = float(reference_point["y"]) - center_y
        angle = math.atan2(dy, dx)
        return (angle, road["id"])

    return sorted(roads, key=angle_for_road)


def _normalize_district_type(value: str) -> str:
    normalized = str(value).strip().lower()
    if normalized not in DISTRICT_TYPE_TO_INDEX:
        return DEFAULT_DISTRICT_TYPE
    return normalized


def _is_boundary_intersection(

    intersection_id: str,

    district_id: str,

    incoming_roads: list[dict],

    outgoing_roads: list[dict],

    intersection_to_district: dict[str, str],

) -> bool:
    connected_intersections = {
        road["startIntersection"] for road in incoming_roads
    } | {
        road["endIntersection"] for road in outgoing_roads
    }
    connected_intersections.discard(intersection_id)
    for neighbor_intersection_id in connected_intersections:
        neighbor_district_id = intersection_to_district.get(neighbor_intersection_id)
        if neighbor_district_id is not None and neighbor_district_id != district_id:
            return True
    return False