agentic-traffic / data /generators /roadnet_generator.py
Aditya2162's picture
Upload folder using huggingface_hub
9d09c45 verified
"""Road network topology generation and CityFlow roadnet assembly."""
from __future__ import annotations
import math
import random
from collections import defaultdict
from itertools import product
from .schemas import CityGraph, RoadRecord, TopologyType
from .utils import euclidean
class RoadnetGenerator:
"""Generate city intersection graph and convert to CityFlow roadnet format."""
def generate(
    self,
    city_id: str,
    seed: int,
    topology: TopologyType,
    target_intersections: int,
    ring_diagonal_keep_prob: float = 0.07,
    ring_max_diagonal_fraction: float = 0.03,
) -> CityGraph:
    """Build a full ``CityGraph`` for one city.

    A single ``random.Random(seed)`` instance is threaded through the
    topology builder and then the gateway augmentation, in that order.

    Args:
        city_id: Identifier stored on the returned graph.
        seed: Seed for the random generator; also stored on the graph.
        topology: Topology name forwarded to ``_build_topology``.
        target_intersections: Requested number of intersections.
        ring_diagonal_keep_prob: Forwarded to the ring-road builder.
        ring_max_diagonal_fraction: Forwarded to the ring-road builder.

    Returns:
        The assembled ``CityGraph``.
    """
    rng = random.Random(seed)

    # Stage 1: base street layout for the requested topology.
    base_coords, base_edges, arterial_pairs = self._build_topology(
        topology=topology,
        target_nodes=target_intersections,
        rng=rng,
        ring_diagonal_keep_prob=ring_diagonal_keep_prob,
        ring_max_diagonal_fraction=ring_max_diagonal_fraction,
    )

    # Stage 2: attach gateway stubs outside the perimeter.
    augmented = self._augment_with_perimeter_gateways(
        coords=base_coords,
        undirected_edges=base_edges,
        rng=rng,
    )
    coords, undirected_edges, gateway_pairs, gateway_nodes = augmented

    # Stage 3: derive adjacency, directed roads and the roadnet dictionary.
    adjacency = self._to_adjacency(coords, undirected_edges)
    road_bundle = self._build_directed_roads(
        coords=coords,
        undirected_edges=undirected_edges,
        arterial_pairs=arterial_pairs,
        gateway_pairs=gateway_pairs,
    )
    directed_roads, arterial_road_ids, gateway_road_ids = road_bundle
    roadnet = self._build_roadnet(
        coords=coords,
        adjacency=adjacency,
        directed_roads=directed_roads,
    )

    return CityGraph(
        city_id=city_id,
        topology=topology,
        seed=seed,
        intersections=coords,
        adjacency=adjacency,
        directed_roads=directed_roads,
        roadnet=roadnet,
        arterial_roads=arterial_road_ids,
        gateway_intersections=gateway_nodes,
        gateway_roads=gateway_road_ids,
    )
def _build_topology(
    self,
    topology: TopologyType,
    target_nodes: int,
    rng: random.Random,
    ring_diagonal_keep_prob: float,
    ring_max_diagonal_fraction: float,
) -> tuple[
    dict[str, tuple[float, float]],
    list[tuple[str, str]],
    set[frozenset[str]],
]:
    """Dispatch to the builder matching ``topology``.

    Any topology value that is not one of the four named ones falls through
    to ``_mixed``.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)`` from the chosen builder.
    """
    # Builders sharing the (target_nodes, rng) call shape, tested in order.
    two_arg_builders = (
        ("rectangular_grid", self._rectangular_grid),
        ("irregular_grid", self._irregular_grid),
        ("arterial_local", self._arterial_local),
    )
    for name, builder in two_arg_builders:
        if topology == name:
            return builder(target_nodes, rng)

    if topology == "ring_road":
        return self._ring_road(
            target_nodes=target_nodes,
            rng=rng,
            ring_diagonal_keep_prob=ring_diagonal_keep_prob,
            ring_max_diagonal_fraction=ring_max_diagonal_fraction,
        )

    return self._mixed(target_nodes, rng)
