# stage12.py
# Author: Liam Grinstead
# Purpose: RFT Production Pilot & Longitudinal Monitoring — Stage Twelve of Twelve
"""Longitudinal fleet monitor: tail a JSONL telemetry stream and emit alerts.

Each input line is expected to be a JSON object that may carry ``drift``,
``flux``, ``tempC`` and ``J_step`` (or ``J_token``).  Alerts and lifecycle
events are printed to stdout as compact JSON lines.  A hard energy-efficiency
alert additionally launches the operator-supplied ``--rollback_cmd``.
"""
import argparse
import itertools
import json
import math
import pathlib  # noqa: F401  (kept from the original import list; currently unused)
import statistics
import subprocess as sp
import sys
import time
from collections import deque
from datetime import datetime, timezone

# ---- Thresholds ----
DRIFT_SOFT = 0.08   # soft alert when the windowed mean drift reaches this
DRIFT_HARD = 0.10   # hard alert when every sample in the hard window reaches this
DFLUX_JUMP = 0.50   # relative rise of current mean flux over the 24h median flux
DELTAT_BAND = 0.02  # allowed |latest tempC - baseline tempC_mean|, in tempC's units
EFF_SOFT = 0.20     # soft alert when energy gain vs. baseline drops below this
EFF_HARD = 0.15     # hard alert (and rollback) when energy gain drops below this

# ---- Windows (seconds) ----
# NOTE(review): windows are enforced as deque lengths (sample counts), so they
# only equal seconds if the stream delivers one record per second — confirm
# the producer's cadence.
WIN_SOFT = 5 * 60
WIN_HARD = 60
WIN_DELTAT = 10 * 60
WIN_EFF_SOFT = 15 * 60
WIN_EFF_HARD = 5 * 60

# ---- Alert cooldowns (wall-clock seconds before the same alert may repeat) ----
COOLDOWN_SOFT_DRIFT = 60
COOLDOWN_HARD_DRIFT = 30
COOLDOWN_FLUX_JUMP = 300
COOLDOWN_TEMP = 60
COOLDOWN_SOFT_EFF = 120
COOLDOWN_HARD_EFF = 60

# Minimum number of flux samples in the long history before jumps are judged.
FLUX_MIN_HISTORY = 3600
# Sleep between polls when tailing a file that has no new data.
POLL_INTERVAL_S = 0.5


def utcnow_iso():
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _as_float(value):
    """Return *value* as a finite float, or None if missing, boolean, non-numeric or NaN/inf."""
    if value is None or isinstance(value, bool):
        return None
    try:
        out = float(value)
    except (TypeError, ValueError):
        return None
    return out if math.isfinite(out) else None


class FleetStats:
    """Rolling per-metric sample windows for the fleet telemetry stream."""

    def __init__(self):
        self.drift = deque(maxlen=WIN_SOFT)
        self.flux = deque(maxlen=WIN_SOFT)
        self.temp = deque(maxlen=WIN_DELTAT)
        self.jtok = deque(maxlen=WIN_EFF_SOFT)
        self.jtok_hard = deque(maxlen=WIN_EFF_HARD)
        # Long flux history used as the reference level for jump detection.
        self.flux_hist_24h = deque(maxlen=24 * 60 * 60)

    def add(self, drift=None, flux=None, tempC=None, j=None):
        """Append each supplied metric to its window(s).

        Values that are missing, boolean, non-numeric or non-finite are skipped,
        so one malformed record can neither crash the monitor nor poison a
        windowed mean with NaN.
        """
        drift = _as_float(drift)
        flux = _as_float(flux)
        tempC = _as_float(tempC)
        j = _as_float(j)
        if drift is not None:
            self.drift.append(drift)
        if flux is not None:
            self.flux.append(flux)
            self.flux_hist_24h.append(flux)
        if tempC is not None:
            self.temp.append(tempC)
        if j is not None:
            self.jtok.append(j)
            self.jtok_hard.append(j)

    def mean(self, dq):
        """Return the arithmetic mean of *dq*, or None when it is empty."""
        return statistics.fmean(dq) if dq else None


