Spaces:
Sleeping
Sleeping
| """ | |
| Valve Placement Engine | |
| Deterministic valve placement based on: | |
| 1. Capacity (pump flow vs. total emitter demand) | |
| 2. Topography (elevation deltas) | |
| 3. Crop type (hydrozones) | |
| 4. Hydraulics (max lateral length, pressure drop) | |
| Follows the 4-step decision matrix: | |
| - Capacity: if total demand > pump capacity, split into zones | |
| - Topography: if elevation delta > 5m, separate high/low zones | |
| - Crop: different crops get dedicated valves | |
| - Hydraulics: if lateral length > max lateral length (MAX_LATERAL_LENGTH_M), place valve at mid-point | |
| """ | |
| from typing import List, Dict, Tuple, Optional | |
| import math | |
| from shapely.geometry import Point, Polygon, LineString, MultiPolygon, MultiPoint, GeometryCollection | |
| from shapely.ops import split as shapely_split, voronoi_diagram, unary_union | |
| import numpy as np | |
class ValveEngineError(Exception):
    """Error raised by the valve placement engine (e.g. invalid pump or zone inputs)."""
# Lookup table: pump horsepower -> approximate discharge in litres per hour.
# Values are typical figures for agricultural centrifugal irrigation pumps;
# calculate_pump_flow_lph interpolates linearly between neighbouring entries.
HP_TO_LPH = {
    0.5: 2_500,
    0.75: 4_000,
    1.0: 5_000,
    1.5: 8_000,
    2.0: 15_000,
    3.0: 25_000,
    5.0: 40_000,
    7.5: 60_000,
    10.0: 80_000,
    15.0: 120_000,
    20.0: 160_000,
}
# Per-crop emitter parameters.
# emitter_density_m2 = 1 / (lateral_spacing_m * emitter_spacing_m), e.g. tomato
# with 0.8 m rows and 0.3 m emitter spacing -> 1 / (0.8 * 0.3) ≈ 4.17 emitters/m².
# emitter_flow_lph is the flow of a single emitter in litres per hour.
CROP_FLOW_PARAMS = {
    # 0.8 m rows, 0.3 m emitter spacing
    "tomato": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4},
    # 0.6 m rows, 0.3 m emitter spacing
    "pepper": {"emitter_density_m2": 5.56, "emitter_flow_lph": 4},
    # 0.4 m rows, 0.2 m emitter spacing
    "lettuce": {"emitter_density_m2": 12.5, "emitter_flow_lph": 2},
    # 1.0 m rows, 0.5 m emitter spacing
    "cucumber": {"emitter_density_m2": 2.0, "emitter_flow_lph": 4},
    # 2.0 m rows, 1.0 m emitter spacing
    "orchard": {"emitter_density_m2": 0.5, "emitter_flow_lph": 8},
    # Fallback for unknown crops; same values as tomato
    "generic": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4},
}
# Valves per hectare, keyed by crop type.
# place_valves_hierarchical uses this as a minimum zone count so that every
# farm gets area-appropriate coverage regardless of pump capacity.
# Rule of thumb: ~2 valves/acre ≈ 5 valves/ha for standard row crops.
VALVE_DENSITY_PER_HA = {
    "tomato": 6,    # 0.8 m rows (per CROP_FLOW_PARAMS), intensive water demand
    "pepper": 6,    # 0.6 m rows, intensive
    "lettuce": 7,   # 0.4 m rows, very intensive
    "cucumber": 4,  # 1.0 m rows, moderate
    "orchard": 2,   # 2.0 m+ rows, low density
    "generic": 5,   # ≈ 2 valves/acre, standard default
}
# Default hydraulic / topographic thresholds.
# Longest lateral run in metres (~650 ft); pressure drops significantly beyond it.
MAX_LATERAL_LENGTH_M = 200
# Maximum tolerated pressure drop, in percent.
MAX_PRESSURE_DROP_PCT = 10
# Elevation difference in metres above which zones are split.
ELEVATION_DELTA_THRESHOLD_M = 5
def calculate_pump_flow_lph(pump_hp: float) -> float:
    """
    Estimate pump discharge (litres per hour) from its horsepower rating.

    Exact ratings present in HP_TO_LPH are returned straight from the table.
    Ratings between two table entries are linearly interpolated. Ratings
    below the smallest or above the largest entry are clamped to that entry.

    Args:
        pump_hp: Pump horsepower (must be > 0)

    Returns:
        Approximate flow rate in liters per hour

    Raises:
        ValveEngineError: If pump_hp is zero or negative
    """
    if pump_hp <= 0:
        raise ValveEngineError("Pump horsepower must be > 0")

    table_flow = HP_TO_LPH.get(pump_hp)
    if table_flow is not None:
        return table_flow

    # Bracket the requested rating between its nearest table neighbours;
    # when one side has no neighbour, fall back to the table's end entry.
    ratings = sorted(HP_TO_LPH)
    below = [hp for hp in ratings if hp < pump_hp]
    above = [hp for hp in ratings if hp > pump_hp]
    lower_hp = below[-1] if below else ratings[0]
    upper_hp = above[0] if above else ratings[-1]

    if lower_hp == upper_hp:
        # Out of table range on one side: clamp to the end entry.
        return HP_TO_LPH[lower_hp]

    lower_flow = HP_TO_LPH[lower_hp]
    upper_flow = HP_TO_LPH[upper_hp]
    t = (pump_hp - lower_hp) / (upper_hp - lower_hp)
    return lower_flow + t * (upper_flow - lower_flow)
