# farm-layout-model / valve_engine.py
# Commit 7e350ba (spacedout-bits):
# Phase 5: Drip Manifold Alignment - valve-proximity manifold selection
"""
Valve Placement Engine
Deterministic valve placement based on:
1. Capacity (pump flow vs. total emitter demand)
2. Topography (elevation deltas)
3. Crop type (hydrozones)
4. Hydraulics (max lateral length, pressure drop)
Follows the 4-step decision matrix:
- Capacity: if total demand > pump capacity, split into zones
- Topography: if elevation delta > 5m, separate high/low zones
- Crop: different crops get dedicated valves
- Hydraulics: if lateral length > max lateral length, place valve at mid-point
"""
from typing import List, Dict, Tuple, Optional
import math
from shapely.geometry import Point, Polygon, LineString, MultiPolygon, MultiPoint, GeometryCollection
from shapely.ops import split as shapely_split, voronoi_diagram, unary_union
import numpy as np
class ValveEngineError(Exception):
    """Raised by the valve engine when its inputs cannot be processed."""
# Pump HP to flow rate conversion (realistic centrifugal pump curves)
# These are typical discharge rates for agricultural irrigation pumps.
# Keys are pump horsepower; values are discharge in liters per hour.
# calculate_pump_flow_lph() returns exact matches directly, interpolates
# linearly between neighbouring entries, and clamps to the first/last entry
# for horsepower values outside the table.
HP_TO_LPH = {
    0.5: 2500,
    0.75: 4000,
    1.0: 5000,
    1.5: 8000,
    2.0: 15000,
    3.0: 25000,
    5.0: 40000,
    7.5: 60000,
    10.0: 80000,
    15.0: 120000,
    20.0: 160000,
}
# Crop flow parameters — emitter density derived from (lateral_spacing × emitter_spacing)
# e.g. tomato: 0.8m row spacing, 0.3m emitter spacing → 1/(0.8×0.3) = 4.17 emitters/m²
# "emitter_density_m2" is emitters per square meter and "emitter_flow_lph" is
# the flow of one emitter in liters per hour; calculate_total_emitter_flow()
# multiplies both by the zone area. The "generic" entry is the fallback used
# for crops that are not listed here.
CROP_FLOW_PARAMS = {
    "tomato": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4},  # 0.8m rows, 0.3m spacing
    "pepper": {"emitter_density_m2": 5.56, "emitter_flow_lph": 4},  # 0.6m rows, 0.3m spacing
    "lettuce": {"emitter_density_m2": 12.50, "emitter_flow_lph": 2},  # 0.4m rows, 0.2m spacing
    "cucumber": {"emitter_density_m2": 2.00, "emitter_flow_lph": 4},  # 1.0m rows, 0.5m spacing
    "orchard": {"emitter_density_m2": 0.50, "emitter_flow_lph": 8},  # 2.0m rows, 1.0m spacing
    "generic": {"emitter_density_m2": 4.17, "emitter_flow_lph": 4},  # same as tomato
}
# Area-based valve density (valves per hectare) by crop type.
# Used as a floor in place_valves_hierarchical — ensures every farm
# gets agronomically appropriate zone coverage regardless of pump capacity.
# Rule of thumb: ~2 valves/acre ≈ 5 valves/ha for standard row crops.
# place_valves_hierarchical looks up the crop of the FIRST crop zone only and
# falls back to "generic" when that crop is not listed.
VALVE_DENSITY_PER_HA = {
    # NOTE(review): this comment previously said 0.5m rows, which disagrees
    # with the 0.8m tomato row spacing in CROP_FLOW_PARAMS — confirm.
    "tomato": 6,  # 0.8m rows (per CROP_FLOW_PARAMS), intensive water demand
    "pepper": 6,  # 0.6m rows, intensive
    "lettuce": 7,  # 0.4m rows, very intensive
    "cucumber": 4,  # 1.0m rows, moderate
    "orchard": 2,  # 2.0m+ rows, low density
    "generic": 5,  # ≈ 2 valves/acre, standard default
}
# Hydraulics defaults
# MAX_LATERAL_LENGTH_M is the default for place_valves_hierarchical's
# max_lateral_length_m parameter; ELEVATION_DELTA_THRESHOLD_M is the default
# threshold of should_split_by_topography.
MAX_LATERAL_LENGTH_M = 200  # ~650 ft; beyond this, pressure drops significantly
# NOTE(review): MAX_PRESSURE_DROP_PCT is not referenced by the functions in
# this part of the file — confirm whether another module consumes it.
MAX_PRESSURE_DROP_PCT = 10  # percent
ELEVATION_DELTA_THRESHOLD_M = 5  # split zones if > 5m elevation difference
def calculate_pump_flow_lph(pump_hp: float) -> float:
    """
    Estimate pump discharge (liters per hour) from its horsepower rating.

    Horsepower values present in HP_TO_LPH are returned as-is. Values that
    fall between two table entries are linearly interpolated; values below
    the smallest or above the largest entry take that entry's flow.

    Args:
        pump_hp: Pump horsepower (must be positive)

    Returns:
        Approximate flow rate in liters per hour

    Raises:
        ValveEngineError: if pump_hp is zero or negative
    """
    if pump_hp <= 0:
        raise ValveEngineError("Pump horsepower must be > 0")

    # Exact rating in the table: no interpolation needed.
    tabulated = HP_TO_LPH.get(pump_hp)
    if tabulated is not None:
        return tabulated

    rated_sizes = sorted(HP_TO_LPH)
    smaller = [hp for hp in rated_sizes if hp < pump_hp]
    larger = [hp for hp in rated_sizes if hp > pump_hp]

    # Outside the table one side is empty; fall back to the table's edge.
    lower_hp = smaller[-1] if smaller else rated_sizes[0]
    upper_hp = larger[0] if larger else rated_sizes[-1]
    if lower_hp == upper_hp:
        return HP_TO_LPH[lower_hp]

    # Linear blend between the two bracketing ratings.
    position = (pump_hp - lower_hp) / (upper_hp - lower_hp)
    flow_at_lower = HP_TO_LPH[lower_hp]
    flow_at_upper = HP_TO_LPH[upper_hp]
    return flow_at_lower + position * (flow_at_upper - flow_at_lower)
