Spaces:
Sleeping
Sleeping
Fix lateral parallelism and uniformity: use farm-level direction for consistent lateral angles
093e740 | """ | |
| Drip Irrigation Layout Engine | |
| Pure geometry module for: | |
| - Parsing farm boundaries (geofences) | |
| - Converting coordinates to real-world units (UTM) | |
| - Generating drip irrigation layouts (mains + parallel laterals) | |
| - Computing material BOMs | |
| """ | |
| from typing import List, Tuple, Dict, Optional | |
| import math | |
| import numpy as np | |
| from shapely.geometry import Polygon, LineString, Point, MultiLineString | |
| import pyproj | |
| from unit_converter import length_to_meters, m2_to_area, area_unit_label, UnitError | |
| from pricing_config import PricingConfig, get_default_pricing_config | |
# Per-crop drip design defaults. Add a new key here to support another crop.
# Each entry carries the lateral spacing (m), the emitter spacing (m) and the
# emitter discharge (L/h), as encoded in the key names.
CROP_DEFAULTS = {
    "tomato": {
        "lateral_spacing_m": 0.5,
        "emitter_spacing_m": 0.3,
        "emitter_discharge_lph": 4,
    },
    "pepper": {
        "lateral_spacing_m": 0.6,
        "emitter_spacing_m": 0.3,
        "emitter_discharge_lph": 4,
    },
    "lettuce": {
        "lateral_spacing_m": 0.4,
        "emitter_spacing_m": 0.2,
        "emitter_discharge_lph": 2,
    },
    "cucumber": {
        "lateral_spacing_m": 1.0,
        "emitter_spacing_m": 0.5,
        "emitter_discharge_lph": 4,
    },
    "orchard": {
        "lateral_spacing_m": 2.0,
        "emitter_spacing_m": 1.0,
        "emitter_discharge_lph": 8,
    },
    # Fallback entry used when a requested crop is not listed.
    "generic": {
        "lateral_spacing_m": 0.8,
        "emitter_spacing_m": 0.4,
        "emitter_discharge_lph": 4,
    },
}
# Module-wide fallback pricing, built once at import time and used by
# estimate_bom() when the caller passes no pricing. Per the original note it
# is INR-denominated and can be overridden via rest_api or a config file.
_DEFAULT_PRICING = get_default_pricing_config()
class DripLayoutError(Exception):
    """Raised when a drip layout cannot be parsed, validated or generated."""
def parse_geofence_to_polygon(
    geofence_input: str,
    input_type: str = "pixel",
    coord_unit: str = "m",
) -> Polygon:
    """
    Parse geofence text input into a Shapely Polygon (internal coords in meters).

    Args:
        geofence_input: String like "100,100;200,100;200,200;100,200".
            Empty segments (e.g. a trailing ";") are ignored.
        input_type: "pixel" (image coords) or "latlon" (geographic).
            NOTE(review): this function never reads the value; it is kept
            for API compatibility. Any pixel/lat-lon projection has to be
            done by the caller.
        coord_unit: Length unit of the input coordinates (default "m").
            Passed to unit_converter.length_to_meters(), so every
            coordinate is converted to meters before the Polygon is built.

    Returns:
        Shapely Polygon with coordinates in meters.

    Raises:
        DripLayoutError: If the text cannot be parsed, a coordinate is not a
            finite number, the unit is rejected by the converter, fewer
            than 3 points are given, or the polygon is invalid / has zero
            area.
    """
    # Function-scope import so the module's top-level import block is untouched.
    from shapely.validation import explain_validity

    try:
        if not isinstance(geofence_input, str):
            # Routed through the generic parse error below instead of
            # surfacing as an AttributeError on .split().
            raise ValueError("geofence_input must be a string")

        points = []
        for pair in geofence_input.split(";"):
            pair = pair.strip()
            if not pair:
                # Tolerate "a,b;c,d;" and "a,b;;c,d" rather than failing
                # the whole parse on an empty segment.
                continue
            x_text, y_text = pair.split(",")
            x_val, y_val = float(x_text), float(y_text)
            if not (math.isfinite(x_val) and math.isfinite(y_val)):
                # float() accepts "nan"/"inf"; such vertices would yield a
                # polygon whose area and validity are meaningless.
                raise ValueError(f"Non-finite coordinate in {pair!r}")
            try:
                x_m = length_to_meters(x_val, coord_unit)
                y_m = length_to_meters(y_val, coord_unit)
            except UnitError as e:
                raise DripLayoutError(str(e)) from e
            points.append((x_m, y_m))

        if len(points) < 3:
            raise DripLayoutError(f"Polygon must have ≥3 points, got {len(points)}")

        polygon = Polygon(points)

        if not polygon.is_valid:
            # Report the actual reason (e.g. a self-intersection and where)
            # instead of the geometry type, which is always "Polygon" here.
            raise DripLayoutError(f"Invalid polygon: {explain_validity(polygon)}")
        if polygon.area == 0:
            raise DripLayoutError("Polygon has zero area")
        return polygon
    except ValueError as e:
        # Covers malformed pairs, non-numeric text and non-finite values.
        raise DripLayoutError(
            "Parse error. Use format: x,y;x,y;x,y (e.g., 50,50;150,50;150,150)"
        ) from e
