Spaces:
Running
Running
| """ | |
| change.py — 'Did something actually change?' engine for normal people. | |
| Same Clutch + MagnitudeGate as the compute demo, pointed at a human question: | |
| is this series just its usual wobble, or did something really shift, and when? | |
| Loop (identical structure to the drift substrate): | |
| cheap = extrapolate the cached linear model of your recent numbers | |
| costly = refit that model on the last `window` points | |
| error = |prediction - today's number| / typical wobble | |
| A gate trip == the model of "normal" broke == a real change. Trips close together | |
| are merged into one EVENT with a plain-language before/after summary. | |
| Honesty rules: warm-up trips are ignored, pure noise must yield "no change", | |
| slow steady trends are reported as trends (they never break a linear model, and | |
| saying otherwise would be lying). | |
| """ | |
| import re | |
| import numpy as np | |
| from clutch import Clutch, MagnitudeGate | |
| # ------------------------------------------------------------------ input | |
def parse_numbers(text=None, file_obj=None):
    """Extract a single numeric series from pasted text or an uploaded file.

    Each non-empty line is scanned for numbers. When lines hold several
    numbers, the last number of every line that has the maximum column count
    is used; lines with fewer numbers are dropped.

    Args:
        text: Raw pasted text. Used when ``file_obj`` is missing or does not
            yield a path.
        file_obj: A path string, or an object exposing its path in a ``name``
            attribute. When it yields a path, the file content wins over
            ``text``.

    Returns:
        ``(y, None)`` on success, where ``y`` is a 1-D float array of the
        finite values found, or ``(None, message)`` with a user-facing
        explanation when the input is empty, holds no numbers, or holds
        fewer than 14 usable values.
    """
    path = None
    if file_obj is not None:
        path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)

    raw = ""
    if path:
        # Explicit encoding: decode identically on every platform instead of
        # depending on the locale default; undecodable bytes are skipped.
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            raw = f.read()
    elif text:
        # Also reached when a file object carries no path, so pasted text is
        # not silently thrown away.
        raw = text

    if not raw.strip():
        return None, "No numbers yet — paste some, or pick an example above."

    number = re.compile(r"[-+]?\d*[\.,]?\d+(?:[eE][-+]?\d+)?")
    rows = []
    for line in raw.strip().splitlines():
        # Commas are normalised to dots first so decimal commas ("1,5") parse.
        nums = number.findall(line.replace(",", "."))
        if nums:
            rows.append([float(x) for x in nums])
    if not rows:
        return None, "I couldn't find any numbers in that."

    # Keep only the widest rows and read their last column. With a single
    # column this is simply every row's only value.
    ncol = max(len(r) for r in rows)
    y = np.array([r[-1] for r in rows if len(r) == ncol], float)
    y = y[np.isfinite(y)]
    if len(y) < 14:
        return None, f"Only {len(y)} values — I need at least 14 to tell change from noise."
    return y, None
