""" Valve Placement Engine Deterministic valve placement based on: 1. Capacity (pump flow vs. total emitter demand) 2. Topography (elevation deltas) 3. Crop type (hydrozones) 4. Hydraulics (max lateral length, pressure drop) Follows the 4-step decision matrix: - Capacity: if total demand > pump capacity, split into zones - Topography: if elevation delta > 5m, separate high/low zones - Crop: different crops get dedicated valves - Hydraulics: if lateral length > max runtime, place valve at mid-point """ from typing import List, Dict, Tuple, Optional import math from shapely.geometry import Point, Polygon, LineString, MultiPolygon, MultiPoint, GeometryCollection from shapely.ops import split as shapely_split, voronoi_diagram, unary_union import numpy as np class ValveEngineError(Exception): """Custom exception for valve placement errors.""" pass # Pump HP to flow rate conversion (realistic centrifugal pump curves) # These are typical discharge rates for agricultural irrigation pumps HP_TO_LPH = { 0.5: 2500, 0.75: 4000, 1.0: 5000, 1.5: 8000, 2.0: 15000, 3.0: 25000, 5.0: 40000, 7.5: 60000, 10.0: 80000, 15.0: 120000, 20.0: 160000, } # Crop flow parameters — emitter density derived from (lateral_spacing × emitter_spacing) # e.g. tomato: 0.8m row spacing, 0.3m emitter spacing → 1/(0.8×0.3) = 4.17 emitters/m² CROP_FLOW_PARAMS = { "tomato": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4}, # 0.8m rows, 0.3m spacing "pepper": {"emitter_density_m2": 5.56, "emitter_flow_lph": 4}, # 0.6m rows, 0.3m spacing "lettuce": {"emitter_density_m2": 12.50, "emitter_flow_lph": 2}, # 0.4m rows, 0.2m spacing "cucumber": {"emitter_density_m2": 2.00, "emitter_flow_lph": 4}, # 1.0m rows, 0.5m spacing "orchard": {"emitter_density_m2": 0.50, "emitter_flow_lph": 8}, # 2.0m rows, 1.0m spacing "generic": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4}, # same as tomato } # Area-based valve density (valves per hectare) by crop type. # Used as a floor in place_valves_hierarchical — ensures every farm # gets agronomically appropriate zone coverage regardless of pump capacity. # Rule of thumb: ~2 valves/acre ≈ 5 valves/ha for standard row crops. VALVE_DENSITY_PER_HA = { "tomato": 6, # 0.5m rows, intensive water demand "pepper": 6, # 0.6m rows, intensive "lettuce": 7, # 0.4m rows, very intensive "cucumber": 4, # 1.0m rows, moderate "orchard": 2, # 2.0m+ rows, low density "generic": 5, # ≈ 2 valves/acre, standard default } # Hydraulics defaults MAX_LATERAL_LENGTH_M = 200 # ~650 ft; beyond this, pressure drops significantly MAX_PRESSURE_DROP_PCT = 10 # percent ELEVATION_DELTA_THRESHOLD_M = 5 # split zones if > 5m elevation difference def calculate_pump_flow_lph(pump_hp: float) -> float: """ Convert pump horsepower to liters per hour. Uses a lookup table for common pump sizes. For intermediate values, interpolates linearly. Args: pump_hp: Pump horsepower Returns: Approximate flow rate in liters per hour """ if pump_hp <= 0: raise ValveEngineError("Pump horsepower must be > 0") # Find nearest HP values hp_values = sorted(HP_TO_LPH.keys()) if pump_hp in HP_TO_LPH: return HP_TO_LPH[pump_hp] # Interpolate between two nearest values lower_hp = max([h for h in hp_values if h < pump_hp], default=hp_values[0]) upper_hp = min([h for h in hp_values if h > pump_hp], default=hp_values[-1]) if lower_hp == upper_hp: return HP_TO_LPH[lower_hp] lower_flow = HP_TO_LPH[lower_hp] upper_flow = HP_TO_LPH[upper_hp] # Linear interpolation t = (pump_hp - lower_hp) / (upper_hp - lower_hp) return lower_flow + t * (upper_flow - lower_flow) def calculate_total_emitter_flow( crop_zones: List[Dict], crop_params: Optional[Dict] = None ) -> float: """ Calculate total emitter flow from crop zones. Args: crop_zones: List of dicts with keys: - 'crop': crop name - 'area_m2': area in square meters - 'polygon': optional Shapely Polygon (for area calc if not provided) crop_params: Optional custom crop parameters. Defaults to CROP_FLOW_PARAMS. Returns: Total flow in liters per hour """ if crop_params is None: crop_params = CROP_FLOW_PARAMS total_flow = 0.0 for zone in crop_zones: crop = zone.get("crop", "generic") if crop not in crop_params: crop = "generic" # Get area if "area_m2" in zone: area = zone["area_m2"] elif "polygon" in zone and isinstance(zone["polygon"], Polygon): area = zone["polygon"].area else: raise ValveEngineError( f"Zone must have 'area_m2' or 'polygon' with area: {zone}" ) # Get emitter params emitter_density = crop_params[crop]["emitter_density_m2"] emitter_flow = crop_params[crop]["emitter_flow_lph"] zone_flow = area * emitter_density * emitter_flow total_flow += zone_flow return total_flow def calculate_num_zones(total_emitter_flow_lph: float, pump_flow_lph: float) -> int: """ Calculate the number of zones (valves) needed based on capacity. Formula: num_zones = ceil(total_flow / pump_flow) Args: total_emitter_flow_lph: Total emitter demand in L/h pump_flow_lph: Pump capacity in L/h Returns: Number of zones (minimum 1) """ if pump_flow_lph <= 0: raise ValveEngineError("Pump flow must be > 0") if total_emitter_flow_lph <= 0: return 1 num_zones = math.ceil(total_emitter_flow_lph / pump_flow_lph) return max(1, num_zones) def split_polygon_by_crop_zones( farm_polygon: Polygon, crop_zones: List[Dict] ) -> Dict[str, Polygon]: """ Split farm polygon into sub-polygons by crop type. If crop_zones have explicit polygons, use those. Otherwise, attempt to infer zones (not implemented; return full polygon for each crop). Args: farm_polygon: Full farm boundary crop_zones: List of crop zone dicts with 'crop' and optional 'polygon' Returns: Dict mapping crop name to Polygon """ result = {} for i, zone in enumerate(crop_zones): crop = zone.get("crop", f"crop_{i}") if "polygon" in zone and isinstance(zone["polygon"], Polygon): result[crop] = zone["polygon"] else: # No explicit polygon; assign full farm to this crop # In production, you'd use field segmentation or user input result[crop] = farm_polygon return result def should_split_by_topography( polygon: Polygon, elevation_data: Optional[Dict] = None, threshold_m: float = ELEVATION_DELTA_THRESHOLD_M, ) -> Tuple[bool, float]: """ Determine if topography requires zone splitting. Args: polygon: Shapely Polygon elevation_data: Dict with 'min_elevation_m' and 'max_elevation_m' keys threshold_m: Elevation delta threshold (default 5m) Returns: (should_split, elevation_delta_m) """ if elevation_data is None: # No elevation data; assume flat return False, 0.0 min_elev = elevation_data.get("min_elevation_m", 0) max_elev = elevation_data.get("max_elevation_m", 0) delta = max_elev - min_elev should_split = delta > threshold_m return should_split, delta def choose_manifold_strategy(farm_area_m2: float) -> str: """ Choose centralized or distributed valve strategy based on farm area. Args: farm_area_m2: Farm area in square meters Returns: "centralized" if < 2 acres, else "distributed" """ # 1 ha = 10,000 m² HECTARES_M2 = 10000 if farm_area_m2 < HECTARES_M2: return "centralized" else: return "distributed" def find_perimeter_point( polygon: Polygon, reference_point: Point, max_distance_m: float = 50 ) -> Point: """ Find the closest point on the polygon boundary to a reference point. This is used to place valves at the entry point to a zone, along a perimeter path (avoiding planting areas). Args: polygon: Shapely Polygon (the zone) reference_point: Point to measure from (e.g., pump location) max_distance_m: Maximum distance to search (unused for now; just finds closest) Returns: Point on polygon boundary closest to reference_point """ boundary = polygon.boundary closest_point = boundary.interpolate(boundary.project(reference_point)) return closest_point def place_valve_for_zone( zone_polygon: Polygon, pump_point: Point, zone_id: str, strategy: str = "distributed", reason: str = "default", ) -> Dict: """ Place a single valve for a zone. Follows the "head-of-row" rule: - For distributed: place at the entry point of the sub-main into the zone - For centralized: place near the pump Args: zone_polygon: Polygon of the zone pump_point: Point location of pump zone_id: Identifier for this zone strategy: "centralized" or "distributed" reason: Why this zone was created (e.g., "capacity_split") Returns: Dict with valve metadata: - 'id': zone_id - 'location': Point object - 'lat': latitude - 'lon': longitude - 'strategy': "centralized" or "distributed" - 'reason': reason for zone """ if strategy == "centralized": # Place near pump (offset slightly to avoid exact pump location) valve_point = Point(pump_point.x + 10, pump_point.y + 10) else: # Distributed: place at entry point to zone from pump valve_point = find_perimeter_point(zone_polygon, pump_point) return { "id": zone_id, "location": valve_point, "lat": valve_point.y, "lon": valve_point.x, "strategy": strategy, "reason": reason, } def place_valves_hierarchical( farm_polygon: Polygon, pump_point: Point, crop_zones: List[Dict], pump_hp: float, centralized: bool = True, elevation_data: Optional[Dict] = None, max_lateral_length_m: float = MAX_LATERAL_LENGTH_M, max_valves: Optional[int] = None, ) -> List[Dict]: """ Place valves following the 4-step decision matrix. Step 1: Capacity — if total demand > pump capacity, split zones Step 2: Topography — if elevation delta > 5m, separate high/low Step 3: Crop — different crops get dedicated valves Step 4: Hydraulics — if lateral length > threshold, place valve at midpoint Args: farm_polygon: Full farm boundary pump_point: Point location of pump crop_zones: List of crop zone dicts with 'crop' and 'area_m2' or 'polygon' pump_hp: Pump horsepower centralized: If True, use centralized strategy; else distributed elevation_data: Optional dict with 'min_elevation_m', 'max_elevation_m' max_lateral_length_m: Maximum lateral length before forcing a split max_valves: Optional hard cap. If None, uses area-based default. Returns: List of valve dicts with placement and metadata """ valves = [] zone_counter = 0 # Farm area needed early — used by both the area floor and the cap. farm_area_ha = farm_polygon.area / 10000 # Step 1: Capacity constraint pump_flow = calculate_pump_flow_lph(pump_hp) total_flow = calculate_total_emitter_flow(crop_zones) num_zones_capacity = calculate_num_zones(total_flow, pump_flow) # Step 2: Topography should_split_topo, elevation_delta = should_split_by_topography( farm_polygon, elevation_data ) # Step 3: Crop constraint num_zones_crop = len(set(z.get("crop", "generic") for z in crop_zones)) # Step 4: Area-density floor — ensures minimum zone coverage regardless # of pump capacity. Rule of thumb: ~5 valves/ha (generic), crop-specific # for intensive or sparse crops (see VALVE_DENSITY_PER_HA). primary_crop = crop_zones[0].get("crop", "generic") if crop_zones else "generic" density = VALVE_DENSITY_PER_HA.get(primary_crop, VALVE_DENSITY_PER_HA["generic"]) num_zones_area = math.ceil(farm_area_ha * density) # Combined floor: highest of all drivers (all constraints must be satisfied) num_zones_required = max(num_zones_capacity, num_zones_crop, num_zones_area) if should_split_topo: num_zones_required += 1 # Hard cap: prevents runaway on very large farms or extreme capacity splits. if max_valves is None: if farm_area_ha < 0.5: max_valves = 6 elif farm_area_ha < 1.0: max_valves = 10 elif farm_area_ha < 2.0: max_valves = 15 elif farm_area_ha < 5.0: max_valves = 35 elif farm_area_ha < 10.0: max_valves = 60 else: max_valves = 100 num_zones_required = min(num_zones_required, max_valves) # Strategy choice strategy = "centralized" if centralized else "distributed" # Build crop list for assignment — distribute valves across crops # proportional to zone count, with each crop getting at least one valve. unique_crops = list(dict.fromkeys(z.get("crop", "generic") for z in crop_zones)) # Pre-compute evenly-spaced perimeter positions so that Voronoi # partitioning produces distinct zones. For centralized, fan valves # out from the pump; for distributed, space them along the boundary. perimeter = farm_polygon.boundary perimeter_len = perimeter.length # Place valves for i in range(num_zones_required): zone_id = f"valve_{zone_counter:03d}" zone_counter += 1 # Determine reason for this zone if i < max(num_zones_capacity - 1, 0): reason = "capacity_split" elif i < num_zones_crop: reason = "crop_type" elif i < num_zones_area: reason = "area_density" elif should_split_topo: reason = "topography_split" else: reason = "hydraulics_split" # Assign crop: round-robin across unique crops crop = unique_crops[i % len(unique_crops)] if unique_crops else "generic" if strategy == "centralized": # Fan out from pump along perimeter to ensure distinct locations fraction = i / max(num_zones_required, 1) valve_point = perimeter.interpolate(fraction * perimeter_len) # Shift slightly inward toward pump to keep "near pump" intent cx = (valve_point.x + pump_point.x) / 2 cy = (valve_point.y + pump_point.y) / 2 valve_point = Point(cx, cy) else: # Distributed: space evenly along perimeter fraction = i / max(num_zones_required, 1) valve_point = perimeter.interpolate(fraction * perimeter_len) valve = { "id": zone_id, "location": valve_point, "lat": valve_point.y, "lon": valve_point.x, "strategy": strategy, "reason": reason, "crop": crop, } valves.append(valve) return valves def partition_farm_by_sources( farm_polygon: Polygon, sources: List[Dict], ) -> List[Polygon]: """ Partition the farm into non-overlapping service regions, one per water source. Uses Voronoi tessellation weighted by pump capacity: higher-HP pumps claim proportionally larger regions. For a single source, the entire farm is returned. Args: farm_polygon: Farm boundary (UTM) sources: List of dicts with 'pump_point' (Point) and 'pump_hp' (float) Returns: List of Polygons (same order as sources). Their union equals farm_polygon. """ if len(sources) <= 1: return [farm_polygon] # --- Capacity-weighted seed points --------------------------------- # Shift each source point toward the farm centroid proportionally to # its *inverse* capacity. A weaker pump is pulled further inward, # shrinking its Voronoi cell. A stronger pump stays closer to its # real location, expanding its cell. farm_centroid = farm_polygon.centroid max_hp = max(s["pump_hp"] for s in sources) weighted_points = [] for src in sources: pt = src["pump_point"] hp = src["pump_hp"] # weight 0 → full shift to centroid (weakest); 1 → no shift (strongest) weight = hp / max_hp if max_hp > 0 else 1.0 wx = pt.x * weight + farm_centroid.x * (1 - weight) wy = pt.y * weight + farm_centroid.y * (1 - weight) weighted_points.append(Point(wx, wy)) # --- Voronoi diagram ----------------------------------------------- mp = MultiPoint(weighted_points) # envelope= argument clips unbounded Voronoi cells to a bounding region voronoi_geom = voronoi_diagram(mp, envelope=farm_polygon) # voronoi_geom is a GeometryCollection of polygons. We need to map # each cell back to the source whose weighted point lies inside it. cells = list(voronoi_geom.geoms) if isinstance(voronoi_geom, GeometryCollection) else [voronoi_geom] service_regions: List[Optional[Polygon]] = [None] * len(sources) for cell in cells: if not isinstance(cell, Polygon): continue # Clip to farm boundary clipped = cell.intersection(farm_polygon) if clipped.is_empty: continue if isinstance(clipped, MultiPolygon): clipped = max(clipped.geoms, key=lambda g: g.area) if not isinstance(clipped, Polygon): continue # Match to nearest weighted source point cell_centroid = clipped.centroid best_idx = min( range(len(weighted_points)), key=lambda i: cell_centroid.distance(weighted_points[i]), ) if service_regions[best_idx] is None: service_regions[best_idx] = clipped else: service_regions[best_idx] = service_regions[best_idx].union(clipped) if isinstance(service_regions[best_idx], MultiPolygon): service_regions[best_idx] = max( service_regions[best_idx].geoms, key=lambda g: g.area ) # Fill any unassigned sources with their nearest unclaimed area for i, region in enumerate(service_regions): if region is None: service_regions[i] = farm_polygon.buffer(0) # fallback: full farm return service_regions def _project_polygon_onto_axis( polygon: Polygon, axis_direction: Tuple[float, float] ) -> Tuple[float, float]: """ Project a polygon onto an axis direction, returning (min_proj, max_proj). Args: polygon: Shapely Polygon axis_direction: Normalized direction vector (dx, dy) Returns: (min_projection, max_projection) along the axis """ coords = list(polygon.exterior.coords) projections = [ coord[0] * axis_direction[0] + coord[1] * axis_direction[1] for coord in coords ] return min(projections), max(projections) def _refine_zones_by_crop_boundaries( strip_zones: List[Dict], crop_zones: List[Dict], ) -> List[Dict]: """ Refine strip zones by overlaying crop boundaries. If a strip overlaps more than one crop polygon, split it into sub-zones clipped by those crop boundaries and assign the corresponding crop. """ if not crop_zones: return strip_zones refined = [] for zone in strip_zones: strip_poly = zone["polygon"] overlaps = [] for crop_zone in crop_zones: crop_poly = crop_zone.get("polygon") if crop_poly is None or not strip_poly.intersects(crop_poly): continue clipped = strip_poly.intersection(crop_poly) if clipped.is_empty or clipped.area <= 1: continue if isinstance(clipped, MultiPolygon): for geom in clipped.geoms: if geom.area > 1: overlaps.append( { "crop": crop_zone.get("crop", "generic"), "polygon": geom, "area_m2": geom.area, "valve_id": zone["valve_id"], } ) elif isinstance(clipped, Polygon): overlaps.append( { "crop": crop_zone.get("crop", "generic"), "polygon": clipped, "area_m2": clipped.area, "valve_id": zone["valve_id"], } ) if overlaps: refined.extend(overlaps) else: refined.append(zone) return refined def simplify_farm_boundary(polygon: Polygon, tolerance: float = 1.0) -> Polygon: """ Simplify a farm polygon boundary using Douglas-Peucker algorithm. Removes micro-jags and simplifies complex boundaries while maintaining topological validity. Useful for preparing boundaries for geometric slicing. Args: polygon: Shapely Polygon (farm boundary) tolerance: Simplification tolerance in the same units as polygon coordinates. Default 1.0m removes small irregularities without affecting drip field design. Returns: Simplified Polygon with fewer vertices but same general shape """ if not isinstance(polygon, Polygon) or polygon.is_empty: return polygon simplified = polygon.simplify(tolerance, preserve_topology=True) if not isinstance(simplified, Polygon): # If simplification results in a degenerate shape, return original return polygon return simplified def _simplify_zone_vertices(polygon: Polygon, max_vertices: int = 5) -> Polygon: """ Reduce polygon vertex count to max_vertices by finding bounding trapezoid/rectangle. If polygon has > max_vertices, compute its oriented bounding box (trapezoid) and intersect with the original to get a simplified shape. Args: polygon: Shapely Polygon (zone) max_vertices: Target maximum vertex count (default 5 for trapezoid/rectangle) Returns: Polygon with <= max_vertices (or original if simplification fails) """ if not isinstance(polygon, Polygon) or polygon.is_empty: return polygon # Count exterior vertices (excluding repeated closing point) coords = list(polygon.exterior.coords) vertex_count = len(coords) - 1 # -1 because last point repeats the first if vertex_count <= max_vertices: return polygon # Try simplification: use adaptive simplification to reduce vertices # Start with a conservative tolerance and increase if needed simplified = polygon tolerance = 0.1 # Start small max_tolerance = polygon.length / 10 # Don't over-simplify while tolerance <= max_tolerance: test_simp = polygon.simplify(tolerance, preserve_topology=True) if isinstance(test_simp, Polygon): simp_coords = list(test_simp.exterior.coords) simp_vertex_count = len(simp_coords) - 1 if simp_vertex_count <= max_vertices: simplified = test_simp break tolerance *= 1.5 return simplified def _generate_strip_zones( farm_polygon: Polygon, main_direction: Tuple[float, float], num_zones: int, ) -> List[Polygon]: """ Slice farm into N rectangular strips perpendicular to main_direction. Args: farm_polygon: Farm boundary (UTM) main_direction: Normalized direction vector (dx, dy) for main axis num_zones: Number of strips to create Returns: List of strip polygons, clipped to farm boundary """ if num_zones <= 0: return [] # Perpendicular direction (rotate 90 degrees) lateral_direction = (-main_direction[1], main_direction[0]) # Project farm onto both axes to get actual coordinate ranges. # Using the bounding-box diagonal as lateral extent fails when UTM # coordinates are large — the strip rectangles end up far from the # actual polygon. Projecting onto both axes gives the true ranges. min_main, max_main = _project_polygon_onto_axis(farm_polygon, main_direction) min_lat, max_lat = _project_polygon_onto_axis(farm_polygon, lateral_direction) axis_span = max_main - min_main if axis_span <= 0: return [] # Pad the lateral range so the strip rectangle fully covers the polygon lateral_pad = (max_lat - min_lat) * 0.1 + 10 # Divide into N equal strips along the main axis strip_width = axis_span / num_zones strips = [] for i in range(num_zones): strip_min = min_main + i * strip_width strip_max = min_main + (i + 1) * strip_width # Build strip rectangle in the (main, lateral) projection space, # then reconstruct world coordinates. corner_offsets = [ (strip_min, min_lat - lateral_pad), (strip_max, min_lat - lateral_pad), (strip_max, max_lat + lateral_pad), (strip_min, max_lat + lateral_pad), ] strip_corners = [ ( offset[0] * main_direction[0] + offset[1] * lateral_direction[0], offset[0] * main_direction[1] + offset[1] * lateral_direction[1], ) for offset in corner_offsets ] strip_bounds = Polygon(strip_corners) # Intersect strip bounds with farm polygon strip_poly = strip_bounds.intersection(farm_polygon) if not strip_poly.is_empty and strip_poly.area > 0: # Handle MultiPolygon by taking the largest component if isinstance(strip_poly, MultiPolygon): strip_poly = max(strip_poly.geoms, key=lambda p: p.area) if isinstance(strip_poly, Polygon): strips.append(strip_poly) return strips def _allocate_zone_counts_by_crop(crop_zones: List[Dict], num_zones: int) -> List[int]: """ Allocate a zone count to each crop polygon proportional to area. Every crop zone receives at least one strip when possible. """ if not crop_zones or num_zones <= 0: return [] total_area = sum( zone.get("polygon").area for zone in crop_zones if zone.get("polygon") is not None and not zone.get("polygon").is_empty ) if total_area <= 0: return [1] * min(len(crop_zones), num_zones) raw_allocations = [] for zone in crop_zones: polygon = zone.get("polygon") area = polygon.area if polygon is not None and not polygon.is_empty else 0 raw_allocations.append((area / total_area) * num_zones) counts = [max(1, int(math.floor(value))) for value in raw_allocations] while sum(counts) > num_zones: reducible = [i for i, count in enumerate(counts) if count > 1] if not reducible: break index = max(reducible, key=lambda i: counts[i] - raw_allocations[i]) counts[index] -= 1 while sum(counts) < num_zones: index = max(range(len(counts)), key=lambda i: raw_allocations[i] - counts[i]) counts[index] += 1 return counts def _generate_crop_aware_strips( crop_zones: List[Dict], main_direction: Tuple[float, float], num_zones: int, ) -> List[Dict]: """ Generate rectangular strips within crop polygons instead of clipping later. This keeps each zone aligned with the farm axis while respecting crop boundaries, which avoids zigzag fragments caused by post-generation splits. """ if not crop_zones or num_zones <= 0: return [] zone_counts = _allocate_zone_counts_by_crop(crop_zones, num_zones) strips_with_crop = [] for crop_zone, crop_zone_count in zip(crop_zones, zone_counts): crop_polygon = crop_zone.get("polygon") if crop_polygon is None or crop_polygon.is_empty or crop_zone_count <= 0: continue crop_strips = _generate_strip_zones( crop_polygon, main_direction, crop_zone_count, ) for strip in crop_strips: min_projection, _ = _project_polygon_onto_axis(strip, main_direction) strips_with_crop.append( { "polygon": strip, "crop": crop_zone.get("crop", "generic"), "sort_key": min_projection, } ) strips_with_crop.sort(key=lambda item: item["sort_key"]) return strips_with_crop[:num_zones] def anchor_valves_to_zones( zones: List[Dict], pump_location: Point, design_type: str = "distributed", ) -> List[Dict]: """ Anchor valves to zone geometries based on design type. Adds a 'valve_location' to each zone dict, determining where the valve control point should be placed. Args: zones: List of zone dicts with 'polygon' and 'area_m2' keys pump_location: Point location of the pump (UTM) design_type: "centralized" or "distributed" Returns: List of zone dicts with 'valve_location' added """ if not zones: return zones anchored_zones = [] for idx, zone in enumerate(zones): zone_poly = zone.get("polygon") if not zone_poly or zone_poly.is_empty: anchored_zones.append(zone) continue if design_type == "centralized": # Place all valves at/near pump location with slight offsets for visual separation # Fan them out around the pump in different directions angle = (idx / max(len(zones), 1)) * (2 * math.pi) # Full circle offset_dist = 10 # 10 meters valve_x = pump_location.x + offset_dist * math.cos(angle) valve_y = pump_location.y + offset_dist * math.sin(angle) valve_location = Point(valve_x, valve_y) else: # Distributed: place valve at closest point on zone boundary to pump zone_boundary = zone_poly.boundary closest_point = zone_boundary.interpolate( zone_boundary.project(pump_location) ) valve_location = closest_point # Add valve location to zone dict, preserving all other properties anchored_zone = zone.copy() anchored_zone["valve_location"] = valve_location anchored_zones.append(anchored_zone) return anchored_zones def _merge_sliver_zones(zones: List[Dict], farm_polygon: Polygon) -> List[Dict]: """ Detect and merge sliver zones (zones with area < 2% of farm). Slivers are often created by boundary intersections and waste resources. Merge them with their largest neighbor by area. Args: zones: List of zone dicts with 'polygon' and 'area_m2' farm_polygon: Full farm boundary for area calculation Returns: List of zones with slivers merged """ if not zones or len(zones) <= 1: return zones farm_area = farm_polygon.area sliver_threshold = farm_area * 0.02 # 2% of farm area # Find slivers slivers = [] keepers = [] for zone in zones: if zone["area_m2"] < sliver_threshold: slivers.append(zone) else: keepers.append(zone) if not slivers: return zones # Merge each sliver with its largest neighbor merged_zones = keepers.copy() for sliver in slivers: if not merged_zones: merged_zones.append(sliver) continue # Find largest keeper zone (by area) to absorb this sliver largest_idx = max(range(len(merged_zones)), key=lambda i: merged_zones[i]["area_m2"]) largest_zone = merged_zones[largest_idx] # Union polygons merged_poly = largest_zone["polygon"].union(sliver["polygon"]) if isinstance(merged_poly, MultiPolygon): merged_poly = max(merged_poly.geoms, key=lambda p: p.area) # Update largest zone in place while preserving metadata largest_zone["polygon"] = merged_poly largest_zone["area_m2"] = merged_poly.area return merged_zones def generate_valve_zones( farm_polygon: Polygon, num_zones: int, main_direction: Optional[Tuple[float, float]] = None, crop_zones: Optional[List[Dict]] = None, ) -> List[Dict]: """ Generate zone polygons using rectangular strips. If main_direction is provided, creates N rectangular strips perpendicular to the main axis, respecting crop zone boundaries if provided. Otherwise, falls back to strip generation over the whole farm. Args: farm_polygon: Full farm boundary (UTM) num_zones: Number of zones to create main_direction: Optional normalized direction vector (dx, dy). If provided, uses strip-based zones. Otherwise, uses strip fallback. crop_zones: Optional list of crop zone dicts with 'crop' and 'polygon'. If provided, strips are generated within each crop zone boundary to avoid zigzag patterns across crop lines. Returns: List of dicts with 'polygon', 'area_m2', optionally 'crop' """ if num_zones <= 0: return [] # Use strip-based zones if main_direction is provided if main_direction is not None: # If crop zones provided, generate strips within each crop boundary if crop_zones: crop_aware_strips = _generate_crop_aware_strips( crop_zones, main_direction, num_zones, ) strips = [item["polygon"] for item in crop_aware_strips] else: strips = _generate_strip_zones(farm_polygon, main_direction, num_zones) crop_aware_strips = None # Reconcile strip count with valve count instead of falling back # to the legacy grid (which produces jagged zone boundaries). if len(strips) == 0: # Complete failure — generate strips over the whole farm strips = _generate_strip_zones(farm_polygon, main_direction, num_zones) crop_aware_strips = None if len(strips) < num_zones: # Fewer strips than valves: split the largest strip(s) while len(strips) < num_zones: largest_idx = max(range(len(strips)), key=lambda i: strips[i].area) largest = strips.pop(largest_idx) halves = _generate_strip_zones(largest, main_direction, 2) if len(halves) == 2: strips.insert(largest_idx, halves[0]) strips.insert(largest_idx + 1, halves[1]) else: strips.insert(largest_idx, largest) break # can't split further elif len(strips) > num_zones: # More strips than valves: keep only the N largest strips.sort(key=lambda p: p.area, reverse=True) strips = strips[:num_zones] # Final guard: if we still can't match, truncate to strips effective_count = min(len(strips), num_zones) # Create zone dicts (without valve_id, to be added by caller after anchoring) result = [] for index in range(effective_count): strip = strips[index] # Apply vertex simplification to reduce complexity # Ensures zones have <= 5 vertices (rectangular/trapezoidal shapes) simplified_strip = _simplify_zone_vertices(strip, max_vertices=5) zone_dict = { "polygon": simplified_strip, "area_m2": simplified_strip.area, } # Propagate crop from crop_aware_strips if available if crop_aware_strips and index < len(crop_aware_strips): zone_dict["crop"] = crop_aware_strips[index].get("crop", "generic") result.append(zone_dict) # Sliver detection and merging: combine small zones with neighbors result = _merge_sliver_zones(result, farm_polygon) return result else: # No direction provided — fall back to strip generation over whole farm strips = _generate_strip_zones(farm_polygon, (1, 0), num_zones) if strips: return [ {"polygon": s, "area_m2": s.area} for s in strips ] return [] def _generate_valve_zones_legacy( farm_polygon: Polygon, valves: List[Dict] ) -> List[Dict]: """ Legacy Voronoi-style zone generation using grid cells. (Original implementation, kept for backward compatibility.) """ # Bounding box of farm minx, miny, maxx, maxy = farm_polygon.bounds # Create a coarse grid and assign each cell to nearest valve grid_size_m = 10 # 10m grid cells cells = [] cell_areas = [] x = minx while x < maxx: y = miny while y < maxy: # Create a small square cell cell = Polygon( [ (x, y), (x + grid_size_m, y), (x + grid_size_m, y + grid_size_m), (x, y + grid_size_m), ] ) # Clip to farm boundary clipped = cell.intersection(farm_polygon) if not clipped.is_empty and clipped.area > 0: cells.append(clipped) cell_areas.append(clipped.area) y += grid_size_m x += grid_size_m # Group cells by nearest valve zones_by_valve = {v["id"]: [] for v in valves} for cell, area in zip(cells, cell_areas): cell_center = cell.centroid nearest_valve = min( valves, key=lambda v: cell_center.distance(v["location"]) ) zones_by_valve[nearest_valve["id"]].append(cell) # Merge cells per valve into one polygon result = [] for valve in valves: valve_id = valve["id"] cell_list = zones_by_valve[valve_id] if cell_list: # Union all cells merged = cell_list[0] for cell in cell_list[1:]: merged = merged.union(cell) # Handle MultiPolygon if isinstance(merged, MultiPolygon): merged = merged.convex_hull result.append( { "valve_id": valve_id, "polygon": merged, "area_m2": merged.area, } ) return result def valve_layout_summary(valves: List[Dict], zones: List[Dict]) -> str: """ Generate a human-readable summary of valve placement. Args: valves: List of valve dicts zones: List of zone dicts Returns: Formatted string summary """ summary = f""" === Valve Placement Summary === Total Valves: {len(valves)} Valve Details: """ for valve in valves: summary += f""" {valve['id']}: Location: ({valve['lat']:.6f}, {valve['lon']:.6f}) Strategy: {valve['strategy']} Reason: {valve['reason']} """ if zones: summary += "\nZone Areas:\n" total_area = 0 for idx, zone in enumerate(zones): area_ha = zone["area_m2"] / 10000 zone_id = zone.get('valve_id', f'zone_{idx:03d}') summary += f" {zone_id}: {area_ha:.2f} ha\n" total_area += zone["area_m2"] summary += f"Total: {total_area / 10000:.2f} ha\n" return summary.strip()