def calculate_total_emitter_flow(
    crop_zones: List[Dict], crop_params: Optional[Dict] = None
) -> float:
    """
    Sum the emitter demand of all crop zones.

    Each zone contributes area * emitter_density_m2 * emitter_flow_lph, using
    the parameters of its crop. Crops missing from the parameter table fall
    back to the "generic" entry.

    Args:
        crop_zones: List of dicts with keys:
            - 'crop': crop name (defaults to "generic" when absent)
            - 'area_m2': area in square meters (takes precedence when present)
            - 'polygon': Shapely Polygon whose area is used when 'area_m2'
              is absent
        crop_params: Optional custom crop parameters. Defaults to CROP_FLOW_PARAMS.

    Returns:
        Total flow in liters per hour (0.0 for an empty zone list)

    Raises:
        ValveEngineError: If a zone has neither 'area_m2' nor a Polygon
            under 'polygon'
    """
    params = CROP_FLOW_PARAMS if crop_params is None else crop_params

    demand_lph = 0.0
    for zone in crop_zones:
        crop_name = zone.get("crop", "generic")
        crop_key = crop_name if crop_name in params else "generic"

        # An explicit area wins over geometry.
        if "area_m2" in zone:
            zone_area = zone["area_m2"]
        elif isinstance(zone.get("polygon"), Polygon):
            zone_area = zone["polygon"].area
        else:
            raise ValveEngineError(
                f"Zone must have 'area_m2' or 'polygon' with area: {zone}"
            )

        density = params[crop_key]["emitter_density_m2"]
        per_emitter_lph = params[crop_key]["emitter_flow_lph"]
        demand_lph += zone_area * density * per_emitter_lph

    return demand_lph
def calculate_num_zones(total_emitter_flow_lph: float, pump_flow_lph: float) -> int:
    """
    Number of zones (valves) the pump capacity requires.

    Formula: num_zones = ceil(total_flow / pump_flow), never less than 1.

    Args:
        total_emitter_flow_lph: Total emitter demand in L/h
        pump_flow_lph: Pump capacity in L/h (must be > 0)

    Returns:
        Number of zones (minimum 1; a non-positive demand yields 1)

    Raises:
        ValveEngineError: If pump_flow_lph is zero or negative
    """
    if pump_flow_lph <= 0:
        raise ValveEngineError("Pump flow must be > 0")

    # No demand still needs a single zone.
    if total_emitter_flow_lph <= 0:
        return 1

    zones_needed = math.ceil(total_emitter_flow_lph / pump_flow_lph)
    return zones_needed if zones_needed > 1 else 1
def split_polygon_by_crop_zones(
    farm_polygon: Polygon, crop_zones: List[Dict]
) -> Dict[str, Polygon]:
    """
    Map each crop name to the polygon it occupies.

    A zone that carries a Shapely Polygon under 'polygon' keeps it. Any other
    zone is assigned the whole farm polygon (no zone inference is done here).
    Zones without a 'crop' key are named "crop_<index>". If several zones
    share a crop name, the last one replaces the earlier entries.

    Args:
        farm_polygon: Full farm boundary
        crop_zones: List of crop zone dicts with 'crop' and optional 'polygon'

    Returns:
        Dict mapping crop name to Polygon
    """
    polygons_by_crop = {}
    for index, zone in enumerate(crop_zones):
        name = zone.get("crop", f"crop_{index}")
        explicit = zone.get("polygon")
        # Without an explicit polygon the crop is assumed to span the farm.
        polygons_by_crop[name] = (
            explicit if isinstance(explicit, Polygon) else farm_polygon
        )
    return polygons_by_crop
def should_split_by_topography(
    polygon: Polygon,
    elevation_data: Optional[Dict] = None,
    threshold_m: float = ELEVATION_DELTA_THRESHOLD_M,
) -> Tuple[bool, float]:
    """
    Decide whether the elevation range calls for a topography split.

    Args:
        polygon: Shapely Polygon (accepted for interface symmetry; not read here)
        elevation_data: Dict with 'min_elevation_m' and 'max_elevation_m' keys.
            A missing key is treated as 0. None means "assume flat".
        threshold_m: Elevation delta threshold (default ELEVATION_DELTA_THRESHOLD_M)

    Returns:
        (should_split, elevation_delta_m); should_split is True only when the
        delta is strictly greater than threshold_m
    """
    if elevation_data is None:
        return False, 0.0

    lowest = elevation_data.get("min_elevation_m", 0)
    highest = elevation_data.get("max_elevation_m", 0)
    elevation_delta = highest - lowest
    return elevation_delta > threshold_m, elevation_delta
def choose_manifold_strategy(farm_area_m2: float) -> str:
    """
    Pick a centralized or distributed valve strategy from the farm area.

    Args:
        farm_area_m2: Farm area in square meters

    Returns:
        "centralized" if the farm is smaller than 1 hectare (10,000 m²,
        roughly 2.5 acres), else "distributed"
    """
    one_hectare_m2 = 10000
    return "centralized" if farm_area_m2 < one_hectare_m2 else "distributed"