def calculate_total_emitter_flow(
    crop_zones: List[Dict], crop_params: Optional[Dict] = None
) -> float:
    """
    Sum the emitter demand of every crop zone.

    Each zone contributes area × emitter density × per-emitter flow, using
    the parameters of its crop (or of "generic" when the crop is unknown).

    Args:
        crop_zones: List of dicts with keys:
            - 'crop': crop name (defaults to "generic")
            - 'area_m2': area in square meters
            - 'polygon': optional Shapely Polygon, used for the area when
              'area_m2' is absent
        crop_params: Optional custom crop parameters. Defaults to CROP_FLOW_PARAMS.

    Returns:
        Total flow in liters per hour

    Raises:
        ValveEngineError: if a zone has neither 'area_m2' nor a Polygon
    """
    params_table = CROP_FLOW_PARAMS if crop_params is None else crop_params

    def _area_of(zone: Dict) -> float:
        # An explicit area wins over geometry.
        if "area_m2" in zone:
            return zone["area_m2"]
        if "polygon" in zone and isinstance(zone["polygon"], Polygon):
            return zone["polygon"].area
        raise ValveEngineError(
            f"Zone must have 'area_m2' or 'polygon' with area: {zone}"
        )

    demand_lph = 0.0
    for zone in crop_zones:
        crop_name = zone.get("crop", "generic")
        lookup_key = crop_name if crop_name in params_table else "generic"
        zone_area = _area_of(zone)
        spec = params_table[lookup_key]
        demand_lph += zone_area * spec["emitter_density_m2"] * spec["emitter_flow_lph"]
    return demand_lph
def calculate_num_zones(total_emitter_flow_lph: float, pump_flow_lph: float) -> int:
    """
    Work out how many zones the pump needs to serve the total demand.

    The demand is divided by the pump capacity and rounded up; one zone is
    always returned when there is no demand.

    Args:
        total_emitter_flow_lph: Total emitter demand in L/h
        pump_flow_lph: Pump capacity in L/h

    Returns:
        Number of zones (minimum 1)

    Raises:
        ValveEngineError: if pump_flow_lph is zero or negative
    """
    if pump_flow_lph <= 0:
        raise ValveEngineError("Pump flow must be > 0")

    zones_needed = 1
    if not total_emitter_flow_lph <= 0:
        demand_ratio = total_emitter_flow_lph / pump_flow_lph
        zones_needed = max(zones_needed, math.ceil(demand_ratio))
    return zones_needed
def split_polygon_by_crop_zones(
    farm_polygon: Polygon, crop_zones: List[Dict]
) -> Dict[str, Polygon]:
    """
    Map each crop name to the polygon that represents it.

    A zone that carries its own Shapely Polygon keeps it; any other zone is
    mapped to the whole farm (no field segmentation is attempted). Zones
    without a 'crop' key are named "crop_<index>". When several zones share
    a crop name, the last one wins.

    Args:
        farm_polygon: Full farm boundary
        crop_zones: List of crop zone dicts with 'crop' and optional 'polygon'

    Returns:
        Dict mapping crop name to Polygon
    """
    return {
        zone.get("crop", f"crop_{position}"): (
            zone["polygon"]
            if isinstance(zone.get("polygon"), Polygon)
            else farm_polygon
        )
        for position, zone in enumerate(crop_zones)
    }
def should_split_by_topography(
    polygon: Polygon,
    elevation_data: Optional[Dict] = None,
    threshold_m: float = ELEVATION_DELTA_THRESHOLD_M,
) -> Tuple[bool, float]:
    """
    Decide whether the elevation range is large enough to warrant a split.

    Args:
        polygon: Shapely Polygon (not inspected; only elevation_data is used)
        elevation_data: Dict with 'min_elevation_m' and 'max_elevation_m' keys;
            missing keys count as 0. None means the site is treated as flat.
        threshold_m: Elevation delta threshold (default 5m)

    Returns:
        (should_split, elevation_delta_m)
    """
    if elevation_data is None:
        # Without elevation data the site is assumed flat.
        return False, 0.0

    lowest = elevation_data.get("min_elevation_m", 0)
    highest = elevation_data.get("max_elevation_m", 0)
    elevation_range = highest - lowest
    return elevation_range > threshold_m, elevation_range
