| """District partitioning and district-level metadata generation.""" |
|
|
| from __future__ import annotations |
|
|
| import random |
| from collections import Counter, defaultdict, deque |
|
|
| from .schemas import CityGraph, DistrictData, DistrictRecord, DistrictType |
| from .utils import connected_components, euclidean |
|
|
|
|
class DistrictGenerator:
    """Generate contiguous district partitions over the city intersection graph.

    The pipeline is: pick well-separated seed intersections, grow one region
    per seed along graph edges, repair any district that ended up split into
    several components, top up empty districts, and finally derive
    per-district metadata (neighbours, boundary intersections, entry/exit
    roads).

    All randomness is drawn from a ``random.Random`` built from the caller's
    seed, and neighbour sets are visited in sorted order so that the drawn
    numbers are always applied to the same candidates.
    """

    # Relative weights handed to ``rng.choices`` when a type is drawn for
    # each district.
    DISTRICT_TYPE_WEIGHTS: dict[DistrictType, float] = {
        "residential": 0.35,
        "commercial": 0.25,
        "industrial": 0.20,
        "mixed": 0.20,
    }

    def generate(
        self,
        city_graph: CityGraph,
        num_districts: int,
        seed: int,
    ) -> DistrictData:
        """Partition the non-gateway intersections of ``city_graph``.

        Args:
            city_graph: Graph to partition.  Its ``inter_district_roads``
                attribute is overwritten with the set of road ids whose two
                endpoints lie in different districts.
            num_districts: Requested number of districts.  When it is not
                smaller than the number of non-gateway intersections it is
                replaced by ``max(2, node_count // 2)``.
            seed: Seed for the private random number generator.

        Returns:
            The node-to-district mapping together with per-district records,
            the district neighbour lists, the boundary intersections and the
            inter-district road ids (all lists sorted).

        Raises:
            ValueError: If fewer than two non-gateway intersections exist.
        """
        rng = random.Random(seed)
        node_ids = sorted(
            n
            for n in city_graph.intersections.keys()
            if n not in city_graph.gateway_intersections
        )
        if len(node_ids) < 2:
            raise ValueError("Insufficient non-gateway intersections for districting.")
        if num_districts >= len(node_ids):
            num_districts = max(2, len(node_ids) // 2)

        local_coords = {nid: city_graph.intersections[nid] for nid in node_ids}
        seeds = self._farthest_seeds(local_coords, num_districts, rng)
        assignment = self._grow_contiguous_regions(
            local_coords=local_coords,
            adjacency=city_graph.adjacency,
            seeds=seeds,
            rng=rng,
        )
        assignment = self._enforce_contiguity(
            assignment=assignment,
            adjacency=city_graph.adjacency,
            coords=local_coords,
        )
        assignment = self._fill_empty_districts(
            assignment=assignment,
            node_ids=node_ids,
            adjacency=city_graph.adjacency,
            district_ids=list(seeds.keys()),
        )

        # A node is a boundary node when at least one of its assigned
        # neighbours belongs to another district; that other district is
        # recorded as a neighbour of the node's own district.
        district_neighbors: dict[str, set[str]] = {did: set() for did in seeds}
        boundary: set[str] = set()
        for a, neighbors in city_graph.adjacency.items():
            if a not in assignment:
                continue
            da = assignment[a]
            for b in neighbors:
                if b not in assignment:
                    continue
                db = assignment[b]
                if da != db:
                    district_neighbors[da].add(db)
                    boundary.add(a)

        # Roads with an unassigned endpoint (e.g. a gateway) are skipped.
        entry_roads: dict[str, list[str]] = defaultdict(list)
        exit_roads: dict[str, list[str]] = defaultdict(list)
        inter_district_roads: set[str] = set()
        for road in city_graph.directed_roads.values():
            if (
                road.start_intersection not in assignment
                or road.end_intersection not in assignment
            ):
                continue
            ds = assignment[road.start_intersection]
            de = assignment[road.end_intersection]
            if ds != de:
                inter_district_roads.add(road.id)
                exit_roads[ds].append(road.id)
                entry_roads[de].append(road.id)
        city_graph.inter_district_roads = inter_district_roads

        # Group members once instead of rescanning the whole assignment for
        # every district.
        members_by_district: dict[str, list[str]] = defaultdict(list)
        for node, did in assignment.items():
            members_by_district[did].append(node)

        district_records: dict[str, DistrictRecord] = {}
        type_values = list(self.DISTRICT_TYPE_WEIGHTS.keys())
        type_weights = list(self.DISTRICT_TYPE_WEIGHTS.values())
        for district_id in seeds:
            members = sorted(members_by_district.get(district_id, []))
            d_boundary = [n for n in members if n in boundary]
            district_type = rng.choices(type_values, weights=type_weights, k=1)[0]
            district_records[district_id] = DistrictRecord(
                id=district_id,
                district_type=district_type,
                intersections=members,
                neighbors=sorted(district_neighbors[district_id]),
                boundary_intersections=d_boundary,
                entry_roads=sorted(set(entry_roads[district_id])),
                exit_roads=sorted(set(exit_roads[district_id])),
            )

        return DistrictData(
            intersection_to_district=assignment,
            districts=district_records,
            district_neighbors={
                k: sorted(v) for k, v in district_neighbors.items()
            },
            boundary_intersections=sorted(boundary),
            inter_district_roads=sorted(inter_district_roads),
        )

    def _farthest_seeds(
        self,
        coords: dict[str, tuple[float, float]],
        num_districts: int,
        rng: random.Random,
    ) -> dict[str, str]:
        """Pick seed nodes by farthest-point traversal.

        The first seed is drawn at random; every following seed is the node
        whose distance to its closest already-chosen seed is largest (the
        first such node in sorted id order wins ties).

        Args:
            coords: Node id to coordinate.
            num_districts: Number of seeds wanted; fewer are returned when
                ``coords`` has fewer nodes, and at least one is always
                returned.
            rng: Source of randomness for the first seed.

        Returns:
            Mapping ``"d_00"``, ``"d_01"``, ... to the chosen seed node ids,
            in selection order.
        """
        nodes = sorted(coords.keys())
        first = rng.choice(nodes)
        selected = [first]

        # Distance from every unselected node to its closest selected seed.
        # Built in sorted order and only ever shrunk, so iteration keeps the
        # sorted order used for tie-breaking.
        nearest_seed_dist = {
            node: euclidean(coords[node], coords[first])
            for node in nodes
            if node != first
        }

        while len(selected) < num_districts:
            best_node = None
            best_dist = -1.0
            for node, dist in nearest_seed_dist.items():
                if dist > best_dist:
                    best_dist = dist
                    best_node = node
            if best_node is None:
                break
            selected.append(best_node)
            del nearest_seed_dist[best_node]
            # Only the newly added seed can lower a node's nearest distance.
            for node in nearest_seed_dist:
                to_new_seed = euclidean(coords[node], coords[best_node])
                if to_new_seed < nearest_seed_dist[node]:
                    nearest_seed_dist[node] = to_new_seed
        return {f"d_{idx:02d}": node for idx, node in enumerate(selected)}

    def _assign_nearest(
        self,
        coords: dict[str, tuple[float, float]],
        seeds: dict[str, str],
    ) -> dict[str, str]:
        """Map every node in ``coords`` to the district of its closest seed.

        Graph connectivity is ignored; ties go to the district that comes
        first in ``seeds``.
        """
        assignment: dict[str, str] = {}
        for node, point in coords.items():
            assignment[node] = min(
                seeds.keys(),
                key=lambda did: euclidean(point, coords[seeds[did]]),
            )
        return assignment

    def _grow_contiguous_regions(
        self,
        local_coords: dict[str, tuple[float, float]],
        adjacency: dict[str, set[str]],
        seeds: dict[str, str],
        rng: random.Random,
    ) -> dict[str, str]:
        """Grow one region per seed by claiming adjacent unassigned nodes.

        Districts take turns; on each turn the district pops a node from its
        frontier and claims one randomly chosen unassigned neighbour of it.
        A node stays on the frontier for as long as it has unassigned
        neighbours, so every node reachable from a seed through nodes of
        ``local_coords`` is claimed along graph edges.  Only nodes that
        cannot be reached that way fall back to :meth:`_assign_nearest`.

        Args:
            local_coords: Nodes to assign (only the keys are used here; the
                coordinates are needed by the nearest-seed fallback).
            adjacency: Node id to neighbouring node ids.  Missing keys are
                treated as "no neighbours"; neighbours outside
                ``local_coords`` are ignored.
            seeds: District id to seed node id.
            rng: Source of randomness for the neighbour choice.

        Returns:
            Mapping from every node in ``local_coords`` to a district id.
        """
        districts = list(seeds.keys())
        district_sizes = {district_id: 1 for district_id in districts}
        assignment: dict[str, str] = {
            seed_node: district_id for district_id, seed_node in seeds.items()
        }
        frontiers: dict[str, deque[str]] = {
            district_id: deque([seed_node])
            for district_id, seed_node in seeds.items()
        }
        remaining = set(local_coords.keys()) - set(assignment.keys())
        if not remaining:
            return assignment

        # Advisory size limit shared by all districts.  A district at or
        # above it still claims a neighbour, but the event is counted in
        # ``overcap``; once ``overcap`` exceeds 3 the limit is no longer
        # consulted at all.
        size_limit = max(1, len(local_coords) // len(districts)) + 2
        overcap = 0
        # ``overcap`` also counts dead-end pops, so it can legitimately reach
        # about twice the node count (one dead end and one over-limit claim
        # per node).  Scale the safety valve accordingly so large graphs are
        # not cut short and dumped into the nearest-seed fallback.
        stall_limit = max(10_000, 2 * len(local_coords))

        frontier_order = deque(districts)
        while remaining and frontier_order:
            district_id = frontier_order.popleft()
            current_frontier = frontiers[district_id]
            if not current_frontier:
                continue

            source = current_frontier.popleft()
            # Sort before shuffling: the neighbour container is a set of
            # strings whose iteration order varies between interpreter runs,
            # which would otherwise make a fixed seed irreproducible.
            neighbors = sorted(
                n for n in adjacency.get(source, ()) if n in remaining
            )
            rng.shuffle(neighbors)

            if neighbors:
                within_limit = (
                    overcap > 3 or district_sizes[district_id] < size_limit
                )
                if not within_limit:
                    overcap += 1
                claimed = neighbors[0]
                assignment[claimed] = district_id
                remaining.discard(claimed)
                district_sizes[district_id] += 1
                current_frontier.append(claimed)
                if len(neighbors) > 1:
                    # ``source`` still has unassigned neighbours: keep it at
                    # the head of the frontier so they are not stranded.
                    current_frontier.appendleft(source)
                # Two turns for the (up to) two frontier entries just added.
                frontier_order.append(district_id)
                frontier_order.append(district_id)
            else:
                # Dead end: ``source`` has nothing left to claim.
                overcap += 1
                if not current_frontier and all(
                    size >= size_limit for size in district_sizes.values()
                ):
                    continue
                frontier_order.append(district_id)

            if overcap > stall_limit:
                break

        # Nodes not reachable from any seed inside ``local_coords`` are
        # attached to the geometrically nearest seed.
        if remaining:
            fallback = self._assign_nearest(local_coords, seeds)
            for node in sorted(remaining):
                assignment[node] = fallback[node]
        return assignment

    def _enforce_contiguity(
        self,
        assignment: dict[str, str],
        adjacency: dict[str, set[str]],
        coords: dict[str, tuple[float, float]],
    ) -> dict[str, str]:
        """Hand stray components of a district over to adjacent districts.

        For every district split into several connected components the
        largest component is kept and each node of the others is offered to
        :meth:`_best_neighbor_district`.  Sweeps repeat until one changes
        nothing.

        Args:
            assignment: Node id to district id; modified in place.
            adjacency: Node id to neighbouring node ids.
            coords: Node id to coordinate, used for centroid distances.

        Returns:
            The same ``assignment`` object.
        """
        district_ids = sorted(set(assignment.values()))
        # The reassignment heuristic comes with no convergence guarantee, so
        # the number of sweeps is bounded to rule out an endless ping-pong.
        max_passes = 2 * len(assignment) + 1
        for _ in range(max_passes):
            changed = False
            for district_id in district_ids:
                nodes = [n for n, d in assignment.items() if d == district_id]
                if len(nodes) <= 1:
                    continue
                raw_components = connected_components(nodes, adjacency)
                if len(raw_components) <= 1:
                    continue
                # Canonical order (largest first, then by node ids) so the
                # kept component and the visit order do not depend on the
                # container types the helper happens to return.
                components = sorted(
                    (sorted(comp) for comp in raw_components),
                    key=lambda comp: (-len(comp), comp),
                )
                for comp in components[1:]:
                    for node in comp:
                        reassigned = self._best_neighbor_district(
                            node=node,
                            assignment=assignment,
                            adjacency=adjacency,
                            coords=coords,
                        )
                        if reassigned != district_id:
                            assignment[node] = reassigned
                            changed = True
            if not changed:
                break
        return assignment

    def _best_neighbor_district(
        self,
        node: str,
        assignment: dict[str, str],
        adjacency: dict[str, set[str]],
        coords: dict[str, tuple[float, float]],
    ) -> str:
        """Choose the district ``node`` should join, judged by its neighbours.

        Returns the node's current district when it has no assigned
        neighbour, the neighbours' district when they all agree, and
        otherwise the neighbouring district whose member centroid is closest
        to the node.
        """
        # ``.get`` keeps nodes without an adjacency entry from raising
        # KeyError; sorting makes tie-breaking independent of set order.
        neighbors = sorted(n for n in adjacency.get(node, ()) if n in assignment)
        if not neighbors:
            return assignment[node]
        counts = Counter(assignment[n] for n in neighbors)
        best = counts.most_common(1)[0][0]
        if len(counts) == 1:
            return best
        best_score = float("inf")
        best_district = best
        for district in counts:
            district_nodes = [n for n, d in assignment.items() if d == district]
            if not district_nodes:
                continue
            centroid_x = sum(coords[n][0] for n in district_nodes) / len(district_nodes)
            centroid_y = sum(coords[n][1] for n in district_nodes) / len(district_nodes)
            dist = euclidean(coords[node], (centroid_x, centroid_y))
            if dist < best_score:
                best_score = dist
                best_district = district
        return best_district

    def _fill_empty_districts(
        self,
        assignment: dict[str, str],
        node_ids: list[str],
        adjacency: dict[str, set[str]],
        district_ids: list[str],
    ) -> dict[str, str]:
        """Give every empty district a node, then top up singleton districts.

        Nothing is changed when no district is empty; in particular the
        singleton top-up below only runs after at least one district had to
        be filled.

        Args:
            assignment: Node id to district id; modified in place.  Every id
                in ``node_ids`` must already be a key.
            node_ids: Candidate nodes, scanned in the given order.
            adjacency: Node id to neighbouring node ids.
            district_ids: All district ids that should be populated.

        Returns:
            The same ``assignment`` object.
        """
        counts = Counter(assignment.values())
        empty = [d for d in district_ids if counts[d] == 0]
        if not empty:
            return assignment

        for empty_id in empty:
            # Donate from the currently largest district.
            largest = max(district_ids, key=lambda d: counts[d])
            donor_candidates = [n for n in node_ids if assignment[n] == largest]
            if not donor_candidates:
                continue
            # Prefer a donor node that touches some third district; fall
            # back to the first member otherwise.
            pivot = donor_candidates[0]
            for candidate in donor_candidates:
                if any(
                    assignment[n] not in (largest, empty_id)
                    for n in adjacency.get(candidate, set())
                    if n in assignment
                ):
                    pivot = candidate
                    break
            assignment[pivot] = empty_id
            counts[largest] -= 1
            counts[empty_id] += 1

        # Try to give each district with at most one node a second one,
        # taken from a neighbouring district that keeps at least two nodes.
        for district in district_ids:
            if counts[district] > 1:
                continue
            root = next((n for n in node_ids if assignment[n] == district), None)
            if root is None:
                continue
            queue = deque([root])
            while queue and counts[district] < 2:
                current = queue.popleft()
                current_neighbors = adjacency.get(current, set())
                for candidate in node_ids:
                    if assignment[candidate] == district:
                        continue
                    if candidate not in current_neighbors:
                        continue
                    old = assignment[candidate]
                    if counts[old] <= 2:
                        continue
                    assignment[candidate] = district
                    counts[old] -= 1
                    counts[district] += 1
                    queue.append(candidate)
                    if counts[district] >= 2:
                        break
        return assignment
|
|