Algorithm Optimization - Virtual Industrial Park
A detailed analysis of algorithm optimizations for the "Industrial park generation by virtual engineers" system
Table of Contents
- Architecture Overview
- Stage 1: Grid Optimization (NSGA-II)
- Stage 1 Alt: Voronoi Road Network
- Stage 2: Block Subdivision (OR-Tools)
- Stage 3: Infrastructure Planning
- Cross-Stage Optimizations
- Parallelization
- Priority Roadmap
1. Architecture Overview
1.1 Current Pipeline
┌─────────────────────────────────────────────────────────────────────┐
│                    LAND REDISTRIBUTION PIPELINE                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────┐  │
│  │   STAGE 1   │───▶│   STAGE 2   │───▶│         STAGE 3         │  │
│  │ Grid/Voronoi│    │ Subdivision │    │     Infrastructure      │  │
│  │   NSGA-II   │    │  OR-Tools   │    │ MST + K-Means + Drainage│  │
│  └─────────────┘    └─────────────┘    └─────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
1.2 Main Modules
| Module | File | Algorithm | Complexity |
|---|---|---|---|
| Grid Optimizer | grid_optimizer.py | NSGA-II Genetic Algorithm | O(pop × gen × eval) |
| Voronoi Generator | voronoi.py | Fortune's Algorithm (Shapely) | O(n log n) |
| Subdivision Solver | subdivision_solver.py | OR-Tools CP-SAT | NP-hard (bounded) |
| Network Planner | network_planner.py | Kruskal's MST + Redundancy | O(E log V) |
| Transformer Planner | transformer_planner.py | K-Means Clustering | O(n × k × iterations) |
| Drainage Planner | drainage_planner.py | Vector Projection | O(n) |
2. Stage 1: Grid Optimization (NSGA-II)
2.1 Current State Analysis
File: core/optimization/grid_optimizer.py
# Current genome: 2 genes
individual = [spacing, angle] # spacing: 20-80m, angle: 0-90°
# Fitness: 2 objectives
objectives = (
total_residential_area, # maximize
fragmented_blocks # minimize
)
Main bottlenecks:
- generate_grid_candidates(): builds the grid and computes intersections with O(n²) Shapely operations
- Every fitness evaluation has to iterate over all blocks
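Because the two objectives pull in different directions, NSGA-II compares layouts by Pareto dominance rather than by a single score. The sketch below is a minimal pure-Python illustration of that comparison. The `dominates` and `pareto_front` helpers and the sample fitness values are hypothetical and are not taken from grid_optimizer.py.

```python
from typing import List, Tuple

# (total_residential_area, fragmented_blocks)
Fitness = Tuple[float, int]


def dominates(a: Fitness, b: Fitness) -> bool:
    """True if a is no worse than b on both objectives and better on one.

    Objective 0 (area) is maximized, objective 1 (fragmented blocks)
    is minimized.
    """
    no_worse = a[0] >= b[0] and a[1] <= b[1]
    strictly_better = a[0] > b[0] or a[1] < b[1]
    return no_worse and strictly_better


def pareto_front(population: List[Fitness]) -> List[Fitness]:
    """Keep only the layouts that no other layout dominates."""
    return [
        p for p in population
        if not any(dominates(q, p) for q in population)
    ]


# Hypothetical fitness values for four candidate layouts
candidates = [(9000.0, 4), (8500.0, 2), (8000.0, 3), (9000.0, 5)]
front = pareto_front(candidates)
```

Here the first two layouts survive: neither beats the other on both area and fragmentation, while the last two are each dominated by one of them.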
2.2 Optimization 1: Expanding the Search Space
Problem
A square grid is suboptimal for an industrial park, which needs rectangular lots.
Solution
# Proposal: 5 genes instead of 2
class EnhancedGridOptimizer:
def _setup_deap(self):
# Gene definitions
self.toolbox.register("attr_spacing_x", random.uniform, 20, 100)
self.toolbox.register("attr_spacing_y", random.uniform, 20, 100)
self.toolbox.register("attr_angle", random.uniform, 0, 90)
self.toolbox.register("attr_offset_x", random.uniform, 0, 50)
self.toolbox.register("attr_offset_y", random.uniform, 0, 50)
self.toolbox.register(
"individual",
tools.initCycle,
creator.Individual,
(
self.toolbox.attr_spacing_x,
self.toolbox.attr_spacing_y,
self.toolbox.attr_angle,
self.toolbox.attr_offset_x,
self.toolbox.attr_offset_y,
),
n=1
)
def generate_grid_candidates(
self,
spacing_x: float,
spacing_y: float,
angle_deg: float,
offset_x: float,
offset_y: float
) -> List[Polygon]:
"""Generate rectangular grid blocks."""
# Create rectangular base block
base_block = Polygon([
(0, 0),
(spacing_x, 0),
(spacing_x, spacing_y),
(0, spacing_y)
])
base_block = translate(base_block, -spacing_x/2 + offset_x, -spacing_y/2 + offset_y)
# ... rotation and placement logic
Metrics
- Flexibility: supports lots of 40x80 m, 50x100 m, etc.
- Trade-off: a larger search space requires a larger population or more generations
- Implementation effort: Low (2-4 hours)
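To make the five genes concrete, the following sketch decodes them into the corners of a single grid block. It is an illustration only: the helper names are hypothetical, and rotating about the origin is an assumption, since the rotation and placement logic is elided in the proposal above.

```python
import math
from typing import List, Tuple

Point2D = Tuple[float, float]


def rotate(p: Point2D, angle_deg: float) -> Point2D:
    """Rotate a point counter-clockwise about the origin (assumed pivot)."""
    a = math.radians(angle_deg)
    return (p[0] * math.cos(a) - p[1] * math.sin(a),
            p[0] * math.sin(a) + p[1] * math.cos(a))


def grid_block(i: int, j: int, spacing_x: float, spacing_y: float,
               angle_deg: float, offset_x: float = 0.0,
               offset_y: float = 0.0) -> List[Point2D]:
    """Corners of the (i, j)-th block of a rotated rectangular grid."""
    x0 = i * spacing_x + offset_x
    y0 = j * spacing_y + offset_y
    corners = [
        (x0, y0),
        (x0 + spacing_x, y0),
        (x0 + spacing_x, y0 + spacing_y),
        (x0, y0 + spacing_y),
    ]
    return [rotate(c, angle_deg) for c in corners]


def shoelace_area(corners: List[Point2D]) -> float:
    """Polygon area from its vertices (shoelace formula)."""
    total = 0.0
    for (x1, y1), (x2, y2) in zip(corners, corners[1:] + corners[:1]):
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0


# A 40 m x 80 m lot, rotated 30 degrees and shifted by 5 m on each axis
block = grid_block(1, 2, 40.0, 80.0, 30.0, offset_x=5.0, offset_y=5.0)
area = shoelace_area(block)
```

Rotation and offset move the block but leave its area at spacing_x × spacing_y, which is the natural reference value when deciding whether a clipped block counts as fragmented.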
2.3 Optimization 2: Adaptive Island Model
Problem
A single population easily gets stuck in local optima.
Solution
from deap import tools


class IslandGridOptimizer:
    # self.toolbox, _create_population(n) and _evolve_one_generation() are
    # assumed to be provided in the same way as in GridOptimizer.
    def __init__(self, land_polygon, num_islands=4, migration_interval=5):
        self.land_polygon = land_polygon
        self.num_islands = num_islands
        self.migration_interval = migration_interval
        self.islands = []

    def optimize(self, population_size=50, generations=100):
        self.islands = [
            self._create_population(population_size)
            for _ in range(self.num_islands)
        ]
        for gen in range(generations):
            # Evolve each island independently (sequential here; each island
            # could instead be dispatched to its own worker process)
            for island in self.islands:
                self._evolve_one_generation(island)
            # Migration: exchange best individuals between islands.
            # Using (gen + 1) avoids migrating before any evolution at gen 0.
            if (gen + 1) % self.migration_interval == 0:
                self._migrate()
        # Return best from all islands
        all_individuals = [ind for island in self.islands for ind in island]
        return tools.selBest(all_individuals, 1)[0]

    def _migrate(self):
        """Ring topology migration."""
        migrants = []
        for island in self.islands:
            best = tools.selBest(island, 2)
            migrants.append([self.toolbox.clone(ind) for ind in best])
        for i in range(self.num_islands):
            next_island = (i + 1) % self.num_islands
            # Replace worst with migrants
            worst = tools.selWorst(self.islands[next_island], 2)
            for w, m in zip(worst, migrants[i]):
                self.islands[next_island].remove(w)
                self.islands[next_island].append(m)
Metrics
- Diversity: preserves genetic diversity
- Parallel potential: each island can run on its own core
- Speedup: 2-4x with 4 islands on 4 cores (only when the islands are evolved in parallel)
- Implementation effort: Medium (1-2 days)
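The ring migration step can be tried in isolation without DEAP. The sketch below is a simplified stand-in: individuals are plain `(fitness, label)` tuples with a single fitness to maximize, and `migrate_ring` is a hypothetical helper rather than part of the optimizer.

```python
from typing import List, Tuple

# (fitness to maximize, label)
Individual = Tuple[float, str]


def migrate_ring(islands: List[List[Individual]], n_migrants: int = 1) -> None:
    """Copy each island's best individuals to the next island in the ring,
    overwriting that island's worst individuals (in place)."""
    # Pick every island's migrants first so the whole step works on one
    # consistent snapshot of the populations.
    migrants = [sorted(isl, reverse=True)[:n_migrants] for isl in islands]
    for i, group in enumerate(migrants):
        target = islands[(i + 1) % len(islands)]
        target.sort()                # ascending: worst individuals first
        target[:n_migrants] = group  # overwrite the worst with the migrants


islands = [
    [(0.9, "a1"), (0.2, "a2")],
    [(0.5, "b1"), (0.1, "b2")],
    [(0.7, "c1"), (0.3, "c2")],
]
migrate_ring(islands)
```

Selecting all migrants before modifying any island matters: otherwise an individual that just arrived on island 1 could be forwarded to island 2 within the same migration step.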
2.4 Optimization 3: Surrogate-Assisted Optimization
Problem
Fitness evaluation (Shapely intersections) is very slow.
Solution
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel
import numpy as np
class SurrogateAssistedOptimizer:
def __init__(self, land_polygon):
self.land_poly = land_polygon
self.surrogate = None
self.training_data = {'X': [], 'y': []}
self.real_eval_count = 0
def _build_surrogate(self):
"""Build GP surrogate model from collected samples."""
if len(self.training_data['X']) < 20:
return None
X = np.array(self.training_data['X'])
y = np.array(self.training_data['y'])
kernel = ConstantKernel(1.0) * RBF(length_scale=[10, 10])
gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5)
gp.fit(X, y)
return gp
def _evaluate_with_surrogate(self, individual):
"""Evaluate using surrogate, with uncertainty-based real evaluation."""
if self.surrogate is None:
# No surrogate yet, use real evaluation
return self._real_evaluate(individual)
X = np.array([[individual[0], individual[1]]])
pred, std = self.surrogate.predict(X, return_std=True)
# If uncertainty is high, do real evaluation
if std[0] > 0.1 * abs(pred[0]): # 10% uncertainty threshold
return self._real_evaluate(individual)
return (pred[0], 0) # Surrogate prediction; NOTE: only objective 1 (area) is modeled, the fragmented-block count is hard-coded to 0
def _real_evaluate(self, individual):
"""Expensive real evaluation with Shapely."""
self.real_eval_count += 1
fitness = self._evaluate_layout(individual)
# Store for training
self.training_data['X'].append([individual[0], individual[1]])
self.training_data['y'].append(fitness[0])
# Rebuild surrogate periodically
if len(self.training_data['X']) % 50 == 0:
self.surrogate = self._build_surrogate()
return fitness
Metrics
- Speedup: 5-20x reduction in real evaluations
- Accuracy: ~95% of true optimum
- Trade-off: needs initial samples to train the surrogate
- Implementation effort: High (1 week)
3. Stage 1 Alternative: Voronoi Road Network
3.1 Current State Analysis
File: core/geometry/voronoi.py
def generate_voronoi_seeds(site, num_seeds=15, seed=None):
# Random uniform distribution within bounding box
for _ in range(num_seeds):
x = random.uniform(minx, maxx)
y = random.uniform(miny, maxy)
seeds.append(Point(x, y))
Problem: random seeds produce uneven blocks and many fragmented blocks.
3.2 Optimization 1: Centroidal Voronoi Tessellation (CVT)
Solution
def generate_cvt_seeds(
site: Polygon,
num_seeds: int = 15,
iterations: int = 20,
seed: Optional[int] = None
) -> List[Point]:
"""
Generate Centroidal Voronoi Tessellation seeds.
CVT iteratively moves seeds to the centroid of their Voronoi regions,
resulting in more uniform block sizes.
"""
if seed is not None:
random.seed(seed)
minx, miny, maxx, maxy = site.bounds
# Initialize with random seeds
seeds = []
for _ in range(num_seeds):
x = random.uniform(minx, maxx)
y = random.uniform(miny, maxy)
seeds.append(Point(x, y))
# CVT iterations
for iteration in range(iterations):
# Create Voronoi regions
multi_point = MultiPoint(seeds)
regions = voronoi_diagram(multi_point, envelope=site)
# Move seeds to centroids
new_seeds = []
for region in regions.geoms:
if region.geom_type == 'Polygon':
# Clip to site boundary
clipped = region.intersection(site)
if not clipped.is_empty and clipped.geom_type == 'Polygon':
new_seeds.append(clipped.centroid)
else:
# Keep original if clipped is invalid
new_seeds.append(region.centroid)
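# Check convergence. voronoi_diagram() does not return regions in seed
# order, so match each new seed to its nearest old seed instead of zipping.
if len(new_seeds) == len(seeds):
max_movement = max(
min(new.distance(old) for old in seeds) for new in new_seeds
)
if max_movement < 0.1: # Convergence threshold
break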
seeds = new_seeds
return seeds
def generate_weighted_cvt_seeds(
site: Polygon,
density_function: Callable[[float, float], float],
num_seeds: int = 15,
iterations: int = 30
) -> List[Point]:
"""
Weighted CVT with density function.
Args:
density_function: f(x, y) -> weight, higher = smaller blocks
"""
# Use importance sampling for initial seeds
candidates = []
weights = []
minx, miny, maxx, maxy = site.bounds
for _ in range(num_seeds * 100): # Oversample
x = random.uniform(minx, maxx)
y = random.uniform(miny, maxy)
p = Point(x, y)
if site.contains(p):
candidates.append(p)
weights.append(density_function(x, y))
# Weighted sampling
weights = np.array(weights) / sum(weights)
selected_idx = np.random.choice(
len(candidates), size=num_seeds, replace=False, p=weights
)
seeds = [candidates[i] for i in selected_idx]
# Run weighted CVT iterations
for _ in range(iterations):
# ... similar to regular CVT but with weighted centroids
pass
return seeds
Use Case: Industrial Park Zoning
def industrial_density(x, y):
"""
Higher density near main access roads and utilities.
"""
# Assume main road at y=0
dist_to_main_road = abs(y)
# High density (smaller blocks) near road
if dist_to_main_road < 100:
return 2.0
elif dist_to_main_road < 200:
return 1.5
else:
return 1.0
seeds = generate_weighted_cvt_seeds(
site=land_polygon,
density_function=industrial_density,
num_seeds=20
)
Metrics
- Block uniformity: +40-60% more uniform areas
- Fragmentation reduction: -30-50% fragmented blocks
- Time overhead: 20 iterations × O(n log n) = acceptable
- Implementation effort: Medium (2-3 days)
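The core CVT idea, moving every seed to the centroid of its own region until nothing moves, can be demonstrated without Shapely by approximating the site with sample points. The sketch below is this discrete variant (Lloyd relaxation), not the polygon-based implementation above. `lloyd_relaxation` and the 10 x 10 sample grid are assumptions made for illustration.

```python
import math
from typing import List, Sequence, Tuple

Point2D = Tuple[float, float]


def lloyd_relaxation(seeds: Sequence[Point2D], samples: Sequence[Point2D],
                     iterations: int = 20, tol: float = 1e-6) -> List[Point2D]:
    """Discrete Lloyd relaxation: each seed moves to the mean of the
    sample points that are closer to it than to any other seed."""
    seeds = list(seeds)
    for _ in range(iterations):
        # Assign every sample to its nearest seed (a discrete Voronoi region)
        buckets: List[List[Point2D]] = [[] for _ in seeds]
        for p in samples:
            nearest = min(
                range(len(seeds)),
                key=lambda k: (p[0] - seeds[k][0]) ** 2 + (p[1] - seeds[k][1]) ** 2,
            )
            buckets[nearest].append(p)
        # Move each seed to the centroid of its region
        new_seeds = []
        for seed, bucket in zip(seeds, buckets):
            if bucket:
                new_seeds.append((
                    sum(x for x, _ in bucket) / len(bucket),
                    sum(y for _, y in bucket) / len(bucket),
                ))
            else:
                new_seeds.append(seed)  # empty region: keep the seed in place
        moved = max(math.dist(a, b) for a, b in zip(seeds, new_seeds))
        seeds = new_seeds
        if moved < tol:
            break
    return seeds


# Sample points on a 10 x 10 grid standing in for the site
samples = [(float(x), float(y)) for x in range(10) for y in range(10)]
# Two badly placed seeds, both in one corner
relaxed = lloyd_relaxation([(0.0, 0.0), (1.0, 0.0)], samples)
```

Starting from two seeds crowded into one corner, the relaxation ends with one seed in the middle of each half of the grid, which is the uniformity effect CVT is used for here.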
3.3 Optimization 2: Constrained Voronoi
Problem
Voronoi edges may not align with the preferred road directions.
Solution
from shapely.geometry import LineString
from shapely.ops import split
class ConstrainedVoronoi:
"""
Voronoi with hard constraints (e.g., main roads must be axis-aligned).
"""
def __init__(self, site: Polygon, main_roads: List[LineString]):
self.site = site
self.main_roads = main_roads # Pre-defined main road axes
def generate(self, num_internal_seeds: int = 10) -> List[Polygon]:
# Step 1: Split site by main roads
regions = [self.site]
for road in self.main_roads:
new_regions = []
for region in regions:
if road.intersects(region):
parts = split(region, road)
new_regions.extend(parts.geoms)
else:
new_regions.append(region)
regions = [r for r in new_regions if r.geom_type == 'Polygon']
# Step 2: Apply Voronoi within each region
all_blocks = []
for region in regions:
seeds_per_region = max(2, int(num_internal_seeds * region.area / self.site.area))
seeds = generate_cvt_seeds(region, seeds_per_region)
voronoi_regions = create_voronoi_diagram(seeds, region)
if voronoi_regions:
for v in voronoi_regions.geoms:
clipped = v.intersection(region)
if clipped.geom_type == 'Polygon' and clipped.area > 100:
all_blocks.append(clipped)
return all_blocks
# Usage
main_roads = [
LineString([(0, 500), (1000, 500)]), # Horizontal main road
LineString([(500, 0), (500, 1000)]), # Vertical main road
]
cv = ConstrainedVoronoi(land_polygon, main_roads)
blocks = cv.generate(num_internal_seeds=15)
Metrics
- Road alignment: Main roads guaranteed straight
- Flexibility: Sub-blocks use organic Voronoi
- Implementation effort: Medium (2 days)
4. Stage 2: Block Subdivision (OR-Tools)
4.1 Current State Analysis
File: core/optimization/subdivision_solver.py
# Current: 1D subdivision (width only)
def solve_subdivision(total_length, min_width, max_width, target_width):
# lot_vars[i] = width of lot i
# Constraint: sum(lot_vars) == total_length
# Objective: minimize deviation from target_width
Limitation: lots are divided along one dimension only, which is suboptimal for lots that need a specific aspect ratio.
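For intuition about the 1D problem, the equal-width special case can be solved in closed form without a solver. The sketch below is that simplification, not the CP-SAT model: it assumes all lots share one width, and `subdivide_evenly` is a hypothetical helper.

```python
import math
from typing import List


def subdivide_evenly(total_length: float, min_width: float,
                     max_width: float, target_width: float) -> List[float]:
    """Split a frontage into equal lots whose common width is as close to
    target_width as the [min_width, max_width] bounds allow."""
    n_min = max(1, math.ceil(total_length / max_width))  # each lot <= max_width
    n_max = math.floor(total_length / min_width)         # each lot >= min_width
    if n_min > n_max:
        return []  # no equal split satisfies the bounds
    best_n = min(
        range(n_min, n_max + 1),
        key=lambda n: abs(total_length / n - target_width),
    )
    return [total_length / best_n] * best_n


# 100 m of frontage, lots between 20 m and 40 m, ideally 30 m wide
widths = subdivide_evenly(100.0, 20.0, 40.0, 30.0)
```

With these numbers, three lots of about 33.3 m deviate least from the 30 m target. The solver-based formulation remains necessary once lots may differ in width or further constraints are added.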
4.2 Optimization 1: 2D Bin Packing
Solution
from ortools.sat.python import cp_model
from typing import List, Tuple, Dict
class TwoDimensionalSubdivisionSolver:
"""
2D bin packing for lot placement within a block.
Optimizes both lot placement and dimensions.
"""
@staticmethod
def solve_2d_subdivision(
block_width: float,
block_height: float,
min_lot_width: float,
max_lot_width: float,
min_lot_height: float,
max_lot_height: float,
target_aspect_ratio: float = 2.0, # height/width
time_limit: float = 10.0
) -> List[Dict]:
"""
Solve 2D lot placement using constraint programming.
Returns:
List of lots with {x, y, width, height}
"""
model = cp_model.CpModel()
scale = 100 # 1cm precision
# Estimate max lots
min_lot_area = min_lot_width * min_lot_height
max_lots = int((block_width * block_height) / min_lot_area) + 1
max_lots = min(max_lots, 50) # Cap for performance
# Decision variables for each lot
lots = []
for i in range(max_lots):
lot = {
'x': model.NewIntVar(0, int(block_width * scale), f'x_{i}'),
'y': model.NewIntVar(0, int(block_height * scale), f'y_{i}'),
'width': model.NewIntVar(
int(min_lot_width * scale),
int(max_lot_width * scale),
f'w_{i}'
),
'height': model.NewIntVar(
int(min_lot_height * scale),
int(max_lot_height * scale),
f'h_{i}'
),
'used': model.NewBoolVar(f'used_{i}'),
'area': model.NewIntVar(0, int(block_width * block_height * scale * scale), f'area_{i}')
}
lots.append(lot)
# Constraints
for i, lot in enumerate(lots):
# Lot must fit within block
model.Add(lot['x'] + lot['width'] <= int(block_width * scale)).OnlyEnforceIf(lot['used'])
model.Add(lot['y'] + lot['height'] <= int(block_height * scale)).OnlyEnforceIf(lot['used'])
# Unused lots keep an in-range size: the width/height domains exclude 0,
# so forcing them to 0 would make the model infeasible, and
# AddDivisionEquality below needs a non-zero divisor. An unused lot is
# instead excluded through its optional intervals and a zero counted area.
model.AddMultiplicationEquality(lot['area'], [lot['width'], lot['height']])
lot['used_area'] = model.NewIntVar(0, int(block_width * block_height * scale * scale), f'used_area_{i}')
model.Add(lot['used_area'] == lot['area']).OnlyEnforceIf(lot['used'])
model.Add(lot['used_area'] == 0).OnlyEnforceIf(lot['used'].Not())
# No overlap constraint (using intervals)
x_intervals = []
y_intervals = []
for i, lot in enumerate(lots):
x_interval = model.NewOptionalIntervalVar(
lot['x'], lot['width'], lot['x'] + lot['width'],
lot['used'], f'x_interval_{i}'
)
y_interval = model.NewOptionalIntervalVar(
lot['y'], lot['height'], lot['y'] + lot['height'],
lot['used'], f'y_interval_{i}'
)
x_intervals.append(x_interval)
y_intervals.append(y_interval)
model.AddNoOverlap2D(x_intervals, y_intervals)
# Objective: Maximize total used area + Aspect ratio penalty
total_area = sum(lot['used_area'] for lot in lots)
# Aspect ratio deviation penalty
aspect_penalties = []
target_ratio_scaled = int(target_aspect_ratio * 100)
for i, lot in enumerate(lots):
# height/width should be close to target_aspect_ratio
deviation = model.NewIntVar(0, 1000, f'aspect_dev_{i}')
actual_ratio = model.NewIntVar(0, 1000, f'ratio_{i}')
# actual_ratio ≈ (height * 100) / width
model.AddDivisionEquality(
actual_ratio,
lot['height'] * 100,
lot['width']
)
model.AddAbsEquality(deviation, actual_ratio - target_ratio_scaled)
aspect_penalties.append(deviation)
# Multi-objective: maximize area, minimize aspect deviation
model.Maximize(total_area - sum(aspect_penalties) * 100)
# Solve
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = time_limit
status = solver.Solve(model)
# Extract solution
result = []
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
for lot in lots:
if solver.Value(lot['used']):
result.append({
'x': solver.Value(lot['x']) / scale,
'y': solver.Value(lot['y']) / scale,
'width': solver.Value(lot['width']) / scale,
'height': solver.Value(lot['height']) / scale,
})
return result
Metrics
- Space utilization: +15-25% more efficient packing
- Aspect ratio control: Lots match industrial requirements
- Time complexity: Higher, but bounded by time_limit
- Implementation effort: High (1 week)
4.3 Optimization 2: Hierarchical Decomposition
Solution
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Dict, List

from shapely.geometry import Polygon, box


def _split_into_leaves(block: Polygon, max_block_size: float) -> List[Polygon]:
    """Recursively split a block into quadrants until each piece is small enough."""
    if block.area <= max_block_size:
        return [block]
    minx, miny, maxx, maxy = block.bounds
    midx, midy = (minx + maxx) / 2, (miny + maxy) / 2
    quadrants = [
        box(minx, miny, midx, midy),
        box(midx, miny, maxx, midy),
        box(minx, midy, midx, maxy),
        box(midx, midy, maxx, maxy),
    ]
    leaves = []
    for quad in quadrants:
        piece = block.intersection(quad)
        # A concave block can intersect a quadrant in several parts
        for part in getattr(piece, 'geoms', [piece]):
            if part.geom_type == 'Polygon' and part.area > 100:
                leaves.extend(_split_into_leaves(part, max_block_size))
    return leaves


def _solve_leaf(block: Polygon, solver_kwargs: Dict) -> Dict:
    """Module-level worker: a lambda cannot be pickled for ProcessPoolExecutor."""
    return SubdivisionSolver.subdivide_block(block, **solver_kwargs)


class HierarchicalSubdivisionSolver:
    """
    Divide large blocks into quadrants, solve each in parallel.
    """
    @staticmethod
    def subdivide_hierarchically(
        block: Polygon,
        max_block_size: float = 10000,  # m²
        **solver_kwargs
    ) -> Dict:
        """
        Recursively subdivide large blocks before optimization.
        """
        leaves = _split_into_leaves(block, max_block_size)
        if len(leaves) == 1:
            # Small enough, solve directly
            return SubdivisionSolver.subdivide_block(leaves[0], **solver_kwargs)
        # Solve all leaves in one pool (avoids nested pools in recursive calls)
        with ProcessPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(
                partial(_solve_leaf, solver_kwargs=solver_kwargs), leaves
            ))
        # Merge results
        all_lots = []
        for result in results:
            all_lots.extend(result.get('lots', []))
        return {'geometry': block, 'type': 'residential', 'lots': all_lots}
Metrics
- Parallel speedup: 2-4x with 4 cores
- Scalability: Handles very large blocks
- Implementation effort: Medium (2-3 days)
5. Stage 3: Infrastructure Planning
5.1 Network Planner - Steiner Tree
File: core/infrastructure/network_planner.py
Current Problem
MST + 15% redundancy is a heuristic, not an optimal solution.
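As a reference point for the current approach, the sketch below builds a minimum spanning tree with Kruskal's algorithm and then adds the shortest unused edges as redundant loops. It is a pure-Python illustration, not the code in network_planner.py. The name `mst_with_redundancy` and the rule of adding `int(n * redundancy_ratio)` extra edges are assumptions.

```python
import math
from typing import List, Sequence, Tuple

# (length, u, v)
Edge = Tuple[float, int, int]


def mst_with_redundancy(points: Sequence[Tuple[float, float]],
                        redundancy_ratio: float = 0.15
                        ) -> Tuple[List[Edge], List[Edge]]:
    """Kruskal's MST over the complete graph, plus the shortest
    non-tree edges as redundant loops."""
    n = len(points)
    edges = sorted(
        (math.dist(points[i], points[j]), i, j)
        for i in range(n) for j in range(i + 1, n)
    )
    parent = list(range(n))

    def find(x: int) -> int:
        # Union-find lookup with path halving
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    tree: List[Edge] = []
    rest: List[Edge] = []
    for edge in edges:
        _, u, v = edge
        ru, rv = find(u), find(v)
        if ru != rv:
            parent[ru] = rv    # edge joins two components: keep it
            tree.append(edge)
        else:
            rest.append(edge)  # edge would close a cycle: loop candidate
    extra = int(n * redundancy_ratio)
    return tree, rest[:extra]


# Four lot centroids on the corners of a 10 m square
corners = [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
tree, loops = mst_with_redundancy(corners, redundancy_ratio=0.25)
```

On the square, the tree uses three sides and the single redundant edge closes the fourth side. All edges run directly between lot centroids, which is the restriction a Steiner tree lifts by allowing intermediate junction points.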
Solution: Steiner Tree
import networkx as nx
from networkx.algorithms.approximation import steiner_tree
from scipy.spatial import Delaunay
def generate_steiner_network(
lots: List[Polygon],
steiner_candidates: int = 20,
max_distance: float = 500.0,
redundancy_ratio: float = 0.15
) -> Tuple[List[List[float]], List[LineString]]:
"""
Generate network using Steiner Tree approximation.
Steiner Tree allows intermediate nodes (not at lot centroids)
which can reduce total cable length by 15-20%.
"""
if len(lots) < 2:
return [], []
# Get lot centroids (terminal nodes)
centroids = [lot.centroid for lot in lots]
terminal_coords = [(p.x, p.y) for p in centroids]
# Generate candidate Steiner points
# Use centroid of triangles from Delaunay triangulation
all_coords = np.array(terminal_coords)
if len(all_coords) >= 3:
tri = Delaunay(all_coords)
steiner_candidates_points = []
for simplex in tri.simplices:
triangle_points = all_coords[simplex]
centroid = triangle_points.mean(axis=0)
steiner_candidates_points.append(tuple(centroid))
else:
steiner_candidates_points = []
# Build graph with terminals and Steiner candidates
all_points = terminal_coords + steiner_candidates_points
G = nx.Graph()
for i, p in enumerate(all_points):
G.add_node(i, pos=p, terminal=(i < len(terminal_coords)))
# Add edges between nearby points
for i in range(len(all_points)):
for j in range(i + 1, len(all_points)):
dist = np.sqrt(
(all_points[i][0] - all_points[j][0])**2 +
(all_points[i][1] - all_points[j][1])**2
)
if dist < max_distance:
G.add_edge(i, j, weight=dist)
# Find Steiner tree connecting all terminals
terminal_nodes = list(range(len(terminal_coords)))
try:
st = steiner_tree(G, terminal_nodes, weight='weight')
except Exception:
# Fallback to MST
st = nx.minimum_spanning_tree(G)
# Add redundancy
loop_graph = st.copy()
all_edges = sorted(G.edges(data=True), key=lambda x: x[2]['weight'])
target_extra = int(len(terminal_coords) * redundancy_ratio)
added = 0
for u, v, data in all_edges:
if not loop_graph.has_edge(u, v):
loop_graph.add_edge(u, v, **data)
added += 1
if added >= target_extra:
break
# Convert to LineStrings
connections = []
for u, v in loop_graph.edges():
p1 = Point(all_points[u])
p2 = Point(all_points[v])
connections.append(LineString([p1, p2]))
return [list(p) for p in all_points], connections
Metrics
- Cable length reduction: 15-20%
- Cost savings: Significant for large projects
- Implementation effort: Low (1 day)
5.2 Transformer Placement - Multi-Objective
Solution
from scipy.optimize import minimize
from sklearn.cluster import KMeans
import numpy as np
def generate_transformers_multiobjective(
lots: List[Polygon],
power_per_lot: float = 100.0, # kW
transformer_capacity: float = 1000.0, # kVA
cable_cost_per_meter: float = 50.0, # USD
transformer_cost: float = 50000.0 # USD
) -> Tuple[List[Tuple[float, float]], Dict]:
"""
Multi-objective transformer placement:
1. Minimize total cable length
2. Balance load across transformers
3. Minimize total cost (transformers + cables)
"""
if not lots:
return [], {}
lot_coords = np.array([[lot.centroid.x, lot.centroid.y] for lot in lots])
lot_powers = np.array([power_per_lot] * len(lots))
# Step 1: Find optimal number of transformers
total_power = sum(lot_powers)
min_transformers = int(np.ceil(total_power / transformer_capacity))
max_transformers = min(len(lots), min_transformers * 2)
best_cost = float('inf')
best_solution = None
best_k = min_transformers
for k in range(min_transformers, max_transformers + 1):
# K-Means clustering
kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
labels = kmeans.fit_predict(lot_coords)
centers = kmeans.cluster_centers_
# Calculate total cable length
total_cable = 0
load_imbalance = 0
cluster_loads = [0] * k
for i, label in enumerate(labels):
dist = np.sqrt(
(lot_coords[i][0] - centers[label][0])**2 +
(lot_coords[i][1] - centers[label][1])**2
)
total_cable += dist
cluster_loads[label] += lot_powers[i]
# Check capacity constraints
capacity_ok = all(load <= transformer_capacity for load in cluster_loads)
if not capacity_ok:
continue
# Load imbalance (variance)
load_imbalance = np.var(cluster_loads)
# Total cost
cable_cost = total_cable * cable_cost_per_meter
total_cost = k * transformer_cost + cable_cost
# Penalize imbalance
total_cost += load_imbalance * 0.1
if total_cost < best_cost:
best_cost = total_cost
best_k = k
best_solution = {
'centers': [tuple(c) for c in centers],
'labels': labels.tolist(),
'cluster_loads': cluster_loads,
'total_cable': total_cable,
'total_cost': total_cost,
}
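if best_solution is None:
# No k in the tested range satisfies the capacity constraint
return [], {}
return best_solution['centers'], best_solution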
5.3 Drainage - Shortest Path on Road Network
Solution
import networkx as nx
from shapely.geometry import LineString
def calculate_drainage_on_network(
lots: List[Polygon],
road_network: nx.Graph,
wwtp_node: int,
arrow_length: float = 20.0
) -> List[Dict]:
"""
Calculate drainage paths following the road network.
More realistic than direct vectors - pipes follow roads.
"""
arrows = []
if not road_network.nodes():
return arrows
# Precompute shortest paths from all nodes to WWTP
try:
lengths, paths = nx.single_source_dijkstra(
road_network, wwtp_node, weight='length'
)
except nx.NetworkXError:
return arrows
for lot in lots:
c = lot.centroid
# Find nearest road network node
nearest_node = None
min_dist = float('inf')
for node, data in road_network.nodes(data=True):
pos = data.get('pos', (node, node))
dist = c.distance(Point(pos))
if dist < min_dist:
min_dist = dist
nearest_node = node
if nearest_node is None or nearest_node not in paths:
continue
# Get path to WWTP
path = paths[nearest_node]
if len(path) < 2:
continue
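# Arrow direction: first segment from the lot's node toward the WWTP.
# Paths are computed with the WWTP as source, so each path runs
# WWTP -> ... -> nearest_node and the lot's end is the tail of the list.
first_node = path[-1]
second_node = path[-2]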
pos1 = road_network.nodes[first_node].get('pos', (first_node, first_node))
pos2 = road_network.nodes[second_node].get('pos', (second_node, second_node))
dx = pos2[0] - pos1[0]
dy = pos2[1] - pos1[1]
length = np.sqrt(dx*dx + dy*dy)
if length > 0:
arrows.append({
'start': (c.x, c.y),
'vector': (dx/length * arrow_length, dy/length * arrow_length),
'path_length': lengths.get(nearest_node, 0)
})
return arrows
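The direction logic is easy to get backwards, because the shortest-path search starts at the WWTP while the water flows toward it. The sketch below shows the idea with a plain Dijkstra search that records, for every node, the neighbor one step closer to the sink. The adjacency-list format, the node names and `next_hop_to_sink` are hypothetical and independent of networkx.

```python
import heapq
from typing import Dict, List, Tuple

# node -> [(neighbor, pipe length in meters)]
Graph = Dict[str, List[Tuple[str, float]]]


def next_hop_to_sink(graph: Graph, sink: str) -> Dict[str, str]:
    """For every node that can reach the sink, the neighbor to step to
    on a shortest path toward the sink (undirected graph assumed)."""
    dist = {sink: 0.0}
    next_hop: Dict[str, str] = {}
    heap = [(0.0, sink)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist.get(u, float("inf")):
            continue  # stale queue entry
        for v, length in graph.get(u, []):
            nd = d + length
            if nd < dist.get(v, float("inf")):
                dist[v] = nd
                next_hop[v] = u  # u is one step closer to the sink than v
                heapq.heappush(heap, (nd, v))
    return next_hop


# Hypothetical road network; every edge is listed in both directions
roads: Graph = {
    "A": [("B", 100.0), ("C", 30.0)],
    "B": [("A", 100.0), ("WWTP", 50.0)],
    "C": [("A", 30.0), ("WWTP", 200.0)],
    "WWTP": [("B", 50.0), ("C", 200.0)],
}
hops = next_hop_to_sink(roads, "WWTP")
```

Node C has a direct 200 m road to the plant, but the route via A and B is only 180 m, so its drainage arrow points toward A. The arrow for a lot is then the unit vector from its nearest node to that node's next hop.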
6. Cross-Stage Optimizations
6.1 Spatial Indexing with STRtree
Problem: O(n²) intersection checks in many modules.
Solution
from typing import List

from shapely.geometry import Polygon
from shapely.strtree import STRtree


class SpatiallyIndexedOperations:
    """
    Use an R-tree spatial index for efficient geometric queries.
    Assumes Shapely >= 2.0, where STRtree.query() returns integer indices
    into the input sequence (Shapely 1.8 returned the geometries themselves).
    """
    def __init__(self, geometries: List[Polygon]):
        self.geometries = geometries
        self.tree = STRtree(geometries)

    def query_intersects(self, query_geom: Polygon) -> List[Polygon]:
        """Indexed lookup instead of a linear scan over all geometries."""
        hits = self.tree.query(query_geom, predicate='intersects')
        return [self.geometries[i] for i in hits]

    def query_within_distance(
        self,
        query_geom: Polygon,
        distance: float
    ) -> List[Polygon]:
        """Find geometries within distance."""
        buffered = query_geom.buffer(distance)
        hits = self.tree.query(buffered, predicate='intersects')
        return [self.geometries[i] for i in hits]


# Integration into GridOptimizer
class OptimizedGridOptimizer(GridOptimizer):
    def _evaluate_layout(self, individual):
        spacing, angle = individual
        blocks = self.generate_grid_candidates(spacing, angle)
        # Use spatial index for intersection checks; the predicate drops
        # blocks whose bounding box matches but which do not intersect
        tree = STRtree(blocks)
        hit_idx = tree.query(self.land_poly, predicate='intersects')
        total_residential_area = 0.0
        fragmented_blocks = 0
        original_area = spacing * spacing
        for i in hit_idx:
            intersection = blocks[i].intersection(self.land_poly)
            # ... rest of evaluation
Metrics
- Speedup: 10-100x for large grids
- Memory: Slightly higher (index storage)
- Implementation effort: Low (a few hours)
6.2 Geometry Simplification
def preprocess_geometry(polygon: Polygon, tolerance: float = 0.5) -> Polygon:
"""
Simplify polygon before heavy operations.
tolerance=0.5 means 50cm maximum deviation - acceptable for planning.
"""
# Remove redundant vertices
simplified = polygon.simplify(tolerance, preserve_topology=True)
# Fix any topology issues
if not simplified.is_valid:
simplified = simplified.buffer(0)
return simplified
# Apply before optimization
class PreprocessedPipeline(LandRedistributionPipeline):
def __init__(self, land_polygons, config, settings=None):
# Simplify input geometry
simplified_polygons = [
preprocess_geometry(p, tolerance=0.5)
for p in land_polygons
]
super().__init__(simplified_polygons, config, settings)
6.3 Caching Layer
import hashlib
from functools import lru_cache
from typing import Tuple
class CachedGridOptimizer(GridOptimizer):
"""
Cache fitness evaluations for identical or similar parameters.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.eval_cache = {}
def _discretize_params(self, spacing: float, angle: float) -> Tuple[float, float]:
"""Round to create discrete cache keys."""
return (round(spacing, 1), round(angle, 1))
def _evaluate_layout(self, individual):
spacing, angle = individual
cache_key = self._discretize_params(spacing, angle)
if cache_key in self.eval_cache:
return self.eval_cache[cache_key]
# Compute and cache
result = super()._evaluate_layout(individual)
self.eval_cache[cache_key] = result
return result
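The effect of the rounding step can be checked with a stand-alone version of the cache. In the sketch below, `make_cached` and the toy `evaluate` function are hypothetical. Unlike the class above, it evaluates at the rounded parameters, an assumed design choice that makes the cached value independent of which near-duplicate arrives first.

```python
from typing import Callable, Dict, Tuple

Fitness = Tuple[float, int]
Key = Tuple[float, float]


def make_cached(evaluate: Callable[[float, float], Fitness],
                decimals: int = 1):
    """Wrap an expensive evaluation with a cache keyed on rounded parameters."""
    cache: Dict[Key, Fitness] = {}
    stats = {"hits": 0, "misses": 0}

    def cached(spacing: float, angle: float) -> Fitness:
        key = (round(spacing, decimals), round(angle, decimals))
        if key in cache:
            stats["hits"] += 1
        else:
            stats["misses"] += 1
            # Evaluate at the rounded point so the key fully determines
            # the stored value
            cache[key] = evaluate(*key)
        return cache[key]

    return cached, stats


def evaluate(spacing: float, angle: float) -> Fitness:
    """Toy stand-in for the expensive Shapely-based fitness function."""
    return (spacing * angle, 0)


cached_eval, stats = make_cached(evaluate)
first = cached_eval(40.02, 15.04)   # miss: evaluated at (40.0, 15.0)
second = cached_eval(39.98, 14.96)  # hit: same rounded key
```

Rounding to one decimal treats layouts that differ by less than about 0.05 m or 0.05 degrees as identical. A coarser rounding raises the hit rate at the cost of resolution in the search.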
7. Algorithm-Level Parallelization
7.1 Parallel Fitness Evaluation
from concurrent.futures import ProcessPoolExecutor
from deap import algorithms, tools
import multiprocessing


class ParallelGridOptimizer(GridOptimizer):
    def __init__(self, *args, num_workers=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.num_workers = num_workers or multiprocessing.cpu_count()

    def _evaluate_invalid(self, executor, individuals):
        """Evaluate in parallel only the individuals whose fitness is stale."""
        invalid = [ind for ind in individuals if not ind.fitness.valid]
        # executor.map preserves input order, so results line up with `invalid`.
        # Note: passing a bound method pickles `self` for every task.
        results = executor.map(self._evaluate_layout, invalid)
        for ind, fit in zip(invalid, results):
            ind.fitness.values = fit

    def optimize(self, population_size=50, generations=100):
        pop = self.toolbox.population(n=population_size)
        # Use ProcessPoolExecutor for parallel evaluation
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            self._evaluate_invalid(executor, pop)
            for gen in range(generations):
                # Variation (sequential); varAnd clones the parents and
                # invalidates the fitness of every modified individual
                offspring = algorithms.varAnd(
                    pop, self.toolbox,
                    cxpb=0.7, mutpb=0.2
                )
                # Parallel evaluation of the changed offspring only
                self._evaluate_invalid(executor, offspring)
                pop = self.toolbox.select(pop + offspring, k=population_size)
        return tools.selBest(pop, 1)[0]
7.2 DEAP Built-in Parallelization
from deap import base
from scoop import futures
# For distributed computing across machines
toolbox.register("map", futures.map)
# Usage:
# python -m scoop -n 8 optimize.py
8. Priority Roadmap
8.1 Impact vs Effort Matrix
                       IMPACT
              Low               High
         ┌────────────────┬────────────────┐
    Low  │ • Caching      │ • STRtree      │
         │                │ • Parallel Eval│
EFFORT   │                │ • Steiner Tree │
         ├────────────────┼────────────────┤
    High │ • Column Gen   │ • Surrogate    │
         │                │ • 2D Packing   │
         │                │ • CVT + Constr │
         └────────────────┴────────────────┘
8.2 Recommended Implementation Order
Phase 1: Quick Wins (1-2 weeks)
| # | Optimization | Est. Time | Expected Speedup |
|---|---|---|---|
| 1 | STRtree Spatial Indexing | 4h | 10-50x for intersection |
| 2 | Geometry Simplification | 2h | 2-3x for boolean ops |
| 3 | Evaluation Caching | 4h | 1.5-2x |
| 4 | Parallel Fitness Eval | 1d | 2-4x (CPU cores) |
Phase 2: Medium-term (2-4 weeks)
| # | Optimization | Est. Time | Expected Improvement |
|---|---|---|---|
| 5 | Steiner Tree Network | 2d | 15-20% shorter cables |
| 6 | CVT Voronoi Seeds | 3d | 30-50% less fragmentation |
| 7 | Multi-obj Transformer | 2d | Better load balance + cost |
| 8 | Island Model GA | 3d | Better global optimum |
Phase 3: Advanced (1-2 months)
| # | Optimization | Est. Time | Expected Improvement |
|---|---|---|---|
| 9 | Extended Genome (5D) | 1w | Rectangular lots support |
| 10 | Surrogate-Assisted | 2w | 5-20x fewer real evals |
| 11 | 2D Bin Packing | 2w | 15-25% better space util |
| 12 | Constrained Voronoi | 1w | Aligned main roads |
Appendix A: Benchmark Template
import time
from typing import Any, Callable, Dict, List
import numpy as np
from shapely.geometry import Polygon
def benchmark_optimization(
optimizer_class: type,
land_polygon: Polygon,
configs: List[Dict[str, Any]],
num_runs: int = 5
) -> List[Dict]:
"""
Standard benchmark for comparing optimizers.
"""
results = []
for config in configs:
run_times = []
fitness_values = []
for run in range(num_runs):
optimizer = optimizer_class(land_polygon, **config)
start = time.perf_counter()
solution, history = optimizer.optimize()
elapsed = time.perf_counter() - start
run_times.append(elapsed)
fitness_values.append(solution.fitness.values)
results.append({
'config': config,
'avg_time': sum(run_times) / num_runs,
'std_time': np.std(run_times),
'avg_fitness': np.mean(fitness_values, axis=0),
'best_fitness': max(fitness_values),
})
return results
Appendix B: References
- NSGA-II: Deb et al., "A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II" (2002)
- Centroidal Voronoi Tessellation: Du, Faber, Gunzburger (1999)
- Steiner Tree: Hwang, Richards, Winter, "The Steiner Tree Problem" (1992)
- OR-Tools: Google Operations Research Tools documentation
- Surrogate-Assisted Optimization: Jin, "Surrogate-assisted evolutionary computation" (2011)
- Shapely STRtree: Guttman, "R-trees: A Dynamic Index Structure for Spatial Searching" (1984)
Document Version: 1.0
Created: 2025-12-08
Author: AI Analysis System
Status: Draft - Pending Review