def choose_manifold_strategy(farm_area_m2: float) -> str:
    """
    Pick the valve strategy from the farm size.

    Args:
        farm_area_m2: Farm area in square meters

    Returns:
        "centralized" for farms smaller than 1 hectare (10,000 m²),
        otherwise "distributed"
    """
    one_hectare_m2 = 10000
    return "centralized" if farm_area_m2 < one_hectare_m2 else "distributed"
def find_perimeter_point(
    polygon: Polygon, reference_point: Point, max_distance_m: float = 50
) -> Point:
    """
    Return the point on the polygon's boundary nearest to a reference point.

    Used to put a valve where the supply reaches a zone, on its perimeter
    rather than inside the planted area.

    Args:
        polygon: Shapely Polygon (the zone)
        reference_point: Point to measure from (e.g., pump location)
        max_distance_m: Currently ignored; the nearest boundary point is
            returned whatever its distance.

    Returns:
        Point on polygon boundary closest to reference_point
    """
    outline = polygon.boundary
    # Distance along the outline at which the reference point projects.
    distance_along = outline.project(reference_point)
    return outline.interpolate(distance_along)
def place_valve_for_zone(
    zone_polygon: Polygon,
    pump_point: Point,
    zone_id: str,
    strategy: str = "distributed",
    reason: str = "default",
) -> Dict:
    """
    Build the valve record for one zone.

    With the "centralized" strategy the valve sits 10 units to the east and
    north of the pump; with any other strategy it sits on the zone boundary
    at the point closest to the pump.

    Args:
        zone_polygon: Polygon of the zone
        pump_point: Point location of pump
        zone_id: Identifier for this zone
        strategy: "centralized" or "distributed"
        reason: Why this zone was created (e.g., "capacity_split")

    Returns:
        Dict with keys 'id', 'location' (Point), 'lat' (location.y),
        'lon' (location.x), 'strategy' and 'reason'.
    """
    near_pump = strategy == "centralized"
    location = (
        # Offset so the valve does not coincide with the pump itself.
        Point(pump_point.x + 10, pump_point.y + 10)
        if near_pump
        else find_perimeter_point(zone_polygon, pump_point)
    )
    return dict(
        id=zone_id,
        location=location,
        lat=location.y,
        lon=location.x,
        strategy=strategy,
        reason=reason,
    )
def place_valves_hierarchical(
    farm_polygon: Polygon,
    pump_point: Point,
    crop_zones: List[Dict],
    pump_hp: float,
    centralized: bool = True,
    elevation_data: Optional[Dict] = None,
    max_lateral_length_m: float = MAX_LATERAL_LENGTH_M,
    max_valves: Optional[int] = None,
) -> List[Dict]:
    """
    Decide how many valves the farm needs and position them.

    The valve count is the largest of three drivers — pump capacity, number
    of distinct crops, and an area-density floor from VALVE_DENSITY_PER_HA
    (looked up for the first crop zone) — plus one when the elevation range
    calls for a split, and is finally limited by a cap. Valves are spaced
    evenly along the farm boundary; in centralized mode each one is moved to
    the midpoint between its boundary position and the pump.

    Args:
        farm_polygon: Full farm boundary
        pump_point: Point location of pump
        crop_zones: List of crop zone dicts with 'crop' and 'area_m2' or 'polygon'
        pump_hp: Pump horsepower
        centralized: If True, use centralized strategy; else distributed
        elevation_data: Optional dict with 'min_elevation_m', 'max_elevation_m'
        max_lateral_length_m: Accepted but not used by the current placement
        max_valves: Optional hard cap. If None, an area-based default applies.

    Returns:
        List of valve dicts with keys 'id', 'location', 'lat', 'lon',
        'strategy', 'reason' and 'crop'.
    """
    farm_area_ha = farm_polygon.area / 10000

    # Driver 1: capacity — demand divided by what the pump can deliver.
    pump_flow = calculate_pump_flow_lph(pump_hp)
    demand = calculate_total_emitter_flow(crop_zones)
    zones_for_capacity = calculate_num_zones(demand, pump_flow)

    # Topography adds one extra zone on top of the combined floor below.
    topo_split, _elevation_delta = should_split_by_topography(
        farm_polygon, elevation_data
    )

    # Driver 2: one valve per distinct crop (first-seen order is kept for
    # the round-robin crop assignment further down).
    crop_cycle = list(dict.fromkeys(z.get("crop", "generic") for z in crop_zones))
    zones_for_crops = len(crop_cycle)

    # Driver 3: area-density floor, keyed on the first crop zone's crop.
    lead_crop = crop_cycle[0] if crop_cycle else "generic"
    valves_per_ha = VALVE_DENSITY_PER_HA.get(
        lead_crop, VALVE_DENSITY_PER_HA["generic"]
    )
    zones_for_area = math.ceil(farm_area_ha * valves_per_ha)

    valve_total = max(zones_for_capacity, zones_for_crops, zones_for_area)
    if topo_split:
        valve_total += 1

    # Hard cap: (upper area bound in ha, cap) pairs; 100 beyond the last.
    if max_valves is None:
        area_caps = ((0.5, 6), (1.0, 10), (2.0, 15), (5.0, 35), (10.0, 60))
        max_valves = next(
            (cap for limit_ha, cap in area_caps if farm_area_ha < limit_ha), 100
        )
    valve_total = min(valve_total, max_valves)

    strategy = "centralized" if centralized else "distributed"
    boundary = farm_polygon.boundary
    boundary_len = boundary.length

    def _reason_for(index: int) -> str:
        # The first matching driver, in priority order, labels the valve.
        if index < max(zones_for_capacity - 1, 0):
            return "capacity_split"
        if index < zones_for_crops:
            return "crop_type"
        if index < zones_for_area:
            return "area_density"
        return "topography_split" if topo_split else "hydraulics_split"

    placed = []
    for index in range(valve_total):
        # Evenly spaced position along the boundary for this valve.
        share = index / max(valve_total, 1)
        location = boundary.interpolate(share * boundary_len)
        if strategy == "centralized":
            # Pull halfway toward the pump to keep valves near it while
            # still giving every valve a distinct location.
            location = Point(
                (location.x + pump_point.x) / 2,
                (location.y + pump_point.y) / 2,
            )

        placed.append(
            {
                "id": f"valve_{index:03d}",
                "location": location,
                "lat": location.y,
                "lon": location.x,
                "strategy": strategy,
                "reason": _reason_for(index),
                "crop": crop_cycle[index % len(crop_cycle)] if crop_cycle else "generic",
            }
        )
    return placed
