Cuong2004 commited on
Commit
5f6bdbf
·
1 Parent(s): 2eb0f5c

restructure algo folder

Browse files
algorithms/backend/algorithm.py DELETED
@@ -1,780 +0,0 @@
1
- """Core land redistribution algorithm implementation.
2
-
3
- This module contains the exact algorithm logic from algo.ipynb for:
4
- - Stage 1: Grid optimization using NSGA-II genetic algorithm
5
- - Stage 2: Block subdivision using OR-Tools
6
- - Stage 3: Infrastructure planning
7
- """
8
-
9
- import random
10
- import numpy as np
11
- from typing import List, Tuple, Dict, Any
12
- from shapely.geometry import Polygon, Point, LineString, MultiPolygon, MultiPoint, mapping
13
- from shapely.affinity import translate, rotate
14
- from deap import base, creator, tools, algorithms
15
- from ortools.sat.python import cp_model
16
- import networkx as nx
17
- from scipy.spatial.distance import pdist, squareform
18
- from scipy.sparse.csgraph import minimum_spanning_tree
19
- from sklearn.cluster import KMeans
20
- from shapely.ops import unary_union, voronoi_diagram
21
-
22
-
23
- class GridOptimizer:
24
- """Stage 1: Optimize grid layout using NSGA-II genetic algorithm."""
25
-
26
- def __init__(self, land_polygon: Polygon, lake_polygon: Polygon = None):
27
- """
28
- Initialize grid optimizer.
29
-
30
- Args:
31
- land_polygon: Main land boundary
32
- lake_polygon: Water body to exclude (optional)
33
- """
34
- self.land_poly = land_polygon
35
- self.lake_poly = lake_polygon or Polygon()
36
-
37
- # Setup DEAP genetic algorithm
38
- self._setup_deap()
39
-
40
- def _setup_deap(self):
41
- """Configure DEAP toolbox for multi-objective optimization."""
42
- # Create fitness and individual classes
43
- if not hasattr(creator, "FitnessMulti"):
44
- creator.create("FitnessMulti", base.Fitness, weights=(1.0, -1.0)) # Max area, Min fragments
45
- if not hasattr(creator, "Individual"):
46
- creator.create("Individual", list, fitness=creator.FitnessMulti)
47
-
48
- self.toolbox = base.Toolbox()
49
- # Gene 1: Block spacing (20m - 40m)
50
- self.toolbox.register("attr_spacing", random.uniform, 20, 40)
51
- # Gene 2: Rotation angle (0 - 90 degrees)
52
- self.toolbox.register("attr_angle", random.uniform, 0, 90)
53
-
54
- self.toolbox.register("individual", tools.initCycle, creator.Individual,
55
- (self.toolbox.attr_spacing, self.toolbox.attr_angle), n=1)
56
- self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
57
-
58
- self.toolbox.register("evaluate", self._evaluate_layout)
59
- self.toolbox.register("mate", tools.cxSimulatedBinaryBounded, low=[20, 0], up=[40, 90], eta=20.0)
60
- self.toolbox.register("mutate", tools.mutPolynomialBounded, low=[20, 0], up=[40, 90], eta=20.0, indpb=0.2)
61
- self.toolbox.register("select", tools.selNSGA2)
62
-
63
- def generate_grid_candidates(self, spacing: float, angle_deg: float) -> List[Polygon]:
64
- """
65
- Generate grid blocks at given spacing and rotation.
66
-
67
- Args:
68
- spacing: Grid spacing in meters
69
- angle_deg: Rotation angle in degrees
70
-
71
- Returns:
72
- List of block polygons
73
- """
74
- minx, miny, maxx, maxy = self.land_poly.bounds
75
- diameter = ((maxx - minx)**2 + (maxy - miny)**2)**0.5
76
- center = self.land_poly.centroid
77
-
78
- # Create grid ranges
79
- x_range = np.arange(minx - diameter, maxx + diameter, spacing)
80
- y_range = np.arange(miny - diameter, maxy + diameter, spacing)
81
-
82
- blocks = []
83
-
84
- # Create base block at origin
85
- base_block = Polygon([(0, 0), (spacing, 0), (spacing, spacing), (0, spacing)])
86
- base_block = translate(base_block, -spacing/2, -spacing/2)
87
-
88
- for x in x_range:
89
- for y in y_range:
90
- # Translate and rotate block
91
- poly = translate(base_block, x, y)
92
- poly = rotate(poly, angle_deg, origin=center)
93
-
94
- # Only keep blocks that intersect land
95
- if poly.intersects(self.land_poly):
96
- blocks.append(poly)
97
-
98
- return blocks
99
-
100
- def _evaluate_layout(self, individual: List[float]) -> Tuple[float, int]:
101
- """
102
- Evaluate layout fitness.
103
-
104
- Objectives:
105
- 1. Maximize residential area
106
- 2. Minimize fragmented blocks
107
-
108
- Args:
109
- individual: [spacing, angle]
110
-
111
- Returns:
112
- (total_residential_area, fragmented_blocks)
113
- """
114
- spacing, angle = individual
115
- blocks = self.generate_grid_candidates(spacing, angle)
116
-
117
- total_residential_area = 0
118
- fragmented_blocks = 0
119
-
120
- for blk in blocks:
121
- # Cut with land boundary
122
- intersection = blk.intersection(self.land_poly)
123
- if intersection.is_empty:
124
- continue
125
-
126
- # Subtract lake
127
- usable_part = intersection.difference(self.lake_poly)
128
- if usable_part.is_empty:
129
- continue
130
-
131
- # Calculate area ratio
132
- original_area = spacing * spacing
133
- actual_area = usable_part.area
134
- ratio = actual_area / original_area
135
-
136
- # Classify block quality
137
- if ratio > 0.65:
138
- # Good block for residential
139
- total_residential_area += actual_area
140
- elif ratio > 0.1:
141
- # Fragmented block (penalize)
142
- fragmented_blocks += 1
143
-
144
- return (total_residential_area, fragmented_blocks)
145
-
146
- def optimize(self, population_size: int = 30, generations: int = 15) -> Tuple[List[float], List[List[float]]]:
147
- """
148
- Run NSGA-II optimization.
149
-
150
- Args:
151
- population_size: Population size
152
- generations: Number of generations
153
-
154
- Returns:
155
- (best_solution, history)
156
- """
157
- random.seed(42)
158
- pop = self.toolbox.population(n=population_size)
159
-
160
- history = []
161
-
162
- # Initial evaluation
163
- fits = list(map(self.toolbox.evaluate, pop))
164
- for ind, fit in zip(pop, fits):
165
- ind.fitness.values = fit
166
-
167
- # Save best from generation 0
168
- best_ind = tools.selBest(pop, 1)[0]
169
- history.append(list(best_ind))
170
-
171
- # Evolution
172
- for gen in range(generations):
173
- offspring = algorithms.varAnd(pop, self.toolbox, cxpb=0.7, mutpb=0.3)
174
- fits = list(map(self.toolbox.evaluate, offspring))
175
- for ind, fit in zip(offspring, fits):
176
- ind.fitness.values = fit
177
- pop = self.toolbox.select(pop + offspring, k=len(pop))
178
-
179
- # Save best from each generation
180
- best_ind = tools.selBest(pop, 1)[0]
181
- history.append(list(best_ind))
182
-
183
- final_best = tools.selBest(pop, 1)[0]
184
- return list(final_best), history
185
-
186
-
187
- class SubdivisionSolver:
188
- """Stage 2: Optimize block subdivision using OR-Tools."""
189
-
190
- @staticmethod
191
- def solve_subdivision(total_length: float, min_width: float, max_width: float,
192
- target_width: float, time_limit: float = 5.0) -> List[float]:
193
- """
194
- Solve optimal lot widths using constraint programming.
195
-
196
- Args:
197
- total_length: Total length to subdivide
198
- min_width: Minimum lot width
199
- max_width: Maximum lot width
200
- target_width: Target lot width
201
- time_limit: Solver time limit in seconds
202
-
203
- Returns:
204
- List of lot widths
205
- """
206
- # Safety check: prevent division by zero
207
- if total_length <= 0 or min_width <= 0 or total_length < min_width:
208
- return []
209
-
210
- model = cp_model.CpModel()
211
-
212
- # Estimate number of lots
213
- max_lots = int(total_length / min_width) + 1
214
-
215
- # Decision variables: lot widths (scaled to integers)
216
- scale = 100 # 1cm precision
217
- lot_vars = [model.NewIntVar(int(min_width * scale), int(max_width * scale), f'lot_{i}')
218
- for i in range(max_lots)]
219
-
220
- # Used lot indicators
221
- used = [model.NewBoolVar(f'used_{i}') for i in range(max_lots)]
222
-
223
- # Constraints
224
- # Sum of widths must equal total length
225
- model.Add(sum(lot_vars[i] for i in range(max_lots)) == int(total_length * scale))
226
-
227
- # Lot ordering (if used, previous must be used)
228
- for i in range(1, max_lots):
229
- model.Add(used[i] <= used[i-1])
230
-
231
- # Connect lot values to usage
232
- for i in range(max_lots):
233
- model.Add(lot_vars[i] >= int(min_width * scale)).OnlyEnforceIf(used[i])
234
- model.Add(lot_vars[i] == 0).OnlyEnforceIf(used[i].Not())
235
-
236
- # Minimize deviation from target
237
- deviations = [model.NewIntVar(0, int((max_width - min_width) * scale), f'dev_{i}')
238
- for i in range(max_lots)]
239
-
240
- target_scaled = int(target_width * scale)
241
- for i in range(max_lots):
242
- model.AddAbsEquality(deviations[i], lot_vars[i] - target_scaled)
243
-
244
- model.Minimize(sum(deviations))
245
-
246
- # Solve
247
- solver = cp_model.CpSolver()
248
- solver.parameters.max_time_in_seconds = time_limit
249
- status = solver.Solve(model)
250
-
251
- # Extract solution
252
- if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
253
- widths = []
254
- for i in range(max_lots):
255
- if solver.Value(used[i]):
256
- widths.append(solver.Value(lot_vars[i]) / scale)
257
- return widths
258
- else:
259
- # Fallback: uniform division
260
- num_lots = int(total_length / target_width)
261
- return [total_length / num_lots] * num_lots
262
-
263
- @staticmethod
264
- def subdivide_block(block_geom: Polygon, spacing: float, min_width: float,
265
- max_width: float, target_width: float, time_limit: float = 5.0) -> Dict[str, Any]:
266
- """
267
- Subdivide a block into lots.
268
-
269
- Args:
270
- block_geom: Block geometry
271
- spacing: Grid spacing
272
- min_width: Minimum lot width
273
- max_width: Maximum lot width
274
- target_width: Target lot width
275
- setback: Setback distance in meters (default 6.0)
276
-
277
- Returns:
278
- Dictionary with subdivision info
279
- """
280
- # Determine block quality
281
- original_area = spacing * spacing
282
- current_area = block_geom.area
283
- ratio = current_area / original_area
284
-
285
- result = {
286
- 'geometry': block_geom,
287
- 'type': 'unknown',
288
- 'lots': []
289
- }
290
-
291
- # Fragmented blocks become parks
292
- if ratio < 0.6:
293
- result['type'] = 'park'
294
- return result
295
-
296
- # Good blocks become residential
297
- result['type'] = 'residential'
298
-
299
- # Solve subdivision
300
- minx, miny, maxx, maxy = block_geom.bounds
301
- total_width = maxx - minx
302
-
303
- lot_widths = SubdivisionSolver.solve_subdivision(
304
- total_width, min_width, max_width, target_width, time_limit
305
- )
306
-
307
- # Create lot geometries
308
- current_x = minx
309
- setback_dist = 6.0 # Default setback
310
-
311
- for width in lot_widths:
312
- lot_poly = Polygon([
313
- (current_x, miny),
314
- (current_x + width, miny),
315
- (current_x + width, maxy),
316
- (current_x, maxy)
317
- ])
318
- # Clip to block
319
- clipped = lot_poly.intersection(block_geom)
320
- if not clipped.is_empty:
321
- # Calculate setback (buildable area)
322
- buildable = clipped.buffer(-setback_dist)
323
- if buildable.is_empty:
324
- buildable = None
325
-
326
- result['lots'].append({
327
- 'geometry': clipped,
328
- 'width': width,
329
- 'buildable': buildable
330
- })
331
- current_x += width
332
-
333
- return result
334
-
335
-
336
- class InfrastructurePlanner:
337
- """Stage 3: Plan infrastructure network."""
338
-
339
- @staticmethod
340
- def get_elevation(x: float, y: float) -> float:
341
- """Simulate elevation (sloping from NW to SE)."""
342
- return 50.0 - (x * 0.02) - (y * 0.03)
343
-
344
- def generate_network(lots: List[Polygon]) -> Tuple[List[Tuple[float, float]], List[LineString]]:
345
- """
346
- Generate Loop Network for electrical infrastructure (MST + 15% redundancy).
347
- Matches notebook's create_loop_network function.
348
-
349
- Args:
350
- lots: List of lot polygons
351
-
352
- Returns:
353
- (points, connection_lines)
354
- """
355
- if len(lots) < 2:
356
- return [], []
357
-
358
- centroids = [lot.centroid for lot in lots]
359
- points = np.array([(p.x, p.y) for p in centroids])
360
-
361
- # 1. Create full graph with all nearby connections
362
- G = nx.Graph()
363
- for i, p in enumerate(centroids):
364
- G.add_node(i, pos=(p.x, p.y))
365
-
366
- # Add edges for all pairs within 500m
367
- for i in range(len(centroids)):
368
- for j in range(i+1, len(centroids)):
369
- dist = centroids[i].distance(centroids[j])
370
- if dist < 500:
371
- G.add_edge(i, j, weight=dist)
372
-
373
- # 2. Create MST (Minimum Spanning Tree)
374
- if not nx.is_connected(G):
375
- # Handle disconnected graph - use largest component
376
- components = list(nx.connected_components(G))
377
- largest_comp = max(components, key=len)
378
- subgraph = G.subgraph(largest_comp).copy()
379
- mst = nx.minimum_spanning_tree(subgraph)
380
- else:
381
- mst = nx.minimum_spanning_tree(G)
382
-
383
- # 3. CREATE LOOP: Add back 15% of edges for redundancy (safety)
384
- all_edges = sorted(G.edges(data=True), key=lambda x: x[2]['weight'])
385
- loop_graph = mst.copy()
386
-
387
- added_count = 0
388
- target_extra = int(len(lots) * 0.15) # 15% extra edges
389
-
390
- for u, v, data in all_edges:
391
- if not loop_graph.has_edge(u, v):
392
- loop_graph.add_edge(u, v, **data)
393
- added_count += 1
394
- if added_count >= target_extra:
395
- break
396
-
397
- # Convert NetworkX graph to LineString list
398
- connections = []
399
- for u, v in loop_graph.edges():
400
- connections.append(LineString([centroids[u], centroids[v]]))
401
-
402
- return points.tolist(), connections
403
-
404
- @staticmethod
405
- def generate_transformers(lots: List[Polygon], radius: float = 300.0) -> List[Tuple[float, float]]:
406
- """
407
- Cluster lots to place transformers using K-Means.
408
- """
409
- if not lots:
410
- return []
411
-
412
- lot_coords = np.array([(lot.centroid.x, lot.centroid.y) for lot in lots])
413
-
414
- # Estimate number of transformers (approx 1 per 15 lots)
415
- num_transformers = max(1, int(len(lots) / 15))
416
-
417
- if len(lots) < num_transformers:
418
- num_transformers = len(lots)
419
-
420
- kmeans = KMeans(n_clusters=num_transformers, n_init=10).fit(lot_coords)
421
- return kmeans.cluster_centers_.tolist()
422
-
423
- @staticmethod
424
- def calculate_drainage(lots: List[Polygon], wwtp_centroid: Point) -> List[Dict[str, Any]]:
425
- """
426
- Calculate drainage flow direction towards Wastewater Treatment Plant (XLNT).
427
- """
428
- arrows = []
429
- if not wwtp_centroid:
430
- return arrows
431
-
432
- for lot in lots:
433
- c = lot.centroid
434
- dx = wwtp_centroid.x - c.x
435
- dy = wwtp_centroid.y - c.y
436
- length = (dx**2 + dy**2)**0.5
437
-
438
- if length > 0:
439
- # Normalize vector to 30m arrow length
440
- arrows.append({
441
- 'start': (c.x, c.y),
442
- 'vector': (dx/length * 30, dy/length * 30)
443
- })
444
- return arrows
445
-
446
- class LandRedistributionPipeline:
447
- """Main pipeline orchestrating all optimization stages."""
448
-
449
- def __init__(self, land_polygons: List[Polygon], config: Dict[str, Any]):
450
- """
451
- Initialize pipeline.
452
-
453
- Args:
454
- land_polygons: Input land plots
455
- config: Algorithm configuration
456
- """
457
- # Merge all input polygons
458
- from shapely.ops import unary_union
459
- self.land_poly = unary_union(land_polygons)
460
- self.config = config
461
- self.lake_poly = Polygon() # No lake by default
462
-
463
- def generate_road_network(self, num_seeds: int = 15) -> Tuple[Polygon, List[Polygon], List[Polygon]]:
464
- """
465
- Generate road network using Voronoi diagram (matches notebook's generate_road_network).
466
-
467
- Args:
468
- num_seeds: Number of Voronoi seed points
469
-
470
- Returns:
471
- (road_network, service_blocks, commercial_blocks)
472
- """
473
- # Constants from notebook
474
- ROAD_MAIN_WIDTH = 25.0 # Main road width (m)
475
- ROAD_INTERNAL_WIDTH = 15.0 # Internal road width (m)
476
- SIDEWALK_WIDTH = 4.0 # Sidewalk width each side (m)
477
- TURNING_RADIUS = 15.0 # Turning radius for intersections (m)
478
- SERVICE_AREA_RATIO = 0.10 # 10% for service areas
479
- MIN_BLOCK_AREA = 5000 # Minimum block area (m2)
480
-
481
- site = self.land_poly
482
- minx, miny, maxx, maxy = site.bounds
483
-
484
- # 1. Generate random Voronoi seeds
485
- seeds = []
486
- for _ in range(num_seeds):
487
- seeds.append(Point(random.uniform(minx, maxx), random.uniform(miny, maxy)))
488
-
489
- # 2. Create Voronoi diagram
490
- try:
491
- regions = voronoi_diagram(MultiPoint(seeds), envelope=site)
492
- except:
493
- # Fallback if Voronoi fails
494
- return Polygon(), [], [site]
495
-
496
- # 3. Extract edges from Voronoi regions
497
- edges = []
498
- if hasattr(regions, 'geoms'):
499
- for region in regions.geoms:
500
- if region.geom_type == 'Polygon':
501
- edges.append(region.exterior)
502
- elif regions.geom_type == 'Polygon':
503
- edges.append(regions.exterior)
504
-
505
- # 4. Classify roads and create buffers
506
- center = site.centroid
507
- road_polys = []
508
-
509
- all_lines = []
510
- for geom in edges:
511
- all_lines.append(geom)
512
-
513
- merged_lines = unary_union(all_lines)
514
-
515
- # Normalize to list of LineStrings
516
- lines_to_process = []
517
- if hasattr(merged_lines, 'geoms'):
518
- lines_to_process = list(merged_lines.geoms)
519
- else:
520
- lines_to_process = [merged_lines]
521
-
522
- for line in lines_to_process:
523
- if line.geom_type != 'LineString':
524
- continue
525
-
526
- # Heuristic: roads near center or very long = main roads
527
- dist_to_center = line.distance(center)
528
- if dist_to_center < 100 or line.length > 400:
529
- # Main road: wider + sidewalks
530
- width = ROAD_MAIN_WIDTH + 2 * SIDEWALK_WIDTH
531
- road_polys.append(line.buffer(width / 2, cap_style=2, join_style=2))
532
- else:
533
- # Internal road: narrower
534
- width = ROAD_INTERNAL_WIDTH + 2 * SIDEWALK_WIDTH
535
- road_polys.append(line.buffer(width / 2, cap_style=2, join_style=2))
536
-
537
- if not road_polys:
538
- # No roads generated - fallback
539
- return Polygon(), [], [site]
540
-
541
- network_poly = unary_union(road_polys)
542
-
543
- # 5. Apply turning radius smoothing (vạt góc)
544
- smooth_network = network_poly.buffer(TURNING_RADIUS, join_style=1).buffer(-TURNING_RADIUS, join_style=1)
545
-
546
- # 6. Extract blocks (land minus roads)
547
- blocks_rough = site.difference(smooth_network)
548
-
549
- service_blocks = []
550
- commercial_blocks = []
551
-
552
- # Normalize blocks list
553
- candidates = []
554
- if hasattr(blocks_rough, 'geoms'):
555
- candidates = list(blocks_rough.geoms)
556
- else:
557
- candidates = [blocks_rough]
558
-
559
- # Filter by minimum area
560
- valid_blocks = [b for b in candidates if b.geom_type == 'Polygon' and b.area >= MIN_BLOCK_AREA]
561
-
562
- if not valid_blocks:
563
- return smooth_network, [], []
564
-
565
- # 7. Sort by elevation to find XLNT (lowest)
566
- blocks_with_elev = [(b, InfrastructurePlanner.get_elevation(b.centroid.x, b.centroid.y)) for b in valid_blocks]
567
- blocks_with_elev.sort(key=lambda x: x[1])
568
-
569
- # 8. Allocate service areas (10% of total)
570
- total_area = sum(b.area for b in valid_blocks)
571
-
572
- # Safety check: prevent division issues
573
- if total_area <= 0 or len(valid_blocks) == 0:
574
- return smooth_network, [], valid_blocks
575
-
576
- service_area_needed = total_area * SERVICE_AREA_RATIO
577
-
578
- accumulated_service_area = 0
579
- for block, elev in blocks_with_elev:
580
- if accumulated_service_area < service_area_needed:
581
- service_blocks.append(block)
582
- accumulated_service_area += block.area
583
- else:
584
- commercial_blocks.append(block)
585
-
586
- # Ensure at least one commercial block exists
587
- if not commercial_blocks and service_blocks:
588
- # Move one service block to commercial
589
- commercial_blocks.append(service_blocks.pop())
590
-
591
- return smooth_network, service_blocks, commercial_blocks
592
-
593
- def run_stage1(self) -> Dict[str, Any]:
594
- """Run grid optimization stage."""
595
- optimizer = GridOptimizer(self.land_poly, self.lake_poly)
596
-
597
- best_solution, history = optimizer.optimize(
598
- population_size=self.config.get('population_size', 30),
599
- generations=self.config.get('generations', 15)
600
- )
601
-
602
- spacing, angle = best_solution
603
- blocks = optimizer.generate_grid_candidates(spacing, angle)
604
-
605
- # Filter to usable blocks
606
- usable_blocks = []
607
- for blk in blocks:
608
- intersection = blk.intersection(self.land_poly).difference(self.lake_poly)
609
- if not intersection.is_empty:
610
- usable_blocks.append(intersection)
611
-
612
- return {
613
- 'spacing': spacing,
614
- 'angle': angle,
615
- 'blocks': usable_blocks,
616
- 'history': history,
617
- 'metrics': {
618
- 'total_blocks': len(usable_blocks),
619
- 'optimal_spacing': spacing,
620
- 'optimal_angle': angle
621
- }
622
- }
623
-
624
- def run_stage2(self, blocks: List[Polygon], spacing: float) -> Dict[str, Any]:
625
- """Run subdivision stage."""
626
- all_lots = []
627
- parks = []
628
-
629
- for block in blocks:
630
- result = SubdivisionSolver.subdivide_block(
631
- block,
632
- spacing,
633
- self.config.get('min_lot_width', 5.0),
634
- self.config.get('max_lot_width', 8.0),
635
- self.config.get('target_lot_width', 6.0),
636
- self.config.get('ortools_time_limit', 5)
637
- )
638
-
639
- if result['type'] == 'park':
640
- parks.append(result['geometry'])
641
- else:
642
- all_lots.extend(result['lots'])
643
-
644
- return {
645
- 'lots': all_lots,
646
- 'parks': parks,
647
- 'metrics': {
648
- 'total_lots': len(all_lots),
649
- 'total_parks': len(parks),
650
- 'avg_lot_width': np.mean([lot['width'] for lot in all_lots]) if all_lots else 0
651
- }
652
- }
653
-
654
- def classify_blocks(self, blocks: List[Polygon]) -> Dict[str, List[Polygon]]:
655
- """
656
- Classify blocks into Service (XLNT, Operations) and Commercial.
657
- Logic:
658
- - Sort by elevation (lowest -> XLNT)
659
- - Reserve 10% for Service/Parking
660
- - Rest -> Commercial (Residential/Industrial)
661
- """
662
- if not blocks:
663
- return {'service': [], 'commercial': [], 'xlnt': []}
664
-
665
- # Sort by elevation
666
- sorted_blocks = sorted(blocks, key=lambda b: InfrastructurePlanner.get_elevation(b.centroid.x, b.centroid.y))
667
-
668
- total_area = sum(b.area for b in blocks)
669
- service_area_target = total_area * 0.10
670
- current_service_area = 0
671
-
672
- service_blocks = []
673
- commercial_blocks = []
674
- xlnt_block = []
675
-
676
- # Lowest block is XLNT
677
- if sorted_blocks:
678
- xlnt = sorted_blocks.pop(0)
679
- xlnt_block.append(xlnt)
680
- current_service_area += xlnt.area
681
-
682
- # Fill remaining service quota
683
- for b in sorted_blocks:
684
- if current_service_area < service_area_target:
685
- service_blocks.append(b)
686
- current_service_area += b.area
687
- else:
688
- commercial_blocks.append(b)
689
-
690
- return {
691
- 'xlnt': xlnt_block,
692
- 'service': service_blocks,
693
- 'commercial': commercial_blocks
694
- }
695
-
696
- def run_full_pipeline(self) -> Dict[str, Any]:
697
- """Run complete optimization pipeline with Voronoi road generation."""
698
- # NEW: Stage 0 - Voronoi Road Network Generation
699
- road_network, service_blocks_voronoi, commercial_blocks_voronoi = self.generate_road_network(num_seeds=15)
700
-
701
- # If Voronoi fails, fallback to old approach
702
- if not commercial_blocks_voronoi:
703
- # Old approach: Grid-based
704
- stage1_result = self.run_stage1()
705
- classification = self.classify_blocks(stage1_result['blocks'])
706
- commercial_blocks_voronoi = classification['commercial']
707
- service_blocks_voronoi = classification['service']
708
- xlnt_blocks = classification['xlnt']
709
- # Old road network
710
- all_blocks = stage1_result['blocks']
711
- road_network = self.land_poly.difference(unary_union(all_blocks))
712
- spacing_for_subdivision = stage1_result['spacing']
713
- else:
714
- # Voronoi succeeded - separate XLNT from service blocks
715
- # XLNT is the first service block (lowest elevation)
716
- if service_blocks_voronoi:
717
- xlnt_blocks = [service_blocks_voronoi[0]]
718
- service_blocks_voronoi = service_blocks_voronoi[1:]
719
- else:
720
- xlnt_blocks = []
721
-
722
- # Estimate spacing for subdivision (use average block dimension)
723
- if commercial_blocks_voronoi and len(commercial_blocks_voronoi) > 0:
724
- avg_area = sum(b.area for b in commercial_blocks_voronoi) / len(commercial_blocks_voronoi)
725
- spacing_for_subdivision = max(20.0, (avg_area ** 0.5) * 0.7) # Heuristic, min 20m
726
- else:
727
- spacing_for_subdivision = 25.0
728
-
729
- # Stage 2: Subdivision (only for commercial blocks)
730
- stage2_result = self.run_stage2(
731
- commercial_blocks_voronoi,
732
- spacing_for_subdivision
733
- )
734
-
735
- # Construct final list of all network nodes
736
- all_network_nodes = stage2_result['lots'] + \
737
- [{'geometry': b, 'type': 'service'} for b in service_blocks_voronoi] + \
738
- [{'geometry': b, 'type': 'xlnt'} for b in xlnt_blocks]
739
-
740
- # Extract polygons for Infrastructure
741
- infra_polys = [item['geometry'] for item in all_network_nodes]
742
-
743
- # Stage 3: Infrastructure
744
- points, connections = InfrastructurePlanner.generate_network(infra_polys)
745
-
746
- # Transformers
747
- transformers = InfrastructurePlanner.generate_transformers(infra_polys)
748
-
749
- # Drainage
750
- wwtp_center = xlnt_blocks[0].centroid if xlnt_blocks else None
751
- drainage = InfrastructurePlanner.calculate_drainage(infra_polys, wwtp_center)
752
-
753
- return {
754
- 'stage1': {
755
- 'blocks': commercial_blocks_voronoi + service_blocks_voronoi + xlnt_blocks,
756
- 'metrics': {
757
- 'total_blocks': len(commercial_blocks_voronoi) + len(service_blocks_voronoi) + len(xlnt_blocks)
758
- },
759
- 'spacing': spacing_for_subdivision,
760
- 'angle': 0.0 # Voronoi doesn't use angle
761
- },
762
- 'stage2': stage2_result,
763
- 'classification': {
764
- 'xlnt_count': len(xlnt_blocks),
765
- 'service_count': len(service_blocks_voronoi),
766
- 'commercial_count': len(commercial_blocks_voronoi),
767
- 'xlnt': xlnt_blocks,
768
- 'service': service_blocks_voronoi
769
- },
770
- 'stage3': {
771
- 'points': points,
772
- 'connections': [list(line.coords) for line in connections],
773
- 'drainage': drainage,
774
- 'transformers': transformers,
775
- 'road_network': mapping(road_network)
776
- },
777
- 'total_lots': stage2_result['metrics']['total_lots'],
778
- 'service_blocks': [list(b.exterior.coords) for b in service_blocks_voronoi],
779
- 'xlnt_blocks': [list(b.exterior.coords) for b in xlnt_blocks]
780
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
algorithms/backend/api/__init__.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ # API module
2
+ from api.schemas import (
3
+ AlgorithmConfig,
4
+ LandPlot,
5
+ OptimizationRequest,
6
+ StageResult,
7
+ OptimizationResponse,
8
+ HealthResponse,
9
+ )
algorithms/backend/api/routes/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from api.routes.optimization_routes import router as optim_router
2
+ from api.routes.dxf_routes import router as dxf_router
algorithms/backend/api/routes/dxf_routes.py ADDED
@@ -0,0 +1,103 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """DXF file handling routes."""
2
+
3
+ import logging
4
+ from fastapi import APIRouter, HTTPException, UploadFile, File
5
+ from fastapi.responses import Response
6
+
7
+ from utils.dxf_utils import load_boundary_from_dxf, export_to_dxf, validate_dxf
8
+
9
+ logger = logging.getLogger(__name__)
10
+ router = APIRouter()
11
+
12
+
13
+ @router.post("/upload-dxf")
14
+ async def upload_dxf(file: UploadFile = File(...)):
15
+ """
16
+ Upload and parse DXF file to extract boundary polygon.
17
+
18
+ Returns GeoJSON polygon that can be used as input.
19
+ """
20
+ try:
21
+ content = await file.read()
22
+
23
+ is_valid, message = validate_dxf(content)
24
+ if not is_valid:
25
+ raise HTTPException(status_code=400, detail=message)
26
+
27
+ polygon = load_boundary_from_dxf(content)
28
+
29
+ if polygon is None:
30
+ raise HTTPException(
31
+ status_code=400,
32
+ detail="Could not extract boundary polygon from DXF. Make sure it contains closed polylines."
33
+ )
34
+
35
+ geojson = {
36
+ "type": "Polygon",
37
+ "coordinates": [list(polygon.exterior.coords)],
38
+ "properties": {
39
+ "source": "dxf",
40
+ "filename": file.filename,
41
+ "area": polygon.area
42
+ }
43
+ }
44
+
45
+ return {
46
+ "success": True,
47
+ "message": f"Successfully extracted boundary from {file.filename}",
48
+ "polygon": geojson,
49
+ "area": polygon.area,
50
+ "bounds": polygon.bounds
51
+ }
52
+
53
+ except HTTPException:
54
+ raise
55
+ except Exception as e:
56
+ logger.error(f"DXF processing failed: {e}")
57
+ raise HTTPException(status_code=500, detail=f"Failed to process DXF: {str(e)}")
58
+
59
+
60
+ @router.post("/export-dxf")
61
+ async def export_dxf_endpoint(request: dict):
62
+ """
63
+ Export optimization results to DXF format.
64
+
65
+ Expects: {"result": OptimizationResponse}
66
+ Returns: DXF file
67
+ """
68
+ try:
69
+ result = request.get('result')
70
+ if not result:
71
+ raise HTTPException(status_code=400, detail="No result data provided")
72
+
73
+ geometries = []
74
+
75
+ if 'final_layout' in result and result['final_layout']:
76
+ features = result['final_layout'].get('features', [])
77
+ geometries = features
78
+ elif 'stages' in result and len(result['stages']) > 0:
79
+ last_stage = result['stages'][-1]
80
+ features = last_stage.get('geometry', {}).get('features', [])
81
+ geometries = features
82
+
83
+ if not geometries:
84
+ raise HTTPException(status_code=400, detail="No geometries to export")
85
+
86
+ dxf_bytes = export_to_dxf(geometries)
87
+
88
+ if not dxf_bytes:
89
+ raise HTTPException(status_code=500, detail="Failed to generate DXF")
90
+
91
+ return Response(
92
+ content=dxf_bytes,
93
+ media_type="application/dxf",
94
+ headers={
95
+ "Content-Disposition": "attachment; filename=land_redistribution.dxf"
96
+ }
97
+ )
98
+
99
+ except HTTPException:
100
+ raise
101
+ except Exception as e:
102
+ logger.error(f"DXF export failed: {e}")
103
+ raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
algorithms/backend/{routes.py → api/routes/optimization_routes.py} RENAMED
@@ -1,19 +1,15 @@
1
- """API routes for land redistribution algorithm."""
2
 
3
- from fastapi import APIRouter, HTTPException, UploadFile, File
4
- from fastapi.responses import Response
5
- from typing import List
6
  import traceback
 
7
  from shapely.geometry import Polygon, mapping, LineString, Point
8
 
9
- from models import (
10
- OptimizationRequest,
11
- OptimizationResponse,
12
- StageResult
13
- )
14
- from algorithm import LandRedistributionPipeline
15
- from dxf_utils import load_boundary_from_dxf, export_to_dxf, validate_dxf
16
 
 
17
  router = APIRouter()
18
 
19
 
@@ -37,8 +33,6 @@ async def optimize_full(request: OptimizationRequest):
37
  1. Grid optimization (NSGA-II)
38
  2. Block subdivision (OR-Tools)
39
  3. Infrastructure planning
40
-
41
- Returns detailed results with process visualization data.
42
  """
43
  try:
44
  # Convert input land plots to Shapely polygons
@@ -93,8 +87,7 @@ async def optimize_full(request: OptimizationRequest):
93
  "properties": lot_props
94
  })
95
 
96
- # Setback (Network visualization will need this as separate line usually,
97
- # or frontend can render it if passed as property geometry)
98
  if lot.get('buildable'):
99
  stage2_features.append({
100
  "type": "Feature",
@@ -117,7 +110,6 @@ async def optimize_full(request: OptimizationRequest):
117
  }
118
  })
119
 
120
-
121
  # Add Service Blocks
122
  for block in result['classification'].get('service', []):
123
  stage2_features.append({
@@ -176,7 +168,6 @@ async def optimize_full(request: OptimizationRequest):
176
  "label": "Transportation Infra"
177
  }
178
  }
179
- # PREPEND roads so they are at bottom layer
180
  stage3_features.insert(0, road_feat)
181
 
182
  # Add connection lines
@@ -206,7 +197,6 @@ async def optimize_full(request: OptimizationRequest):
206
 
207
  # Add drainage
208
  for drainage in result['stage3']['drainage']:
209
- # Create a line for the arrow
210
  start = drainage['start']
211
  vec = drainage['vector']
212
  end = (start[0] + vec[0], start[1] + vec[1])
@@ -221,7 +211,7 @@ async def optimize_full(request: OptimizationRequest):
221
 
222
  stage3_geoms = {
223
  "type": "FeatureCollection",
224
- "features": stage3_features + stage2_features # Include base map
225
  }
226
 
227
  stages.append(StageResult(
@@ -235,7 +225,6 @@ async def optimize_full(request: OptimizationRequest):
235
  parameters={}
236
  ))
237
 
238
- # Build response
239
  return OptimizationResponse(
240
  success=True,
241
  message="Optimization completed successfully",
@@ -255,6 +244,7 @@ async def optimize_full(request: OptimizationRequest):
255
 
256
  except Exception as e:
257
  error_msg = f"Optimization failed: {str(e)}\n{traceback.format_exc()}"
 
258
  raise HTTPException(status_code=500, detail=error_msg)
259
 
260
 
@@ -263,7 +253,6 @@ async def optimize_stage1(request: OptimizationRequest):
263
  """Run only grid optimization stage."""
264
  try:
265
  land_polygons = [land_plot_to_polygon(plot.dict()) for plot in request.land_plots]
266
-
267
  config = request.config.dict()
268
  pipeline = LandRedistributionPipeline(land_polygons, config)
269
 
@@ -297,102 +286,5 @@ async def optimize_stage1(request: OptimizationRequest):
297
  )
298
 
299
  except Exception as e:
 
300
  raise HTTPException(status_code=500, detail=f"Stage 1 failed: {str(e)}")
301
-
302
-
303
- @router.post("/upload-dxf")
304
- async def upload_dxf(file: UploadFile = File(...)):
305
- """
306
- Upload and parse DXF file to extract boundary polygon.
307
-
308
- Returns GeoJSON polygon that can be used as input.
309
- """
310
- try:
311
- # Read file content
312
- content = await file.read()
313
-
314
- # Validate DXF
315
- is_valid, message = validate_dxf(content)
316
- if not is_valid:
317
- raise HTTPException(status_code=400, detail=message)
318
-
319
- # Load boundary
320
- polygon = load_boundary_from_dxf(content)
321
-
322
- if polygon is None:
323
- raise HTTPException(
324
- status_code=400,
325
- detail="Could not extract boundary polygon from DXF. Make sure it contains closed polylines."
326
- )
327
-
328
- # Convert to GeoJSON
329
- geojson = {
330
- "type": "Polygon",
331
- "coordinates": [list(polygon.exterior.coords)],
332
- "properties": {
333
- "source": "dxf",
334
- "filename": file.filename,
335
- "area": polygon.area
336
- }
337
- }
338
-
339
- return {
340
- "success": True,
341
- "message": f"Successfully extracted boundary from {file.filename}",
342
- "polygon": geojson,
343
- "area": polygon.area,
344
- "bounds": polygon.bounds
345
- }
346
-
347
- except HTTPException:
348
- raise
349
- except Exception as e:
350
- raise HTTPException(status_code=500, detail=f"Failed to process DXF: {str(e)}")
351
-
352
-
353
- @router.post("/export-dxf")
354
- async def export_dxf_endpoint(request: dict):
355
- """
356
- Export optimization results to DXF format.
357
-
358
- Expects: {"result": OptimizationResponse}
359
- Returns: DXF file
360
- """
361
- try:
362
- result = request.get('result')
363
- if not result:
364
- raise HTTPException(status_code=400, detail="No result data provided")
365
-
366
- # Get final layout or last stage
367
- geometries = []
368
-
369
- if 'final_layout' in result and result['final_layout']:
370
- features = result['final_layout'].get('features', [])
371
- geometries = features
372
- elif 'stages' in result and len(result['stages']) > 0:
373
- last_stage = result['stages'][-1]
374
- features = last_stage.get('geometry', {}).get('features', [])
375
- geometries = features
376
-
377
- if not geometries:
378
- raise HTTPException(status_code=400, detail="No geometries to export")
379
-
380
- # Export to DXF
381
- dxf_bytes = export_to_dxf(geometries)
382
-
383
- if not dxf_bytes:
384
- raise HTTPException(status_code=500, detail="Failed to generate DXF")
385
-
386
- # Return as downloadable file
387
- return Response(
388
- content=dxf_bytes,
389
- media_type="application/dxf",
390
- headers={
391
- "Content-Disposition": "attachment; filename=land_redistribution.dxf"
392
- }
393
- )
394
-
395
- except HTTPException:
396
- raise
397
- except Exception as e:
398
- raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
 
1
+ """Optimization API routes."""
2
 
3
+ import logging
 
 
4
  import traceback
5
+ from fastapi import APIRouter, HTTPException
6
  from shapely.geometry import Polygon, mapping, LineString, Point
7
 
8
+ from api.schemas.request_schemas import OptimizationRequest
9
+ from api.schemas.response_schemas import OptimizationResponse, StageResult
10
+ from pipeline.land_redistribution import LandRedistributionPipeline
 
 
 
 
11
 
12
+ logger = logging.getLogger(__name__)
13
  router = APIRouter()
14
 
15
 
 
33
  1. Grid optimization (NSGA-II)
34
  2. Block subdivision (OR-Tools)
35
  3. Infrastructure planning
 
 
36
  """
37
  try:
38
  # Convert input land plots to Shapely polygons
 
87
  "properties": lot_props
88
  })
