Spaces:
Paused
Paused
| """ | |
| drift.py — a SECOND substrate for the same Clutch, to show it is not grid-specific. | |
| Problem: an online predictor must forecast a streaming signal y_t. The signal is | |
| piecewise-linear with occasional REGIME CHANGES (the slope/offset jumps) — the | |
| time-series analogue of a wall dropping. Refitting a model every step is accurate but | |
| expensive; never refitting drifts badly after a regime change. The Clutch refits only | |
| when prediction error trips the gate. | |
| Substrate mapping (identical Clutch API, different callbacks): | |
| cheap_step(state) -> extrapolate the CACHED linear model (O(1)) | |
| expensive_plan(state) -> REFIT a linear model on the last `window` obs (O(window)) | |
| error_signal(state) -> normalized residual of the last prediction | |
| This is exactly "concept-drift-gated retraining": spend training compute only when the | |
| world has actually drifted. Compute is counted honestly as samples fit (refits x window). | |
| """ | |
| import numpy as np | |
| from clutch import Clutch, MagnitudeGate, AcceleratorGate | |
def make_signal(rng, T=600, n_regimes=5, noise=0.4):
    """Generate a noisy piecewise-linear series whose slope jumps at random times.

    Args:
        rng: NumPy random ``Generator``; the source of every random draw.
        T: Number of samples in the series.
        n_regimes: Number of constant-slope segments. ``n_regimes - 1``
            distinct change points are drawn from ``range(40, T - 20)``.
        noise: Standard deviation of the Gaussian noise added to each sample.

    Returns:
        ``(y, change_points)``: the length-``T`` array and the set of indices
        at which a new slope begins.
    """
    candidates = range(40, T - 20)
    change_pts = sorted(rng.choice(candidates, size=n_regimes - 1, replace=False))
    edges = [0, *change_pts, T]

    clean = np.zeros(T)
    level = float(rng.uniform(-2, 2))
    for start, stop in zip(edges, edges[1:]):
        slope = float(rng.uniform(-0.15, 0.15))
        # Running sum seeded with the carried-over level: this performs the
        # same left-to-right additions as stepping one sample at a time, so
        # the level stays continuous across a change point (only the slope
        # jumps).
        ramp = np.cumsum(np.concatenate(([level], np.full(stop - start, slope))))
        clean[start:stop] = ramp[1:]
        level = float(ramp[-1])

    noisy = clean + rng.normal(0, noise, size=T)
    return noisy, set(change_pts)
class DriftPredictor:
    """Streaming linear forecaster that exposes the three Clutch callbacks.

    The caller sets ``self.t`` to the index about to be forecast, calls one of
    ``cheap_step`` / ``expensive_plan`` to obtain the forecast for ``y[t]``,
    and then writes the normalized error back into ``self.last_resid``.

    Args:
        y: The full signal (indexable, non-empty). Only values strictly before
            ``self.t`` are used when refitting.
        window: Maximum number of most-recent observations used by a refit.
    """

    def __init__(self, y, window=25):
        self.y = y
        self.window = window
        self.t = 0              # index currently being forecast
        self.a = 0.0            # cached slope
        self.b = float(y[0])    # cached offset (warm start from the first sample)
        self.origin = 0         # x-origin the cached model was fit at
        self.last_resid = 1.0   # normalized; start "surprised" so we plan first
        # Normalization constant for residuals; epsilon avoids division by zero
        # on a constant signal.
        self.scale = np.std(y) + 1e-6
        self.pred_log = np.full(len(y), np.nan)
        self.refit_samples = 0  # total number of samples passed to refits

    def _predict_at(self, t):
        """Evaluate the cached line at time index (or index array) ``t``."""
        return self.a * (t - self.origin) + self.b

    # ---- Clutch callbacks -------------------------------------------------
    def cheap_step(self, _):
        """Extrapolate the cached model to ``self.t``; never returns None."""
        return self._predict_at(self.t)

    def expensive_plan(self, _):
        """Refit on the last ``window`` observations *before* ``self.t``.

        The fit deliberately excludes ``y[self.t]``: that value is the target
        of this forecast and is only revealed after the prediction is made.
        Including it would leak the answer into the model.

        Returns:
            ``(pred, calm)``: the forecast for ``y[self.t]`` and a bool that is
            True when the mean absolute in-sample error, divided by
            ``self.scale``, is below 0.6.
        """
        lo = max(0, self.t - self.window)
        xs = np.arange(lo, self.t)
        ys = np.asarray(self.y[lo:self.t], dtype=float)
        n_obs = len(ys)
        self.refit_samples += n_obs

        if n_obs >= 2:
            slope, offset = np.polyfit(xs - lo, ys, 1)
            self.a, self.b, self.origin = float(slope), float(offset), lo
        elif n_obs == 1:
            # A single point cannot determine a slope: hold the last value.
            self.a, self.b, self.origin = 0.0, float(ys[0]), lo
        # n_obs == 0: no history yet, keep the cached model untouched.

        pred = self._predict_at(self.t)
        # Evaluate through the model's own origin so the residuals are correct
        # even when the cache was not refreshed on this call.
        insample = float(np.mean(np.abs(self._predict_at(xs) - ys))) if n_obs else 0.0
        calm = bool((insample / self.scale) < 0.6)  # clean fit -> safe to latch to habit
        return pred, calm

    def error_signal(self, _):
        """Return the normalized residual of the most recent prediction."""
        return self.last_resid