def find_perimeter_point(
    polygon: Polygon, reference_point: Point, max_distance_m: float = 50
) -> Point:
    """
    Return the point on the polygon boundary nearest to a reference point.

    Used to put a valve where the supply enters a zone, on its perimeter
    and not inside the planted area.

    Args:
        polygon: Shapely Polygon (the zone)
        reference_point: Point to measure from (e.g., pump location)
        max_distance_m: Currently ignored; the nearest boundary point is
            returned regardless of distance

    Returns:
        Point on polygon boundary closest to reference_point
    """
    edge = polygon.boundary
    # project() gives the distance along the boundary of the nearest point;
    # interpolate() turns that distance back into a Point.
    distance_along_edge = edge.project(reference_point)
    return edge.interpolate(distance_along_edge)
def place_valve_for_zone(
    zone_polygon: Polygon,
    pump_point: Point,
    zone_id: str,
    strategy: str = "distributed",
    reason: str = "default",
) -> Dict:
    """
    Place one valve for a zone and return its metadata.

    - "centralized": the valve sits at a fixed (+10, +10) offset from the pump
      so it does not coincide with the pump itself.
    - anything else (distributed): the valve sits on the zone boundary at the
      point closest to the pump.

    Args:
        zone_polygon: Polygon of the zone
        pump_point: Point location of pump
        zone_id: Identifier for this zone
        strategy: "centralized" or "distributed"
        reason: Why this zone was created (e.g., "capacity_split")

    Returns:
        Dict with valve metadata:
            - 'id': zone_id
            - 'location': Point object
            - 'lat': y coordinate of the location
            - 'lon': x coordinate of the location
            - 'strategy': the strategy passed in
            - 'reason': the reason passed in
    """
    near_pump = strategy == "centralized"
    location = (
        Point(pump_point.x + 10, pump_point.y + 10)
        if near_pump
        else find_perimeter_point(zone_polygon, pump_point)
    )

    valve = {"id": zone_id, "location": location}
    valve["lat"] = location.y
    valve["lon"] = location.x
    valve["strategy"] = strategy
    valve["reason"] = reason
    return valve
def place_valves_hierarchical(
    farm_polygon: Polygon,
    pump_point: Point,
    crop_zones: List[Dict],
    pump_hp: float,
    centralized: bool = True,
    elevation_data: Optional[Dict] = None,
    max_lateral_length_m: float = MAX_LATERAL_LENGTH_M,
    max_valves: Optional[int] = None,
) -> List[Dict]:
    """
    Decide how many valves a farm needs and spread them along its boundary.

    The valve count is the largest of three drivers:
        - capacity: ceil(total emitter demand / pump flow)
        - crop: number of distinct crop names in crop_zones
        - area: ceil(farm hectares * VALVE_DENSITY_PER_HA of the first crop)
    One extra valve is added when the elevation delta exceeds the topography
    threshold, and the result is clamped to max_valves.

    Valves are spaced evenly along the farm perimeter. With the centralized
    strategy each valve is moved to the midpoint between its perimeter
    position and the pump.

    Args:
        farm_polygon: Full farm boundary
        pump_point: Point location of pump
        crop_zones: List of crop zone dicts with 'crop' and 'area_m2' or 'polygon'
        pump_hp: Pump horsepower
        centralized: If True, use centralized strategy; else distributed
        elevation_data: Optional dict with 'min_elevation_m', 'max_elevation_m'
        max_lateral_length_m: Accepted for interface compatibility; not read
            by this function
        max_valves: Optional hard cap. If None, a cap is derived from farm area.

    Returns:
        List of valve dicts with keys 'id', 'location', 'lat', 'lon',
        'strategy', 'reason' and 'crop'
    """
    area_ha = farm_polygon.area / 10000

    # Driver 1: capacity.
    pump_capacity_lph = calculate_pump_flow_lph(pump_hp)
    demand_lph = calculate_total_emitter_flow(crop_zones)
    capacity_zones = calculate_num_zones(demand_lph, pump_capacity_lph)

    # Topography only contributes a yes/no; the delta itself is not used.
    split_for_topography, _ = should_split_by_topography(
        farm_polygon, elevation_data
    )

    # Driver 2: distinct crops, kept in first-seen order for round-robin use.
    crop_names = list(dict.fromkeys(z.get("crop", "generic") for z in crop_zones))
    crop_zone_count = len(crop_names)

    # Driver 3: area-density floor, keyed on the first listed crop.
    first_crop = crop_zones[0].get("crop", "generic") if crop_zones else "generic"
    valves_per_ha = VALVE_DENSITY_PER_HA.get(
        first_crop, VALVE_DENSITY_PER_HA["generic"]
    )
    area_zones = math.ceil(area_ha * valves_per_ha)

    total_valves = max(capacity_zones, crop_zone_count, area_zones)
    if split_for_topography:
        total_valves += 1

    # Default cap grows with farm size: (upper area bound in ha, cap).
    if max_valves is None:
        max_valves = 100
        for area_limit_ha, cap in (
            (0.5, 6),
            (1.0, 10),
            (2.0, 15),
            (5.0, 35),
            (10.0, 60),
        ):
            if area_ha < area_limit_ha:
                max_valves = cap
                break
    total_valves = min(total_valves, max_valves)

    strategy = "centralized" if centralized else "distributed"
    boundary = farm_polygon.boundary
    boundary_length = boundary.length

    # Reason labels are assigned by valve index against these thresholds.
    # NOTE(review): because the capacity threshold is capacity_zones - 1, the
    # last capacity-driven valve can fall through to a later label (including
    # "hydraulics_split") — confirm this labelling is intended.
    reason_thresholds = (
        (max(capacity_zones - 1, 0), "capacity_split"),
        (crop_zone_count, "crop_type"),
        (area_zones, "area_density"),
    )
    fallback_reason = (
        "topography_split" if split_for_topography else "hydraulics_split"
    )

    placed = []
    for i in range(total_valves):
        reason = fallback_reason
        for threshold, label in reason_thresholds:
            if i < threshold:
                reason = label
                break

        # Round-robin crop assignment across the distinct crops.
        crop = crop_names[i % len(crop_names)] if crop_names else "generic"

        # Evenly spaced position along the farm perimeter.
        fraction = i / max(total_valves, 1)
        valve_point = boundary.interpolate(fraction * boundary_length)
        if strategy == "centralized":
            # Pull the valve halfway toward the pump.
            valve_point = Point(
                (valve_point.x + pump_point.x) / 2,
                (valve_point.y + pump_point.y) / 2,
            )

        placed.append(
            {
                "id": f"valve_{i:03d}",
                "location": valve_point,
                "lat": valve_point.y,
                "lon": valve_point.x,
                "strategy": strategy,
                "reason": reason,
                "crop": crop,
            }
        )

    return placed
