# Algorithm Optimization - Virtual Industrial Park

> **A detailed analysis of algorithm optimizations** for the "Industrial park creation by a virtual engineer" system

---

## Table of Contents

1. [Architecture Overview](#1-architecture-overview)
2. [Stage 1: Grid Optimization (NSGA-II)](#2-stage-1-grid-optimization-nsga-ii)
3. [Stage 1 Alt: Voronoi Road Network](#3-stage-1-alternative-voronoi-road-network)
4. [Stage 2: Block Subdivision (OR-Tools)](#4-stage-2-block-subdivision-or-tools)
5. [Stage 3: Infrastructure Planning](#5-stage-3-infrastructure-planning)
6. [Cross-Stage Optimizations](#6-cross-stage-optimizations)
7. [Parallelization](#7-algorithm-level-parallelization)
8. [Priority Roadmap](#8-priority-roadmap)

---

## 1. Architecture Overview

### 1.1 Current Pipeline
```
┌─────────────────────────────────────────────────────────────────────┐
│                    LAND REDISTRIBUTION PIPELINE                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────┐  │
│  │   STAGE 1   │───▶│   STAGE 2   │───▶│         STAGE 3         │  │
│  │ Grid/Voronoi│    │ Subdivision │    │     Infrastructure      │  │
│  │   NSGA-II   │    │  OR-Tools   │    │ MST + K-Means + Drainage│  │
│  └─────────────┘    └─────────────┘    └─────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

### 1.2 Main Modules

| Module | File | Algorithm | Complexity |
|--------|------|-----------|------------|
| Grid Optimizer | `grid_optimizer.py` | NSGA-II Genetic Algorithm | O(pop × gen × eval) |
| Voronoi Generator | `voronoi.py` | Fortune's Algorithm (Shapely) | O(n log n) |
| Subdivision Solver | `subdivision_solver.py` | OR-Tools CP-SAT | NP-hard (bounded) |
| Network Planner | `network_planner.py` | Kruskal's MST + Redundancy | O(E log V) |
| Transformer Planner | `transformer_planner.py` | K-Means Clustering | O(n × k × iterations) |
| Drainage Planner | `drainage_planner.py` | Vector Projection | O(n) |

---

## 2. Stage 1: Grid Optimization (NSGA-II)

### 2.1 Current State Analysis

**File**: `core/optimization/grid_optimizer.py`

```python
# Current genome: 2 genes
individual = [spacing, angle]  # spacing: 20-80 m, angle: 0-90°

# Fitness: 2 objectives
objectives = (
    total_residential_area,  # maximize
    fragmented_blocks        # minimize
)
```

**Main bottlenecks**:
- `generate_grid_candidates()`: builds the grid and computes intersections with O(n²) Shapely operations
- Every fitness evaluation has to iterate over all blocks

### 2.2 Optimization 1: Expanding the Search Space

#### Problem

A square grid is suboptimal for industrial parks, which need rectangular lots.

#### Solution

```python
import random
from typing import List

from deap import creator, tools
from shapely.affinity import translate
from shapely.geometry import Polygon


# Proposal: 5 genes instead of 2
class EnhancedGridOptimizer:
    # self.toolbox and creator.Individual are assumed to be created
    # elsewhere, as in the existing GridOptimizer setup.
    def _setup_deap(self):
        # Gene definitions
        self.toolbox.register("attr_spacing_x", random.uniform, 20, 100)
        self.toolbox.register("attr_spacing_y", random.uniform, 20, 100)
        self.toolbox.register("attr_angle", random.uniform, 0, 90)
        self.toolbox.register("attr_offset_x", random.uniform, 0, 50)
        self.toolbox.register("attr_offset_y", random.uniform, 0, 50)
        self.toolbox.register(
            "individual",
            tools.initCycle,
            creator.Individual,
            (
                self.toolbox.attr_spacing_x,
                self.toolbox.attr_spacing_y,
                self.toolbox.attr_angle,
                self.toolbox.attr_offset_x,
                self.toolbox.attr_offset_y,
            ),
            n=1
        )

    def generate_grid_candidates(
        self,
        spacing_x: float,
        spacing_y: float,
        angle_deg: float,
        offset_x: float,
        offset_y: float
    ) -> List[Polygon]:
        """Generate rectangular grid blocks."""
        # Create rectangular base block
        base_block = Polygon([
            (0, 0),
            (spacing_x, 0),
            (spacing_x, spacing_y),
            (0, spacing_y)
        ])
        # Center the block on the origin, then apply the grid offset
        base_block = translate(base_block, -spacing_x/2 + offset_x, -spacing_y/2 + offset_y)
        # ... rotation and placement logic
```

#### Metrics

- **Flexibility**: Supports lots of 40x80 m, 50x100 m, etc.
- **Trade-off**: A larger search space requires a larger population or more generations
- **Implementation effort**: Low (2-4 hours)
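
With five genes, crossover and mutation can easily push a value outside its range. The sketch below uses only the standard library and shows one possible way to keep an individual inside the ranges proposed above. `GENE_BOUNDS`, `random_genome` and `clamp_genome` are hypothetical helper names, not part of `grid_optimizer.py`.

```python
import random

# Bounds copied from the 5-gene proposal: (low, high) per gene.
GENE_BOUNDS = [
    (20.0, 100.0),  # spacing_x (m)
    (20.0, 100.0),  # spacing_y (m)
    (0.0, 90.0),    # angle (degrees)
    (0.0, 50.0),    # offset_x (m)
    (0.0, 50.0),    # offset_y (m)
]


def random_genome(rng: random.Random) -> list:
    """Draw one genome uniformly inside the bounds."""
    return [rng.uniform(lo, hi) for lo, hi in GENE_BOUNDS]


def clamp_genome(genome: list) -> list:
    """Clip each gene back into its range after crossover or mutation."""
    return [min(max(g, lo), hi) for g, (lo, hi) in zip(genome, GENE_BOUNDS)]


rng = random.Random(0)
# Deliberately aggressive Gaussian mutation to force some genes out of range
child = [g + rng.gauss(0, 30) for g in random_genome(rng)]
repaired = clamp_genome(child)
```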
---

### 2.3 Optimization 2: Adaptive Island Model

#### Problem

A single population easily gets stuck in local optima.

#### Solution

```python
from deap import tools


class IslandGridOptimizer:
    # self.toolbox, _create_population() and _evolve_one_generation() are
    # assumed to come from the existing GridOptimizer setup.
    def __init__(self, land_polygon, num_islands=4, migration_interval=5):
        self.land_poly = land_polygon
        self.num_islands = num_islands
        self.migration_interval = migration_interval
        self.islands = []

    def optimize(self, population_size=50, generations=100):
        self.islands = [
            self._create_population(population_size)
            for _ in range(self.num_islands)
        ]
        for gen in range(generations):
            # Evolve each island independently
            for island in self.islands:
                self._evolve_one_generation(island)
            # Migration: exchange best individuals between islands
            # (skip generation 0, when the islands have barely evolved)
            if gen > 0 and gen % self.migration_interval == 0:
                self._migrate()
        # Return best from all islands
        all_individuals = [ind for island in self.islands for ind in island]
        return tools.selBest(all_individuals, 1)[0]

    def _migrate(self):
        """Ring topology migration."""
        # Collect migrants from every island first, so that a migrant is
        # not forwarded twice in the same migration step.
        migrants = []
        for island in self.islands:
            best = tools.selBest(island, 2)
            migrants.append([self.toolbox.clone(ind) for ind in best])
        for i in range(self.num_islands):
            next_island = (i + 1) % self.num_islands
            # Replace worst with migrants
            worst = tools.selWorst(self.islands[next_island], 2)
            for w, m in zip(worst, migrants[i]):
                self.islands[next_island].remove(w)
                self.islands[next_island].append(m)
```

#### Metrics

- **Diversity**: Preserves genetic diversity
- **Parallel potential**: Each island can run on its own core
- **Speedup**: 2-4x with 4 islands on 4 cores, provided the islands are evolved in parallel (the loop above evolves them one after another)
- **Implementation effort**: Medium (1-2 days)
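
The ring migration step can be tried in isolation without DEAP. The following is a minimal sketch on plain `(fitness, genes)` tuples, assuming a single fitness value where higher is better. `migrate_ring` is a hypothetical name and the toy islands are made up for illustration.

```python
from typing import List, Tuple

Individual = Tuple[float, List[float]]  # (fitness, genes); higher fitness is better


def migrate_ring(islands: List[List[Individual]], k: int = 2) -> None:
    """Copy each island's k best into the next island, replacing its k worst."""
    # Pick all migrants before modifying any island, so a migrant is never
    # forwarded twice within one migration step.
    migrants = [
        sorted(island, key=lambda ind: ind[0], reverse=True)[:k]
        for island in islands
    ]
    for i, group in enumerate(migrants):
        target = islands[(i + 1) % len(islands)]
        target.sort(key=lambda ind: ind[0])  # worst individuals first
        target[:k] = [(fit, list(genes)) for fit, genes in group]  # copies


islands = [
    [(1.0, [20.0]), (2.0, [21.0]), (3.0, [22.0])],
    [(4.0, [40.0]), (5.0, [41.0]), (6.0, [42.0])],
    [(7.0, [60.0]), (8.0, [61.0]), (9.0, [62.0])],
]
migrate_ring(islands, k=1)
best_per_island = [max(ind[0] for ind in island) for island in islands]
```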
---

### 2.4 Optimization 3: Surrogate-Assisted Optimization

#### Problem

Fitness evaluation (Shapely intersections) is very slow.

#### Solution

```python
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel


class SurrogateAssistedOptimizer:
    # _evaluate_layout() is assumed to be the existing Shapely-based fitness.
    def __init__(self, land_polygon):
        self.land_poly = land_polygon
        self.surrogate = None
        self.training_data = {'X': [], 'y': []}
        self.real_eval_count = 0

    def _build_surrogate(self):
        """Build GP surrogate model from collected samples."""
        if len(self.training_data['X']) < 20:
            return None
        X = np.array(self.training_data['X'])
        y = np.array(self.training_data['y'])
        # One length scale per gene (spacing, angle)
        kernel = ConstantKernel(1.0) * RBF(length_scale=[10, 10])
        gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5)
        gp.fit(X, y)
        return gp

    def _evaluate_with_surrogate(self, individual):
        """Evaluate using surrogate, with uncertainty-based real evaluation."""
        if self.surrogate is None:
            # No surrogate yet, use real evaluation
            return self._real_evaluate(individual)
        X = np.array([[individual[0], individual[1]]])
        pred, std = self.surrogate.predict(X, return_std=True)
        # If uncertainty is high, do real evaluation
        if std[0] > 0.1 * abs(pred[0]):  # 10% uncertainty threshold
            return self._real_evaluate(individual)
        # Use surrogate prediction. Only the first objective is modelled;
        # the second objective is returned as a 0 placeholder.
        return (pred[0], 0)

    def _real_evaluate(self, individual):
        """Expensive real evaluation with Shapely."""
        self.real_eval_count += 1
        fitness = self._evaluate_layout(individual)
        # Store for training
        self.training_data['X'].append([individual[0], individual[1]])
        self.training_data['y'].append(fitness[0])
        # Rebuild surrogate periodically (every 50 real evaluations)
        if len(self.training_data['X']) % 50 == 0:
            self.surrogate = self._build_surrogate()
        return fitness
```

#### Metrics

- **Speedup**: 5-20x reduction in real evaluations
- **Accuracy**: ~95% of true optimum
- **Trade-off**: Requires initial samples to train the surrogate
- **Implementation effort**: High (1 week)
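
The gating rule in `_evaluate_with_surrogate` can be isolated as a pure function to see how it behaves. This is a minimal sketch assuming the same 10% relative threshold; `needs_real_evaluation` is a hypothetical name. One thing it makes visible: a relative threshold degenerates when the prediction is close to zero, where almost any uncertainty triggers a real evaluation.

```python
def needs_real_evaluation(pred: float, std: float, rel_threshold: float = 0.1) -> bool:
    """True when the predictive std exceeds a fraction of the predicted value."""
    return std > rel_threshold * abs(pred)


# Predicted area of 1000 m² with 50 m² std: the surrogate is trusted
confident = needs_real_evaluation(1000.0, 50.0)
# Same prediction with 150 m² std: fall back to the Shapely evaluation
uncertain = needs_real_evaluation(1000.0, 150.0)
# Prediction near zero: even a tiny std forces a real evaluation
near_zero = needs_real_evaluation(0.0, 1e-9)
```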
---

## 3. Stage 1 Alternative: Voronoi Road Network

### 3.1 Current State Analysis

**File**: `core/geometry/voronoi.py`

```python
import random
from shapely.geometry import Point


def generate_voronoi_seeds(site, num_seeds=15, seed=None):
    # Random uniform distribution within bounding box
    minx, miny, maxx, maxy = site.bounds
    seeds = []
    for _ in range(num_seeds):
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        seeds.append(Point(x, y))
    return seeds
```

**Problem**: Random seeds produce uneven blocks and many fragmented blocks.

---

### 3.2 Optimization 1: Centroidal Voronoi Tessellation (CVT)

#### Solution

```python
import random
from typing import Callable, List, Optional

import numpy as np
from shapely.geometry import MultiPoint, Point, Polygon
from shapely.ops import voronoi_diagram


def generate_cvt_seeds(
    site: Polygon,
    num_seeds: int = 15,
    iterations: int = 20,
    seed: Optional[int] = None
) -> List[Point]:
    """
    Generate Centroidal Voronoi Tessellation seeds.

    CVT iteratively moves seeds to the centroid of their Voronoi regions,
    resulting in more uniform block sizes.
    """
    if seed is not None:
        random.seed(seed)
    minx, miny, maxx, maxy = site.bounds

    # Initialize with random seeds
    seeds = []
    for _ in range(num_seeds):
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        seeds.append(Point(x, y))

    # CVT iterations
    for _ in range(iterations):
        # Create Voronoi regions
        regions = voronoi_diagram(MultiPoint(seeds), envelope=site)

        # Move each seed to the centroid of its own cell, clipped to the site.
        # voronoi_diagram does not return cells in seed order, so match each
        # seed to the cell that contains it.
        new_seeds = []
        for s in seeds:
            cell = next((r for r in regions.geoms if r.contains(s)), None)
            clipped = cell.intersection(site) if cell is not None else None
            if clipped is not None and not clipped.is_empty:
                new_seeds.append(clipped.centroid)
            else:
                # Keep the seed where it is if its cell misses the site
                new_seeds.append(s)

        # Check convergence
        max_movement = max(s1.distance(s2) for s1, s2 in zip(seeds, new_seeds))
        seeds = new_seeds
        if max_movement < 0.1:  # Convergence threshold
            break
    return seeds


def generate_weighted_cvt_seeds(
    site: Polygon,
    density_function: Callable[[float, float], float],
    num_seeds: int = 15,
    iterations: int = 30
) -> List[Point]:
    """
    Weighted CVT with density function.

    Args:
        density_function: f(x, y) -> weight, higher = smaller blocks
    """
    # Use importance sampling for initial seeds
    candidates = []
    weights = []
    minx, miny, maxx, maxy = site.bounds
    for _ in range(num_seeds * 100):  # Oversample
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        p = Point(x, y)
        if site.contains(p):
            candidates.append(p)
            weights.append(density_function(x, y))

    # Weighted sampling
    weights = np.array(weights) / sum(weights)
    selected_idx = np.random.choice(
        len(candidates), size=num_seeds, replace=False, p=weights
    )
    seeds = [candidates[i] for i in selected_idx]

    # Run weighted CVT iterations
    for _ in range(iterations):
        # ... similar to regular CVT but with weighted centroids
        pass
    return seeds
```

#### Use Case: Industrial Park Zoning

```python
def industrial_density(x, y):
    """
    Higher density near main access roads and utilities.
    """
    # Assume main road at y=0
    dist_to_main_road = abs(y)
    # High density (smaller blocks) near road
    if dist_to_main_road < 100:
        return 2.0
    elif dist_to_main_road < 200:
        return 1.5
    else:
        return 1.0


seeds = generate_weighted_cvt_seeds(
    site=land_polygon,
    density_function=industrial_density,
    num_seeds=20
)
```

#### Metrics

- **Block uniformity**: +40-60% more uniform areas
- **Fragmentation reduction**: -30-50% fragmented blocks
- **Time overhead**: 20 iterations × O(n log n), which is acceptable
- **Implementation effort**: Medium (2-3 days)
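
The CVT update is Lloyd's algorithm, and its effect can be checked without Shapely by approximating each Voronoi cell with the sample points nearest to a seed. The sketch below is a discrete stand-in for the polygon-based version above, not a replacement for it. The square 100 m site, the 5 m sampling lattice and all function names are assumptions made for illustration. The `energy` function (sum of squared sample-to-seed distances) never increases from one Lloyd step to the next, which is what drives the blocks toward uniform size.

```python
import random
from typing import List, Tuple

Pt = Tuple[float, float]


def nearest(seeds: List[Pt], p: Pt) -> int:
    """Index of the seed closest to point p."""
    return min(
        range(len(seeds)),
        key=lambda i: (p[0] - seeds[i][0]) ** 2 + (p[1] - seeds[i][1]) ** 2,
    )


def lloyd_step(seeds: List[Pt], samples: List[Pt]) -> List[Pt]:
    """Move every seed to the mean of the samples in its (discrete) cell."""
    cells = [[] for _ in seeds]
    for p in samples:
        cells[nearest(seeds, p)].append(p)
    return [
        (sum(x for x, _ in c) / len(c), sum(y for _, y in c) / len(c)) if c else s
        for s, c in zip(seeds, cells)
    ]


def energy(seeds: List[Pt], samples: List[Pt]) -> float:
    """Sum of squared distances from each sample to its nearest seed."""
    total = 0.0
    for p in samples:
        s = seeds[nearest(seeds, p)]
        total += (p[0] - s[0]) ** 2 + (p[1] - s[1]) ** 2
    return total


# 100 m x 100 m site sampled on a 5 m lattice, 6 random starting seeds
samples = [(x + 2.5, y + 2.5) for x in range(0, 100, 5) for y in range(0, 100, 5)]
rng = random.Random(42)
start = [(rng.uniform(0, 100), rng.uniform(0, 100)) for _ in range(6)]

seeds = start
for _ in range(20):
    seeds = lloyd_step(seeds, samples)
```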
---

### 3.3 Optimization 2: Constrained Voronoi

#### Problem

Voronoi edges may not align with the preferred road directions.

#### Solution

```python
from typing import List

from shapely.geometry import LineString, Polygon
from shapely.ops import split


class ConstrainedVoronoi:
    """
    Voronoi with hard constraints (e.g., main roads must be axis-aligned).
    """
    # generate_cvt_seeds() and create_voronoi_diagram() are assumed to be
    # importable from core/geometry/voronoi.py.

    def __init__(self, site: Polygon, main_roads: List[LineString]):
        self.site = site
        self.main_roads = main_roads  # Pre-defined main road axes

    def generate(self, num_internal_seeds: int = 10) -> List[Polygon]:
        # Step 1: Split site by main roads
        regions = [self.site]
        for road in self.main_roads:
            new_regions = []
            for region in regions:
                if road.intersects(region):
                    parts = split(region, road)
                    new_regions.extend(parts.geoms)
                else:
                    new_regions.append(region)
            regions = [r for r in new_regions if r.geom_type == 'Polygon']

        # Step 2: Apply Voronoi within each region
        all_blocks = []
        for region in regions:
            # Allocate seeds in proportion to area, with at least 2 per region
            seeds_per_region = max(2, int(num_internal_seeds * region.area / self.site.area))
            seeds = generate_cvt_seeds(region, seeds_per_region)
            voronoi_regions = create_voronoi_diagram(seeds, region)
            if voronoi_regions:
                for v in voronoi_regions.geoms:
                    clipped = v.intersection(region)
                    # Drop slivers smaller than 100 m²
                    if clipped.geom_type == 'Polygon' and clipped.area > 100:
                        all_blocks.append(clipped)
        return all_blocks


# Usage
main_roads = [
    LineString([(0, 500), (1000, 500)]),  # Horizontal main road
    LineString([(500, 0), (500, 1000)]),  # Vertical main road
]
cv = ConstrainedVoronoi(land_polygon, main_roads)
blocks = cv.generate(num_internal_seeds=15)
```

#### Metrics

- **Road alignment**: Main roads guaranteed straight
- **Flexibility**: Sub-blocks use organic Voronoi
- **Implementation effort**: Medium (2 days)
---

## 4. Stage 2: Block Subdivision (OR-Tools)

### 4.1 Current State Analysis

**File**: `core/optimization/subdivision_solver.py`

```python
# Current: 1D subdivision (width only)
def solve_subdivision(total_length, min_width, max_width, target_width):
    # lot_vars[i] = width of lot i
    # Constraint: sum(lot_vars) == total_length
    # Objective: minimize deviation from target_width
    ...
```

**Limitation**: Subdivides along one dimension only, which is suboptimal for lots that need a specific aspect ratio.
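
To make the 1D formulation concrete, here is a closed-form stand-in that does not use OR-Tools. It relies on a simplifying assumption: for a fixed number of lots, equal widths minimise the squared deviation from the target, so only the lot count needs to be searched. `equal_width_lots` is a hypothetical helper and is not the CP-SAT model in `subdivision_solver.py`.

```python
import math
from typing import List


def equal_width_lots(
    total_length: float,
    min_width: float,
    max_width: float,
    target_width: float,
) -> List[float]:
    """Split a frontage into equal lots whose width is closest to the target."""
    n_min = max(1, math.ceil(total_length / max_width))   # fewest lots allowed
    n_max = math.floor(total_length / min_width)          # most lots allowed
    if n_min > n_max:
        return []  # no lot count satisfies the width limits
    best_n = min(
        range(n_min, n_max + 1),
        key=lambda n: abs(total_length / n - target_width),
    )
    return [total_length / best_n] * best_n


lots = equal_width_lots(200, min_width=20, max_width=80, target_width=40)
too_short = equal_width_lots(10, min_width=20, max_width=80, target_width=40)
```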

---

### 4.2 Optimization 1: 2D Bin Packing

#### Solution

```python
from typing import Dict, List

from ortools.sat.python import cp_model


class TwoDimensionalSubdivisionSolver:
    """
    2D bin packing for lot placement within a block.
    Optimizes both lot placement and dimensions.
    """

    @staticmethod
    def solve_2d_subdivision(
        block_width: float,
        block_height: float,
        min_lot_width: float,
        max_lot_width: float,
        min_lot_height: float,
        max_lot_height: float,
        target_aspect_ratio: float = 2.0,  # height/width
        time_limit: float = 10.0
    ) -> List[Dict]:
        """
        Solve 2D lot placement using constraint programming.

        Returns:
            List of lots with {x, y, width, height}
        """
        model = cp_model.CpModel()
        scale = 100  # 1cm precision
        bw, bh = int(block_width * scale), int(block_height * scale)
        min_w, max_w = int(min_lot_width * scale), int(max_lot_width * scale)
        min_h, max_h = int(min_lot_height * scale), int(max_lot_height * scale)

        # Estimate max lots
        min_lot_area = min_lot_width * min_lot_height
        max_lots = int((block_width * block_height) / min_lot_area) + 1
        max_lots = min(max_lots, 50)  # Cap for performance

        # Decision variables for each lot. Width and height domains start
        # at 0 so that an unused lot can collapse to zero size.
        lots = []
        for i in range(max_lots):
            lot = {
                'x': model.NewIntVar(0, bw, f'x_{i}'),
                'y': model.NewIntVar(0, bh, f'y_{i}'),
                'width': model.NewIntVar(0, max_w, f'w_{i}'),
                'height': model.NewIntVar(0, max_h, f'h_{i}'),
                'used': model.NewBoolVar(f'used_{i}'),
                'area': model.NewIntVar(0, max_w * max_h, f'area_{i}')
            }
            lots.append(lot)

        # Constraints
        for lot in lots:
            used = lot['used']
            # Lot must fit within block
            model.Add(lot['x'] + lot['width'] <= bw).OnlyEnforceIf(used)
            model.Add(lot['y'] + lot['height'] <= bh).OnlyEnforceIf(used)
            # Minimum size applies only to used lots
            model.Add(lot['width'] >= min_w).OnlyEnforceIf(used)
            model.Add(lot['height'] >= min_h).OnlyEnforceIf(used)
            # If not used, dimensions are 0
            model.Add(lot['width'] == 0).OnlyEnforceIf(used.Not())
            model.Add(lot['height'] == 0).OnlyEnforceIf(used.Not())
            # Area calculation
            model.AddMultiplicationEquality(lot['area'], [lot['width'], lot['height']])

        # No overlap constraint (using intervals)
        x_intervals = []
        y_intervals = []
        for i, lot in enumerate(lots):
            x_interval = model.NewOptionalIntervalVar(
                lot['x'], lot['width'], lot['x'] + lot['width'],
                lot['used'], f'x_interval_{i}'
            )
            y_interval = model.NewOptionalIntervalVar(
                lot['y'], lot['height'], lot['y'] + lot['height'],
                lot['used'], f'y_interval_{i}'
            )
            x_intervals.append(x_interval)
            y_intervals.append(y_interval)
        model.AddNoOverlap2D(x_intervals, y_intervals)

        # Objective: Maximize total used area + Aspect ratio penalty
        total_area = sum(lot['area'] for lot in lots)

        # Aspect ratio deviation penalty. Kept linear to avoid dividing by a
        # width that may be 0: |100 * height - target_ratio_scaled * width|
        aspect_penalties = []
        target_ratio_scaled = int(target_aspect_ratio * 100)
        max_dev = max(100 * max_h, target_ratio_scaled * max_w)
        for i, lot in enumerate(lots):
            deviation = model.NewIntVar(0, max_dev, f'aspect_dev_{i}')
            model.AddAbsEquality(
                deviation,
                100 * lot['height'] - target_ratio_scaled * lot['width']
            )
            aspect_penalties.append(deviation)

        # Multi-objective: maximize area, minimize aspect deviation.
        # The relative weight of the penalty is a tuning choice.
        model.Maximize(total_area - sum(aspect_penalties))

        # Solve
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = time_limit
        status = solver.Solve(model)

        # Extract solution
        result = []
        if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
            for lot in lots:
                if solver.Value(lot['used']):
                    result.append({
                        'x': solver.Value(lot['x']) / scale,
                        'y': solver.Value(lot['y']) / scale,
                        'width': solver.Value(lot['width']) / scale,
                        'height': solver.Value(lot['height']) / scale,
                    })
        return result
```

#### Metrics

- **Space utilization**: +15-25% more efficient packing
- **Aspect ratio control**: Lots match industrial requirements
- **Time complexity**: Higher, but bounded by `time_limit`
- **Implementation effort**: High (1 week)
---

### 4.3 Optimization 2: Hierarchical Decomposition

#### Solution

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Dict

from shapely.geometry import Polygon, box


class HierarchicalSubdivisionSolver:
    """
    Divide large blocks into quadrants, solve each in parallel.
    """

    @staticmethod
    def subdivide_hierarchically(
        block: Polygon,
        max_block_size: float = 10000,  # m²
        **solver_kwargs
    ) -> Dict:
        """
        Recursively subdivide large blocks before optimization.
        """
        if block.area <= max_block_size:
            # Small enough, solve directly
            return SubdivisionSolver.subdivide_block(block, **solver_kwargs)

        # Split into quadrants
        minx, miny, maxx, maxy = block.bounds
        midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
        quadrants = [
            box(minx, miny, midx, midy),
            box(midx, miny, maxx, midy),
            box(minx, midy, midx, maxy),
            box(midx, midy, maxx, maxy),
        ]
        sub_blocks = []
        for quad in quadrants:
            intersection = block.intersection(quad)
            if not intersection.is_empty and intersection.area > 100:
                sub_blocks.append(intersection)

        # Solve in parallel. A lambda cannot be pickled for worker processes,
        # so the extra arguments are bound with functools.partial instead.
        # Note that every recursion level opens its own process pool.
        worker = partial(
            HierarchicalSubdivisionSolver.subdivide_hierarchically,
            max_block_size=max_block_size,
            **solver_kwargs
        )
        with ProcessPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(worker, sub_blocks))

        # Merge results
        all_lots = []
        for result in results:
            if 'lots' in result:
                all_lots.extend(result['lots'])
        return {'geometry': block, 'type': 'residential', 'lots': all_lots}
```

#### Metrics

- **Parallel speedup**: 2-4x with 4 cores
- **Scalability**: Handles very large blocks
- **Implementation effort**: Medium (2-3 days)
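
The recursion depth, and therefore the number of solver calls, follows directly from the area threshold. The sketch below reproduces only the quadrant split on plain bounding boxes, assuming an axis-aligned rectangular block. `split_until_small` and the `(minx, miny, maxx, maxy)` tuples are hypothetical stand-ins for the Shapely polygons used above.

```python
from typing import List, Tuple

Box = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


def split_until_small(bounds: Box, max_area: float) -> List[Box]:
    """Recursively split a box into quadrants until each is at most max_area."""
    minx, miny, maxx, maxy = bounds
    if (maxx - minx) * (maxy - miny) <= max_area:
        return [bounds]
    midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
    quadrants = [
        (minx, miny, midx, midy),
        (midx, miny, maxx, midy),
        (minx, midy, midx, maxy),
        (midx, midy, maxx, maxy),
    ]
    leaves: List[Box] = []
    for quad in quadrants:
        leaves.extend(split_until_small(quad, max_area))
    return leaves


# A 400 m x 200 m block (8 ha) with the 10,000 m² threshold from above
leaves = split_until_small((0.0, 0.0, 400.0, 200.0), max_area=10_000.0)
leaf_areas = [(b[2] - b[0]) * (b[3] - b[1]) for b in leaves]
```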
---

## 5. Stage 3: Infrastructure Planning

### 5.1 Network Planner - Steiner Tree

**File**: `core/infrastructure/network_planner.py`

#### Current Problem

MST + 15% redundancy is a heuristic and is not optimal.
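
For reference, the MST-plus-redundancy heuristic can be sketched in a few lines of standard-library Python. This is an illustrative reconstruction, not the code in `network_planner.py`: it assumes a complete graph over the lot centroids, Kruskal's algorithm with a union-find, and the same rule as elsewhere in this document of adding `int(n * redundancy_ratio)` of the shortest unused edges as loops.

```python
import math
from itertools import combinations
from typing import List, Tuple

Pt = Tuple[float, float]
Edge = Tuple[int, int]


def mst_with_redundancy(
    points: List[Pt], redundancy_ratio: float = 0.15
) -> Tuple[List[Edge], List[Edge]]:
    """Kruskal MST over all point pairs, plus the shortest rejected edges."""
    edges = sorted(
        (math.dist(points[i], points[j]), i, j)
        for i, j in combinations(range(len(points)), 2)
    )
    parent = list(range(len(points)))

    def find(a: int) -> int:
        # Union-find root lookup with path halving
        while parent[a] != a:
            parent[a] = parent[parent[a]]
            a = parent[a]
        return a

    tree, rejected = [], []
    for _, i, j in edges:
        ri, rj = find(i), find(j)
        if ri != rj:
            parent[ri] = rj
            tree.append((i, j))
        else:
            rejected.append((i, j))  # would close a cycle; kept in length order
    extra = int(len(points) * redundancy_ratio)
    return tree, rejected[:extra]


# Eight lot centroids on a 4 x 2 grid with 1-unit spacing
points = [(float(x), float(y)) for x in range(4) for y in range(2)]
tree, loops = mst_with_redundancy(points)
tree_length = sum(math.dist(points[i], points[j]) for i, j in tree)
```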

#### Solution: Steiner Tree

```python
from typing import List, Tuple

import networkx as nx
import numpy as np
from networkx.algorithms.approximation import steiner_tree
from scipy.spatial import Delaunay
from shapely.geometry import LineString, Point, Polygon


def generate_steiner_network(
    lots: List[Polygon],
    steiner_candidates: int = 20,  # not used yet: one candidate per Delaunay triangle
    max_distance: float = 500.0,
    redundancy_ratio: float = 0.15
) -> Tuple[List[List[float]], List[LineString]]:
    """
    Generate network using Steiner Tree approximation.

    Steiner Tree allows intermediate nodes (not at lot centroids),
    which can reduce total cable length compared with a plain MST.
    """
    if len(lots) < 2:
        return [], []

    # Get lot centroids (terminal nodes)
    centroids = [lot.centroid for lot in lots]
    terminal_coords = [(p.x, p.y) for p in centroids]

    # Generate candidate Steiner points
    # Use centroid of triangles from Delaunay triangulation
    all_coords = np.array(terminal_coords)
    if len(all_coords) >= 3:
        tri = Delaunay(all_coords)
        steiner_candidates_points = []
        for simplex in tri.simplices:
            triangle_points = all_coords[simplex]
            centroid = triangle_points.mean(axis=0)
            steiner_candidates_points.append(tuple(centroid))
    else:
        steiner_candidates_points = []

    # Build graph with terminals and Steiner candidates
    all_points = terminal_coords + steiner_candidates_points
    G = nx.Graph()
    for i, p in enumerate(all_points):
        G.add_node(i, pos=p, terminal=(i < len(terminal_coords)))

    # Add edges between nearby points
    for i in range(len(all_points)):
        for j in range(i + 1, len(all_points)):
            dist = np.sqrt(
                (all_points[i][0] - all_points[j][0])**2 +
                (all_points[i][1] - all_points[j][1])**2
            )
            if dist < max_distance:
                G.add_edge(i, j, weight=dist)

    # Find Steiner tree connecting all terminals
    terminal_nodes = list(range(len(terminal_coords)))
    try:
        st = steiner_tree(G, terminal_nodes, weight='weight')
    except Exception:
        # Fallback to MST
        st = nx.minimum_spanning_tree(G)

    # Add redundancy: shortest edges that are not already in the tree
    loop_graph = nx.Graph(st)
    all_edges = sorted(G.edges(data=True), key=lambda x: x[2]['weight'])
    target_extra = int(len(terminal_coords) * redundancy_ratio)
    added = 0
    for u, v, data in all_edges:
        if added >= target_extra:
            break
        if not loop_graph.has_edge(u, v):
            loop_graph.add_edge(u, v, **data)
            added += 1

    # Convert to LineStrings
    connections = []
    for u, v in loop_graph.edges():
        p1 = Point(all_points[u])
        p2 = Point(all_points[v])
        connections.append(LineString([p1, p2]))
    return [list(p) for p in all_points], connections
```
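
#### Metrics

- **Cable length reduction**: estimated at 15-20%; for straight-line distances the conjectured Steiner ratio of √3/2 caps the gain over an MST at about 13.4%, so the figure should be verified on real layouts
- **Cost savings**: Significant for large projects
- **Implementation effort**: Low (1 day)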
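
The size of the possible saving can be checked on the smallest interesting case: three terminals at the corners of an equilateral triangle, where a single Steiner point at the centre replaces one side of the MST. The numbers below are plain geometry, not measurements from this project.

```python
import math

# Three terminals at the corners of an equilateral triangle with side 1
a, b, c = (0.0, 0.0), (1.0, 0.0), (0.5, math.sqrt(3) / 2)

# For an equilateral triangle the optimal Steiner point is the centroid
steiner_point = ((a[0] + b[0] + c[0]) / 3, (a[1] + b[1] + c[1]) / 3)

mst_length = 2.0 * math.dist(a, b)  # the MST uses two of the three sides
steiner_length = sum(math.dist(steiner_point, p) for p in (a, b, c))
saving = 1.0 - steiner_length / mst_length  # about 0.134, i.e. 13.4%
```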
---

### 5.2 Transformer Placement - Multi-Objective

#### Solution

```python
from typing import Dict, List, Tuple

import numpy as np
from shapely.geometry import Polygon
from sklearn.cluster import KMeans


def generate_transformers_multiobjective(
    lots: List[Polygon],
    power_per_lot: float = 100.0,          # kW
    transformer_capacity: float = 1000.0,  # kVA
    cable_cost_per_meter: float = 50.0,    # USD
    transformer_cost: float = 50000.0      # USD
) -> Tuple[List[Tuple[float, float]], Dict]:
    """
    Multi-objective transformer placement:
    1. Minimize total cable length
    2. Balance load across transformers
    3. Minimize total cost (transformers + cables)
    """
    if not lots:
        return [], {}
    lot_coords = np.array([[lot.centroid.x, lot.centroid.y] for lot in lots])
    lot_powers = np.array([power_per_lot] * len(lots))

    # Step 1: Find optimal number of transformers
    total_power = sum(lot_powers)
    min_transformers = int(np.ceil(total_power / transformer_capacity))
    max_transformers = min(len(lots), min_transformers * 2)
    best_cost = float('inf')
    best_solution = None
    for k in range(min_transformers, max_transformers + 1):
        # K-Means clustering
        kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
        labels = kmeans.fit_predict(lot_coords)
        centers = kmeans.cluster_centers_

        # Calculate total cable length and load per transformer
        total_cable = 0.0
        cluster_loads = [0.0] * k
        for i, label in enumerate(labels):
            dist = np.sqrt(
                (lot_coords[i][0] - centers[label][0])**2 +
                (lot_coords[i][1] - centers[label][1])**2
            )
            total_cable += dist
            cluster_loads[label] += lot_powers[i]

        # Check capacity constraints
        if any(load > transformer_capacity for load in cluster_loads):
            continue

        # Load imbalance (variance)
        load_imbalance = np.var(cluster_loads)

        # Total cost
        cable_cost = total_cable * cable_cost_per_meter
        total_cost = k * transformer_cost + cable_cost
        # Penalize imbalance
        total_cost += load_imbalance * 0.1

        if total_cost < best_cost:
            best_cost = total_cost
            best_solution = {
                'centers': [tuple(c) for c in centers],
                'labels': labels.tolist(),
                'cluster_loads': cluster_loads,
                'total_cable': total_cable,
                'total_cost': total_cost,
            }

    if best_solution is None:
        # No k in the tested range satisfied the capacity constraint
        return [], {}
    return best_solution['centers'], best_solution
```
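
The search range for `k` and the cost trade-off can be worked through by hand. The sketch below uses the default parameters from the function above; the lot count and the two cable lengths are made-up inputs, and the imbalance penalty is left out to keep the arithmetic visible. With these defaults, one extra transformer pays off only if it saves more than 50000 / 50 = 1000 m of cable.

```python
import math


def transformer_count_range(
    n_lots: int, power_per_lot: float = 100.0, capacity: float = 1000.0
) -> tuple:
    """Smallest and largest transformer count tried by the search loop."""
    total_power = n_lots * power_per_lot
    k_min = math.ceil(total_power / capacity)
    k_max = min(n_lots, 2 * k_min)
    return k_min, k_max


def total_cost(
    k: int,
    cable_m: float,
    transformer_cost: float = 50000.0,
    cable_cost_per_meter: float = 50.0,
) -> float:
    """Equipment plus cable cost in USD, without the imbalance penalty."""
    return k * transformer_cost + cable_m * cable_cost_per_meter


k_range = transformer_count_range(35)     # 3500 kW of load
cost_with_4 = total_cost(4, 6000.0)       # hypothetical 6000 m of cable
cost_with_5 = total_cost(5, 4500.0)       # hypothetical 4500 m of cable
break_even_m = 50000.0 / 50.0             # cable saving that justifies one more unit
```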
---

### 5.3 Drainage - Shortest Path on Road Network

#### Solution

```python
from typing import Dict, List

import networkx as nx
import numpy as np
from shapely.geometry import Point, Polygon


def calculate_drainage_on_network(
    lots: List[Polygon],
    road_network: nx.Graph,
    wwtp_node: int,
    arrow_length: float = 20.0
) -> List[Dict]:
    """
    Calculate drainage paths following the road network.
    More realistic than direct vectors - pipes follow roads.
    """
    arrows = []
    if road_network.number_of_nodes() == 0:
        return arrows

    # Precompute shortest paths between the WWTP and every reachable node
    try:
        lengths, paths = nx.single_source_dijkstra(
            road_network, wwtp_node, weight='length'
        )
    except (nx.NetworkXError, nx.NodeNotFound):
        return arrows

    for lot in lots:
        c = lot.centroid
        # Find nearest road network node
        nearest_node = None
        min_dist = float('inf')
        for node, data in road_network.nodes(data=True):
            pos = data.get('pos', (node, node))
            dist = c.distance(Point(pos))
            if dist < min_dist:
                min_dist = dist
                nearest_node = node
        if nearest_node is None or nearest_node not in paths:
            continue

        # Dijkstra paths run WWTP -> ... -> nearest_node, so reverse the
        # path to follow the flow from the lot toward the WWTP.
        path = paths[nearest_node][::-1]
        if len(path) < 2:
            continue

        # Arrow direction: first segment of the path leaving the lot's node
        first_node = path[0]
        second_node = path[1]
        pos1 = road_network.nodes[first_node].get('pos', (first_node, first_node))
        pos2 = road_network.nodes[second_node].get('pos', (second_node, second_node))
        dx = pos2[0] - pos1[0]
        dy = pos2[1] - pos1[1]
        length = np.sqrt(dx*dx + dy*dy)
        if length > 0:
            arrows.append({
                'start': (c.x, c.y),
                'vector': (dx/length * arrow_length, dy/length * arrow_length),
                'path_length': lengths.get(nearest_node, 0)
            })
    return arrows
```
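
Because every lot drains to the same sink, a single Dijkstra run from the WWTP is enough, as long as the direction of travel is read correctly: the predecessor recorded while relaxing an edge is the next hop toward the sink. The standard-library sketch below shows this on a toy road graph. The node names, edge lengths and `next_hop_to_sink` are illustrative assumptions.

```python
import heapq
from typing import Dict, List, Tuple

Graph = Dict[str, List[Tuple[str, float]]]  # node -> [(neighbour, length_m)]


def next_hop_to_sink(adj: Graph, sink: str) -> Tuple[Dict[str, float], Dict[str, str]]:
    """Dijkstra from the sink; returns distances and each node's next hop."""
    dist = {sink: 0.0}
    next_hop: Dict[str, str] = {}
    heap = [(0.0, sink)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist.get(u, float("inf")):
            continue  # stale heap entry
        for v, w in adj[u]:
            nd = d + w
            if nd < dist.get(v, float("inf")):
                dist[v] = nd
                next_hop[v] = u  # water at v flows to u on its way to the sink
                heapq.heappush(heap, (nd, v))
    return dist, next_hop


roads: Graph = {
    "A": [("B", 100.0)],
    "B": [("A", 100.0), ("C", 50.0), ("WWTP", 300.0)],
    "C": [("B", 50.0), ("WWTP", 120.0)],
    "WWTP": [("B", 300.0), ("C", 120.0)],
}
dist, hop = next_hop_to_sink(roads, "WWTP")
```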
---

## 6. Cross-Stage Optimizations

### 6.1 Spatial Indexing with STRtree

**Problem**: O(n²) intersection checks in several modules.

#### Solution

```python
from typing import List

from shapely.geometry import Polygon
from shapely.strtree import STRtree


class SpatiallyIndexedOperations:
    """
    Use R-Tree spatial index for efficient geometric queries.
    Assumes Shapely >= 2.0, where STRtree.query() returns integer indices.
    """

    def __init__(self, geometries: List[Polygon]):
        self.geometries = geometries
        self.tree = STRtree(geometries)

    def query_intersects(self, query_geom: Polygon) -> List[Polygon]:
        """O(log n) instead of O(n) for intersection queries."""
        idx = self.tree.query(query_geom, predicate="intersects")
        return [self.geometries[i] for i in idx]

    def query_within_distance(
        self,
        query_geom: Polygon,
        distance: float
    ) -> List[Polygon]:
        """Find geometries within distance."""
        buffered = query_geom.buffer(distance)
        idx = self.tree.query(buffered, predicate="intersects")
        return [self.geometries[i] for i in idx]


# Integration into GridOptimizer
class OptimizedGridOptimizer(GridOptimizer):
    def _evaluate_layout(self, individual):
        spacing, angle = individual
        blocks = self.generate_grid_candidates(spacing, angle)
        # Use spatial index for intersection checks
        tree = STRtree(blocks)
        hits = tree.query(self.land_poly, predicate="intersects")
        total_residential_area = 0.0
        fragmented_blocks = 0
        original_area = spacing * spacing
        for i in hits:
            # Only blocks that actually intersect the site reach this loop
            intersection = blocks[i].intersection(self.land_poly)
            # ... rest of evaluation
```

#### Metrics

- **Speedup**: 10-100x for large grids
- **Memory**: Slightly higher (index storage)
- **Implementation effort**: Low (a few hours)
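
The prune-then-test pattern behind a spatial index can be illustrated without Shapely. The sketch below swaps in a simpler structure, a uniform grid of buckets over bounding boxes, rather than an STR-packed R-tree, so it only demonstrates the idea of checking a handful of candidates instead of every block. `GridIndex` and the `(minx, miny, maxx, maxy)` tuples are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Box = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


def overlaps(a: Box, b: Box) -> bool:
    """True when two boxes share at least a boundary point."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]


class GridIndex:
    """Bucket boxes by the grid cells they touch, then test only candidates."""

    def __init__(self, boxes: List[Box], cell: float):
        self.boxes = boxes
        self.cell = cell
        self.buckets: Dict[Tuple[int, int], List[int]] = defaultdict(list)
        for idx, b in enumerate(boxes):
            for key in self._cells(b):
                self.buckets[key].append(idx)

    def _cells(self, b: Box) -> List[Tuple[int, int]]:
        x0, y0 = int(b[0] // self.cell), int(b[1] // self.cell)
        x1, y1 = int(b[2] // self.cell), int(b[3] // self.cell)
        return [(i, j) for i in range(x0, x1 + 1) for j in range(y0, y1 + 1)]

    def candidates(self, q: Box) -> List[int]:
        """Indices of boxes sharing a grid cell with q (cheap pruning step)."""
        return sorted({i for key in self._cells(q) for i in self.buckets.get(key, [])})

    def query(self, q: Box) -> List[int]:
        """Indices of boxes that really overlap q (exact test on candidates)."""
        return [i for i in self.candidates(q) if overlaps(self.boxes[i], q)]


# 100 blocks of 10 m x 10 m; block index = 10 * column + row
boxes = [(10.0 * i, 10.0 * j, 10.0 * i + 10, 10.0 * j + 10)
         for i in range(10) for j in range(10)]
index = GridIndex(boxes, cell=20.0)
probe = (12.0, 12.0, 18.0, 18.0)
pruned = index.candidates(probe)
hits = index.query(probe)
```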
---

### 6.2 Geometry Simplification

```python
from shapely.geometry import Polygon


def preprocess_geometry(polygon: Polygon, tolerance: float = 0.5) -> Polygon:
    """
    Simplify polygon before heavy operations.
    tolerance=0.5 means 50cm maximum deviation - acceptable for planning.
    """
    # Remove redundant vertices
    simplified = polygon.simplify(tolerance, preserve_topology=True)
    # Fix any topology issues
    if not simplified.is_valid:
        simplified = simplified.buffer(0)
    return simplified


# Apply before optimization
class PreprocessedPipeline(LandRedistributionPipeline):
    def __init__(self, land_polygons, config, settings=None):
        # Simplify input geometry
        simplified_polygons = [
            preprocess_geometry(p, tolerance=0.5)
            for p in land_polygons
        ]
        super().__init__(simplified_polygons, config, settings)
```
| --- | |
| ### 6.3 Caching Layer | |
| ```python | |
| from typing import Tuple | |
| class CachedGridOptimizer(GridOptimizer): | |
|     """ | |
|     Cache fitness evaluations for identical or similar parameters. | |
|     Parameters that round to the same key share one result, so a cached | |
|     fitness is approximate for every caller after the first. | |
|     """ | |
|     def __init__(self, *args, **kwargs): | |
|         super().__init__(*args, **kwargs) | |
|         self.eval_cache = {} | |
|     def _discretize_params(self, spacing: float, angle: float) -> Tuple[float, float]: | |
|         """Round to one decimal place to create discrete cache keys.""" | |
|         return (round(spacing, 1), round(angle, 1)) | |
|     def _evaluate_layout(self, individual): | |
|         spacing, angle = individual | |
|         cache_key = self._discretize_params(spacing, angle) | |
|         if cache_key in self.eval_cache: | |
|             return self.eval_cache[cache_key] | |
|         # Compute and cache | |
|         result = super()._evaluate_layout(individual) | |
|         self.eval_cache[cache_key] = result | |
|         return result | |
| ``` | |
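The effect of rounding on the cache can be checked in isolation. The sketch below is a minimal, self-contained illustration, not the project's `GridOptimizer`: `DiscretizedCache`, `expensive_eval`, and the hit/miss counters are hypothetical names introduced here. It shows that parameters rounding to the same key share one evaluation, which is also why the cached fitness is only approximate for the second caller.

```python
from typing import Callable, Dict, Tuple

Key = Tuple[float, float]
Fitness = Tuple[float, ...]


class DiscretizedCache:
    """Memoize an evaluation function on rounded (spacing, angle) keys."""

    def __init__(self, evaluate: Callable[[float, float], Fitness], ndigits: int = 1):
        self.evaluate = evaluate
        self.ndigits = ndigits
        self.store: Dict[Key, Fitness] = {}
        self.hits = 0
        self.misses = 0

    def key(self, spacing: float, angle: float) -> Key:
        return (round(spacing, self.ndigits), round(angle, self.ndigits))

    def __call__(self, spacing: float, angle: float) -> Fitness:
        k = self.key(spacing, angle)
        if k in self.store:
            self.hits += 1
            return self.store[k]
        self.misses += 1
        result = self.evaluate(spacing, angle)
        self.store[k] = result
        return result


def expensive_eval(spacing: float, angle: float) -> Fitness:
    # Hypothetical stand-in for the real layout evaluation.
    return (spacing * 2.0, angle / 10.0)


cache = DiscretizedCache(expensive_eval)
first = cache(25.04, 12.31)   # miss: computed and stored under key (25.0, 12.3)
second = cache(25.01, 12.29)  # hit: rounds to the same key, reuses `first`
third = cache(25.26, 12.31)   # miss: rounds to a different spacing key
```

Tracking `hits` and `misses` over a real run gives the cache hit rate, which indicates whether the rounding precision (`ndigits`) is coarse enough to pay off.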
| --- | |
| ## 7. Algorithm-Level Parallelization | |
| ### 7.1 Parallel Fitness Evaluation | |
| ```python | |
| from concurrent.futures import ProcessPoolExecutor, as_completed | |
| from deap import algorithms, tools | |
| import multiprocessing | |
| class ParallelGridOptimizer(GridOptimizer): | |
|     def __init__(self, *args, num_workers=None, **kwargs): | |
|         super().__init__(*args, **kwargs) | |
|         self.num_workers = num_workers or multiprocessing.cpu_count() | |
|     def _evaluate_parallel(self, executor, individuals): | |
|         """Evaluate individuals in worker processes and store their fitness.""" | |
|         # Submitting a bound method pickles `self`, so the optimizer must be picklable. | |
|         futures = { | |
|             executor.submit(self._evaluate_layout, ind): ind | |
|             for ind in individuals | |
|         } | |
|         for future in as_completed(futures): | |
|             ind = futures[future] | |
|             ind.fitness.values = future.result() | |
|     def optimize(self, population_size=50, generations=100): | |
|         pop = self.toolbox.population(n=population_size) | |
|         # Use ProcessPoolExecutor for parallel evaluation | |
|         with ProcessPoolExecutor(max_workers=self.num_workers) as executor: | |
|             # Evaluate the initial population once, before the generation loop | |
|             self._evaluate_parallel(executor, pop) | |
|             for gen in range(generations): | |
|                 # Selection and variation (sequential) | |
|                 offspring = algorithms.varAnd( | |
|                     pop, self.toolbox, | |
|                     cxpb=0.7, mutpb=0.2 | |
|                 ) | |
|                 # Only offspring changed by crossover or mutation need re-evaluation | |
|                 invalid = [ind for ind in offspring if not ind.fitness.valid] | |
|                 self._evaluate_parallel(executor, invalid) | |
|                 pop = self.toolbox.select(pop + offspring, k=population_size) | |
|         return tools.selBest(pop, 1)[0] | |
| ``` | |
| ### 7.2 DEAP Built-in Parallelization | |
| ```python | |
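| from deap import base | |
| from scoop import futures | |
| toolbox = base.Toolbox() | |
| # SCOOP is a separate package; its map distributes evaluations across processes or machines. | |
| # DEAP's built-in algorithms evaluate through toolbox.map, so registering it is enough for them. | |
| toolbox.register("map", futures.map) | |
| # Usage: | |
| # python -m scoop -n 8 optimize.py | |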
| ``` | |
| --- | |
| ## 8. Priority Roadmap | |
| ### 8.1 Impact vs Effort Matrix | |
| ``` | |
|                        IMPACT | |
|               Low               High | |
|         ┌────────────────┬────────────────┐ | |
|   Low   │ • Caching      │ • STRtree      │ | |
|         │                │ • Parallel Eval│ | |
| EFFORT  │                │ • Steiner Tree │ | |
|         ├────────────────┼────────────────┤ | |
|   High  │ • Column Gen   │ • Surrogate    │ | |
|         │                │ • 2D Packing   │ | |
|         │                │ • CVT + Constr │ | |
|         └────────────────┴────────────────┘ | |
| ``` | |
| ### 8.2 Recommended Implementation Order | |
| #### Phase 1: Quick Wins (1-2 weeks) | |
| | # | Optimization | Est. Time | Expected Speedup | | |
| |---|--------------|-----------|------------------| | |
| | 1 | STRtree Spatial Indexing | 4h | 10-50x for intersection | | |
| | 2 | Geometry Simplification | 2h | 2-3x for boolean ops | | |
| | 3 | Evaluation Caching | 4h | 1.5-2x | | |
| | 4 | Parallel Fitness Eval | 1d | 2-4x (CPU cores) | | |
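
The 2-4x estimate for parallel fitness evaluation depends on how much of each generation is spent in evaluation, since selection and variation stay sequential. As a rough sanity check, Amdahl's law bounds the achievable speedup. The function below is an illustrative sketch: `parallel_fraction` is an assumed input that has to be measured by profiling, and the 0.9 used in the example is a placeholder value, not a measurement from this system.

```python
def amdahl_speedup(parallel_fraction: float, workers: int) -> float:
    """Upper bound on speedup when only `parallel_fraction` of the runtime scales with workers."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be in [0, 1]")
    if workers < 1:
        raise ValueError("workers must be >= 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / workers)


# Example: assume 90% of a generation is fitness evaluation (placeholder value)
for workers in (2, 4, 8):
    print(workers, round(amdahl_speedup(0.9, workers), 2))
```

Under that assumption, 4 workers give at most about 3.1x and 8 workers about 4.7x, before accounting for process start-up and pickling overhead.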
| #### Phase 2: Medium-term (2-4 weeks) | |
| | # | Optimization | Est. Time | Expected Improvement | | |
| |---|--------------|-----------|---------------------| | |
| | 5 | Steiner Tree Network | 2d | 15-20% shorter cables | | |
| | 6 | CVT Voronoi Seeds | 3d | 30-50% less fragmentation | | |
| | 7 | Multi-obj Transformer | 2d | Better load balance + cost | | |
| | 8 | Island Model GA | 3d | Better global optimum | | |
| #### Phase 3: Advanced (1-2 months) | |
| | # | Optimization | Est. Time | Expected Improvement | | |
| |---|--------------|-----------|---------------------| | |
| | 9 | Extended Genome (5D) | 1w | Rectangular lots support | | |
| | 10 | Surrogate-Assisted | 2w | 5-20x fewer real evals | | |
| | 11 | 2D Bin Packing | 2w | 15-25% better space util | | |
| | 12 | Constrained Voronoi | 1w | Aligned main roads | | |
| --- | |
| ## Appendix A: Benchmark Template | |
| ```python | |
| import time | |
| from typing import Any, Dict, List | |
| import numpy as np | |
| from shapely.geometry import Polygon | |
| def benchmark_optimization( | |
|     optimizer_class: type, | |
|     land_polygon: Polygon, | |
|     configs: List[Dict[str, Any]], | |
|     num_runs: int = 5 | |
| ) -> List[Dict[str, Any]]: | |
|     """ | |
|     Standard benchmark for comparing optimizers. | |
|     Assumes optimizer.optimize() returns a (solution, history) pair. | |
|     """ | |
|     results = [] | |
|     for config in configs: | |
|         run_times = [] | |
|         fitness_values = [] | |
|         for _ in range(num_runs): | |
|             optimizer = optimizer_class(land_polygon, **config) | |
|             start = time.perf_counter() | |
|             solution, history = optimizer.optimize() | |
|             elapsed = time.perf_counter() - start | |
|             run_times.append(elapsed) | |
|             fitness_values.append(solution.fitness.values) | |
|         results.append({ | |
|             'config': config, | |
|             'avg_time': sum(run_times) / num_runs, | |
|             'std_time': np.std(run_times), | |
|             'avg_fitness': np.mean(fitness_values, axis=0), | |
|             # max() compares tuples lexicographically, so the first objective decides | |
|             'best_fitness': max(fitness_values), | |
|         }) | |
|     return results | |
| ``` | |
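
The template reports `best_fitness` with `max`, which compares fitness tuples lexicographically and so favors the first objective. For multi-objective results, a non-dominated filter is one alternative. The sketch below assumes every objective is maximized (negate any minimized objective first); `dominates` and `pareto_front` are illustrative helper names, not part of DEAP or the existing codebase, and the `runs` values are made-up sample data.

```python
from typing import List, Sequence, Tuple

Fitness = Tuple[float, ...]


def dominates(a: Sequence[float], b: Sequence[float]) -> bool:
    """True if `a` is at least as good as `b` on every objective and strictly better on one.

    Assumes every objective is maximized.
    """
    return all(x >= y for x, y in zip(a, b)) and any(x > y for x, y in zip(a, b))


def pareto_front(points: List[Fitness]) -> List[Fitness]:
    """Return the points not dominated by any other point, in input order."""
    return [p for p in points if not any(dominates(q, p) for q in points)]


# Made-up fitness tuples from four runs: (objective 1, objective 2)
runs = [(0.80, 3.0), (0.75, 5.0), (0.70, 2.0), (0.80, 4.0)]
front = pareto_front(runs)
lexicographic_best = max(runs)
```

Here `max` picks a single tuple, while the front also keeps the run that trades a little of the first objective for more of the second.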
| --- | |
| ## Appendix B: References | |
| 1. **NSGA-II**: Deb et al., "A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II" (2002) | |
| 2. **Centroidal Voronoi Tessellation**: Du, Faber, Gunzburger, "Centroidal Voronoi Tessellations: Applications and Algorithms" (1999) | |
| 3. **Steiner Tree**: Hwang, Richards, Winter, "The Steiner Tree Problem" (1992) | |
| 4. **OR-Tools**: Google Operations Research Tools documentation | |
| 5. **Surrogate-Assisted Optimization**: Jin, "Surrogate-assisted evolutionary computation" (2011) | |
| 6. **Shapely STRtree**: Guttman, "R-trees: A Dynamic Index Structure for Spatial Searching" (1984) | |
| --- | |
| > **Document Version**: 1.0 | |
| > **Created**: 2025-12-08 | |
| > **Author**: AI Analysis System | |
| > **Status**: Draft - Pending Review | |