Spaces:
Running
Running
| """ | |
| Design API β End-to-end farm irrigation design pipeline. | |
| Orchestrates: | |
| GeoJSON Input β Parse β Valve Placement β Drip Layout β GeoJSON Output | |
| Input: GeoJSON FeatureCollection (farm_boundary, pump, crop_zones, elevation) | |
| Output: GeoJSON FeatureCollection (valves, zones, mains, laterals, BOM) | |
| """ | |
| import json | |
| import math | |
| from typing import Dict, List, Any, Tuple, Optional | |
| import pyproj | |
| from shapely.geometry import Polygon, Point, LineString, MultiPolygon | |
| import geojson_io as gj_io | |
| from drip_engine import ( | |
| generate_drip_layout, | |
| estimate_bom, | |
| latlon_to_utm, | |
| compute_farm_axis, | |
| CROP_DEFAULTS, | |
| DripLayoutError, | |
| ) | |
| from pipe_network import ( | |
| generate_pipe_network, | |
| calculate_pipe_lengths, | |
| PipeNetworkError, | |
| ) | |
| from pricing_config import get_default_pricing_config | |
| from valve_engine import ( | |
| place_valves_hierarchical, | |
| generate_valve_zones, | |
| anchor_valves_to_zones, | |
| partition_farm_by_sources, | |
| calculate_pump_flow_lph, | |
| calculate_total_emitter_flow, | |
| calculate_num_zones, | |
| choose_manifold_strategy, | |
| _refine_zones_by_crop_boundaries, | |
| ValveEngineError, | |
| ) | |
| class DesignAPIError(Exception): | |
| """Top-level exception for design pipeline errors.""" | |
| pass | |
| def process_farm_design(geojson_input: str) -> Dict[str, Any]: | |
| """ | |
| Main entry point: parse GeoJSON input, run design pipeline, return GeoJSON output. | |
| Args: | |
| geojson_input: GeoJSON FeatureCollection string (or dict) | |
| Returns: | |
| Dict that is a GeoJSON FeatureCollection with: | |
| - properties: design_summary, bom | |
| - features: farm_boundary, valves, valve_zones, main_lines, laterals | |
| Raises: | |
| DesignAPIError: On any pipeline failure (with structured error info) | |
| """ | |
| try: | |
| # ββ 1. Parse input ββββββββββββββββββββββββββββββββββββββββββββββ | |
| fc = gj_io.parse_geojson_feature_collection(geojson_input) | |
| features = fc.get("features", []) | |
| top_props = fc.get("properties", {}) | |
| # ββ 2. Extract geometries βββββββββββββββββββββββββββββββββββββββ | |
| farm_boundary, _ = gj_io.extract_farm_boundary(fc) | |
| pump_point, pump_props = gj_io.extract_pump_location(fc) | |
| water_sources = gj_io.extract_all_water_sources(fc) | |
| crop_zones = gj_io.extract_crop_zones(fc) | |
| elevation_data = gj_io.extract_elevation_data(fc) | |
| # ββ 3. Resolve parameters (top-level props override feature props) | |
| pump_hp = _resolve_pump_hp(top_props, pump_props, features) | |
| headland_m = top_props.get("headland_buffer_m", 1.0) | |
| override_spacing = top_props.get("override_lateral_spacing_m") | |
| max_valves = top_props.get("max_valves") | |
| if max_valves is not None: | |
| max_valves = int(max_valves) | |
| # ββ 4. Convert to UTM for accurate calculations βββββββββββββββββ | |
| # Farm boundary lat/lon β UTM | |
| farm_utm = latlon_to_utm(farm_boundary) | |
| # Build a reusable UTM transformer (computed once, used everywhere) | |
| utm_crs, transformer_to_utm, transformer_from_utm = _build_utm_transformers(farm_boundary) | |
| pump_utm = _apply_transform(pump_point, transformer_to_utm) | |
| farm_main_direction, _farm_lateral_direction = compute_farm_axis(farm_utm) | |
| # Resolve design_type flag β explicit override or derive from farm area | |
| design_type = _resolve_design_type(top_props, farm_utm.area) | |
| # Convert design_type string to boolean centralized flag for engine compatibility | |
| centralized = (design_type == "centralized") | |
| # Convert crop zone polygons to UTM | |
| crop_zones_utm = [] | |
| for zone in crop_zones: | |
| zone_poly = zone.get("polygon") | |
| if zone_poly is None: | |
| continue | |
| zone_utm = _apply_polygon_transform(zone_poly, transformer_to_utm) | |
| crop_zones_utm.append({ | |
| "crop": zone.get("crop", "generic"), | |
| "polygon": zone_utm, | |
| "area_m2": zone_utm.area, | |
| }) | |
| # If no explicit crop zones, treat entire farm as single generic zone. | |
| # If crop zones exist but don't cover the full farm, add a generic | |
| # zone for the uncovered area so strip generation can tile the | |
| # entire farm and valve counts stay consistent. | |
| if not crop_zones_utm: | |
| crop_zones_utm = [{ | |
| "crop": "generic", | |
| "polygon": farm_utm, | |
| "area_m2": farm_utm.area, | |
| }] | |
| else: | |
| from shapely.ops import unary_union | |
| crop_union = unary_union([z["polygon"] for z in crop_zones_utm]) | |
| uncovered = farm_utm.difference(crop_union) | |
| if not uncovered.is_empty and uncovered.area > farm_utm.area * 0.05: | |
| if isinstance(uncovered, MultiPolygon): | |
| for part in uncovered.geoms: | |
| if part.area > farm_utm.area * 0.02: | |
| crop_zones_utm.append({ | |
| "crop": "generic", | |
| "polygon": part, | |
| "area_m2": part.area, | |
| }) | |
| elif isinstance(uncovered, Polygon): | |
| crop_zones_utm.append({ | |
| "crop": "generic", | |
| "polygon": uncovered, | |
| "area_m2": uncovered.area, | |
| }) | |
| # ββ 5. Run valve placement engine with multi-source orchestration βββ | |
| source_contexts = [] | |
| for source in water_sources: | |
| source_point_utm = _apply_transform(source["location"], transformer_to_utm) | |
| source_contexts.append( | |
| { | |
| "source_id": f"source_{source['index']:03d}", | |
| "source_index": source["index"], | |
| "pump_point": source_point_utm, | |
| "pump_hp": float(source.get("pump_hp") or pump_hp), | |
| "properties": source.get("properties", {}), | |
| "crop_zones": [], | |
| } | |
| ) | |
| # Partition the farm into non-overlapping service regions per source. | |
| # Each source's valve count and zones are scoped to its region. | |
| service_regions = partition_farm_by_sources(farm_utm, source_contexts) | |
| for ctx, region in zip(source_contexts, service_regions): | |
| ctx["service_polygon"] = region | |
| # Clip crop zones to each source's service region | |
| for crop_zone in crop_zones_utm: | |
| crop_poly = crop_zone["polygon"] | |
| best_overlap = 0 | |
| best_ctx = source_contexts[0] | |
| for ctx in source_contexts: | |
| overlap = crop_poly.intersection(ctx["service_polygon"]).area | |
| if overlap > best_overlap: | |
| best_overlap = overlap | |
| best_ctx = ctx | |
| # Clip to service region boundary | |
| clipped = crop_poly.intersection(best_ctx["service_polygon"]) | |
| if clipped.is_empty: | |
| clipped = crop_poly | |
| if isinstance(clipped, MultiPolygon): | |
| clipped = max(clipped.geoms, key=lambda g: g.area) | |
| if isinstance(clipped, Polygon) and clipped.area > 1: | |
| best_ctx["crop_zones"].append({ | |
| "crop": crop_zone.get("crop", "generic"), | |
| "polygon": clipped, | |
| "area_m2": clipped.area, | |
| }) | |
| valves = [] | |
| zones = [] | |
| pipe_networks = [] | |
| valve_counter = 0 | |
| for source_context in source_contexts: | |
| service_poly = source_context["service_polygon"] | |
| source_crop_zones = source_context["crop_zones"] | |
| # If no crop zones landed in this region, cover it with generic | |
| if not source_crop_zones: | |
| source_crop_zones = [{ | |
| "crop": "generic", | |
| "polygon": service_poly, | |
| "area_m2": service_poly.area, | |
| }] | |
| source_valves = place_valves_hierarchical( | |
| farm_polygon=service_poly, | |
| pump_point=source_context["pump_point"], | |
| crop_zones=source_crop_zones, | |
| pump_hp=source_context["pump_hp"], | |
| centralized=centralized, | |
| elevation_data=elevation_data, | |
| max_valves=max_valves, | |
| ) | |
| # Renumber valve IDs globally to avoid duplicates across sources | |
| for valve in source_valves: | |
| valve["id"] = f"valve_{valve_counter:03d}" | |
| valve["source_id"] = source_context["source_id"] | |
| valve_counter += 1 | |
| # Generate zones first (without valve IDs) | |
| source_zones = generate_valve_zones( | |
| service_poly, | |
| len(source_valves), | |
| main_direction=farm_main_direction, | |
| crop_zones=source_crop_zones, | |
| ) | |
| # Anchor valves to zones | |
| source_zones = anchor_valves_to_zones( | |
| source_zones, | |
| source_context["pump_point"], | |
| design_type=design_type, | |
| ) | |
| # Assign valve IDs and source IDs to zones | |
| for zone, valve in zip(source_zones, source_valves): | |
| zone["valve_id"] = valve["id"] | |
| zone["source_id"] = source_context["source_id"] | |
| source_pipe_network = generate_pipe_network( | |
| farm_polygon=service_poly, | |
| pump_point=source_context["pump_point"], | |
| zones=source_zones, | |
| main_direction=farm_main_direction, | |
| design_type=design_type, | |
| ) | |
| valves.extend(source_valves) | |
| zones.extend(source_zones) | |
| pipe_networks.append(source_pipe_network) | |
| trunk_main_length = sum( | |
| network.get("total_trunk_length_m", 0) for network in pipe_networks | |
| ) | |
| submain_length = sum( | |
| network.get("total_submain_length_m", 0) for network in pipe_networks | |
| ) | |
| # ββ 6. Run drip layout per zone ββββββββββββββββββββββββββββββββ | |
| all_drip_designs = [] | |
| all_boms = [] | |
| zone_summaries = [] | |
| for zone in zones: | |
| zone_poly = zone["polygon"] | |
| valve_id = zone["valve_id"] | |
| # Determine crop for this zone (from valve metadata or default) | |
| valve_meta = next((v for v in valves if v["id"] == valve_id), None) | |
| crop = valve_meta.get("crop", "generic") if valve_meta else "generic" | |
| try: | |
| design = generate_drip_layout( | |
| polygon_utm=zone_poly, | |
| crop=crop, | |
| headland_buffer_m=headland_m, | |
| override_spacing_m=override_spacing if override_spacing else None, | |
| main_direction=farm_main_direction, | |
| valve_location=zone.get("valve_location"), | |
| ) | |
| bom = estimate_bom(design, unit="usd") | |
| all_drip_designs.append((valve_id, design)) | |
| all_boms.append(bom) | |
| zone_summaries.append({ | |
| "valve_id": valve_id, | |
| "crop": crop, | |
| "area_ha": design["farm_area_ha"], | |
| "emitters": design["emitter_count"], | |
| "main_m": design["total_main_length_m"], | |
| "lateral_m": design["total_drip_tape_m"], | |
| }) | |
| except DripLayoutError as e: | |
| # Zone too small after headland β skip with warning | |
| zone_summaries.append({ | |
| "valve_id": valve_id, | |
| "crop": crop, | |
| "error": str(e), | |
| }) | |
| # ββ 7. Aggregate totals βββββββββββββββββββββββββββββββββββββββββ | |
| total_area_ha = sum(s.get("area_ha", 0) for s in zone_summaries if "area_ha" in s) | |
| total_emitters = sum(s.get("emitters", 0) for s in zone_summaries if "emitters" in s) | |
| total_main_m = sum(s.get("main_m", 0) for s in zone_summaries if "main_m" in s) | |
| total_lateral_m = sum(s.get("lateral_m", 0) for s in zone_summaries if "lateral_m" in s) | |
| # Aggregate BOM β use pricing config for valve cost | |
| pricing = get_default_pricing_config() | |
| total_bom = { | |
| "main_line_16mm_m": round(sum(b.get("main_line_16mm_m", 0) for b in all_boms), 2), | |
| "drip_tape_16mm_m": round(sum(b.get("drip_tape_16mm_m", 0) for b in all_boms), 2), | |
| "inline_emitters": sum(b.get("inline_emitters", 0) for b in all_boms), | |
| "total_pipe_m": round(sum(b.get("total_pipe_m", 0) for b in all_boms), 2), | |
| "valves_count": len(valves), | |
| } | |
| if all_boms and "cost_main" in all_boms[0]: | |
| total_bom["cost_main"] = round(sum(b.get("cost_main", 0) for b in all_boms), 2) | |
| total_bom["cost_drip_tape"] = round(sum(b.get("cost_drip_tape", 0) for b in all_boms), 2) | |
| total_bom["cost_emitters"] = round(sum(b.get("cost_emitters", 0) for b in all_boms), 2) | |
| total_bom["cost_valves"] = round(len(valves) * pricing.get_price("valve"), 2) | |
| total_bom["total_cost_usd"] = round( | |
| total_bom.get("cost_main", 0) | |
| + total_bom.get("cost_drip_tape", 0) | |
| + total_bom.get("cost_emitters", 0) | |
| + total_bom.get("cost_valves", 0), | |
| 2, | |
| ) | |
| # ββ 8. Convert back to lat/lon for GeoJSON output βββββββββββββββ | |
| # Build output features in UTM, then transform all coordinates back | |
| output_features = [] | |
| # Farm boundary (echo input) | |
| output_features.append({ | |
| "type": "Feature", | |
| "properties": {"type": "farm_boundary", "area_ha": round(total_area_ha, 2)}, | |
| "geometry": _polygon_to_geojson(farm_boundary), | |
| }) | |
| # Valves (convert UTM points back to lat/lon) | |
| for valve in valves: | |
| valve_point_utm = valve["location"] | |
| valve_point_latlon = _apply_transform(valve_point_utm, transformer_from_utm) | |
| output_features.append({ | |
| "type": "Feature", | |
| "properties": { | |
| "type": "valve", | |
| "id": valve["id"], | |
| "strategy": valve["strategy"], | |
| "reason": valve["reason"], | |
| "crop": valve.get("crop", "generic"), | |
| }, | |
| "geometry": { | |
| "type": "Point", | |
| "coordinates": [valve_point_latlon.x, valve_point_latlon.y], | |
| }, | |
| }) | |
| # Valve zones (convert UTM polygons back to lat/lon) | |
| for zone in zones: | |
| zone_poly_utm = zone["polygon"] | |
| zone_poly_latlon = _apply_polygon_transform(zone_poly_utm, transformer_from_utm) | |
| output_features.append({ | |
| "type": "Feature", | |
| "properties": { | |
| "type": "valve_zone", | |
| "valve_id": zone["valve_id"], | |
| "area_m2": round(zone["area_m2"], 2), | |
| "area_ha": round(zone["area_m2"] / 10000, 4), | |
| }, | |
| "geometry": _polygon_to_geojson(zone_poly_latlon), | |
| }) | |
| # Drip layout: main lines and laterals per zone | |
| for valve_id, design in all_drip_designs: | |
| # Main line | |
| main_utm = design["main_line"] | |
| main_latlon = _apply_linestring_transform(main_utm, transformer_from_utm) | |
| output_features.append({ | |
| "type": "Feature", | |
| "properties": { | |
| "type": "main_line", | |
| "valve_id": valve_id, | |
| "length_m": round(main_utm.length, 2), | |
| "crop": design["crop"], | |
| }, | |
| "geometry": _linestring_to_geojson(main_latlon), | |
| }) | |
| # Laterals | |
| for i, lateral_utm in enumerate(design["laterals"]): | |
| lateral_latlon = _apply_linestring_transform(lateral_utm, transformer_from_utm) | |
| output_features.append({ | |
| "type": "Feature", | |
| "properties": { | |
| "type": "lateral", | |
| "index": i, | |
| "valve_id": valve_id, | |
| "length_m": round(lateral_utm.length, 2), | |
| "spacing_m": design["design_params"]["lateral_spacing_m"], | |
| }, | |
| "geometry": _linestring_to_geojson(lateral_latlon), | |
| }) | |
| # ββ 9. Build output FeatureCollection ββββββββββββββββββββββββββββ | |
| output = { | |
| "type": "FeatureCollection", | |
| "properties": { | |
| "type": "farm_design", | |
| "farm_id": top_props.get("farm_id", "unknown"), | |
| "generated_at": _iso_timestamp(), | |
| "design_summary": { | |
| "farm_area_ha": round(total_area_ha, 2), | |
| "total_valves": len(valves), | |
| "total_drip_tape_m": round(total_lateral_m, 2), | |
| "total_main_line_m": round(total_main_m, 2), | |
| "total_emitters": total_emitters, | |
| "pump_hp": pump_hp, | |
| "pump_flow_lph": round(calculate_pump_flow_lph(pump_hp), 2), | |
| "design_type": design_type, | |
| }, | |
| "bom": total_bom, | |
| "zone_details": zone_summaries, | |
| }, | |
| "features": output_features, | |
| } | |
| return output | |
| except (gj_io.GeoJSONError, ValveEngineError, DripLayoutError) as e: | |
| # Structured error response (still valid GeoJSON) | |
| return _error_response(str(e), type(e).__name__) | |
| except Exception as e: | |
| return _error_response(str(e), "INTERNAL_ERROR") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helpers | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _resolve_pump_hp(top_props: Dict, pump_props: Dict, features: List[Dict]) -> float: | |
| """Get pump HP from top-level, pump feature, or feature scan.""" | |
| # Top-level property takes precedence | |
| if "pump_hp" in top_props and top_props["pump_hp"] is not None: | |
| return float(top_props["pump_hp"]) | |
| # Pump feature property | |
| if "pump_hp" in pump_props and pump_props["pump_hp"] is not None: | |
| return float(pump_props["pump_hp"]) | |
| # Scan all features | |
| hp = gj_io.validate_pump_hp(features) | |
| if hp is not None: | |
| return hp | |
| raise DesignAPIError("No pump_hp found in input. Add 'pump_hp' to top-level properties or pump feature.") | |
| def _resolve_design_type(top_props: Dict, farm_area_m2: float = 0) -> str: | |
| """Get design_type flag from top-level properties. | |
| Returns: | |
| "centralized" or "distributed" | |
| Accepts both new design_type (string) and legacy centralized (bool). | |
| When no explicit flag is provided, derives the default from farm area: | |
| < 1 ha (10,000 mΒ²) β "centralized", β₯ 1 ha β "distributed". | |
| """ | |
| # Check new design_type property first | |
| val = top_props.get("design_type") | |
| if isinstance(val, str): | |
| val_lower = val.lower().strip() | |
| if val_lower in ("centralized", "distributed"): | |
| return val_lower | |
| # Check legacy centralized boolean for backward compatibility | |
| val = top_props.get("centralized") | |
| if isinstance(val, bool): | |
| return "centralized" if val else "distributed" | |
| if isinstance(val, str): | |
| return "centralized" if val.lower() in ("true", "yes", "1", "centralized") else "distributed" | |
| # Derive from farm area | |
| return "centralized" if farm_area_m2 < 10000 else "distributed" | |
| def _resolve_centralized(top_props: Dict, farm_area_m2: float = 0) -> bool: | |
| """Get centralized flag from top-level properties. | |
| When no explicit flag is provided, derives the default from farm area. | |
| This is kept for backward compatibility; prefer _resolve_design_type(). | |
| """ | |
| design_type = _resolve_design_type(top_props, farm_area_m2) | |
| return design_type == "centralized" | |
| def _build_utm_transformers( | |
| reference_polygon: Polygon, | |
| ) -> tuple: | |
| """Compute UTM CRS from a lat/lon polygon and return reusable transformers. | |
| Returns: | |
| (utm_crs_string, transformer_to_utm, transformer_from_utm) | |
| """ | |
| centroid = reference_polygon.centroid | |
| lon, lat = centroid.x, centroid.y | |
| utm_zone = int((lon + 180) / 6) + 1 | |
| is_southern = lat < 0 | |
| utm_crs = f"EPSG:{32700 + utm_zone if is_southern else 32600 + utm_zone}" | |
| to_utm = pyproj.Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True) | |
| from_utm = pyproj.Transformer.from_crs(utm_crs, "EPSG:4326", always_xy=True) | |
| return utm_crs, to_utm, from_utm | |
| def _apply_transform(point: Point, transformer: "pyproj.Transformer") -> Point: | |
| """Transform a Point using a precomputed pyproj Transformer.""" | |
| x, y = transformer.transform(point.x, point.y) | |
| return Point(x, y) | |
| def _apply_polygon_transform( | |
| polygon: Polygon, transformer: "pyproj.Transformer" | |
| ) -> Polygon: | |
| """Transform a Polygon using a precomputed pyproj Transformer.""" | |
| coords = [transformer.transform(x, y) for x, y in polygon.exterior.coords] | |
| return Polygon(coords) | |
| def _apply_linestring_transform( | |
| line: LineString, transformer: "pyproj.Transformer" | |
| ) -> LineString: | |
| """Transform a LineString using a precomputed pyproj Transformer.""" | |
| coords = [transformer.transform(x, y) for x, y in line.coords] | |
| return LineString(coords) | |
| def _polygon_to_geojson(polygon: Polygon) -> Dict: | |
| """Convert Shapely Polygon to GeoJSON Polygon dict.""" | |
| coords = [] | |
| for x, y in polygon.exterior.coords: | |
| coords.append([x, y]) | |
| return {"type": "Polygon", "coordinates": [coords]} | |
| def _linestring_to_geojson(line: LineString) -> Dict: | |
| """Convert Shapely LineString to GeoJSON LineString dict.""" | |
| coords = [] | |
| for x, y in line.coords: | |
| coords.append([x, y]) | |
| return {"type": "LineString", "coordinates": coords} | |
| def _iso_timestamp() -> str: | |
| """Return current ISO timestamp string.""" | |
| from datetime import datetime, timezone | |
| return datetime.now(timezone.utc).isoformat() | |
| def _error_response(message: str, code: str) -> Dict: | |
| """Return a valid GeoJSON FeatureCollection containing error info.""" | |
| return { | |
| "type": "FeatureCollection", | |
| "properties": { | |
| "type": "farm_design_error", | |
| "error": { | |
| "code": code, | |
| "message": message, | |
| }, | |
| }, | |
| "features": [], | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Convenience functions | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process_from_file(file_path: str) -> str: | |
| """Read GeoJSON from file, run pipeline, return JSON string.""" | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| geojson_str = f.read() | |
| result = process_farm_design(geojson_str) | |
| return json.dumps(result, indent=2) | |
| def process_from_string(geojson_str: str) -> str: | |
| """Run pipeline on GeoJSON string, return JSON string.""" | |
| result = process_farm_design(geojson_str) | |
| return json.dumps(result, indent=2) | |