| """ |
| Bayesian changepoint detection for compulsive usage patterns. |
| |
| Inspired by the Recovery Viability Theory (Kepner, White, O'Neill): |
| - Cusp catastrophe model: sudden transitions in posting behavior |
| - Critical slowing down: variance/autocorrelation increase before transitions |
| - Logit-bounded state space: natural [0,1] bounds for behavioral metrics |
| - Recursive parameter updates: behavior shifts with each "engagement cycle" |
| |
| Detects: |
| 1. Changepoints in posting frequency (sudden acceleration/deceleration) |
| 2. Changepoints in virulence (tone shifts) |
| 3. Critical slowing down signatures (variance, autocorrelation approaching 1) |
| 4. Cusp catastrophe dynamics (bistable regime detection) |
| """ |
| import logging |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy import stats, signal |
| from scipy.special import expit, logit |
|
|
| from .config import SESSION_GAP_MINUTES |
|
|
| log = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class Changepoint:
    """One regime change located in an aggregated metric time series."""

    # Calendar date of the changepoint, stored as a string.
    date: str
    # Name of the series the changepoint was found in (e.g. "posting_rate").
    metric: str
    # "increase" or "decrease", from comparing the means on either side.
    direction: str
    # Size of the mean shift, scaled by the standard deviation of the series.
    magnitude: float
    # Changepoint probability at the detected peak.
    confidence: float
    # Mean of the samples just before the changepoint.
    before_mean: float
    # Mean of the samples from the changepoint onwards.
    after_mean: float
|
|
|
|
@dataclass
class EarlyWarningSignal:
    """A critical-slowing-down signature found in a rolling-window analysis."""

    # Date (as a string) marking the end of the last analysed window.
    window_end: str
    # Name of the series that was analysed.
    metric: str
    # Kendall tau of the rolling variance against window order.
    variance_trend: float
    # Kendall tau of the rolling lag-1 autocorrelation against window order.
    autocorrelation_trend: float
    # Combined score built from the two trends above.
    signal_strength: float
|
|
|
|
@dataclass
class ChangepointProfile:
    """Everything one changepoint analysis run produced for an account."""

    senator_name: str = ""
    # Detected Changepoint records, across all analysed metrics.
    changepoints: list = field(default_factory=list)
    # EarlyWarningSignal records.
    early_warnings: list = field(default_factory=list)
    # Per-regime summary dicts.
    regime_periods: list = field(default_factory=list)
    # Per-event dicts describing abrupt jumps.
    cusp_events: list = field(default_factory=list)
    # Raw posting-rate series; hidden from repr() and left out of to_dict().
    posting_rate_ts: Optional[pd.Series] = field(default=None, repr=False)

    def to_dict(self) -> dict:
        """Return the profile as a plain dict, without the raw time series.

        Every instance attribute except ``posting_rate_ts`` is copied by
        reference (the lists are not duplicated), and two convenience
        counters, ``n_changepoints`` and ``n_early_warnings``, are appended.
        """
        summary = {}
        for name, value in vars(self).items():
            if name == "posting_rate_ts":
                continue
            summary[name] = value
        summary["n_changepoints"] = len(self.changepoints)
        summary["n_early_warnings"] = len(self.early_warnings)
        return summary
