File size: 6,333 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""
SafetyRails: FvCB vs ML divergence guard for the SolarWine 2.0 control loop.

Position in the control loop (Phase 3, Step 7):
  After TradeoffEngine selects a minimum dose, SafetyRails validates that
  the FvCB and ML photosynthesis predictions are sufficiently consistent.

  If the two models disagree by more than DIVERGENCE_THRESHOLD (12%), the
  system cannot confidently predict that shading will help, so it falls back
  to full astronomical tracking (zero energy sacrifice, zero risk).

Rationale
---------
The FvCB mechanistic model and ML ensemble are calibrated on different
assumptions:
  - FvCB is reliable in standard conditions (T < 30°C, moderate VPD).
  - ML handles non-linear stress regimes better.

When both agree → high confidence → proceed with intervention.
When they disagree significantly → sensor fault, regime change, or edge
case not covered by calibration. The safe default is no intervention.
"""

from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Optional

from config.settings import DIVERGENCE_THRESHOLD


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------

@dataclass
class SafetyCheckResult:
    """Outcome of a single FvCB vs ML divergence check."""

    passed: bool                   # always the inverse of fallback_needed
    fvcb_a: float                  # FvCB value as compared (0.0 if it was missing)
    ml_a: float                    # ML value as compared (0.0 if it was missing)
    # |fvcb_a - ml_a| / max(|fvcb_a|, |ml_a|, 1e-6) × 100, rounded to 2 dp.
    # NaN when an input was non-finite; 0.0 when no comparison was possible.
    divergence_pct: float
    # True when divergence exceeds the threshold, when both models predict
    # carbon loss, or when the inputs are unusable (non-finite / all missing).
    fallback_needed: bool
    reason: str                    # human-readable explanation
    # Divergence fraction (0–1) the check was run with. None means "not
    # recorded", in which case __str__ reports DIVERGENCE_THRESHOLD.
    threshold: Optional[float] = None

    def __str__(self) -> str:
        status = "PASS" if self.passed else "FAIL → fallback to θ_astro"
        # Report the threshold this result was judged against, not the
        # module default, so custom-threshold rails log accurately.
        threshold = (
            self.threshold if self.threshold is not None else DIVERGENCE_THRESHOLD
        )
        return (
            f"SafetyRails [{status}] "
            f"FvCB={self.fvcb_a:.2f}  ML={self.ml_a:.2f}  "
            f"divergence={self.divergence_pct:.1f}%  "
            f"(threshold={threshold * 100:.0f}%)"
        )


# ---------------------------------------------------------------------------
# SafetyRails
# ---------------------------------------------------------------------------

class SafetyRails:
    """
    Validates that FvCB and ML model outputs are consistent before any
    shading command is issued.

    Usage
    -----
    rails = SafetyRails()
    result = rails.check(fvcb_a=14.3, ml_a=14.8)
    if result.fallback_needed:
        # stay at θ_astro, log result
    """

    def __init__(self, threshold: Optional[float] = None) -> None:
        """
        Parameters
        ----------
        threshold : divergence fraction (0–1) that triggers fallback.
                    Defaults to DIVERGENCE_THRESHOLD from settings.
        """
        self.threshold = threshold if threshold is not None else DIVERGENCE_THRESHOLD

    def check(
        self,
        fvcb_a: float,
        ml_a: float,
        context: Optional[str] = None,
    ) -> SafetyCheckResult:
        """
        Compare FvCB and ML photosynthesis outputs.

        Fallback is requested when any of the following holds:
          - either input is NaN or infinite,
          - the relative divergence exceeds ``self.threshold``,
          - both models return a negative value.

        Parameters
        ----------
        fvcb_a  : net A from FarquharModel (µmol CO₂ m⁻² s⁻¹)
        ml_a    : net A from ML ensemble (µmol CO₂ m⁻² s⁻¹)
        context : optional string for logging (e.g. "2025-07-15 13:00").
                  Accepted for interface compatibility; not used here.

        Returns
        -------
        SafetyCheckResult
        """
        # NaN compares False against everything, so a non-finite input would
        # otherwise slip past the `divergence > threshold` test and be
        # reported as agreement. Treat it as a failed check instead.
        if not (math.isfinite(fvcb_a) and math.isfinite(ml_a)):
            return SafetyCheckResult(
                passed=False,
                fvcb_a=fvcb_a,
                ml_a=ml_a,
                divergence_pct=float("nan"),
                fallback_needed=True,
                reason=(
                    f"Non-finite model output "
                    f"(FvCB={fvcb_a:.2f}, ML={ml_a:.2f}); "
                    f"cannot check divergence. "
                    f"Falling back to full astronomical tracking."
                ),
                threshold=self.threshold,
            )

        # The 1e-6 floor avoids division by zero when both outputs are 0.
        denominator = max(abs(fvcb_a), abs(ml_a), 1e-6)
        divergence = abs(fvcb_a - ml_a) / denominator
        divergence_pct = divergence * 100.0

        fallback_needed = divergence > self.threshold

        if fallback_needed:
            reason = (
                f"Models diverge by {divergence_pct:.1f}% "
                f"(FvCB={fvcb_a:.2f}, ML={ml_a:.2f}) — "
                f"exceeds {self.threshold * 100:.0f}% threshold. "
                f"Falling back to full astronomical tracking."
            )
        elif fvcb_a < 0 and ml_a < 0:
            # The models agree, but both report net carbon loss.
            reason = "Both models predict carbon loss (dark/night); no shading beneficial."
            fallback_needed = True
        else:
            reason = (
                f"Models agree within {self.threshold * 100:.0f}% threshold "
                f"(FvCB={fvcb_a:.2f}, ML={ml_a:.2f}, "
                f"divergence={divergence_pct:.1f}%). Proceeding."
            )

        return SafetyCheckResult(
            passed=not fallback_needed,
            fvcb_a=fvcb_a,
            ml_a=ml_a,
            divergence_pct=round(divergence_pct, 2),
            fallback_needed=fallback_needed,
            reason=reason,
            threshold=self.threshold,
        )

    def check_from_log(self, fvcb_a: Optional[float], ml_a: Optional[float]) -> SafetyCheckResult:
        """
        Variant that handles None inputs gracefully (e.g. ML model not loaded).

        If exactly one value is None and the other is finite, the check passes
        with a warning — the calling code should use whichever model is
        available. If both values are None, or the only available value is
        NaN/infinite, there is nothing usable to act on and fallback is
        requested. When both values are present this delegates to ``check``.
        """
        if fvcb_a is None or ml_a is None:
            available = fvcb_a if fvcb_a is not None else ml_a

            if available is None:
                # Previously this path raised TypeError while formatting None.
                return SafetyCheckResult(
                    passed=False,
                    fvcb_a=0.0,
                    ml_a=0.0,
                    divergence_pct=0.0,
                    fallback_needed=True,
                    reason=(
                        "No model output available. "
                        "Cannot check divergence; "
                        "falling back to full astronomical tracking."
                    ),
                    threshold=self.threshold,
                )

            usable = math.isfinite(available)
            if usable:
                reason = (
                    f"Only one model available (value={available:.2f}). "
                    "Cannot check divergence; proceeding with available model."
                )
            else:
                reason = (
                    f"Only one model available and its output is non-finite "
                    f"(value={available:.2f}). "
                    "Falling back to full astronomical tracking."
                )
            return SafetyCheckResult(
                passed=usable,
                fvcb_a=fvcb_a or 0.0,
                ml_a=ml_a or 0.0,
                divergence_pct=0.0,
                fallback_needed=not usable,
                reason=reason,
                threshold=self.threshold,
            )
        return self.check(fvcb_a, ml_a)


# ---------------------------------------------------------------------------
# CLI smoke test
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Manual smoke test: run a handful of representative model-output pairs
    # through the default-threshold rails and print each verdict.
    rails = SafetyRails()

    # (fvcb_a, ml_a, label). Percentages in the labels follow the formula
    # used by SafetyRails.check: |fvcb - ml| / max(|fvcb|, |ml|) × 100.
    cases = [
        (14.3, 14.8,  "Normal agreement (3.4%)"),
        (14.3, 16.5,  "Borderline (13.3% — over threshold)"),
        (14.3, 12.0,  "ML below FvCB (16.1% — over threshold)"),
        (14.3, 14.3,  "Perfect agreement"),
        (14.3, None,  "ML unavailable"),
        (-2.0, -1.8,  "Carbon loss (night)"),
    ]

    print(f"SafetyRails — threshold={rails.threshold * 100:.0f}%\n")
    for fvcb, ml, label in cases:
        # check_from_log is used because one case passes None for the ML value.
        result = rails.check_from_log(fvcb, ml)
        status = "FALLBACK" if result.fallback_needed else "OK     "
        print(f"  [{status}] {label}")
        print(f"           {result.reason}")
        print()