def partition_farm_by_sources(
    farm_polygon: Polygon,
    sources: List[Dict],
) -> List[Polygon]:
    """
    Split the farm into one service region per water source.

    Each source contributes a Voronoi seed placed between its pump location
    and the farm centroid: the blend factor is pump_hp / max(pump_hp), so the
    strongest pump's seed stays at the pump and weaker pumps' seeds move
    toward the centroid. The Voronoi cells of those seeds are clipped to the
    farm and each clipped cell is given to the seed nearest its centroid.

    With zero or one source the whole farm is returned as a single region.
    A source that ends up without any cell receives the full farm
    (farm_polygon.buffer(0)), so in that fallback case regions overlap.

    Args:
        farm_polygon: Farm boundary (UTM)
        sources: List of dicts with 'pump_point' (Point) and 'pump_hp' (float)

    Returns:
        List of Polygons, in the same order as sources (or [farm_polygon]
        when there are fewer than two sources)
    """
    if len(sources) <= 1:
        return [farm_polygon]

    centroid = farm_polygon.centroid
    strongest_hp = max(src["pump_hp"] for src in sources)

    # Capacity-weighted seeds: share 1 keeps the seed at the pump, share 0
    # moves it all the way to the farm centroid.
    seeds = []
    for src in sources:
        location = src["pump_point"]
        hp = src["pump_hp"]
        share = hp / strongest_hp if strongest_hp > 0 else 1.0
        seeds.append(
            Point(
                location.x * share + centroid.x * (1 - share),
                location.y * share + centroid.y * (1 - share),
            )
        )

    diagram = voronoi_diagram(MultiPoint(seeds), envelope=farm_polygon)
    if isinstance(diagram, GeometryCollection):
        cells = list(diagram.geoms)
    else:
        cells = [diagram]

    regions: List[Optional[Polygon]] = [None] * len(sources)
    for cell in cells:
        if not isinstance(cell, Polygon):
            continue

        # Clip the cell to the farm; keep only the largest part if the clip
        # breaks it into several pieces.
        piece = cell.intersection(farm_polygon)
        if piece.is_empty:
            continue
        if isinstance(piece, MultiPolygon):
            piece = max(piece.geoms, key=lambda g: g.area)
        if not isinstance(piece, Polygon):
            continue

        # Owner = seed closest to the clipped cell's centroid.
        piece_centroid = piece.centroid
        owner = min(
            enumerate(seeds),
            key=lambda indexed: piece_centroid.distance(indexed[1]),
        )[0]

        current = regions[owner]
        if current is None:
            regions[owner] = piece
            continue

        combined = current.union(piece)
        if isinstance(combined, MultiPolygon):
            combined = max(combined.geoms, key=lambda g: g.area)
        regions[owner] = combined

    # Sources left without a cell fall back to the full farm.
    return [
        farm_polygon.buffer(0) if region is None else region
        for region in regions
    ]
def _project_polygon_onto_axis(
    polygon: Polygon, axis_direction: Tuple[float, float]
) -> Tuple[float, float]:
    """
    Project the polygon's exterior vertices onto an axis.

    Each vertex (x, y) is reduced to the dot product with axis_direction.

    Args:
        polygon: Shapely Polygon
        axis_direction: Direction vector (dx, dy); callers pass it normalized

    Returns:
        (min_projection, max_projection) along the axis
    """
    axis_x, axis_y = axis_direction[0], axis_direction[1]
    dots = [
        vertex[0] * axis_x + vertex[1] * axis_y
        for vertex in polygon.exterior.coords
    ]
    return min(dots), max(dots)