def partition_farm_by_sources(
    farm_polygon: Polygon,
    sources: List[Dict],
) -> List[Polygon]:
    """
    Partition the farm into service regions, one per water source.

    A Voronoi diagram is built from capacity-weighted seed points: each
    source is pulled toward the farm centroid in proportion to how much
    weaker it is than the strongest pump, which shrinks its cell. Every cell
    is clipped to the farm and assigned to the source whose seed it contains.

    Args:
        farm_polygon: Farm boundary (UTM)
        sources: List of dicts with 'pump_point' (Point) and 'pump_hp' (float)

    Returns:
        List of Polygons in the same order as sources. With zero or one
        source the list is just [farm_polygon]. A source that ends up with
        no cell receives the whole farm as a fallback, so regions can
        overlap in that case; when a clipped cell is split into several
        pieces only the largest piece is kept.
    """
    if len(sources) <= 1:
        return [farm_polygon]

    # --- Capacity-weighted seed points ---------------------------------
    # weight 1 keeps the seed at the pump (strongest source); weight 0 moves
    # it all the way to the farm centroid (weakest possible source).
    farm_centroid = farm_polygon.centroid
    max_hp = max(src["pump_hp"] for src in sources)
    weighted_points = []
    for src in sources:
        pt = src["pump_point"]
        weight = src["pump_hp"] / max_hp if max_hp > 0 else 1.0
        weighted_points.append(
            Point(
                pt.x * weight + farm_centroid.x * (1 - weight),
                pt.y * weight + farm_centroid.y * (1 - weight),
            )
        )

    # --- Voronoi diagram -----------------------------------------------
    # envelope= bounds the otherwise infinite outer cells.
    voronoi_geom = voronoi_diagram(MultiPoint(weighted_points), envelope=farm_polygon)
    cells = (
        list(voronoi_geom.geoms)
        if isinstance(voronoi_geom, GeometryCollection)
        else [voronoi_geom]
    )

    service_regions: List[Optional[Polygon]] = [None] * len(sources)
    for cell in cells:
        if not isinstance(cell, Polygon):
            continue

        clipped = cell.intersection(farm_polygon)
        if clipped.is_empty:
            continue
        if isinstance(clipped, MultiPolygon):
            clipped = max(clipped.geoms, key=lambda g: g.area)
        if not isinstance(clipped, Polygon):
            continue

        # Cells come back in no particular order. The owner is the seed that
        # lies inside the UNCLIPPED cell (distance 0). Matching on the
        # clipped cell's centroid is unreliable on non-convex farms, where
        # that centroid can fall outside the cell.
        owner = min(
            range(len(weighted_points)),
            key=lambda i: cell.distance(weighted_points[i]),
        )

        if service_regions[owner] is None:
            service_regions[owner] = clipped
            continue

        combined = service_regions[owner].union(clipped)
        if isinstance(combined, MultiPolygon):
            combined = max(combined.geoms, key=lambda g: g.area)
        service_regions[owner] = combined

    # A source that received no cell falls back to the whole farm.
    return [
        farm_polygon.buffer(0) if region is None else region
        for region in service_regions
    ]
