farm-layout-model / design_api.py
spacedout-bits's picture
Phase 5: Drip Manifold Alignment - valve-proximity manifold selection
7e350ba
"""
Design API β€” End-to-end farm irrigation design pipeline.
Orchestrates:
GeoJSON Input β†’ Parse β†’ Valve Placement β†’ Drip Layout β†’ GeoJSON Output
Input: GeoJSON FeatureCollection (farm_boundary, pump, crop_zones, elevation)
Output: GeoJSON FeatureCollection (valves, zones, mains, laterals, BOM)
"""
import json
import math
from typing import Dict, List, Any, Tuple, Optional
import pyproj
from shapely.geometry import Polygon, Point, LineString, MultiPolygon
import geojson_io as gj_io
from drip_engine import (
generate_drip_layout,
estimate_bom,
latlon_to_utm,
compute_farm_axis,
CROP_DEFAULTS,
DripLayoutError,
)
from pipe_network import (
generate_pipe_network,
calculate_pipe_lengths,
PipeNetworkError,
)
from pricing_config import get_default_pricing_config
from valve_engine import (
place_valves_hierarchical,
generate_valve_zones,
anchor_valves_to_zones,
partition_farm_by_sources,
calculate_pump_flow_lph,
calculate_total_emitter_flow,
calculate_num_zones,
choose_manifold_strategy,
_refine_zones_by_crop_boundaries,
ValveEngineError,
)
class DesignAPIError(Exception):
"""Top-level exception for design pipeline errors."""
pass
def process_farm_design(geojson_input: str) -> Dict[str, Any]:
"""
Main entry point: parse GeoJSON input, run design pipeline, return GeoJSON output.
Args:
geojson_input: GeoJSON FeatureCollection string (or dict)
Returns:
Dict that is a GeoJSON FeatureCollection with:
- properties: design_summary, bom
- features: farm_boundary, valves, valve_zones, main_lines, laterals
Raises:
DesignAPIError: On any pipeline failure (with structured error info)
"""
try:
# ── 1. Parse input ──────────────────────────────────────────────
fc = gj_io.parse_geojson_feature_collection(geojson_input)
features = fc.get("features", [])
top_props = fc.get("properties", {})
# ── 2. Extract geometries ───────────────────────────────────────
farm_boundary, _ = gj_io.extract_farm_boundary(fc)
pump_point, pump_props = gj_io.extract_pump_location(fc)
water_sources = gj_io.extract_all_water_sources(fc)
crop_zones = gj_io.extract_crop_zones(fc)
elevation_data = gj_io.extract_elevation_data(fc)
# ── 3. Resolve parameters (top-level props override feature props)
pump_hp = _resolve_pump_hp(top_props, pump_props, features)
headland_m = top_props.get("headland_buffer_m", 1.0)
override_spacing = top_props.get("override_lateral_spacing_m")
max_valves = top_props.get("max_valves")
if max_valves is not None:
max_valves = int(max_valves)
# ── 4. Convert to UTM for accurate calculations ─────────────────
# Farm boundary lat/lon β†’ UTM
farm_utm = latlon_to_utm(farm_boundary)
# Build a reusable UTM transformer (computed once, used everywhere)
utm_crs, transformer_to_utm, transformer_from_utm = _build_utm_transformers(farm_boundary)
pump_utm = _apply_transform(pump_point, transformer_to_utm)
farm_main_direction, _farm_lateral_direction = compute_farm_axis(farm_utm)
# Resolve design_type flag β€” explicit override or derive from farm area
design_type = _resolve_design_type(top_props, farm_utm.area)
# Convert design_type string to boolean centralized flag for engine compatibility
centralized = (design_type == "centralized")
# Convert crop zone polygons to UTM
crop_zones_utm = []
for zone in crop_zones:
zone_poly = zone.get("polygon")
if zone_poly is None:
continue
zone_utm = _apply_polygon_transform(zone_poly, transformer_to_utm)
crop_zones_utm.append({
"crop": zone.get("crop", "generic"),
"polygon": zone_utm,
"area_m2": zone_utm.area,
})
# If no explicit crop zones, treat entire farm as single generic zone.
# If crop zones exist but don't cover the full farm, add a generic
# zone for the uncovered area so strip generation can tile the
# entire farm and valve counts stay consistent.
if not crop_zones_utm:
crop_zones_utm = [{
"crop": "generic",
"polygon": farm_utm,
"area_m2": farm_utm.area,
}]
else:
from shapely.ops import unary_union
crop_union = unary_union([z["polygon"] for z in crop_zones_utm])
uncovered = farm_utm.difference(crop_union)
if not uncovered.is_empty and uncovered.area > farm_utm.area * 0.05:
if isinstance(uncovered, MultiPolygon):
for part in uncovered.geoms:
if part.area > farm_utm.area * 0.02:
crop_zones_utm.append({
"crop": "generic",
"polygon": part,
"area_m2": part.area,
})
elif isinstance(uncovered, Polygon):
crop_zones_utm.append({
"crop": "generic",
"polygon": uncovered,
"area_m2": uncovered.area,
})
# ── 5. Run valve placement engine with multi-source orchestration ───
source_contexts = []
for source in water_sources:
source_point_utm = _apply_transform(source["location"], transformer_to_utm)
source_contexts.append(
{
"source_id": f"source_{source['index']:03d}",
"source_index": source["index"],
"pump_point": source_point_utm,
"pump_hp": float(source.get("pump_hp") or pump_hp),
"properties": source.get("properties", {}),
"crop_zones": [],
}
)
# Partition the farm into non-overlapping service regions per source.
# Each source's valve count and zones are scoped to its region.
service_regions = partition_farm_by_sources(farm_utm, source_contexts)
for ctx, region in zip(source_contexts, service_regions):
ctx["service_polygon"] = region
# Clip crop zones to each source's service region
for crop_zone in crop_zones_utm:
crop_poly = crop_zone["polygon"]
best_overlap = 0
best_ctx = source_contexts[0]
for ctx in source_contexts:
overlap = crop_poly.intersection(ctx["service_polygon"]).area
if overlap > best_overlap:
best_overlap = overlap
best_ctx = ctx
# Clip to service region boundary
clipped = crop_poly.intersection(best_ctx["service_polygon"])
if clipped.is_empty:
clipped = crop_poly
if isinstance(clipped, MultiPolygon):
clipped = max(clipped.geoms, key=lambda g: g.area)
if isinstance(clipped, Polygon) and clipped.area > 1:
best_ctx["crop_zones"].append({
"crop": crop_zone.get("crop", "generic"),
"polygon": clipped,
"area_m2": clipped.area,
})
valves = []
zones = []
pipe_networks = []
valve_counter = 0
for source_context in source_contexts:
service_poly = source_context["service_polygon"]
source_crop_zones = source_context["crop_zones"]
# If no crop zones landed in this region, cover it with generic
if not source_crop_zones:
source_crop_zones = [{
"crop": "generic",
"polygon": service_poly,
"area_m2": service_poly.area,
}]
source_valves = place_valves_hierarchical(
farm_polygon=service_poly,
pump_point=source_context["pump_point"],
crop_zones=source_crop_zones,
pump_hp=source_context["pump_hp"],
centralized=centralized,
elevation_data=elevation_data,
max_valves=max_valves,
)
# Renumber valve IDs globally to avoid duplicates across sources
for valve in source_valves:
valve["id"] = f"valve_{valve_counter:03d}"
valve["source_id"] = source_context["source_id"]
valve_counter += 1
# Generate zones first (without valve IDs)
source_zones = generate_valve_zones(
service_poly,
len(source_valves),
main_direction=farm_main_direction,
crop_zones=source_crop_zones,
)
# Anchor valves to zones
source_zones = anchor_valves_to_zones(
source_zones,
source_context["pump_point"],
design_type=design_type,
)
# Assign valve IDs and source IDs to zones
for zone, valve in zip(source_zones, source_valves):
zone["valve_id"] = valve["id"]
zone["source_id"] = source_context["source_id"]
source_pipe_network = generate_pipe_network(
farm_polygon=service_poly,
pump_point=source_context["pump_point"],
zones=source_zones,
main_direction=farm_main_direction,
design_type=design_type,
)
valves.extend(source_valves)
zones.extend(source_zones)
pipe_networks.append(source_pipe_network)
trunk_main_length = sum(
network.get("total_trunk_length_m", 0) for network in pipe_networks
)
submain_length = sum(
network.get("total_submain_length_m", 0) for network in pipe_networks
)
# ── 6. Run drip layout per zone ────────────────────────────────
all_drip_designs = []
all_boms = []
zone_summaries = []
for zone in zones:
zone_poly = zone["polygon"]
valve_id = zone["valve_id"]
# Determine crop for this zone (from valve metadata or default)
valve_meta = next((v for v in valves if v["id"] == valve_id), None)
crop = valve_meta.get("crop", "generic") if valve_meta else "generic"
try:
design = generate_drip_layout(
polygon_utm=zone_poly,
crop=crop,
headland_buffer_m=headland_m,
override_spacing_m=override_spacing if override_spacing else None,
main_direction=farm_main_direction,
valve_location=zone.get("valve_location"),
)
bom = estimate_bom(design, unit="usd")
all_drip_designs.append((valve_id, design))
all_boms.append(bom)
zone_summaries.append({
"valve_id": valve_id,
"crop": crop,
"area_ha": design["farm_area_ha"],
"emitters": design["emitter_count"],
"main_m": design["total_main_length_m"],
"lateral_m": design["total_drip_tape_m"],
})
except DripLayoutError as e:
# Zone too small after headland β€” skip with warning
zone_summaries.append({
"valve_id": valve_id,
"crop": crop,
"error": str(e),
})
# ── 7. Aggregate totals ─────────────────────────────────────────
total_area_ha = sum(s.get("area_ha", 0) for s in zone_summaries if "area_ha" in s)
total_emitters = sum(s.get("emitters", 0) for s in zone_summaries if "emitters" in s)
total_main_m = sum(s.get("main_m", 0) for s in zone_summaries if "main_m" in s)
total_lateral_m = sum(s.get("lateral_m", 0) for s in zone_summaries if "lateral_m" in s)
# Aggregate BOM β€” use pricing config for valve cost
pricing = get_default_pricing_config()
total_bom = {
"main_line_16mm_m": round(sum(b.get("main_line_16mm_m", 0) for b in all_boms), 2),
"drip_tape_16mm_m": round(sum(b.get("drip_tape_16mm_m", 0) for b in all_boms), 2),
"inline_emitters": sum(b.get("inline_emitters", 0) for b in all_boms),
"total_pipe_m": round(sum(b.get("total_pipe_m", 0) for b in all_boms), 2),
"valves_count": len(valves),
}
if all_boms and "cost_main" in all_boms[0]:
total_bom["cost_main"] = round(sum(b.get("cost_main", 0) for b in all_boms), 2)
total_bom["cost_drip_tape"] = round(sum(b.get("cost_drip_tape", 0) for b in all_boms), 2)
total_bom["cost_emitters"] = round(sum(b.get("cost_emitters", 0) for b in all_boms), 2)
total_bom["cost_valves"] = round(len(valves) * pricing.get_price("valve"), 2)
total_bom["total_cost_usd"] = round(
total_bom.get("cost_main", 0)
+ total_bom.get("cost_drip_tape", 0)
+ total_bom.get("cost_emitters", 0)
+ total_bom.get("cost_valves", 0),
2,
)
# ── 8. Convert back to lat/lon for GeoJSON output ───────────────
# Build output features in UTM, then transform all coordinates back
output_features = []
# Farm boundary (echo input)
output_features.append({
"type": "Feature",
"properties": {"type": "farm_boundary", "area_ha": round(total_area_ha, 2)},
"geometry": _polygon_to_geojson(farm_boundary),
})
# Valves (convert UTM points back to lat/lon)
for valve in valves:
valve_point_utm = valve["location"]
valve_point_latlon = _apply_transform(valve_point_utm, transformer_from_utm)
output_features.append({
"type": "Feature",
"properties": {
"type": "valve",
"id": valve["id"],
"strategy": valve["strategy"],
"reason": valve["reason"],
"crop": valve.get("crop", "generic"),
},
"geometry": {
"type": "Point",
"coordinates": [valve_point_latlon.x, valve_point_latlon.y],
},
})
# Valve zones (convert UTM polygons back to lat/lon)
for zone in zones:
zone_poly_utm = zone["polygon"]
zone_poly_latlon = _apply_polygon_transform(zone_poly_utm, transformer_from_utm)
output_features.append({
"type": "Feature",
"properties": {
"type": "valve_zone",
"valve_id": zone["valve_id"],
"area_m2": round(zone["area_m2"], 2),
"area_ha": round(zone["area_m2"] / 10000, 4),
},
"geometry": _polygon_to_geojson(zone_poly_latlon),
})
# Drip layout: main lines and laterals per zone
for valve_id, design in all_drip_designs:
# Main line
main_utm = design["main_line"]
main_latlon = _apply_linestring_transform(main_utm, transformer_from_utm)
output_features.append({
"type": "Feature",
"properties": {
"type": "main_line",
"valve_id": valve_id,
"length_m": round(main_utm.length, 2),
"crop": design["crop"],
},
"geometry": _linestring_to_geojson(main_latlon),
})
# Laterals
for i, lateral_utm in enumerate(design["laterals"]):
lateral_latlon = _apply_linestring_transform(lateral_utm, transformer_from_utm)
output_features.append({
"type": "Feature",
"properties": {
"type": "lateral",
"index": i,
"valve_id": valve_id,
"length_m": round(lateral_utm.length, 2),
"spacing_m": design["design_params"]["lateral_spacing_m"],
},
"geometry": _linestring_to_geojson(lateral_latlon),
})
# ── 9. Build output FeatureCollection ────────────────────────────
output = {
"type": "FeatureCollection",
"properties": {
"type": "farm_design",
"farm_id": top_props.get("farm_id", "unknown"),
"generated_at": _iso_timestamp(),
"design_summary": {
"farm_area_ha": round(total_area_ha, 2),
"total_valves": len(valves),
"total_drip_tape_m": round(total_lateral_m, 2),
"total_main_line_m": round(total_main_m, 2),
"total_emitters": total_emitters,
"pump_hp": pump_hp,
"pump_flow_lph": round(calculate_pump_flow_lph(pump_hp), 2),
"design_type": design_type,
},
"bom": total_bom,
"zone_details": zone_summaries,
},
"features": output_features,
}
return output
except (gj_io.GeoJSONError, ValveEngineError, DripLayoutError) as e:
# Structured error response (still valid GeoJSON)
return _error_response(str(e), type(e).__name__)
except Exception as e:
return _error_response(str(e), "INTERNAL_ERROR")
# ──────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────
def _resolve_pump_hp(top_props: Dict, pump_props: Dict, features: List[Dict]) -> float:
"""Get pump HP from top-level, pump feature, or feature scan."""
# Top-level property takes precedence
if "pump_hp" in top_props and top_props["pump_hp"] is not None:
return float(top_props["pump_hp"])
# Pump feature property
if "pump_hp" in pump_props and pump_props["pump_hp"] is not None:
return float(pump_props["pump_hp"])
# Scan all features
hp = gj_io.validate_pump_hp(features)
if hp is not None:
return hp
raise DesignAPIError("No pump_hp found in input. Add 'pump_hp' to top-level properties or pump feature.")
def _resolve_design_type(top_props: Dict, farm_area_m2: float = 0) -> str:
"""Get design_type flag from top-level properties.
Returns:
"centralized" or "distributed"
Accepts both new design_type (string) and legacy centralized (bool).
When no explicit flag is provided, derives the default from farm area:
< 1 ha (10,000 mΒ²) β†’ "centralized", β‰₯ 1 ha β†’ "distributed".
"""
# Check new design_type property first
val = top_props.get("design_type")
if isinstance(val, str):
val_lower = val.lower().strip()
if val_lower in ("centralized", "distributed"):
return val_lower
# Check legacy centralized boolean for backward compatibility
val = top_props.get("centralized")
if isinstance(val, bool):
return "centralized" if val else "distributed"
if isinstance(val, str):
return "centralized" if val.lower() in ("true", "yes", "1", "centralized") else "distributed"
# Derive from farm area
return "centralized" if farm_area_m2 < 10000 else "distributed"
def _resolve_centralized(top_props: Dict, farm_area_m2: float = 0) -> bool:
"""Get centralized flag from top-level properties.
When no explicit flag is provided, derives the default from farm area.
This is kept for backward compatibility; prefer _resolve_design_type().
"""
design_type = _resolve_design_type(top_props, farm_area_m2)
return design_type == "centralized"
def _build_utm_transformers(
reference_polygon: Polygon,
) -> tuple:
"""Compute UTM CRS from a lat/lon polygon and return reusable transformers.
Returns:
(utm_crs_string, transformer_to_utm, transformer_from_utm)
"""
centroid = reference_polygon.centroid
lon, lat = centroid.x, centroid.y
utm_zone = int((lon + 180) / 6) + 1
is_southern = lat < 0
utm_crs = f"EPSG:{32700 + utm_zone if is_southern else 32600 + utm_zone}"
to_utm = pyproj.Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True)
from_utm = pyproj.Transformer.from_crs(utm_crs, "EPSG:4326", always_xy=True)
return utm_crs, to_utm, from_utm
def _apply_transform(point: Point, transformer: "pyproj.Transformer") -> Point:
"""Transform a Point using a precomputed pyproj Transformer."""
x, y = transformer.transform(point.x, point.y)
return Point(x, y)
def _apply_polygon_transform(
polygon: Polygon, transformer: "pyproj.Transformer"
) -> Polygon:
"""Transform a Polygon using a precomputed pyproj Transformer."""
coords = [transformer.transform(x, y) for x, y in polygon.exterior.coords]
return Polygon(coords)
def _apply_linestring_transform(
line: LineString, transformer: "pyproj.Transformer"
) -> LineString:
"""Transform a LineString using a precomputed pyproj Transformer."""
coords = [transformer.transform(x, y) for x, y in line.coords]
return LineString(coords)
def _polygon_to_geojson(polygon: Polygon) -> Dict:
"""Convert Shapely Polygon to GeoJSON Polygon dict."""
coords = []
for x, y in polygon.exterior.coords:
coords.append([x, y])
return {"type": "Polygon", "coordinates": [coords]}
def _linestring_to_geojson(line: LineString) -> Dict:
"""Convert Shapely LineString to GeoJSON LineString dict."""
coords = []
for x, y in line.coords:
coords.append([x, y])
return {"type": "LineString", "coordinates": coords}
def _iso_timestamp() -> str:
"""Return current ISO timestamp string."""
from datetime import datetime, timezone
return datetime.now(timezone.utc).isoformat()
def _error_response(message: str, code: str) -> Dict:
"""Return a valid GeoJSON FeatureCollection containing error info."""
return {
"type": "FeatureCollection",
"properties": {
"type": "farm_design_error",
"error": {
"code": code,
"message": message,
},
},
"features": [],
}
# ──────────────────────────────────────────────────────────────────────
# Convenience functions
# ──────────────────────────────────────────────────────────────────────
def process_from_file(file_path: str) -> str:
"""Read GeoJSON from file, run pipeline, return JSON string."""
with open(file_path, "r", encoding="utf-8") as f:
geojson_str = f.read()
result = process_farm_design(geojson_str)
return json.dumps(result, indent=2)
def process_from_string(geojson_str: str) -> str:
"""Run pipeline on GeoJSON string, return JSON string."""
result = process_farm_design(geojson_str)
return json.dumps(result, indent=2)