symbolic_mutations / stage12.py
RFTSystems's picture
Create stage12.py
115630c verified
# stage12.py
# Author: Liam Grinstead
# Purpose: RFT Production Pilot & Longitudinal Monitoring — Stage Twelve of Twelve
import argparse
import json
import math
import pathlib
import statistics
import subprocess as sp
import sys
import time
from collections import deque
from datetime import datetime, timezone
# ---- Thresholds ----
# Mean of the drift window at or above this value raises "alert_soft_drift".
DRIFT_SOFT = 0.08
# "alert_hard_drift" fires when every one of the last WIN_HARD drift samples
# is at or above this value.
DRIFT_HARD = 0.10
# Relative rise of the recent mean flux over the median of the flux history
# that raises "alert_flux_jump" (0.50 == +50 %).
DFLUX_JUMP = 0.50
# Maximum allowed absolute deviation of the latest tempC sample from the
# baseline "tempC_mean" before "alert_temp_band" fires.
# NOTE(review): presumably degrees Celsius, going by the field name -- confirm.
DELTAT_BAND = 0.02
# Energy gain is computed as (baseline - median) / baseline.  A gain below
# EFF_SOFT raises "alert_soft_energy"; a gain below EFF_HARD raises
# "alert_hard_energy" and launches the rollback command.
EFF_SOFT = 0.20
EFF_HARD = 0.15
# ---- Windows (seconds) ----
# NOTE(review): these values are passed as deque maxlen (a number of samples)
# and WIN_HARD is used as a sample count, so they only equal seconds when the
# stream carries one record per second -- confirm the producer's rate.
# Capacity of the drift and flux windows.
WIN_SOFT = 5 * 60
# Number of most recent drift samples inspected by the hard-drift check.
WIN_HARD = 60
# Capacity of the temperature window.
WIN_DELTAT = 10 * 60
# Capacity of the window behind the soft energy-efficiency median.
WIN_EFF_SOFT = 15 * 60
# Capacity of the window behind the hard energy-efficiency median.
WIN_EFF_HARD = 5 * 60
def utcnow_iso():
    """Return the current UTC time as an ISO-8601 string with second precision.

    The result carries an explicit "+00:00" offset and no fractional seconds,
    e.g. "2024-01-31T12:00:00+00:00".
    """
    # Dropping the microseconds makes isoformat() omit the fractional part,
    # which truncates (never rounds) to whole seconds.
    current = datetime.now(tz=timezone.utc).replace(microsecond=0)
    return current.isoformat()
class FleetStats:
    """Bounded sliding windows of the metrics read from the stream.

    Every window is a deque whose maxlen comes from one of the WIN_* constants,
    so the oldest sample is discarded automatically once a window is full.
    NOTE(review): the WIN_* constants are expressed in seconds but are used
    here as sample counts -- this matches only at one record per second.
    """

    def __init__(self):
        # Energy-per-token samples feed two windows of different length.
        self.jtok_hard = deque(maxlen=WIN_EFF_HARD)
        self.jtok = deque(maxlen=WIN_EFF_SOFT)
        self.temp = deque(maxlen=WIN_DELTAT)
        # Flux feeds a short window and a long history of 24*60*60 samples.
        self.flux_hist_24h = deque(maxlen=24 * 60 * 60)
        self.flux = deque(maxlen=WIN_SOFT)
        self.drift = deque(maxlen=WIN_SOFT)

    def add(self, drift=None, flux=None, tempC=None, j=None):
        """Append the supplied samples to their windows.

        Each argument left as None is skipped.  Values are converted with
        float(), so a value float() rejects raises from this method; samples
        handled before the failing one have already been stored.
        """
        routing = (
            (drift, (self.drift,)),
            (flux, (self.flux, self.flux_hist_24h)),
            (tempC, (self.temp,)),
            (j, (self.jtok, self.jtok_hard)),
        )
        for sample, windows in routing:
            if sample is None:
                continue
            value = float(sample)
            for window in windows:
                window.append(value)

    def mean(self, dq):
        """Return the arithmetic mean of *dq*, or None when it is empty."""
        if not dq:
            return None
        return statistics.fmean(dq)
def load_baseline(path):
    """Load the baseline metrics document that alerts are compared against.

    Args:
        path: Path of a JSON file.  The monitor reads its "J_token_median"
            and "tempC_mean" keys.

    Returns:
        The parsed JSON object as a dict.  When the file cannot be opened, is
        not valid JSON, or holds something other than a JSON object, a dict
        with both keys set to None is returned instead, which disables the
        checks that depend on the baseline.  The reason is reported on stderr
        so that stdout stays a clean JSON event stream.
    """
    fallback = {"J_token_median": None, "tempC_mean": None}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, TypeError, ValueError) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"load_baseline: cannot read {path!r}: {err}", file=sys.stderr)
        return fallback
    if not isinstance(data, dict):
        # A list or scalar has no .get(); callers expect a mapping.
        print(
            f"load_baseline: {path!r} does not contain a JSON object",
            file=sys.stderr,
        )
        return fallback
    return data
def emit(event, detail):
    """Print one event as a compact single-line JSON object on stdout.

    The object starts with "ts" (current UTC time) and "event", followed by
    the entries of *detail*; a "ts" or "event" key in *detail* replaces the
    generated value.  stdout is flushed after every event.
    """
    record = {"ts": utcnow_iso(), "event": event}
    record.update(detail)
    text = json.dumps(record, separators=(",", ":"))
    print(text, flush=True)
