Spaces:
Running
Running
| """ | |
| Pipe Network Topology Generator | |
| Generates connected irrigation pipe networks (trunk mains, sub-mains, zone mains) | |
| for drip systems. Creates a "fishbone" or "tree" topology from pump to zones. | |
| """ | |
| from typing import Dict, List, Tuple, Optional | |
| import math | |
| from shapely.geometry import Point, LineString, Polygon | |
class PipeNetworkError(Exception):
    """Exception type for errors specific to pipe network generation."""
def _project_point_onto_axis(
    point: Point,
    axis_direction: Tuple[float, float],
) -> float:
    """
    Return the scalar projection of ``point`` onto ``axis_direction``.

    The value is the 2D dot product of the point's (x, y) coordinates with
    the axis vector. No normalization is performed here, so the result is a
    distance along the axis only if the caller passes a unit-length vector.
    """
    axis_x = axis_direction[0]
    axis_y = axis_direction[1]
    return point.x * axis_x + point.y * axis_y
def _reconstruct_point_from_axes(
    main_projection: float,
    lateral_projection: float,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
) -> Point:
    """
    Build a world-space point from its main/lateral axis coordinates.

    The point is the linear combination
    ``main_projection * main_axis + lateral_projection * lateral_axis``.
    It round-trips with a dot-product projection only when the two axes are
    orthonormal; this function does not check that.
    """
    world_x = main_projection * main_axis[0] + lateral_projection * lateral_axis[0]
    world_y = main_projection * main_axis[1] + lateral_projection * lateral_axis[1]
    return Point(world_x, world_y)
def route_orthogonal(
    start_pt: Point,
    end_pt: Point,
    main_axis: Tuple[float, float],
    lateral_axis: Tuple[float, float],
    alignment_tol: float = 1e-6,
) -> LineString:
    """
    Create an axis-aligned route with at most one 90-degree bend.

    The route is constructed in farm-axis coordinates, then converted back to
    world coordinates. If the points are already aligned on one axis, the
    result is a single straight segment. Otherwise the route first travels
    along the main axis (keeping the start point's lateral coordinate) and
    then along the lateral axis to the end point.

    Args:
        start_pt: Route start.
        end_pt: Route end.
        main_axis: Direction vector (dx, dy) of the main axis.
        lateral_axis: Direction vector (dx, dy) of the lateral axis.
        alignment_tol: Absolute tolerance, in the same units as the
            coordinates, below which two projections count as aligned.
            It is combined with ``math.isclose``'s default relative
            tolerance, so the check is never stricter than the purely
            relative comparison.

    Returns:
        LineString with two vertices (aligned case) or three vertices
        (one corner).

    Raises:
        ValueError: If ``alignment_tol`` is negative (raised by
            ``math.isclose``).
    """
    start_main = _project_point_onto_axis(start_pt, main_axis)
    start_lateral = _project_point_onto_axis(start_pt, lateral_axis)
    end_main = _project_point_onto_axis(end_pt, main_axis)
    end_lateral = _project_point_onto_axis(end_pt, lateral_axis)

    # A purely relative tolerance shrinks to zero when a projection is close
    # to 0, which depends only on where the coordinate origin happens to lie.
    # The absolute floor keeps floating-point noise from producing a spurious
    # near-zero-length corner segment in that case.
    main_aligned = math.isclose(start_main, end_main, abs_tol=alignment_tol)
    lateral_aligned = math.isclose(
        start_lateral, end_lateral, abs_tol=alignment_tol
    )
    if main_aligned or lateral_aligned:
        return LineString([start_pt, end_pt])

    # Corner shares the end point's main coordinate and the start point's
    # lateral coordinate, i.e. the main-axis leg is laid first.
    main_first_corner = _reconstruct_point_from_axes(
        end_main,
        start_lateral,
        main_axis,
        lateral_axis,
    )
    return LineString([start_pt, main_first_corner, end_pt])
