# Source path: agentic-traffic/data/generators/district_generator.py
# Hosting-page residue kept for provenance: "Aditya2162's picture",
# "Upload folder using huggingface_hub", revision "9d09c45 verified".
"""District partitioning and district-level metadata generation."""
from __future__ import annotations
import random
from collections import Counter, defaultdict, deque
from .schemas import CityGraph, DistrictData, DistrictRecord, DistrictType
from .utils import connected_components, euclidean
class DistrictGenerator:
    """Generate contiguous district partitions over the city intersection graph.

    Gateway intersections are left out of the partition.  Seeds are spread
    with a farthest-point heuristic, regions are grown outward from the seeds
    along graph edges, detached fragments are re-attached by a repair pass,
    and per-district metadata (neighbors, boundary nodes, entry/exit roads,
    district type) is derived from the final assignment.
    """

    # Relative sampling weights used when drawing a type for each district.
    DISTRICT_TYPE_WEIGHTS: dict[DistrictType, float] = {
        "residential": 0.35,
        "commercial": 0.25,
        "industrial": 0.20,
        "mixed": 0.20,
    }

    def generate(
        self,
        city_graph: CityGraph,
        num_districts: int,
        seed: int,
    ) -> DistrictData:
        """Partition the non-gateway intersections of *city_graph* into districts.

        Args:
            city_graph: Graph to partition.  Its ``inter_district_roads``
                attribute is overwritten with the set of road ids that cross
                a district border.
            num_districts: Requested number of districts.  When it is not
                smaller than the number of non-gateway intersections it is
                replaced by ``max(2, n // 2)``.  At least one district is
                always produced because seeding always picks a first node.
            seed: Seed for the private ``random.Random`` that drives seeding,
                growth and district-type sampling.

        Returns:
            A ``DistrictData`` with the node-to-district map, one
            ``DistrictRecord`` per district, district adjacency, boundary
            intersections and inter-district road ids (all lists sorted).

        Raises:
            ValueError: If fewer than two non-gateway intersections exist.
        """
        rng = random.Random(seed)
        node_ids = sorted(
            n
            for n in city_graph.intersections
            if n not in city_graph.gateway_intersections
        )
        if len(node_ids) < 2:
            raise ValueError("Insufficient non-gateway intersections for districting.")
        if num_districts >= len(node_ids):
            # Too many districts for this graph: aim for ~2 nodes per district.
            num_districts = max(2, len(node_ids) // 2)

        local_coords = {nid: city_graph.intersections[nid] for nid in node_ids}
        seeds = self._farthest_seeds(local_coords, num_districts, rng)
        assignment = self._grow_contiguous_regions(
            local_coords=local_coords,
            adjacency=city_graph.adjacency,
            seeds=seeds,
            rng=rng,
        )
        assignment = self._enforce_contiguity(
            assignment=assignment,
            adjacency=city_graph.adjacency,
            coords=local_coords,
        )
        assignment = self._fill_empty_districts(
            assignment=assignment,
            node_ids=node_ids,
            adjacency=city_graph.adjacency,
            district_ids=list(seeds.keys()),
        )

        # District adjacency and boundary nodes: an intersection is a boundary
        # node when one of its adjacency entries lies in another district.
        district_neighbors: dict[str, set[str]] = {did: set() for did in seeds}
        boundary: set[str] = set()
        for a, neighbors in city_graph.adjacency.items():
            if a not in assignment:
                continue
            da = assignment[a]
            for b in neighbors:
                if b not in assignment:
                    continue
                db = assignment[b]
                if da != db:
                    district_neighbors[da].add(db)
                    boundary.add(a)

        # Roads whose endpoints sit in different districts are exits of the
        # start district and entries of the end district.  Roads touching an
        # unassigned (gateway) intersection are skipped.
        entry_roads: dict[str, list[str]] = defaultdict(list)
        exit_roads: dict[str, list[str]] = defaultdict(list)
        inter_district_roads: set[str] = set()
        for road in city_graph.directed_roads.values():
            if (
                road.start_intersection not in assignment
                or road.end_intersection not in assignment
            ):
                continue
            ds = assignment[road.start_intersection]
            de = assignment[road.end_intersection]
            if ds != de:
                inter_district_roads.add(road.id)
                exit_roads[ds].append(road.id)
                entry_roads[de].append(road.id)
        city_graph.inter_district_roads = inter_district_roads

        # Group members once instead of re-scanning the assignment per district.
        members_by_district: dict[str, list[str]] = defaultdict(list)
        for node, did in assignment.items():
            members_by_district[did].append(node)

        type_values = list(self.DISTRICT_TYPE_WEIGHTS)
        type_weights = list(self.DISTRICT_TYPE_WEIGHTS.values())
        district_records: dict[str, DistrictRecord] = {}
        for district_id in seeds:
            members = sorted(members_by_district[district_id])
            # ``members`` is sorted, so the filtered list is sorted as well.
            d_boundary = [n for n in members if n in boundary]
            district_type = rng.choices(type_values, weights=type_weights, k=1)[0]
            district_records[district_id] = DistrictRecord(
                id=district_id,
                district_type=district_type,
                intersections=members,
                neighbors=sorted(district_neighbors[district_id]),
                boundary_intersections=d_boundary,
                entry_roads=sorted(set(entry_roads[district_id])),
                exit_roads=sorted(set(exit_roads[district_id])),
            )

        return DistrictData(
            intersection_to_district=assignment,
            districts=district_records,
            district_neighbors={
                k: sorted(v) for k, v in district_neighbors.items()
            },
            boundary_intersections=sorted(boundary),
            inter_district_roads=sorted(inter_district_roads),
        )

    def _farthest_seeds(
        self,
        coords: dict[str, tuple[float, float]],
        num_districts: int,
        rng: random.Random,
    ) -> dict[str, str]:
        """Pick up to *num_districts* seed nodes that are far apart.

        The first seed is drawn at random from the sorted node ids.  Every
        further seed is the node whose distance to its closest already-chosen
        seed is largest; ties go to the smallest node id.

        Returns:
            Mapping ``"d_00"``, ``"d_01"``, ... -> seed node id.  It holds
            fewer than *num_districts* entries when there are not enough nodes
            and always at least one entry.
        """
        nodes = sorted(coords)
        first = rng.choice(nodes)
        selected = [first]

        # nearest[node] = distance from node to its closest selected seed.
        # Built in sorted order so iteration keeps the id tie-break.
        nearest: dict[str, float] = {}
        if num_districts > 1:
            nearest = {
                node: euclidean(coords[node], coords[first])
                for node in nodes
                if node != first
            }

        while len(selected) < num_districts and nearest:
            best_node = None
            best_dist = -1.0
            for node, dist in nearest.items():
                if dist > best_dist:
                    best_dist = dist
                    best_node = node
            if best_node is None:
                break
            selected.append(best_node)
            del nearest[best_node]
            if len(selected) >= num_districts:
                break
            # Only the newly added seed can shrink a node's nearest distance.
            for node in nearest:
                candidate = euclidean(coords[node], coords[best_node])
                if candidate < nearest[node]:
                    nearest[node] = candidate

        return {f"d_{idx:02d}": node for idx, node in enumerate(selected)}

    def _assign_nearest(
        self,
        coords: dict[str, tuple[float, float]],
        seeds: dict[str, str],
    ) -> dict[str, str]:
        """Map every node in *coords* to the district whose seed is closest.

        Ties are resolved in favour of the district that appears first in
        *seeds*.
        """
        seed_points = {did: coords[seed_node] for did, seed_node in seeds.items()}
        assignment: dict[str, str] = {}
        for node, point in coords.items():
            assignment[node] = min(
                seed_points,
                key=lambda did: euclidean(point, seed_points[did]),
            )
        return assignment

    @staticmethod
    def _claim_from_frontier(
        frontier: deque[str],
        adjacency: dict[str, set[str]],
        remaining: set[str],
        rng: random.Random,
    ) -> str | None:
        """Claim one unassigned neighbor of the oldest still-useful frontier node.

        Frontier nodes with no unassigned neighbors are dropped.  A node that
        still has unassigned neighbors stays at the head of the frontier so its
        other neighbors can be claimed on later turns.  The claimed node is
        removed from *remaining* and appended to *frontier*.

        Returns:
            The claimed node id, or ``None`` when the frontier is exhausted.
        """
        while frontier:
            source = frontier[0]
            # Sort before sampling: set iteration order of str ids is not
            # stable across processes and would defeat the seeded rng.
            candidates = sorted(
                n for n in adjacency.get(source, ()) if n in remaining
            )
            if not candidates:
                frontier.popleft()
                continue
            chosen = rng.choice(candidates)
            remaining.remove(chosen)
            frontier.append(chosen)
            return chosen
        return None

    def _grow_contiguous_regions(
        self,
        local_coords: dict[str, tuple[float, float]],
        adjacency: dict[str, set[str]],
        seeds: dict[str, str],
        rng: random.Random,
    ) -> dict[str, str]:
        """Grow one region per seed along graph edges, one node per turn.

        Districts take turns in seed order.  On its turn a district claims a
        single unassigned neighbor of its frontier, so every claimed node is
        adjacent to a node the district already owns.  While size caps are
        active a district at ``len(local_coords) // len(seeds) + 2`` nodes
        skips its turn; the caps are lifted once no district below its cap
        can grow, so leftovers are still absorbed.

        Nodes that no region can reach through unassigned neighbors are
        assigned to the nearest seed; the caller repairs contiguity afterwards.

        Returns:
            Mapping node id -> district id covering every key of
            *local_coords*.
        """
        districts = list(seeds)
        assignment: dict[str, str] = {
            seed_node: district_id for district_id, seed_node in seeds.items()
        }
        remaining = set(local_coords) - set(assignment)
        if not remaining:
            return assignment

        sizes = {district_id: 1 for district_id in districts}
        frontiers: dict[str, deque[str]] = {
            district_id: deque([seed_node])
            for district_id, seed_node in seeds.items()
        }
        size_cap = max(1, len(local_coords) // len(districts)) + 2

        enforce_cap = True
        while remaining:
            progressed = False
            for district_id in districts:
                if enforce_cap and sizes[district_id] >= size_cap:
                    continue
                claimed = self._claim_from_frontier(
                    frontiers[district_id], adjacency, remaining, rng
                )
                if claimed is None:
                    continue
                assignment[claimed] = district_id
                sizes[district_id] += 1
                progressed = True
            if progressed:
                continue
            if not enforce_cap:
                # Nothing can grow even without caps: leftovers are unreachable.
                break
            # Every district below its cap is stuck; let capped ones continue.
            enforce_cap = False

        if remaining:
            fallback = self._assign_nearest(local_coords, seeds)
            # Sorted so the insertion order of ``assignment`` is reproducible.
            for node in sorted(remaining):
                assignment[node] = fallback[node]
        return assignment

    def _enforce_contiguity(
        self,
        assignment: dict[str, str],
        adjacency: dict[str, set[str]],
        coords: dict[str, tuple[float, float]],
    ) -> dict[str, str]:
        """Re-attach detached fragments of each district to a neighboring district.

        For every district split into several connected components, the
        largest component is kept and each node of the other components is
        handed to ``_best_neighbor_district``.  Passes repeat until one makes
        no change.  *assignment* is modified in place and also returned.

        NOTE(review): assumes ``connected_components`` returns a list of
        non-empty collections of node ids - confirm against ``utils``.
        """
        district_ids = sorted(set(assignment.values()))
        # Safety bound: reassignment is not guaranteed to be monotone, so cap
        # the number of passes instead of looping while anything changes.
        max_passes = max(1, len(assignment) * len(district_ids))
        for _ in range(max_passes):
            changed = False
            for district_id in district_ids:
                nodes = [n for n, d in assignment.items() if d == district_id]
                if len(nodes) <= 1:
                    continue
                comps = connected_components(nodes, adjacency)
                if len(comps) <= 1:
                    continue
                # Largest first; equal sizes ordered by smallest node id so the
                # kept component does not depend on container ordering.
                ranked = sorted(comps, key=lambda comp: (-len(comp), min(comp)))
                for comp in ranked[1:]:
                    for node in sorted(comp):
                        reassigned = self._best_neighbor_district(
                            node=node,
                            assignment=assignment,
                            adjacency=adjacency,
                            coords=coords,
                        )
                        if reassigned != district_id:
                            assignment[node] = reassigned
                            changed = True
            if not changed:
                break
        return assignment

    def _best_neighbor_district(
        self,
        node: str,
        assignment: dict[str, str],
        adjacency: dict[str, set[str]],
        coords: dict[str, tuple[float, float]],
    ) -> str:
        """Choose a district for *node* among those of its assigned neighbors.

        Returns the node's current district when it has no assigned neighbors
        (or no adjacency entry).  If all assigned neighbors share one district
        that district is returned.  Otherwise the neighboring district whose
        member centroid is closest to the node wins.
        """
        # Sorted so Counter insertion order, and with it tie-breaking, is stable.
        neighbors = sorted(n for n in adjacency.get(node, ()) if n in assignment)
        if not neighbors:
            return assignment[node]

        counts = Counter(assignment[n] for n in neighbors)
        best = counts.most_common(1)[0][0]
        if len(counts) == 1:
            return best

        best_score = float("inf")
        best_district = best  # used only if no centroid distance beats inf
        for district in counts:
            district_nodes = [n for n, d in assignment.items() if d == district]
            if not district_nodes:
                continue
            centroid_x = sum(coords[n][0] for n in district_nodes) / len(district_nodes)
            centroid_y = sum(coords[n][1] for n in district_nodes) / len(district_nodes)
            dist = euclidean(coords[node], (centroid_x, centroid_y))
            if dist < best_score:
                best_score = dist
                best_district = district
        return best_district

    def _fill_empty_districts(
        self,
        assignment: dict[str, str],
        node_ids: list[str],
        adjacency: dict[str, set[str]],
        district_ids: list[str],
    ) -> dict[str, str]:
        """Give every empty district a node, then top up singleton districts.

        When no district is empty the assignment is returned untouched; the
        singleton top-up below therefore only runs after an empty district
        was found.  *assignment* is modified in place and also returned.
        """
        counts = Counter(assignment.values())
        empty = [d for d in district_ids if counts[d] == 0]
        if not empty:
            return assignment

        # Each empty district steals one node from the currently largest
        # district, preferring a donor node that touches a third district.
        for empty_id in empty:
            largest = max(district_ids, key=lambda d: counts[d])
            donor_candidates = [n for n in node_ids if assignment[n] == largest]
            if not donor_candidates:
                continue
            pivot = donor_candidates[0]
            for candidate in donor_candidates:
                touches_third_district = any(
                    assignment[n] != largest and assignment[n] != empty_id
                    for n in adjacency.get(candidate, set())
                    if n in assignment
                )
                if touches_third_district:
                    pivot = candidate
                    break
            assignment[pivot] = empty_id
            counts[largest] -= 1
            counts[empty_id] += 1

        # Districts with at most one node try to take one adjacent node from
        # a neighbor district that would keep at least two nodes afterwards.
        for district in district_ids:
            if counts[district] > 1:
                continue
            root = next((n for n in node_ids if assignment[n] == district), None)
            if root is None:
                continue
            queue = deque([root])
            while queue and counts[district] < 2:
                current = queue.popleft()
                current_neighbors = adjacency.get(current, set())
                for candidate in node_ids:
                    if candidate not in current_neighbors:
                        continue
                    old = assignment[candidate]
                    if old == district or counts[old] <= 2:
                        continue
                    assignment[candidate] = district
                    counts[old] -= 1
                    counts[district] += 1
                    queue.append(candidate)
                    if counts[district] >= 2:
                        break
        return assignment