|
|
|
|
class BayesianChangepointDetector:
    """
    Bayesian online changepoint detection (Adams & MacKay 2007)
    adapted for social media posting patterns.

    The core idea: at each time step, maintain the posterior distribution
    over the current run length (time since the last changepoint). When the
    posterior says, with enough confidence, that the current run started at
    a particular sample, that sample is reported as a regime change.
    """

    def __init__(self, hazard_rate: float = 1 / 250):
        """
        Args:
            hazard_rate: Prior probability of changepoint at any step.
                1/250 means we expect a change roughly every 250 time steps.
        """
        self.hazard_rate = hazard_rate

    def detect_changepoints(
        self,
        df: pd.DataFrame,
        resolution: str = "D",
        min_confidence: float = 0.5,
    ) -> "ChangepointProfile":
        """
        Run full changepoint analysis on a tweet dataset.

        Args:
            df: DataFrame with a ``created_at`` column and optionally a
                ``composite_virulence`` column.
            resolution: Resampling rule passed to pandas
                ('D'=daily, 'W'=weekly).
            min_confidence: Minimum changepoint probability to report.

        Returns:
            A ChangepointProfile. It is empty when ``df`` is empty or has no
            ``created_at`` column.
        """
        profile = ChangepointProfile()

        if "created_at" not in df.columns or df.empty:
            return profile

        df = df.sort_values("created_at").copy()

        # 1. Posting frequency.
        rate_ts = self._compute_posting_rate(df, resolution)
        if len(rate_ts) > 10:
            profile.posting_rate_ts = rate_ts
            profile.changepoints.extend(self._bayesian_online_cpd(
                rate_ts.values, metric_name="posting_rate",
                dates=rate_ts.index, min_confidence=min_confidence,
            ))

        # 2. Virulence (only when the score column is present).
        if "composite_virulence" in df.columns:
            vir_ts = (
                df.set_index("created_at")["composite_virulence"]
                .resample(resolution)
                .mean()
                .dropna()
            )
            if len(vir_ts) > 10:
                profile.changepoints.extend(self._bayesian_online_cpd(
                    vir_ts.values, metric_name="virulence",
                    dates=vir_ts.index, min_confidence=min_confidence,
                ))

        # 3. Share of night-time posts.
        night_ts = self._compute_night_share(df, resolution)
        if len(night_ts) > 10:
            profile.changepoints.extend(self._bayesian_online_cpd(
                night_ts.values, metric_name="night_share",
                dates=night_ts.index, min_confidence=min_confidence,
            ))

        # 4. Early-warning signals on the posting rate.
        if len(rate_ts) > 30:
            profile.early_warnings.extend(
                self._detect_critical_slowing_down(rate_ts, metric="posting_rate")
            )

        # 5. Regimes delimited by the posting-rate changepoints.
        if profile.changepoints:
            profile.regime_periods = self._identify_regimes(
                rate_ts, profile.changepoints
            )

        # 6. Abrupt jumps in the posting rate.
        if len(rate_ts) > 30:
            profile.cusp_events = self._detect_cusp_events(rate_ts)

        return profile

    def _compute_posting_rate(self, df: pd.DataFrame, resolution: str) -> pd.Series:
        """Count rows per ``resolution`` period, with empty periods as 0."""
        counts = df.set_index("created_at").resample(resolution).size()
        # Reindex over the full span so periods without posts are explicit zeros.
        full_range = pd.date_range(
            counts.index.min(), counts.index.max(), freq=resolution
        )
        return counts.reindex(full_range, fill_value=0)

    def _compute_night_share(self, df: pd.DataFrame, resolution: str) -> pd.Series:
        """Compute the share of posts made at hours 0-5 per period.

        The hour is read from ``created_at`` as stored; no timezone
        conversion is done here. Periods without any post are dropped.
        """
        df = df.copy()
        df["is_night"] = df["created_at"].dt.hour.between(0, 5)
        ts = df.set_index("created_at")
        return ts["is_night"].resample(resolution).mean().dropna()

    def _lagged_changepoint_probs(self, data: np.ndarray, lag: int = 3) -> np.ndarray:
        """Run the BOCPD recursion and return lagged changepoint probabilities.

        Uses a conjugate Normal-Gamma prior for Gaussian observations, with
        the prior mean and scale taken from the data itself.

        With a constant hazard rate the normalised posterior mass at run
        length 0 equals the hazard rate at every step, so it carries no
        information about where changes happened. The statistic recorded
        instead is P(r_{s+lag-1} = lag | x_{0:s+lag-1}): the probability,
        after seeing ``lag`` samples of the candidate new regime, that the
        current run started at sample ``s``.

        Args:
            data: 1-D sequence of observations.
            lag: Number of post-change samples used as evidence (>= 1).

        Returns:
            Array of len(data) probabilities. Entry ``s`` refers to a regime
            starting at sample ``s``; the last ``lag - 1`` entries stay 0
            because the required evidence does not exist yet.
        """
        series = np.asarray(data, dtype=float)
        n = len(series)
        lag = max(1, int(lag))
        probs = np.zeros(n)
        if n == 0:
            return probs

        # Prior hyper-parameters.
        mu0 = float(np.mean(series))
        kappa0 = 1.0
        alpha0 = 1.0
        variance = float(np.var(series))
        beta0 = variance if variance > 0 else 1.0

        # Slot r holds the posterior parameters / probability for run length r.
        size = n + 1
        mu = np.full(size, mu0)
        kappa = np.full(size, kappa0)
        alpha = np.full(size, alpha0)
        beta = np.full(size, beta0)
        run_probs = np.zeros(size)
        run_probs[0] = 1.0
        hazard = self.hazard_rate

        for t in range(n):
            x = series[t]

            # Only run lengths 0..t can carry mass before observing x_t.
            m, k, a, b = mu[:t + 1], kappa[:t + 1], alpha[:t + 1], beta[:t + 1]

            # Student-t posterior predictive of x under every run length.
            pred_var = np.maximum(b * (k + 1) / (a * k), 1e-10)
            pred = stats.t.pdf(x, 2 * a, loc=m, scale=np.sqrt(pred_var))
            joint = run_probs[:t + 1] * pred

            updated = np.zeros(size)
            updated[1:t + 2] = joint * (1 - hazard)   # the run continues
            updated[0] = joint.sum() * hazard         # the run is cut here

            total = updated.sum()
            if total > 0:
                updated /= total
            else:
                # Every predictive underflowed: restart from the prior.
                updated[0] = 1.0
            run_probs = updated

            origin = t - lag + 1
            if origin >= 0:
                probs[origin] = run_probs[lag]

            # Conjugate updates. All four results are fresh arrays computed
            # before any slot is overwritten; slot 0 is never written, so it
            # keeps the prior for the next step.
            mu_next = (k * m + x) / (k + 1)
            beta_next = b + k * (x - m) ** 2 / (2 * (k + 1))
            kappa_next = k + 1
            alpha_next = a + 0.5
            mu[1:t + 2] = mu_next
            beta[1:t + 2] = beta_next
            kappa[1:t + 2] = kappa_next
            alpha[1:t + 2] = alpha_next

        return probs

    def _bayesian_online_cpd(
        self,
        data: np.ndarray,
        metric_name: str,
        dates: pd.DatetimeIndex,
        min_confidence: float = 0.5,
        lag: int = 3,
    ) -> "list[Changepoint]":
        """
        Bayesian Online Changepoint Detection (Adams & MacKay 2007).

        Peaks of the lagged run-length posterior (see
        ``_lagged_changepoint_probs``) that reach ``min_confidence`` are
        reported. The reported date is the first sample of the new regime.

        Args:
            data: Observations, one per period.
            metric_name: Label stored on each Changepoint.
            dates: Index aligned with ``data``.
            min_confidence: Minimum peak height to report.
            lag: Post-change samples used as evidence for a changepoint.

        Returns:
            List of Changepoint records; empty when fewer than 5 samples.
        """
        data = np.asarray(data, dtype=float)
        n = len(data)
        if n < 5:
            return []

        changepoint_probs = self._lagged_changepoint_probs(data, lag=lag)

        # Keep peaks at least 7 samples apart so one shift is reported once.
        peaks, _ = signal.find_peaks(
            changepoint_probs, height=min_confidence, distance=7
        )

        overall_std = np.std(data)
        changepoints = []
        for peak in peaks:
            # Too close to either edge to compare before and after.
            if peak < 3 or peak >= n - 3:
                continue

            before = data[max(0, peak - 14):peak]
            after = data[peak:min(n, peak + 14)]
            if len(before) < 3 or len(after) < 3:
                continue

            before_mean = np.mean(before)
            after_mean = np.mean(after)
            magnitude = abs(after_mean - before_mean) / (overall_std + 1e-10)
            direction = "increase" if after_mean > before_mean else "decrease"

            changepoints.append(Changepoint(
                date=str(dates[peak].date()) if peak < len(dates) else "",
                metric=metric_name,
                direction=direction,
                magnitude=round(float(magnitude), 3),
                confidence=round(float(changepoint_probs[peak]), 3),
                before_mean=round(float(before_mean), 3),
                after_mean=round(float(after_mean), 3),
            ))

        return changepoints

    @staticmethod
    def _kendall_trend(values) -> float:
        """Kendall tau of ``values`` against their position; NaN becomes 0.0."""
        # Tuple unpacking works whatever the result attribute is called.
        tau, _ = stats.kendalltau(np.arange(len(values)), values)
        return float(tau) if np.isfinite(tau) else 0.0

    def _detect_critical_slowing_down(
        self,
        ts: pd.Series,
        metric: str,
        window_size: int = 30,
        step: int = 7,
    ) -> "list[EarlyWarningSignal]":
        """
        Detect critical slowing down signatures:
        - Variance increases (Var ~ sigma^2 / (2|lambda|), diverges as lambda -> 0)
        - Lag-1 autocorrelation approaches 1 (rho = exp(lambda * dt) -> 1)

        From Kepner et al.: these are derived markers of approaching
        a cusp bifurcation boundary.

        Args:
            ts: Series to analyse, indexed by timestamps.
            metric: Label stored on the emitted signal.
            window_size: Samples per rolling window.
            step: Offset between consecutive window starts.

        Returns:
            A list with at most one EarlyWarningSignal, emitted when the
            combined trend score exceeds 0.2.
        """
        data = ts.values
        n = len(data)
        if n < window_size + step:
            return []

        variances = []
        autocorrs = []
        window_ends = []

        # "+ 1" keeps the final full window, which starts at n - window_size.
        for start in range(0, n - window_size + 1, step):
            window = data[start:start + window_size]
            variances.append(np.var(window))

            autocorr = 0.0
            if np.std(window) > 0:
                # A constant half-window yields NaN; treat that as no correlation.
                with np.errstate(invalid="ignore", divide="ignore"):
                    autocorr = np.corrcoef(window[:-1], window[1:])[0, 1]
                if not np.isfinite(autocorr):
                    autocorr = 0.0
            autocorrs.append(autocorr)

            # Label the window with its last sample, not the one after it.
            window_ends.append(ts.index[start + window_size - 1])

        if len(variances) < 3:
            return []

        var_tau = self._kendall_trend(np.array(variances))
        ac_tau = self._kendall_trend(np.array(autocorrs))

        # Only rising trends count as a warning; falling ones contribute 0.
        combined = max(0, var_tau) * 0.5 + max(0, ac_tau) * 0.5

        signals = []
        if combined > 0.2:
            signals.append(EarlyWarningSignal(
                window_end=str(window_ends[-1].date()) if window_ends else "",
                metric=metric,
                variance_trend=round(var_tau, 3),
                autocorrelation_trend=round(ac_tau, 3),
                signal_strength=round(float(combined), 3),
            ))
        return signals

    def _detect_cusp_events(
        self, ts: pd.Series, z_threshold: float = 3.0
    ) -> list[dict]:
        """
        Detect cusp catastrophe events: sudden, discontinuous jumps
        in posting rate (like a saddle-node bifurcation).

        From Kepner et al.: dz3 = (-z3^3 + b*z3 + a) dt + sigma dW
        When a crosses a_cusp, the system jumps discontinuously.

        A jump is a first difference whose z-score (relative to all first
        differences) exceeds ``z_threshold`` in absolute value.

        Returns:
            Up to 10 event dicts, largest absolute z-score first.
        """
        data = ts.values
        diffs = np.diff(data)

        spread = np.std(diffs) if len(diffs) else 0.0
        if len(diffs) < 5 or spread == 0:
            return []

        z_scores = (diffs - np.mean(diffs)) / spread
        events = []
        for i, z in enumerate(z_scores):
            if abs(z) <= z_threshold:
                continue
            # diffs[i] is the step from sample i to sample i + 1.
            events.append({
                "date": str(ts.index[i + 1].date()) if i + 1 < len(ts.index) else "",
                "direction": "surge" if z > 0 else "collapse",
                "z_score": round(float(z), 2),
                "before": round(float(data[i]), 1),
                "after": round(float(data[i + 1]), 1),
            })

        return sorted(events, key=lambda x: abs(x["z_score"]), reverse=True)[:10]

    def _identify_regimes(
        self, ts: pd.Series, changepoints: "list[Changepoint]"
    ) -> list[dict]:
        """Identify behavioral regimes between posting-rate changepoints.

        Each regime covers [start, end): the changepoint sample opens the
        next regime and is not counted in the previous one. The final regime
        also includes its end sample. Regimes with fewer than 3 samples are
        skipped.

        Args:
            ts: Posting-rate series indexed by timestamps.
            changepoints: Records with ``date`` and ``metric`` attributes;
                only those with metric "posting_rate" are used.

        Returns:
            List of dicts with "start", "end", "mean_rate", "std_rate" and
            "days" (the number of samples in the regime).
        """
        if not changepoints or ts is None or ts.empty:
            return []

        # ISO date strings sort chronologically.
        cp_dates = sorted({
            cp.date for cp in changepoints if cp.metric == "posting_rate"
        })
        if not cp_dates:
            return []

        # Boundaries must carry the index timezone, otherwise comparing them
        # with a tz-aware index raises TypeError.
        tz = getattr(ts.index, "tz", None)
        boundaries = [str(ts.index[0].date())] + cp_dates + [str(ts.index[-1].date())]
        last = len(boundaries) - 2

        regimes = []
        for i in range(len(boundaries) - 1):
            start = pd.Timestamp(boundaries[i], tz=tz)
            end = pd.Timestamp(boundaries[i + 1], tz=tz)
            if i == last:
                upper = ts.index <= end
            else:
                upper = ts.index < end
            segment = ts[(ts.index >= start) & upper]

            if len(segment) < 3:
                continue

            regimes.append({
                "start": boundaries[i],
                "end": boundaries[i + 1],
                "mean_rate": round(float(segment.mean()), 1),
                "std_rate": round(float(segment.std()), 1),
                "days": len(segment),
            })

        return regimes
|
|