def _dimensions(self, target_nodes: int) -> tuple[int, int]:
cols = max(3, int(round(math.sqrt(target_nodes))))
rows = max(3, int(math.ceil(target_nodes / cols)))
return rows, cols
def _grid_coords(
self,
rows: int,
cols: int,
spacing: float,
jitter: float,
rng: random.Random,
) -> dict[str, tuple[float, float]]:
coords: dict[str, tuple[float, float]] = {}
idx = 0
for r, c in product(range(rows), range(cols)):
x = c * spacing + rng.uniform(-jitter, jitter)
y = r * spacing + rng.uniform(-jitter, jitter)
coords[f"i_{idx:04d}"] = (x, y)
idx += 1
return coords
def _smooth_axis_offsets(
self,
size: int,
max_offset: float,
step_limit: float,
rng: random.Random,
) -> list[float]:
offsets = [0.0] * size
drift = 0.0
for idx in range(1, size - 1):
drift += rng.uniform(-step_limit, step_limit)
drift = max(-max_offset, min(max_offset, drift))
offsets[idx] = drift
if size > 2:
mean_mid = sum(offsets[1:-1]) / (size - 2)
for idx in range(1, size - 1):
centered = offsets[idx] - mean_mid
offsets[idx] = max(-max_offset, min(max_offset, centered))
offsets[0] = 0.0
offsets[-1] = 0.0
return offsets
def _boundary_stability_weight(self, idx: int, size: int) -> float:
distance = min(idx, size - 1 - idx)
if distance <= 0:
return 0.0
if distance == 1:
return 0.2
if distance == 2:
return 0.45
if distance == 3:
return 0.72
return 1.00
def _axis_positions(
self,
size: int,
spacing: float,
gap_variation: float,
rng: random.Random,
) -> list[float]:
if size <= 1:
return [0.0]
gap_profile = self._smooth_drop_profile(size - 1, rng)
positions = [0.0]
min_gap = spacing * (1.0 - gap_variation)
max_gap = spacing * (1.0 + gap_variation)
for gap_idx in range(size - 1):
centered = (gap_profile[gap_idx] - 0.5) * 2.0
edge_distance = min(gap_idx, (size - 2) - gap_idx)
if edge_distance <= 0:
edge_weight = 0.32
elif edge_distance == 1:
edge_weight = 0.55
elif edge_distance == 2:
edge_weight = 0.8
else:
edge_weight = 1.0
local_noise = rng.uniform(-0.22, 0.22) * gap_variation * edge_weight
gap = spacing * (1.0 + (centered * gap_variation * edge_weight) + local_noise)
gap = max(min_gap, min(max_gap, gap))
positions.append(positions[-1] + gap)
nominal_span = spacing * (size - 1)
actual_span = positions[-1]
if actual_span > 1e-9:
scale = nominal_span / actual_span
positions = [p * scale for p in positions]
return positions
def _irregular_grid_coords(
    self,
    rows: int,
    cols: int,
    spacing: float,
    rng: random.Random,
) -> dict[str, tuple[float, float]]:
    """Place grid nodes on unevenly spaced lines with small drift and jitter.

    Random draws happen in this order: row positions, column positions,
    row offsets, column offsets, then per node (row-major) an x jitter
    followed by a y jitter.
    """
    # Nodes stay mostly on row/column lines while block sizes vary.
    row_positions = self._axis_positions(rows, spacing, gap_variation=0.16, rng=rng)
    col_positions = self._axis_positions(cols, spacing, gap_variation=0.16, rng=rng)

    # Line-wise drift and per-node jitter are small fractions of the spacing.
    line_offset_cap = spacing * 0.018
    line_step = spacing * 0.006
    local_jitter = spacing * 0.0045
    row_offsets = self._smooth_axis_offsets(rows, line_offset_cap, line_step, rng)
    col_offsets = self._smooth_axis_offsets(cols, line_offset_cap, line_step, rng)

    # The stability weights depend only on the index, so compute them once.
    row_weights = [self._boundary_stability_weight(r, rows) for r in range(rows)]
    col_weights = [self._boundary_stability_weight(c, cols) for c in range(cols)]

    coords: dict[str, tuple[float, float]] = {}
    for r in range(rows):
        for c in range(cols):
            # Perimeter nodes get weight 0 and therefore do not move.
            weight = min(row_weights[r], col_weights[c])
            jitter_scale = weight * weight
            x = col_positions[c] + (col_offsets[c] * weight)
            y = row_positions[r] + (row_offsets[r] * weight)
            x += rng.uniform(-local_jitter, local_jitter) * jitter_scale
            y += rng.uniform(-local_jitter, local_jitter) * jitter_scale
            coords[f"i_{r * cols + c:04d}"] = (x, y)
    return coords