def _project_polygon_onto_axis(
    polygon: Polygon, axis_direction: Tuple[float, float]
) -> Tuple[float, float]:
    """
    Measure the extent of a polygon's exterior ring along an axis.

    Each exterior vertex is dotted with the axis direction; the smallest and
    largest results are returned.

    Args:
        polygon: Shapely Polygon
        axis_direction: Normalized direction vector (dx, dy)

    Returns:
        (min_projection, max_projection) along the axis
    """
    dx, dy = axis_direction[0], axis_direction[1]
    dots = [vertex[0] * dx + vertex[1] * dy for vertex in polygon.exterior.coords]
    return min(dots), max(dots)
def _refine_zones_by_crop_boundaries(
    strip_zones: List[Dict],
    crop_zones: List[Dict],
) -> List[Dict]:
    """
    Refine strip zones by overlaying crop boundaries.

    Every strip is intersected with each crop polygon. Each resulting piece
    larger than 1 m² becomes its own zone carrying that crop and the strip's
    'valve_id'. A strip with no usable overlap is passed through unchanged.

    Args:
        strip_zones: Zone dicts with 'polygon' and 'valve_id' keys
        crop_zones: Crop zone dicts with 'crop' and optional 'polygon'

    Returns:
        List of zone dicts; refined entries have exactly the keys 'crop',
        'polygon', 'area_m2' and 'valve_id'.
    """
    if not crop_zones:
        return strip_zones

    min_piece_area_m2 = 1  # smaller fragments are treated as noise

    refined = []
    for zone in strip_zones:
        strip_poly = zone["polygon"]
        overlaps = []
        for crop_zone in crop_zones:
            crop_poly = crop_zone.get("polygon")
            if crop_poly is None or not strip_poly.intersects(crop_poly):
                continue

            clipped = strip_poly.intersection(crop_poly)
            if clipped.is_empty or clipped.area <= min_piece_area_m2:
                continue

            # An intersection may be a single Polygon, a MultiPolygon, or a
            # GeometryCollection mixing polygons with lines/points (shared
            # edges). Keep the polygon parts in every case.
            if isinstance(clipped, (MultiPolygon, GeometryCollection)):
                pieces = list(clipped.geoms)
            else:
                pieces = [clipped]

            crop_name = crop_zone.get("crop", "generic")
            for piece in pieces:
                if isinstance(piece, Polygon) and piece.area > min_piece_area_m2:
                    overlaps.append(
                        {
                            "crop": crop_name,
                            "polygon": piece,
                            "area_m2": piece.area,
                            "valve_id": zone["valve_id"],
                        }
                    )

        if overlaps:
            refined.extend(overlaps)
        else:
            refined.append(zone)
    return refined
def simplify_farm_boundary(polygon: Polygon, tolerance: float = 1.0) -> Polygon:
    """
    Smooth a farm boundary with topology-preserving simplification.

    Small jags are removed so later geometric slicing works on a cleaner
    outline.

    Args:
        polygon: Shapely Polygon (farm boundary)
        tolerance: Simplification tolerance in the polygon's coordinate
            units (default 1.0).

    Returns:
        The simplified Polygon. The input is returned untouched when it is
        not a Polygon, is empty, or when simplification does not yield a
        Polygon.
    """
    usable = isinstance(polygon, Polygon) and not polygon.is_empty
    if not usable:
        return polygon

    candidate = polygon.simplify(tolerance, preserve_topology=True)
    # Fall back to the original if the result degenerated into another type.
    return candidate if isinstance(candidate, Polygon) else polygon
def _simplify_zone_vertices(polygon: Polygon, max_vertices: int = 5) -> Polygon:
    """
    Reduce a zone polygon to at most max_vertices exterior vertices.

    The polygon is simplified with a growing tolerance (starting at 0.1 and
    multiplied by 1.5 each round, up to one tenth of the polygon's perimeter
    length) until the exterior ring is small enough.

    Args:
        polygon: Shapely Polygon (zone)
        max_vertices: Target maximum vertex count (default 5)

    Returns:
        The first simplification that meets the target, or the original
        polygon when it already meets it, is not a non-empty Polygon, or no
        tolerance in range succeeds.
    """
    if not isinstance(polygon, Polygon) or polygon.is_empty:
        return polygon

    def _vertex_total(shape: Polygon) -> int:
        # The closing coordinate repeats the first one, so drop it.
        return len(shape.exterior.coords) - 1

    if _vertex_total(polygon) <= max_vertices:
        return polygon

    tolerance_ceiling = polygon.length / 10  # guards against over-simplifying
    tolerance = 0.1
    while tolerance <= tolerance_ceiling:
        candidate = polygon.simplify(tolerance, preserve_topology=True)
        if isinstance(candidate, Polygon) and _vertex_total(candidate) <= max_vertices:
            return candidate
        tolerance *= 1.5
    return polygon