def validate_polygon(polygon: Polygon) -> Tuple[bool, str]:
    """
    Validate a polygon for drip design.

    The checks run in this order and the first failure wins:
    geometric validity, minimum area, minimum vertex count.

    Args:
        polygon: Shapely Polygon

    Returns:
        (is_valid, error_message); the message is "Valid" on success.
    """
    if not polygon.is_valid:
        return False, "Self-intersecting or degenerate polygon"

    # Arbitrary floor of 1 square unit of whatever the polygon's coordinates
    # are expressed in (presumably m² for UTM input; confirm with callers).
    if polygon.area < 1:
        return False, f"Polygon area too small: {polygon.area:.2f}"

    # The exterior ring repeats its first point, so 3 vertices => 4 coords.
    if len(polygon.exterior.coords) < 4:
        return False, "Polygon must have ≥3 vertices"

    return True, "Valid"
def pixel_to_utm(
    polygon_px: Polygon,
    image_width_px: float,
    image_height_px: float,
    center_lat: float,
    center_lon: float,
    zoom_level: int,
) -> Polygon:
    """
    Convert pixel coordinates to a metric (meter-scaled) polygon.

    Assumes the image is a Web Mercator map tile centered at
    (center_lat, center_lon) and rendered with 256 px base tiles.

    NOTE(review): despite the name, the anchor point is projected with
    EPSG:3857 (Web Mercator), not a UTM zone. Pixel offsets are scaled by
    ground meters per pixel, so shapes, lengths and areas are in real
    meters, but the absolute coordinates are not UTM eastings/northings.
    The numeric behaviour is left unchanged because other code may invert
    this mapping using the same anchor.

    Args:
        polygon_px: Shapely Polygon in pixel coords (origin top-left, Y down)
        image_width_px: Image width in pixels
        image_height_px: Image height in pixels
        center_lat: Center latitude of map (degrees)
        center_lon: Center longitude of map (degrees)
        zoom_level: Web Mercator zoom level (0-20)

    Returns:
        Shapely Polygon in meters; interior rings (holes) are preserved.

    Raises:
        DripLayoutError: If center_lat is not strictly between -90 and 90.
    """
    if not -90.0 < center_lat < 90.0:
        # At the poles cos(lat) collapses the scale to ~0 and the Web
        # Mercator projection of the anchor is unbounded.
        raise DripLayoutError(
            f"center_lat must be between -90 and 90 degrees, got {center_lat}"
        )

    # Ground resolution of a Web Mercator tile pyramid at this latitude/zoom.
    earth_radius_m = 6378137.0
    meters_per_pixel = (
        2 * math.pi * earth_radius_m * math.cos(math.radians(center_lat))
    ) / (256 * (2 ** zoom_level))

    # Anchor: image center expressed in EPSG:3857 meters (see NOTE above).
    transformer = pyproj.Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)
    center_x_m, center_y_m = transformer.transform(center_lon, center_lat)

    px_center_x = image_width_px / 2
    px_center_y = image_height_px / 2

    def ring_to_meters(ring):
        """Map one pixel ring to meter coordinates, dropping its closing point."""
        converted = []
        for px_x, px_y, *_ in list(ring.coords)[:-1]:
            dx_px = px_x - px_center_x
            dy_px = -(px_y - px_center_y)  # Image Y grows downwards
            converted.append(
                (
                    center_x_m + dx_px * meters_per_pixel,
                    center_y_m + dy_px * meters_per_pixel,
                )
            )
        return converted

    shell = ring_to_meters(polygon_px.exterior)
    # Previously holes were silently discarded, overstating the area.
    holes = [ring_to_meters(ring) for ring in polygon_px.interiors]
    return Polygon(shell, holes)