| # ------------------------------------------------------------------ core | |
| def _wobble(y): | |
| """Typical day-to-day wobble: robust std (MAD) of first differences.""" | |
| d = np.diff(y) | |
| mad = np.median(np.abs(d - np.median(d))) | |
| return float(1.4826 * mad + 1e-9) | |
| class _Model: | |
| def __init__(self, y, window, scale): | |
| self.y, self.window, self.scale = y, window, scale | |
| self.t = 0 | |
| self.a, self.b, self.origin = 0.0, float(y[0]), 0 | |
| self.last_resid = 1.0 | |
| def predict(self, t): | |
| return self.a * (t - self.origin) + self.b | |
| def cheap(self, _): | |
| return self.predict(self.t) | |
| def costly(self, _): | |
| lo = max(0, self.t - self.window) | |
| xs = np.arange(lo, self.t + 1) | |
| ys = self.y[lo:self.t + 1] | |
| if len(xs) >= 2: | |
| a, b = np.polyfit(xs - lo, ys, 1) | |
| self.a, self.b, self.origin = float(a), float(b), lo | |
| insample = float(np.mean(np.abs(np.polyval([self.a, self.b], xs - lo) - ys))) if len(xs) else 0.0 | |
| return self.predict(self.t), (insample / self.scale) < 1.2 | |
| def err(self, _): | |
| return self.last_resid | |
def detect(y, sensitivity=1.0, sigma_mode="iid"):
    """Step the clutch over ``y`` and summarise every break in "normal".

    Args:
        y: Observations in time order; slices of it call ``.mean()``, so a
            NumPy array is expected.
        sensitivity: Multiplier applied to the gate's trip threshold
            (``8.0 * sensitivity``); 0.5 is paranoid, 2.0 is relaxed.
        sigma_mode: ``"iid"`` treats the data as noise around a trend and
            uses wobble / sqrt(2) as the one-step noise. Any other value
            uses the wobble itself (random-walk-like data).

    Returns:
        dict with keys ``events`` (list of dicts with ``at``, ``span``,
        ``before``, ``after``, ``shift``, ``kind``), ``trips``, ``checks``,
        ``window``, ``scale``, ``slope``, ``trend_total``, ``trendy``, ``n``.
    """
    n = len(y)
    window = int(np.clip(n // 10, 7, 30))
    scale = _wobble(y)  # day-to-day wobble (for the human text)
    if sigma_mode == "iid":
        # noise around a trend -> one-step noise is wobble/sqrt(2)
        sigma = scale / np.sqrt(2.0)
    else:
        # random-walk-like (stock prices) -> the daily move IS the innovation
        sigma = scale

    gate = MagnitudeGate(gain=2.0, leak=1.8, trip=8.0 * sensitivity)
    clutch = Clutch(gate)
    model = _Model(y, window, sigma)

    trips = []
    checks = 0
    for t in range(n):
        model.t = t
        calls_before = clutch.stats.expensive_calls
        pred, _mode = clutch.step(None, model.cheap, model.costly, model.err)
        if clutch.stats.expensive_calls > calls_before:
            # The expensive path ran on this step: count it as a check.
            checks += 1
            if t > window:  # ignore warm-up
                trips.append(t)
        model.last_resid = abs(pred - y[t]) / sigma

    # Trips no further than `window` from the previous one join its group.
    groups = []
    for t in trips:
        starts_new_group = not groups or t - groups[-1][-1] > window
        if starts_new_group:
            groups.append([t])
        else:
            groups[-1].append(t)

    out_events = []
    for grp in groups:
        first_trip = grp[0]
        last_trip = min(grp[-1], first_trip + 3 * window)
        lo = max(0, first_trip - 2 * window)
        hi = min(n, last_trip + 1 + window)

        # Refine: the split of y[lo:hi] into two constant levels with the
        # smallest total squared error, keeping at least 3 points per side.
        best_split, best_cost = None, np.inf
        for split in range(lo + 3, hi - 2):
            left = y[lo:split]
            right = y[split:hi]
            cost = ((left - left.mean()) ** 2).sum() + ((right - right.mean()) ** 2).sum()
            if cost < best_cost:
                best_cost, best_split = cost, split
        cp = first_trip if best_split is None else best_split

        before_mean = float(np.mean(y[lo:cp]))
        after_mean = float(np.mean(y[cp:hi]))
        shift = after_mean - before_mean
        if abs(shift) >= 2.0 * sigma:
            kind = "shift"
        else:
            kind = "blip"
        out_events.append({
            "at": cp,
            "span": (grp[0], last_trip),
            "before": before_mean,
            "after": after_mean,
            "shift": shift,
            "kind": kind,
        })

    # Overall slow trend: a line through the whole series, reported only when
    # no event was classified as a shift.
    slope = float(np.polyfit(np.arange(n), y, 1)[0])
    trend_total = slope * n
    has_shift = any(e["kind"] == "shift" for e in out_events)
    trendy = abs(trend_total) > 3.0 * scale and not has_shift

    return {
        "events": out_events,
        "trips": trips,
        "checks": checks,
        "window": window,
        "scale": scale,
        "slope": slope,
        "trend_total": trend_total,
        "trendy": trendy,
        "n": n,
    }
| # ------------------------------------------------------------------ language | |
def verdict_text(y, res, unit="", period="day"):
    """Render a detection result as a plain-language Markdown verdict.

    Args:
        y: The analysed series. Not read here; kept so the signature stays
            stable for callers.
        res: Result dict. The keys read are ``n``, ``scale``, ``events``
            (each with ``at``, ``before``, ``after``, ``shift``, ``kind``),
            ``trendy``, ``slope``, ``trend_total`` and ``checks``.
        unit: Optional unit label printed after each number.
        period: Singular name of one time step (for example "day", "week").

    Returns:
        The Markdown text, lines joined with newlines. The heading states one
        of three verdicts: just noise, steady drift, or a real change.
    """
    u = f" {unit}" if unit else ""
    n, scale = res["n"], res["scale"]
    shifts = [e for e in res["events"] if e["kind"] == "shift"]
    blips = [e for e in res["events"] if e["kind"] == "blip"]
    lines = []

    if not shifts and not res["trendy"]:
        lines.append("## Just noise — nothing actually changed")
        lines.append(f"Across all **{n} {period}s**, your numbers stayed inside their normal "
                     f"wobble of about **±{scale:.2g}{u}** per {period}. "
                     "Ups and downs smaller than that are not signal — reacting to them is "
                     "reacting to dice rolls.")
        if blips:
            days = ", ".join(f"{period} {e['at']}" for e in blips)
            lines.append(f"There were brief odd readings around **{days}**, but the numbers "
                         "came straight back — one-off blips, not a real change.")
    elif res["trendy"]:
        direction = "upward" if res["slope"] > 0 else "downward"
        lines.append(f"## No sudden change — but a steady {direction} drift")
        lines.append(f"Nothing jumped, but over the whole {n} {period}s your numbers drifted "
                     f"**{res['trend_total']:+.3g}{u}** in total (about {res['slope']:+.3g}{u} "
                     f"per {period}). Day-to-day comparisons will feel like noise (wobble "
                     f"±{scale:.2g}{u}); the drift only shows over weeks. That slow kind of "
                     "change is exactly what people miss.")
    else:
        lines.append("## Yes — something really changed")
        for e in shifts:
            direction = "up" if e["shift"] > 0 else "down"
            times = abs(e["shift"]) / scale
            lines.append(f"- Around **{period} {e['at']}**, your typical level moved "
                         f"**{direction} from {e['before']:.3g}{u} to {e['after']:.3g}{u}** "
                         f"({e['shift']:+.3g}{u} — about {times:.0f}× your normal {period}-to-"
                         f"{period} wobble). That is a real shift, not luck.")
        if blips:
            lines.append(f"- ({len(blips)} brief blip(s) also detected that reversed on their "
                         "own — those you can ignore.)")

    # Share of time steps that did not need a look; an empty series saves 0%.
    saved = (1 - res["checks"] / n) * 100 if n else 0.0
    lines.append("")
    lines.append(f"**Your attention, saved:** instead of judging every single {period} "
                 f"({n} looks), checking on the **{res['checks']} {period}s flagged above** "
                 f"would have caught everything that mattered — **{saved:.0f}% fewer looks, "
                 "zero missed changes** on this data.")
    lines.append("")
    lines.append("<small>How it works: a tiny model keeps predicting your next number from "
                 "the recent trend; only when reality breaks the prediction harder than your "
                 f"normal wobble (±{scale:.2g}{u}) does it flag a change. This is a statistics "
                 "tool, not medical or financial advice.</small>")
    return "\n".join(lines)
| # ------------------------------------------------------------------ examples | |
def example_series(name, seed=3):
    """Build one of the synthetic demo series.

    Args:
        name: Example label; only its prefix is matched ("Weight", "Sleep",
            "Electricity"). Any other name selects the spending example.
        seed: Seed for the random generator, so each example is reproducible.

    Returns:
        ``(y, unit, period)``: the rounded series, its unit label, and the
        singular name of one time step.
    """
    rng = np.random.default_rng(seed)
    if name.startswith("Weight"):
        n = 90
        y = 84.0 + rng.normal(0, 0.45, n)
        y[52:] -= np.linspace(0, 0.11 * (n - 52), n - 52)  # diet bites ~day 52 (~0.8 kg/wk)
        return np.round(y, 1), "kg", "day"
    if name.startswith("Sleep"):
        n = 60
        y = 7.1 + rng.normal(0, 0.55, n)  # pure noise: nothing changed
        return np.round(y, 1), "h", "night"
    if name.startswith("Electricity"):
        n = 52
        y = 62 + rng.normal(0, 4.5, n)
        y[30:] += 21  # heater breaks / tariff jumps week 30
        return np.round(y, 1), "€", "week"
    # "Spending — slow creep"
    n = 80
    y = 31 + np.linspace(0, 13.0, n) + rng.normal(0, 2.2, n)  # lifestyle creep, no jump
    return np.round(y, 2), "€", "day"