Spaces:
Sleeping
Sleeping
# stage12.py
# Author: Liam Grinstead
# Purpose: RFT Production Pilot & Longitudinal Monitoring — Stage Twelve of Twelve
import argparse
import json
import pathlib
import statistics
import subprocess as sp
import sys
import time
from collections import deque
from datetime import datetime, timezone
# ---- Thresholds ----
# Mean drift over the soft window at or above this value raises a soft alert.
DRIFT_SOFT = 0.08
# Every sample of the hard window at or above this value raises a hard alert.
DRIFT_HARD = 0.10
# Relative rise of the current mean flux over the long-run median flux that
# counts as a flux jump.
DFLUX_JUMP = 0.50
# Allowed absolute deviation of the latest temperature from the baseline.
DELTAT_BAND = 0.02
# Energy gain versus baseline below which the soft / hard energy alerts fire.
EFF_SOFT = 0.20
EFF_HARD = 0.15

# ---- Windows (seconds) ----
# NOTE(review): these values are used as sample counts (buffer lengths), so
# they equal seconds only if the stream carries one record per second — confirm.
WIN_SOFT = 300  # 5 minutes
WIN_HARD = 60  # 1 minute
WIN_DELTAT = 600  # 10 minutes
WIN_EFF_SOFT = 900  # 15 minutes
WIN_EFF_HARD = 300  # 5 minutes
def utcnow_iso():
    """Return the current UTC time as an ISO-8601 string, to the second."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat(timespec="seconds")
class FleetStats:
    """Bounded rolling buffers of the most recent telemetry samples."""

    def __init__(self):
        # Each buffer silently drops its oldest sample once maxlen is reached.
        self.drift = deque(maxlen=WIN_SOFT)
        self.flux = deque(maxlen=WIN_SOFT)
        self.temp = deque(maxlen=WIN_DELTAT)
        self.jtok = deque(maxlen=WIN_EFF_SOFT)
        self.jtok_hard = deque(maxlen=WIN_EFF_HARD)
        self.flux_hist_24h = deque(maxlen=24 * 60 * 60)

    def add(self, drift=None, flux=None, tempC=None, j=None):
        """Append each supplied reading, converted to float, to its buffers.

        Readings left as None are skipped. Flux feeds both the short window
        and the long history; ``j`` feeds both energy windows.
        """
        routing = (
            (drift, (self.drift,)),
            (flux, (self.flux, self.flux_hist_24h)),
            (tempC, (self.temp,)),
            (j, (self.jtok, self.jtok_hard)),
        )
        for reading, targets in routing:
            if reading is None:
                continue
            number = float(reading)
            for target in targets:
                target.append(number)

    def mean(self, dq):
        """Return the arithmetic mean of ``dq``, or None when it is empty."""
        if not dq:
            return None
        return statistics.fmean(dq)
def load_baseline(path):
    """Load the baseline metrics JSON, falling back to empty defaults.

    Args:
        path: Filesystem path of the baseline JSON file.

    Returns:
        The parsed JSON object when the file can be read and holds a JSON
        object; otherwise ``{"J_token_median": None, "tempC_mean": None}``.
        The reason for a fallback is reported on stderr so that stdout stays
        a clean event stream.
    """
    fallback = {"J_token_median": None, "tempC_mean": None}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, TypeError, ValueError) as err:
        # Missing/unreadable file, bad path value, or malformed JSON
        # (json.JSONDecodeError and UnicodeDecodeError are ValueErrors).
        print(f"baseline {path!r} could not be loaded: {err}", file=sys.stderr)
        return fallback
    if not isinstance(data, dict):
        # Valid JSON but not an object: callers look values up by key.
        print(
            f"baseline {path!r} is not a JSON object (got {type(data).__name__})",
            file=sys.stderr,
        )
        return fallback
    return data
def emit(event, detail):
    """Print one compact JSON event line to stdout and flush immediately.

    The line carries a ``ts`` timestamp and the ``event`` name followed by
    the entries of ``detail``; a ``ts`` or ``event`` key in ``detail``
    overrides the generated one.
    """
    record = {"ts": utcnow_iso(), "event": event, **detail}
    line = json.dumps(record, separators=(",", ":"))
    print(line, flush=True)
def _to_float(value):
    """Return ``value`` as a float, or None when it is absent or not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _iter_stream_lines(f, follow, poll_s=0.5):
    """Yield lines from ``f``; in follow mode wait at EOF for more data.

    In follow mode a line is yielded only once its trailing newline has been
    read, so a record the writer has only half flushed is not parsed (and
    lost) prematurely.
    """
    pending = ""
    while True:
        chunk = f.readline()
        if chunk:
            pending += chunk
            if pending.endswith("\n") or not follow:
                yield pending
                pending = ""
            continue
        if not follow:
            return
        time.sleep(poll_s)