89
 
90
+ # Setback
 
91
  if lot.get('buildable'):
92
  stage2_features.append({
93
  "type": "Feature",
 
110
  }
111
  })
112
 
 
113
  # Add Service Blocks
114
  for block in result['classification'].get('service', []):
115
  stage2_features.append({
 
168
  "label": "Transportation Infra"
169
  }
170
  }
 
171
  stage3_features.insert(0, road_feat)
172
 
173
  # Add connection lines
 
197
 
198
  # Add drainage
199
  for drainage in result['stage3']['drainage']:
 
200
  start = drainage['start']
201
  vec = drainage['vector']
202
  end = (start[0] + vec[0], start[1] + vec[1])
 
211
 
212
  stage3_geoms = {
213
  "type": "FeatureCollection",
214
+ "features": stage3_features + stage2_features
215
  }
216
 
217
  stages.append(StageResult(
 
225
  parameters={}
226
  ))
227
 
 
228
  return OptimizationResponse(
229
  success=True,
230
  message="Optimization completed successfully",
 
244
 
245
  except Exception as e:
246
  error_msg = f"Optimization failed: {str(e)}\n{traceback.format_exc()}"
247
+ logger.error(error_msg)
248
  raise HTTPException(status_code=500, detail=error_msg)
249
 
250
 
 
253
  """Run only grid optimization stage."""
254
  try:
255
  land_polygons = [land_plot_to_polygon(plot.dict()) for plot in request.land_plots]
 
256
  config = request.config.dict()
257
  pipeline = LandRedistributionPipeline(land_polygons, config)
258
 
 
286
  )
