| """ |
| drift.py — a SECOND substrate for the same Clutch, to show it is not grid-specific. |
| |
| Problem: an online predictor must forecast a streaming signal y_t. The signal is |
| piecewise-linear with occasional REGIME CHANGES (the slope jumps; the level stays continuous) — the |
| time-series analogue of a wall dropping. Refitting a model every step is accurate but |
| expensive; never refitting drifts badly after a regime change. The Clutch refits only |
| when prediction error trips the gate. |
| |
| Substrate mapping (identical Clutch API, different callbacks): |
| cheap_step(state) -> extrapolate the CACHED linear model (O(1)) |
| expensive_plan(state) -> REFIT a linear model on the last `window` obs (O(window)) |
| error_signal(state) -> normalized residual of the last prediction |
| |
| This is exactly "concept-drift-gated retraining": spend training compute only when the |
| world has actually drifted. Compute is counted honestly as the number of samples fed to refits (roughly refits x window). |
| """ |
|
|
| import numpy as np |
| from clutch import Clutch, MagnitudeGate, AcceleratorGate |
|
|
|
|
def make_signal(rng, T=600, n_regimes=5, noise=0.4):
    """Generate a noisy piecewise-linear series whose slope changes at random points.

    Args:
        rng: a NumPy ``Generator``. Draws happen in this order: change points,
            starting level, one slope per regime, then the noise vector.
        T: length of the series.
        n_regimes: number of linear segments, i.e. ``n_regimes - 1`` change
            points drawn without replacement from ``[40, T - 20)``.
        noise: standard deviation of the additive Gaussian noise.

    Returns:
        ``(y, change_pts)``: the length-``T`` float array and the set of
        change-point indices. The noise-free level is continuous across a
        change point; only its slope jumps.
    """
    cuts = sorted(rng.choice(range(40, T - 20), size=n_regimes - 1, replace=False))
    edges = [0, *cuts, T]
    start = float(rng.uniform(-2, 2))

    # One increment per time step: each step of a regime adds that regime's slope.
    steps = np.empty(T)
    for lo, hi in zip(edges[:-1], edges[1:]):
        steps[lo:hi] = float(rng.uniform(-0.15, 0.15))

    # Prepending the start level makes cumsum accumulate left-to-right, the
    # same way a running ``level += slope`` would.
    clean = np.cumsum(np.concatenate(([start], steps)))[1:]
    return clean + rng.normal(0, noise, size=T), set(cuts)
|
|
|
|
class DriftPredictor:
    """Streaming linear forecaster that exposes the three Clutch callbacks.

    The cached model is ``y_hat(t) = a * (t - origin) + b``.

    * ``cheap_step`` extrapolates the cached model to the current step ``t``.
    * ``expensive_plan`` refits it by least squares on the most recent
      ``window`` observations strictly before ``t``, so the value being
      forecast never enters the fit. It then forecasts step ``t``.
    * ``error_signal`` returns ``last_resid``, which the driver loop writes
      after each step.
    """

    def __init__(self, y, window=25):
        self.y = y
        self.window = window
        self.t = 0  # current step; the driver sets this before each callback
        self.a = 0.0  # slope of the cached model
        # Intercept at ``origin``; seeded with the first observation as a warm start.
        self.b = float(y[0])
        self.origin = 0
        self.last_resid = 1.0
        # NOTE(review): normalizer uses the std of the WHOLE series, so it is
        # not strictly online; it is shared by every strategy.
        self.scale = np.std(y) + 1e-6
        self.pred_log = np.full(len(y), np.nan)
        self.refit_samples = 0  # total samples fed to refits (compute proxy)

    def _predict_at(self, t):
        """Evaluate the cached line at time index ``t`` (scalar or array)."""
        return self.a * (t - self.origin) + self.b

    def cheap_step(self, _):
        """O(1): extrapolate the cached model to the current step."""
        return self._predict_at(self.t)

    def expensive_plan(self, _):
        """O(window): refit on past observations, then forecast the current step.

        Fits on ``y[max(0, t - window):t]``. That is at most ``window``
        samples, and ``y[t]`` itself is excluded because it is the target.
        With fewer than two past points, the cached model is kept unchanged.

        Returns:
            ``(prediction, calm)``. ``calm`` is True when the mean absolute
            in-sample residual, divided by ``scale``, is below 0.6.
        """
        lo = max(0, self.t - self.window)
        xs = np.arange(lo, self.t)  # strictly past: y[t] is what we forecast
        ys = self.y[lo:self.t]
        self.refit_samples += len(xs)
        if len(xs) >= 2:
            a, b = np.polyfit(xs - lo, ys, 1)
            self.a, self.b, self.origin = float(a), float(b), lo
        pred = self._predict_at(self.t)
        if len(xs):
            # Evaluate through _predict_at so the residual uses the model's own
            # origin, even when the fit above was skipped.
            insample = float(np.mean(np.abs(self._predict_at(xs) - ys)))
        else:
            insample = 0.0  # no history yet (t == 0)
        calm = (insample / self.scale) < 0.6
        return pred, calm

    def error_signal(self, _):
        """Return the normalized residual of the previous prediction."""
        return self.last_resid