def _generate_strip_zones(
    farm_polygon: Polygon,
    main_direction: Tuple[float, float],
    num_zones: int,
) -> List[Polygon]:
    """
    Cut a polygon into equal-width strips stacked along main_direction.

    Args:
        farm_polygon: Polygon to slice (UTM)
        main_direction: Normalized direction vector (dx, dy) for main axis
        num_zones: Number of strips to create

    Returns:
        Strip polygons clipped to the input polygon. Fewer than num_zones may
        be returned: empty intersections are skipped, and an empty list is
        returned when num_zones <= 0 or the polygon has no extent along the
        main axis. A strip that is split into several pieces is reduced to
        its largest piece.
    """
    if num_zones <= 0:
        return []

    # Lateral axis = main axis rotated by 90 degrees.
    lateral_direction = (-main_direction[1], main_direction[0])

    # Work in (main, lateral) projection space so that large absolute UTM
    # coordinates do not matter; only the polygon's own ranges are used.
    min_main, max_main = _project_polygon_onto_axis(farm_polygon, main_direction)
    min_lat, max_lat = _project_polygon_onto_axis(farm_polygon, lateral_direction)

    axis_span = max_main - min_main
    if axis_span <= 0:
        return []

    # Overshoot sideways so each cutting rectangle fully spans the polygon.
    lateral_pad = (max_lat - min_lat) * 0.1 + 10
    lat_low = min_lat - lateral_pad
    lat_high = max_lat + lateral_pad
    strip_width = axis_span / num_zones

    def _to_world(along: float, across: float) -> Tuple[float, float]:
        # Convert projection-space coordinates back to world x/y.
        return (
            along * main_direction[0] + across * lateral_direction[0],
            along * main_direction[1] + across * lateral_direction[1],
        )

    strips = []
    for index in range(num_zones):
        near_edge = min_main + index * strip_width
        far_edge = min_main + (index + 1) * strip_width
        cutter = Polygon(
            [
                _to_world(near_edge, lat_low),
                _to_world(far_edge, lat_low),
                _to_world(far_edge, lat_high),
                _to_world(near_edge, lat_high),
            ]
        )

        piece = cutter.intersection(farm_polygon)
        if piece.is_empty or not piece.area > 0:
            continue
        if isinstance(piece, MultiPolygon):
            # Keep only the largest component of a fragmented strip.
            piece = max(piece.geoms, key=lambda part: part.area)
        if isinstance(piece, Polygon):
            strips.append(piece)
    return strips
def _allocate_zone_counts_by_crop(crop_zones: List[Dict], num_zones: int) -> List[int]:
"""
Allocate a zone count to each crop polygon proportional to area.
Every crop zone receives at least one strip when possible.
"""
if not crop_zones or num_zones <= 0:
return []
total_area = sum(
zone.get("polygon").area
for zone in crop_zones
if zone.get("polygon") is not None and not zone.get("polygon").is_empty
)
if total_area <= 0:
return [1] * min(len(crop_zones), num_zones)
raw_allocations = []
for zone in crop_zones:
polygon = zone.get("polygon")
area = polygon.area if polygon is not None and not polygon.is_empty else 0
raw_allocations.append((area / total_area) * num_zones)
counts = [max(1, int(math.floor(value))) for value in raw_allocations]
while sum(counts) > num_zones:
reducible = [i for i, count in enumerate(counts) if count > 1]
if not reducible:
break
index = max(reducible, key=lambda i: counts[i] - raw_allocations[i])
counts[index] -= 1
while sum(counts) < num_zones:
index = max(range(len(counts)), key=lambda i: raw_allocations[i] - counts[i])
counts[index] += 1
return counts
def _generate_crop_aware_strips(
    crop_zones: List[Dict],
    main_direction: Tuple[float, float],
    num_zones: int,
) -> List[Dict]:
    """
    Slice each crop polygon into its own strips.

    Strips are cut inside every crop boundary (rather than cutting the whole
    farm and clipping afterwards), tagged with the crop, and ordered by their
    position along main_direction.

    Args:
        crop_zones: Crop zone dicts with 'crop' and 'polygon'
        main_direction: Normalized direction vector (dx, dy) for main axis
        num_zones: Total number of strips wanted

    Returns:
        At most num_zones dicts with keys 'polygon', 'crop' and 'sort_key'
        (the strip's minimum projection on the main axis).
    """
    if not crop_zones or num_zones <= 0:
        return []

    per_crop_counts = _allocate_zone_counts_by_crop(crop_zones, num_zones)

    tagged = []
    for zone, quota in zip(crop_zones, per_crop_counts):
        geometry = zone.get("polygon")
        if geometry is None or geometry.is_empty or quota <= 0:
            continue
        crop_name = zone.get("crop", "generic")
        tagged.extend(
            {
                "polygon": piece,
                "crop": crop_name,
                "sort_key": _project_polygon_onto_axis(piece, main_direction)[0],
            }
            for piece in _generate_strip_zones(geometry, main_direction, quota)
        )

    ordered = sorted(tagged, key=lambda entry: entry["sort_key"])
    return ordered[:num_zones]
