# Clutch2 / drift.py
# Uploaded by Aluode ("Upload 5 files", commit a7bdf50, 5.89 kB).
"""
drift.py — a SECOND substrate for the same Clutch, to show it is not grid-specific.
Problem: an online predictor must forecast a streaming signal y_t. The signal is
piecewise-linear with occasional REGIME CHANGES (the slope/offset jumps) — the
time-series analogue of a wall dropping. Refitting a model every step is accurate but
expensive; never refitting drifts badly after a regime change. The Clutch refits only
when prediction error trips the gate.
Substrate mapping (identical Clutch API, different callbacks):
cheap_step(state) -> extrapolate the CACHED linear model (O(1))
expensive_plan(state) -> REFIT a linear model on the last `window` obs (O(window))
error_signal(state) -> normalized residual of the last prediction
This is exactly "concept-drift-gated retraining": spend training compute only when the
world has actually drifted. Compute is counted honestly as samples fit (refits x window).
"""
import numpy as np
from clutch import Clutch, MagnitudeGate, AcceleratorGate
def make_signal(rng, T=600, n_regimes=5, noise=0.4):
    """Generate a noisy piecewise-linear signal with abrupt slope changes.

    The level is continuous across change points; each regime redraws only its
    per-step slope (uniform in [-0.15, 0.15]), starting from a level drawn
    uniformly in [-2, 2].

    Args:
        rng: NumPy random generator. It is consumed in a fixed order
            (change points, start level, one slope per regime, then noise), so
            the same seed reproduces the same signal.
        T: number of samples.
        n_regimes: number of linear segments; ``n_regimes - 1`` change points
            are drawn without replacement from ``range(40, T - 20)``.
        noise: standard deviation of the additive Gaussian noise.

    Returns:
        ``(y, change_points)``: the length-``T`` float array and the set of
        regime-change indices.
    """
    breaks = sorted(rng.choice(range(40, T - 20), size=n_regimes - 1, replace=False))
    edges = [0, *breaks, T]
    start = float(rng.uniform(-2, 2))
    # One slope per segment, drawn in segment order.
    slopes = [float(rng.uniform(-0.15, 0.15)) for _ in edges[1:]]
    per_step = np.repeat(slopes, np.diff(edges))
    # Left-to-right running sum from the start level: sample t holds
    # start + slope(0) + ... + slope(t), accumulated sequentially.
    clean = np.cumsum(np.concatenate(([start], per_step)))[1:]
    noisy = clean + rng.normal(0, noise, size=T)
    return noisy, set(breaks)
class DriftPredictor:
    """Streaming forecaster state that exposes the three Clutch callbacks.

    The driver sets ``self.t`` before each step. At step ``t`` the predictor
    forecasts ``y[t]``, and refits use only observations strictly before ``t``
    (``y[t]`` is the target and must not leak into the fit). The driver is
    expected to update ``last_resid`` after it observes the truth.

    Attributes:
        y: the signal; the fitting code reads only indices ``< t``.
        window: maximum number of past observations used per refit.
        t: current time step (written by the driver).
        a, b, origin: cached linear model ``a * (t - origin) + b``.
        last_resid: normalized residual of the previous prediction.
        scale: normalizer for residuals.
        pred_log: per-step predictions (NaN until filled by the driver).
        refit_samples: total observations consumed by refits (compute cost).
    """

    def __init__(self, y, window=25):
        self.y = y
        self.window = window
        self.t = 0
        self.a = 0.0  # cached slope
        # Cold-start offset. NOTE(review): seeding from y[0] means the t=0
        # forecast sees its own target; kept so all strategies share the prior.
        self.b = float(y[0])
        self.origin = 0  # x-origin the cached model was fit at
        self.last_resid = 1.0  # normalized; start "surprised" so we plan first
        # NOTE(review): std over the whole series uses future samples; it is
        # only a global normalizer, but an online estimate would be stricter.
        self.scale = np.std(y) + 1e-6
        self.pred_log = np.full(len(y), np.nan)
        self.refit_samples = 0

    def _predict_at(self, t):
        """Evaluate the cached linear model at ``t`` (scalar or array)."""
        return self.a * (t - self.origin) + self.b

    # ---- Clutch callbacks -------------------------------------------------
    def cheap_step(self, _):
        """O(1) step: extrapolate the cached model to ``self.t``; never None."""
        return self._predict_at(self.t)

    def expensive_plan(self, _):
        """Refit on the last ``window`` observations before ``t`` and forecast ``y[t]``.

        With two or more past points, a least-squares line is fit. With exactly
        one, the model becomes that constant value. With none, the cached model
        is kept.

        Returns:
            ``(prediction, calm)``. ``calm`` is True when the in-sample mean
            absolute residual divided by ``scale`` is below 0.6, and False
            when there was no data to fit.
        """
        lo = max(0, self.t - self.window)
        xs = np.arange(lo, self.t)  # exclude t itself: y[t] is the target
        ys = self.y[lo:self.t]
        n = len(xs)
        self.refit_samples += n
        if n >= 2:
            a, b = np.polyfit(xs - lo, ys, 1)
            self.a, self.b, self.origin = float(a), float(b), lo
        elif n == 1:
            self.a, self.b, self.origin = 0.0, float(ys[0]), lo
        pred = self._predict_at(self.t)
        if n == 0:
            return pred, False
        # Residual of the model actually cached (not a stale one).
        insample = float(np.mean(np.abs(self._predict_at(xs) - ys)))
        calm = (insample / self.scale) < 0.6  # clean fit -> safe to latch to habit
        return pred, bool(calm)

    def error_signal(self, _):
        """Gate input: the normalized residual of the last prediction."""
        return self.last_resid