def main():
    """Monitor a JSONL telemetry stream and emit alert events.

    Parses ``--stream``, ``--baseline`` and ``--rollback_cmd``, then reads
    records from the stream (tailing the file from its current end, or
    reading stdin to EOF when the stream is ``-``). Each record updates the
    rolling statistics and is checked against the drift, flux, thermal and
    energy thresholds; every alert has its own cooldown. A hard energy alert
    also launches the rollback command.

    Returns:
        0 when the stream ends or the monitor is interrupted with Ctrl-C.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--stream", required=True, help="Path to JSONL stream file or '-' for stdin")
    ap.add_argument("--baseline", required=True, help="Path to baseline JSON")
    ap.add_argument("--rollback_cmd", default="echo ROLLBACK", help="Shell command to trigger rollback")
    args = ap.parse_args()

    base = load_baseline(args.baseline)
    if not isinstance(base, dict):
        # A baseline file holding a non-object JSON value has no keys to read.
        base = {}
    j_base = base.get("J_token_median", None)
    t_base = base.get("tempC_mean", None)

    stats = FleetStats()
    # Wall-clock time of the last emission of each alert (0 = never).
    last_soft_drift = last_hard_drift = last_flux_jump = 0
    last_temp_breach = last_soft_eff = last_hard_eff = 0

    follow = args.stream != "-"
    f = open(args.stream, "r") if follow else sys.stdin
    try:
        if follow:
            # Tail mode: skip what is already in the file.
            f.seek(0, 2)
        emit("monitor_start", {"baseline_J_token": j_base, "baseline_tempC": t_base})

        for line in _iter_stream_lines(f, follow):
            try:
                rec = json.loads(line)
            except ValueError:
                continue  # not valid JSON: skip the line
            if not isinstance(rec, dict):
                continue  # valid JSON but not a record object

            # Non-numeric field values are treated as missing.
            # NOTE(review): `or` makes a J_step of 0 fall through to J_token —
            # confirm that is intended.
            drift = _to_float(rec.get("drift"))
            flux = _to_float(rec.get("flux"))
            tempC = _to_float(rec.get("tempC"))
            jtok = _to_float(rec.get("J_step") or rec.get("J_token"))
            stats.add(drift, flux, tempC, jtok)
            now = time.time()

            # Drift alerts
            m_drift = stats.mean(stats.drift)
            if m_drift is not None and m_drift >= DRIFT_SOFT and now - last_soft_drift > 60:
                emit("alert_soft_drift", {"mean_drift": round(m_drift, 3)})
                last_soft_drift = now
            if (
                len(stats.drift) >= WIN_HARD
                and all(x >= DRIFT_HARD for x in list(stats.drift)[-WIN_HARD:])
                and now - last_hard_drift > 30
            ):
                emit("alert_hard_drift", {"window_s": WIN_HARD})
                last_hard_drift = now

            # Flux jump: short-window mean against the long-history median,
            # checked only once the history holds more than 3600 samples.
            if len(stats.flux_hist_24h) > 3600:
                base_flux = statistics.median(stats.flux_hist_24h)
                cur_flux = stats.mean(stats.flux)
                if (
                    base_flux
                    and cur_flux
                    and (cur_flux - base_flux) / max(base_flux, 1e-9) >= DFLUX_JUMP
                    and now - last_flux_jump > 300
                ):
                    emit("alert_flux_jump", {"base": round(base_flux, 4), "cur": round(cur_flux, 4)})
                    last_flux_jump = now

            # Thermal band: a baseline of 0 is a valid value, so test for None.
            if t_base is not None and stats.temp:
                dev = abs(stats.temp[-1] - t_base)
                if dev > DELTAT_BAND and now - last_temp_breach > 60:
                    emit("alert_temp_band", {"tempC": stats.temp[-1], "base": t_base, "band": DELTAT_BAND})
                    last_temp_breach = now

            # Energy efficiency (skipped when the baseline is missing or 0,
            # which also avoids dividing by zero).
            if j_base and stats.jtok:
                j_med = statistics.median(stats.jtok)
                gain = (j_base - j_med) / j_base
                if gain < EFF_SOFT and now - last_soft_eff > 120:
                    emit("alert_soft_energy", {"baseline": j_base, "median": round(j_med, 6), "gain": round(gain, 3)})
                    last_soft_eff = now
            # NOTE(review): this check runs as soon as one energy sample
            # exists, so a rollback can fire before the window fills — confirm.
            if j_base and stats.jtok_hard:
                j_med = statistics.median(stats.jtok_hard)
                gain = (j_base - j_med) / j_base
                if gain < EFF_HARD and now - last_hard_eff > 60:
                    emit("alert_hard_energy", {"baseline": j_base, "median": round(j_med, 6), "gain": round(gain, 3)})
                    # The command is operator-supplied and documented as a
                    # shell command, hence shell=True; it is not waited on.
                    sp.Popen(args.rollback_cmd, shell=True)
                    emit("rollback_invoked", {"cmd": args.rollback_cmd})
                    last_hard_eff = now
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a tailing monitor: fall through
        # so the stop event is still emitted.
        pass
    finally:
        if follow:
            f.close()  # never close sys.stdin

    emit("monitor_stop", {})
    return 0
# Script entry point: run the monitor and use its return value as exit status.
if __name__ == "__main__":
    raise SystemExit(main())