agroenv / server /simulation /soil_model.py
PranovRaghavendhra's picture
Initial commit: AgroEnv Precision Agriculture Advisor
3bf3009
"""
Soil Water Balance Model
========================
Implements the FAO-56 single layer soil water balance.
Tracks root zone depletion, actual vs potential ET, and drainage.
Reference: Allen et al. (1998), FAO-56, Chapter 8.
"""
import json
import os
from dataclasses import dataclass, field
DATA_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "soils.json")
@dataclass
class SoilState:
moisture_pct: float # Current volumetric water content %
depletion_mm: float # Root zone depletion from field capacity
drainage_mm_today: float # Drainage below root zone today
runoff_mm_today: float # Surface runoff today
cumulative_stress_days: int # Days below critical moisture threshold
waterlog_days: int # Days above waterlogging threshold
ks: float # Water stress coefficient (0=full stress, 1=no stress)
raw_mm: float # Readily Available Water in mm
class SoilModel:
"""
FAO-56 single-layer root zone water balance.
Tracks depletion, stress, drainage, and runoff.
"""
def __init__(self, soil_key: str = "loamy_soil", root_depth_m: float = 0.5):
with open(DATA_PATH, "r") as f:
all_soils = json.load(f)
if soil_key not in all_soils:
soil_key = "loamy_soil"
self.soil = all_soils[soil_key]
self.root_depth_m = root_depth_m
# Soil hydraulic properties
self.theta_fc = self.soil["field_capacity_pct"] # Field capacity %
self.theta_wp = self.soil["wilting_point_pct"] # Wilting point %
self.theta_sat = self.soil["saturation_pct"] # Saturation %
self.infiltration_rate = self.soil["infiltration_rate_mm_per_hr"]
self.depth_mm = root_depth_m * 1000.0 # Root depth in mm
# Total available water (TAW) in mm
self.taw_mm = (self.theta_fc - self.theta_wp) / 100.0 * self.depth_mm
# Depletion fraction for stress (p) — typical 0.5 for most crops
self.p_depletion = 0.50
# State
self._moisture_pct: float = self.theta_fc # Start at field capacity
self._depletion_mm: float = 0.0
self._cumulative_stress_days: int = 0
self._waterlog_days: int = 0
def reset(self, initial_moisture_pct: float = None) -> SoilState:
"""Reset soil to initial conditions."""
if initial_moisture_pct is None:
initial_moisture_pct = self.theta_fc
self._moisture_pct = max(self.theta_wp, min(self.theta_sat, initial_moisture_pct))
self._depletion_mm = max(0.0, (self.theta_fc - self._moisture_pct) / 100.0 * self.depth_mm)
self._cumulative_stress_days = 0
self._waterlog_days = 0
return self._build_state()
def update(
self,
et0_mm: float,
kc: float,
rainfall_mm: float,
irrigation_mm: float,
) -> tuple[SoilState, dict]:
"""
Advance soil water balance by one day.
Args:
et0_mm: Reference ET (FAO-56 Penman-Monteith)
kc: Crop coefficient for current growth stage
rainfall_mm: Daily rainfall
irrigation_mm: Irrigation applied today
Returns:
(SoilState, info_dict)
"""
# Potential crop ET
etc_mm = et0_mm * kc
# Water input: rainfall + irrigation
infiltration_mm, runoff_mm = self._compute_infiltration(rainfall_mm, irrigation_mm)
total_input_mm = infiltration_mm
# Compute water stress coefficient (Ks) — FAO-56 Eq. 84
raw_mm = self.p_depletion * self.taw_mm
if self._depletion_mm <= raw_mm:
ks = 1.0 # No stress
elif self._depletion_mm >= self.taw_mm:
ks = 0.0 # Full stress
else:
ks = (self.taw_mm - self._depletion_mm) / ((1 - self.p_depletion) * self.taw_mm)
ks = max(0.0, min(1.0, ks))
# Actual ET (reduced by stress)
actual_et_mm = etc_mm * ks
# Update depletion
self._depletion_mm = self._depletion_mm + actual_et_mm - total_input_mm
self._depletion_mm = max(0.0, min(self.taw_mm * 1.5, self._depletion_mm))
# Compute drainage (excess above field capacity)
drainage_mm = 0.0
current_water_mm = (self.theta_fc / 100.0 * self.depth_mm) - self._depletion_mm
max_water_mm = self.theta_sat / 100.0 * self.depth_mm
if current_water_mm > self.theta_fc / 100.0 * self.depth_mm:
excess = current_water_mm - self.theta_fc / 100.0 * self.depth_mm
drainage_mm = min(excess, self.infiltration_rate * 24 * 0.1)
self._depletion_mm = max(0.0, self._depletion_mm + drainage_mm)
# Update moisture percentage
depletion_pct = self._depletion_mm / self.depth_mm * 100.0
self._moisture_pct = max(self.theta_wp, self.theta_fc - depletion_pct)
self._moisture_pct = min(self.theta_sat, self._moisture_pct)
# Track stress days
critical_moisture = self.soil.get("critical_moisture_min_pct",
self.theta_wp + (self.theta_fc - self.theta_wp) * 0.3)
waterlog_threshold = self.soil.get("waterlog_threshold_pct",
self.theta_fc + (self.theta_sat - self.theta_fc) * 0.8)
if self._moisture_pct < critical_moisture:
self._cumulative_stress_days += 1
if self._moisture_pct > waterlog_threshold:
self._waterlog_days += 1
info = {
"etc_mm": round(etc_mm, 2),
"actual_et_mm": round(actual_et_mm, 2),
"infiltration_mm": round(infiltration_mm, 2),
"runoff_mm": round(runoff_mm, 2),
"drainage_mm": round(drainage_mm, 2),
"ks": round(ks, 3),
}
return self._build_state(drainage_mm, runoff_mm, ks, raw_mm), info
def get_irrigation_recommendation_mm(self, et0_mm: float, kc: float) -> float:
"""
Compute scientifically correct irrigation to refill to field capacity.
This is the 'oracle' value used by the grader to assess agent decisions.
"""
etc_mm = et0_mm * kc
# Irrigate when depletion exceeds RAW threshold
raw_mm = self.p_depletion * self.taw_mm
if self._depletion_mm > raw_mm:
# Refill to field capacity
net_irrigation = self._depletion_mm
# Add 15% for distribution uniformity (standard practice)
gross_irrigation = net_irrigation / 0.85
return round(max(0.0, min(gross_irrigation, 80.0)), 1)
return 0.0
def _compute_infiltration(self, rainfall_mm: float, irrigation_mm: float) -> tuple[float, float]:
"""Simple CN-based runoff estimation."""
total_input = rainfall_mm + irrigation_mm
max_infiltration_per_day = self.infiltration_rate * 24
if total_input <= max_infiltration_per_day:
return total_input, 0.0
else:
runoff = total_input - max_infiltration_per_day
return max_infiltration_per_day, runoff
def _build_state(
self,
drainage_mm: float = 0.0,
runoff_mm: float = 0.0,
ks: float = None,
raw_mm: float = None
) -> SoilState:
if ks is None:
raw_mm = self.p_depletion * self.taw_mm
if self._depletion_mm <= raw_mm:
ks = 1.0
elif self._depletion_mm >= self.taw_mm:
ks = 0.0
else:
ks = (self.taw_mm - self._depletion_mm) / ((1 - self.p_depletion) * self.taw_mm)
ks = max(0.0, min(1.0, ks))
return SoilState(
moisture_pct=round(self._moisture_pct, 2),
depletion_mm=round(self._depletion_mm, 2),
drainage_mm_today=round(drainage_mm, 2),
runoff_mm_today=round(runoff_mm, 2),
cumulative_stress_days=self._cumulative_stress_days,
waterlog_days=self._waterlog_days,
ks=round(ks, 3),
raw_mm=round(raw_mm if raw_mm is not None else self.p_depletion * self.taw_mm, 2),
)