|
|
|
|
def run_drift(y, change_pts, strategy, window=25, gate_params=None):
    """Run one forecasting strategy over the whole series ``y``.

    Args:
        y: the observed series.
        change_pts: true change-point indices; accepted for API symmetry with
            ``make_signal`` but not used here.
        strategy: one of ``"ALWAYS_REFIT"``, ``"NEVER_REFIT"``,
            ``"CLUTCH_MAG"``, ``"CLUTCH_ACC"``.
        window: refit window passed to ``DriftPredictor``.
        gate_params: optional overrides for gate arguments (``gain``, ``leak``,
            ``trip_mag`` for the magnitude gate; ``trip_acc``, ``refractory``
            for the accelerator gate).

    Returns:
        dict with ``strategy``, ``mae`` (mean absolute error of the logged
        predictions against ``y``), ``refits``, ``refit_samples``, ``pred``
        (per-step predictions) and ``trip_times``.

    Raises:
        ValueError: if ``strategy`` is not one of the four names above.
    """
    valid = ("ALWAYS_REFIT", "NEVER_REFIT", "CLUTCH_MAG", "CLUTCH_ACC")
    if strategy not in valid:
        # Without this check a typo falls into the Clutch branch and fails
        # with an opaque AttributeError on ``None.stats``.
        raise ValueError(f"unknown strategy {strategy!r}; expected one of {valid}")

    gp = gate_params or {}
    pred = DriftPredictor(y, window=window)
    clutch = None
    if strategy == "CLUTCH_MAG":
        clutch = Clutch(MagnitudeGate(gain=gp.get("gain", 4.0),
                                      leak=gp.get("leak", 0.5),
                                      trip=gp.get("trip_mag", 3.0)))
    elif strategy == "CLUTCH_ACC":
        clutch = Clutch(AcceleratorGate(trip=gp.get("trip_acc", 0.8),
                                        refractory=gp.get("refractory", 3)))

    refits = 0
    trip_times = []
    for t in range(len(y)):
        pred.t = t
        if strategy == "ALWAYS_REFIT":
            p, _ = pred.expensive_plan(None)
            refits += 1
        elif strategy == "NEVER_REFIT":
            if t == 0:
                # One initial plan; afterwards only extrapolate the cached model.
                p, _ = pred.expensive_plan(None)
                refits += 1
            else:
                p = pred.cheap_step(None)
        else:
            before = clutch.stats.expensive_calls
            p, mode = clutch.step(None, pred.cheap_step,
                                  pred.expensive_plan, pred.error_signal)
            if clutch.stats.expensive_calls > before:
                refits += 1
            # NOTE(review): if ``stats.trips`` is cumulative, this records every
            # COGNITIVE step after the first trip, not only trip events —
            # confirm intent against the Clutch stats API.
            if clutch.stats.trips and mode == "COGNITIVE":
                trip_times.append(t)
        pred.pred_log[t] = p
        # Normalized residual; error_signal reports it at the next step.
        pred.last_resid = abs(p - y[t]) / pred.scale

    mae = float(np.nanmean(np.abs(pred.pred_log - y)))
    return dict(strategy=strategy, mae=mae, refits=refits,
                refit_samples=pred.refit_samples, pred=pred.pred_log,
                trip_times=trip_times)
|
|
|
|
def benchmark_drift(seeds, window=25, gate_params=None, T=600):
    """Average every strategy's metrics over one synthetic signal per seed.

    For each seed, one signal is generated and all four strategies run on it.
    Returns one dict per strategy, in a fixed order, with the mean ``mae``,
    ``refits`` and ``samples``, plus ``vs_ceiling``: mean refit samples as a
    percentage of the ALWAYS_REFIT mean.
    """
    names = ["ALWAYS_REFIT", "NEVER_REFIT", "CLUTCH_MAG", "CLUTCH_ACC"]
    maes = {name: [] for name in names}
    refit_counts = {name: [] for name in names}
    sample_counts = {name: [] for name in names}

    for seed in seeds:
        y, cps = make_signal(np.random.default_rng(seed), T=T)
        for name in names:
            res = run_drift(y, cps, name, window=window, gate_params=gate_params)
            maes[name].append(res["mae"])
            refit_counts[name].append(res["refits"])
            sample_counts[name].append(res["refit_samples"])

    # ALWAYS_REFIT is the compute ceiling every strategy is compared against.
    ceiling = np.mean(sample_counts["ALWAYS_REFIT"])
    return [
        dict(strategy=name,
             mae=float(np.mean(maes[name])),
             refits=float(np.mean(refit_counts[name])),
             samples=float(np.mean(sample_counts[name])),
             vs_ceiling=float(np.mean(sample_counts[name]) / ceiling * 100.0))
        for name in names
    ]
|
|