def latlon_to_utm(polygon_latlon: Polygon, crs: str = "EPSG:4326") -> Polygon:
    """
    Convert a polygon to its local UTM zone.

    The zone is chosen from the polygon centroid; the hemisphere from the
    centroid latitude (EPSG:326xx north, EPSG:327xx south).

    Args:
        polygon_latlon: Shapely Polygon with x=longitude, y=latitude when
            `crs` is the default, otherwise in the coordinates of `crs`.
        crs: Input CRS (default: WGS84 lat/lon)

    Returns:
        Shapely Polygon in UTM (meters), auto-selected zone. Interior rings
        (holes) are preserved.

    Raises:
        DripLayoutError: If the polygon is empty.
    """
    if polygon_latlon.is_empty:
        # An empty geometry has no centroid coordinates to pick a zone from.
        raise DripLayoutError("Cannot project an empty polygon")

    centroid = polygon_latlon.centroid
    lon, lat = centroid.x, centroid.y
    if crs != "EPSG:4326":
        # The zone formula needs geographic degrees; a projected input CRS
        # would otherwise feed meters into it.
        to_wgs84 = pyproj.Transformer.from_crs(crs, "EPSG:4326", always_xy=True)
        lon, lat = to_wgs84.transform(lon, lat)

    # Zones are 6 degrees wide and numbered 1..60. Clamp so that lon == 180
    # maps to zone 60 instead of the non-existent zone 61.
    utm_zone = min(max(int((lon + 180) / 6) + 1, 1), 60)
    is_southern = lat < 0
    utm_crs = f"EPSG:{32700 + utm_zone if is_southern else 32600 + utm_zone}"

    transformer = pyproj.Transformer.from_crs(crs, utm_crs, always_xy=True)

    def project_ring(ring):
        """Project every vertex of a ring, ignoring any Z ordinate."""
        return [transformer.transform(x, y) for x, y, *_ in ring.coords]

    shell = project_ring(polygon_latlon.exterior)
    holes = [project_ring(ring) for ring in polygon_latlon.interiors]
    return Polygon(shell, holes)
def polygon_area_hectares(polygon_utm: Polygon) -> float:
    """
    Return the area of a metric polygon expressed in hectares.

    Args:
        polygon_utm: Shapely Polygon in UTM coordinates (meters)

    Returns:
        Area in hectares
    """
    square_meters_per_hectare = 10000  # 1 hectare = 10,000 m²
    area_m2 = polygon_utm.area
    return area_m2 / square_meters_per_hectare