def anchor_valves_to_zones(
    zones: List[Dict],
    pump_location: Point,
    design_type: str = "distributed",
) -> List[Dict]:
    """
    Attach a 'valve_location' to every zone that has geometry.

    In "centralized" mode the valves are fanned out on a 10-unit circle
    around the pump, one angular slot per zone index. In any other mode each
    valve sits on its zone's boundary at the point closest to the pump.

    Args:
        zones: List of zone dicts with 'polygon' and 'area_m2' keys
        pump_location: Point location of the pump (UTM)
        design_type: "centralized" or "distributed"

    Returns:
        New list of zone dicts. Zones with geometry are shallow copies with
        'valve_location' added; zones without a usable polygon are passed
        through as-is. An empty input is returned unchanged.
    """
    if not zones:
        return zones

    fan_radius_m = 10
    slot_count = max(len(zones), 1)

    anchored = []
    for position, zone in enumerate(zones):
        geometry = zone.get("polygon")
        if not geometry or geometry.is_empty:
            anchored.append(zone)
            continue

        if design_type == "centralized":
            # Spread the valves over a full circle around the pump.
            bearing = (position / slot_count) * (2 * math.pi)
            anchor = Point(
                pump_location.x + fan_radius_m * math.cos(bearing),
                pump_location.y + fan_radius_m * math.sin(bearing),
            )
        else:
            outline = geometry.boundary
            anchor = outline.interpolate(outline.project(pump_location))

        anchored.append({**zone, "valve_location": anchor})
    return anchored
def _merge_sliver_zones(zones: List[Dict], farm_polygon: Polygon) -> List[Dict]:
    """
    Detect sliver zones and merge each into its largest neighbouring zone.

    A zone is a sliver when its area is below BOTH 2% of the farm area and
    half of the average zone area. The second bound matters for layouts with
    many zones: with more than 50 similar zones every zone is under 2% of the
    farm, and without it the whole layout would collapse into one zone.

    A sliver is absorbed by the largest kept zone among those closest to it
    (touching zones have distance 0), so its area stays in the layout instead
    of being unioned with a distant zone and then dropped.

    Args:
        zones: List of zone dicts with 'polygon' and 'area_m2'
        farm_polygon: Full farm boundary for area calculation

    Returns:
        The input list itself when nothing needs merging; otherwise a new
        list of the kept zones (copies — the caller's dicts are not modified)
        with 'polygon' and 'area_m2' updated for the zones that absorbed a
        sliver.
    """
    if not zones or len(zones) <= 1:
        return zones

    mean_zone_area = sum(zone["area_m2"] for zone in zones) / len(zones)
    sliver_threshold = min(farm_polygon.area * 0.02, mean_zone_area * 0.5)

    keepers = [zone for zone in zones if not zone["area_m2"] < sliver_threshold]
    slivers = [zone for zone in zones if zone["area_m2"] < sliver_threshold]
    if not slivers:
        return zones

    adjacency_tol_m = 1e-6  # absorbs float noise between touching zones

    # Copy so that updating a keeper does not mutate the caller's dicts.
    merged_zones = [dict(zone) for zone in keepers]
    for sliver in slivers:
        if not merged_zones:
            merged_zones.append(dict(sliver))
            continue

        sliver_poly = sliver["polygon"]
        gaps = [kept["polygon"].distance(sliver_poly) for kept in merged_zones]
        nearest_gap = min(gaps)
        neighbours = [
            i for i, gap in enumerate(gaps) if gap <= nearest_gap + adjacency_tol_m
        ]
        target_idx = max(neighbours, key=lambda i: merged_zones[i]["area_m2"])

        merged_poly = merged_zones[target_idx]["polygon"].union(sliver_poly)
        if isinstance(merged_poly, MultiPolygon):
            # Still disjoint (a real gap): keep the dominant part.
            merged_poly = max(merged_poly.geoms, key=lambda part: part.area)

        merged_zones[target_idx]["polygon"] = merged_poly
        merged_zones[target_idx]["area_m2"] = merged_poly.area

    return merged_zones
def generate_valve_zones(
    farm_polygon: Polygon,
    num_zones: int,
    main_direction: Optional[Tuple[float, float]] = None,
    crop_zones: Optional[List[Dict]] = None,
) -> List[Dict]:
    """
    Generate zone polygons using rectangular strips.

    With a main_direction, strips are cut perpendicular to that axis — inside
    each crop polygon when crop_zones are given, otherwise across the whole
    farm. The strip count is then reconciled with num_zones (the largest
    strip is halved repeatedly while there are too few), each strip is
    reduced to at most 5 vertices, and slivers are merged.

    Without a main_direction the farm is simply cut into strips along the
    x axis, with no simplification or sliver merging.

    Args:
        farm_polygon: Full farm boundary (UTM)
        num_zones: Number of zones to create
        main_direction: Optional normalized direction vector (dx, dy)
        crop_zones: Optional list of crop zone dicts with 'crop' and 'polygon'

    Returns:
        List of dicts with 'polygon', 'area_m2' and, for strips cut from a
        crop polygon, 'crop'. May hold fewer than num_zones entries when the
        geometry cannot be split further or slivers were merged; empty when
        num_zones <= 0 or no strip could be produced.
    """
    if num_zones <= 0:
        return []

    if main_direction is None:
        # No direction provided — plain strips over the whole farm.
        return [
            {"polygon": strip, "area_m2": strip.area}
            for strip in _generate_strip_zones(farm_polygon, (1, 0), num_zones)
        ]

    # Each entry pairs a strip with its crop so that the label follows the
    # geometry through splitting and sorting. Strips that did not come from
    # a crop polygon carry the sentinel and get no 'crop' key.
    no_crop = object()
    entries = []
    if crop_zones:
        entries = [
            (item["polygon"], item.get("crop", "generic"))
            for item in _generate_crop_aware_strips(
                crop_zones, main_direction, num_zones
            )
        ]
    if not entries:
        # No crop zones, or crop-aware slicing produced nothing.
        entries = [
            (strip, no_crop)
            for strip in _generate_strip_zones(farm_polygon, main_direction, num_zones)
        ]

    # Fewer strips than valves: halve the largest strip until the counts
    # match. Nothing can be split when no strip exists at all.
    while entries and len(entries) < num_zones:
        largest_idx = max(range(len(entries)), key=lambda i: entries[i][0].area)
        largest, crop = entries[largest_idx]
        halves = _generate_strip_zones(largest, main_direction, 2)
        if len(halves) != 2:
            break  # can't split further
        # Both halves inherit the crop of the strip they came from.
        entries[largest_idx:largest_idx + 1] = [(halves[0], crop), (halves[1], crop)]

    # More strips than valves: keep only the N largest.
    if len(entries) > num_zones:
        entries.sort(key=lambda entry: entry[0].area, reverse=True)
        entries = entries[:num_zones]

    result = []
    for strip, crop in entries:
        # Keep zones to <= 5 vertices (rectangular/trapezoidal shapes).
        simplified_strip = _simplify_zone_vertices(strip, max_vertices=5)
        zone_dict = {
            "polygon": simplified_strip,
            "area_m2": simplified_strip.area,
        }
        if crop is not no_crop:
            zone_dict["crop"] = crop
        result.append(zone_dict)

    return _merge_sliver_zones(result, farm_polygon)