| def _refine_zones_by_crop_boundaries( | |
| strip_zones: List[Dict], | |
| crop_zones: List[Dict], | |
| ) -> List[Dict]: | |
| """ | |
| Refine strip zones by overlaying crop boundaries. | |
| If a strip overlaps more than one crop polygon, split it into sub-zones | |
| clipped by those crop boundaries and assign the corresponding crop. | |
| """ | |
| if not crop_zones: | |
| return strip_zones | |
| refined = [] | |
| for zone in strip_zones: | |
| strip_poly = zone["polygon"] | |
| overlaps = [] | |
| for crop_zone in crop_zones: | |
| crop_poly = crop_zone.get("polygon") | |
| if crop_poly is None or not strip_poly.intersects(crop_poly): | |
| continue | |
| clipped = strip_poly.intersection(crop_poly) | |
| if clipped.is_empty or clipped.area <= 1: | |
| continue | |
| if isinstance(clipped, MultiPolygon): | |
| for geom in clipped.geoms: | |
| if geom.area > 1: | |
| overlaps.append( | |
| { | |
| "crop": crop_zone.get("crop", "generic"), | |
| "polygon": geom, | |
| "area_m2": geom.area, | |
| "valve_id": zone["valve_id"], | |
| } | |
| ) | |
| elif isinstance(clipped, Polygon): | |
| overlaps.append( | |
| { | |
| "crop": crop_zone.get("crop", "generic"), | |
| "polygon": clipped, | |
| "area_m2": clipped.area, | |
| "valve_id": zone["valve_id"], | |
| } | |
| ) | |
| if overlaps: | |
| refined.extend(overlaps) | |
| else: | |
| refined.append(zone) | |
| return refined | |
def simplify_farm_boundary(polygon: Polygon, tolerance: float = 1.0) -> Polygon:
    """
    Simplify a farm boundary with topology-preserving simplification.

    Intended to remove small jags before the boundary is sliced into zones.

    Args:
        polygon: Shapely Polygon (farm boundary)
        tolerance: Simplification tolerance in the same units as the polygon
            coordinates (default 1.0).

    Returns:
        The simplified Polygon. The input is returned unchanged when it is
        not a Polygon, is empty, or when simplification does not produce a
        Polygon.
    """
    usable = isinstance(polygon, Polygon) and not polygon.is_empty
    if not usable:
        return polygon

    candidate = polygon.simplify(tolerance, preserve_topology=True)
    # Degenerate result: keep the original boundary.
    return candidate if isinstance(candidate, Polygon) else polygon
def _simplify_zone_vertices(polygon: Polygon, max_vertices: int = 5) -> Polygon:
    """
    Try to reduce a zone polygon to at most max_vertices exterior vertices.

    Runs topology-preserving simplification with a growing tolerance
    (starting at 0.1, multiplied by 1.5 each round, up to polygon.length / 10)
    and returns the first result that is a Polygon with few enough vertices.

    Args:
        polygon: Shapely Polygon (zone)
        max_vertices: Target maximum exterior vertex count (default 5)

    Returns:
        A simplified Polygon with <= max_vertices exterior vertices, or the
        original polygon when it already qualifies, is empty / not a Polygon,
        or no tolerance within the limit achieves the target
    """
    if not isinstance(polygon, Polygon) or polygon.is_empty:
        return polygon

    # The exterior ring repeats its first point at the end, hence the -1.
    if len(polygon.exterior.coords) - 1 <= max_vertices:
        return polygon

    tolerance = 0.1
    tolerance_limit = polygon.length / 10  # guard against over-simplifying
    while tolerance <= tolerance_limit:
        candidate = polygon.simplify(tolerance, preserve_topology=True)
        if (
            isinstance(candidate, Polygon)
            and len(candidate.exterior.coords) - 1 <= max_vertices
        ):
            return candidate
        tolerance *= 1.5

    return polygon
def _generate_strip_zones(
    farm_polygon: Polygon,
    main_direction: Tuple[float, float],
    num_zones: int,
) -> List[Polygon]:
    """
    Cut the farm into equal-width strips stacked along main_direction.

    The farm is projected onto the main axis and onto the perpendicular
    (lateral) axis. The main-axis extent is divided into num_zones equal
    intervals; each interval becomes a rectangle spanning the padded lateral
    extent, which is then intersected with the farm.

    Args:
        farm_polygon: Farm boundary (UTM)
        main_direction: Normalized direction vector (dx, dy) for main axis
        num_zones: Number of strips to create

    Returns:
        List of strip polygons clipped to the farm boundary. May hold fewer
        than num_zones entries: empty intersections are dropped, and only the
        largest part of a multi-part intersection is kept. Empty when
        num_zones <= 0 or the farm has no extent along the main axis.
    """
    if num_zones <= 0:
        return []

    main_x, main_y = main_direction[0], main_direction[1]
    # Lateral axis = main axis rotated by 90 degrees.
    lateral_direction = (-main_y, main_x)
    lat_x, lat_y = lateral_direction

    # Projecting onto both axes gives the true coordinate ranges even when
    # absolute (UTM) coordinates are large.
    min_main, max_main = _project_polygon_onto_axis(farm_polygon, main_direction)
    min_lat, max_lat = _project_polygon_onto_axis(farm_polygon, lateral_direction)

    axis_span = max_main - min_main
    if axis_span <= 0:
        return []

    # Pad laterally so every rectangle fully covers the polygon.
    lateral_pad = (max_lat - min_lat) * 0.1 + 10
    strip_width = axis_span / num_zones

    def to_world(along, across):
        # Convert (main, lateral) projection coordinates back to x/y.
        return (
            along * main_x + across * lat_x,
            along * main_y + across * lat_y,
        )

    strips = []
    for i in range(num_zones):
        strip_min = min_main + i * strip_width
        strip_max = min_main + (i + 1) * strip_width

        rectangle = Polygon(
            [
                to_world(strip_min, min_lat - lateral_pad),
                to_world(strip_max, min_lat - lateral_pad),
                to_world(strip_max, max_lat + lateral_pad),
                to_world(strip_min, max_lat + lateral_pad),
            ]
        )

        clipped = rectangle.intersection(farm_polygon)
        if clipped.is_empty or not (clipped.area > 0):
            continue
        if isinstance(clipped, MultiPolygon):
            # Keep only the largest connected part.
            clipped = max(clipped.geoms, key=lambda p: p.area)
        if isinstance(clipped, Polygon):
            strips.append(clipped)

    return strips