def compute_farm_axis(
    farm_polygon: Polygon,
) -> Tuple[Tuple[float, float], Tuple[float, float]]:
    """
    Compute global farm axis from the farm polygon's minimum rotated rectangle.

    Returns normalized direction vectors for the main axis and the
    perpendicular lateral axis, so that every zone of a farm can share one
    orientation.

    Args:
        farm_polygon: Shapely Polygon in UTM (meters)

    Returns:
        (main_direction, lateral_direction) where each is a tuple (dx, dy).
        - main_direction: unit vector along the longest side of the
          minimum rotated rectangle
        - lateral_direction: main_direction rotated 90° counterclockwise
        Falls back to ((1.0, 0.0), (0.0, 1.0)) for degenerate input.
    """
    default_axes = ((1.0, 0.0), (0.0, 1.0))

    min_rect = farm_polygon.minimum_rotated_rectangle
    # For degenerate input the "rectangle" can collapse to a LineString or
    # Point, which has no exterior ring; fall back instead of raising.
    if not isinstance(min_rect, Polygon) or min_rect.is_empty:
        return default_axes

    corners = list(min_rect.exterior.coords)[:-1]  # Skip closing point
    if len(corners) != 4:
        return default_axes

    # (length, index) for each of the four sides; side i runs from corner i
    # to corner i + 1 (wrapping around).
    sides = []
    for i, p1 in enumerate(corners):
        p2 = corners[(i + 1) % 4]
        length = math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)
        sides.append((length, i))

    # Longest side wins; ties resolve to the highest index, which matches a
    # descending sort of the (length, index) tuples.
    main_len, main_side_idx = max(sides)
    if main_len == 0:
        return default_axes

    p1 = corners[main_side_idx]
    p2 = corners[(main_side_idx + 1) % 4]
    main_direction = ((p2[0] - p1[0]) / main_len, (p2[1] - p1[1]) / main_len)

    # Rotate 90 degrees counterclockwise: (x, y) -> (-y, x)
    lateral_direction = (-main_direction[1], main_direction[0])
    return main_direction, lateral_direction
def generate_drip_layout(
    polygon_utm: Polygon,
    crop: str = "generic",
    headland_buffer_m: float = 1.0,
    main_line_edge: str = "longest",
    override_spacing_m: Optional[float] = None,
    override_discharge_lph: Optional[float] = None,
    main_direction: Optional[Tuple[float, float]] = None,
) -> Dict:
    """
    Generate drip irrigation layout with mains + parallel laterals.

    Args:
        polygon_utm: Shapely Polygon in UTM (meters)
        crop: Crop type (key in CROP_DEFAULTS). Matched case-insensitively;
            unknown crops fall back to "generic".
        headland_buffer_m: Inward buffer for headland
        main_line_edge: "longest" selects the longest boundary edge; any
            other value selects the shortest. Ignored when main_direction
            is given.
        override_spacing_m: Override lateral spacing (meters). A falsy value
            (None or 0) means "no override".
        override_discharge_lph: Override emitter discharge (L/h). A falsy
            value means "no override".
        main_direction: Optional normalized direction vector (dx, dy) for the
            main line. If provided, overrides per-zone edge selection: the
            main line joins the two boundary vertices that are extreme along
            this direction, and laterals are perpendicular to the direction.

    Returns:
        Dict with:
        - farm_area_ha: Area in hectares of the field AFTER the headland
          buffer has been removed
        - main_line: LineString for main line
        - laterals: List of LineString objects (clipped to the field)
        - total_lateral_length_m: Sum of all lateral lengths
        - total_main_length_m: Main line length
        - total_drip_tape_m: Lateral length (drip tape)
        - emitter_count: Estimated number of emitters
        - design_params: Dict with spacing, discharge, etc.
        - crop: Crop key actually used
        - headland_m: Headland buffer that was applied

    Raises:
        DripLayoutError: If the lateral spacing is not positive, the headland
            buffer consumes or splits the field, or no main line can be
            derived from the buffered boundary.
    """
    # Resolve crop defaults; "Tomato" / " tomato " should not silently
    # degrade to the generic profile.
    crop = crop.strip().lower() if isinstance(crop, str) else crop
    if crop not in CROP_DEFAULTS:
        crop = "generic"
    params = CROP_DEFAULTS[crop].copy()

    if override_spacing_m:
        params["lateral_spacing_m"] = override_spacing_m
    if override_discharge_lph:
        params["emitter_discharge_lph"] = override_discharge_lph

    spacing = params["lateral_spacing_m"]
    if not spacing > 0:
        # Also rejects NaN. A negative spacing would otherwise produce an
        # empty layout without any error.
        raise DripLayoutError(f"Lateral spacing must be positive, got {spacing}")

    # Apply headland buffer. The segment count is passed positionally: the
    # keyword is `resolution` in Shapely 1.x but `quad_segs` in 2.x.
    buffered_polygon = polygon_utm.buffer(-headland_buffer_m, 8)
    if not isinstance(buffered_polygon, Polygon) or buffered_polygon.is_empty:
        # An over-aggressive buffer yields an empty geometry (still a Polygon
        # instance in Shapely 2) or splits the field into a MultiPolygon.
        raise DripLayoutError(
            f"Headland buffer {headland_buffer_m}m too large for this field"
        )

    main_line = _pick_main_line(
        buffered_polygon.exterior, main_direction, main_line_edge
    )

    # Generate candidate laterals, then keep only the parts inside the field.
    laterals = _generate_parallel_laterals(
        main_line, buffered_polygon, spacing, main_direction
    )
    min_lateral_length_m = 1.0  # Drop slivers < 1.0 m to improve uniformity
    clipped_laterals = _clip_laterals_to_field(
        laterals, buffered_polygon, min_lateral_length_m
    )

    total_lateral_m = sum(lat.length for lat in clipped_laterals)
    main_length_m = main_line.length

    # One emitter per emitter spacing along the tape, rounded down.
    emitter_count = int(total_lateral_m / params["emitter_spacing_m"])

    return {
        "farm_area_ha": polygon_area_hectares(buffered_polygon),
        "main_line": main_line,
        "laterals": clipped_laterals,
        "total_lateral_length_m": total_lateral_m,
        "total_main_length_m": main_length_m,
        "total_drip_tape_m": total_lateral_m,
        "emitter_count": emitter_count,
        "design_params": params,
        "crop": crop,
        "headland_m": headland_buffer_m,
    }