def _grid_edges(self, rows: int, cols: int) -> list[tuple[int, int]]:
edges: list[tuple[int, int]] = []
for r in range(rows):
for c in range(cols):
idx = r * cols + c
if c + 1 < cols:
edges.append((idx, idx + 1))
if r + 1 < rows:
edges.append((idx, idx + cols))
return edges
def _estimate_spacing(
self,
coords: dict[str, tuple[float, float]],
undirected_edges: list[tuple[str, str]],
) -> float:
if not undirected_edges:
return 120.0
lengths = [
euclidean(coords[a], coords[b])
for a, b in undirected_edges
if a in coords and b in coords
]
if not lengths:
return 120.0
lengths.sort()
return lengths[len(lengths) // 2]
def _select_spread_nodes(
self,
candidates: list[str],
coords: dict[str, tuple[float, float]],
count: int,
axis: str,
) -> list[str]:
if count <= 0 or not candidates:
return []
if len(candidates) <= count:
return candidates
axis_idx = 0 if axis == "x" else 1
ordered = sorted(
candidates,
key=lambda nid: (coords[nid][axis_idx], nid),
)
selected: list[str] = []
for i in range(count):
pos = int(round((i * (len(ordered) - 1)) / max(1, count - 1)))
selected.append(ordered[pos])
deduped = sorted(set(selected), key=lambda nid: ordered.index(nid))
if len(deduped) >= count:
return deduped[:count]
for node in ordered:
if node in deduped:
continue
deduped.append(node)
if len(deduped) >= count:
break
return deduped
def _augment_with_perimeter_gateways(
self,
coords: dict[str, tuple[float, float]],
undirected_edges: list[tuple[str, str]],
rng: random.Random,
) -> tuple[
dict[str, tuple[float, float]],
list[tuple[str, str]],
set[frozenset[str]],
set[str],
]:
if not coords:
return coords, undirected_edges, set(), set()
min_x = min(x for x, _ in coords.values())
max_x = max(x for x, _ in coords.values())
min_y = min(y for _, y in coords.values())
max_y = max(y for _, y in coords.values())
spacing = self._estimate_spacing(coords, undirected_edges)
threshold = max(4.0, spacing * 0.08)
per_side_target = 2
side_to_candidates: dict[str, list[str]] = {
"west": [],
"east": [],
"south": [],
"north": [],
}
for node_id, (x, y) in coords.items():
distances = {
"west": abs(x - min_x),
"east": abs(x - max_x),
"south": abs(y - min_y),
"north": abs(y - max_y),
}
side = min(distances, key=distances.get)
if distances[side] <= threshold:
side_to_candidates[side].append(node_id)
selected_anchors: list[tuple[str, str]] = []
used_anchors: set[str] = set()
for side in ("west", "east", "south", "north"):
candidates = [n for n in side_to_candidates[side] if n not in used_anchors]
axis = "y" if side in {"west", "east"} else "x"
chosen = self._select_spread_nodes(
candidates=candidates,
coords=coords,
count=per_side_target,
axis=axis,
)
for anchor in chosen:
if anchor in used_anchors:
continue
used_anchors.add(anchor)
selected_anchors.append((side, anchor))
if not selected_anchors:
return coords, undirected_edges, set(), set()
offset = max(45.0, spacing * 0.82)
gateway_pairs: set[frozenset[str]] = set()
gateway_nodes: set[str] = set()
next_idx = 0
for side, anchor in selected_anchors:
ax, ay = coords[anchor]
if side == "west":
gx, gy = min_x - offset, ay + rng.uniform(-spacing * 0.03, spacing * 0.03)
elif side == "east":
gx, gy = max_x + offset, ay + rng.uniform(-spacing * 0.03, spacing * 0.03)
elif side == "south":
gx, gy = ax + rng.uniform(-spacing * 0.03, spacing * 0.03), min_y - offset
else:
gx, gy = ax + rng.uniform(-spacing * 0.03, spacing * 0.03), max_y + offset
gateway_id = f"g_{next_idx:04d}"
next_idx += 1
coords[gateway_id] = (gx, gy)
undirected_edges.append((anchor, gateway_id))
gateway_pairs.add(frozenset((anchor, gateway_id)))
gateway_nodes.add(gateway_id)
return coords, undirected_edges, gateway_pairs, gateway_nodes
def _arterial_indices(self, size: int) -> list[int]:
candidates = {size // 3, size // 2, (2 * size) // 3}
selected = sorted(i for i in candidates if 0 < i < size - 1)
if not selected and size > 2:
selected = [size // 2]
return selected
def _smooth_drop_profile(self, size: int, rng: random.Random) -> list[float]:
values: list[float] = []
state = rng.uniform(-0.25, 0.25)
for _ in range(size):
state = 0.78 * state + 0.22 * rng.uniform(-1.0, 1.0)
values.append(state)
low = min(values)
high = max(values)
if abs(high - low) < 1e-6:
return [0.5] * size
return [(value - low) / (high - low) for value in values]
def _edge_orientation(
self,
a: int,
b: int,
cols: int,
) -> tuple[str, int, int]:
ra, ca = divmod(a, cols)
rb, cb = divmod(b, cols)
if ra == rb:
return ("horizontal", ra, min(ca, cb))
return ("vertical", ca, min(ra, rb))
def _is_edge_protected(
self,
a: int,
b: int,
rows: int,
cols: int,
arterial_rows: set[int],
arterial_cols: set[int],
) -> tuple[bool, bool]:
ra, ca = divmod(a, cols)
rb, cb = divmod(b, cols)
if ra == rb:
on_perimeter = ra in {0, rows - 1}
on_arterial = ra in arterial_rows
else:
on_perimeter = ca in {0, cols - 1}
on_arterial = ca in arterial_cols
return on_perimeter or on_arterial, on_arterial
def _has_path_without_edge(
self,
start: int,
goal: int,
adjacency: dict[int, set[int]],
edge: tuple[int, int],
) -> bool:
blocked_u, blocked_v = edge
stack = [start]
visited = {start}
while stack:
node = stack.pop()
if node == goal:
return True
for nxt in adjacency[node]:
if (
(node == blocked_u and nxt == blocked_v)
or (node == blocked_v and nxt == blocked_u)
):
continue
if nxt in visited:
continue
visited.add(nxt)
stack.append(nxt)
return False
def _is_connected_without_str_edge(
self,
start: str,
goal: str,
adjacency: dict[str, set[str]],
edge: tuple[str, str],
) -> bool:
blocked_u, blocked_v = edge
stack = [start]
visited = {start}
while stack:
node = stack.pop()
if node == goal:
return True
for nxt in adjacency[node]:
if (
(node == blocked_u and nxt == blocked_v)
or (node == blocked_v and nxt == blocked_u)
):
continue
if nxt in visited:
continue
visited.add(nxt)
stack.append(nxt)
return False
def _rectangular_grid(
    self, target_nodes: int, rng: random.Random
) -> tuple[dict[str, tuple[float, float]], list[tuple[str, str]], set[frozenset[str]]]:
    """Build a full lattice (spacing 120, jitter 6) with no arterial roads.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)`` where the arterial set
        is always empty.
    """
    rows, cols = self._dimensions(target_nodes)
    coords = self._grid_coords(rows, cols, spacing=120.0, jitter=6.0, rng=rng)

    # Translate integer lattice indices into node ids via insertion order.
    node_ids = tuple(coords)
    edges: list[tuple[str, str]] = []
    for a, b in self._grid_edges(rows, cols):
        edges.append((node_ids[a], node_ids[b]))

    no_arterials: set[frozenset[str]] = set()
    return coords, edges, no_arterials
def _irregular_grid(
    self, target_nodes: int, rng: random.Random
) -> tuple[dict[str, tuple[float, float]], list[tuple[str, str]], set[frozenset[str]]]:
    """Build an uneven grid and randomly drop some unprotected interior links.

    Perimeter and arterial-line edges are never removed. Every other edge is
    visited in shuffled order and dropped with a position-dependent
    probability, provided both endpoints keep at least two links and the
    endpoints stay connected by another path.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)``; the surviving edges
        are emitted in sorted index order.
    """
    # The grid is sized from 1.08x the requested node count.
    rows, cols = self._dimensions(int(target_nodes * 1.08))
    coords = self._irregular_grid_coords(rows, cols, spacing=115.0, rng=rng)
    edges_raw = self._grid_edges(rows, cols)
    # `filtered` holds the surviving edges; `adjacency` mirrors it for the checks below.
    filtered: set[tuple[int, int]] = set(edges_raw)
    adjacency: dict[int, set[int]] = defaultdict(set)
    for a, b in edges_raw:
        adjacency[a].add(b)
        adjacency[b].add(a)
    arterial_rows = set(self._arterial_indices(rows))
    arterial_cols = set(self._arterial_indices(cols))
    # Smooth 0..1 profiles modulate the drop probability along each axis.
    row_profile = self._smooth_drop_profile(rows, rng)
    col_profile = self._smooth_drop_profile(cols, rng)
    base_drop_prob = 0.11
    # Candidate lines for extra gaps: at least two lines in from the edge, not arterial.
    interior_rows = [r for r in range(2, rows - 2) if r not in arterial_rows]
    interior_cols = [c for c in range(2, cols - 2) if c not in arterial_cols]
    row_gap_rows = set(
        rng.sample(interior_rows, k=min(max(1, rows // 7), len(interior_rows)))
    ) if interior_rows else set()
    col_gap_cols = set(
        rng.sample(interior_cols, k=min(max(1, cols // 7), len(interior_cols)))
    ) if interior_cols else set()
    arterial_pairs: set[frozenset[str]] = set()
    removable: list[tuple[int, int]] = []
    # Classify every edge: record arterial pairs, collect unprotected edges.
    for a, b in edges_raw:
        protected, arterial = self._is_edge_protected(
            a=a,
            b=b,
            rows=rows,
            cols=cols,
            arterial_rows=arterial_rows,
            arterial_cols=arterial_cols,
        )
        if arterial:
            ida = f"i_{a:04d}"
            idb = f"i_{b:04d}"
            arterial_pairs.add(frozenset((ida, idb)))
        if not protected:
            removable.append((a, b))
    rng.shuffle(removable)
    for a, b in removable:
        orientation, major_idx, minor_idx = self._edge_orientation(a, b, cols)
        # The edge's own line contributes 0.11x its profile, the cross position 0.08x.
        if orientation == "horizontal":
            drop_prob = base_drop_prob + (0.11 * row_profile[major_idx]) + (
                0.08 * col_profile[minor_idx]
            )
            if major_idx in row_gap_rows:
                drop_prob += 0.16
        else:
            drop_prob = base_drop_prob + (0.11 * col_profile[major_idx]) + (
                0.08 * row_profile[minor_idx]
            )
            if major_idx in col_gap_cols:
                drop_prob += 0.16
        # Keep perimeter-adjacent links denser to avoid sparse fringes.
        ra, ca = divmod(a, cols)
        rb, cb = divmod(b, cols)
        boundary_distance = min(
            ra,
            rb,
            rows - 1 - ra,
            rows - 1 - rb,
            ca,
            cb,
            cols - 1 - ca,
            cols - 1 - cb,
        )
        if boundary_distance <= 1:
            drop_prob *= 0.45
        elif boundary_distance == 2:
            drop_prob *= 0.75
        # One draw per removable edge; the probability is capped at 0.46.
        if rng.random() > min(0.46, max(0.0, drop_prob)):
            continue
        # Never leave an endpoint with fewer than two links.
        if len(adjacency[a]) <= 2 or len(adjacency[b]) <= 2:
            continue
        # Only drop the edge if its endpoints stay connected by another path.
        if not self._has_path_without_edge(a, b, adjacency, (a, b)):
            continue
        adjacency[a].remove(b)
        adjacency[b].remove(a)
        filtered.remove((a, b))
    id_list = list(coords.keys())
    edges = [(id_list[a], id_list[b]) for a, b in sorted(filtered)]
    return coords, edges, arterial_pairs
def _arterial_local(
    self, target_nodes: int, rng: random.Random
) -> tuple[dict[str, tuple[float, float]], list[tuple[str, str]], set[frozenset[str]]]:
    """Build a full lattice (spacing 130, jitter 8) with arterial lines at the thirds.

    An edge is arterial when either endpoint sits on an arterial row or
    column; any other edge is promoted with probability 0.06.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)``.
    """
    rows, cols = self._dimensions(target_nodes)
    coords = self._grid_coords(rows, cols, spacing=130.0, jitter=8.0, rng=rng)
    node_ids = list(coords)

    arterial_rows = {rows // 3, (2 * rows) // 3}
    arterial_cols = {cols // 3, (2 * cols) // 3}

    edges: list[tuple[str, str]] = []
    arterial_pairs: set[frozenset[str]] = set()
    for a, b in self._grid_edges(rows, cols):
        row_a, col_a = divmod(a, cols)
        row_b, col_b = divmod(b, cols)
        pair = (node_ids[a], node_ids[b])
        edges.append(pair)

        touches_arterial = (
            not arterial_rows.isdisjoint((row_a, row_b))
            or not arterial_cols.isdisjoint((col_a, col_b))
        )
        # Short-circuit: the random draw only happens for non-arterial edges.
        if touches_arterial or rng.random() < 0.06:
            arterial_pairs.add(frozenset(pair))

    return coords, edges, arterial_pairs
def _ring_road(
    self,
    target_nodes: int,
    rng: random.Random,
    ring_diagonal_keep_prob: float,
    ring_max_diagonal_fraction: float,
) -> tuple[dict[str, tuple[float, float]], list[tuple[str, str]], set[frozenset[str]]]:
    """Build an inner jittered grid encircled by a ring road joined by spokes.

    The ring edges and the primary spokes are arterial. A sparse set of
    optional diagonals (extra radials and interior block diagonals) may be
    added, filtered by ``ring_diagonal_keep_prob`` and a crossing test, then
    pruned down to a cap derived from ``ring_max_diagonal_fraction``.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)`` with edges sorted.
    """
    # At least 14 ring nodes and 24 inner nodes regardless of the target.
    ring_nodes = max(14, int(target_nodes * 0.22))
    inner_nodes = max(24, target_nodes - ring_nodes)
    rows, cols = self._dimensions(inner_nodes)
    inner_coords = self._grid_coords(rows, cols, spacing=90.0, jitter=10.0, rng=rng)
    inner_ids = list(inner_coords.keys())
    inner_index = {node: idx for idx, node in enumerate(inner_ids)}
    min_x = min(x for x, _ in inner_coords.values())
    max_x = max(x for x, _ in inner_coords.values())
    min_y = min(y for _, y in inner_coords.values())
    max_y = max(y for _, y in inner_coords.values())
    # The ring is centred on the inner grid's bounding box.
    center = ((min_x + max_x) / 2.0, (min_y + max_y) / 2.0)
    radius = max(max_x - min_x, max_y - min_y) * 0.70

    def norm_edge(a: str, b: str) -> tuple[str, str]:
        # Canonical orientation so an undirected edge has a single set entry.
        return (a, b) if a < b else (b, a)

    coords: dict[str, tuple[float, float]] = dict(inner_coords)
    edge_set: set[tuple[str, str]] = set()
    arterial_pairs: set[frozenset[str]] = set()
    for a, b in self._grid_edges(rows, cols):
        edge_set.add(norm_edge(inner_ids[a], inner_ids[b]))
    ring_ids: list[str] = []
    # Ring node ids continue the numbering after the inner grid nodes.
    start_idx = len(coords)
    for i in range(ring_nodes):
        angle = (2.0 * math.pi * i) / ring_nodes
        x = center[0] + radius * math.cos(angle)
        y = center[1] + radius * math.sin(angle)
        rid = f"i_{start_idx + i:04d}"
        ring_ids.append(rid)
        coords[rid] = (x, y)
    # Close the ring: each ring node links to the next, wrapping around.
    for i, rid in enumerate(ring_ids):
        nxt = ring_ids[(i + 1) % ring_nodes]
        edge = norm_edge(rid, nxt)
        edge_set.add(edge)
        arterial_pairs.add(frozenset(edge))
    # Inner nodes on the first/last row or column of the grid.
    boundary_inner_nodes = [
        nid
        for nid in inner_ids
        if ((inner_index[nid] // cols) in {0, rows - 1})
        or ((inner_index[nid] % cols) in {0, cols - 1})
    ]
    spokes = max(6, ring_nodes // 3)
    anchor_pool = boundary_inner_nodes[:] if boundary_inner_nodes else inner_ids
    # Anchors are taken at evenly spaced positions of the pool's list order.
    anchor_ids = [
        anchor_pool[(idx * len(anchor_pool)) // spokes]
        for idx in range(spokes)
    ]
    protected_spokes: set[tuple[str, str]] = set()
    # Primary spokes: evenly spaced ring nodes paired with the anchors above.
    for i in range(spokes):
        ring_node = ring_ids[(i * ring_nodes) // spokes]
        inner_node = anchor_ids[i]
        edge = norm_edge(ring_node, inner_node)
        edge_set.add(edge)
        arterial_pairs.add(frozenset(edge))
        protected_spokes.add(edge)
    # Optional non-primary diagonals: sparse extra radials + sparse interior diagonals.
    diagonal_candidates: list[tuple[tuple[str, str], float]] = []
    extra_spokes = max(2, min(5, ring_nodes // 4))
    for i in range(extra_spokes):
        ring_node = ring_ids[(i * ring_nodes) // extra_spokes]
        inner_node = anchor_pool[(i * len(anchor_pool)) // extra_spokes]
        edge = norm_edge(ring_node, inner_node)
        # Skip radials that duplicate an existing edge or primary spoke.
        if edge in edge_set or edge in protected_spokes:
            continue
        # Longer radials receive a slightly higher score.
        score = 1.2 + (0.003 * euclidean(coords[ring_node], coords[inner_node]))
        diagonal_candidates.append((edge, score))

    def _orientation(
        p: tuple[float, float],
        q: tuple[float, float],
        r: tuple[float, float],
    ) -> int:
        # Turn direction of p->q->r from the cross product sign:
        # 0 = collinear (within 1e-9), 1 or 2 = the two opposite turn directions.
        value = ((q[1] - p[1]) * (r[0] - q[0])) - ((q[0] - p[0]) * (r[1] - q[1]))
        if abs(value) < 1e-9:
            return 0
        return 1 if value > 0 else 2

    def _on_segment(
        p: tuple[float, float],
        q: tuple[float, float],
        r: tuple[float, float],
    ) -> bool:
        # True when q lies inside the axis-aligned bounding box of p and r.
        return (
            min(p[0], r[0]) <= q[0] <= max(p[0], r[0])
            and min(p[1], r[1]) <= q[1] <= max(p[1], r[1])
        )

    def _segments_intersect(
        p1: tuple[float, float],
        q1: tuple[float, float],
        p2: tuple[float, float],
        q2: tuple[float, float],
    ) -> bool:
        # Orientation-based intersection test for segments p1-q1 and p2-q2.
        o1 = _orientation(p1, q1, p2)
        o2 = _orientation(p1, q1, q2)
        o3 = _orientation(p2, q2, p1)
        o4 = _orientation(p2, q2, q1)
        # General case: each segment's endpoints lie on opposite sides of the other.
        if o1 != o2 and o3 != o4:
            return True
        # Collinear cases: an endpoint of one segment lies on the other.
        if o1 == 0 and _on_segment(p1, p2, q1):
            return True
        if o2 == 0 and _on_segment(p1, q2, q1):
            return True
        if o3 == 0 and _on_segment(p2, p1, q2):
            return True
        if o4 == 0 and _on_segment(p2, q1, q2):
            return True
        return False

    def _has_nonendpoint_intersection(
        edge: tuple[str, str],
        other_edges: set[tuple[str, str]],
    ) -> bool:
        # True when `edge` crosses any edge it does not share an endpoint with.
        a, b = edge
        p1, q1 = coords[a], coords[b]
        for u, v in other_edges:
            # Shared endpoint is expected at valid junctions.
            if len({a, b, u, v}) < 4:
                continue
            p2, q2 = coords[u], coords[v]
            if _segments_intersect(p1, q1, p2, q2):
                return True
        return False

    # Interior diagonals: each grid cell is considered with probability 0.05.
    for r in range(rows - 1):
        for c in range(cols - 1):
            if rng.random() > 0.05:
                continue
            tl = inner_ids[(r * cols) + c]
            tr = inner_ids[(r * cols) + c + 1]
            bl = inner_ids[((r + 1) * cols) + c]
            br = inner_ids[((r + 1) * cols) + c + 1]
            # Pick one of the two cell diagonals at random.
            a, b = (tl, br) if rng.random() < 0.5 else (tr, bl)
            edge = norm_edge(a, b)
            if edge in edge_set:
                continue
            # Cells on the outermost band of the grid score slightly higher.
            boundary_bonus = 0.30 if (r in {0, rows - 2} or c in {0, cols - 2}) else 0.0
            diagonal_candidates.append((edge, 1.0 + boundary_bonus))
    keep_prob = max(0.0, min(1.0, ring_diagonal_keep_prob))
    kept_candidates: list[tuple[tuple[str, str], float]] = []
    # Accepted diagonals join the geometry so later candidates cannot cross them.
    geometry_edges = set(edge_set)
    for edge, score in diagonal_candidates:
        if rng.random() > keep_prob:
            continue
        if _has_nonendpoint_intersection(edge, geometry_edges):
            continue
        kept_candidates.append((edge, score))
        geometry_edges.add(edge)
    # Fallback: if nothing was kept, take the best-scoring non-crossing candidate.
    if diagonal_candidates and not kept_candidates:
        for edge, score in sorted(diagonal_candidates, key=lambda item: item[1], reverse=True):
            if not _has_nonendpoint_intersection(edge, edge_set):
                kept_candidates.append((edge, score))
                break
    for edge, _ in kept_candidates:
        edge_set.add(edge)
    # Prune weaker optional diagonals while preserving connectivity.
    max_fraction = max(0.0, min(1.0, ring_max_diagonal_fraction))
    max_kept = max(1, int(round(max_fraction * max(1, len(diagonal_candidates)))))
    # Absolute cap of between 1 and 3 diagonals, scaled by the inner node count.
    hard_cap = max(1, min(3, len(inner_ids) // 60))
    max_kept = min(max_kept, hard_cap)
    if len(kept_candidates) > max_kept:
        adjacency: dict[str, set[str]] = {nid: set() for nid in coords}
        for u, v in edge_set:
            adjacency[u].add(v)
            adjacency[v].add(u)
        kept = len(kept_candidates)
        # Remove the lowest-scoring diagonals first.
        for edge, _ in sorted(kept_candidates, key=lambda item: item[1]):
            if kept <= max_kept:
                break
            u, v = edge
            if edge not in edge_set:
                continue
            # Do not strip a node's only link.
            if len(adjacency[u]) <= 1 or len(adjacency[v]) <= 1:
                continue
            if not self._is_connected_without_str_edge(u, v, adjacency, edge):
                continue
            adjacency[u].remove(v)
            adjacency[v].remove(u)
            edge_set.remove(edge)
            kept -= 1
    edges = sorted(edge_set)
    return coords, edges, arterial_pairs
def _mixed(
    self, target_nodes: int, rng: random.Random
) -> tuple[dict[str, tuple[float, float]], list[tuple[str, str]], set[frozenset[str]]]:
    """Build an arterial/local grid and add a few short random shortcut links.

    ``max(3, node_count // 20)`` random node pairs are tried. A pair closer
    than 220 units that is not already linked becomes a new edge, and is
    made arterial with probability 0.4.

    Returns:
        ``(coords, undirected_edges, arterial_pairs)``.
    """
    coords, edges, arterial_pairs = self._arterial_local(target_nodes, rng)
    ids = list(coords.keys())

    # Unordered pairs give O(1) duplicate detection and match an existing
    # edge whichever way round it was stored.
    known_pairs = {frozenset(pair) for pair in edges}

    for _ in range(max(3, len(ids) // 20)):
        a, b = rng.sample(ids, 2)
        if euclidean(coords[a], coords[b]) >= 220.0:
            continue
        pair = frozenset((a, b))
        if pair in known_pairs:
            continue
        known_pairs.add(pair)
        edges.append((a, b) if a < b else (b, a))
        # This draw only happens for edges that were actually added.
        if rng.random() < 0.4:
            arterial_pairs.add(pair)
    return coords, edges, arterial_pairs
def _to_adjacency(
self,
coords: dict[str, tuple[float, float]],
undirected_edges: list[tuple[str, str]],
) -> dict[str, set[str]]:
adjacency: dict[str, set[str]] = {nid: set() for nid in coords}
for a, b in undirected_edges:
if a == b:
continue
adjacency[a].add(b)
adjacency[b].add(a)
return adjacency
def _build_directed_roads(
    self,
    coords: dict[str, tuple[float, float]],
    undirected_edges: list[tuple[str, str]],
    arterial_pairs: set[frozenset[str]],
    gateway_pairs: set[frozenset[str]],
) -> tuple[dict[str, RoadRecord], set[str], set[str]]:
    """Expand each undirected edge into two directed ``RoadRecord`` entries.

    Gateway edges get speed 12.0 with 2 lanes; arterial edges 14.0 with
    3 lanes; all others 11.0 with 2 lanes. Road ids are ``r_<start>_<end>``.

    Returns:
        ``(roads_by_id, arterial_road_ids, gateway_road_ids)``.
    """
    roads: dict[str, RoadRecord] = {}
    arterial_ids: set[str] = set()
    gateway_ids: set[str] = set()

    for a, b in undirected_edges:
        # The classification is the same for both travel directions.
        pair = frozenset((a, b))
        is_arterial = pair in arterial_pairs
        is_gateway = pair in gateway_pairs
        if is_gateway:
            speed, lanes = 12.0, 2
        elif is_arterial:
            speed, lanes = 14.0, 3
        else:
            speed, lanes = 11.0, 2

        for start, end in ((a, b), (b, a)):
            rid = f"r_{start}_{end}"
            start_pt = coords[start]
            end_pt = coords[end]
            points = [
                {"x": round(start_pt[0], 3), "y": round(start_pt[1], 3)},
                {"x": round(end_pt[0], 3), "y": round(end_pt[1], 3)},
            ]
            roads[rid] = RoadRecord(
                id=rid,
                start_intersection=start,
                end_intersection=end,
                length=euclidean(coords[start], coords[end]),
                speed_limit=speed,
                num_lanes=lanes,
                points=points,
                is_arterial=is_arterial,
            )
            if is_arterial:
                arterial_ids.add(rid)
            if is_gateway:
                gateway_ids.add(rid)

    return roads, arterial_ids, gateway_ids
def _build_roadnet(
self,
coords: dict[str, tuple[float, float]],
adjacency: dict[str, set[str]],
directed_roads: dict[str, RoadRecord],
) -> dict[str, list[dict[str, object]]]:
in_roads_by_node: dict[str, list[RoadRecord]] = defaultdict(list)
out_roads_by_node: dict[str, list[RoadRecord]] = defaultdict(list)
for road in directed_roads.values():
out_roads_by_node[road.start_intersection].append(road)
in_roads_by_node[road.end_intersection].append(road)
min_x = min(x for x, _ in coords.values())
max_x = max(x for x, _ in coords.values())
min_y = min(y for _, y in coords.values())
max_y = max(y for _, y in coords.values())
border_eps = 3.0
intersections: list[dict[str, object]] = []
for nid in sorted(coords):
x, y = coords[nid]
degree = len(adjacency[nid])
is_border = (
abs(x - min_x) < border_eps
or abs(x - max_x) < border_eps
or abs(y - min_y) < border_eps
or abs(y - max_y) < border_eps
)
# Keep boundary intersections non-virtual when they are part of the street grid.
# Mark only true stubs/dead-ends as virtual.
virtual = degree <= 1
road_links: list[dict[str, object]] = []
incoming = sorted(in_roads_by_node[nid], key=lambda r: r.id)
outgoing = sorted(out_roads_by_node[nid], key=lambda r: r.id)
for in_road in incoming:
for out_road in outgoing:
if out_road.end_intersection == in_road.start_intersection:
continue
lane_links = []
lane_count = min(in_road.num_lanes, out_road.num_lanes)
for lane_idx in range(lane_count):
lane_links.append(
{
"startLaneIndex": lane_idx,
"endLaneIndex": lane_idx,
"points": [
dict(in_road.points[-1]),
dict(out_road.points[0]),
],
}
)
road_links.append(
{
"type": self._movement_type(
coords[in_road.start_intersection],
coords[nid],
coords[out_road.end_intersection],
),
"startRoad": in_road.id,
"endRoad": out_road.id,
"laneLinks": lane_links,
}
)
lightphases = self._light_phases(nid, coords, incoming, road_links)
connected_roads = sorted(
{road.id for road in incoming + outgoing}
)
intersections.append(
{
"id": nid,
"point": {"x": round(x, 3), "y": round(y, 3)},
"width": 0,
"roads": connected_roads,
"virtual": virtual,
"roadLinks": road_links,
"trafficLight": {
"roadLinkIndices": list(range(len(road_links))),
"lightphases": lightphases,
},
}
)
roads: list[dict[str, object]] = []
for rid in sorted(directed_roads):
road = directed_roads[rid]
roads.append(
{
"id": road.id,
"startIntersection": road.start_intersection,
"endIntersection": road.end_intersection,
"points": road.points,
"lanes": [
{"maxSpeed": road.speed_limit, "width": 3.2}
for _ in range(road.num_lanes)
],
}
)
return {"intersections": intersections, "roads": roads}
def _movement_type(
self,
in_start: tuple[float, float],
center: tuple[float, float],
out_end: tuple[float, float],
) -> str:
v1 = (center[0] - in_start[0], center[1] - in_start[1])
v2 = (out_end[0] - center[0], out_end[1] - center[1])
cross = (v1[0] * v2[1]) - (v1[1] * v2[0])
dot = (v1[0] * v2[0]) + (v1[1] * v2[1])
angle = math.atan2(cross, dot)
if abs(angle) < (math.pi / 4):
return "go_straight"
if angle > 0:
return "turn_left"
return "turn_right"
def _light_phases(
self,
node_id: str,
coords: dict[str, tuple[float, float]],
incoming: list[RoadRecord],
road_links: list[dict[str, object]],
) -> list[dict[str, object]]:
if not road_links:
return [{"time": 30, "availableRoadLinks": []}]
vertical_incoming: set[str] = set()
horizontal_incoming: set[str] = set()
center = coords[node_id]
for road in incoming:
source = coords[road.start_intersection]
vx = center[0] - source[0]
vy = center[1] - source[1]
if abs(vy) >= abs(vx):
vertical_incoming.add(road.id)
else:
horizontal_incoming.add(road.id)
vertical_links: list[int] = []
horizontal_links: list[int] = []
for idx, link in enumerate(road_links):
start_road = str(link["startRoad"])
if start_road in vertical_incoming:
vertical_links.append(idx)
else:
horizontal_links.append(idx)
if not vertical_links or not horizontal_links:
return [{"time": 35, "availableRoadLinks": list(range(len(road_links)))}]
return [
{"time": 30, "availableRoadLinks": vertical_links},
{"time": 5, "availableRoadLinks": []},
{"time": 30, "availableRoadLinks": horizontal_links},
{"time": 5, "availableRoadLinks": []},
]