287
 
288
  except Exception as e:
289
+ logger.error(f"Stage 1 failed: {e}")
290
  raise HTTPException(status_code=500, detail=f"Stage 1 failed: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
algorithms/backend/api/schemas/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from api.schemas.request_schemas import AlgorithmConfig, LandPlot, OptimizationRequest
2
+ from api.schemas.response_schemas import StageResult, OptimizationResponse, HealthResponse
algorithms/backend/{models.py → api/schemas/request_schemas.py} RENAMED
@@ -1,4 +1,4 @@
1
- """Pydantic models for API request/response schemas."""
2
 
3
  from typing import List, Dict, Any, Optional
4
  from pydantic import BaseModel, Field
@@ -41,30 +41,3 @@ class OptimizationRequest(BaseModel):
41
 
42
  config: AlgorithmConfig = Field(..., description="Algorithm configuration")
43
  land_plots: List[LandPlot] = Field(..., description="Initial land plots to subdivide")
44
-
45
-
46
- class StageResult(BaseModel):
47
- """Result from a single optimization stage."""
48
-
49
- stage_name: str = Field(..., description="Name of the stage")
50
- geometry: Dict[str, Any] = Field(..., description="GeoJSON geometry of results")
51
- metrics: Dict[str, float] = Field(..., description="Performance metrics")
52
- parameters: Dict[str, Any] = Field(..., description="Parameters used")
53
-
54
-
55
- class OptimizationResponse(BaseModel):
56
- """Response model containing optimization results."""
57
-
58
- success: bool = Field(..., description="Whether optimization succeeded")
59
- message: str = Field(..., description="Status message")
60
- stages: List[StageResult] = Field(default=[], description="Results from each stage")
61
- final_layout: Optional[Dict[str, Any]] = Field(None, description="Final GeoJSON layout")
62
- total_lots: Optional[int] = Field(None, description="Total number of lots created")
63
- statistics: Optional[Dict[str, Any]] = Field(None, description="Overall statistics")
64
-
65
-
66
- class HealthResponse(BaseModel):
67
- """Health check response."""
68
-
69
- status: str = Field(default="healthy", description="Service status")
70
- version: str = Field(default="1.0.0", description="API version")
 
1
+ """Request schemas for the API."""
2
 
3
  from typing import List, Dict, Any, Optional
4
  from pydantic import BaseModel, Field
 
41
 
42
  config: AlgorithmConfig = Field(..., description="Algorithm configuration")
43
  land_plots: List[LandPlot] = Field(..., description="Initial land plots to subdivide")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
algorithms/backend/api/schemas/response_schemas.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Response schemas for the API."""
2
+
3
+ from typing import List, Dict, Any, Optional
4
+ from pydantic import BaseModel, Field
5
+
6
+
7
+ class StageResult(BaseModel):
8
+ """Result from a single optimization stage."""
9
+
10
+ stage_name: str = Field(..., description="Name of the stage")
11
+ geometry: Dict[str, Any] = Field(..., description="GeoJSON geometry of results")
12
+ metrics: Dict[str, float] = Field(..., description="Performance metrics")
13
+ parameters: Dict[str, Any] = Field(..., description="Parameters used")
14
+
15
+
16
+ class OptimizationResponse(BaseModel):
17
+ """Response model containing optimization results."""
18
+
19
+ success: bool = Field(..., description="Whether optimization succeeded")
20
+ message: str = Field(..., description="Status message")
21
+ stages: List[StageResult] = Field(default=[], description="Results from each stage")
22
+ final_layout: Optional[Dict[str, Any]] = Field(None, description="Final GeoJSON layout")
23
+ total_lots: Optional[int] = Field(None, description="Total number of lots created")
24
+ statistics: Optional[Dict[str, Any]] = Field(None, description="Overall statistics")
25
+
26
+
27
+ class HealthResponse(BaseModel):
28
+ """Health check response."""
29
+
30
+ status: str = Field(default="healthy", description="Service status")
31
+ version: str = Field(default="1.0.0", description="API version")
algorithms/backend/core/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ # Core module exports
2
+ from core.config import settings
algorithms/backend/core/config/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from core.config.settings import AlgorithmSettings, RoadSettings, SubdivisionSettings, InfrastructureSettings
algorithms/backend/core/config/settings.py ADDED
@@ -0,0 +1,134 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Algorithm configuration settings and constants.
3
+
4
+ Contains all configurable parameters for the land redistribution algorithm,
5
+ organized into dataclasses for type safety and clarity.
6
+ """
7
+
8
+ from dataclasses import dataclass, field
9
+ from typing import Tuple
10
+
11
+
12
+ @dataclass(frozen=True)
13
+ class RoadSettings:
14
+ """Road and transportation infrastructure settings (TCVN standards)."""
15
+
16
+ # Road widths (meters)
17
+ main_width: float = 30.0 # Main road - container trucks can pass
18
+ internal_width: float = 15.0 # Internal road
19
+ sidewalk_width: float = 4.0 # Sidewalk each side (includes utility trench)
20
+ turning_radius: float = 15.0 # Corner chamfer radius for intersections
21
+
22
+
23
+ @dataclass(frozen=True)
24
+ class SubdivisionSettings:
25
+ """Block and lot subdivision settings."""
26
+
27
+ # Land allocation
28
+ service_area_ratio: float = 0.10 # 10% for infrastructure (WWTP, parking, etc.)
29
+ min_block_area: float = 5000.0 # Minimum block area to subdivide (m²)
30
+
31
+ # Lot dimensions (industrial)
32
+ min_lot_width: float = 20.0 # Minimum lot frontage (m)
33
+ max_lot_width: float = 80.0 # Maximum lot frontage (m)
34
+ target_lot_width: float = 40.0 # Target lot width (m)
35
+
36
+ # Legal/Construction (TCVN)
37
+ setback_distance: float = 6.0 # Building setback from road (m)
38
+ fire_safety_gap: float = 4.0 # Fire safety gap between buildings (m)
39
+
40
+ # Solver
41
+ solver_time_limit: float = 0.5 # OR-Tools time limit per block (seconds)
42
+
43
+
44
+ @dataclass(frozen=True)
45
+ class InfrastructureSettings:
46
+ """Infrastructure planning settings."""
47
+
48
+ # Electrical
49
+ transformer_radius: float = 300.0 # Effective service radius (m)
50
+ lots_per_transformer: int = 15 # Approximate lots per transformer
51
+
52
+ # Network
53
+ loop_redundancy_ratio: float = 0.15 # 15% extra edges for loop network safety
54
+ max_connection_distance: float = 500.0 # Max distance for lot connections (m)
55
+
56
+ # Drainage
57
+ drainage_arrow_length: float = 30.0 # Arrow length for visualization (m)
58
+
59
+
60
+ @dataclass(frozen=True)
61
+ class OptimizationSettings:
62
+ """NSGA-II genetic algorithm settings."""
63
+
64
+ # Population
65
+ population_size: int = 30
66
+ generations: int = 15
67
+
68
+ # Crossover/Mutation
69
+ crossover_probability: float = 0.7
70
+ mutation_probability: float = 0.3
71
+ eta: float = 20.0 # Distribution index for SBX crossover
72
+
73
+ # Gene bounds
74
+ spacing_bounds: Tuple[float, float] = (20.0, 40.0)
75
+ angle_bounds: Tuple[float, float] = (0.0, 90.0)
76
+
77
+ # Block quality thresholds
78
+ good_block_ratio: float = 0.65 # Ratio for residential/commercial
79
+ fragmented_block_ratio: float = 0.1 # Below this = too small
80
+
81
+
82
+ @dataclass
83
+ class AlgorithmSettings:
84
+ """Complete algorithm configuration."""
85
+
86
+ road: RoadSettings = field(default_factory=RoadSettings)
87
+ subdivision: SubdivisionSettings = field(default_factory=SubdivisionSettings)
88
+ infrastructure: InfrastructureSettings = field(default_factory=InfrastructureSettings)
89
+ optimization: OptimizationSettings = field(default_factory=OptimizationSettings)
90
+
91
+ # Random seed for reproducibility
92
+ random_seed: int = 42
93
+
94
+ @classmethod
95
+ def from_dict(cls, config: dict) -> 'AlgorithmSettings':
96
+ """Create settings from API config dictionary."""
97
+ settings = cls()
98
+
99
+ # Map API config to internal settings
100
+ if 'min_lot_width' in config:
101
+ settings = cls(
102
+ subdivision=SubdivisionSettings(
103
+ min_lot_width=config.get('min_lot_width', 20.0),
104
+ max_lot_width=config.get('max_lot_width', 80.0),
105
+ target_lot_width=config.get('target_lot_width', 40.0),
106
+ solver_time_limit=config.get('ortools_time_limit', 0.5),
107
+ ),
108
+ optimization=OptimizationSettings(
109
+ population_size=config.get('population_size', 30),
110
+ generations=config.get('generations', 15),
111
+ ),
112
+ )
113
+
114
+ return settings
115
+
116
+
117
+ # Default settings instance
118
+ DEFAULT_SETTINGS = AlgorithmSettings()
119
+
120
+
121
+ # Convenience accessors for backward compatibility
122
+ ROAD_MAIN_WIDTH = DEFAULT_SETTINGS.road.main_width
123
+ ROAD_INTERNAL_WIDTH = DEFAULT_SETTINGS.road.internal_width
124
+ SIDEWALK_WIDTH = DEFAULT_SETTINGS.road.sidewalk_width
125
+ TURNING_RADIUS = DEFAULT_SETTINGS.road.turning_radius
126
+ SERVICE_AREA_RATIO = DEFAULT_SETTINGS.subdivision.service_area_ratio
127
+ MIN_BLOCK_AREA = DEFAULT_SETTINGS.subdivision.min_block_area
128
+ MIN_LOT_WIDTH = DEFAULT_SETTINGS.subdivision.min_lot_width
129
+ MAX_LOT_WIDTH = DEFAULT_SETTINGS.subdivision.max_lot_width
130
+ TARGET_LOT_WIDTH = DEFAULT_SETTINGS.subdivision.target_lot_width
131
+ SETBACK_DISTANCE = DEFAULT_SETTINGS.subdivision.setback_distance
132
+ FIRE_SAFETY_GAP = DEFAULT_SETTINGS.subdivision.fire_safety_gap
133
+ SOLVER_TIME_LIMIT = DEFAULT_SETTINGS.subdivision.solver_time_limit
134
+ TRANSFORMER_RADIUS = DEFAULT_SETTINGS.infrastructure.transformer_radius
algorithms/backend/core/geometry/__init__.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ from core.geometry.polygon_utils import (
2
+ get_elevation,
3
+ normalize_geometry_list,
4
+ merge_polygons,
5
+ )
6
+ from core.geometry.voronoi import (
7
+ generate_voronoi_seeds,
8
+ create_voronoi_diagram,
9
+ extract_voronoi_edges,
10
+ )
algorithms/backend/core/geometry/polygon_utils.py ADDED
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Polygon utility functions for geometry processing.
3
+
4
+ Provides helper functions for Shapely polygon operations,
5
+ elevation calculations, and geometry normalization.
6
+ """
7
+
8
+ import logging
9
+ from typing import List, Union, Optional
10
+
11
+ from shapely.geometry import Polygon, MultiPolygon, GeometryCollection
12
+ from shapely.ops import unary_union
13
+
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
+ def get_elevation(x: float, y: float) -> float:
18
+ """
19
+ Simulate terrain elevation (sloping from NW to SE).
20
+
21
+ This is a simple linear model: z = 50 - 0.02x - 0.03y
22
+ Used for determining WWTP placement (lowest point).
23
+
24
+ Args:
25
+ x: X coordinate
26
+ y: Y coordinate
27
+
28
+ Returns:
29
+ Simulated elevation value
30
+ """
31
+ return 50.0 - (x * 0.02) - (y * 0.03)
32
+
33
+
34
+ def normalize_geometry_list(
35
+ geometry: Union[Polygon, MultiPolygon, GeometryCollection, None]
36
+ ) -> List[Polygon]:
37
+ """
38
+ Convert various geometry types to a flat list of Polygons.
39
+
40
+ Handles GeometryCollection, MultiPolygon, single Polygon, or None.
41
+
42
+ Args:
43
+ geometry: Input geometry of various types
44
+
45
+ Returns:
46
+ List of Polygon objects (may be empty)
47
+ """
48
+ if geometry is None or geometry.is_empty:
49
+ return []
50
+
51
+ if hasattr(geometry, 'geoms'):
52
+ # GeometryCollection or MultiPolygon
53
+ result = []
54
+ for geom in geometry.geoms:
55
+ if geom.geom_type == 'Polygon' and not geom.is_empty:
56
+ result.append(geom)
57
+ return result
58
+ elif geometry.geom_type == 'Polygon':
59
+ return [geometry]
60
+ else:
61
+ logger.warning(f"Unexpected geometry type: {geometry.geom_type}")
62
+ return []
63
+
64
+
65
+ def merge_polygons(polygons: List[Polygon]) -> Polygon:
66
+ """
67
+ Merge multiple polygons into a single polygon using unary_union.
68
+
69
+ Args:
70
+ polygons: List of Polygon objects
71
+
72
+ Returns:
73
+ Merged polygon (may be MultiPolygon if non-contiguous)
74
+ """
75
+ if not polygons:
76
+ return Polygon()
77
+
78
+ if len(polygons) == 1:
79
+ return polygons[0]
80
+
81
+ return unary_union(polygons)
82
+
83
+
84
+ def filter_by_min_area(
85
+ polygons: List[Polygon],
86
+ min_area: float
87
+ ) -> List[Polygon]:
88
+ """
89
+ Filter polygons by minimum area threshold.
90
+
91
+ Args:
92
+ polygons: List of polygons to filter
93
+ min_area: Minimum area threshold (m²)
94
+
95
+ Returns:
96
+ Filtered list of polygons meeting the area requirement
97
+ """
98
+ return [p for p in polygons if p.area >= min_area]
99
+
100
+
101
+ def sort_by_elevation(polygons: List[Polygon]) -> List[Polygon]:
102
+ """
103
+ Sort polygons by centroid elevation (lowest first).
104
+
105
+ Useful for determining WWTP placement (should be at lowest point).
106
+
107
+ Args:
108
+ polygons: List of polygons to sort
109
+
110
+ Returns:
111
+ Sorted list (lowest elevation first)
112
+ """
113
+ return sorted(
114
+ polygons,
115
+ key=lambda p: get_elevation(p.centroid.x, p.centroid.y)
116
+ )
117
+
118
+
119
+ def calculate_block_quality_ratio(
120
+ block: Polygon,
121
+ original_area: float
122
+ ) -> float:
123
+ """
124
+ Calculate quality ratio of a block (actual area / original area).
125
+
126
+ Used to classify blocks as residential, fragmented, or unusable.
127
+
128
+ Args:
129
+ block: Block polygon
130
+ original_area: Original (full) block area before clipping
131
+
132
+ Returns:
133
+ Ratio between 0 and 1
134
+ """
135
+ if original_area <= 0:
136
+ return 0.0
137
+ return min(1.0, block.area / original_area)
algorithms/backend/core/geometry/voronoi.py ADDED
@@ -0,0 +1,173 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Voronoi diagram generation for road network planning.
3
+
4
+ Uses Shapely's Voronoi implementation to create organic road layouts
5
+ based on random seed points distributed within the site boundary.
6
+ """
7
+
8
+ import random
9
+ import logging
10
+ from typing import List, Tuple, Optional
11
+
12
+ from shapely.geometry import Point, Polygon, MultiPoint, LineString
13
+ from shapely.ops import voronoi_diagram, unary_union
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
+ def generate_voronoi_seeds(
19
+ site: Polygon,
20
+ num_seeds: int = 15,
21
+ seed: Optional[int] = None
22
+ ) -> List[Point]:
23
+ """
24
+ Generate random seed points for Voronoi diagram.
25
+
26
+ Points are uniformly distributed within the site bounding box.
27
+
28
+ Args:
29
+ site: Site boundary polygon
30
+ num_seeds: Number of seed points to generate
31
+ seed: Random seed for reproducibility (optional)
32
+
33
+ Returns:
34
+ List of Point objects
35
+ """
36
+ if seed is not None:
37
+ random.seed(seed)
38
+
39
+ minx, miny, maxx, maxy = site.bounds
40
+
41
+ seeds = []
42
+ for _ in range(num_seeds):
43
+ x = random.uniform(minx, maxx)
44
+ y = random.uniform(miny, maxy)
45
+ seeds.append(Point(x, y))
46
+
47
+ return seeds
48
+
49
+
50
+ def create_voronoi_diagram(
51
+ seeds: List[Point],
52
+ envelope: Polygon
53
+ ) -> Optional[object]:
54
+ """
55
+ Create Voronoi diagram from seed points.
56
+
57
+ Args:
58
+ seeds: List of seed Point objects
59
+ envelope: Bounding envelope for the diagram
60
+
61
+ Returns:
62
+ Voronoi regions geometry, or None if generation fails
63
+ """
64
+ if len(seeds) < 2:
65
+ logger.warning("Need at least 2 seeds for Voronoi diagram")
66
+ return None
67
+
68
+ try:
69
+ multi_point = MultiPoint(seeds)
70
+ regions = voronoi_diagram(multi_point, envelope=envelope)
71
+ return regions
72
+ except Exception as e:
73
+ logger.error(f"Voronoi diagram generation failed: {e}")
74
+ return None
75
+
76
+
77
+ def extract_voronoi_edges(
78
+ regions: object
79
+ ) -> List[LineString]:
80
+ """
81
+ Extract edge LineStrings from Voronoi regions.
82
+
83
+ Collects the exterior rings of all Voronoi cells and merges them.
84
+
85
+ Args:
86
+ regions: Voronoi diagram geometry (GeometryCollection of Polygons)
87
+
88
+ Returns:
89
+ List of LineString edges
90
+ """
91
+ edges = []
92
+
93
+ if regions is None:
94
+ return edges
95
+
96
+ # Collect exterior rings from Voronoi polygons
97
+ if hasattr(regions, 'geoms'):
98
+ for region in regions.geoms:
99
+ if region.geom_type == 'Polygon':
100
+ edges.append(region.exterior)
101
+ elif regions.geom_type == 'Polygon':
102
+ edges.append(regions.exterior)
103
+
104
+ if not edges:
105
+ return []
106
+
107
+ # Merge all lines for unified processing
108
+ merged = unary_union(edges)
109
+
110
+ # Normalize to list of LineStrings
111
+ if hasattr(merged, 'geoms'):
112
+ return [g for g in merged.geoms if g.geom_type == 'LineString']
113
+ elif merged.geom_type == 'LineString':
114
+ return [merged]
115
+ else:
116
+ return []
117
+
118
+
119
+ def classify_road_type(
120
+ line: LineString,
121
+ site_center: Point,
122
+ center_threshold: float = 100.0,
123
+ length_threshold: float = 400.0
124
+ ) -> str:
125
+ """
126
+ Classify a road line as 'main' or 'internal'.
127
+
128
+ Heuristic: Roads near center or very long are main roads.
129
+
130
+ Args:
131
+ line: Road centerline
132
+ site_center: Center point of the site
133
+ center_threshold: Distance threshold from center (m)
134
+ length_threshold: Length threshold (m)
135
+
136
+ Returns:
137
+ 'main' or 'internal'
138
+ """
139
+ dist_to_center = line.distance(site_center)
140
+
141
+ if dist_to_center < center_threshold or line.length > length_threshold:
142
+ return 'main'
143
+ else:
144
+ return 'internal'
145
+
146
+
147
+ def create_road_buffer(
148
+ line: LineString,
149
+ road_type: str,
150
+ main_width: float = 30.0,
151
+ internal_width: float = 15.0,
152
+ sidewalk_width: float = 4.0
153
+ ) -> Polygon:
154
+ """
155
+ Create road polygon by buffering centerline.
156
+
157
+ Args:
158
+ line: Road centerline
159
+ road_type: 'main' or 'internal'
160
+ main_width: Main road width (m)
161
+ internal_width: Internal road width (m)
162
+ sidewalk_width: Sidewalk width each side (m)
163
+
164
+ Returns:
165
+ Road polygon (buffered centerline)
166
+ """
167
+ if road_type == 'main':
168
+ width = main_width + 2 * sidewalk_width
169
+ else:
170
+ width = internal_width + 2 * sidewalk_width
171
+
172
+ # Use flat cap and mitre join for road-like appearance
173
+ return line.buffer(width / 2, cap_style=2, join_style=2)
algorithms/backend/core/infrastructure/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from core.infrastructure.network_planner import generate_loop_network
2
+ from core.infrastructure.transformer_planner import generate_transformers
3
+ from core.infrastructure.drainage_planner import calculate_drainage
algorithms/backend/core/infrastructure/drainage_planner.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Drainage flow planning for gravity-based wastewater systems.
3
+
4
+ Calculates flow directions from each lot towards the
5
+ Wastewater Treatment Plant (WWTP/XLNT) located at the lowest elevation.
6
+ """
7
+
8
+ import logging
9
+ import math
10
+ from typing import List, Dict, Any, Optional
11
+
12
+ from shapely.geometry import Polygon, Point
13
+
14
+ from core.config.settings import InfrastructureSettings, DEFAULT_SETTINGS
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
+ def calculate_drainage(
20
+ lots: List[Polygon],
21
+ wwtp_centroid: Optional[Point],
22
+ arrow_length: Optional[float] = None
23
+ ) -> List[Dict[str, Any]]:
24
+ """
25
+ Calculate drainage flow direction towards WWTP (gravity flow).
26
+
27
+ Creates directional arrows from each lot centroid pointing
28
+ towards the wastewater treatment plant.
29
+
30
+ Args:
31
+ lots: List of lot polygons
32
+ wwtp_centroid: Location of WWTP (lowest elevation)
33
+ arrow_length: Visualization arrow length (m)
34
+
35
+ Returns:
36
+ List of dicts with 'start' and 'vector' keys
37
+ """
38
+ settings = DEFAULT_SETTINGS.infrastructure
39
+ arrow_len = arrow_length or settings.drainage_arrow_length
40
+
41
+ arrows = []
42
+
43
+ if not wwtp_centroid:
44
+ logger.warning("No WWTP location provided, skipping drainage calculation")
45
+ return arrows
46
+
47
+ if not lots:
48
+ return arrows
49
+
50
+ for lot in lots:
51
+ try:
52
+ c = lot.centroid
53
+
54
+ # Vector from lot to WWTP
55
+ dx = wwtp_centroid.x - c.x
56
+ dy = wwtp_centroid.y - c.y
57
+
58
+ # Calculate length (with safe division)
59
+ length = math.sqrt(dx * dx + dy * dy)
60
+
61
+ if length > 0:
62
+ # Normalize vector and scale to arrow length
63
+ norm_dx = dx / length * arrow_len
64
+ norm_dy = dy / length * arrow_len
65
+
66
+ arrows.append({
67
+ 'start': (c.x, c.y),
68
+ 'vector': (norm_dx, norm_dy)
69
+ })
70
+ else:
71
+ # Lot is at WWTP location (unlikely but handle it)
72
+ logger.debug("Lot centroid coincides with WWTP")
73
+
74
+ except Exception as e:
75
+ logger.warning(f"Error calculating drainage for lot: {e}")
76
+ continue
77
+
78
+ logger.debug(f"Calculated drainage for {len(arrows)} lots")
79
+ return arrows
algorithms/backend/core/infrastructure/network_planner.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Electrical network planning using MST with loop redundancy.
3
+
4
+ Creates a minimum spanning tree network between lots, then adds
5
+ redundant edges for reliability (loop network).
6
+ """
7
+
8
+ import logging
9
+ from typing import List, Tuple
10
+
11
+ import numpy as np
12
+ import networkx as nx
13
+ from shapely.geometry import Polygon, LineString
14
+
15
+ from core.config.settings import InfrastructureSettings, DEFAULT_SETTINGS
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
+ def generate_loop_network(
21
+ lots: List[Polygon],
22
+ max_distance: float = None,
23
+ redundancy_ratio: float = None
24
+ ) -> Tuple[List[List[float]], List[LineString]]:
25
+ """
26
+ Generate Loop Network for electrical/utility infrastructure.
27
+
28
+ Creates MST (Minimum Spanning Tree) then adds back 15% of edges
29
+ for redundancy/safety (loop network pattern).
30
+
31
+ Args:
32
+ lots: List of lot polygons
33
+ max_distance: Maximum connection distance (m)
34
+ redundancy_ratio: Extra edges to add (0.0-1.0)
35
+
36
+ Returns:
37
+ (points, connection_lines) where:
38
+ - points: List of [x, y] coordinates
39
+ - connection_lines: List of LineString connections
40
+ """
41
+ settings = DEFAULT_SETTINGS.infrastructure
42
+ max_dist = max_distance or settings.max_connection_distance
43
+ redundancy = redundancy_ratio or settings.loop_redundancy_ratio
44
+
45
+ if len(lots) < 2:
46
+ logger.warning("Need at least 2 lots for network generation")
47
+ return [], []
48
+
49
+ # Get lot centroids
50
+ centroids = [lot.centroid for lot in lots]
51
+ points = [[p.x, p.y] for p in centroids]
52
+
53
+ # Build full graph with nearby connections
54
+ G = nx.Graph()
55
+ for i, p in enumerate(centroids):
56
+ G.add_node(i, pos=(p.x, p.y))
57
+
58
+ # Add edges for all pairs within max distance
59
+ for i in range(len(centroids)):
60
+ for j in range(i + 1, len(centroids)):
61
+ dist = centroids[i].distance(centroids[j])
62
+ if dist < max_dist:
63
+ G.add_edge(i, j, weight=dist)
64
+
65
+ # Handle disconnected graph
66
+ if not nx.is_connected(G):
67
+ components = list(nx.connected_components(G))
68
+ largest_comp = max(components, key=len)
69
+ G = G.subgraph(largest_comp).copy()
70
+ logger.warning(f"Graph disconnected, using largest component ({len(largest_comp)} nodes)")
71
+
72
+ if G.number_of_edges() == 0:
73
+ logger.warning("No edges in graph")
74
+ return points, []
75
+
76
+ # Create Minimum Spanning Tree
77
+ mst = nx.minimum_spanning_tree(G)
78
+
79
+ # Create Loop: Add back redundant edges for safety
80
+ all_edges = sorted(G.edges(data=True), key=lambda x: x[2]['weight'])
81
+ loop_graph = mst.copy()
82
+
83
+ target_extra = int(len(lots) * redundancy)
84
+ added_count = 0
85
+
86
+ for u, v, data in all_edges:
87
+ if not loop_graph.has_edge(u, v):
88
+ loop_graph.add_edge(u, v, **data)
89
+ added_count += 1
90
+ if added_count >= target_extra:
91
+ break
92
+
93
+ # Convert to LineStrings
94
+ connections = []
95
+ for u, v in loop_graph.edges():
96
+ if u < len(centroids) and v < len(centroids):
97
+ connections.append(LineString([centroids[u], centroids[v]]))
98
+
99
+ logger.debug(f"Generated network: {len(connections)} connections, {added_count} redundant")
100
+ return points, connections
algorithms/backend/core/infrastructure/transformer_planner.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Transformer station placement using K-Means clustering.
3
+
4
+ Optimally positions electrical transformer stations to serve
5
+ lots within a defined service radius.
6
+ """
7
+
8
+ import logging
9
+ from typing import List, Tuple, Optional
10
+
11
+ import numpy as np
12
+ from sklearn.cluster import KMeans
13
+ from shapely.geometry import Polygon
14
+
15
+ from core.config.settings import InfrastructureSettings, DEFAULT_SETTINGS
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
+ def generate_transformers(
21
+ lots: List[Polygon],
22
+ lots_per_transformer: Optional[int] = None,
23
+ service_radius: Optional[float] = None
24
+ ) -> List[Tuple[float, float]]:
25
+ """
26
+ Cluster lots to determine optimal transformer placements.
27
+
28
+ Uses K-Means clustering with dynamic k based on lot count.
29
+
30
+ Args:
31
+ lots: List of lot polygons
32
+ lots_per_transformer: Approximate lots per transformer
33
+ service_radius: Service radius (for reference, not used in clustering)
34
+
35
+ Returns:
36
+ List of (x, y) transformer locations
37
+ """
38
+ settings = DEFAULT_SETTINGS.infrastructure
39
+ lots_per_tf = lots_per_transformer or settings.lots_per_transformer
40
+
41
+ if not lots:
42
+ return []
43
+
44
+ if len(lots) == 1:
45
+ # Single lot - transformer at centroid
46
+ c = lots[0].centroid
47
+ return [(c.x, c.y)]
48
+
49
+ # Get lot centroids
50
+ lot_coords = np.array([
51
+ [lot.centroid.x, lot.centroid.y]
52
+ for lot in lots
53
+ ])
54
+
55
+ # Calculate number of transformers
56
+ num_transformers = max(1, len(lots) // lots_per_tf)
57
+
58
+ # Don't exceed number of lots
59
+ num_transformers = min(num_transformers, len(lots))
60
+
61
+ # K-Means clustering
62
+ try:
63
+ kmeans = KMeans(
64
+ n_clusters=num_transformers,
65
+ n_init=10,
66
+ random_state=42
67
+ )
68
+ kmeans.fit(lot_coords)
69
+
70
+ centers = [tuple(c) for c in kmeans.cluster_centers_]
71
+ logger.debug(f"Placed {len(centers)} transformers for {len(lots)} lots")
72
+ return centers
73
+
74
+ except Exception as e:
75
+ logger.error(f"K-Means clustering failed: {e}")
76
+ # Fallback: centroid of all lots
77
+ mean_x = np.mean(lot_coords[:, 0])
78
+ mean_y = np.mean(lot_coords[:, 1])
79
+ return [(mean_x, mean_y)]
algorithms/backend/core/optimization/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from core.optimization.grid_optimizer import GridOptimizer
2
+ from core.optimization.subdivision_solver import SubdivisionSolver
algorithms/backend/core/optimization/grid_optimizer.py ADDED
@@ -0,0 +1,240 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Grid layout optimization using NSGA-II genetic algorithm.
3
+
4
+ Optimizes grid spacing and rotation angle to maximize usable land area
5
+ while minimizing fragmented blocks.
6
+ """
7
+
8
+ import random
9
+ import logging
10
+ from typing import List, Tuple, Optional
11
+
12
+ import numpy as np
13
+ from shapely.geometry import Polygon, Point
14
+ from shapely.affinity import translate, rotate
15
+ from deap import base, creator, tools, algorithms
16
+
17
+ from core.config.settings import OptimizationSettings, DEFAULT_SETTINGS
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+
22
+ class GridOptimizer:
23
+ """
24
+ Stage 1: Optimize grid layout using NSGA-II genetic algorithm.
25
+
26
+ Multi-objective optimization:
27
+ - Maximize residential/commercial area
28
+ - Minimize fragmented blocks
29
+ """
30
+
31
+ def __init__(
32
+ self,
33
+ land_polygon: Polygon,
34
+ lake_polygon: Optional[Polygon] = None,
35
+ settings: Optional[OptimizationSettings] = None
36
+ ):
37
+ """
38
+ Initialize grid optimizer.
39
+
40
+ Args:
41
+ land_polygon: Main land boundary
42
+ lake_polygon: Water body to exclude (optional)
43
+ settings: Optimization settings (uses defaults if None)
44
+ """
45
+ self.land_poly = land_polygon
46
+ self.lake_poly = lake_polygon or Polygon()
47
+ self.settings = settings or DEFAULT_SETTINGS.optimization
48
+
49
+ self._setup_deap()
50
+
51
+ def _setup_deap(self) -> None:
52
+ """Configure DEAP toolbox for multi-objective optimization."""
53
+ # Create fitness and individual classes (check if already exists)
54
+ if not hasattr(creator, "FitnessMulti"):
55
+ creator.create("FitnessMulti", base.Fitness, weights=(1.0, -1.0))
56
+ if not hasattr(creator, "Individual"):
57
+ creator.create("Individual", list, fitness=creator.FitnessMulti)
58
+
59
+ self.toolbox = base.Toolbox()
60
+
61
+ # Gene definitions
62
+ spacing_min, spacing_max = self.settings.spacing_bounds
63
+ angle_min, angle_max = self.settings.angle_bounds
64
+
65
+ self.toolbox.register("attr_spacing", random.uniform, spacing_min, spacing_max)
66
+ self.toolbox.register("attr_angle", random.uniform, angle_min, angle_max)
67
+
68
+ self.toolbox.register(
69
+ "individual",
70
+ tools.initCycle,
71
+ creator.Individual,
72
+ (self.toolbox.attr_spacing, self.toolbox.attr_angle),
73
+ n=1
74
+ )
75
+ self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
76
+
77
+ # Genetic operators
78
+ self.toolbox.register("evaluate", self._evaluate_layout)
79
+ self.toolbox.register(
80
+ "mate",
81
+ tools.cxSimulatedBinaryBounded,
82
+ low=[spacing_min, angle_min],
83
+ up=[spacing_max, angle_max],
84
+ eta=self.settings.eta
85
+ )
86
+ self.toolbox.register(
87
+ "mutate",
88
+ tools.mutPolynomialBounded,
89
+ low=[spacing_min, angle_min],
90
+ up=[spacing_max, angle_max],
91
+ eta=self.settings.eta,
92
+ indpb=0.2
93
+ )
94
+ self.toolbox.register("select", tools.selNSGA2)
95
+
96
+ def generate_grid_candidates(
97
+ self,
98
+ spacing: float,
99
+ angle_deg: float
100
+ ) -> List[Polygon]:
101
+ """
102
+ Generate grid blocks at given spacing and rotation.
103
+
104
+ Args:
105
+ spacing: Grid spacing in meters
106
+ angle_deg: Rotation angle in degrees
107
+
108
+ Returns:
109
+ List of block polygons
110
+ """
111
+ minx, miny, maxx, maxy = self.land_poly.bounds
112
+ diameter = ((maxx - minx)**2 + (maxy - miny)**2)**0.5
113
+ center = self.land_poly.centroid
114
+
115
+ # Create grid ranges (extend beyond bounds to cover after rotation)
116
+ x_range = np.arange(minx - diameter, maxx + diameter, spacing)
117
+ y_range = np.arange(miny - diameter, maxy + diameter, spacing)
118
+
119
+ blocks = []
120
+
121
+ # Create base block at origin
122
+ base_block = Polygon([
123
+ (0, 0),
124
+ (spacing, 0),
125
+ (spacing, spacing),
126
+ (0, spacing)
127
+ ])
128
+ base_block = translate(base_block, -spacing/2, -spacing/2)
129
+
130
+ for x in x_range:
131
+ for y in y_range:
132
+ # Translate and rotate block
133
+ poly = translate(base_block, x, y)
134
+ poly = rotate(poly, angle_deg, origin=center)
135
+
136
+ # Only keep blocks that intersect land
137
+ if poly.intersects(self.land_poly):
138
+ blocks.append(poly)
139
+
140
+ return blocks
141
+
142
+ def _evaluate_layout(self, individual: List[float]) -> Tuple[float, int]:
143
+ """
144
+ Evaluate layout fitness.
145
+
146
+ Objectives:
147
+ 1. Maximize residential area (positive weight)
148
+ 2. Minimize fragmented blocks (negative weight)
149
+
150
+ Args:
151
+ individual: [spacing, angle]
152
+
153
+ Returns:
154
+ (total_residential_area, fragmented_blocks)
155
+ """
156
+ spacing, angle = individual
157
+ blocks = self.generate_grid_candidates(spacing, angle)
158
+
159
+ total_residential_area = 0.0
160
+ fragmented_blocks = 0
161
+
162
+ original_area = spacing * spacing
163
+
164
+ for blk in blocks:
165
+ # Cut with land boundary
166
+ intersection = blk.intersection(self.land_poly)
167
+ if intersection.is_empty:
168
+ continue
169
+
170
+ # Subtract lake/water body
171
+ usable_part = intersection.difference(self.lake_poly)
172
+ if usable_part.is_empty:
173
+ continue
174
+
175
+ # Calculate area ratio
176
+ ratio = usable_part.area / original_area
177
+
178
+ # Classify block quality
179
+ if ratio > self.settings.good_block_ratio:
180
+ # Good block for residential/commercial
181
+ total_residential_area += usable_part.area
182
+ elif ratio > self.settings.fragmented_block_ratio:
183
+ # Fragmented block (penalize)
184
+ fragmented_blocks += 1
185
+
186
+ return (total_residential_area, fragmented_blocks)
187
+
188
+ def optimize(
189
+ self,
190
+ population_size: Optional[int] = None,
191
+ generations: Optional[int] = None
192
+ ) -> Tuple[List[float], List[List[float]]]:
193
+ """
194
+ Run NSGA-II optimization.
195
+
196
+ Args:
197
+ population_size: Population size (uses settings if None)
198
+ generations: Number of generations (uses settings if None)
199
+
200
+ Returns:
201
+ (best_solution, history) where best_solution is [spacing, angle]
202
+ """
203
+ pop_size = population_size or self.settings.population_size
204
+ num_gens = generations or self.settings.generations
205
+
206
+ random.seed(DEFAULT_SETTINGS.random_seed)
207
+ pop = self.toolbox.population(n=pop_size)
208
+
209
+ history = []
210
+
211
+ # Initial evaluation
212
+ fits = list(map(self.toolbox.evaluate, pop))
213
+ for ind, fit in zip(pop, fits):
214
+ ind.fitness.values = fit
215
+
216
+ # Save best from generation 0
217
+ best_ind = tools.selBest(pop, 1)[0]
218
+ history.append(list(best_ind))
219
+
220
+ # Evolution loop
221
+ for gen in range(num_gens):
222
+ offspring = algorithms.varAnd(
223
+ pop,
224
+ self.toolbox,
225
+ cxpb=self.settings.crossover_probability,
226
+ mutpb=self.settings.mutation_probability
227
+ )
228
+ fits = list(map(self.toolbox.evaluate, offspring))
229
+ for ind, fit in zip(offspring, fits):
230
+ ind.fitness.values = fit
231
+ pop = self.toolbox.select(pop + offspring, k=len(pop))
232
+
233
+ # Track best solution per generation
234
+ best_ind = tools.selBest(pop, 1)[0]
235
+ history.append(list(best_ind))
236
+
237
+ final_best = tools.selBest(pop, 1)[0]
238
+ logger.info(f"Optimization complete: spacing={final_best[0]:.2f}, angle={final_best[1]:.2f}")
239
+
240
+ return list(final_best), history
algorithms/backend/core/optimization/subdivision_solver.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Block subdivision solver using OR-Tools constraint programming.
3
+
4
+ Optimizes lot widths within blocks to meet target dimensions while
5
+ respecting minimum/maximum constraints.
6
+ """
7
+
8
+ import logging
9
+ from typing import List, Dict, Any, Optional
10
+
11
+ import numpy as np
12
+ from shapely.geometry import Polygon
13
+ from ortools.sat.python import cp_model
14
+
15
+ from core.config.settings import SubdivisionSettings, DEFAULT_SETTINGS
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
+ class SubdivisionSolver:
21
+ """
22
+ Stage 2: Optimize block subdivision using OR-Tools CP-SAT solver.
23
+
24
+ Solves for optimal lot widths that:
25
+ - Sum to total available length
26
+ - Stay within min/max bounds
27
+ - Minimize deviation from target width
28
+ """
29
+
30
+ @staticmethod
31
+ def solve_subdivision(
32
+ total_length: float,
33
+ min_width: float,
34
+ max_width: float,
35
+ target_width: float,
36
+ time_limit: float = 5.0
37
+ ) -> List[float]:
38
+ """
39
+ Solve optimal lot widths using constraint programming.
40
+
41
+ Args:
42
+ total_length: Total length to subdivide
43
+ min_width: Minimum lot width
44
+ max_width: Maximum lot width
45
+ target_width: Target lot width
46
+ time_limit: Solver time limit in seconds
47
+
48
+ Returns:
49
+ List of lot widths
50
+ """
51
+ # Input validation
52
+ if total_length <= 0:
53
+ logger.warning("Total length must be positive")
54
+ return []
55
+ if min_width <= 0:
56
+ logger.warning("Minimum width must be positive")
57
+ return []
58
+ if total_length < min_width:
59
+ logger.warning(f"Total length ({total_length}) < min width ({min_width})")
60
+ return []
61
+ if min_width > max_width:
62
+ logger.warning("Min width > max width")
63
+ return []
64
+ if target_width < min_width or target_width > max_width:
65
+ target_width = (min_width + max_width) / 2
66
+ logger.info(f"Target width adjusted to {target_width}")
67
+
68
+ model = cp_model.CpModel()
69
+
70
+ # Estimate number of lots
71
+ max_lots = int(total_length / min_width) + 1
72
+
73
+ # Decision variables: lot widths (scaled to integers for CP)
74
+ scale = 100 # 1cm precision
75
+ lot_vars = [
76
+ model.NewIntVar(
77
+ int(min_width * scale),
78
+ int(max_width * scale),
79
+ f'lot_{i}'
80
+ )
81
+ for i in range(max_lots)
82
+ ]
83
+
84
+ # Used lot indicators
85
+ used = [model.NewBoolVar(f'used_{i}') for i in range(max_lots)]
86
+
87
+ # Constraint: Sum of widths equals total length
88
+ model.Add(
89
+ sum(lot_vars[i] for i in range(max_lots)) == int(total_length * scale)
90
+ )
91
+
92
+ # Constraint: Lot ordering (if used[i], then used[i-1] must be true)
93
+ for i in range(1, max_lots):
94
+ model.Add(used[i] <= used[i-1])
95
+
96
+ # Constraint: Connect lot values to usage
97
+ for i in range(max_lots):
98
+ model.Add(lot_vars[i] >= int(min_width * scale)).OnlyEnforceIf(used[i])
99
+ model.Add(lot_vars[i] == 0).OnlyEnforceIf(used[i].Not())
100
+
101
+ # Objective: Minimize deviation from target
102
+ deviations = [
103
+ model.NewIntVar(0, int((max_width - min_width) * scale), f'dev_{i}')
104
+ for i in range(max_lots)
105
+ ]
106
+
107
+ target_scaled = int(target_width * scale)
108
+ for i in range(max_lots):
109
+ model.AddAbsEquality(deviations[i], lot_vars[i] - target_scaled)
110
+
111
+ model.Minimize(sum(deviations))
112
+
113
+ # Solve
114
+ solver = cp_model.CpSolver()
115
+ solver.parameters.max_time_in_seconds = time_limit
116
+ status = solver.Solve(model)
117
+
118
+ # Extract solution
119
+ if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
120
+ widths = []
121
+ for i in range(max_lots):
122
+ if solver.Value(used[i]):
123
+ widths.append(solver.Value(lot_vars[i]) / scale)
124
+ logger.debug(f"Subdivision solved: {len(widths)} lots")
125
+ return widths
126
+ else:
127
+ # Fallback: uniform division
128
+ logger.warning("CP solver failed, using uniform fallback")
129
+ num_lots = max(1, int(total_length / target_width))
130
+ return [total_length / num_lots] * num_lots
131
+
132
+ @staticmethod
133
+ def subdivide_block(
134
+ block_geom: Polygon,
135
+ spacing: float,
136
+ min_width: float,
137
+ max_width: float,
138
+ target_width: float,
139
+ time_limit: float = 5.0,
140
+ setback_dist: float = 6.0
141
+ ) -> Dict[str, Any]:
142
+ """
143
+ Subdivide a block into lots.
144
+
145
+ Args:
146
+ block_geom: Block geometry
147
+ spacing: Grid spacing (for quality calculation)
148
+ min_width: Minimum lot width
149
+ max_width: Maximum lot width
150
+ target_width: Target lot width
151
+ time_limit: Solver time limit
152
+ setback_dist: Building setback distance
153
+
154
+ Returns:
155
+ Dictionary with subdivision info:
156
+ - geometry: Original block
157
+ - type: 'residential' or 'park'
158
+ - lots: List of lot info dicts
159
+ """
160
+ # Determine block quality
161
+ original_area = spacing * spacing
162
+ current_area = block_geom.area
163
+
164
+ # Safety check for division
165
+ if original_area <= 0:
166
+ ratio = 0.0
167
+ else:
168
+ ratio = current_area / original_area
169
+
170
+ result = {
171
+ 'geometry': block_geom,
172
+ 'type': 'unknown',
173
+ 'lots': []
174
+ }
175
+
176
+ # Fragmented blocks become parks
177
+ if ratio < 0.6:
178
+ result['type'] = 'park'
179
+ return result
180
+
181
+ # Good blocks become residential/commercial
182
+ result['type'] = 'residential'
183
+
184
+ # Solve subdivision
185
+ minx, miny, maxx, maxy = block_geom.bounds
186
+ total_width = maxx - minx
187
+
188
+ # Adaptive time limit based on block size
189
+ adaptive_time = min(time_limit, max(0.5, total_width / 100))
190
+
191
+ lot_widths = SubdivisionSolver.solve_subdivision(
192
+ total_width, min_width, max_width, target_width, adaptive_time
193
+ )
194
+
195
+ # Create lot geometries
196
+ current_x = minx
197
+
198
+ for width in lot_widths:
199
+ lot_poly = Polygon([
200
+ (current_x, miny),
201
+ (current_x + width, miny),
202
+ (current_x + width, maxy),
203
+ (current_x, maxy)
204
+ ])
205
+
206
+ # Clip to block boundary
207
+ clipped = lot_poly.intersection(block_geom)
208
+ if not clipped.is_empty and clipped.geom_type == 'Polygon':
209
+ # Calculate setback (buildable area)
210
+ buildable = clipped.buffer(-setback_dist)
211
+ if buildable.is_empty or not buildable.is_valid:
212
+ buildable = None
213
+
214
+ result['lots'].append({
215
+ 'geometry': clipped,
216
+ 'width': width,
217
+ 'buildable': buildable
218
+ })
219
+
220
+ current_x += width
221
+
222
+ return result
algorithms/backend/main.py CHANGED
@@ -1,34 +1,43 @@
1
  """FastAPI application entry point."""
2
 
 
3
  from fastapi import FastAPI
4
  from fastapi.middleware.cors import CORSMiddleware
5
 
6
- from routes import router
7
- from models import HealthResponse
 
 
 
 
 
 
 
8
 
9
  app = FastAPI(
10
  title="Land Redistribution Algorithm API",
11
  description="API for testing land subdivision and redistribution algorithms",
12
- version="1.0.0"
13
  )
14
 
15
  # Configure CORS
16
  app.add_middleware(
17
  CORSMiddleware,
18
- allow_origins=["*"], # Allow all origins for development
19
  allow_credentials=True,
20
  allow_methods=["*"],
21
  allow_headers=["*"],
22
  )
23
 
24
  # Include routes
25
- app.include_router(router, prefix="/api")
 
26
 
27
 
28
  @app.get("/health", response_model=HealthResponse)
29
  async def health_check():
30
  """Health check endpoint."""
31
- return HealthResponse(status="healthy", version="1.0.0")
32
 
33
 
34
  @app.get("/")
@@ -36,7 +45,13 @@ async def root():
36
  """Root endpoint with API information."""
37
  return {
38
  "message": "Land Redistribution Algorithm API",
39
- "version": "1.0.0",
40
  "docs": "/docs",
41
  "health": "/health"
42
  }
 
 
 
 
 
 
 
1
  """FastAPI application entry point."""
2
 
3
+ import logging
4
  from fastapi import FastAPI
5
  from fastapi.middleware.cors import CORSMiddleware
6
 
7
+ from api.schemas.response_schemas import HealthResponse
8
+ from api.routes import optim_router, dxf_router
9
+
10
+ # Configure logging
11
+ logging.basicConfig(
12
+ level=logging.INFO,
13
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
14
+ )
15
+ logger = logging.getLogger(__name__)
16
 
17
  app = FastAPI(
18
  title="Land Redistribution Algorithm API",
19
  description="API for testing land subdivision and redistribution algorithms",
20
+ version="2.0.0"
21
  )
22
 
23
  # Configure CORS
24
  app.add_middleware(
25
  CORSMiddleware,
26
+ allow_origins=["*"],
27
  allow_credentials=True,
28
  allow_methods=["*"],
29
  allow_headers=["*"],
30
  )
31
 
32
  # Include routes
33
+ app.include_router(optim_router, prefix="/api")
34
+ app.include_router(dxf_router, prefix="/api")
35
 
36
 
37
  @app.get("/health", response_model=HealthResponse)
38
  async def health_check():
39
  """Health check endpoint."""
40
+ return HealthResponse(status="healthy", version="2.0.0")
41
 
42
 
43
  @app.get("/")
 
45
  """Root endpoint with API information."""
46
  return {
47
  "message": "Land Redistribution Algorithm API",
48
+ "version": "2.0.0",
49
  "docs": "/docs",
50
  "health": "/health"
51
  }
52
+
53
+
54
+ @app.on_event("startup")
55
+ async def startup_event():
56
+ """Log startup information."""
57
+ logger.info("Land Redistribution API started (v2.0.0 - Modular Architecture)")
algorithms/backend/pipeline/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from pipeline.land_redistribution import LandRedistributionPipeline
algorithms/backend/pipeline/land_redistribution.py ADDED
@@ -0,0 +1,358 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Main land redistribution pipeline orchestration.
3
+
4
+ Coordinates all stages of the optimization pipeline:
5
+ 1. Road network generation (Voronoi or Grid-based)
6
+ 2. Block subdivision (OR-Tools)
7
+ 3. Infrastructure planning (MST, Transformers, Drainage)
8
+ """
9
+
10
+ import logging
11
+ import random
12
+ from typing import List, Dict, Any, Tuple, Optional
13
+
14
+ import numpy as np
15
+ from shapely.geometry import Polygon, Point, mapping
16
+ from shapely.ops import unary_union
17
+
18
+ from core.config.settings import (
19
+ AlgorithmSettings,
20
+ DEFAULT_SETTINGS,
21
+ ROAD_MAIN_WIDTH,
22
+ ROAD_INTERNAL_WIDTH,
23
+ SIDEWALK_WIDTH,
24
+ TURNING_RADIUS,
25
+ SERVICE_AREA_RATIO,
26
+ MIN_BLOCK_AREA,
27
+ )
28
+ from core.geometry.polygon_utils import (
29
+ get_elevation,
30
+ normalize_geometry_list,
31
+ filter_by_min_area,
32
+ sort_by_elevation,
33
+ )
34
+ from core.geometry.voronoi import (
35
+ generate_voronoi_seeds,
36
+ create_voronoi_diagram,
37
+ extract_voronoi_edges,
38
+ classify_road_type,
39
+ create_road_buffer,
40
+ )
41
+ from core.optimization.grid_optimizer import GridOptimizer
42
+ from core.optimization.subdivision_solver import SubdivisionSolver
43
+ from core.infrastructure.network_planner import generate_loop_network
44
+ from core.infrastructure.transformer_planner import generate_transformers
45
+ from core.infrastructure.drainage_planner import calculate_drainage
46
+
47
+ logger = logging.getLogger(__name__)
48
+
49
+
50
+ class LandRedistributionPipeline:
51
+ """
52
+ Main pipeline orchestrating all optimization stages.
53
+
54
+ Supports two modes:
55
+ 1. Voronoi-based road network (default, more organic layout)
56
+ 2. Grid-based optimization using NSGA-II (fallback)
57
+ """
58
+
59
+ def __init__(
60
+ self,
61
+ land_polygons: List[Polygon],
62
+ config: Dict[str, Any],
63
+ settings: Optional[AlgorithmSettings] = None
64
+ ):
65
+ """
66
+ Initialize pipeline.
67
+
68
+ Args:
69
+ land_polygons: Input land plots
70
+ config: API configuration dictionary
71
+ settings: Algorithm settings (optional)
72
+ """
73
+ self.land_poly = unary_union(land_polygons)
74
+ self.config = config
75
+ self.settings = settings or AlgorithmSettings.from_dict(config)
76
+ self.lake_poly = Polygon() # No lake by default
77
+
78
+ logger.info(f"Pipeline initialized with land area: {self.land_poly.area:.2f} m²")
79
+
80
+ def generate_road_network(
81
+ self,
82
+ num_seeds: int = 15
83
+ ) -> Tuple[Polygon, List[Polygon], List[Polygon]]:
84
+ """
85
+ Generate road network using Voronoi diagram.
86
+
87
+ Args:
88
+ num_seeds: Number of Voronoi seed points
89
+
90
+ Returns:
91
+ (road_network, service_blocks, commercial_blocks)
92
+ """
93
+ site = self.land_poly
94
+
95
+ # Generate Voronoi seeds
96
+ seeds = generate_voronoi_seeds(site, num_seeds)
97
+
98
+ # Create Voronoi diagram
99
+ regions = create_voronoi_diagram(seeds, site)
100
+ if regions is None:
101
+ logger.warning("Voronoi generation failed, returning empty")
102
+ return Polygon(), [], [site]
103
+
104
+ # Extract edges
105
+ edges = extract_voronoi_edges(regions)
106
+ if not edges:
107
+ return Polygon(), [], [site]
108
+
109
+ # Create road buffers
110
+ center = site.centroid
111
+ road_polys = []
112
+
113
+ for line in edges:
114
+ road_type = classify_road_type(line, center)
115
+ road_buffer = create_road_buffer(
116
+ line,
117
+ road_type,
118
+ main_width=ROAD_MAIN_WIDTH,
119
+ internal_width=ROAD_INTERNAL_WIDTH,
120
+ sidewalk_width=SIDEWALK_WIDTH
121
+ )
122
+ road_polys.append(road_buffer)
123
+
124
+ if not road_polys:
125
+ return Polygon(), [], [site]
126
+
127
+ # Merge road network
128
+ network_poly = unary_union(road_polys)
129
+
130
+ # Apply turning radius smoothing
131
+ smooth_network = network_poly.buffer(
132
+ TURNING_RADIUS, join_style=1
133
+ ).buffer(-TURNING_RADIUS, join_style=1)
134
+
135
+ # Extract blocks (land minus roads)
136
+ blocks_rough = site.difference(smooth_network)
137
+ candidates = normalize_geometry_list(blocks_rough)
138
+
139
+ # Filter by minimum area
140
+ valid_blocks = filter_by_min_area(candidates, MIN_BLOCK_AREA)
141
+
142
+ if not valid_blocks:
143
+ return smooth_network, [], []
144
+
145
+ # Sort by elevation (lowest first for WWTP)
146
+ sorted_blocks = sort_by_elevation(valid_blocks)
147
+
148
+ # Allocate service areas (10% of total)
149
+ total_area = sum(b.area for b in valid_blocks)
150
+ service_target = total_area * SERVICE_AREA_RATIO
151
+
152
+ service_blocks = []
153
+ commercial_blocks = []
154
+ accumulated = 0.0
155
+
156
+ for block in sorted_blocks:
157
+ if accumulated < service_target:
158
+ service_blocks.append(block)
159
+ accumulated += block.area
160
+ else:
161
+ commercial_blocks.append(block)
162
+
163
+ # Ensure at least one commercial block
164
+ if not commercial_blocks and service_blocks:
165
+ commercial_blocks.append(service_blocks.pop())
166
+
167
+ logger.info(f"Road network: {len(service_blocks)} service, {len(commercial_blocks)} commercial blocks")
168
+ return smooth_network, service_blocks, commercial_blocks
169
+
170
+ def run_stage1(self) -> Dict[str, Any]:
171
+ """Run grid optimization stage (NSGA-II)."""
172
+ optimizer = GridOptimizer(self.land_poly, self.lake_poly)
173
+
174
+ best_solution, history = optimizer.optimize(
175
+ population_size=self.config.get('population_size', 30),
176
+ generations=self.config.get('generations', 15)
177
+ )
178
+
179
+ spacing, angle = best_solution
180
+ blocks = optimizer.generate_grid_candidates(spacing, angle)
181
+
182
+ # Filter to usable blocks
183
+ usable_blocks = []
184
+ for blk in blocks:
185
+ intersection = blk.intersection(self.land_poly).difference(self.lake_poly)
186
+ if not intersection.is_empty:
187
+ usable_blocks.append(intersection)
188
+
189
+ return {
190
+ 'spacing': spacing,
191
+ 'angle': angle,
192
+ 'blocks': usable_blocks,
193
+ 'history': history,
194
+ 'metrics': {
195
+ 'total_blocks': len(usable_blocks),
196
+ 'optimal_spacing': spacing,
197
+ 'optimal_angle': angle
198
+ }
199
+ }
200
+
201
+ def run_stage2(
202
+ self,
203
+ blocks: List[Polygon],
204
+ spacing: float
205
+ ) -> Dict[str, Any]:
206
+ """Run subdivision stage (OR-Tools)."""
207
+ all_lots = []
208
+ parks = []
209
+
210
+ for block in blocks:
211
+ result = SubdivisionSolver.subdivide_block(
212
+ block,
213
+ spacing,
214
+ self.config.get('min_lot_width', 20.0),
215
+ self.config.get('max_lot_width', 80.0),
216
+ self.config.get('target_lot_width', 40.0),
217
+ self.config.get('ortools_time_limit', 5)
218
+ )
219
+
220
+ if result['type'] == 'park':
221
+ parks.append(result['geometry'])
222
+ else:
223
+ all_lots.extend(result['lots'])
224
+
225
+ avg_width = np.mean([lot['width'] for lot in all_lots]) if all_lots else 0
226
+
227
+ return {
228
+ 'lots': all_lots,
229
+ 'parks': parks,
230
+ 'metrics': {
231
+ 'total_lots': len(all_lots),
232
+ 'total_parks': len(parks),
233
+ 'avg_lot_width': avg_width
234
+ }
235
+ }
236
+
237
+ def classify_blocks(
238
+ self,
239
+ blocks: List[Polygon]
240
+ ) -> Dict[str, List[Polygon]]:
241
+ """Classify blocks into service and commercial categories."""
242
+ if not blocks:
243
+ return {'service': [], 'commercial': [], 'xlnt': []}
244
+
245
+ sorted_blocks = sort_by_elevation(blocks)
246
+
247
+ total_area = sum(b.area for b in blocks)
248
+ service_target = total_area * SERVICE_AREA_RATIO
249
+ accumulated = 0.0
250
+
251
+ xlnt_block = []
252
+ service_blocks = []
253
+ commercial_blocks = []
254
+
255
+ # Lowest block is XLNT (Wastewater Treatment)
256
+ if sorted_blocks:
257
+ xlnt = sorted_blocks.pop(0)
258
+ xlnt_block.append(xlnt)
259
+ accumulated += xlnt.area
260
+
261
+ # Fill remaining service quota
262
+ for b in sorted_blocks:
263
+ if accumulated < service_target:
264
+ service_blocks.append(b)
265
+ accumulated += b.area
266
+ else:
267
+ commercial_blocks.append(b)
268
+
269
+ return {
270
+ 'xlnt': xlnt_block,
271
+ 'service': service_blocks,
272
+ 'commercial': commercial_blocks
273
+ }
274
+
275
+ def run_full_pipeline(self) -> Dict[str, Any]:
276
+ """Run complete optimization pipeline with Voronoi road generation."""
277
+ logger.info("Starting full pipeline...")
278
+
279
+ # Stage 0: Voronoi Road Network
280
+ road_network, service_blocks_voronoi, commercial_blocks_voronoi = \
281
+ self.generate_road_network(num_seeds=15)
282
+
283
+ # Fallback to grid-based if Voronoi fails
284
+ if not commercial_blocks_voronoi:
285
+ logger.info("Voronoi failed, using grid-based approach")
286
+ stage1_result = self.run_stage1()
287
+ classification = self.classify_blocks(stage1_result['blocks'])
288
+ commercial_blocks_voronoi = classification['commercial']
289
+ service_blocks_voronoi = classification['service']
290
+ xlnt_blocks = classification['xlnt']
291
+ all_blocks = stage1_result['blocks']
292
+ road_network = self.land_poly.difference(unary_union(all_blocks))
293
+ spacing_for_subdivision = stage1_result['spacing']
294
+ else:
295
+ # Separate XLNT from service blocks
296
+ if service_blocks_voronoi:
297
+ xlnt_blocks = [service_blocks_voronoi[0]]
298
+ service_blocks_voronoi = service_blocks_voronoi[1:]
299
+ else:
300
+ xlnt_blocks = []
301
+
302
+ # Estimate spacing for subdivision
303
+ if commercial_blocks_voronoi:
304
+ avg_area = sum(b.area for b in commercial_blocks_voronoi) / len(commercial_blocks_voronoi)
305
+ spacing_for_subdivision = max(20.0, (avg_area ** 0.5) * 0.7)
306
+ else:
307
+ spacing_for_subdivision = 25.0
308
+
309
+ # Stage 2: Subdivision
310
+ stage2_result = self.run_stage2(
311
+ commercial_blocks_voronoi,
312
+ spacing_for_subdivision
313
+ )
314
+
315
+ # Collect all polygons for infrastructure
316
+ all_network_nodes = stage2_result['lots'] + \
317
+ [{'geometry': b, 'type': 'service'} for b in service_blocks_voronoi] + \
318
+ [{'geometry': b, 'type': 'xlnt'} for b in xlnt_blocks]
319
+
320
+ infra_polys = [item['geometry'] for item in all_network_nodes]
321
+
322
+ # Stage 3: Infrastructure
323
+ points, connections = generate_loop_network(infra_polys)
324
+ transformers = generate_transformers(infra_polys)
325
+
326
+ wwtp_center = xlnt_blocks[0].centroid if xlnt_blocks else None
327
+ drainage = calculate_drainage(infra_polys, wwtp_center)
328
+
329
+ logger.info(f"Pipeline complete: {len(stage2_result['lots'])} lots, {len(connections)} connections")
330
+
331
+ return {
332
+ 'stage1': {
333
+ 'blocks': commercial_blocks_voronoi + service_blocks_voronoi + xlnt_blocks,
334
+ 'metrics': {
335
+ 'total_blocks': len(commercial_blocks_voronoi) + len(service_blocks_voronoi) + len(xlnt_blocks)
336
+ },
337
+ 'spacing': spacing_for_subdivision,
338
+ 'angle': 0.0
339
+ },
340
+ 'stage2': stage2_result,
341
+ 'classification': {
342
+ 'xlnt_count': len(xlnt_blocks),
343
+ 'service_count': len(service_blocks_voronoi),
344
+ 'commercial_count': len(commercial_blocks_voronoi),
345
+ 'xlnt': xlnt_blocks,
346
+ 'service': service_blocks_voronoi
347
+ },
348
+ 'stage3': {
349
+ 'points': points,
350
+ 'connections': [list(line.coords) for line in connections],
351
+ 'drainage': drainage,
352
+ 'transformers': transformers,
353
+ 'road_network': mapping(road_network)
354
+ },
355
+ 'total_lots': stage2_result['metrics']['total_lots'],
356
+ 'service_blocks': [list(b.exterior.coords) for b in service_blocks_voronoi],
357
+ 'xlnt_blocks': [list(b.exterior.coords) for b in xlnt_blocks]
358
+ }
algorithms/backend/test_basic.py CHANGED
@@ -3,7 +3,6 @@
3
  import sys
4
  import traceback
5
 
6
-
7
  def test_imports():
8
  """Test that all required packages can be imported."""
9
  print("Testing imports...")
@@ -16,6 +15,12 @@ def test_imports():
16
  import matplotlib
17
  import ortools
18
  import deap
 
 
 
 
 
 
19
  print("✅ All imports successful")
20
  return True
21
  except ImportError as e:
@@ -28,7 +33,8 @@ def test_algorithm():
28
  print("\nTesting algorithm...")
29
  try:
30
  from shapely.geometry import Polygon
31
- from algorithm import GridOptimizer, SubdivisionSolver
 
32
 
33
  # Create simple test polygon
34
  land_poly = Polygon([(0, 0), (100, 0), (100, 100), (0, 100)])
@@ -54,7 +60,7 @@ def test_api_models():
54
  """Test Pydantic models."""
55
  print("\nTesting API models...")
56
  try:
57
- from models import AlgorithmConfig, LandPlot, OptimizationRequest
58
 
59
  # Create test config
60
  config = AlgorithmConfig()
@@ -81,7 +87,7 @@ def test_api_models():
81
 
82
  if __name__ == "__main__":
83
  print("=" * 50)
84
- print("Algorithm Backend Test Suite")
85
  print("=" * 50)
86
 
87
  results = []
 
3
  import sys
4
  import traceback
5
 
 
6
  def test_imports():
7
  """Test that all required packages can be imported."""
8
  print("Testing imports...")
 
15
  import matplotlib
16
  import ortools
17
  import deap
18
+
19
+ # Test new module imports
20
+ from core.optimization.grid_optimizer import GridOptimizer
21
+ from core.optimization.subdivision_solver import SubdivisionSolver
22
+ from pipeline.land_redistribution import LandRedistributionPipeline
23
+
24
  print("✅ All imports successful")
25
  return True
26
  except ImportError as e:
 
33
  print("\nTesting algorithm...")
34
  try:
35
  from shapely.geometry import Polygon
36
+ from core.optimization.grid_optimizer import GridOptimizer
37
+ from core.optimization.subdivision_solver import SubdivisionSolver
38
 
39
  # Create simple test polygon
40
  land_poly = Polygon([(0, 0), (100, 0), (100, 100), (0, 100)])
 
60
  """Test Pydantic models."""
61
  print("\nTesting API models...")
62
  try:
63
+ from api.schemas.request_schemas import AlgorithmConfig, LandPlot, OptimizationRequest
64
 
65
  # Create test config
66
  config = AlgorithmConfig()
 
87
 
88
  if __name__ == "__main__":
89
  print("=" * 50)
90
+ print("Algorithm Backend Test Suite (Modular Architecture)")
91
  print("=" * 50)
92
 
93
  results = []
algorithms/backend/{dxf_utils.py → utils/dxf_utils.py} RENAMED
@@ -109,22 +109,45 @@ def export_to_dxf(geometries: List[dict], output_type: str = 'final') -> bytes:
109
  # Get coordinates
110
  if geom and 'coordinates' in geom:
111
  coords = geom['coordinates']
112
- if coords and len(coords) > 0:
113
- points = coords[0] # Exterior ring
114
-
115
- # Add as LWPOLYLINE
116
- if len(points) >= 3:
117
- # Convert to 2D points (x, y)
118
- points_2d = [(p[0], p[1]) for p in points]
119
-
120
- # Create closed polyline
121
- msp.add_lwpolyline(
122
- points_2d,
123
- dxfattribs={
124
- 'layer': layer,
125
- 'closed': True
126
- }
127
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
 
129
  # Save to bytes
130
  stream = io.StringIO()
 
109
  # Get coordinates
110
  if geom and 'coordinates' in geom:
111
  coords = geom['coordinates']
112
+ geom_geom_type = geom.get('type', 'Polygon')
113
+
114
+ # Handle different geometry types
115
+ if geom_geom_type == 'Point':
116
+ # Point: [x, y]
117
+ if isinstance(coords, list) and len(coords) >= 2:
118
+ msp.add_circle(
119
+ center=(coords[0], coords[1]),
120
+ radius=2.0,
121
+ dxfattribs={'layer': layer}
 
 
 
 
 
122
  )
123
+ elif geom_geom_type == 'LineString':
124
+ # LineString: [[x1, y1], [x2, y2], ...]
125
+ if isinstance(coords, list) and len(coords) > 0:
126
+ if all(isinstance(p, (list, tuple)) and len(p) >= 2 for p in coords):
127
+ points_2d = [(p[0], p[1]) for p in coords]
128
+ if len(points_2d) >= 2:
129
+ msp.add_lwpolyline(
130
+ points_2d,
131
+ dxfattribs={'layer': layer, 'closed': False}
132
+ )
133
+ elif geom_geom_type == 'Polygon':
134
+ # Polygon: [[[x1, y1], [x2, y2], ...]] (exterior ring)
135
+ if isinstance(coords, list) and len(coords) > 0:
136
+ points = coords[0] if isinstance(coords[0], list) else coords
137
+
138
+ # Validate points structure
139
+ if isinstance(points, list) and len(points) >= 3:
140
+ if all(isinstance(p, (list, tuple)) and len(p) >= 2 for p in points):
141
+ points_2d = [(p[0], p[1]) for p in points]
142
+
143
+ # Create closed polyline
144
+ msp.add_lwpolyline(
145
+ points_2d,
146
+ dxfattribs={
147
+ 'layer': layer,
148
+ 'closed': True
149
+ }
150
+ )
151
 
152
  # Save to bytes
153
  stream = io.StringIO()