def _pick_main_line(boundary, main_direction, main_line_edge) -> LineString:
    """Choose the main line from the buffered boundary ring."""
    coords = list(boundary.coords)

    if main_direction is not None:
        if len(coords) < 2:
            raise DripLayoutError("Boundary has insufficient points")
        # Scalar projection of every vertex onto the farm axis; the extreme
        # vertices become the ends of the main line. Ties resolve to the
        # first vertex in ring order.
        projections = [
            c[0] * main_direction[0] + c[1] * main_direction[1] for c in coords
        ]
        main_start = coords[projections.index(min(projections))]
        main_end = coords[projections.index(max(projections))]
        return LineString([main_start, main_end])

    edges = list(zip(coords, coords[1:]))
    if not edges:
        raise DripLayoutError("Field has no valid edges after headland buffer")
    edge_lengths = [
        math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2) for p1, p2 in edges
    ]
    pick = max if main_line_edge == "longest" else min
    main_start, main_end = edges[edge_lengths.index(pick(edge_lengths))]
    return LineString([main_start, main_end])


def _clip_laterals_to_field(laterals, field, min_length_m) -> List[LineString]:
    """Clip laterals to the field and drop pieces shorter than min_length_m."""
    kept = []
    for lateral in laterals:
        clipped = lateral.intersection(field)
        if clipped.is_empty:
            continue
        if isinstance(clipped, LineString):
            pieces = [clipped]
        else:
            # MultiLineString / GeometryCollection when a concave field cuts
            # one lateral into several runs; non-line pieces are ignored.
            pieces = getattr(clipped, "geoms", ())
        kept.extend(
            piece
            for piece in pieces
            if isinstance(piece, LineString) and piece.length >= min_length_m
        )
    return kept