def _as_number(value):
    """Return *value* as a finite float, or None when it is missing or unusable."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # Strings that are not numbers, containers, and integers too large
        # for a float all count as "no sample".
        return None
    # NaN/inf would poison every mean and median computed over the window.
    return number if math.isfinite(number) else None


def main():
    """Monitor a JSONL metrics stream and emit alert events on stdout.

    Command line:
        --stream        path of a JSONL file to tail from its current end,
                        or "-" to read stdin until EOF
        --baseline      path of the baseline JSON ("J_token_median",
                        "tempC_mean")
        --rollback_cmd  shell command launched on a hard energy alert

    Each record may carry "drift", "flux", "tempC" and "J_step" (or, when that
    is absent, "J_token").  Lines that are not JSON objects and fields that
    are not finite numbers are ignored rather than aborting the monitor.

    Returns:
        0 after the input is exhausted.  This is only reachable when reading
        stdin; a followed file is polled indefinitely.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--stream", required=True, help="Path to JSONL stream file or '-' for stdin")
    ap.add_argument("--baseline", required=True, help="Path to baseline JSON")
    ap.add_argument("--rollback_cmd", default="echo ROLLBACK", help="Shell command to trigger rollback")
    args = ap.parse_args()

    base = load_baseline(args.baseline)
    if not isinstance(base, dict):
        base = {}
    # Coerce once so a baseline stored as a string cannot break the arithmetic.
    j_base = _as_number(base.get("J_token_median"))
    t_base = _as_number(base.get("tempC_mean"))

    stats = FleetStats()
    # Wall-clock time of the last emission of each alert; 0 means "never".
    last_soft_drift = last_hard_drift = last_flux_jump = 0
    last_temp_breach = last_soft_eff = last_hard_eff = 0

    follow = args.stream != "-"
    f = open(args.stream, "r", encoding="utf-8") if follow else sys.stdin
    try:
        if follow:
            # Tail mode: skip what is already in the file.
            f.seek(0, 2)
        emit("monitor_start", {"baseline_J_token": j_base, "baseline_tempC": t_base})

        pending = ""  # partial line read while the writer was mid-record
        while True:
            chunk = f.readline()
            if not chunk:
                if follow:
                    time.sleep(0.5)
                    continue
                break
            if follow and not chunk.endswith("\n"):
                # The producer has not finished this record yet; keep the
                # fragment and join it with the rest on a later read.
                pending += chunk
                continue
            line = pending + chunk
            pending = ""

            try:
                rec = json.loads(line)
            except (ValueError, RecursionError):
                continue
            if not isinstance(rec, dict):
                continue

            drift = _as_number(rec.get("drift"))
            flux = _as_number(rec.get("flux"))
            tempC = _as_number(rec.get("tempC"))
            # Fall back to J_token only when J_step is absent or unusable, so
            # a genuine reading of 0 is not discarded.
            jtok = _as_number(rec.get("J_step"))
            if jtok is None:
                jtok = _as_number(rec.get("J_token"))
            stats.add(drift, flux, tempC, jtok)
            now = time.time()

            # The cooldown is tested first in every check below so the
            # mean/median work is skipped while an alert is still suppressed.

            # Drift alerts
            if now - last_soft_drift > 60:
                m_drift = stats.mean(stats.drift)
                if m_drift is not None and m_drift >= DRIFT_SOFT:
                    emit("alert_soft_drift", {"mean_drift": round(m_drift, 3)})
                    last_soft_drift = now
            if now - last_hard_drift > 30 and len(stats.drift) >= WIN_HARD:
                recent = list(stats.drift)[-WIN_HARD:]
                if all(x >= DRIFT_HARD for x in recent):
                    emit("alert_hard_drift", {"window_s": WIN_HARD})
                    last_hard_drift = now

            # Flux jump (needs more than 3600 history samples for a baseline)
            if now - last_flux_jump > 300 and len(stats.flux_hist_24h) > 3600:
                base_flux = statistics.median(stats.flux_hist_24h)
                cur_flux = stats.mean(stats.flux)
                # A zero median or zero current mean is treated as "no signal".
                if (
                    base_flux
                    and cur_flux
                    and (cur_flux - base_flux) / max(base_flux, 1e-9) >= DFLUX_JUMP
                ):
                    emit("alert_flux_jump", {"base": round(base_flux, 4), "cur": round(cur_flux, 4)})
                    last_flux_jump = now

            # Thermal band: a baseline of 0 is a valid reference, only None
            # disables the check.
            if t_base is not None and stats.temp and now - last_temp_breach > 60:
                dev = abs(stats.temp[-1] - t_base)
                if dev > DELTAT_BAND:
                    emit("alert_temp_band", {"tempC": stats.temp[-1], "base": t_base, "band": DELTAT_BAND})
                    last_temp_breach = now

            # Energy efficiency: j_base must be non-zero because it is the
            # divisor of the gain.
            if j_base and stats.jtok and now - last_soft_eff > 120:
                j_med = statistics.median(stats.jtok)
                gain = (j_base - j_med) / j_base
                if gain < EFF_SOFT:
                    emit("alert_soft_energy", {"baseline": j_base, "median": round(j_med, 6), "gain": round(gain, 3)})
                    last_soft_eff = now
            if j_base and stats.jtok_hard and now - last_hard_eff > 60:
                j_med = statistics.median(stats.jtok_hard)
                gain = (j_base - j_med) / j_base
                if gain < EFF_HARD:
                    emit("alert_hard_energy", {"baseline": j_base, "median": round(j_med, 6), "gain": round(gain, 3)})
                    # shell=True is intentional: the option is documented as a
                    # shell command and comes from the operator's command
                    # line, not from the stream.  The child is not waited for.
                    sp.Popen(args.rollback_cmd, shell=True)
                    emit("rollback_invoked", {"cmd": args.rollback_cmd})
                    last_hard_eff = now
    finally:
        if follow:
            f.close()

    emit("monitor_stop", {})
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())