REMB / src /geometry /road_network.py
Cuong2004's picture
Initial commit: REMB - AI-Powered Industrial Estate Master Plan Optimization Engine
b010f1b
"""
Road Network Generator - Infrastructure Skeleton
Generates optimal road networks for industrial estates
"""
import numpy as np
from shapely.geometry import (
Polygon, MultiPolygon, LineString, MultiLineString,
Point, box
)
from shapely.ops import unary_union, linemerge, split
from typing import List, Tuple, Optional, Dict
import logging
import yaml
from pathlib import Path
from src.models.domain import SiteBoundary, RoadNetwork, Plot, PlotType
logger = logging.getLogger(__name__)
class RoadNetworkGenerator:
"""
Road network generator for industrial estates
Responsibilities:
- Generate primary road network from user input
- Generate secondary road grid
- Identify dead zones (>200m from road)
- Optimize road layout for accessibility
"""
def __init__(self, regulations_path: str = "config/regulations.yaml"):
"""
Initialize road network generator
Args:
regulations_path: Path to regulations YAML
"""
self.regulations_path = Path(regulations_path)
self.regulations = self._load_regulations()
self.logger = logging.getLogger(__name__)
# Road widths from regulations
road_config = self.regulations.get('roads', {})
self.primary_width = road_config.get('primary_width_m', 24)
self.secondary_width = road_config.get('secondary_width_m', 16)
self.tertiary_width = road_config.get('tertiary_width_m', 12)
self.max_distance = road_config.get('maximum_distance_to_road_m', 200)
def _load_regulations(self) -> dict:
"""Load regulations from YAML"""
if not self.regulations_path.exists():
return {}
with open(self.regulations_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def generate_grid_network(
self,
site: SiteBoundary,
primary_spacing: float = 200,
secondary_spacing: float = 100
) -> RoadNetwork:
"""
Generate a grid-based road network
Args:
site: Site boundary
primary_spacing: Distance between primary roads
secondary_spacing: Distance between secondary roads
Returns:
RoadNetwork object
"""
self.logger.info("Generating grid road network")
bounds = site.geometry.bounds
minx, miny, maxx, maxy = bounds
width = maxx - minx
height = maxy - miny
# Offset roads from boundary
setback = self.regulations.get('setbacks', {}).get('boundary_minimum', 50)
primary_roads = []
secondary_roads = []
# Primary horizontal roads
y_pos = miny + setback + primary_spacing / 2
while y_pos < maxy - setback:
line = LineString([(minx + setback, y_pos), (maxx - setback, y_pos)])
clipped = line.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
primary_roads.append(clipped if isinstance(clipped, LineString) else clipped)
y_pos += primary_spacing
# Primary vertical roads
x_pos = minx + setback + primary_spacing / 2
while x_pos < maxx - setback:
line = LineString([(x_pos, miny + setback), (x_pos, maxy - setback)])
clipped = line.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
primary_roads.append(clipped if isinstance(clipped, LineString) else clipped)
x_pos += primary_spacing
# Secondary roads (between primary roads)
y_pos = miny + setback + secondary_spacing
while y_pos < maxy - setback:
# Skip if too close to primary road
if not any(abs(y_pos - self._get_y_coord(r)) < secondary_spacing/2
for r in primary_roads if isinstance(r, LineString)):
line = LineString([(minx + setback, y_pos), (maxx - setback, y_pos)])
clipped = line.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
secondary_roads.append(clipped)
y_pos += secondary_spacing
x_pos = minx + setback + secondary_spacing
while x_pos < maxx - setback:
if not any(abs(x_pos - self._get_x_coord(r)) < secondary_spacing/2
for r in primary_roads if isinstance(r, LineString)):
line = LineString([(x_pos, miny + setback), (x_pos, maxy - setback)])
clipped = line.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
secondary_roads.append(clipped)
x_pos += secondary_spacing
# Create MultiLineStrings
primary_multi = MultiLineString(primary_roads) if primary_roads else None
secondary_multi = MultiLineString(secondary_roads) if secondary_roads else None
# Calculate total length
total_length = 0
if primary_multi:
total_length += primary_multi.length
if secondary_multi:
total_length += secondary_multi.length
# Calculate road area
road_area = 0
if primary_multi:
road_area += primary_multi.length * self.primary_width
if secondary_multi:
road_area += secondary_multi.length * self.secondary_width
network = RoadNetwork(
primary_roads=primary_multi,
secondary_roads=secondary_multi,
tertiary_roads=None,
total_length_m=total_length,
total_area_sqm=road_area
)
self.logger.info(
f"Generated road network: {len(primary_roads)} primary, "
f"{len(secondary_roads)} secondary, total {total_length:.0f}m"
)
return network
def generate_spine_network(
self,
site: SiteBoundary,
entry_points: Optional[List[Tuple[float, float]]] = None
) -> RoadNetwork:
"""
Generate a spine-based road network (main road with branches)
Args:
site: Site boundary
entry_points: Optional list of entry point coordinates
Returns:
RoadNetwork object
"""
self.logger.info("Generating spine road network")
bounds = site.geometry.bounds
minx, miny, maxx, maxy = bounds
center_x = (minx + maxx) / 2
center_y = (miny + maxy) / 2
setback = self.regulations.get('setbacks', {}).get('boundary_minimum', 50)
# Determine spine direction (along longest axis)
width = maxx - minx
height = maxy - miny
primary_roads = []
secondary_roads = []
if width >= height:
# Horizontal spine
spine = LineString([
(minx + setback, center_y),
(maxx - setback, center_y)
])
primary_roads.append(spine)
# Vertical branches
branch_spacing = self.max_distance * 1.5
x_pos = minx + setback + branch_spacing / 2
while x_pos < maxx - setback:
branch = LineString([
(x_pos, miny + setback),
(x_pos, maxy - setback)
])
clipped = branch.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
secondary_roads.append(clipped)
x_pos += branch_spacing
else:
# Vertical spine
spine = LineString([
(center_x, miny + setback),
(center_x, maxy - setback)
])
primary_roads.append(spine)
# Horizontal branches
branch_spacing = self.max_distance * 1.5
y_pos = miny + setback + branch_spacing / 2
while y_pos < maxy - setback:
branch = LineString([
(minx + setback, y_pos),
(maxx - setback, y_pos)
])
clipped = branch.intersection(site.geometry.buffer(-setback))
if not clipped.is_empty:
secondary_roads.append(clipped)
y_pos += branch_spacing
# Clip to site boundary
primary_roads = [r.intersection(site.geometry.buffer(-setback))
for r in primary_roads if not r.is_empty]
primary_multi = MultiLineString(primary_roads) if primary_roads else None
secondary_multi = MultiLineString(secondary_roads) if secondary_roads else None
total_length = 0
road_area = 0
if primary_multi:
total_length += primary_multi.length
road_area += primary_multi.length * self.primary_width
if secondary_multi:
total_length += secondary_multi.length
road_area += secondary_multi.length * self.secondary_width
return RoadNetwork(
primary_roads=primary_multi,
secondary_roads=secondary_multi,
total_length_m=total_length,
total_area_sqm=road_area
)
def identify_dead_zones(
self,
site: SiteBoundary,
road_network: RoadNetwork
) -> List[Polygon]:
"""
Identify areas more than max_distance from any road
Args:
site: Site boundary
road_network: Road network
Returns:
List of polygons representing dead zones
"""
self.logger.info(f"Identifying dead zones (>{self.max_distance}m from road)")
# Combine all roads
all_roads = []
if road_network.primary_roads:
if hasattr(road_network.primary_roads, 'geoms'):
all_roads.extend(road_network.primary_roads.geoms)
else:
all_roads.append(road_network.primary_roads)
if road_network.secondary_roads:
if hasattr(road_network.secondary_roads, 'geoms'):
all_roads.extend(road_network.secondary_roads.geoms)
else:
all_roads.append(road_network.secondary_roads)
if not all_roads:
return [site.geometry] # Entire site is dead zone
# Create buffer around all roads
road_union = unary_union(all_roads)
covered_area = road_union.buffer(self.max_distance)
# Find uncovered areas
dead_zones = site.geometry.difference(covered_area)
if dead_zones.is_empty:
return []
if isinstance(dead_zones, Polygon):
if dead_zones.area > 100: # Minimum 100 sqm
return [dead_zones]
return []
# MultiPolygon
return [p for p in dead_zones.geoms if p.area > 100]
def optimize_for_coverage(
self,
site: SiteBoundary,
max_road_ratio: float = 0.25
) -> RoadNetwork:
"""
Generate road network optimized for complete coverage
within road area budget
Args:
site: Site boundary
max_road_ratio: Maximum ratio of site area for roads
Returns:
Optimized RoadNetwork
"""
self.logger.info("Generating coverage-optimized road network")
max_road_area = site.buildable_area_sqm * max_road_ratio
# Start with sparse grid and densify until covered or budget exceeded
spacing = self.max_distance * 2 # Start sparse
while spacing >= self.max_distance / 2:
network = self.generate_grid_network(
site,
primary_spacing=spacing,
secondary_spacing=spacing * 2
)
dead_zones = self.identify_dead_zones(site, network)
dead_area = sum(z.area for z in dead_zones)
if dead_area < site.buildable_area_sqm * 0.05: # <5% dead zone
if network.total_area_sqm <= max_road_area:
return network
spacing *= 0.8 # Densify
# Return the last generated network
return network
def get_road_polygons(self, road_network: RoadNetwork) -> List[Polygon]:
"""
Convert road lines to polygons (for plotting/export)
Args:
road_network: Road network
Returns:
List of road polygons
"""
polygons = []
if road_network.primary_roads:
roads = road_network.primary_roads.geoms if hasattr(road_network.primary_roads, 'geoms') else [road_network.primary_roads]
for road in roads:
poly = road.buffer(self.primary_width / 2, cap_style=2)
polygons.append(poly)
if road_network.secondary_roads:
roads = road_network.secondary_roads.geoms if hasattr(road_network.secondary_roads, 'geoms') else [road_network.secondary_roads]
for road in roads:
poly = road.buffer(self.secondary_width / 2, cap_style=2)
polygons.append(poly)
return polygons
def _get_y_coord(self, line: LineString) -> float:
"""Get average Y coordinate of a line"""
if not isinstance(line, LineString):
return 0
coords = list(line.coords)
return sum(c[1] for c in coords) / len(coords)
def _get_x_coord(self, line: LineString) -> float:
"""Get average X coordinate of a line"""
if not isinstance(line, LineString):
return 0
coords = list(line.coords)
return sum(c[0] for c in coords) / len(coords)
# Example usage
if __name__ == "__main__":
from src.geometry.site_processor import SiteProcessor
# Create a test site
processor = SiteProcessor()
coords = [(0, 0), (500, 0), (500, 500), (0, 500), (0, 0)]
site = processor.import_from_coordinates(coords)
# Generate road network
generator = RoadNetworkGenerator()
# Grid network
grid_network = generator.generate_grid_network(
site,
primary_spacing=150,
secondary_spacing=75
)
print(f"Grid Network:")
print(f" Total length: {grid_network.total_length_m:.0f}m")
print(f" Total area: {grid_network.total_area_sqm:.0f}m²")
# Check dead zones
dead_zones = generator.identify_dead_zones(site, grid_network)
print(f" Dead zones: {len(dead_zones)}")
# Spine network
spine_network = generator.generate_spine_network(site)
print(f"\nSpine Network:")
print(f" Total length: {spine_network.total_length_m:.0f}m")
print(f" Total area: {spine_network.total_area_sqm:.0f}m²")
# Optimized network
optimized = generator.optimize_for_coverage(site, max_road_ratio=0.20)
print(f"\nOptimized Network:")
print(f" Total length: {optimized.total_length_m:.0f}m")
print(f" Total area: {optimized.total_area_sqm:.0f}m²")