File size: 7,672 Bytes
938949f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | """
SpectralAggregator: batch preprocessing of CWSI, NDVI, and PRI indices.
Consumes raw sensor columns (from ThingsBoard or Seymour CSVs) and produces
cleaned, gap-filled spectral indices ready for the 15-min control loop.
Design: stateless functions, not a service. The control loop calls
``aggregate_spectral()`` each slot with raw sensor readings; the function
returns validated indices with quality flags.
Sensor sources
--------------
- NDVI / PRI: Air1 reference station (``Air1_NDVI_ref``, ``Air1_PRI_ref``)
and per-panel Crop devices (ThingsBoard).
- CWSI: computed from air–leaf temperature delta (proxy) or explicit
ThingsBoard telemetry if available.
- rNDVI / RENDVI: optional red-edge indices from Air1.
Physical bounds (Sde Boker, Semillon grapevine)
------------------------------------------------
- NDVI: [0.1, 0.95] — bare soil ~0.1, healthy canopy 0.7–0.9
- PRI: [-0.2, 0.1] — stressed < -0.05, unstressed > 0.0
- CWSI: [0.0, 1.0] — well-watered 0.0, severe stress 1.0
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import pandas as pd
from src.utils import cwsi_from_delta_t
# ---------------------------------------------------------------------------
# Physical plausibility bounds
# ---------------------------------------------------------------------------
_BOUNDS = {
"ndvi": (0.1, 0.95),
"pri": (-0.2, 0.1),
"cwsi": (0.0, 1.0),
"rndvi": (0.1, 0.95),
"rendvi": (0.1, 0.95),
}
# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------
@dataclass
class SpectralResult:
"""Validated spectral indices for a single timestep."""
ndvi: Optional[float] = None
pri: Optional[float] = None
cwsi: float = 0.0
rndvi: Optional[float] = None
rendvi: Optional[float] = None
quality_flags: list[str] = field(default_factory=list)
@property
def is_stressed(self) -> bool:
"""Quick stress check: CWSI ≥ 0.4 indicates meaningful water stress."""
return self.cwsi >= 0.4
@property
def pri_stress(self) -> bool:
"""PRI below -0.05 indicates photosynthetic down-regulation."""
return self.pri is not None and self.pri < -0.05
# ---------------------------------------------------------------------------
# Core aggregation function
# ---------------------------------------------------------------------------
def aggregate_spectral(
*,
ndvi: Optional[float] = None,
pri: Optional[float] = None,
air_temp_c: Optional[float] = None,
leaf_temp_c: Optional[float] = None,
cwsi_explicit: Optional[float] = None,
vpd_kpa: Optional[float] = None,
rndvi: Optional[float] = None,
rendvi: Optional[float] = None,
) -> SpectralResult:
"""Validate and aggregate spectral indices for one timestep.
Parameters
----------
ndvi : float, optional
Raw NDVI reading (Air1 or Crop device).
pri : float, optional
Raw PRI reading.
air_temp_c : float, optional
Air temperature (°C) — for CWSI proxy calculation.
leaf_temp_c : float, optional
Leaf temperature (°C) — for CWSI proxy calculation.
cwsi_explicit : float, optional
Direct CWSI measurement from ThingsBoard (overrides proxy).
vpd_kpa : float, optional
Vapour pressure deficit — secondary stress indicator.
rndvi : float, optional
Red-edge NDVI.
rendvi : float, optional
Red-edge NDVI (alternative band).
Returns
-------
SpectralResult
Validated indices with quality flags.
"""
flags: list[str] = []
# --- NDVI ---
clean_ndvi = _clip_or_flag(ndvi, "ndvi", flags)
# --- PRI ---
clean_pri = _clip_or_flag(pri, "pri", flags)
# --- CWSI ---
if cwsi_explicit is not None:
clean_cwsi = _clip_value(cwsi_explicit, *_BOUNDS["cwsi"])
if cwsi_explicit != clean_cwsi:
flags.append("cwsi_clipped")
elif leaf_temp_c is not None and air_temp_c is not None:
clean_cwsi = cwsi_from_delta_t(leaf_temp_c, air_temp_c)
flags.append("cwsi_from_delta_t")
elif vpd_kpa is not None:
# Last-resort VPD-based proxy: high VPD → likely stress
# VPD 1-2 kPa normal, >3 kPa high stress in Negev
raw_cwsi = _clip_value((vpd_kpa - 1.0) / 4.0, 0.0, 1.0)
clean_cwsi = raw_cwsi
flags.append("cwsi_from_vpd")
else:
clean_cwsi = 0.0
flags.append("cwsi_missing")
# --- Optional red-edge indices ---
clean_rndvi = _clip_or_flag(rndvi, "rndvi", flags)
clean_rendvi = _clip_or_flag(rendvi, "rendvi", flags)
return SpectralResult(
ndvi=clean_ndvi,
pri=clean_pri,
cwsi=clean_cwsi,
rndvi=clean_rndvi,
rendvi=clean_rendvi,
quality_flags=flags,
)
# ---------------------------------------------------------------------------
# Batch processing for DataFrames
# ---------------------------------------------------------------------------
def aggregate_spectral_df(
df: pd.DataFrame,
*,
ndvi_col: str = "Air1_NDVI_ref",
pri_col: str = "Air1_PRI_ref",
air_temp_col: str = "Air1_airTemperature_ref",
leaf_temp_col: str = "Air1_leafTemperature_ref",
vpd_col: str = "Air1_VPD_ref",
cwsi_col: Optional[str] = None,
rndvi_col: str = "Air1_rNDVI_ref",
rendvi_col: str = "Air1_RENDVI_ref",
) -> pd.DataFrame:
"""Process a DataFrame of raw sensor data into cleaned spectral indices.
Returns a DataFrame with columns: ndvi, pri, cwsi, rndvi, rendvi, quality_flags.
Index is aligned to the input DataFrame.
"""
records = []
for _, row in df.iterrows():
result = aggregate_spectral(
ndvi=_safe_float(row, ndvi_col),
pri=_safe_float(row, pri_col),
air_temp_c=_safe_float(row, air_temp_col),
leaf_temp_c=_safe_float(row, leaf_temp_col),
cwsi_explicit=_safe_float(row, cwsi_col) if cwsi_col else None,
vpd_kpa=_safe_float(row, vpd_col),
rndvi=_safe_float(row, rndvi_col),
rendvi=_safe_float(row, rendvi_col),
)
records.append({
"ndvi": result.ndvi,
"pri": result.pri,
"cwsi": result.cwsi,
"rndvi": result.rndvi,
"rendvi": result.rendvi,
"is_stressed": result.is_stressed,
"pri_stress": result.pri_stress,
"quality_flags": ",".join(result.quality_flags),
})
return pd.DataFrame(records, index=df.index)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _clip_value(val: float, lo: float, hi: float) -> float:
return max(lo, min(hi, val))
def _clip_or_flag(
val: Optional[float],
name: str,
flags: list[str],
) -> Optional[float]:
"""Clip value to physical bounds; flag if out-of-range or missing."""
if val is None or (isinstance(val, float) and np.isnan(val)):
return None
lo, hi = _BOUNDS[name]
clipped = _clip_value(float(val), lo, hi)
if float(val) < lo or float(val) > hi:
flags.append(f"{name}_clipped")
return clipped
def _safe_float(row: pd.Series, col: str) -> Optional[float]:
"""Extract a float from a DataFrame row, returning None if missing."""
if col not in row.index:
return None
v = row[col]
if pd.isna(v):
return None
return float(v)
|