""" Design API — End-to-end farm irrigation design pipeline. Orchestrates: GeoJSON Input → Parse → Valve Placement → Drip Layout → GeoJSON Output Input: GeoJSON FeatureCollection (farm_boundary, pump, crop_zones, elevation) Output: GeoJSON FeatureCollection (valves, zones, mains, laterals, BOM) """ import json import math from typing import Dict, List, Any, Tuple, Optional import pyproj from shapely.geometry import Polygon, Point, LineString, MultiPolygon import geojson_io as gj_io from drip_engine import ( generate_drip_layout, estimate_bom, latlon_to_utm, compute_farm_axis, CROP_DEFAULTS, DripLayoutError, ) from pipe_network import ( generate_pipe_network, calculate_pipe_lengths, PipeNetworkError, ) from pricing_config import get_default_pricing_config from valve_engine import ( place_valves_hierarchical, generate_valve_zones, anchor_valves_to_zones, partition_farm_by_sources, calculate_pump_flow_lph, calculate_total_emitter_flow, calculate_num_zones, choose_manifold_strategy, _refine_zones_by_crop_boundaries, ValveEngineError, ) class DesignAPIError(Exception): """Top-level exception for design pipeline errors.""" pass def process_farm_design(geojson_input: str) -> Dict[str, Any]: """ Main entry point: parse GeoJSON input, run design pipeline, return GeoJSON output. Args: geojson_input: GeoJSON FeatureCollection string (or dict) Returns: Dict that is a GeoJSON FeatureCollection with: - properties: design_summary, bom - features: farm_boundary, valves, valve_zones, main_lines, laterals Raises: DesignAPIError: On any pipeline failure (with structured error info) """ try: # ── 1. Parse input ────────────────────────────────────────────── fc = gj_io.parse_geojson_feature_collection(geojson_input) features = fc.get("features", []) top_props = fc.get("properties", {}) # ── 2. Extract geometries ─────────────────────────────────────── farm_boundary, _ = gj_io.extract_farm_boundary(fc) pump_point, pump_props = gj_io.extract_pump_location(fc) water_sources = gj_io.extract_all_water_sources(fc) crop_zones = gj_io.extract_crop_zones(fc) elevation_data = gj_io.extract_elevation_data(fc) # ── 3. Resolve parameters (top-level props override feature props) pump_hp = _resolve_pump_hp(top_props, pump_props, features) headland_m = top_props.get("headland_buffer_m", 1.0) override_spacing = top_props.get("override_lateral_spacing_m") max_valves = top_props.get("max_valves") if max_valves is not None: max_valves = int(max_valves) # ── 4. Convert to UTM for accurate calculations ───────────────── # Farm boundary lat/lon → UTM farm_utm = latlon_to_utm(farm_boundary) # Build a reusable UTM transformer (computed once, used everywhere) utm_crs, transformer_to_utm, transformer_from_utm = _build_utm_transformers(farm_boundary) pump_utm = _apply_transform(pump_point, transformer_to_utm) farm_main_direction, _farm_lateral_direction = compute_farm_axis(farm_utm) # Resolve design_type flag — explicit override or derive from farm area design_type = _resolve_design_type(top_props, farm_utm.area) # Convert design_type string to boolean centralized flag for engine compatibility centralized = (design_type == "centralized") # Convert crop zone polygons to UTM crop_zones_utm = [] for zone in crop_zones: zone_poly = zone.get("polygon") if zone_poly is None: continue zone_utm = _apply_polygon_transform(zone_poly, transformer_to_utm) crop_zones_utm.append({ "crop": zone.get("crop", "generic"), "polygon": zone_utm, "area_m2": zone_utm.area, }) # If no explicit crop zones, treat entire farm as single generic zone. # If crop zones exist but don't cover the full farm, add a generic # zone for the uncovered area so strip generation can tile the # entire farm and valve counts stay consistent. if not crop_zones_utm: crop_zones_utm = [{ "crop": "generic", "polygon": farm_utm, "area_m2": farm_utm.area, }] else: from shapely.ops import unary_union crop_union = unary_union([z["polygon"] for z in crop_zones_utm]) uncovered = farm_utm.difference(crop_union) if not uncovered.is_empty and uncovered.area > farm_utm.area * 0.05: if isinstance(uncovered, MultiPolygon): for part in uncovered.geoms: if part.area > farm_utm.area * 0.02: crop_zones_utm.append({ "crop": "generic", "polygon": part, "area_m2": part.area, }) elif isinstance(uncovered, Polygon): crop_zones_utm.append({ "crop": "generic", "polygon": uncovered, "area_m2": uncovered.area, }) # ── 5. Run valve placement engine with multi-source orchestration ─── source_contexts = [] for source in water_sources: source_point_utm = _apply_transform(source["location"], transformer_to_utm) source_contexts.append( { "source_id": f"source_{source['index']:03d}", "source_index": source["index"], "pump_point": source_point_utm, "pump_hp": float(source.get("pump_hp") or pump_hp), "properties": source.get("properties", {}), "crop_zones": [], } ) # Partition the farm into non-overlapping service regions per source. # Each source's valve count and zones are scoped to its region. service_regions = partition_farm_by_sources(farm_utm, source_contexts) for ctx, region in zip(source_contexts, service_regions): ctx["service_polygon"] = region # Clip crop zones to each source's service region for crop_zone in crop_zones_utm: crop_poly = crop_zone["polygon"] best_overlap = 0 best_ctx = source_contexts[0] for ctx in source_contexts: overlap = crop_poly.intersection(ctx["service_polygon"]).area if overlap > best_overlap: best_overlap = overlap best_ctx = ctx # Clip to service region boundary clipped = crop_poly.intersection(best_ctx["service_polygon"]) if clipped.is_empty: clipped = crop_poly if isinstance(clipped, MultiPolygon): clipped = max(clipped.geoms, key=lambda g: g.area) if isinstance(clipped, Polygon) and clipped.area > 1: best_ctx["crop_zones"].append({ "crop": crop_zone.get("crop", "generic"), "polygon": clipped, "area_m2": clipped.area, }) valves = [] zones = [] pipe_networks = [] valve_counter = 0 for source_context in source_contexts: service_poly = source_context["service_polygon"] source_crop_zones = source_context["crop_zones"] # If no crop zones landed in this region, cover it with generic if not source_crop_zones: source_crop_zones = [{ "crop": "generic", "polygon": service_poly, "area_m2": service_poly.area, }] source_valves = place_valves_hierarchical( farm_polygon=service_poly, pump_point=source_context["pump_point"], crop_zones=source_crop_zones, pump_hp=source_context["pump_hp"], centralized=centralized, elevation_data=elevation_data, max_valves=max_valves, ) # Renumber valve IDs globally to avoid duplicates across sources for valve in source_valves: valve["id"] = f"valve_{valve_counter:03d}" valve["source_id"] = source_context["source_id"] valve_counter += 1 # Generate zones first (without valve IDs) source_zones = generate_valve_zones( service_poly, len(source_valves), main_direction=farm_main_direction, crop_zones=source_crop_zones, ) # Anchor valves to zones source_zones = anchor_valves_to_zones( source_zones, source_context["pump_point"], design_type=design_type, ) # Assign valve IDs and source IDs to zones for zone, valve in zip(source_zones, source_valves): zone["valve_id"] = valve["id"] zone["source_id"] = source_context["source_id"] source_pipe_network = generate_pipe_network( farm_polygon=service_poly, pump_point=source_context["pump_point"], zones=source_zones, main_direction=farm_main_direction, design_type=design_type, ) valves.extend(source_valves) zones.extend(source_zones) pipe_networks.append(source_pipe_network) trunk_main_length = sum( network.get("total_trunk_length_m", 0) for network in pipe_networks ) submain_length = sum( network.get("total_submain_length_m", 0) for network in pipe_networks ) # ── 6. Run drip layout per zone ──────────────────────────────── all_drip_designs = [] all_boms = [] zone_summaries = [] for zone in zones: zone_poly = zone["polygon"] valve_id = zone["valve_id"] # Determine crop for this zone (from valve metadata or default) valve_meta = next((v for v in valves if v["id"] == valve_id), None) crop = valve_meta.get("crop", "generic") if valve_meta else "generic" try: design = generate_drip_layout( polygon_utm=zone_poly, crop=crop, headland_buffer_m=headland_m, override_spacing_m=override_spacing if override_spacing else None, main_direction=farm_main_direction, valve_location=zone.get("valve_location"), ) bom = estimate_bom(design, unit="usd") all_drip_designs.append((valve_id, design)) all_boms.append(bom) zone_summaries.append({ "valve_id": valve_id, "crop": crop, "area_ha": design["farm_area_ha"], "emitters": design["emitter_count"], "main_m": design["total_main_length_m"], "lateral_m": design["total_drip_tape_m"], }) except DripLayoutError as e: # Zone too small after headland — skip with warning zone_summaries.append({ "valve_id": valve_id, "crop": crop, "error": str(e), }) # ── 7. Aggregate totals ───────────────────────────────────────── total_area_ha = sum(s.get("area_ha", 0) for s in zone_summaries if "area_ha" in s) total_emitters = sum(s.get("emitters", 0) for s in zone_summaries if "emitters" in s) total_main_m = sum(s.get("main_m", 0) for s in zone_summaries if "main_m" in s) total_lateral_m = sum(s.get("lateral_m", 0) for s in zone_summaries if "lateral_m" in s) # Aggregate BOM — use pricing config for valve cost pricing = get_default_pricing_config() total_bom = { "main_line_16mm_m": round(sum(b.get("main_line_16mm_m", 0) for b in all_boms), 2), "drip_tape_16mm_m": round(sum(b.get("drip_tape_16mm_m", 0) for b in all_boms), 2), "inline_emitters": sum(b.get("inline_emitters", 0) for b in all_boms), "total_pipe_m": round(sum(b.get("total_pipe_m", 0) for b in all_boms), 2), "valves_count": len(valves), } if all_boms and "cost_main" in all_boms[0]: total_bom["cost_main"] = round(sum(b.get("cost_main", 0) for b in all_boms), 2) total_bom["cost_drip_tape"] = round(sum(b.get("cost_drip_tape", 0) for b in all_boms), 2) total_bom["cost_emitters"] = round(sum(b.get("cost_emitters", 0) for b in all_boms), 2) total_bom["cost_valves"] = round(len(valves) * pricing.get_price("valve"), 2) total_bom["total_cost_usd"] = round( total_bom.get("cost_main", 0) + total_bom.get("cost_drip_tape", 0) + total_bom.get("cost_emitters", 0) + total_bom.get("cost_valves", 0), 2, ) # ── 8. Convert back to lat/lon for GeoJSON output ─────────────── # Build output features in UTM, then transform all coordinates back output_features = [] # Farm boundary (echo input) output_features.append({ "type": "Feature", "properties": {"type": "farm_boundary", "area_ha": round(total_area_ha, 2)}, "geometry": _polygon_to_geojson(farm_boundary), }) # Valves (convert UTM points back to lat/lon) for valve in valves: valve_point_utm = valve["location"] valve_point_latlon = _apply_transform(valve_point_utm, transformer_from_utm) output_features.append({ "type": "Feature", "properties": { "type": "valve", "id": valve["id"], "strategy": valve["strategy"], "reason": valve["reason"], "crop": valve.get("crop", "generic"), }, "geometry": { "type": "Point", "coordinates": [valve_point_latlon.x, valve_point_latlon.y], }, }) # Valve zones (convert UTM polygons back to lat/lon) for zone in zones: zone_poly_utm = zone["polygon"] zone_poly_latlon = _apply_polygon_transform(zone_poly_utm, transformer_from_utm) output_features.append({ "type": "Feature", "properties": { "type": "valve_zone", "valve_id": zone["valve_id"], "area_m2": round(zone["area_m2"], 2), "area_ha": round(zone["area_m2"] / 10000, 4), }, "geometry": _polygon_to_geojson(zone_poly_latlon), }) # Drip layout: main lines and laterals per zone for valve_id, design in all_drip_designs: # Main line main_utm = design["main_line"] main_latlon = _apply_linestring_transform(main_utm, transformer_from_utm) output_features.append({ "type": "Feature", "properties": { "type": "main_line", "valve_id": valve_id, "length_m": round(main_utm.length, 2), "crop": design["crop"], }, "geometry": _linestring_to_geojson(main_latlon), }) # Laterals for i, lateral_utm in enumerate(design["laterals"]): lateral_latlon = _apply_linestring_transform(lateral_utm, transformer_from_utm) output_features.append({ "type": "Feature", "properties": { "type": "lateral", "index": i, "valve_id": valve_id, "length_m": round(lateral_utm.length, 2), "spacing_m": design["design_params"]["lateral_spacing_m"], }, "geometry": _linestring_to_geojson(lateral_latlon), }) # ── 9. Build output FeatureCollection ──────────────────────────── output = { "type": "FeatureCollection", "properties": { "type": "farm_design", "farm_id": top_props.get("farm_id", "unknown"), "generated_at": _iso_timestamp(), "design_summary": { "farm_area_ha": round(total_area_ha, 2), "total_valves": len(valves), "total_drip_tape_m": round(total_lateral_m, 2), "total_main_line_m": round(total_main_m, 2), "total_emitters": total_emitters, "pump_hp": pump_hp, "pump_flow_lph": round(calculate_pump_flow_lph(pump_hp), 2), "design_type": design_type, }, "bom": total_bom, "zone_details": zone_summaries, }, "features": output_features, } return output except (gj_io.GeoJSONError, ValveEngineError, DripLayoutError) as e: # Structured error response (still valid GeoJSON) return _error_response(str(e), type(e).__name__) except Exception as e: return _error_response(str(e), "INTERNAL_ERROR") # ────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────── def _resolve_pump_hp(top_props: Dict, pump_props: Dict, features: List[Dict]) -> float: """Get pump HP from top-level, pump feature, or feature scan.""" # Top-level property takes precedence if "pump_hp" in top_props and top_props["pump_hp"] is not None: return float(top_props["pump_hp"]) # Pump feature property if "pump_hp" in pump_props and pump_props["pump_hp"] is not None: return float(pump_props["pump_hp"]) # Scan all features hp = gj_io.validate_pump_hp(features) if hp is not None: return hp raise DesignAPIError("No pump_hp found in input. Add 'pump_hp' to top-level properties or pump feature.") def _resolve_design_type(top_props: Dict, farm_area_m2: float = 0) -> str: """Get design_type flag from top-level properties. Returns: "centralized" or "distributed" Accepts both new design_type (string) and legacy centralized (bool). When no explicit flag is provided, derives the default from farm area: < 1 ha (10,000 m²) → "centralized", ≥ 1 ha → "distributed". """ # Check new design_type property first val = top_props.get("design_type") if isinstance(val, str): val_lower = val.lower().strip() if val_lower in ("centralized", "distributed"): return val_lower # Check legacy centralized boolean for backward compatibility val = top_props.get("centralized") if isinstance(val, bool): return "centralized" if val else "distributed" if isinstance(val, str): return "centralized" if val.lower() in ("true", "yes", "1", "centralized") else "distributed" # Derive from farm area return "centralized" if farm_area_m2 < 10000 else "distributed" def _resolve_centralized(top_props: Dict, farm_area_m2: float = 0) -> bool: """Get centralized flag from top-level properties. When no explicit flag is provided, derives the default from farm area. This is kept for backward compatibility; prefer _resolve_design_type(). """ design_type = _resolve_design_type(top_props, farm_area_m2) return design_type == "centralized" def _build_utm_transformers( reference_polygon: Polygon, ) -> tuple: """Compute UTM CRS from a lat/lon polygon and return reusable transformers. Returns: (utm_crs_string, transformer_to_utm, transformer_from_utm) """ centroid = reference_polygon.centroid lon, lat = centroid.x, centroid.y utm_zone = int((lon + 180) / 6) + 1 is_southern = lat < 0 utm_crs = f"EPSG:{32700 + utm_zone if is_southern else 32600 + utm_zone}" to_utm = pyproj.Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True) from_utm = pyproj.Transformer.from_crs(utm_crs, "EPSG:4326", always_xy=True) return utm_crs, to_utm, from_utm def _apply_transform(point: Point, transformer: "pyproj.Transformer") -> Point: """Transform a Point using a precomputed pyproj Transformer.""" x, y = transformer.transform(point.x, point.y) return Point(x, y) def _apply_polygon_transform( polygon: Polygon, transformer: "pyproj.Transformer" ) -> Polygon: """Transform a Polygon using a precomputed pyproj Transformer.""" coords = [transformer.transform(x, y) for x, y in polygon.exterior.coords] return Polygon(coords) def _apply_linestring_transform( line: LineString, transformer: "pyproj.Transformer" ) -> LineString: """Transform a LineString using a precomputed pyproj Transformer.""" coords = [transformer.transform(x, y) for x, y in line.coords] return LineString(coords) def _polygon_to_geojson(polygon: Polygon) -> Dict: """Convert Shapely Polygon to GeoJSON Polygon dict.""" coords = [] for x, y in polygon.exterior.coords: coords.append([x, y]) return {"type": "Polygon", "coordinates": [coords]} def _linestring_to_geojson(line: LineString) -> Dict: """Convert Shapely LineString to GeoJSON LineString dict.""" coords = [] for x, y in line.coords: coords.append([x, y]) return {"type": "LineString", "coordinates": coords} def _iso_timestamp() -> str: """Return current ISO timestamp string.""" from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat() def _error_response(message: str, code: str) -> Dict: """Return a valid GeoJSON FeatureCollection containing error info.""" return { "type": "FeatureCollection", "properties": { "type": "farm_design_error", "error": { "code": code, "message": message, }, }, "features": [], } # ────────────────────────────────────────────────────────────────────── # Convenience functions # ────────────────────────────────────────────────────────────────────── def process_from_file(file_path: str) -> str: """Read GeoJSON from file, run pipeline, return JSON string.""" with open(file_path, "r", encoding="utf-8") as f: geojson_str = f.read() result = process_farm_design(geojson_str) return json.dumps(result, indent=2) def process_from_string(geojson_str: str) -> str: """Run pipeline on GeoJSON string, return JSON string.""" result = process_farm_design(geojson_str) return json.dumps(result, indent=2)