""" TradeoffEngine: InterventionGate + minimum-dose search for SolarWine 2.0. Design philosophy ----------------- The tracker's default is ALWAYS full astronomical tracking (maximum energy generation). Shading is an exception, not a rule. The decision logic has two clean responsibilities: InterventionGate — "Is the vine significantly stressed?" Asks only physiological questions — time of day and month are NOT checked here. The sun geometry handles those cases naturally: 1. Is there meaningful sunlight? (GHI > MIN_MEANINGFUL_GHI — night/cloud guard) 2. Is the leaf temperature above the Rubisco transition? (Tleaf ≥ 30°C) 3. Is water stress confirmed by sensors? (CWSI ≥ 0.4) 4. Is irradiance high enough to cause real heat load? (GHI ≥ 400 W/m²) 5. Does the FvCB model agree shading would help? (shading_helps = True) TradeoffEngine — "Does any offset actually help the fruiting zone right now?" Uses the 3D ShadowModel to ray-trace each candidate offset and checks: (a) fruiting-zone PAR drops below FRUITING_ZONE_TARGET_PAR (400 µmol) (b) top-canopy PAR stays ≥ 70% of ambient (apical leaves remain productive) (c) energy sacrifice ≤ remaining per-slot budget Returns the SMALLEST offset satisfying all three, or offset=0 (stay put). Why manual time/month rules are replaced by geometry ---------------------------------------------------- - Morning (9:00, sun in east): astronomical tilt already faces east. No positive offset places the shadow over the fruiting zone; find_minimum_dose() returns no effective dose naturally. - May (fruit-set, low stress): CWSI < 0.4 and Tleaf < 30°C → gate blocks. In rare extreme heat: gate passes, but if the geometry still doesn't deliver shade to the fruiting zone, no dose is selected. - Overcast (GHI < 100 W/m²): MIN_MEANINGFUL_GHI guard fires. - Every other edge case: geometry decides, not the calendar. """ from __future__ import annotations import math from dataclasses import dataclass, field from datetime import datetime from typing import List, Optional from config.settings import ( CANDIDATE_OFFSETS, FRUITING_ZONE_INDEX, FRUITING_ZONE_TARGET_PAR, MIN_MEANINGFUL_GHI, SHADE_ELIGIBLE_CWSI_ABOVE, SHADE_ELIGIBLE_GHI_ABOVE, SHADE_ELIGIBLE_TLEAF_ABOVE, ) # Top-canopy zone index in the 3-zone ShadowModel (0=basal, 1=fruiting, 2=apical) _TOP_ZONE_INDEX = 2 # Relative thresholds used in find_minimum_dose (relative to astronomical baseline) # The panel at astronomical tracking already shades the canopy substantially at high # sun elevations. These thresholds compare offset vs. baseline, not vs. ambient. _FRUITING_IMPROVEMENT_MIN = 0.85 # sun-side fruiting PAR must drop ≥ 15% below astro baseline _TOP_CANOPY_TOLERANCE = 0.85 # top canopy must retain ≥ 85% of its astro-baseline PAR # --------------------------------------------------------------------------- # InterventionGate # --------------------------------------------------------------------------- @dataclass class GateDecision: """Result of the InterventionGate evaluation.""" passed: bool rejection_reason: Optional[str] = None # Diagnostic flags (for SimulationLog explainability) no_meaningful_sun: bool = False tleaf_below_threshold: bool = False cwsi_below_threshold: bool = False ghi_below_threshold: bool = False biology_says_shade_helps: bool = False def decision_tags(self) -> List[str]: if not self.passed and self.rejection_reason: return [f"gate_blocked:{self.rejection_reason}"] return ["gate_passed"] class InterventionGate: """ Physiological pass/fail check — answers "is the vine stressed enough to consider intervention?" Time of day, month, and sun angle are NOT evaluated here. The 3D ShadowModel in TradeoffEngine determines whether a candidate offset can geometrically deliver shade to the fruiting zone for any given sun position. Default answer: NO (full astronomical tracking). Gate opens only when ALL stress conditions are simultaneously met. """ def __init__( self, min_meaningful_ghi: float = MIN_MEANINGFUL_GHI, shade_eligible_tleaf_above: float = SHADE_ELIGIBLE_TLEAF_ABOVE, shade_eligible_cwsi_above: float = SHADE_ELIGIBLE_CWSI_ABOVE, shade_eligible_ghi_above: float = SHADE_ELIGIBLE_GHI_ABOVE, ) -> None: self.min_meaningful_ghi = min_meaningful_ghi self.shade_eligible_tleaf_above = shade_eligible_tleaf_above self.shade_eligible_cwsi_above = shade_eligible_cwsi_above self.shade_eligible_ghi_above = shade_eligible_ghi_above # ------------------------------------------------------------------ # Individual gate checks (pipeline pattern) # ------------------------------------------------------------------ def _check_meaningful_sun(self, ghi_w_m2: Optional[float], dec: GateDecision) -> Optional[GateDecision]: """Block if GHI too low (night / deep overcast).""" if ghi_w_m2 is not None and ghi_w_m2 < self.min_meaningful_ghi: dec.no_meaningful_sun = True dec.rejection_reason = ( f"no_meaningful_sun:GHI={ghi_w_m2:.0f} W/m² " f"< {self.min_meaningful_ghi:.0f}" ) return dec return None def _check_heat_stress(self, tleaf_c: Optional[float], dec: GateDecision) -> Optional[GateDecision]: """Block if leaf temperature below Rubisco transition (vine is light-limited).""" if tleaf_c is not None and tleaf_c < self.shade_eligible_tleaf_above: dec.tleaf_below_threshold = True dec.rejection_reason = ( f"no_heat_stress:Tleaf={tleaf_c:.1f}°C " f"< {self.shade_eligible_tleaf_above:.0f}°C (Rubisco transition)" ) return dec return None def _check_water_stress(self, cwsi: Optional[float], dec: GateDecision) -> Optional[GateDecision]: """Block if CWSI below threshold (vine not water-stressed).""" if cwsi is not None and cwsi < self.shade_eligible_cwsi_above: dec.cwsi_below_threshold = True dec.rejection_reason = ( f"no_water_stress:CWSI={cwsi:.2f} " f"< {self.shade_eligible_cwsi_above:.2f}" ) return dec return None def _check_radiation_load(self, ghi_w_m2: Optional[float], dec: GateDecision) -> Optional[GateDecision]: """Block if radiation too low for meaningful heat build-up.""" if ghi_w_m2 is not None and ghi_w_m2 < self.shade_eligible_ghi_above: dec.ghi_below_threshold = True dec.rejection_reason = ( f"low_radiation:GHI={ghi_w_m2:.0f} W/m² " f"< {self.shade_eligible_ghi_above:.0f} W/m²" ) return dec return None def _check_biology(self, shading_helps: Optional[bool], dec: GateDecision) -> Optional[GateDecision]: """Block if FvCB model says shading would hurt (RuBP-limited).""" dec.biology_says_shade_helps = bool(shading_helps) if not shading_helps: dec.rejection_reason = ( "biology:shading_helps=False — vine is RuBP-limited despite high Tleaf; " "possibly declining afternoon PAR or unusual conditions" ) return dec return None # ------------------------------------------------------------------ # Main evaluate (pipeline composition) # ------------------------------------------------------------------ def evaluate( self, tleaf_c: Optional[float], ghi_w_m2: Optional[float], cwsi: Optional[float], shading_helps: Optional[bool], dt: Optional[datetime] = None, # accepted but not used; preserved for logging ) -> GateDecision: """ Evaluate whether the vine is significantly stressed. Runs a pipeline of 5 checks in order. First rejection stops the pipeline. Gate passes only when ALL stress conditions are simultaneously met. Parameters ---------- tleaf_c : leaf temperature (°C) ghi_w_m2 : global horizontal irradiance (W/m²) cwsi : Crop Water Stress Index [0–1] shading_helps : output of FarquharModel dt : slot datetime (optional; for logging only) """ dec = GateDecision(passed=False) # Run checks as a pipeline — first rejection short-circuits checks = [ lambda d: self._check_meaningful_sun(ghi_w_m2, d), lambda d: self._check_heat_stress(tleaf_c, d), lambda d: self._check_water_stress(cwsi, d), lambda d: self._check_radiation_load(ghi_w_m2, d), lambda d: self._check_biology(shading_helps, d), ] for check in checks: rejection = check(dec) if rejection is not None: return rejection # All stress conditions met dec.passed = True return dec # --------------------------------------------------------------------------- # TradeoffEngine # --------------------------------------------------------------------------- @dataclass class DoseResult: """Result of the minimum-dose offset search.""" success: bool chosen_offset_deg: float = 0.0 offsets_tested: List[float] = field(default_factory=list) fruiting_par_at_chosen: Optional[float] = None # µmol m⁻² s⁻¹ top_par_fraction: Optional[float] = None # top_par / ambient_par energy_sacrifice_fraction: Optional[float] = None # approx 1 − cos(offset) rationale: str = "" def decision_tags(self) -> List[str]: if self.success: tags = [f"dose:{self.chosen_offset_deg:.0f}deg"] if self.fruiting_par_at_chosen is not None: tags.append(f"fruiting_par:{self.fruiting_par_at_chosen:.0f}") return tags return ["no_effective_dose"] class TradeoffEngine: """ Minimum-effective-dose search over candidate tilt offsets. For each offset (smallest first), ray-traces the shadow at θ_astro + offset using the 3D ShadowModel and returns the FIRST offset that simultaneously: (a) reduces fruiting-zone PAR below FRUITING_ZONE_TARGET_PAR (b) keeps top-canopy PAR ≥ 70% of ambient (preserves apical productivity) (c) costs ≤ the available slot budget (energy sacrifice fraction) Falls back to offset=0 (stay at astronomical) if no offset qualifies. Conditions (a) and (b) are geometry-only — they naturally encode the morning/evening cases where the sun angle means any offset either over-shades the whole canopy or misses the fruiting zone entirely. """ def __init__( self, shadow_model=None, candidate_offsets: Optional[List[float]] = None, fruiting_zone_target_par: float = FRUITING_ZONE_TARGET_PAR, fruiting_zone_index: int = FRUITING_ZONE_INDEX, top_canopy_min_sunlit: float = _TOP_CANOPY_TOLERANCE, ) -> None: self._shadow_model = shadow_model self.candidate_offsets = ( [o for o in CANDIDATE_OFFSETS if o > 0] if candidate_offsets is None else candidate_offsets ) self.fruiting_zone_target_par = fruiting_zone_target_par self.fruiting_zone_index = fruiting_zone_index self.top_canopy_min_sunlit = top_canopy_min_sunlit @property def shadow_model(self): if self._shadow_model is None: from src.solar_geometry import ShadowModel self._shadow_model = ShadowModel() return self._shadow_model def find_minimum_dose( self, ambient_par_umol: float, solar_elevation_deg: float, solar_azimuth_deg: float, astronomical_tilt_deg: float, max_sacrifice_fraction: float = 1.0, diffuse_fraction: float = 0.15, ) -> DoseResult: """ Find the smallest tilt offset that meaningfully protects the fruiting zone without disproportionately sacrificing top-canopy productivity. Offset direction ---------------- The offset is applied TOWARD HORIZONTAL — i.e. it reduces the absolute tilt angle. This is the direction that increases overhead shadow footprint on the vine below the panel. morning (astro_tilt > 0, panel faces east): trial = astro − offset afternoon (astro_tilt < 0, panel faces west): trial = astro + offset near-noon (astro_tilt ≈ 0): panel already near-horizontal; no beneficial offset. Conditions (evaluated relative to the astronomical-tracking baseline) ----------------------------------------------------------------------- A. Sun-side fruiting-face PAR drops below FRUITING_ZONE_TARGET_PAR (400 µmol). "Sun-side" = whichever vertical face receives more direct beam right now. This is the face where sunburn risk is highest. B. Top canopy does not lose more than TOP_CANOPY_TOLERANCE (15%) of its astronomical-baseline PAR. Computed from the horizontal top face of the canopy (top[]) which is most sensitive to panel tilt changes. C. Energy sacrifice (1 − cos(offset)) ≤ max_sacrifice_fraction. Parameters ---------- ambient_par_umol : total above-canopy PAR (µmol m⁻² s⁻¹) solar_elevation_deg : solar elevation above horizon (°) solar_azimuth_deg : solar azimuth (°) astronomical_tilt_deg : sun-following tilt from pvlib (°, +east / −west) max_sacrifice_fraction : per-slot energy budget ceiling (fraction of max gen) diffuse_fraction : diffuse fraction of ambient PAR (default 0.15) """ if ambient_par_umol <= 0 or solar_elevation_deg <= 2: return DoseResult( success=False, rationale="Solar elevation ≤ 2° or PAR = 0; no shading meaningful.", ) # Near-noon: panel already near-horizontal; moving toward horizontal # adds negligible additional shade. Skip entirely. if abs(astronomical_tilt_deg) < 3.0: return DoseResult( success=False, rationale=( f"Near-noon: astro_tilt={astronomical_tilt_deg:.1f}° already near-horizontal; " "no beneficial offset direction." ), ) # Geometric feasibility pre-check: # The panel (width=panel_w, center height=panel_h) can only intercept # direct beam on the vine's vertical face (fruiting zone at fruiting_z) # when the required horizontal reach is ≤ half the panel width. # Below this elevation threshold, the direct side-beam always bypasses # the panel and tilt offsets cannot reduce fruiting-face PAR. # For this site (panel_w=1.13m, panel_h=2.05m, fruiting_z=0.6m): # min_elevation ≈ arctan((2.05-0.6)/(1.13/2)) ≈ 68.6° panel_half = self.shadow_model.panel_width / 2.0 panel_height = self.shadow_model.panel_height from config.settings import FRUITING_ZONE_HEIGHT_M min_elev_for_side_block = math.degrees( math.atan((panel_height - FRUITING_ZONE_HEIGHT_M) / max(panel_half, 0.001)) ) if solar_elevation_deg < min_elev_for_side_block: return DoseResult( success=False, rationale=( f"Solar elevation {solar_elevation_deg:.1f}° < {min_elev_for_side_block:.1f}° " f"— direct beam bypasses panel (panel half-width {panel_half:.3f}m reaches only " f"{(panel_height - FRUITING_ZONE_HEIGHT_M) / math.tan(math.radians(solar_elevation_deg)):.2f}m " f"vs {panel_half:.3f}m needed). Tracker stays at θ_astro; passive overhead " "shading provides all available protection." ), ) # Baseline at astronomical tracking try: astro_pz = self.shadow_model.compute_face_par_zones( total_par=ambient_par_umol, solar_elevation=solar_elevation_deg, solar_azimuth=solar_azimuth_deg, tracker_tilt=astronomical_tilt_deg, diffuse_fraction=diffuse_fraction, ) except Exception as exc: return DoseResult( success=False, rationale=f"Shadow model error at baseline: {exc}", ) # Sun-side face: the face receiving more direct beam has higher sunburn risk east_astro = float(astro_pz["east"][self.fruiting_zone_index]) west_astro = float(astro_pz["west"][self.fruiting_zone_index]) sun_side = "west" if west_astro >= east_astro else "east" fruiting_par_astro = max(east_astro, west_astro) # Top canopy baseline: maximum across the horizontal top face top_astro = float(max(astro_pz["top"])) # Offset direction: toward horizontal to increase overhead shadow sign_astro = 1 if astronomical_tilt_deg > 0 else -1 tested: List[float] = [] for offset in self.candidate_offsets: tested.append(offset) # Apply offset toward horizontal trial_tilt = astronomical_tilt_deg - sign_astro * offset try: trial_pz = self.shadow_model.compute_face_par_zones( total_par=ambient_par_umol, solar_elevation=solar_elevation_deg, solar_azimuth=solar_azimuth_deg, tracker_tilt=trial_tilt, diffuse_fraction=diffuse_fraction, ) except Exception: continue east_trial = float(trial_pz["east"][self.fruiting_zone_index]) west_trial = float(trial_pz["west"][self.fruiting_zone_index]) fruiting_par_trial = east_trial if sun_side == "east" else west_trial top_par_trial = float(max(trial_pz["top"])) top_par_fraction = top_par_trial / ambient_par_umol sacrifice_fraction = 1.0 - math.cos(math.radians(offset)) # Condition A: sun-side fruiting face meaningfully shaded # - Below absolute sunburn threshold, AND # - At least 15% reduction from astronomical baseline cond_a = ( fruiting_par_trial < self.fruiting_zone_target_par and fruiting_par_trial <= fruiting_par_astro * _FRUITING_IMPROVEMENT_MIN ) # Condition B: top canopy doesn't lose more than tolerance% vs astronomical # (panel at astro already shades canopy top substantially; we must not # make it significantly worse, but the absolute fraction is not the goal) cond_b = ( top_astro <= 0 # astronomical already zero → no further degradation check or top_par_trial >= top_astro * _TOP_CANOPY_TOLERANCE ) # Condition C: energy sacrifice within budget cond_c = sacrifice_fraction <= max_sacrifice_fraction if cond_a and cond_b and cond_c: return DoseResult( success=True, chosen_offset_deg=float(offset), offsets_tested=tested, fruiting_par_at_chosen=round(fruiting_par_trial, 1), top_par_fraction=round(top_par_fraction, 3), energy_sacrifice_fraction=round(sacrifice_fraction, 5), rationale=( f"Offset {offset}° (trial_tilt={trial_tilt:.1f}°): " f"{sun_side}-face fruiting PAR {fruiting_par_trial:.0f} µmol " f"(astro={fruiting_par_astro:.0f}, target <{self.fruiting_zone_target_par:.0f}), " f"top canopy {top_par_trial:.0f}/{top_astro:.0f} µmol " f"({top_par_trial / max(top_astro, 1) * 100:.0f}% of baseline), " f"sacrifice {sacrifice_fraction * 100:.2f}%." ), ) # No offset qualified — build diagnostic rationale_parts = [ f"No offset in {self.candidate_offsets}° (toward-horizontal) satisfied conditions. " f"Baseline: {sun_side}-face fruiting={fruiting_par_astro:.0f} µmol, " f"top={top_astro:.0f} µmol. Staying at θ_astro." ] if tested: last = tested[-1] last_tilt = astronomical_tilt_deg - sign_astro * last try: pz = self.shadow_model.compute_face_par_zones( total_par=ambient_par_umol, solar_elevation=solar_elevation_deg, solar_azimuth=solar_azimuth_deg, tracker_tilt=last_tilt, diffuse_fraction=diffuse_fraction, ) fp = pz["east"][self.fruiting_zone_index] if sun_side == "east" else pz["west"][self.fruiting_zone_index] tp = max(pz["top"]) sf = 1.0 - math.cos(math.radians(last)) fails = [] if not (fp < self.fruiting_zone_target_par and fp <= fruiting_par_astro * _FRUITING_IMPROVEMENT_MIN): fails.append( f"fruiting {fp:.0f} µmol (need <{self.fruiting_zone_target_par:.0f} " f"and ≤{fruiting_par_astro * _FRUITING_IMPROVEMENT_MIN:.0f})" ) if top_astro > 0 and tp < top_astro * _TOP_CANOPY_TOLERANCE: fails.append( f"top canopy {tp:.0f} µmol < {top_astro * _TOP_CANOPY_TOLERANCE:.0f} " f"({_TOP_CANOPY_TOLERANCE * 100:.0f}% of baseline {top_astro:.0f})" ) if sf > max_sacrifice_fraction: fails.append(f"sacrifice {sf * 100:.2f}% > budget {max_sacrifice_fraction * 100:.2f}%") rationale_parts.append(f"At {last}°: {'; '.join(fails) or 'unknown'}.") except Exception: pass return DoseResult( success=False, chosen_offset_deg=0.0, offsets_tested=tested, rationale=" ".join(rationale_parts), ) @staticmethod def energy_sacrifice_fraction(offset_deg: float) -> float: """Approximate per-slot energy sacrifice: 1 − cos(offset_deg).""" return 1.0 - math.cos(math.radians(offset_deg)) # --------------------------------------------------------------------------- # CLI smoke test # --------------------------------------------------------------------------- if __name__ == "__main__": from src.solar_geometry import ShadowModel # --- InterventionGate tests --- gate = InterventionGate() print("=== InterventionGate (geometry-first) ===\n") cases = [ # (tleaf, ghi, cwsi, helps, label) (33.0, 800, 0.5, True, "All stress conditions met → PASS (geometry decides next)"), (25.0, 800, 0.5, True, "Tleaf < 30°C → no heat stress"), (33.0, 50, 0.5, True, "GHI < 100 → night/deep cloud"), (33.0, 800, 0.2, True, "CWSI < 0.4 → vine healthy"), (33.0, 300, 0.5, True, "GHI < 400 → low radiation load"), (33.0, 800, 0.5, False, "FvCB says shading hurts → RuBP-limited"), # Cases that were previously hard-blocked by time/month rules: (33.0, 800, 0.5, True, "9:00 morning (no longer blocked — geometry will decide)"), (33.0, 800, 0.5, True, "May heat wave (no longer blocked — geometry will decide)"), ] for tleaf, ghi, cwsi, helps, label in cases: dec = gate.evaluate(tleaf_c=tleaf, ghi_w_m2=ghi, cwsi=cwsi, shading_helps=helps) status = "PASS" if dec.passed else "BLOCK" reason = dec.rejection_reason or "—" print(f" [{status}] {label}") print(f" {reason}\n") # --- TradeoffEngine test with real Sde Boker summer day --- print("=== TradeoffEngine — real July-15 Sde Boker trajectories ===\n") import pandas as pd import pvlib shadow = ShadowModel() engine = TradeoffEngine(shadow_model=shadow) loc = pvlib.location.Location(30.87, 34.79, tz='Asia/Jerusalem', altitude=475) times = pd.date_range('2025-07-15 06:00', '2025-07-15 19:00', freq='2h', tz='Asia/Jerusalem') sol = loc.get_solarposition(times) print(f" {'Time':>6} {'Elev':>6} {'Azim':>6} {'Astro':>6} {'Result':>12} Notes") print(f" {'-'*70}") for t in times: elev = float(sol.loc[t, 'apparent_elevation']) azim = float(sol.loc[t, 'azimuth']) if elev < 5: continue tr = shadow.compute_tracker_tilt(azim, elev) astro = tr['tracker_theta'] par = min(elev * 15, 1100) res = engine.find_minimum_dose( ambient_par_umol=par, solar_elevation_deg=elev, solar_azimuth_deg=azim, astronomical_tilt_deg=astro, max_sacrifice_fraction=0.08, ) status = f"offset={res.chosen_offset_deg:.0f}°" if res.success else "no dose" print(f" {t.strftime('%H:%M'):>6} {elev:>6.1f} {azim:>6.1f} {astro:>6.1f} {status:>12} {res.rationale[:60]}") print() print(" Energy sacrifice by offset:") for off in [0, 3, 5, 8, 10, 15, 20]: s = TradeoffEngine.energy_sacrifice_fraction(off) print(f" {off:2d}° → {s * 100:.2f}%")