def _generate_parallel_laterals(
    main_line: LineString, polygon: Polygon, spacing: float,
    main_direction: Optional[Tuple[float, float]] = None
) -> List[LineString]:
    """
    Generate parallel lines perpendicular to the main direction, spaced at intervals.

    Anchor points are spread evenly from the start to the end of the main
    line, so the first and last lateral sit on its two ends and the actual
    gap is at least `spacing`.

    Args:
        main_line: LineString representing the main irrigation line
        polygon: Bounding polygon (only its bounds are used, to size the lines)
        spacing: Target distance between parallel lines (meters), must be > 0
        main_direction: Optional farm-level direction for consistency across zones.
            If provided, laterals are perpendicular to it (it is re-normalized
            here) and the number of laterals is derived from the main line's
            extent ALONG this direction. Otherwise both are derived from
            main_line itself.

    Returns:
        List of LineString objects (before clipping to polygon)

    Raises:
        DripLayoutError: If spacing is not positive, the main line has zero
            length, or main_direction is the zero vector.
    """
    if not spacing > 0:
        # Guards the division below; also rejects NaN.
        raise DripLayoutError(f"Lateral spacing must be positive, got {spacing}")

    start_x, start_y = main_line.coords[0][:2]
    end_x, end_y = main_line.coords[-1][:2]
    main_vec = (end_x - start_x, end_y - start_y)
    main_len = math.sqrt(main_vec[0] ** 2 + main_vec[1] ** 2)
    if main_len == 0:
        raise DripLayoutError("Main line has zero length")

    if main_direction is not None:
        norm = math.sqrt(main_direction[0] ** 2 + main_direction[1] ** 2)
        if norm == 0:
            raise DripLayoutError("Main direction has zero length")
        main_dir = (main_direction[0] / norm, main_direction[1] / norm)
        # The main line may run obliquely to the farm axis (it joins two
        # extreme vertices). Lateral-to-lateral distance is measured along
        # the axis, so count laterals from the projected extent; using the
        # oblique length would pack them closer than `spacing`. The epsilon
        # absorbs float noise when the extent is a multiple of the spacing.
        extent = abs(main_vec[0] * main_dir[0] + main_vec[1] * main_dir[1])
        num_laterals = int(extent / spacing + 1e-9) + 1
    else:
        main_dir = (main_vec[0] / main_len, main_vec[1] / main_len)
        num_laterals = int(main_len / spacing) + 1

    # Perpendicular direction (rotate 90 degrees counterclockwise)
    perp_dir = (-main_dir[1], main_dir[0])

    # Half-length that is guaranteed to reach across the polygon from any
    # anchor on the main line.
    minx, miny, maxx, maxy = polygon.bounds
    reach = math.sqrt((maxx - minx) ** 2 + (maxy - miny) ** 2)

    laterals = []
    for i in range(num_laterals):
        # Fraction of the way along the main line for this anchor.
        fraction = 0.0 if num_laterals == 1 else i / (num_laterals - 1)
        anchor_x = start_x + fraction * main_vec[0]
        anchor_y = start_y + fraction * main_vec[1]
        laterals.append(
            LineString(
                [
                    (anchor_x - perp_dir[0] * reach, anchor_y - perp_dir[1] * reach),
                    (anchor_x + perp_dir[0] * reach, anchor_y + perp_dir[1] * reach),
                ]
            )
        )
    return laterals
def estimate_bom(
    design: Dict, pricing: Optional[PricingConfig] = None, unit: str = "inr"
) -> Dict:
    """
    Build the bill of materials (quantities and, optionally, costs) for a design.

    Args:
        design: Output dict from generate_drip_layout()
        pricing: PricingConfig object; the module default is used if omitted
        unit: "inr", "usd" or "cost" (case-insensitive) adds cost fields;
            any other value (e.g. "metric") returns quantities only.
            Costs are always expressed in pricing.currency.

    Returns:
        Dict with quantities, the waste factor, the currency and, when
        requested, per-item costs plus totals.
    """
    cfg = _DEFAULT_PRICING if pricing is None else pricing

    main_length = design["total_main_length_m"]
    tape_length = design["total_drip_tape_m"]
    emitter_qty = design["emitter_count"]
    valve_qty = 1  # Fixed at one valve per design

    # Pipe and tape are padded by the waste factor; emitters are not.
    main_padded = main_length * cfg.waste_factor
    tape_padded = tape_length * cfg.waste_factor

    bom = {
        "currency": cfg.currency,
        "main_line_16mm_m": round(main_length, 2),
        "main_line_16mm_m_with_waste": round(main_padded, 2),
        "drip_tape_16mm_m": round(tape_length, 2),
        "drip_tape_16mm_m_with_waste": round(tape_padded, 2),
        "inline_emitters": emitter_qty,
        "total_pipe_m": round(main_length + tape_length, 2),
        "waste_factor": cfg.waste_factor,
    }

    if unit.lower() not in ["inr", "usd", "cost"]:
        return bom

    # Price each line item with the configured unit prices.
    main_cost = main_padded * cfg.get_price("main_line")
    tape_cost = tape_padded * cfg.get_price("drip_tape")
    emitter_cost = emitter_qty * cfg.get_price("emitter")
    valve_cost = valve_qty * cfg.get_price("valve")

    bom["cost_main"] = round(main_cost, 2)
    bom["cost_drip_tape"] = round(tape_cost, 2)
    bom["cost_emitters"] = round(emitter_cost, 2)
    bom["cost_valves"] = round(valve_cost, 2)
    bom["total_cost"] = round(main_cost + tape_cost + emitter_cost + valve_cost, 2)

    # Backwards compatibility: mirror the total under a currency-specific key.
    bom[f"total_cost_{cfg.currency.lower()}"] = bom["total_cost"]
    return bom