| def _allocate_zone_counts_by_crop(crop_zones: List[Dict], num_zones: int) -> List[int]: | |
| """ | |
| Allocate a zone count to each crop polygon proportional to area. | |
| Every crop zone receives at least one strip when possible. | |
| """ | |
| if not crop_zones or num_zones <= 0: | |
| return [] | |
| total_area = sum( | |
| zone.get("polygon").area | |
| for zone in crop_zones | |
| if zone.get("polygon") is not None and not zone.get("polygon").is_empty | |
| ) | |
| if total_area <= 0: | |
| return [1] * min(len(crop_zones), num_zones) | |
| raw_allocations = [] | |
| for zone in crop_zones: | |
| polygon = zone.get("polygon") | |
| area = polygon.area if polygon is not None and not polygon.is_empty else 0 | |
| raw_allocations.append((area / total_area) * num_zones) | |
| counts = [max(1, int(math.floor(value))) for value in raw_allocations] | |
| while sum(counts) > num_zones: | |
| reducible = [i for i, count in enumerate(counts) if count > 1] | |
| if not reducible: | |
| break | |
| index = max(reducible, key=lambda i: counts[i] - raw_allocations[i]) | |
| counts[index] -= 1 | |
| while sum(counts) < num_zones: | |
| index = max(range(len(counts)), key=lambda i: raw_allocations[i] - counts[i]) | |
| counts[index] += 1 | |
| return counts | |
def _generate_crop_aware_strips(
    crop_zones: List[Dict],
    main_direction: Tuple[float, float],
    num_zones: int,
) -> List[Dict]:
    """
    Build strips inside each crop polygon so zones never straddle crops.

    The zone budget is divided between crop polygons by area, each crop
    polygon is sliced into its share of strips along main_direction, and the
    strips of all crops are ordered by their starting position on the main axis.

    Args:
        crop_zones: Crop dicts with 'polygon' and optional 'crop'
        main_direction: Normalized direction vector (dx, dy) for main axis
        num_zones: Total number of strips wanted

    Returns:
        At most num_zones dicts with 'polygon', 'crop' and 'sort_key'
        (the strip's minimum projection on the main axis), sorted by 'sort_key'
    """
    if not crop_zones or num_zones <= 0:
        return []

    per_crop_counts = _allocate_zone_counts_by_crop(crop_zones, num_zones)

    tagged = []
    for crop_entry, strip_count in zip(crop_zones, per_crop_counts):
        geom = crop_entry.get("polygon")
        if geom is None or geom.is_empty or strip_count <= 0:
            continue

        crop_name = crop_entry.get("crop", "generic")
        for strip in _generate_strip_zones(geom, main_direction, strip_count):
            start, _ = _project_polygon_onto_axis(strip, main_direction)
            tagged.append(
                {"polygon": strip, "crop": crop_name, "sort_key": start}
            )

    tagged.sort(key=lambda entry: entry["sort_key"])
    return tagged[:num_zones]
def anchor_valves_to_zones(
    zones: List[Dict],
    pump_location: Point,
    design_type: str = "distributed",
) -> List[Dict]:
    """
    Attach a 'valve_location' Point to every zone.

    - "centralized": valves are fanned out on a 10-unit circle around the
      pump, one angular slot per input zone (slots are counted over all
      input zones, including ones that are skipped).
    - anything else (distributed): the valve sits on the zone boundary at
      the point closest to the pump.

    Args:
        zones: List of zone dicts with 'polygon' and 'area_m2' keys
        pump_location: Point location of the pump (UTM)
        design_type: "centralized" or "distributed"

    Returns:
        New list of zone dicts. Zones with a usable polygon are shallow
        copies with 'valve_location' added; zones with a missing or empty
        polygon are passed through as the same dict without a location.
        An empty input is returned as is.
    """
    if not zones:
        return zones

    slot_count = max(len(zones), 1)
    full_turn = 2 * math.pi
    fan_radius = 10  # metres from the pump, for visual separation

    result = []
    for idx, zone in enumerate(zones):
        zone_poly = zone.get("polygon")
        if not zone_poly or zone_poly.is_empty:
            result.append(zone)
            continue

        if design_type == "centralized":
            angle = (idx / slot_count) * full_turn
            valve_location = Point(
                pump_location.x + fan_radius * math.cos(angle),
                pump_location.y + fan_radius * math.sin(angle),
            )
        else:
            edge = zone_poly.boundary
            valve_location = edge.interpolate(edge.project(pump_location))

        # Copy so the caller's zone dict is left untouched.
        result.append({**zone, "valve_location": valve_location})

    return result
