
Algorithm Optimizations - Virtual Industrial Park

A detailed analysis of algorithm optimizations for the "Industrial park generation by a virtual engineer" system.


Table of Contents

  1. Architecture Overview
  2. Stage 1: Grid Optimization (NSGA-II)
  3. Stage 1 Alternative: Voronoi Road Network
  4. Stage 2: Block Subdivision (OR-Tools)
  5. Stage 3: Infrastructure Planning
  6. Cross-Stage Optimizations
  7. Algorithm-Level Parallelization
  8. Priority Roadmap

1. Architecture Overview

1.1 Current Pipeline

┌─────────────────────────────────────────────────────────────────────┐
│                    LAND REDISTRIBUTION PIPELINE                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────┐ │
│  │  STAGE 1    │───▶│  STAGE 2    │───▶│       STAGE 3           │ │
│  │ Grid/Voronoi│    │ Subdivision │    │    Infrastructure       │ │
│  │  NSGA-II    │    │  OR-Tools   │    │ MST + K-Means + Drainage│ │
│  └─────────────┘    └─────────────┘    └─────────────────────────┘ │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 Main Modules

| Module | File | Algorithm | Complexity |
|---|---|---|---|
| Grid Optimizer | grid_optimizer.py | NSGA-II Genetic Algorithm | O(pop × gen × eval) |
| Voronoi Generator | voronoi.py | Fortune's Algorithm (Shapely) | O(n log n) |
| Subdivision Solver | subdivision_solver.py | OR-Tools CP-SAT | NP-hard (bounded) |
| Network Planner | network_planner.py | Kruskal's MST + Redundancy | O(E log V) |
| Transformer Planner | transformer_planner.py | K-Means Clustering | O(n × k × iterations) |
| Drainage Planner | drainage_planner.py | Vector Projection | O(n) |

2. Stage 1: Grid Optimization (NSGA-II)

2.1 Current State Analysis

File: core/optimization/grid_optimizer.py

# Current genome: 2 genes
individual = [spacing, angle]  # spacing: 20-80m, angle: 0-90°

# Fitness: 2 objectives
objectives = (
    total_residential_area,  # maximize
    fragmented_blocks        # minimize
)

Main bottlenecks:

  • generate_grid_candidates(): Builds the grid and computes intersections with O(n²) Shapely operations
  • Each fitness evaluation has to iterate over all blocks

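2.2 Optimization 1: Expanding the Search Space

Problem

A square grid is suboptimal for industrial parks, which need rectangular lots.

Solution

import random
from typing import List

from deap import creator, tools
from shapely.affinity import translate
from shapely.geometry import Polygon

# Proposal: 5 genes instead of 2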
class EnhancedGridOptimizer:
    def _setup_deap(self):
        # Gene definitions
        self.toolbox.register("attr_spacing_x", random.uniform, 20, 100)
        self.toolbox.register("attr_spacing_y", random.uniform, 20, 100)
        self.toolbox.register("attr_angle", random.uniform, 0, 90)
        self.toolbox.register("attr_offset_x", random.uniform, 0, 50)
        self.toolbox.register("attr_offset_y", random.uniform, 0, 50)
        
        self.toolbox.register(
            "individual", 
            tools.initCycle, 
            creator.Individual,
            (
                self.toolbox.attr_spacing_x,
                self.toolbox.attr_spacing_y,
                self.toolbox.attr_angle,
                self.toolbox.attr_offset_x,
                self.toolbox.attr_offset_y,
            ), 
            n=1
        )
    
    def generate_grid_candidates(
        self, 
        spacing_x: float,
        spacing_y: float, 
        angle_deg: float,
        offset_x: float,
        offset_y: float
    ) -> List[Polygon]:
        """Generate rectangular grid blocks."""
        # Create rectangular base block
        base_block = Polygon([
            (0, 0), 
            (spacing_x, 0), 
            (spacing_x, spacing_y), 
            (0, spacing_y)
        ])
        base_block = translate(base_block, -spacing_x/2 + offset_x, -spacing_y/2 + offset_y)
        # ... rotation and placement logic

Metrics

  • Flexibility: Supports lots of 40x80 m, 50x100 m, etc.
  • Trade-off: A larger search space requires a larger population or more generations
  • Implementation effort: Low (2-4 hours)
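The rotation and placement step that the proposal leaves open could look roughly like the sketch below. It is a minimal illustration, not the project's implementation: the function name `rect_grid_blocks`, the choice to rotate about the centre of the site's bounding box, and the use of plain coordinate tuples instead of Shapely polygons are all assumptions made for the example.

```python
import math
from typing import List, Tuple

Point2D = Tuple[float, float]


def rect_grid_blocks(
    bounds: Tuple[float, float, float, float],
    spacing_x: float,
    spacing_y: float,
    angle_deg: float = 0.0,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
) -> List[List[Point2D]]:
    """Return corner lists of rectangular blocks covering `bounds`.

    Hypothetical helper: the grid is laid out axis-aligned around the
    centre of `bounds`, shifted by the offsets, then rotated about it.
    """
    minx, miny, maxx, maxy = bounds
    cx, cy = (minx + maxx) / 2, (miny + maxy) / 2
    # Cover the circumscribed circle so the rotated grid still spans the site
    radius = math.hypot(maxx - minx, maxy - miny) / 2
    n_cols = math.ceil(radius / spacing_x) + 1
    n_rows = math.ceil(radius / spacing_y) + 1
    cos_a = math.cos(math.radians(angle_deg))
    sin_a = math.sin(math.radians(angle_deg))
    # Offsets are periodic in the spacing, so reduce them first
    off_x, off_y = offset_x % spacing_x, offset_y % spacing_y

    def place(x: float, y: float) -> Point2D:
        # Rotate the local point about the origin, then move it to the centre
        return (cx + x * cos_a - y * sin_a, cy + x * sin_a + y * cos_a)

    blocks = []
    for i in range(-n_cols, n_cols):
        for j in range(-n_rows, n_rows):
            x0, y0 = i * spacing_x + off_x, j * spacing_y + off_y
            corners = [(x0, y0), (x0 + spacing_x, y0),
                       (x0 + spacing_x, y0 + spacing_y), (x0, y0 + spacing_y)]
            blocks.append([place(x, y) for x, y in corners])
    return blocks


def shoelace_area(corners: List[Point2D]) -> float:
    """Polygon area from its corner list (shoelace formula)."""
    n = len(corners)
    s = sum(corners[k][0] * corners[(k + 1) % n][1]
            - corners[(k + 1) % n][0] * corners[k][1] for k in range(n))
    return abs(s) / 2


blocks = rect_grid_blocks((0, 0, 100, 100), 40, 80, angle_deg=30, offset_x=5, offset_y=5)
```

Each corner list can be passed to `shapely.geometry.Polygon` and intersected with the land polygon as in the existing evaluation. Rotation preserves area, so every block keeps the full `spacing_x * spacing_y` before clipping.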

2.3 Optimization 2: Adaptive Island Model

Problem

A single population easily gets stuck in local optima.

Solution

from deap import tools, algorithms
from concurrent.futures import ProcessPoolExecutor

class IslandGridOptimizer:
    def __init__(self, land_polygon, num_islands=4, migration_interval=5):
        self.num_islands = num_islands
        self.migration_interval = migration_interval
        self.islands = [self._create_population() for _ in range(num_islands)]
    
    def optimize(self, population_size=50, generations=100):
        for gen in range(generations):
            # Evolve each island independently
            for island in self.islands:
                self._evolve_one_generation(island)
            
            # Migration: exchange best individuals between islands
            if gen > 0 and gen % self.migration_interval == 0:
                self._migrate()
        
        # Return best from all islands
        all_individuals = [ind for island in self.islands for ind in island]
        return tools.selBest(all_individuals, 1)[0]
    
    def _migrate(self):
        """Ring topology migration."""
        migrants = []
        for island in self.islands:
            best = tools.selBest(island, 2)
            migrants.append([self.toolbox.clone(ind) for ind in best])
        
        for i in range(self.num_islands):
            next_island = (i + 1) % self.num_islands
            # Replace worst with migrants
            worst = tools.selWorst(self.islands[next_island], 2)
            for w, m in zip(worst, migrants[i]):
                self.islands[next_island].remove(w)
                self.islands[next_island].append(m)

Metrics

  • Diversity: Preserves genetic diversity
  • Parallel potential: Each island can run on its own core
  • Speedup: 2-4x with 4 islands on 4 cores
  • Implementation effort: Medium (1-2 days)
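The ring-topology migration can be illustrated without DEAP. The sketch below is a simplified stand-in: individuals are plain `(fitness, genome)` tuples with a single fitness value to maximize, and `ring_migrate` is a hypothetical name, not a function from the project.

```python
from typing import List, Tuple

Individual = Tuple[float, List[float]]  # (fitness to maximize, genome)


def ring_migrate(islands: List[List[Individual]], n_migrants: int = 2) -> None:
    """Copy each island's best individuals over the next island's worst, in place."""
    # Select emigrants from every island first, so that this round's
    # arrivals are not immediately forwarded to the following island
    emigrants = [
        sorted(island, key=lambda ind: ind[0], reverse=True)[:n_migrants]
        for island in islands
    ]
    for i, group in enumerate(emigrants):
        target = islands[(i + 1) % len(islands)]
        # Sort ascending so the worst individuals sit at the front
        target.sort(key=lambda ind: ind[0])
        for slot, (fitness, genome) in enumerate(group):
            target[slot] = (fitness, list(genome))  # copy the genome


islands = [
    [(9.0, [50.0, 10.0]), (1.0, [20.0, 80.0]), (2.0, [30.0, 70.0])],
    [(5.0, [60.0, 45.0]), (4.0, [40.0, 15.0]), (3.0, [25.0, 5.0])],
]
ring_migrate(islands, n_migrants=1)
```

Collecting all emigrants before writing any of them keeps the exchange symmetric: each island receives only what its predecessor held before the migration round started.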

2.4 Optimization 3: Surrogate-Assisted Optimization

Problem

Fitness evaluation (Shapely intersections) is very slow.

Solution

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel
import numpy as np

class SurrogateAssistedOptimizer:
    def __init__(self, land_polygon):
        self.land_poly = land_polygon
        self.surrogate = None
        self.training_data = {'X': [], 'y': []}
        self.real_eval_count = 0
        
    def _build_surrogate(self):
        """Build GP surrogate model from collected samples."""
        if len(self.training_data['X']) < 20:
            return None
        
        X = np.array(self.training_data['X'])
        y = np.array(self.training_data['y'])
        
        kernel = ConstantKernel(1.0) * RBF(length_scale=[10, 10])
        gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5)
        gp.fit(X, y)
        return gp
    
    def _evaluate_with_surrogate(self, individual):
        """Evaluate using surrogate, with uncertainty-based real evaluation."""
        if self.surrogate is None:
            # No surrogate yet, use real evaluation
            return self._real_evaluate(individual)
        
        X = np.array([[individual[0], individual[1]]])
        pred, std = self.surrogate.predict(X, return_std=True)
        
        # If uncertainty is high, do real evaluation
        if std[0] > 0.1 * abs(pred[0]):  # 10% uncertainty threshold
            return self._real_evaluate(individual)
        
        return (pred[0], 0)  # Surrogate predicts objective 1 only; objective 2 is not modeled
    
    def _real_evaluate(self, individual):
        """Expensive real evaluation with Shapely."""
        self.real_eval_count += 1
        fitness = self._evaluate_layout(individual)
        
        # Store for training
        self.training_data['X'].append([individual[0], individual[1]])
        self.training_data['y'].append(fitness[0])
        
        # Rebuild surrogate periodically
        if len(self.training_data['X']) % 50 == 0:
            self.surrogate = self._build_surrogate()
        
        return fitness

Metrics

  • Speedup: 5-20x reduction in real evaluations
  • Accuracy: ~95% of true optimum
  • Trade-off: Needs initial samples to train the surrogate
  • Implementation effort: High (1 week)

3. Stage 1 Alternative: Voronoi Road Network

3.1 Current State Analysis

File: core/geometry/voronoi.py

def generate_voronoi_seeds(site, num_seeds=15, seed=None):
    # Random uniform distribution within bounding box
    for _ in range(num_seeds):
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        seeds.append(Point(x, y))

Problem: Random seeds produce uneven blocks and many fragmented blocks.


3.2 Optimization 1: Centroidal Voronoi Tessellation (CVT)

Solution

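import random
from typing import Callable, List, Optional

import numpy as np
from shapely.geometry import MultiPoint, Point, Polygon
from shapely.ops import voronoi_diagram

def generate_cvt_seeds(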
    site: Polygon,
    num_seeds: int = 15,
    iterations: int = 20,
    seed: Optional[int] = None
) -> List[Point]:
    """
    Generate Centroidal Voronoi Tessellation seeds.
    
    CVT iteratively moves seeds to the centroid of their Voronoi regions,
    resulting in more uniform block sizes.
    """
    if seed is not None:
        random.seed(seed)
    
    minx, miny, maxx, maxy = site.bounds
    
    # Initialize with random seeds
    seeds = []
    for _ in range(num_seeds):
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        seeds.append(Point(x, y))
    
    # CVT iterations (Lloyd's algorithm)
    for _ in range(iterations):
        # Create Voronoi regions. Shapely does not guarantee that regions
        # are returned in the same order as the input seeds.
        regions = voronoi_diagram(MultiPoint(seeds), envelope=site)

        # Move seeds to the centroids of their clipped regions
        new_seeds = []
        max_movement = 0.0
        for region in regions.geoms:
            # The generating seed lies inside its region (distance 0)
            owner = min(seeds, key=region.distance)
            clipped = region.intersection(site)
            if clipped.is_empty:
                # Region lies entirely outside the site: keep the seed as is
                new_seeds.append(owner)
                continue
            new_seeds.append(clipped.centroid)
            max_movement = max(max_movement, owner.distance(clipped.centroid))

        seeds = new_seeds

        # Check convergence
        if max_movement < 0.1:  # Convergence threshold, in site units
            break
    
    return seeds


def generate_weighted_cvt_seeds(
    site: Polygon,
    density_function: Callable[[float, float], float],
    num_seeds: int = 15,
    iterations: int = 30
) -> List[Point]:
    """
    Weighted CVT with density function.
    
    Args:
        density_function: f(x, y) -> weight, higher = smaller blocks
    """
    # Use importance sampling for initial seeds
    candidates = []
    weights = []
    
    minx, miny, maxx, maxy = site.bounds
    for _ in range(num_seeds * 100):  # Oversample
        x = random.uniform(minx, maxx)
        y = random.uniform(miny, maxy)
        p = Point(x, y)
        if site.contains(p):
            candidates.append(p)
            weights.append(density_function(x, y))
    
    # Weighted sampling
    weights = np.array(weights) / sum(weights)
    selected_idx = np.random.choice(
        len(candidates), size=num_seeds, replace=False, p=weights
    )
    seeds = [candidates[i] for i in selected_idx]
    
    # Run weighted CVT iterations
    for _ in range(iterations):
        # ... similar to regular CVT but with weighted centroids
        pass
    
    return seeds
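The weighted-centroid step left open in the loop above can be approximated on a discrete sample set. The sketch below is one possible way to do it, not the project's implementation: `weighted_lloyd_step` is a hypothetical helper that works on plain coordinate tuples, and it replaces exact Voronoi cells with a nearest-seed assignment of sample points.

```python
import math
from typing import Callable, List, Tuple

Point2D = Tuple[float, float]


def weighted_lloyd_step(
    seeds: List[Point2D],
    samples: List[Point2D],
    density: Callable[[float, float], float],
) -> List[Point2D]:
    """One Lloyd iteration on a discrete sample set.

    Each sample is assigned to its nearest seed (a discrete Voronoi cell);
    each seed then moves to the density-weighted centroid of its samples.
    """
    sums = [[0.0, 0.0, 0.0] for _ in seeds]  # [sum(w*x), sum(w*y), sum(w)]
    for x, y in samples:
        nearest = min(
            range(len(seeds)),
            key=lambda k: math.hypot(x - seeds[k][0], y - seeds[k][1]),
        )
        w = density(x, y)
        sums[nearest][0] += w * x
        sums[nearest][1] += w * y
        sums[nearest][2] += w
    # A seed with no weighted samples stays where it is
    return [
        (sx / sw, sy / sw) if sw > 0 else seed
        for seed, (sx, sy, sw) in zip(seeds, sums)
    ]


samples = [(0.0, 0.0), (2.0, 0.0), (10.0, 0.0), (12.0, 0.0)]
seeds = [(1.0, 1.0), (11.0, 1.0)]
uniform = weighted_lloyd_step(seeds, samples, lambda x, y: 1.0)
skewed = weighted_lloyd_step(seeds, samples, lambda x, y: 3.0 if x % 10 == 2 else 1.0)
```

With a uniform density the seeds move to the plain centroids of their samples. With the skewed density they are pulled toward the heavier samples, which is the mechanism that concentrates seeds (and therefore smaller blocks) in high-density areas.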

Use Case: Industrial Park Zoning

def industrial_density(x, y):
    """
    Higher density near main access roads and utilities.
    """
    # Assume main road at y=0
    dist_to_main_road = abs(y)
    
    # High density (smaller blocks) near road
    if dist_to_main_road < 100:
        return 2.0
    elif dist_to_main_road < 200:
        return 1.5
    else:
        return 1.0

seeds = generate_weighted_cvt_seeds(
    site=land_polygon,
    density_function=industrial_density,
    num_seeds=20
)

Metrics

  • Block uniformity: +40-60% more uniform areas
  • Fragmentation reduction: -30-50% fragmented blocks
  • Time overhead: 20 iterations × O(n log n) = acceptable
  • Implementation effort: Medium (2-3 days)

3.3 Optimization 2: Constrained Voronoi

Problem

Voronoi edges may not align with the preferred road directions.

Solution

from shapely.geometry import LineString
from shapely.ops import split

class ConstrainedVoronoi:
    """
    Voronoi with hard constraints (e.g., main roads must be axis-aligned).
    """
    
    def __init__(self, site: Polygon, main_roads: List[LineString]):
        self.site = site
        self.main_roads = main_roads  # Pre-defined main road axes
    
    def generate(self, num_internal_seeds: int = 10) -> List[Polygon]:
        # Step 1: Split site by main roads
        regions = [self.site]
        for road in self.main_roads:
            new_regions = []
            for region in regions:
                if road.intersects(region):
                    parts = split(region, road)
                    new_regions.extend(parts.geoms)
                else:
                    new_regions.append(region)
            regions = [r for r in new_regions if r.geom_type == 'Polygon']
        
        # Step 2: Apply Voronoi within each region
        all_blocks = []
        for region in regions:
            seeds_per_region = max(2, int(num_internal_seeds * region.area / self.site.area))
            seeds = generate_cvt_seeds(region, seeds_per_region)
            voronoi_regions = create_voronoi_diagram(seeds, region)
            
            if voronoi_regions:
                for v in voronoi_regions.geoms:
                    clipped = v.intersection(region)
                    if clipped.geom_type == 'Polygon' and clipped.area > 100:
                        all_blocks.append(clipped)
        
        return all_blocks


# Usage
main_roads = [
    LineString([(0, 500), (1000, 500)]),    # Horizontal main road
    LineString([(500, 0), (500, 1000)]),     # Vertical main road
]

cv = ConstrainedVoronoi(land_polygon, main_roads)
blocks = cv.generate(num_internal_seeds=15)

Metrics

  • Road alignment: Main roads guaranteed straight
  • Flexibility: Sub-blocks use organic Voronoi
  • Implementation effort: Medium (2 days)

4. Stage 2: Block Subdivision (OR-Tools)

4.1 Current State Analysis

File: core/optimization/subdivision_solver.py

# Current: 1D subdivision (width only)
def solve_subdivision(total_length, min_width, max_width, target_width):
    # lot_vars[i] = width of lot i
    # Constraint: sum(lot_vars) == total_length
    # Objective: minimize deviation from target_width
    ...

Limitation: Lots are split along one dimension only, which is not optimal for lots that need a specific aspect ratio.
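To make the 1D model concrete, here is a simplified stand-in that restricts all lots to the same width. It is only an illustration of the constraint and objective listed in the comments above; the real solver uses CP-SAT and lets widths differ, and `equal_width_subdivision` is a hypothetical name.

```python
import math
from typing import List


def equal_width_subdivision(
    total_length: float, min_width: float, max_width: float, target_width: float
) -> List[float]:
    """Split `total_length` into equal lots whose width is closest to the target.

    Returns an empty list when no lot count gives a width in [min_width, max_width].
    """
    n_min = max(1, math.ceil(total_length / max_width))  # fewest lots allowed
    n_max = math.floor(total_length / min_width)         # most lots allowed
    best: List[float] = []
    best_dev = math.inf
    for n in range(n_min, n_max + 1):
        width = total_length / n  # sum of widths equals total_length by construction
        dev = abs(width - target_width)
        if dev < best_dev:
            best, best_dev = [width] * n, dev
    return best


lots = equal_width_subdivision(100.0, 20.0, 40.0, 30.0)
```

For a 100 m frontage with widths limited to 20-40 m and a 30 m target, three lots of about 33.3 m deviate less from the target than four lots of 25 m. Note that depth never enters the model, which is the limitation the 2D formulation addresses.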


4.2 Optimization 1: 2D Bin Packing

Solution

from ortools.sat.python import cp_model
from typing import List, Tuple, Dict

class TwoDimensionalSubdivisionSolver:
    """
    2D bin packing for lot placement within a block.
    
    Optimizes both lot placement and dimensions.
    """
    
    @staticmethod
    def solve_2d_subdivision(
        block_width: float,
        block_height: float,
        min_lot_width: float,
        max_lot_width: float,
        min_lot_height: float,
        max_lot_height: float,
        target_aspect_ratio: float = 2.0,  # height/width
        time_limit: float = 10.0
    ) -> List[Dict]:
        """
        Solve 2D lot placement using constraint programming.
        
        Returns:
            List of lots with {x, y, width, height}
        """
        model = cp_model.CpModel()
        scale = 100  # 1cm precision
        
        # Estimate max lots
        min_lot_area = min_lot_width * min_lot_height
        max_lots = int((block_width * block_height) / min_lot_area) + 1
        max_lots = min(max_lots, 50)  # Cap for performance
        
        # Decision variables for each lot
        lots = []
        for i in range(max_lots):
            lot = {
                'x': model.NewIntVar(0, int(block_width * scale), f'x_{i}'),
                'y': model.NewIntVar(0, int(block_height * scale), f'y_{i}'),
                'width': model.NewIntVar(
                    int(min_lot_width * scale),
                    int(max_lot_width * scale),
                    f'w_{i}'
                ),
                'height': model.NewIntVar(
                    int(min_lot_height * scale),
                    int(max_lot_height * scale),
                    f'h_{i}'
                ),
                'used': model.NewBoolVar(f'used_{i}'),
                'area': model.NewIntVar(0, int(block_width * block_height * scale * scale), f'area_{i}')
            }
            lots.append(lot)
        
        # Constraints
        for i, lot in enumerate(lots):
            # Lot must fit within block
            model.Add(lot['x'] + lot['width'] <= int(block_width * scale)).OnlyEnforceIf(lot['used'])
            model.Add(lot['y'] + lot['height'] <= int(block_height * scale)).OnlyEnforceIf(lot['used'])
            
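            # Width and height keep their [min, max] domains even for unused
            # lots (forcing them to 0 would contradict those domains), so
            # only the counted area is gated on 'used'
            raw_area = model.NewIntVar(
                0, int(max_lot_width * scale) * int(max_lot_height * scale), f'raw_area_{i}'
            )
            model.AddMultiplicationEquality(raw_area, [lot['width'], lot['height']])
            model.Add(lot['area'] == raw_area).OnlyEnforceIf(lot['used'])
            model.Add(lot['area'] == 0).OnlyEnforceIf(lot['used'].Not())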
        
        # No overlap constraint (using intervals)
        x_intervals = []
        y_intervals = []
        for i, lot in enumerate(lots):
            x_interval = model.NewOptionalIntervalVar(
                lot['x'], lot['width'], lot['x'] + lot['width'],
                lot['used'], f'x_interval_{i}'
            )
            y_interval = model.NewOptionalIntervalVar(
                lot['y'], lot['height'], lot['y'] + lot['height'],
                lot['used'], f'y_interval_{i}'
            )
            x_intervals.append(x_interval)
            y_intervals.append(y_interval)
        
        model.AddNoOverlap2D(x_intervals, y_intervals)
        
        # Objective: Maximize total used area + Aspect ratio penalty
        total_area = sum(lot['area'] for lot in lots)
        
        # Aspect ratio deviation penalty
        aspect_penalties = []
        target_ratio_scaled = int(target_aspect_ratio * 100)
        for i, lot in enumerate(lots):
            # height/width should be close to target_aspect_ratio
            deviation = model.NewIntVar(0, 1000, f'aspect_dev_{i}')
            actual_ratio = model.NewIntVar(0, 1000, f'ratio_{i}')
            # actual_ratio ≈ (height * 100) / width
            model.AddDivisionEquality(
                actual_ratio,
                lot['height'] * 100,
                lot['width']
            )
            model.AddAbsEquality(deviation, actual_ratio - target_ratio_scaled)
            aspect_penalties.append(deviation)
        
        # Multi-objective: maximize area, minimize aspect deviation
        model.Maximize(total_area - sum(aspect_penalties) * 100)
        
        # Solve
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = time_limit
        status = solver.Solve(model)
        
        # Extract solution
        result = []
        if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
            for lot in lots:
                if solver.Value(lot['used']):
                    result.append({
                        'x': solver.Value(lot['x']) / scale,
                        'y': solver.Value(lot['y']) / scale,
                        'width': solver.Value(lot['width']) / scale,
                        'height': solver.Value(lot['height']) / scale,
                    })
        
        return result

Metrics

  • Space utilization: +15-25% more efficient packing
  • Aspect ratio control: Lots match industrial requirements
  • Time complexity: Higher, but bounded by time_limit
  • Implementation effort: High (1 week)

4.3 Optimization 2: Hierarchical Decomposition

Solution

from concurrent.futures import ProcessPoolExecutor
from shapely.geometry import box

class HierarchicalSubdivisionSolver:
    """
    Divide large blocks into quadrants, solve each in parallel.
    """
    
    @staticmethod
    def subdivide_hierarchically(
        block: Polygon,
        max_block_size: float = 10000,  # m²
        **solver_kwargs
    ) -> Dict:
        """
        Recursively subdivide large blocks before optimization.
        """
        if block.area <= max_block_size:
            # Small enough, solve directly
            return SubdivisionSolver.subdivide_block(block, **solver_kwargs)
        
        # Split into quadrants
        minx, miny, maxx, maxy = block.bounds
        midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
        
        quadrants = [
            box(minx, miny, midx, midy),
            box(midx, miny, maxx, midy),
            box(minx, midy, midx, maxy),
            box(midx, midy, maxx, maxy),
        ]
        
        sub_blocks = []
        for quad in quadrants:
            intersection = block.intersection(quad)
            if not intersection.is_empty and intersection.area > 100:
                sub_blocks.append(intersection)
        
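        # Solve in parallel. A lambda cannot be pickled for worker processes,
        # so bind the arguments with functools.partial instead
        from functools import partial
        solve = partial(
            HierarchicalSubdivisionSolver.subdivide_hierarchically,
            max_block_size=max_block_size,
            **solver_kwargs
        )
        with ProcessPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(solve, sub_blocks))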
        
        # Merge results
        all_lots = []
        for result in results:
            if 'lots' in result:
                all_lots.extend(result['lots'])
        
        return {'geometry': block, 'type': 'residential', 'lots': all_lots}

Metrics

  • Parallel speedup: 2-4x with 4 cores
  • Scalability: Handles very large blocks
  • Implementation effort: Medium (2-3 days)
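The quadrant recursion can be separated from the solving step. The sketch below shows that idea on bounding boxes only; it is an assumption-laden simplification (axis-aligned boxes instead of clipped polygons, and a hypothetical helper name `split_into_leaves`), meant to show how the leaves could be collected first and then handed to a single process pool.

```python
from typing import List, Tuple

Bounds = Tuple[float, float, float, float]  # (minx, miny, maxx, maxy)


def split_into_leaves(bounds: Bounds, max_area: float) -> List[Bounds]:
    """Recursively quarter a bounding box until every piece is <= max_area.

    `max_area` must be positive, otherwise the recursion never terminates.
    """
    minx, miny, maxx, maxy = bounds
    if (maxx - minx) * (maxy - miny) <= max_area:
        return [bounds]
    midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
    leaves: List[Bounds] = []
    for quad in [
        (minx, miny, midx, midy),
        (midx, miny, maxx, midy),
        (minx, midy, midx, maxy),
        (midx, midy, maxx, maxy),
    ]:
        leaves.extend(split_into_leaves(quad, max_area))
    return leaves


small = split_into_leaves((0.0, 0.0, 200.0, 200.0), 10000.0)
large = split_into_leaves((0.0, 0.0, 400.0, 200.0), 10000.0)
```

Flattening the hierarchy into a list of leaves before any solving means one pool can map over all leaves at once, instead of every recursion level opening its own pool.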

5. Stage 3: Infrastructure Planning

5.1 Network Planner - Steiner Tree

File: core/infrastructure/network_planner.py

Current Problem

MST + 15% redundancy is a heuristic and is not optimal.

Solution: Steiner Tree

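from typing import List, Tuple

import networkx as nx
import numpy as np
from networkx.algorithms.approximation import steiner_tree
from scipy.spatial import Delaunay
from shapely.geometry import LineString, Point, Polygon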

def generate_steiner_network(
    lots: List[Polygon],
    steiner_candidates: int = 20,
    max_distance: float = 500.0,
    redundancy_ratio: float = 0.15
) -> Tuple[List[List[float]], List[LineString]]:
    """
    Generate network using Steiner Tree approximation.
    
    Steiner Tree allows intermediate nodes (not at lot centroids)
    which can reduce total cable length by 15-20%.
    """
    if len(lots) < 2:
        return [], []
    
    # Get lot centroids (terminal nodes)
    centroids = [lot.centroid for lot in lots]
    terminal_coords = [(p.x, p.y) for p in centroids]
    
    # Generate candidate Steiner points
    # Use centroid of triangles from Delaunay triangulation
    all_coords = np.array(terminal_coords)
    if len(all_coords) >= 3:
        tri = Delaunay(all_coords)
        steiner_candidates_points = []
        for simplex in tri.simplices:
            triangle_points = all_coords[simplex]
            centroid = triangle_points.mean(axis=0)
            steiner_candidates_points.append(tuple(centroid))
    else:
        steiner_candidates_points = []
    
    # Build graph with terminals and Steiner candidates
    all_points = terminal_coords + steiner_candidates_points
    G = nx.Graph()
    
    for i, p in enumerate(all_points):
        G.add_node(i, pos=p, terminal=(i < len(terminal_coords)))
    
    # Add edges between nearby points
    for i in range(len(all_points)):
        for j in range(i + 1, len(all_points)):
            dist = np.sqrt(
                (all_points[i][0] - all_points[j][0])**2 +
                (all_points[i][1] - all_points[j][1])**2
            )
            if dist < max_distance:
                G.add_edge(i, j, weight=dist)
    
    # Find Steiner tree connecting all terminals
    terminal_nodes = list(range(len(terminal_coords)))
    try:
        st = steiner_tree(G, terminal_nodes, weight='weight')
    except Exception:
        # Fallback to an MST over the terminals only
        st = nx.minimum_spanning_tree(G.subgraph(terminal_nodes))
    
    # Add redundancy
    loop_graph = st.copy()
    all_edges = sorted(G.edges(data=True), key=lambda x: x[2]['weight'])
    target_extra = int(len(terminal_coords) * redundancy_ratio)
    added = 0
    
    for u, v, data in all_edges:
        if added >= target_extra:
            break
        # Skip edges that would pull in unused Steiner candidates
        if u in loop_graph and v in loop_graph and not loop_graph.has_edge(u, v):
            loop_graph.add_edge(u, v, **data)
            added += 1
    
    # Convert to LineStrings
    connections = []
    for u, v in loop_graph.edges():
        p1 = Point(all_points[u])
        p2 = Point(all_points[v])
        connections.append(LineString([p1, p2]))
    
    return [list(p) for p in all_points], connections

Metrics

  • Cable length reduction: 15-20%
  • Cost savings: Significant for large projects
  • Implementation effort: Low (1 day)
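A small worked case shows where the saving from intermediate nodes comes from. The example below uses three terminals on an equilateral triangle, a favourable geometry in which the triangle centroid (the kind of candidate point the Delaunay step generates) is also the ideal junction. The numbers apply to this toy layout only; savings on a real site depend on the lot geometry and on which candidate points are available.

```python
import math
from itertools import combinations

# Terminals: an equilateral triangle with side length 1
terminals = [(0.0, 0.0), (1.0, 0.0), (0.5, math.sqrt(3) / 2)]


def dist(p, q):
    return math.hypot(p[0] - q[0], p[1] - q[1])


# MST over three terminals: the two shortest of the three sides
sides = sorted(dist(p, q) for p, q in combinations(terminals, 2))
mst_length = sides[0] + sides[1]

# Candidate Steiner point: the triangle centroid
centroid = (
    sum(p[0] for p in terminals) / 3,
    sum(p[1] for p in terminals) / 3,
)
steiner_length = sum(dist(centroid, p) for p in terminals)

saving = 1 - steiner_length / mst_length
print(f"MST {mst_length:.3f}, Steiner {steiner_length:.3f}, saving {saving:.1%}")
# prints: MST 2.000, Steiner 1.732, saving 13.4%
```

Connecting all three lots through the junction costs √3 ≈ 1.732 length units instead of 2, because each branch is 1/√3 long.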

5.2 Transformer Placement - Multi-Objective

Solution

from scipy.optimize import minimize
from sklearn.cluster import KMeans
import numpy as np

def generate_transformers_multiobjective(
    lots: List[Polygon],
    power_per_lot: float = 100.0,  # kW
    transformer_capacity: float = 1000.0,  # kVA
    cable_cost_per_meter: float = 50.0,  # USD
    transformer_cost: float = 50000.0  # USD
) -> Tuple[List[Tuple[float, float]], Dict]:
    """
    Multi-objective transformer placement:
    1. Minimize total cable length
    2. Balance load across transformers
    3. Minimize total cost (transformers + cables)
    """
    if not lots:
        return [], {}
    
    lot_coords = np.array([[lot.centroid.x, lot.centroid.y] for lot in lots])
    lot_powers = np.array([power_per_lot] * len(lots))
    
    # Step 1: Find optimal number of transformers
    total_power = sum(lot_powers)
    min_transformers = int(np.ceil(total_power / transformer_capacity))
    max_transformers = min(len(lots), min_transformers * 2)
    
    best_cost = float('inf')
    best_solution = None
    best_k = min_transformers
    
    for k in range(min_transformers, max_transformers + 1):
        # K-Means clustering
        kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
        labels = kmeans.fit_predict(lot_coords)
        centers = kmeans.cluster_centers_
        
        # Calculate total cable length
        total_cable = 0
        load_imbalance = 0
        
        cluster_loads = [0] * k
        for i, label in enumerate(labels):
            dist = np.sqrt(
                (lot_coords[i][0] - centers[label][0])**2 +
                (lot_coords[i][1] - centers[label][1])**2
            )
            total_cable += dist
            cluster_loads[label] += lot_powers[i]
        
        # Check capacity constraints
        capacity_ok = all(load <= transformer_capacity for load in cluster_loads)
        if not capacity_ok:
            continue
        
        # Load imbalance (variance)
        load_imbalance = np.var(cluster_loads)
        
        # Total cost
        cable_cost = total_cable * cable_cost_per_meter
        total_cost = k * transformer_cost + cable_cost
        
        # Penalize imbalance
        total_cost += load_imbalance * 0.1
        
        if total_cost < best_cost:
            best_cost = total_cost
            best_k = k
            best_solution = {
                'centers': [tuple(c) for c in centers],
                'labels': labels.tolist(),
                'cluster_loads': cluster_loads,
                'total_cable': total_cable,
                'total_cost': total_cost,
            }
    
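    if best_solution is None:
        # No k in the searched range satisfied the capacity constraint
        return [], {}
    return best_solution['centers'], best_solution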
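The search range for k and the cost trade-off can be checked by hand. The helpers below mirror the arithmetic of the function above under two stated assumptions: 1 kW of load is treated as needing 1 kVA of capacity (unity power factor, as the original division implies), and the cable lengths in the example are made-up illustrative values. Both function names are hypothetical.

```python
import math
from typing import Tuple


def transformer_count_range(
    n_lots: int, power_per_lot: float, transformer_capacity: float
) -> Tuple[int, int]:
    """Return (min_k, max_k), the range of transformer counts to search.

    Assumes 1 kW of load needs 1 kVA of capacity (unity power factor).
    """
    total_power = n_lots * power_per_lot
    min_k = math.ceil(total_power / transformer_capacity)
    max_k = min(n_lots, 2 * min_k)
    return min_k, max_k


def total_cost(
    k: int,
    total_cable_m: float,
    transformer_cost: float = 50000.0,
    cable_cost_per_meter: float = 50.0,
) -> float:
    """Fixed transformer cost plus cable cost, without the imbalance penalty."""
    return k * transformer_cost + total_cable_m * cable_cost_per_meter


k_range = transformer_count_range(25, 100.0, 1000.0)
cost_3 = total_cost(3, 4000.0)  # illustrative cable length
cost_4 = total_cost(4, 2500.0)  # illustrative cable length
```

With 25 lots at 100 kW each, at least 3 transformers of 1000 kVA are needed and the search covers k = 3 to 6. In the illustrative numbers, a fourth transformer pays for itself once it shortens the cabling by more than 1000 m, since one transformer costs as much as 1000 m of cable.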

5.3 Drainage - Shortest Path on Road Network

Solution

from typing import Dict, List

import networkx as nx
import numpy as np
from shapely.geometry import LineString, Point, Polygon

def calculate_drainage_on_network(
    lots: List[Polygon],
    road_network: nx.Graph,
    wwtp_node: int,
    arrow_length: float = 20.0
) -> List[Dict]:
    """
    Calculate drainage paths following the road network.
    
    More realistic than direct vectors - pipes follow roads.
    """
    arrows = []
    
    if not road_network.nodes():
        return arrows
    
    # Precompute shortest paths from all nodes to WWTP
    try:
        lengths, paths = nx.single_source_dijkstra(
            road_network, wwtp_node, weight='length'
        )
    except (nx.NetworkXError, nx.NodeNotFound):
        return arrows
    
    for lot in lots:
        c = lot.centroid
        
        # Find nearest road network node
        nearest_node = None
        min_dist = float('inf')
        for node, data in road_network.nodes(data=True):
            pos = data.get('pos', (node, node))
            dist = c.distance(Point(pos))
            if dist < min_dist:
                min_dist = dist
                nearest_node = node
        
        if nearest_node is None or nearest_node not in paths:
            continue
        
        # Get path to WWTP
        path = paths[nearest_node]
        if len(path) < 2:
            continue
        
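        # Arrow direction: first segment from the lot's node toward the WWTP.
        # Paths returned by Dijkstra start at the WWTP, so read from the end.
        first_node = path[-1]
        second_node = path[-2]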
        
        pos1 = road_network.nodes[first_node].get('pos', (first_node, first_node))
        pos2 = road_network.nodes[second_node].get('pos', (second_node, second_node))
        
        dx = pos2[0] - pos1[0]
        dy = pos2[1] - pos1[1]
        length = np.sqrt(dx*dx + dy*dy)
        
        if length > 0:
            arrows.append({
                'start': (c.x, c.y),
                'vector': (dx/length * arrow_length, dy/length * arrow_length),
                'path_length': lengths.get(nearest_node, 0)
            })
    
    return arrows
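The core of this approach is a single shortest-path search rooted at the treatment plant, after which every road node knows its next hop toward the plant. The sketch below shows that step with the standard library only; the adjacency-list format and the name `next_hop_to_sink` are assumptions for illustration, and the graph is treated as undirected by listing each edge in both directions.

```python
import heapq
from typing import Dict, List, Tuple

Graph = Dict[str, List[Tuple[str, float]]]  # node -> [(neighbour, length)]


def next_hop_to_sink(graph: Graph, sink: str) -> Tuple[Dict[str, float], Dict[str, str]]:
    """Dijkstra from the sink; returns (distance, next hop toward sink) per node."""
    dist = {sink: 0.0}
    next_hop: Dict[str, str] = {}
    heap = [(0.0, sink)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue  # stale heap entry
        for neighbour, length in graph.get(node, []):
            nd = d + length
            if nd < dist.get(neighbour, float("inf")):
                dist[neighbour] = nd
                # Water at `neighbour` flows to `node` on its way to the sink
                next_hop[neighbour] = node
                heapq.heappush(heap, (nd, neighbour))
    return dist, next_hop


roads: Graph = {
    "wwtp": [("a", 10.0)],
    "a": [("wwtp", 10.0), ("b", 5.0), ("c", 30.0)],
    "b": [("a", 5.0), ("c", 4.0)],
    "c": [("b", 4.0), ("a", 30.0)],
}
dist, next_hop = next_hop_to_sink(roads, "wwtp")
```

The drainage arrow for a lot is then the direction from its nearest road node to `next_hop[node]`. Nodes that cannot reach the plant simply do not appear in `next_hop`.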

6. Cross-Stage Optimizations

6.1 Spatial Indexing with STRtree

Problem: O(n²) intersection checks in several modules.

Solution

from shapely.strtree import STRtree
from shapely.geometry import Polygon

class SpatiallyIndexedOperations:
    """
    Use R-Tree spatial index for efficient geometric queries.
    """
    
    def __init__(self, geometries: List[Polygon]):
        self.geometries = geometries
        self.tree = STRtree(geometries)
    
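    def query_intersects(self, query_geom: Polygon) -> List[Polygon]:
        """Roughly O(log n) per query instead of an O(n) scan."""
        # Shapely >= 2.0: query() returns integer indices into self.geometries
        hits = self.tree.query(query_geom, predicate="intersects")
        return [self.geometries[i] for i in hits]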
    
    def query_within_distance(
        self, 
        query_geom: Polygon, 
        distance: float
    ) -> List[Polygon]:
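        """Find geometries within distance."""
        buffered = query_geom.buffer(distance)
        # Shapely >= 2.0: query() returns integer indices into self.geometries
        hits = self.tree.query(buffered, predicate="intersects")
        return [self.geometries[i] for i in hits]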


# Integration into GridOptimizer
class OptimizedGridOptimizer(GridOptimizer):
    def _evaluate_layout(self, individual):
        spacing, angle = individual
        blocks = self.generate_grid_candidates(spacing, angle)
        
        # Use spatial index for intersection checks
        # (Shapely >= 2.0: query() returns indices into `blocks`)
        tree = STRtree(blocks)
        candidates = [blocks[i] for i in tree.query(self.land_poly)]
        
        total_residential_area = 0.0
        fragmented_blocks = 0
        original_area = spacing * spacing
        
        for blk in candidates:
            # Only evaluate blocks that actually intersect
            if not blk.intersects(self.land_poly):
                continue
            
            intersection = blk.intersection(self.land_poly)
            # ... rest of evaluation

Metrics

  • Speedup: 10-100x for large grids
  • Memory: Slightly higher (index storage)
  • Implementation effort: Low (a few hours)

6.2 Geometry Simplification

def preprocess_geometry(polygon: Polygon, tolerance: float = 0.5) -> Polygon:
    """
    Simplify polygon before heavy operations.
    
    tolerance=0.5 means 50cm maximum deviation - acceptable for planning.
    """
    # Remove redundant vertices
    simplified = polygon.simplify(tolerance, preserve_topology=True)
    
    # Fix any topology issues
    if not simplified.is_valid:
        simplified = simplified.buffer(0)
    
    return simplified


# Apply before optimization
class PreprocessedPipeline(LandRedistributionPipeline):
    def __init__(self, land_polygons, config, settings=None):
        # Simplify input geometry
        simplified_polygons = [
            preprocess_geometry(p, tolerance=0.5) 
            for p in land_polygons
        ]
        super().__init__(simplified_polygons, config, settings)
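
To make the `tolerance` parameter concrete: Shapely's `simplify` is based on the Douglas-Peucker algorithm, which drops a vertex when it lies within `tolerance` of the chord joining the vertices kept on either side. The sketch below is a plain-Python illustration on an open polyline. `douglas_peucker` and `boundary` are hypothetical names, and the code does not reproduce the topology-preserving variant that `preserve_topology=True` selects.

```python
import math
from typing import List, Tuple

Point = Tuple[float, float]


def _perp_distance(p: Point, a: Point, b: Point) -> float:
    """Perpendicular distance from p to the line through a and b."""
    (px, py), (ax, ay), (bx, by) = p, a, b
    dx, dy = bx - ax, by - ay
    length = math.hypot(dx, dy)
    if length == 0.0:
        return math.hypot(px - ax, py - ay)
    return abs(dx * (ay - py) - dy * (ax - px)) / length


def douglas_peucker(points: List[Point], tolerance: float) -> List[Point]:
    """Simplify an open polyline; endpoints are always kept."""
    if len(points) < 3:
        return list(points)

    first, last = points[0], points[-1]

    # Find the interior vertex farthest from the chord first -> last
    index, max_dist = 0, 0.0
    for i in range(1, len(points) - 1):
        d = _perp_distance(points[i], first, last)
        if d > max_dist:
            index, max_dist = i, d

    # Everything is within tolerance: the chord alone is enough
    if max_dist <= tolerance:
        return [first, last]

    # Otherwise keep the farthest vertex and recurse on both halves
    left = douglas_peucker(points[: index + 1], tolerance)
    right = douglas_peucker(points[index:], tolerance)
    return left[:-1] + right


# Hypothetical boundary in metres: one vertex bulges 0.3 m off a straight edge
boundary = [(0.0, 0.0), (50.0, 0.3), (100.0, 0.0), (100.0, 80.0)]

coarse = douglas_peucker(boundary, tolerance=0.5)  # 0.3 m bulge is dropped
fine = douglas_peucker(boundary, tolerance=0.2)    # 0.3 m bulge is kept
print(len(boundary), len(coarse), len(fine))
```

With a 0.5 m tolerance the 0.3 m bulge disappears while the real corner at (100, 0) survives, which is the behaviour the preprocessing step relies on.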

6.3 Caching Layer

from typing import Tuple

class CachedGridOptimizer(GridOptimizer):
    """
    Cache fitness evaluations for identical or similar parameters.
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.eval_cache = {}
    
    def _discretize_params(self, spacing: float, angle: float) -> Tuple[float, float]:
        """Round to create discrete cache keys."""
        return (round(spacing, 1), round(angle, 1))
    
    def _evaluate_layout(self, individual):
        spacing, angle = individual
        cache_key = self._discretize_params(spacing, angle)
        
        if cache_key in self.eval_cache:
            return self.eval_cache[cache_key]
        
        # Compute and cache
        result = super()._evaluate_layout(individual)
        self.eval_cache[cache_key] = result
        
        return result
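
How much a cache like this saves depends on how often two genomes round to the same key. The standalone sketch below counts hits and misses for a rounded-key memoizer; `RoundedCache` and `toy_fitness` are hypothetical names, and the toy function merely stands in for the real layout evaluation. Unlike the class above, it evaluates at the rounded point, so the cached value does not depend on which genome arrived first. That is a design assumption, not the documented behaviour.

```python
from typing import Callable, Dict, Tuple

Key = Tuple[float, float]


class RoundedCache:
    """Memoize a two-parameter evaluation on rounded (spacing, angle) keys."""

    def __init__(self, evaluate: Callable[[float, float], float], ndigits: int = 1):
        self.evaluate = evaluate
        self.ndigits = ndigits
        self.store: Dict[Key, float] = {}
        self.hits = 0
        self.misses = 0

    def __call__(self, spacing: float, angle: float) -> float:
        key = (round(spacing, self.ndigits), round(angle, self.ndigits))
        if key in self.store:
            self.hits += 1
            return self.store[key]
        self.misses += 1
        # Evaluate at the rounded point so the result is order-independent
        value = self.evaluate(*key)
        self.store[key] = value
        return value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


def toy_fitness(spacing: float, angle: float) -> float:
    """Placeholder for an expensive layout evaluation."""
    return 0.5 * spacing + 0.1 * angle


cached = RoundedCache(toy_fitness)
a = cached(40.04, 15.01)  # first visit to key (40.0, 15.0): miss
b = cached(40.00, 14.96)  # rounds to the same key: hit
c = cached(40.06, 15.00)  # rounds to (40.1, 15.0): miss
print(cached.hits, cached.misses, cached.hit_rate)
```

Logging the hit rate during a real run is a cheap way to check whether the expected speedup is plausible before committing to a coarser or finer rounding step.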

7. Algorithm-Level Parallelization

7.1 Parallel Fitness Evaluation

from concurrent.futures import ProcessPoolExecutor, as_completed
from deap import algorithms, base, creator, tools
import multiprocessing

class ParallelGridOptimizer(GridOptimizer):
    def __init__(self, *args, num_workers=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.num_workers = num_workers or multiprocessing.cpu_count()
    
    def optimize(self, population_size=50, generations=100):
        pop = self.toolbox.population(n=population_size)
        
        # Use ProcessPoolExecutor for parallel evaluation
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            for gen in range(generations):
                # Parallel fitness evaluation (skip individuals already scored)
                futures = {
                    executor.submit(self._evaluate_layout, ind): ind 
                    for ind in pop
                    if not ind.fitness.valid
                }
                
                for future in as_completed(futures):
                    ind = futures[future]
                    ind.fitness.values = future.result()
                
                # Selection and variation (sequential)
                offspring = algorithms.varAnd(
                    pop, self.toolbox, 
                    cxpb=0.7, mutpb=0.2
                )
                
                # Parallel evaluation of offspring
                futures = {
                    executor.submit(self._evaluate_layout, ind): ind 
                    for ind in offspring
                }
                
                for future in as_completed(futures):
                    ind = futures[future]
                    ind.fitness.values = future.result()
                
                pop = self.toolbox.select(pop + offspring, k=population_size)
        
        return tools.selBest(pop, 1)[0]

7.2 DEAP Built-in Parallelization

from deap import base
from scoop import futures

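# For distributed computing across machines.
# `toolbox` is the optimizer's existing deap.base.Toolbox; the evaluation
# loop must call toolbox.map(...) for this registration to take effect.
toolbox.register("map", futures.map)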

# Usage:
# python -m scoop -n 8 optimize.py

8. Priority Roadmap

8.1 Impact vs Effort Matrix

                              IMPACT
                    Low                    High
              ┌────────────────┬────────────────┐
         Low  │ • Caching      │ • STRtree      │
              │                │ • Parallel Eval│
    EFFORT    │                │ • Steiner Tree │
              ├────────────────┼────────────────┤
         High │ • Column Gen   │ • Surrogate    │
              │                │ • 2D Packing   │
              │                │ • CVT + Constr │
              └────────────────┴────────────────┘

8.2 Recommended Implementation Order

Phase 1: Quick Wins (1-2 weeks)

| # | Optimization | Est. Time | Expected Speedup |
|---|--------------|-----------|------------------|
| 1 | STRtree Spatial Indexing | 4h | 10-50x for intersection |
| 2 | Geometry Simplification | 2h | 2-3x for boolean ops |
| 3 | Evaluation Caching | 4h | 1.5-2x |
| 4 | Parallel Fitness Eval | 1d | 2-4x (CPU cores) |

Phase 2: Medium-term (2-4 weeks)

| # | Optimization | Est. Time | Expected Improvement |
|---|--------------|-----------|----------------------|
| 5 | Steiner Tree Network | 2d | 15-20% shorter cables |
| 6 | CVT Voronoi Seeds | 3d | 30-50% less fragmentation |
| 7 | Multi-obj Transformer | 2d | Better load balance + cost |
| 8 | Island Model GA | 3d | Better global optimum |

Phase 3: Advanced (1-2 months)

| # | Optimization | Est. Time | Expected Improvement |
|---|--------------|-----------|----------------------|
| 9 | Extended Genome (5D) | 1w | Rectangular lots support |
| 10 | Surrogate-Assisted | 2w | 5-20x fewer real evals |
| 11 | 2D Bin Packing | 2w | 15-25% better space util |
| 12 | Constrained Voronoi | 1w | Aligned main roads |

Appendix A: Benchmark Template

import time
from typing import Any, Dict, List

import numpy as np
from shapely.geometry import Polygon

def benchmark_optimization(
    optimizer_class: type,
    land_polygon: Polygon,
    configs: List[Dict[str, Any]],
    num_runs: int = 5
) -> List[Dict[str, Any]]:
    """
    Standard benchmark for comparing optimizers.
    """
    results = []
    
    for config in configs:
        run_times = []
        fitness_values = []
        
        for run in range(num_runs):
            optimizer = optimizer_class(land_polygon, **config)
            
            start = time.perf_counter()
            solution, history = optimizer.optimize()
            elapsed = time.perf_counter() - start
            
            run_times.append(elapsed)
            fitness_values.append(solution.fitness.values)
        
        results.append({
            'config': config,
            'avg_time': sum(run_times) / num_runs,
            'std_time': np.std(run_times),
            'avg_fitness': np.mean(fitness_values, axis=0),
            # max() compares fitness tuples lexicographically (first objective first)
            'best_fitness': max(fitness_values),
        })
    
    return results

Appendix B: References

  1. NSGA-II: Deb et al., "A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II" (2002)
  2. Centroidal Voronoi Tessellation: Du, Faber, Gunzburger, "Centroidal Voronoi Tessellations: Applications and Algorithms" (1999)
  3. Steiner Tree: Hwang, Richards, Winter, "The Steiner Tree Problem" (1992)
  4. OR-Tools: Google Operations Research Tools documentation
  5. Surrogate-Assisted Optimization: Jin, "Surrogate-assisted evolutionary computation: Recent advances and future challenges" (2011)
  6. R-tree (the index family behind Shapely's STRtree): Guttman, "R-trees: A Dynamic Index Structure for Spatial Searching" (1984)

Document Version: 1.0
Created: 2025-12-08
Author: AI Analysis System
Status: Draft - Pending Review