"""
Pipe Network Topology Generator
Generates connected irrigation pipe networks (trunk mains, sub-mains, zone mains)
for drip systems. Creates a "fishbone" or "tree" topology from pump to zones.
"""
from typing import Dict, List, Tuple, Optional
import math
from shapely.geometry import Point, LineString, Polygon
class PipeNetworkError(Exception):
    """Exception type for pipe network errors.

    Adds no attributes or behaviour beyond ``Exception``.
    """
def _project_point_onto_axis(
point: Point,
axis_direction: Tuple[float, float],
) -> float:
"""Project a point onto an axis direction."""
return point.x * axis_direction[0] + point.y * axis_direction[1]
def _reconstruct_point_from_axes(
    main_projection: float,
    lateral_projection: float,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
) -> Point:
    """Combine main/lateral axis coordinates into a world-space point.

    The result is ``main_projection * main_axis + lateral_projection *
    lateral_axis``, computed component by component. This undoes a pair of
    dot-product projections only when the two axes are orthonormal.
    """
    world_x = main_projection * main_axis[0] + lateral_projection * lateral_axis[0]
    world_y = main_projection * main_axis[1] + lateral_projection * lateral_axis[1]
    return Point(world_x, world_y)
def route_orthogonal(
    start_pt: Point,
    end_pt: Point,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
) -> LineString:
    """
    Build a route between two points that follows the farm axes.

    Both endpoints are projected onto the main and lateral axes. If they
    share either coordinate (per ``math.isclose`` with its default
    tolerances), the route is the single segment start -> end. Otherwise the
    route runs along the main axis first and then along the lateral axis,
    with its one corner at (end main coordinate, start lateral coordinate).
    The corner is a right angle provided the two axes are perpendicular.

    Args:
        start_pt: Route origin.
        end_pt: Route destination.
        main_axis: Direction vector of the main axis.
        lateral_axis: Direction vector of the lateral axis.

    Returns:
        LineString with two vertices (already aligned) or three vertices
        (one bend).
    """
    main_from = _project_point_onto_axis(start_pt, main_axis)
    lateral_from = _project_point_onto_axis(start_pt, lateral_axis)
    main_to = _project_point_onto_axis(end_pt, main_axis)
    lateral_to = _project_point_onto_axis(end_pt, lateral_axis)

    already_aligned = (
        math.isclose(main_from, main_to)
        or math.isclose(lateral_from, lateral_to)
    )

    vertices = [start_pt]
    if not already_aligned:
        # Move along the main axis first: the corner keeps the start's
        # lateral coordinate and takes the end's main coordinate.
        corner = _reconstruct_point_from_axes(
            main_to,
            lateral_from,
            main_axis,
            lateral_axis,
        )
        vertices.append(corner)
    vertices.append(end_pt)
    return LineString(vertices)
def generate_pipe_network(
    farm_polygon: Polygon,
    pump_point: Point,
    zones: List[Dict],
    main_direction: Tuple[float, float],
    design_type: str = "distributed",
) -> Dict:
    """
    Generate connected pipe network (trunk main, sub-mains, zone mains).

    For ``design_type == "distributed"`` a trunk main is generated from the
    pump along the main axis, and each sub-main starts at the point of the
    trunk nearest to the zone's valve. For any other ``design_type`` value
    no trunk is generated and every sub-main starts at the pump. Sub-mains
    are routed with ``route_orthogonal`` using the main axis and its
    90-degree rotation as the lateral axis.

    Args:
        farm_polygon: Farm boundary (UTM)
        pump_point: Pump location (UTM)
        zones: List of zone dicts. 'valve_id' is required. 'valve_location'
            is used when present and not None; otherwise the centroid of
            the zone's 'polygon' is used (and 'polygon' is then required).
        main_direction: Normalized direction vector (dx, dy) for main axis
        design_type: "centralized" or "distributed"

    Returns:
        Dict with:
        - 'trunk_main': trunk LineString, or None when no trunk was
          generated (non-distributed design, or no zones)
        - 'sub_mains': Dict mapping valve_id to sub-main LineString
        - 'zone_mains': always an empty dict here (to be populated by
          drip_engine per zone)
        - 'total_trunk_length_m': Trunk pipe length
        - 'total_submain_length_m': Sum of all sub-main lengths

    Raises:
        KeyError: If a zone lacks 'valve_id', or lacks both a usable
            'valve_location' and a 'polygon'.
    """
    if not zones:
        return {
            "trunk_main": None,
            "sub_mains": {},
            "zone_mains": {},
            # Floats, to match the types returned on the non-empty path.
            "total_trunk_length_m": 0.0,
            "total_submain_length_m": 0.0,
        }

    # Lateral axis is the main axis rotated by +90 degrees.
    lateral_direction = (-main_direction[1], main_direction[0])

    # 1. Generate trunk main along farm axis for distributed layouts
    trunk_main = None
    total_trunk_length = 0.0
    if design_type == "distributed":
        trunk_main = _generate_trunk_main(farm_polygon, pump_point, main_direction)
        total_trunk_length = trunk_main.length

    # 2. Generate sub-mains from trunk/pump to each zone's anchored valve
    sub_mains = {}
    total_submain_length = 0.0
    for zone in zones:
        valve_id = zone["valve_id"]

        # Resolve the centroid lazily: a zone that carries its own valve
        # location does not need a 'polygon', and an explicit None falls
        # back to the centroid instead of being routed to.
        valve_location = zone.get("valve_location")
        if valve_location is None:
            valve_location = zone["polygon"].centroid

        if trunk_main is not None:
            # Tap the trunk at its closest point to the valve.
            start_point = trunk_main.interpolate(trunk_main.project(valve_location))
        else:
            start_point = pump_point

        sub_main = route_orthogonal(
            start_point,
            valve_location,
            main_direction,
            lateral_direction,
        )
        sub_mains[valve_id] = sub_main
        total_submain_length += sub_main.length

    return {
        "trunk_main": trunk_main,
        "sub_mains": sub_mains,
        "zone_mains": {},  # Will be populated by drip_engine per zone
        "total_trunk_length_m": total_trunk_length,
        "total_submain_length_m": total_submain_length,
    }