def generate_pipe_network(
    farm_polygon: Polygon,
    pump_point: Point,
    zones: List[Dict],
    main_direction: Tuple[float, float],
    design_type: str = "distributed",
) -> Dict:
    """
    Generate connected pipe network (trunk main, sub-mains, zone mains).

    A "distributed" design lays a trunk main from the pump along the farm's
    main axis and starts each sub-main at the trunk point nearest to the
    zone's valve. Any other ``design_type`` value builds no trunk and starts
    every sub-main at the pump. Sub-mains are routed to each valve location
    with ``route_orthogonal``.

    Args:
        farm_polygon: Farm boundary (UTM). Only used to size the trunk main.
        pump_point: Pump location (UTM).
        zones: List of zone dicts. Each needs 'valve_id' and either a
            'valve_location' point or a 'polygon' whose centroid is used
            when 'valve_location' is missing or None. Other keys (such as
            'area_m2') are not read here.
        main_direction: Normalized direction vector (dx, dy) for main axis.
        design_type: "centralized" or "distributed".

    Returns:
        Dict with:
        - 'trunk_main': LineString from the pump along the main axis, or
          None when no trunk is built
        - 'sub_mains': Dict mapping valve_id to sub-main LineString
        - 'zone_mains': Empty dict, to be populated by drip_engine per zone
        - 'total_trunk_length_m': Trunk pipe length (0 without a trunk)
        - 'total_submain_length_m': Sum of all sub-main lengths

    Raises:
        KeyError: If a zone has no 'valve_id', or has neither a usable
            'valve_location' nor a 'polygon'.
    """
    if not zones:
        return {
            "trunk_main": None,
            "sub_mains": {},
            "zone_mains": {},
            "total_trunk_length_m": 0,
            "total_submain_length_m": 0,
        }

    # Lateral axis is the main axis rotated 90 degrees counter-clockwise.
    lateral_direction = (-main_direction[1], main_direction[0])

    # 1. Generate trunk main along farm axis for distributed layouts
    trunk_main = None
    total_trunk_length = 0.0
    if design_type == "distributed":
        trunk_main = _generate_trunk_main(farm_polygon, pump_point, main_direction)
        total_trunk_length = trunk_main.length

    # 2. Generate sub-mains from trunk/pump to each zone's anchored valve
    sub_mains = {}
    total_submain_length = 0.0
    for zone in zones:
        valve_id = zone["valve_id"]

        # Resolve the centroid fallback lazily: a zone that already carries a
        # valve location must not require a 'polygon' key, and an explicit
        # None is treated the same as a missing location.
        valve_location = zone.get("valve_location")
        if valve_location is None:
            valve_location = zone["polygon"].centroid

        if trunk_main is not None:
            # Tap the trunk at the point closest to the valve.
            start_point = trunk_main.interpolate(trunk_main.project(valve_location))
        else:
            start_point = pump_point

        sub_main = route_orthogonal(
            start_point,
            valve_location,
            main_direction,
            lateral_direction,
        )
        sub_mains[valve_id] = sub_main
        total_submain_length += sub_main.length

    return {
        "trunk_main": trunk_main,
        "sub_mains": sub_mains,
        "zone_mains": {},  # Will be populated by drip_engine per zone
        "total_trunk_length_m": total_trunk_length,
        "total_submain_length_m": total_submain_length,
    }
def _generate_trunk_main(
    farm_polygon: Polygon,
    pump_point: Point,
    main_direction: Tuple[float, float],
) -> LineString:
    """
    Build the trunk main as a straight segment starting at the pump.

    The segment follows the main axis and stops at the polygon's extent
    (along that axis) that lies farther from the pump; when both extents are
    equally far, the maximum-projection end is used. The end point keeps the
    pump's lateral coordinate, so the trunk is parallel to the main axis.

    Args:
        farm_polygon: Service polygon boundary (UTM) — may be the full farm
            or a per-source partition. Only its exterior ring is read.
        pump_point: Pump location (UTM).
        main_direction: Normalized direction vector (dx, dy).

    Returns:
        Two-vertex LineString from the pump to the trunk end.
    """
    main_x = main_direction[0]
    main_y = main_direction[1]
    # Lateral axis: main axis rotated 90 degrees counter-clockwise.
    lat_x = -main_y
    lat_y = main_x

    # Extent of the boundary along the main axis.
    boundary_projections = [
        vertex[0] * main_x + vertex[1] * main_y
        for vertex in farm_polygon.exterior.coords
    ]
    near_extent = min(boundary_projections)
    far_extent = max(boundary_projections)

    # Pump position in (main, lateral) axis coordinates.
    pump_along = pump_point.x * main_x + pump_point.y * main_y
    pump_across = pump_point.x * lat_x + pump_point.y * lat_y

    # Head for whichever extent is farther from the pump.
    if abs(far_extent - pump_along) >= abs(near_extent - pump_along):
        target_along = far_extent
    else:
        target_along = near_extent

    # Back to world coordinates, holding the pump's lateral offset.
    end_x = target_along * main_x + pump_across * lat_x
    end_y = target_along * main_y + pump_across * lat_y

    return LineString([(pump_point.x, pump_point.y), (end_x, end_y)])
def calculate_pipe_lengths(pipe_network: Dict) -> Dict:
    """
    Summarize pipe lengths stored in a pipe network dict.

    Args:
        pipe_network: Output dict from generate_pipe_network(). The keys
            'total_trunk_length_m', 'total_submain_length_m' and
            'total_main_m' are read; each defaults to 0 when absent.

    Returns:
        Dict with 'trunk_m', 'submain_m', 'zone_main_m' and 'total_m', where
        'total_m' is the sum of the other three values.
    """
    trunk_length = pipe_network.get("total_trunk_length_m", 0)
    submain_length = pipe_network.get("total_submain_length_m", 0)
    # 'total_main_m' is expected to be supplied by drip_engine.
    zone_main_length = pipe_network.get("total_main_m", 0)

    lengths = {
        "trunk_m": trunk_length,
        "submain_m": submain_length,
        "zone_main_m": zone_main_length,
    }
    lengths["total_m"] = trunk_length + submain_length + zone_main_length
    return lengths