farm-layout-model / pipe_network.py
spacedout-bits's picture
Phase 5: Drip Manifold Alignment - valve-proximity manifold selection
7e350ba
"""
Pipe Network Topology Generator
Generates connected irrigation pipe networks (trunk mains, sub-mains, zone mains)
for drip systems. Creates a "fishbone" or "tree" topology from pump to zones.
"""
from typing import Dict, List, Tuple, Optional
import math
from shapely.geometry import Point, LineString, Polygon
class PipeNetworkError(Exception):
    """Exception type dedicated to pipe network failures.

    Adds no behavior on top of ``Exception``; it exists so callers can catch
    pipe-network problems separately from other errors.
    """
def _project_point_onto_axis(
point: Point,
axis_direction: Tuple[float, float],
) -> float:
"""Project a point onto an axis direction."""
return point.x * axis_direction[0] + point.y * axis_direction[1]
def _reconstruct_point_from_axes(
    main_projection: float,
    lateral_projection: float,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
) -> Point:
    """Build a world-space point from its main- and lateral-axis coordinates.

    The result is the linear combination
    ``main_projection * main_axis + lateral_projection * lateral_axis``.
    It undoes ``_project_point_onto_axis`` only when the two axes are
    orthonormal; that precondition is not checked here.
    """
    world_x = main_projection * main_axis[0] + lateral_projection * lateral_axis[0]
    world_y = main_projection * main_axis[1] + lateral_projection * lateral_axis[1]
    return Point(world_x, world_y)
def route_orthogonal(
    start_pt: Point,
    end_pt: Point,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
) -> LineString:
    """
    Route between two points along the farm axes using at most one bend.

    Both endpoints are expressed in (main, lateral) axis coordinates. When
    they already agree on either coordinate (per ``math.isclose`` with its
    default tolerances), the route is the direct segment between them.
    Otherwise the bend is placed at the end point's main coordinate and the
    start point's lateral coordinate, so the first leg changes only the main
    coordinate and the second leg changes only the lateral coordinate.

    Args:
        start_pt: Route origin in world coordinates.
        end_pt: Route destination in world coordinates.
        main_axis: Direction vector (dx, dy) of the main axis.
        lateral_axis: Direction vector (dx, dy) of the lateral axis.

    Returns:
        LineString with two vertices (no bend) or three vertices (one bend).
    """
    origin_main = _project_point_onto_axis(start_pt, main_axis)
    origin_lateral = _project_point_onto_axis(start_pt, lateral_axis)
    target_main = _project_point_onto_axis(end_pt, main_axis)
    target_lateral = _project_point_onto_axis(end_pt, lateral_axis)

    shares_main = math.isclose(origin_main, target_main)
    shares_lateral = math.isclose(origin_lateral, target_lateral)

    if not (shares_main or shares_lateral):
        # Bend point: end point's main coordinate, start point's lateral one.
        bend = _reconstruct_point_from_axes(
            target_main,
            origin_lateral,
            main_axis,
            lateral_axis,
        )
        vertices = [start_pt, bend, end_pt]
    else:
        # Already aligned on one axis: a single segment is axis-parallel.
        vertices = [start_pt, end_pt]

    return LineString(vertices)