class AlertEngine:
    """Evaluate the alert rules over a FleetStats and rate-limit repeats.

    Args:
        j_base: baseline energy-per-token median; energy rules are skipped
            when it is None or 0 (the gain would be undefined).
        t_base: baseline mean temperature; the thermal rule is skipped when
            it is None.
        stats: optional pre-populated FleetStats (a fresh one by default).
    """

    def __init__(self, j_base=None, t_base=None, stats=None):
        self.j_base = j_base
        self.t_base = t_base
        self.stats = FleetStats() if stats is None else stats
        # Wall-clock time each alert last fired; 0.0 means "never".
        self.last_fired = dict.fromkeys(
            ("soft_drift", "hard_drift", "flux_jump", "temp", "soft_eff", "hard_eff"),
            0.0,
        )

    def observe(self, rec, now=None):
        """Ingest one telemetry record and return the alerts it triggers.

        Args:
            rec: decoded JSON object for one stream line.
            now: current time in seconds (defaults to ``time.time()``).

        Returns:
            List of ``(event, detail)`` tuples, in the fixed order drift,
            flux, thermal, energy (soft before hard).
        """
        if now is None:
            now = time.time()
        # Explicit None check so a legitimate J_step of 0 is not replaced by J_token.
        j = rec.get("J_step")
        if j is None:
            j = rec.get("J_token")
        self.stats.add(rec.get("drift"), rec.get("flux"), rec.get("tempC"), j)
        alerts = []
        self._check_drift(now, alerts)
        self._check_flux(now, alerts)
        self._check_temp(now, alerts)
        self._check_energy(now, alerts)
        return alerts

    def _ready(self, key, now, cooldown):
        """Return True when alert *key* has been quiet for more than *cooldown* seconds."""
        return now - self.last_fired[key] > cooldown

    def _fire(self, alerts, key, now, event, detail):
        """Record that alert *key* fired at *now* and queue its event."""
        self.last_fired[key] = now
        alerts.append((event, detail))

    def _check_drift(self, now, alerts):
        """Soft: windowed mean drift >= DRIFT_SOFT; hard: last WIN_HARD samples all >= DRIFT_HARD."""
        s = self.stats
        m_drift = s.mean(s.drift)
        if (m_drift is not None and m_drift >= DRIFT_SOFT
                and self._ready("soft_drift", now, COOLDOWN_SOFT_DRIFT)):
            self._fire(alerts, "soft_drift", now, "alert_soft_drift",
                       {"mean_drift": round(m_drift, 3)})
        if (len(s.drift) >= WIN_HARD
                and self._ready("hard_drift", now, COOLDOWN_HARD_DRIFT)
                # Walk only the newest WIN_HARD samples instead of copying the deque.
                and all(x >= DRIFT_HARD for x in itertools.islice(reversed(s.drift), WIN_HARD))):
            self._fire(alerts, "hard_drift", now, "alert_hard_drift", {"window_s": WIN_HARD})

    def _check_flux(self, now, alerts):
        """Alert when the recent mean flux rises DFLUX_JUMP (relative) above the long-run median."""
        s = self.stats
        # Cooldown first: the median over the long history is the costliest step.
        if (len(s.flux_hist_24h) <= FLUX_MIN_HISTORY
                or not self._ready("flux_jump", now, COOLDOWN_FLUX_JUMP)):
            return
        base_flux = statistics.median(s.flux_hist_24h)
        cur_flux = s.mean(s.flux)
        if not base_flux or not cur_flux:
            return
        if (cur_flux - base_flux) / max(base_flux, 1e-9) >= DFLUX_JUMP:
            self._fire(alerts, "flux_jump", now, "alert_flux_jump",
                       {"base": round(base_flux, 4), "cur": round(cur_flux, 4)})

    def _check_temp(self, now, alerts):
        """Alert when the latest temperature leaves the band around the baseline."""
        s = self.stats
        # `is not None`: a baseline of 0.0 is a real value, not "missing".
        if self.t_base is None or not s.temp:
            return
        latest = s.temp[-1]
        if abs(latest - self.t_base) > DELTAT_BAND and self._ready("temp", now, COOLDOWN_TEMP):
            self._fire(alerts, "temp", now, "alert_temp_band",
                       {"tempC": latest, "base": self.t_base, "band": DELTAT_BAND})

    def _check_energy(self, now, alerts):
        """Alert when the median energy gain vs. baseline falls below EFF_SOFT / EFF_HARD."""
        if not self.j_base:
            return  # no (or zero) baseline: the relative gain is undefined
        s = self.stats
        rules = (
            (s.jtok, EFF_SOFT, "soft_eff", COOLDOWN_SOFT_EFF, "alert_soft_energy"),
            (s.jtok_hard, EFF_HARD, "hard_eff", COOLDOWN_HARD_EFF, "alert_hard_energy"),
        )
        for window, floor, key, cooldown, event in rules:
            if not window or not self._ready(key, now, cooldown):
                continue
            j_med = statistics.median(window)
            # Fractional reduction of the median vs. baseline; positive means cheaper.
            gain = (self.j_base - j_med) / self.j_base
            if gain < floor:
                self._fire(alerts, key, now, event,
                           {"baseline": self.j_base, "median": round(j_med, 6),
                            "gain": round(gain, 3)})