def _generate_valve_zones_legacy(
    farm_polygon: Polygon, valves: List[Dict]
) -> List[Dict]:
    """
    Legacy Voronoi-style zone generation using grid cells.

    The farm's bounding box is covered with 10 m square cells; each cell is
    clipped to the farm and assigned to the valve nearest to the clipped
    cell's centroid. The cells of one valve are then merged into a single
    polygon. (Kept for backward compatibility.)

    Args:
        farm_polygon: Farm boundary
        valves: Valve dicts with 'id' and 'location' (Point)

    Returns:
        One dict per valve that received at least one cell, with keys
        'valve_id', 'polygon' and 'area_m2'. A zone whose cells do not form
        one connected piece is replaced by its convex hull. Empty when no
        valves are given.
    """
    if not valves:
        # Nothing to assign cells to.
        return []

    minx, miny, maxx, maxy = farm_polygon.bounds
    grid_size_m = 10  # 10m grid cells

    cells_by_valve = {valve["id"]: [] for valve in valves}

    x = minx
    while x < maxx:
        y = miny
        while y < maxy:
            cell = Polygon(
                [
                    (x, y),
                    (x + grid_size_m, y),
                    (x + grid_size_m, y + grid_size_m),
                    (x, y + grid_size_m),
                ]
            )
            clipped = cell.intersection(farm_polygon)
            if not clipped.is_empty and clipped.area > 0:
                cell_center = clipped.centroid
                nearest_valve = min(
                    valves, key=lambda v: cell_center.distance(v["location"])
                )
                cells_by_valve[nearest_valve["id"]].append(clipped)
            y += grid_size_m
        x += grid_size_m

    result = []
    for valve in valves:
        valve_id = valve["id"]
        cell_list = cells_by_valve[valve_id]
        if not cell_list:
            continue

        # One cascaded union instead of unioning cell by cell.
        merged = unary_union(cell_list)
        if isinstance(merged, MultiPolygon):
            merged = merged.convex_hull

        result.append(
            {
                "valve_id": valve_id,
                "polygon": merged,
                "area_m2": merged.area,
            }
        )
    return result
def valve_layout_summary(valves: List[Dict], zones: List[Dict]) -> str:
    """
    Generate a human-readable summary of valve placement.

    Args:
        valves: List of valve dicts; each must provide 'id', 'lat', 'lon',
            'strategy' and 'reason'.
        zones: List of zone dicts; each must provide 'area_m2' and may
            provide 'valve_id'. When empty, the zone section is omitted.

    Returns:
        Formatted string summary with leading/trailing whitespace stripped
    """
    # Header with the valve count.
    summary = f"""
=== Valve Placement Summary ===
Total Valves: {len(valves)}
Valve Details:
"""
    # One paragraph per valve; coordinates are printed with 6 decimals.
    for valve in valves:
        summary += f"""
{valve['id']}:
Location: ({valve['lat']:.6f}, {valve['lon']:.6f})
Strategy: {valve['strategy']}
Reason: {valve['reason']}
"""
    if zones:
        summary += "\nZone Areas:\n"
        total_area = 0
        for idx, zone in enumerate(zones):
            # Areas are stored in m² and reported in hectares.
            area_ha = zone["area_m2"] / 10000
            # Zones without a valve id are labelled by their list position.
            zone_id = zone.get('valve_id', f'zone_{idx:03d}')
            summary += f" {zone_id}: {area_ha:.2f} ha\n"
            total_area += zone["area_m2"]
        summary += f"Total: {total_area / 10000:.2f} ha\n"
    return summary.strip()