def run_drift(y, change_pts, strategy, window=25, gate_params=None):
    """Run one forecasting strategy over the signal ``y`` and score it.

    Args:
        y: signal to forecast one step at a time.
        change_pts: true regime-change indices. Not used here; accepted so
            callers can pass ``make_signal``'s output straight through.
        strategy: one of ``"ALWAYS_REFIT"``, ``"NEVER_REFIT"``,
            ``"CLUTCH_MAG"``, ``"CLUTCH_ACC"``.
        window: refit window for ``DriftPredictor``.
        gate_params: optional gate overrides: ``gain``, ``leak``, ``trip_mag``
            for the magnitude gate; ``trip_acc``, ``refractory`` for the
            accelerator gate.

    Returns:
        dict with ``strategy``, ``mae``, ``refits``, ``refit_samples``,
        ``pred`` (per-step predictions) and ``trip_times``.

    Raises:
        ValueError: if ``strategy`` is not one of the four names above.
    """
    valid = ("ALWAYS_REFIT", "NEVER_REFIT", "CLUTCH_MAG", "CLUTCH_ACC")
    if strategy not in valid:
        # Without this check, an unknown name reaches the Clutch branch with
        # clutch=None and fails with an opaque AttributeError.
        raise ValueError(
            f"unknown strategy {strategy!r}; expected one of {', '.join(valid)}")
    gp = gate_params or {}
    pred = DriftPredictor(y, window=window)
    clutch = None
    if strategy == "CLUTCH_MAG":
        clutch = Clutch(MagnitudeGate(gain=gp.get("gain", 4.0),
                                      leak=gp.get("leak", 0.5),
                                      trip=gp.get("trip_mag", 3.0)))
    elif strategy == "CLUTCH_ACC":
        clutch = Clutch(AcceleratorGate(trip=gp.get("trip_acc", 0.8),
                                        refractory=gp.get("refractory", 3)))
    refits = 0
    trip_times = []
    for t in range(len(y)):
        pred.t = t
        if strategy == "ALWAYS_REFIT":
            p, _ = pred.expensive_plan(None)
            refits += 1
        elif strategy == "NEVER_REFIT":
            # Plan once at t=0, then extrapolate that model forever.
            if t == 0:
                p, _ = pred.expensive_plan(None)
                refits += 1
            else:
                p = pred.cheap_step(None)
        else:
            before = clutch.stats.expensive_calls
            p, mode = clutch.step(None, pred.cheap_step,
                                  pred.expensive_plan, pred.error_signal)
            if clutch.stats.expensive_calls > before:
                refits += 1
            # NOTE(review): this records every COGNITIVE step once any trip has
            # happened (stats.trips truthy), not only the steps where a trip
            # fired. If stats.trips is a counter, comparing it before/after
            # step() would give exact trip instants; confirm against Clutch.
            if clutch.stats.trips and mode == "COGNITIVE":
                trip_times.append(t)
        pred.pred_log[t] = p
        # Observe truth, then update the normalized residual for the next gate.
        pred.last_resid = abs(p - y[t]) / pred.scale
    mae = float(np.nanmean(np.abs(pred.pred_log - y)))
    return dict(strategy=strategy, mae=mae, refits=refits,
                refit_samples=pred.refit_samples, pred=pred.pred_log,
                trip_times=trip_times)
def benchmark_drift(seeds, window=25, gate_params=None, T=600,
                    n_regimes=5, noise=0.4):
    """Average each strategy's metrics over one random signal per seed.

    Args:
        seeds: iterable of seeds; each builds a fresh ``make_signal`` input.
        window: refit window passed to ``run_drift``.
        gate_params: gate overrides passed to ``run_drift``.
        T: signal length.
        n_regimes: number of regimes per signal (forwarded to ``make_signal``).
        noise: noise standard deviation (forwarded to ``make_signal``).

    Returns:
        List of dicts, one per strategy, with mean ``mae``, ``refits``,
        ``samples`` and ``vs_ceiling``. ``vs_ceiling`` is the strategy's refit
        samples as a percentage of ALWAYS_REFIT's, or NaN when that cost is 0.

    Raises:
        ValueError: if ``seeds`` is empty.
    """
    seeds = list(seeds)
    if not seeds:
        raise ValueError("benchmark_drift needs at least one seed")
    strategies = ["ALWAYS_REFIT", "NEVER_REFIT", "CLUTCH_MAG", "CLUTCH_ACC"]
    agg = {s: {"mae": [], "refits": [], "samples": []} for s in strategies}
    for seed in seeds:
        rng = np.random.default_rng(seed)
        y, cps = make_signal(rng, T=T, n_regimes=n_regimes, noise=noise)
        for s in strategies:
            r = run_drift(y, cps, s, window=window, gate_params=gate_params)
            agg[s]["mae"].append(r["mae"])
            agg[s]["refits"].append(r["refits"])
            agg[s]["samples"].append(r["refit_samples"])
    # ALWAYS_REFIT is the compute ceiling every strategy is compared against.
    base = float(np.mean(agg["ALWAYS_REFIT"]["samples"]))
    out = []
    for s in strategies:
        samples = float(np.mean(agg[s]["samples"]))
        vs_ceiling = samples / base * 100.0 if base > 0 else float("nan")
        out.append(dict(strategy=s,
                        mae=float(np.mean(agg[s]["mae"])),
                        refits=float(np.mean(agg[s]["refits"])),
                        samples=samples,
                        vs_ceiling=vs_ceiling))
    return out