def _merge_sliver_zones(zones: List[Dict], farm_polygon: Polygon) -> List[Dict]:
    """
    Detect and merge sliver zones (zones with area < 2% of farm).

    Each sliver is absorbed by the largest zone that touches it. If no kept
    zone touches the sliver, the nearest kept zone is used instead. Merging
    into an adjacent zone matters: a union with a non-adjacent zone is a
    MultiPolygon, and reducing that to its largest part would silently drop
    the sliver's area.

    The input zone dicts are never modified; zones that absorb a sliver are
    returned as copies with updated 'polygon' and 'area_m2'.

    Args:
        zones: List of zone dicts with 'polygon' and 'area_m2'
        farm_polygon: Full farm boundary for area calculation

    Returns:
        List of zones with slivers merged. The input list itself is returned
        when it has fewer than two zones or contains no slivers. If every
        zone is a sliver, the first one seeds the result and absorbs the rest
        under the same adjacency rule.
    """
    if not zones or len(zones) <= 1:
        return zones

    sliver_threshold = farm_polygon.area * 0.02  # 2% of farm area

    keepers = []
    slivers = []
    for zone in zones:
        if zone["area_m2"] < sliver_threshold:
            slivers.append(zone)
        else:
            keepers.append(zone)

    if not slivers:
        return zones

    # Copy the keeper dicts so merging never mutates the caller's data.
    merged_zones = [dict(zone) for zone in keepers]

    # Zones cut from the same boundary can be separated by floating-point
    # noise; treat anything closer than this as touching.
    adjacency_tolerance = 1e-6

    for sliver in slivers:
        if not merged_zones:
            merged_zones.append(dict(sliver))
            continue

        sliver_poly = sliver["polygon"]
        gaps = [zone["polygon"].distance(sliver_poly) for zone in merged_zones]
        neighbours = [
            i for i, gap in enumerate(gaps) if gap <= adjacency_tolerance
        ]
        if neighbours:
            # Largest adjacent zone absorbs the sliver.
            target_idx = max(
                neighbours, key=lambda i: merged_zones[i]["area_m2"]
            )
        else:
            # Nothing touches the sliver: fall back to the nearest zone.
            target_idx = min(range(len(gaps)), key=lambda i: gaps[i])

        target = merged_zones[target_idx]
        merged_poly = target["polygon"].union(sliver_poly)
        if isinstance(merged_poly, MultiPolygon):
            # Only reachable when the sliver is not actually adjacent.
            merged_poly = max(merged_poly.geoms, key=lambda p: p.area)

        target["polygon"] = merged_poly
        target["area_m2"] = merged_poly.area

    return merged_zones
def generate_valve_zones(
    farm_polygon: Polygon,
    num_zones: int,
    main_direction: Optional[Tuple[float, float]] = None,
    crop_zones: Optional[List[Dict]] = None,
) -> List[Dict]:
    """
    Generate zone polygons using rectangular strips.

    If main_direction is provided, strips are generated along that direction
    (inside each crop zone when crop_zones is given, otherwise over the whole
    farm), reconciled with num_zones, simplified to at most 5 vertices and
    passed through sliver merging. Without main_direction, plain strips over
    the whole farm are returned using the direction (1, 0), with no
    simplification or sliver merging.

    Each strip is tracked together with its crop label during reconciliation,
    so splitting, re-inserting or sorting strips cannot detach a zone from
    its crop.

    Args:
        farm_polygon: Full farm boundary (UTM)
        num_zones: Number of zones to create
        main_direction: Optional normalized direction vector (dx, dy).
            If provided, uses strip-based zones. Otherwise, uses strip fallback.
        crop_zones: Optional list of crop zone dicts with 'crop' and 'polygon'.
            If provided, strips are generated within each crop zone boundary
            to avoid zigzag patterns across crop lines.

    Returns:
        List of dicts with 'polygon', 'area_m2', optionally 'crop'. The list
        is empty when num_zones <= 0 or no strip could be generated, and may
        hold fewer than num_zones entries when strips cannot be split further
        or slivers were merged.
    """
    if num_zones <= 0:
        return []

    if main_direction is None:
        # No direction provided: fall back to strip generation over whole farm
        strips = _generate_strip_zones(farm_polygon, (1, 0), num_zones)
        if strips:
            return [{"polygon": s, "area_m2": s.area} for s in strips]
        return []

    # entries holds (strip polygon, crop label) pairs; the label is only
    # meaningful while crop_aware is True.
    entries = []
    crop_aware = False
    if crop_zones:
        crop_aware_strips = _generate_crop_aware_strips(
            crop_zones,
            main_direction,
            num_zones,
        )
        entries = [
            (item["polygon"], item.get("crop", "generic"))
            for item in crop_aware_strips
        ]
        crop_aware = bool(entries)

    if not entries:
        # No crop zones, or crop-aware generation produced nothing:
        # generate strips over the whole farm instead.
        crop_aware = False
        entries = [
            (strip, None)
            for strip in _generate_strip_zones(farm_polygon, main_direction, num_zones)
        ]

    # Fewer strips than valves: keep halving the largest strip. The loop is
    # skipped entirely when there is nothing to split.
    while entries and len(entries) < num_zones:
        largest_idx = max(range(len(entries)), key=lambda i: entries[i][0].area)
        largest, crop = entries[largest_idx]
        halves = _generate_strip_zones(largest, main_direction, 2)
        if len(halves) != 2:
            break  # can't split further
        # Both halves inherit the parent's crop and take its position.
        entries[largest_idx:largest_idx + 1] = [(halves[0], crop), (halves[1], crop)]

    # More strips than valves: keep only the N largest.
    if len(entries) > num_zones:
        entries = sorted(entries, key=lambda entry: entry[0].area, reverse=True)
        entries = entries[:num_zones]

    # Create zone dicts (without valve_id, to be added by caller after anchoring)
    result = []
    for strip, crop in entries:
        # Vertex simplification keeps zones rectangular/trapezoidal.
        simplified_strip = _simplify_zone_vertices(strip, max_vertices=5)
        zone_dict = {
            "polygon": simplified_strip,
            "area_m2": simplified_strip.area,
        }
        if crop_aware:
            zone_dict["crop"] = crop
        result.append(zone_dict)

    # Sliver detection and merging: combine small zones with neighbors
    return _merge_sliver_zones(result, farm_polygon)