def _generate_trunk_main(
    farm_polygon: Polygon,
    pump_point: Point,
    main_direction: Tuple[float, float],
) -> LineString:
    """
    Build the trunk main as a straight line from the pump along the main axis.

    The polygon's exterior vertices are projected onto the main axis to find
    its extent. The trunk ends at whichever extent is farther from the pump
    (the maximum wins ties), while keeping the pump's coordinate on the
    lateral axis, so the trunk is parallel to ``main_direction``.

    Args:
        farm_polygon: Service polygon boundary (UTM) — may be the full farm
            or a per-source partition.
        pump_point: Pump location (UTM)
        main_direction: Normalized direction vector (dx, dy)

    Returns:
        Two-vertex LineString from the pump to the trunk end point
    """
    main_dx = main_direction[0]
    main_dy = main_direction[1]
    # Lateral axis is the main axis rotated by +90 degrees.
    lat_dx = -main_dy
    lat_dy = main_dx

    # Extent of the polygon boundary along the main axis
    boundary_along = [
        vertex[0] * main_dx + vertex[1] * main_dy
        for vertex in farm_polygon.exterior.coords
    ]
    low_end = min(boundary_along)
    high_end = max(boundary_along)

    # Pump position expressed in (main, lateral) axis coordinates
    pump_along = pump_point.x * main_dx + pump_point.y * main_dy
    pump_across = pump_point.x * lat_dx + pump_point.y * lat_dy

    # Head for the end of the polygon that is farther from the pump
    if abs(high_end - pump_along) >= abs(low_end - pump_along):
        target_along = high_end
    else:
        target_along = low_end

    # Back to world coordinates, holding the pump's lateral coordinate
    end_x = target_along * main_dx + pump_across * lat_dx
    end_y = target_along * main_dy + pump_across * lat_dy

    return LineString([
        (pump_point.x, pump_point.y),
        (end_x, end_y),
    ])
def calculate_pipe_lengths(pipe_network: Dict) -> Dict:
    """
    Summarise the pipe lengths stored in a pipe network dict.

    Args:
        pipe_network: Dict read for the keys 'total_trunk_length_m',
            'total_submain_length_m' and 'total_main_m'. Any key that is
            missing counts as 0.

    Returns:
        Dict with 'trunk_m', 'submain_m', 'zone_main_m' and 'total_m'
        (the sum of the other three)
    """
    lengths = {
        "trunk_m": pipe_network.get("total_trunk_length_m", 0),
        "submain_m": pipe_network.get("total_submain_length_m", 0),
        # 'total_main_m' is expected to come from drip_engine
        "zone_main_m": pipe_network.get("total_main_m", 0),
    }
    lengths["total_m"] = (
        lengths["trunk_m"] + lengths["submain_m"] + lengths["zone_main_m"]
    )
    return lengths
|