def run_drift(y, change_pts, strategy, window=25, gate_params=None):
    """Replay ``y`` through one refit strategy and score its forecasts.

    Args:
        y: The signal to forecast, one sample per step.
        change_pts: Accepted for interface symmetry; not used in this function.
        strategy: One of ``"ALWAYS_REFIT"``, ``"NEVER_REFIT"``,
            ``"CLUTCH_MAG"``, ``"CLUTCH_ACC"``.
        window: Refit window forwarded to ``DriftPredictor``.
        gate_params: Optional overrides for the gate constructors. Keys read:
            ``gain``, ``leak``, ``trip_mag`` (magnitude gate) and
            ``trip_acc``, ``refractory`` (accelerator gate).

    Returns:
        dict with keys ``strategy``, ``mae``, ``refits``, ``refit_samples``,
        ``pred`` (the per-step forecast array) and ``trip_times``.
    """
    params = gate_params or {}
    predictor = DriftPredictor(y, window=window)

    clutch = None
    if strategy == "CLUTCH_MAG":
        gate = MagnitudeGate(
            gain=params.get("gain", 4.0),
            leak=params.get("leak", 0.5),
            trip=params.get("trip_mag", 3.0),
        )
        clutch = Clutch(gate)
    elif strategy == "CLUTCH_ACC":
        gate = AcceleratorGate(
            trip=params.get("trip_acc", 0.8),
            refractory=params.get("refractory", 3),
        )
        clutch = Clutch(gate)

    n_refits = 0
    trip_times = []
    for t, truth in enumerate(y):
        predictor.t = t
        if strategy == "ALWAYS_REFIT":
            forecast, _ = predictor.expensive_plan(None)
            n_refits += 1
            mode = "COGNITIVE"
        elif strategy == "NEVER_REFIT":
            # Plan exactly once at the first step, then coast on the cache.
            if t:
                forecast = predictor.cheap_step(None)
            else:
                forecast, _ = predictor.expensive_plan(None)
                n_refits += 1
            mode = "HABITUAL"
        else:
            calls_before = clutch.stats.expensive_calls
            forecast, mode = clutch.step(
                None,
                predictor.cheap_step,
                predictor.expensive_plan,
                predictor.error_signal,
            )
            if clutch.stats.expensive_calls > calls_before:
                n_refits += 1
            # NOTE(review): this records every COGNITIVE step once
            # `clutch.stats.trips` is truthy, not only the steps on which a
            # new trip fires -- confirm against Clutch that this is intended.
            if clutch.stats.trips and mode == "COGNITIVE":
                trip_times.append(t)

        predictor.pred_log[t] = forecast
        # Observe the truth and update the normalized residual that the gate
        # will see on the next step.
        predictor.last_resid = abs(forecast - truth) / predictor.scale

    abs_err = np.abs(predictor.pred_log - y)
    return {
        "strategy": strategy,
        "mae": float(np.nanmean(abs_err)),
        "refits": n_refits,
        "refit_samples": predictor.refit_samples,
        "pred": predictor.pred_log,
        "trip_times": trip_times,
    }
def benchmark_drift(seeds, window=25, gate_params=None, T=600):
    """Average every strategy's error and refit cost over several random signals.

    Args:
        seeds: Iterable of seeds; each one generates a signal via ``make_signal``.
        window: Refit window forwarded to ``run_drift``.
        gate_params: Gate overrides forwarded to ``run_drift``.
        T: Length of each generated signal.

    Returns:
        One dict per strategy, in the fixed order ALWAYS_REFIT, NEVER_REFIT,
        CLUTCH_MAG, CLUTCH_ACC, with the seed-averaged ``mae``, ``refits``,
        ``samples``, and ``vs_ceiling`` (mean samples as a percentage of the
        ALWAYS_REFIT mean).
    """
    strategies = ["ALWAYS_REFIT", "NEVER_REFIT", "CLUTCH_MAG", "CLUTCH_ACC"]
    # (aggregate key, key in the run_drift result)
    tracked = (("mae", "mae"), ("refits", "refits"), ("samples", "refit_samples"))
    agg = {name: {key: [] for key, _ in tracked} for name in strategies}

    for seed in seeds:
        signal, change_pts = make_signal(np.random.default_rng(seed), T=T)
        for name in strategies:
            result = run_drift(signal, change_pts, name,
                               window=window, gate_params=gate_params)
            for key, field in tracked:
                agg[name][key].append(result[field])

    # ALWAYS_REFIT is the compute ceiling every other strategy is compared to.
    ceiling = np.mean(agg["ALWAYS_REFIT"]["samples"])
    summary = []
    for name in strategies:
        mean_samples = np.mean(agg[name]["samples"])
        summary.append({
            "strategy": name,
            "mae": float(np.mean(agg[name]["mae"])),
            "refits": float(np.mean(agg[name]["refits"])),
            "samples": float(mean_samples),
            "vs_ceiling": float(mean_samples / ceiling * 100.0),
        })
    return summary