Spaces:
Sleeping
Sleeping
| """ | |
| Soil Water Balance Model | |
| ======================== | |
| Implements the FAO-56 single layer soil water balance. | |
| Tracks root zone depletion, actual vs potential ET, and drainage. | |
| Reference: Allen et al. (1998), FAO-56, Chapter 8. | |
| """ | |
| import json | |
| import os | |
| from dataclasses import dataclass, field | |
| DATA_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "soils.json") | |
| class SoilState: | |
| moisture_pct: float # Current volumetric water content % | |
| depletion_mm: float # Root zone depletion from field capacity | |
| drainage_mm_today: float # Drainage below root zone today | |
| runoff_mm_today: float # Surface runoff today | |
| cumulative_stress_days: int # Days below critical moisture threshold | |
| waterlog_days: int # Days above waterlogging threshold | |
| ks: float # Water stress coefficient (0=full stress, 1=no stress) | |
| raw_mm: float # Readily Available Water in mm | |
| class SoilModel: | |
| """ | |
| FAO-56 single-layer root zone water balance. | |
| Tracks depletion, stress, drainage, and runoff. | |
| """ | |
| def __init__(self, soil_key: str = "loamy_soil", root_depth_m: float = 0.5): | |
| with open(DATA_PATH, "r") as f: | |
| all_soils = json.load(f) | |
| if soil_key not in all_soils: | |
| soil_key = "loamy_soil" | |
| self.soil = all_soils[soil_key] | |
| self.root_depth_m = root_depth_m | |
| # Soil hydraulic properties | |
| self.theta_fc = self.soil["field_capacity_pct"] # Field capacity % | |
| self.theta_wp = self.soil["wilting_point_pct"] # Wilting point % | |
| self.theta_sat = self.soil["saturation_pct"] # Saturation % | |
| self.infiltration_rate = self.soil["infiltration_rate_mm_per_hr"] | |
| self.depth_mm = root_depth_m * 1000.0 # Root depth in mm | |
| # Total available water (TAW) in mm | |
| self.taw_mm = (self.theta_fc - self.theta_wp) / 100.0 * self.depth_mm | |
| # Depletion fraction for stress (p) — typical 0.5 for most crops | |
| self.p_depletion = 0.50 | |
| # State | |
| self._moisture_pct: float = self.theta_fc # Start at field capacity | |
| self._depletion_mm: float = 0.0 | |
| self._cumulative_stress_days: int = 0 | |
| self._waterlog_days: int = 0 | |
| def reset(self, initial_moisture_pct: float = None) -> SoilState: | |
| """Reset soil to initial conditions.""" | |
| if initial_moisture_pct is None: | |
| initial_moisture_pct = self.theta_fc | |
| self._moisture_pct = max(self.theta_wp, min(self.theta_sat, initial_moisture_pct)) | |
| self._depletion_mm = max(0.0, (self.theta_fc - self._moisture_pct) / 100.0 * self.depth_mm) | |
| self._cumulative_stress_days = 0 | |
| self._waterlog_days = 0 | |
| return self._build_state() | |
| def update( | |
| self, | |
| et0_mm: float, | |
| kc: float, | |
| rainfall_mm: float, | |
| irrigation_mm: float, | |
| ) -> tuple[SoilState, dict]: | |
| """ | |
| Advance soil water balance by one day. | |
| Args: | |
| et0_mm: Reference ET (FAO-56 Penman-Monteith) | |
| kc: Crop coefficient for current growth stage | |
| rainfall_mm: Daily rainfall | |
| irrigation_mm: Irrigation applied today | |
| Returns: | |
| (SoilState, info_dict) | |
| """ | |
| # Potential crop ET | |
| etc_mm = et0_mm * kc | |
| # Water input: rainfall + irrigation | |
| infiltration_mm, runoff_mm = self._compute_infiltration(rainfall_mm, irrigation_mm) | |
| total_input_mm = infiltration_mm | |
| # Compute water stress coefficient (Ks) — FAO-56 Eq. 84 | |
| raw_mm = self.p_depletion * self.taw_mm | |
| if self._depletion_mm <= raw_mm: | |
| ks = 1.0 # No stress | |
| elif self._depletion_mm >= self.taw_mm: | |
| ks = 0.0 # Full stress | |
| else: | |
| ks = (self.taw_mm - self._depletion_mm) / ((1 - self.p_depletion) * self.taw_mm) | |
| ks = max(0.0, min(1.0, ks)) | |
| # Actual ET (reduced by stress) | |
| actual_et_mm = etc_mm * ks | |
| # Update depletion | |
| self._depletion_mm = self._depletion_mm + actual_et_mm - total_input_mm | |
| self._depletion_mm = max(0.0, min(self.taw_mm * 1.5, self._depletion_mm)) | |
| # Compute drainage (excess above field capacity) | |
| drainage_mm = 0.0 | |
| current_water_mm = (self.theta_fc / 100.0 * self.depth_mm) - self._depletion_mm | |
| max_water_mm = self.theta_sat / 100.0 * self.depth_mm | |
| if current_water_mm > self.theta_fc / 100.0 * self.depth_mm: | |
| excess = current_water_mm - self.theta_fc / 100.0 * self.depth_mm | |
| drainage_mm = min(excess, self.infiltration_rate * 24 * 0.1) | |
| self._depletion_mm = max(0.0, self._depletion_mm + drainage_mm) | |
| # Update moisture percentage | |
| depletion_pct = self._depletion_mm / self.depth_mm * 100.0 | |
| self._moisture_pct = max(self.theta_wp, self.theta_fc - depletion_pct) | |
| self._moisture_pct = min(self.theta_sat, self._moisture_pct) | |
| # Track stress days | |
| critical_moisture = self.soil.get("critical_moisture_min_pct", | |
| self.theta_wp + (self.theta_fc - self.theta_wp) * 0.3) | |
| waterlog_threshold = self.soil.get("waterlog_threshold_pct", | |
| self.theta_fc + (self.theta_sat - self.theta_fc) * 0.8) | |
| if self._moisture_pct < critical_moisture: | |
| self._cumulative_stress_days += 1 | |
| if self._moisture_pct > waterlog_threshold: | |
| self._waterlog_days += 1 | |
| info = { | |
| "etc_mm": round(etc_mm, 2), | |
| "actual_et_mm": round(actual_et_mm, 2), | |
| "infiltration_mm": round(infiltration_mm, 2), | |
| "runoff_mm": round(runoff_mm, 2), | |
| "drainage_mm": round(drainage_mm, 2), | |
| "ks": round(ks, 3), | |
| } | |
| return self._build_state(drainage_mm, runoff_mm, ks, raw_mm), info | |
| def get_irrigation_recommendation_mm(self, et0_mm: float, kc: float) -> float: | |
| """ | |
| Compute scientifically correct irrigation to refill to field capacity. | |
| This is the 'oracle' value used by the grader to assess agent decisions. | |
| """ | |
| etc_mm = et0_mm * kc | |
| # Irrigate when depletion exceeds RAW threshold | |
| raw_mm = self.p_depletion * self.taw_mm | |
| if self._depletion_mm > raw_mm: | |
| # Refill to field capacity | |
| net_irrigation = self._depletion_mm | |
| # Add 15% for distribution uniformity (standard practice) | |
| gross_irrigation = net_irrigation / 0.85 | |
| return round(max(0.0, min(gross_irrigation, 80.0)), 1) | |
| return 0.0 | |
| def _compute_infiltration(self, rainfall_mm: float, irrigation_mm: float) -> tuple[float, float]: | |
| """Simple CN-based runoff estimation.""" | |
| total_input = rainfall_mm + irrigation_mm | |
| max_infiltration_per_day = self.infiltration_rate * 24 | |
| if total_input <= max_infiltration_per_day: | |
| return total_input, 0.0 | |
| else: | |
| runoff = total_input - max_infiltration_per_day | |
| return max_infiltration_per_day, runoff | |
| def _build_state( | |
| self, | |
| drainage_mm: float = 0.0, | |
| runoff_mm: float = 0.0, | |
| ks: float = None, | |
| raw_mm: float = None | |
| ) -> SoilState: | |
| if ks is None: | |
| raw_mm = self.p_depletion * self.taw_mm | |
| if self._depletion_mm <= raw_mm: | |
| ks = 1.0 | |
| elif self._depletion_mm >= self.taw_mm: | |
| ks = 0.0 | |
| else: | |
| ks = (self.taw_mm - self._depletion_mm) / ((1 - self.p_depletion) * self.taw_mm) | |
| ks = max(0.0, min(1.0, ks)) | |
| return SoilState( | |
| moisture_pct=round(self._moisture_pct, 2), | |
| depletion_mm=round(self._depletion_mm, 2), | |
| drainage_mm_today=round(drainage_mm, 2), | |
| runoff_mm_today=round(runoff_mm, 2), | |
| cumulative_stress_days=self._cumulative_stress_days, | |
| waterlog_days=self._waterlog_days, | |
| ks=round(ks, 3), | |
| raw_mm=round(raw_mm if raw_mm is not None else self.p_depletion * self.taw_mm, 2), | |
| ) | |