def design_summary(design: Dict, bom: Dict, area_unit: str = "ha") -> str:
    """
    Generate human-readable summary of the design.

    Args:
        design: Output dict from generate_drip_layout()
        bom: Output dict from estimate_bom()
        area_unit: Unit to display farm area in (default "ha"). If the unit
            converter rejects it, the area is shown in hectares.

    Returns:
        Formatted multi-line string summary. Money values carry the symbol
        (or code) of bom["currency"]; items without a cost show "N/A".
    """
    params = design["design_params"]

    # Convert internal hectares → requested display unit
    area_m2 = design["farm_area_ha"] * 10_000
    try:
        area_disp = m2_to_area(area_m2, area_unit)
        unit_label = area_unit_label(area_unit)
    except UnitError:
        area_disp = design["farm_area_ha"]
        unit_label = "ha"

    # Use the BOM's own currency instead of a hard-coded "$".
    currency = str(bom.get("currency") or "").upper()
    symbols = {"USD": "$", "INR": "₹", "EUR": "€", "GBP": "£"}
    if currency in symbols:
        money_prefix = symbols[currency]
    else:
        money_prefix = f"{currency} " if currency else ""

    def money(value) -> str:
        """Format a cost, or 'N/A' when the BOM was built without costs."""
        if value is None:
            return "N/A"
        if isinstance(value, (int, float)):
            return f"{money_prefix}{value:,.2f}"
        return f"{money_prefix}{value}"

    # "total_cost" is always set when costs are computed. The previous
    # lookup of "total_cost_usd" returned N/A for any non-USD pricing,
    # including the default INR configuration.
    total_cost = bom.get(
        "total_cost", bom.get(f"total_cost_{currency.lower()}")
    )

    lines = [
        "=== Drip Irrigation Design Summary ===",
        f"Farm Area: {area_disp:.4f} {unit_label}",
        f"Crop: {design['crop'].title()}",
        f"Headland Buffer: {design['headland_m']} m",
        "Layout:",
        f"Main Line Length: {design['total_main_length_m']:.1f} m",
        f"Total Laterals: {design['total_drip_tape_m']:.1f} m",
        f"Emitter Count: {design['emitter_count']}",
        "Design Parameters:",
        f"Lateral Spacing: {params['lateral_spacing_m']} m",
        f"Emitter Spacing: {params['emitter_spacing_m']} m",
        f"Emitter Discharge: {params['emitter_discharge_lph']} L/h",
        "Bill of Materials:",
        f"Main Pipe (16mm): {bom['main_line_16mm_m']:.1f} m ({money(bom.get('cost_main'))})",
        f"Drip Tape (16mm): {bom['drip_tape_16mm_m']:.1f} m ({money(bom.get('cost_drip_tape'))})",
        f"Inline Emitters: {bom['inline_emitters']} pcs ({money(bom.get('cost_emitters'))})",
        f"Total Cost: {money(total_cost)}",
    ]
    return "\n".join(lines)