def load_baseline(path):
    """Load the baseline JSON object from *path*.

    Best-effort: an unreadable file, invalid JSON, or a non-object document
    yields ``{"J_token_median": None, "tempC_mean": None}``.
    """
    default = {"J_token_median": None, "tempC_mean": None}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return default
    return data if isinstance(data, dict) else default


def emit(event, detail):
    """Print one event as a compact JSON line with a UTC timestamp, flushed immediately."""
    print(json.dumps({"ts": utcnow_iso(), "event": event, **detail}, separators=(",", ":")), flush=True)


def _read_records(stream, follow):
    """Yield each JSON-object line of *stream*; in follow mode poll for appended data forever."""
    pending = ""
    while True:
        chunk = stream.readline()
        if not chunk:
            if not follow:
                break
            time.sleep(POLL_INTERVAL_S)
            continue
        if follow and not chunk.endswith("\n"):
            # The writer is mid-line: keep the fragment until the rest arrives.
            pending += chunk
            continue
        line, pending = pending + chunk, ""
        try:
            rec = json.loads(line)
        except ValueError:
            continue  # malformed line: skipped deliberately (best-effort monitor)
        if isinstance(rec, dict):
            yield rec


def _invoke_rollback(cmd):
    """Launch the rollback command without waiting for it, and report the outcome."""
    # shell=True is by design: --rollback_cmd is an operator-supplied shell command.
    try:
        sp.Popen(cmd, shell=True)
    except OSError as exc:
        emit("rollback_failed", {"cmd": cmd, "error": str(exc)})
        return
    emit("rollback_invoked", {"cmd": cmd})


def main(argv=None):
    """Run the monitor until stdin EOF (or Ctrl-C when tailing a file).

    Args:
        argv: argument list; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status (0).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--stream", required=True, help="Path to JSONL stream file or '-' for stdin")
    ap.add_argument("--baseline", required=True, help="Path to baseline JSON")
    ap.add_argument("--rollback_cmd", default="echo ROLLBACK", help="Shell command to trigger rollback")
    args = ap.parse_args(argv)

    base = load_baseline(args.baseline)
    engine = AlertEngine(
        j_base=_as_float(base.get("J_token_median")),
        t_base=_as_float(base.get("tempC_mean")),
    )

    follow = args.stream != "-"
    stream = open(args.stream, "r", encoding="utf-8") if follow else sys.stdin
    try:
        if follow:
            stream.seek(0, 2)  # tail mode: only records appended after start-up are seen
        emit("monitor_start", {"baseline_J_token": engine.j_base, "baseline_tempC": engine.t_base})
        for rec in _read_records(stream, follow):
            for event, detail in engine.observe(rec, time.time()):
                emit(event, detail)
                if event == "alert_hard_energy":
                    _invoke_rollback(args.rollback_cmd)
    except KeyboardInterrupt:
        pass  # normal way to stop a tailing monitor; fall through to monitor_stop
    finally:
        if follow:
            stream.close()
    emit("monitor_stop", {})
    return 0


if __name__ == "__main__":
    sys.exit(main())