def _generate_valve_zones_legacy(
    farm_polygon: Polygon, valves: List[Dict]
) -> List[Dict]:
    """
    Legacy Voronoi-style zone generation using grid cells.
    (Original implementation, kept for backward compatibility.)

    The farm's bounding box is covered with 10 m square cells, each cell is
    clipped to the farm boundary and assigned to the valve whose 'location'
    is nearest to the clipped cell's centroid. All cells of a valve are then
    dissolved into one geometry; if that geometry is a MultiPolygon it is
    replaced by its convex hull.

    Args:
        farm_polygon: Full farm boundary
        valves: List of valve dicts with 'id' and 'location' (a geometry
            accepted by centroid.distance)

    Returns:
        List of dicts with 'valve_id', 'polygon' and 'area_m2', in valve
        order. Valves that received no cell are omitted. Empty when valves
        is empty.
    """
    if not valves:
        # Nothing to assign cells to; avoids min() on an empty sequence.
        return []

    grid_size_m = 10  # 10m grid cells
    minx, miny, maxx, maxy = farm_polygon.bounds

    # Walk the grid once, clipping each cell and filing it under its
    # nearest valve straight away.
    zones_by_valve = {v["id"]: [] for v in valves}
    x = minx
    while x < maxx:
        y = miny
        while y < maxy:
            cell = Polygon(
                [
                    (x, y),
                    (x + grid_size_m, y),
                    (x + grid_size_m, y + grid_size_m),
                    (x, y + grid_size_m),
                ]
            )
            clipped = cell.intersection(farm_polygon)
            if not clipped.is_empty and clipped.area > 0:
                cell_center = clipped.centroid
                nearest_valve = min(
                    valves, key=lambda v: cell_center.distance(v["location"])
                )
                zones_by_valve[nearest_valve["id"]].append(clipped)
            y += grid_size_m
        x += grid_size_m

    # Merge cells per valve into one polygon
    result = []
    for valve in valves:
        valve_id = valve["id"]
        cell_list = zones_by_valve[valve_id]
        if not cell_list:
            continue
        # Dissolve all cells in a single operation instead of pairwise unions.
        merged = unary_union(cell_list)
        # Disconnected cell groups collapse to their convex hull; note the
        # hull is not re-clipped to the farm boundary.
        if isinstance(merged, MultiPolygon):
            merged = merged.convex_hull
        result.append(
            {
                "valve_id": valve_id,
                "polygon": merged,
                "area_m2": merged.area,
            }
        )
    return result
def valve_layout_summary(valves: List[Dict], zones: List[Dict]) -> str:
    """
    Generate a human-readable summary of valve placement.

    The report starts with a header and the valve count, followed by one
    entry per valve. When zones is non-empty, a per-zone area listing in
    hectares and a total are appended. Zones without a 'valve_id' are
    labelled 'zone_NNN' from their position in the list.

    Args:
        valves: List of valve dicts; each must provide 'id', 'lat', 'lon',
            'strategy' and 'reason' ('lat'/'lon' are formatted as floats)
        zones: List of zone dicts with 'area_m2' and optionally 'valve_id'

    Returns:
        Formatted string summary with leading/trailing whitespace stripped

    Raises:
        KeyError: If a valve or zone dict lacks a required key.
    """
    # Collect fragments and join once; line breaks are explicit so the
    # output does not depend on how this source file is indented.
    parts = [
        "\n=== Valve Placement Summary ===\n"
        f"Total Valves: {len(valves)}\n"
        "Valve Details:\n"
    ]

    for valve in valves:
        parts.append(
            f"\n{valve['id']}:\n"
            f"Location: ({valve['lat']:.6f}, {valve['lon']:.6f})\n"
            f"Strategy: {valve['strategy']}\n"
            f"Reason: {valve['reason']}\n"
        )

    if zones:
        parts.append("\nZone Areas:\n")
        total_area = 0
        for idx, zone in enumerate(zones):
            area_ha = zone["area_m2"] / 10000  # m² -> ha
            zone_id = zone.get('valve_id', f'zone_{idx:03d}')
            parts.append(f"  {zone_id}: {area_ha:.2f} ha\n")
            total_area += zone["area_m2"]
        parts.append(f"Total: {total_area / 10000:.2f} ha\n")

    return "".join(parts).strip()