File size: 7,672 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
SpectralAggregator: batch preprocessing of CWSI, NDVI, and PRI indices.

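Consumes raw sensor columns (from ThingsBoard or Seymour CSVs) and produces
range-checked spectral indices ready for the 15-min control loop.  Readings
outside their physical bounds are clipped and flagged; missing NDVI / PRI /
red-edge readings are returned as ``None`` (no interpolation), while missing
CWSI is back-filled from a temperature-delta or VPD proxy, or 0.0 as a last
resort.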

Design: stateless functions, not a service.  The control loop calls
``aggregate_spectral()`` each slot with raw sensor readings; the function
returns validated indices with quality flags.

Sensor sources
--------------
- NDVI / PRI: Air1 reference station (``Air1_NDVI_ref``, ``Air1_PRI_ref``)
  and per-panel Crop devices (ThingsBoard).
- CWSI: explicit ThingsBoard telemetry when available; otherwise a proxy
  computed from the leaf–air temperature delta, then a VPD-based proxy as
  a last resort.
- rNDVI / RENDVI: optional red-edge indices from Air1.

Physical bounds (Sde Boker, Semillon grapevine)
------------------------------------------------
- NDVI: [0.1, 0.95]  — bare soil ~0.1, healthy canopy 0.7–0.9
- PRI:  [-0.2, 0.1]  — stressed < -0.05, unstressed > 0.0
- CWSI: [0.0, 1.0]   — well-watered 0.0, severe stress 1.0
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional

import numpy as np
import pandas as pd

from src.utils import cwsi_from_delta_t


# ---------------------------------------------------------------------------
# Physical plausibility bounds
# ---------------------------------------------------------------------------

# Inclusive (lower, upper) plausibility range per index, keyed by the
# lowercase index name used throughout this module.  Values exactly on a
# bound are accepted; anything outside is clipped to the bound and flagged.
_BOUNDS = dict(
    ndvi=(0.1, 0.95),
    pri=(-0.2, 0.1),
    cwsi=(0.0, 1.0),
    rndvi=(0.1, 0.95),
    rendvi=(0.1, 0.95),
)


# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------

@dataclass
class SpectralResult:
    """Cleaned spectral indices plus quality flags for one timestep.

    Optional indices are ``None`` when no usable reading was available;
    ``cwsi`` is always numeric.  ``quality_flags`` collects short string tags
    describing how values were obtained or adjusted.
    """

    # NDVI reading; None when missing.
    ndvi: Optional[float] = None
    # PRI reading; None when missing.
    pri: Optional[float] = None
    # Crop water stress index; defaults to 0.0 rather than None.
    cwsi: float = 0.0
    # Red-edge NDVI variants; None when missing.
    rndvi: Optional[float] = None
    rendvi: Optional[float] = None
    # default_factory gives every instance its own list (no shared state).
    quality_flags: list[str] = field(default_factory=list)

    @property
    def is_stressed(self) -> bool:
        """True once CWSI reaches the 0.4 meaningful-water-stress threshold."""
        threshold = 0.4
        return threshold <= self.cwsi

    @property
    def pri_stress(self) -> bool:
        """True when PRI is present and below -0.05 (photosynthetic down-regulation)."""
        if self.pri is None:
            return False
        return self.pri < -0.05


# ---------------------------------------------------------------------------
# Core aggregation function
# ---------------------------------------------------------------------------

def aggregate_spectral(
    *,
    ndvi: Optional[float] = None,
    pri: Optional[float] = None,
    air_temp_c: Optional[float] = None,
    leaf_temp_c: Optional[float] = None,
    cwsi_explicit: Optional[float] = None,
    vpd_kpa: Optional[float] = None,
    rndvi: Optional[float] = None,
    rendvi: Optional[float] = None,
) -> SpectralResult:
    """Validate and aggregate spectral indices for one timestep.

    NDVI, PRI and the red-edge indices are clipped to ``_BOUNDS`` (flagged
    ``<name>_clipped`` when out of range) and returned as ``None`` when
    missing.  CWSI comes from the first usable source, in priority order:

    1. ``cwsi_explicit`` — clipped to the CWSI bounds, flagged
       ``cwsi_clipped`` if that changed the value.
    2. ``leaf_temp_c`` + ``air_temp_c`` via ``cwsi_from_delta_t`` — flagged
       ``cwsi_from_delta_t`` (plus ``cwsi_clipped`` if out of bounds).
    3. ``vpd_kpa`` via a clamped linear proxy — flagged ``cwsi_from_vpd``.
    4. Otherwise 0.0, flagged ``cwsi_missing``.

    NaN / NA CWSI inputs are treated exactly like ``None`` so they fall
    through to the next source instead of being clipped into a bound.

    Parameters
    ----------
    ndvi : float, optional
        Raw NDVI reading (Air1 or Crop device).
    pri : float, optional
        Raw PRI reading.
    air_temp_c : float, optional
        Air temperature (°C) — for CWSI proxy calculation.
    leaf_temp_c : float, optional
        Leaf temperature (°C) — for CWSI proxy calculation.
    cwsi_explicit : float, optional
        Direct CWSI measurement from ThingsBoard (overrides proxy).
    vpd_kpa : float, optional
        Vapour pressure deficit (kPa) — last-resort stress proxy.
    rndvi : float, optional
        Red-edge NDVI.
    rendvi : float, optional
        Red-edge NDVI (alternative band).

    Returns
    -------
    SpectralResult
        Validated indices; ``quality_flags`` lists every clip / fallback in
        the order it was applied.
    """
    flags: list[str] = []

    # --- NDVI / PRI ---
    clean_ndvi = _clip_or_flag(ndvi, "ndvi", flags)
    clean_pri = _clip_or_flag(pri, "pri", flags)

    # --- CWSI ---
    # NaN compares False against both bounds, so clipping it does not yield
    # a meaningful value (it previously surfaced as CWSI 1.0, "severe
    # stress").  Normalise to None so each source falls through to the next.
    cwsi_explicit = _none_if_missing(cwsi_explicit)
    air_temp_c = _none_if_missing(air_temp_c)
    leaf_temp_c = _none_if_missing(leaf_temp_c)
    vpd_kpa = _none_if_missing(vpd_kpa)

    if cwsi_explicit is not None:
        clean_cwsi = _clip_cwsi(cwsi_explicit, flags)
    elif leaf_temp_c is not None and air_temp_c is not None:
        flags.append("cwsi_from_delta_t")
        # NOTE(review): assumes cwsi_from_delta_t returns a finite scalar;
        # its output is clipped here because its range is not visible.
        clean_cwsi = _clip_cwsi(cwsi_from_delta_t(leaf_temp_c, air_temp_c), flags)
    elif vpd_kpa is not None:
        # Last-resort VPD proxy: linear ramp 0.0 at 1 kPa -> 1.0 at 5 kPa,
        # clamped.  VPD 1-2 kPa is normal, >3 kPa high stress in the Negev,
        # which maps to 0.0-0.25 and >0.5 respectively.
        flags.append("cwsi_from_vpd")
        clean_cwsi = _clip_value((vpd_kpa - 1.0) / 4.0, *_BOUNDS["cwsi"])
    else:
        clean_cwsi = 0.0
        flags.append("cwsi_missing")

    # --- Optional red-edge indices ---
    clean_rndvi = _clip_or_flag(rndvi, "rndvi", flags)
    clean_rendvi = _clip_or_flag(rendvi, "rendvi", flags)

    return SpectralResult(
        ndvi=clean_ndvi,
        pri=clean_pri,
        cwsi=clean_cwsi,
        rndvi=clean_rndvi,
        rendvi=clean_rendvi,
        quality_flags=flags,
    )


def _none_if_missing(val: Optional[float]) -> Optional[float]:
    """Return None for a missing scalar (None, NaN, pd.NA); otherwise *val* unchanged."""
    return None if pd.isna(val) else val


def _clip_cwsi(raw: float, flags: list[str]) -> float:
    """Clip a CWSI value to its bounds, appending ``cwsi_clipped`` if it changed."""
    lo, hi = _BOUNDS["cwsi"]
    value = float(raw)
    clipped = _clip_value(value, lo, hi)
    if clipped != value:
        flags.append("cwsi_clipped")
    return clipped


# ---------------------------------------------------------------------------
# Batch processing for DataFrames
# ---------------------------------------------------------------------------

def aggregate_spectral_df(
    df: pd.DataFrame,
    *,
    ndvi_col: str = "Air1_NDVI_ref",
    pri_col: str = "Air1_PRI_ref",
    air_temp_col: str = "Air1_airTemperature_ref",
    leaf_temp_col: str = "Air1_leafTemperature_ref",
    vpd_col: str = "Air1_VPD_ref",
    cwsi_col: Optional[str] = None,
    rndvi_col: str = "Air1_rNDVI_ref",
    rendvi_col: str = "Air1_RENDVI_ref",
) -> pd.DataFrame:
    """Process a DataFrame of raw sensor data into cleaned spectral indices.

    Each row is passed through ``aggregate_spectral``.  A column name that is
    absent from *df* (or an NA cell) is treated as a missing reading.
    ``cwsi_col=None`` means no explicit CWSI column is used.

    Returns
    -------
    pd.DataFrame
        Columns ``ndvi, pri, cwsi, rndvi, rendvi, is_stressed, pri_stress,
        quality_flags`` (flags joined with ","), indexed like *df*.  The
        columns are present even when *df* is empty.
    """
    columns = [
        "ndvi", "pri", "cwsi", "rndvi", "rendvi",
        "is_stressed", "pri_stress", "quality_flags",
    ]
    records = []
    for _, row in df.iterrows():
        result = aggregate_spectral(
            ndvi=_safe_float(row, ndvi_col),
            pri=_safe_float(row, pri_col),
            air_temp_c=_safe_float(row, air_temp_col),
            leaf_temp_c=_safe_float(row, leaf_temp_col),
            cwsi_explicit=_safe_float(row, cwsi_col) if cwsi_col else None,
            vpd_kpa=_safe_float(row, vpd_col),
            rndvi=_safe_float(row, rndvi_col),
            rendvi=_safe_float(row, rendvi_col),
        )
        records.append({
            "ndvi": result.ndvi,
            "pri": result.pri,
            "cwsi": result.cwsi,
            "rndvi": result.rndvi,
            "rendvi": result.rendvi,
            "is_stressed": result.is_stressed,
            "pri_stress": result.pri_stress,
            "quality_flags": ",".join(result.quality_flags),
        })
    # Explicit columns keep the output schema stable: with no records,
    # pd.DataFrame would otherwise return a frame with no columns at all.
    return pd.DataFrame(records, index=df.index, columns=columns)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _clip_value(val: float, lo: float, hi: float) -> float:
    """Clamp *val* into the closed interval [lo, hi].

    NaN is returned unchanged: ``max``/``min`` with a NaN operand depend on
    argument order and would otherwise silently turn NaN into *hi*.
    """
    if np.isnan(val):
        return val
    return max(lo, min(hi, val))


def _clip_or_flag(
    val: Optional[float],
    name: str,
    flags: list[str],
) -> Optional[float]:
    """Clip *val* to the physical bounds of index *name*; flag if out of range.

    Parameters
    ----------
    val : float, optional
        Raw reading.  None, NaN (any float width) and ``pd.NA`` mean "missing".
    name : str
        Key into ``_BOUNDS`` (e.g. ``"ndvi"``).
    flags : list[str]
        Appended with ``"<name>_clipped"`` when *val* lies outside the bounds.

    Returns
    -------
    float or None
        The clipped value, or None for a missing reading (no flag is added).
    """
    # pd.isna also catches NaN stored as np.float32 (not a float subclass)
    # and pd.NA, which an isinstance(val, float) check lets through.
    if val is None or pd.isna(val):
        return None
    value = float(val)
    lo, hi = _BOUNDS[name]
    if value < lo or value > hi:
        flags.append(f"{name}_clipped")
    return _clip_value(value, lo, hi)


def _safe_float(row: pd.Series, col: str) -> Optional[float]:
    """Read ``row[col]`` as a float; an absent column or an NA cell yields None."""
    cell = row[col] if col in row.index else None
    # pd.isna(None) is True, so one check covers both "no column" and "NA value".
    return None if pd.isna(cell) else float(cell)