def generate_pipe_network(
    farm_polygon: Polygon,
    pump_point: Point,
    zones: List[Dict],
    main_direction: Tuple[float, float],
    design_type: str = "distributed",
) -> Dict:
    """
    Generate connected pipe network (trunk main, sub-mains, zone mains).

    For ``design_type == "distributed"`` a trunk main is laid from the pump
    along the main axis, and each sub-main starts at the point on the trunk
    nearest to the zone's valve. For any other ``design_type`` value no trunk
    is created and every sub-main starts at the pump. Sub-mains are routed
    with ``route_orthogonal`` (axis-aligned, at most one bend).

    Args:
        farm_polygon: Farm boundary (UTM). Only used for the trunk main.
        pump_point: Pump location (UTM).
        zones: List of zone dicts. Each needs 'valve_id', plus either a
            non-None 'valve_location' or a 'polygon' whose centroid is used
            as the valve location.
        main_direction: Normalized direction vector (dx, dy) for main axis.
        design_type: "centralized" or "distributed".

    Returns:
        Dict with:
        - 'trunk_main': trunk LineString, or None when no trunk is built
        - 'sub_mains': Dict mapping valve_id to sub-main LineString
        - 'zone_mains': empty dict, to be populated by drip_engine per zone
        - 'total_trunk_length_m': trunk length as float (0.0 without trunk)
        - 'total_submain_length_m': sum of all sub-main lengths as float

    Raises:
        KeyError: If a zone has no 'valve_id', or has neither a usable
            'valve_location' nor a 'polygon'.
    """
    if not zones:
        # Totals are floats here too, matching the populated result below.
        return {
            "trunk_main": None,
            "sub_mains": {},
            "zone_mains": {},
            "total_trunk_length_m": 0.0,
            "total_submain_length_m": 0.0,
        }

    # Lateral axis: main axis rotated 90 degrees counter-clockwise.
    lateral_direction = (-main_direction[1], main_direction[0])

    # 1. Generate trunk main along farm axis for distributed layouts
    trunk_main = None
    total_trunk_length = 0.0
    if design_type == "distributed":
        trunk_main = _generate_trunk_main(farm_polygon, pump_point, main_direction)
        total_trunk_length = trunk_main.length

    # 2. Generate sub-mains from trunk/pump to each zone's anchored valve
    sub_mains = {}
    total_submain_length = 0.0
    for zone in zones:
        valve_id = zone["valve_id"]

        # Resolve the fallback lazily: the polygon is only touched when no
        # valve location is supplied, so zones carrying just a valve location
        # work, and an explicit None falls back to the centroid.
        valve_location = zone.get("valve_location")
        if valve_location is None:
            valve_location = zone["polygon"].centroid

        if trunk_main is not None:
            # Tee off at the point on the trunk nearest to the valve.
            start_point = trunk_main.interpolate(trunk_main.project(valve_location))
        else:
            start_point = pump_point

        sub_main = route_orthogonal(
            start_point,
            valve_location,
            main_direction,
            lateral_direction,
        )
        # NOTE: a repeated valve_id overwrites the earlier entry here, while
        # the running total below still counts both routes.
        sub_mains[valve_id] = sub_main
        total_submain_length += sub_main.length

    return {
        "trunk_main": trunk_main,
        "sub_mains": sub_mains,
        "zone_mains": {},  # Will be populated by drip_engine per zone
        "total_trunk_length_m": total_trunk_length,
        "total_submain_length_m": total_submain_length,
    }
def _generate_trunk_main(
    farm_polygon: Polygon,
    pump_point: Point,
    main_direction: Tuple[float, float],
) -> LineString:
    """
    Build the trunk main: a straight line from the pump along the main axis.

    The polygon's exterior ring is projected onto the main axis to find its
    extent. The trunk ends at whichever extent is farther from the pump
    (ties go to the larger projection), at the pump's own lateral coordinate.
    The end point is rebuilt from axis coordinates, which assumes
    ``main_direction`` has unit length.

    Args:
        farm_polygon: Service polygon boundary (UTM); may be the full farm
            or a per-source partition.
        pump_point: Pump location (UTM).
        main_direction: Normalized direction vector (dx, dy).

    Returns:
        Two-vertex LineString from the pump to the trunk end.
    """
    main_dx = main_direction[0]
    main_dy = main_direction[1]
    # Lateral axis: main axis rotated 90 degrees counter-clockwise.
    lat_dx = -main_dy
    lat_dy = main_dx

    # Extent of the polygon's exterior ring along the main axis.
    ring_along = [
        vertex[0] * main_dx + vertex[1] * main_dy
        for vertex in farm_polygon.exterior.coords
    ]
    low = min(ring_along)
    high = max(ring_along)

    # Pump position in (main, lateral) axis coordinates.
    pump_along = pump_point.x * main_dx + pump_point.y * main_dy
    pump_across = pump_point.x * lat_dx + pump_point.y * lat_dy

    # Head for the more distant extent; a tie resolves to the high end.
    if abs(high - pump_along) >= abs(low - pump_along):
        target_along = high
    else:
        target_along = low

    # Back to world coordinates, holding the pump's lateral coordinate.
    end_x = target_along * main_dx + pump_across * lat_dx
    end_y = target_along * main_dy + pump_across * lat_dy

    return LineString([(pump_point.x, pump_point.y), (end_x, end_y)])
def calculate_pipe_lengths(pipe_network: Dict) -> Dict:
    """
    Summarize the pipe length totals recorded in a pipe network dict.

    Only pre-computed totals are read; no geometry is measured here. A total
    that is missing from ``pipe_network`` counts as 0.

    Args:
        pipe_network: Output dict from generate_pipe_network(). The keys read
            are 'total_trunk_length_m', 'total_submain_length_m' and
            'total_main_m' (the last one is expected from drip_engine and is
            not written by generate_pipe_network itself).

    Returns:
        Dict with keys 'trunk_m', 'submain_m', 'zone_main_m' and 'total_m',
        where 'total_m' is the sum of the other three.
    """
    # Output key -> key holding that total in the pipe network dict.
    source_keys = {
        "trunk_m": "total_trunk_length_m",
        "submain_m": "total_submain_length_m",
        "zone_main_m": "total_main_m",
    }
    lengths = {
        out_key: pipe_network.get(src_key, 0)
        for out_key, src_key in source_keys.items()
    }
    lengths["total_m"] = (
        lengths["trunk_m"] + lengths["submain_m"] + lengths["zone_main_m"]
    )
    return lengths