# Source: xbox-compulsion-classifier / xbox/changepoint.py
# Hugging Face page header (uploader avatar: jimnoneill)
# Commit message: "Upload folder using huggingface_hub"
# Commit 178b774 (verified)
"""
Bayesian changepoint detection for compulsive usage patterns.
Inspired by the Recovery Viability Theory (Kepner, White, O'Neill):
- Cusp catastrophe model: sudden transitions in posting behavior
- Critical slowing down: variance/autocorrelation increase before transitions
- Logit-bounded state space: natural [0,1] bounds for behavioral metrics
- Recursive parameter updates: behavior shifts with each "engagement cycle"
Detects:
1. Changepoints in posting frequency (sudden acceleration/deceleration)
2. Changepoints in virulence (tone shifts)
3. Critical slowing down signatures (variance, autocorrelation approaching 1)
4. Cusp catastrophe dynamics (bistable regime detection)
"""
import logging
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import pandas as pd
from scipy import stats, signal
from scipy.special import expit, logit # sigmoid / logit transform
from .config import SESSION_GAP_MINUTES
log = logging.getLogger(__name__)
@dataclass
class Changepoint:
    """One detected changepoint in an aggregated time series.

    Attributes:
        date: Date label of the period where the new regime starts.
        metric: Name of the series the changepoint was found in.
        direction: Either "increase" or "decrease".
        magnitude: Effect size of the shift.
        confidence: Posterior probability attached to the changepoint.
        before_mean: Mean of the series just before the changepoint.
        after_mean: Mean of the series just after the changepoint.
    """

    date: str
    metric: str
    direction: str
    magnitude: float
    confidence: float
    before_mean: float
    after_mean: float
@dataclass
class EarlyWarningSignal:
    """Critical-slowing-down signature observed ahead of a behavioral transition.

    Attributes:
        window_end: Date label of the last rolling window that was analysed.
        metric: Name of the series the signal was computed on.
        variance_trend: Trend of the rolling variance over time.
        autocorrelation_trend: Trend of the rolling lag-1 autocorrelation.
        signal_strength: Combined indicator in the range 0-1.
    """

    window_end: str
    metric: str
    variance_trend: float
    autocorrelation_trend: float
    signal_strength: float
@dataclass
class ChangepointProfile:
    """Everything the changepoint analysis produced for one account.

    The list fields start empty and are filled in by the detector;
    ``posting_rate_ts`` optionally keeps the posting-rate series and is
    excluded from both ``repr`` and :meth:`to_dict`.
    """

    senator_name: str = ""
    changepoints: list = field(default_factory=list)
    early_warnings: list = field(default_factory=list)
    regime_periods: list = field(default_factory=list)
    cusp_events: list = field(default_factory=list)
    posting_rate_ts: Optional[pd.Series] = field(default=None, repr=False)

    def to_dict(self) -> dict:
        """Return the profile as a plain dict, without the raw time series.

        The result holds every instance attribute except
        ``posting_rate_ts`` (values are shared with the instance, not
        copied), plus the ``n_changepoints`` and ``n_early_warnings``
        counts.
        """
        summary = dict(vars(self))
        summary.pop("posting_rate_ts", None)
        summary["n_changepoints"] = len(self.changepoints)
        summary["n_early_warnings"] = len(self.early_warnings)
        return summary
class BayesianChangepointDetector:
    """
    Bayesian online changepoint detection (Adams & MacKay 2007)
    adapted for social media posting patterns.

    The core idea: at each time step, maintain the posterior distribution
    over the length of the current "run" (time since the last regime
    change). A changepoint is reported at a period when, a few
    observations later, the posterior is confident that the current run
    started exactly there.
    """

    # Minimum spacing, in periods, between two reported changepoints.
    _MIN_PEAK_DISTANCE = 7
    # Number of periods on each side of a changepoint used for the
    # before/after means.
    _CONTEXT_WINDOW = 14
    # A changepoint needs at least this many periods on each side.
    _MIN_SIDE_SAMPLES = 3

    def __init__(self, hazard_rate: float = 1 / 250):
        """
        Args:
            hazard_rate: Prior probability of changepoint at any step.
                1/250 means we expect a change roughly every 250 time steps.
        """
        self.hazard_rate = hazard_rate

    def detect_changepoints(
        self,
        df: pd.DataFrame,
        resolution: str = "D",
        min_confidence: float = 0.5,
    ) -> "ChangepointProfile":
        """
        Run full changepoint analysis on a tweet dataset.

        Args:
            df: DataFrame with a ``created_at`` column and optionally a
                ``composite_virulence`` column. ``created_at`` values that
                are not already datetimes are parsed (as UTC); rows that
                cannot be parsed are ignored.
            resolution: Time aggregation ('D'=daily, 'W'=weekly).
            min_confidence: Minimum posterior probability to report a
                changepoint.

        Returns:
            A ChangepointProfile; it is empty when ``df`` has no usable
            ``created_at`` data.
        """
        profile = ChangepointProfile()
        if "created_at" not in df.columns or df.empty:
            return profile

        df = df.copy()
        # resample() needs real datetimes; string timestamps (e.g. straight
        # from CSV/JSON) would otherwise raise further down.
        if not pd.api.types.is_datetime64_any_dtype(df["created_at"]):
            df["created_at"] = pd.to_datetime(
                df["created_at"], errors="coerce", utc=True
            )
        df = df.dropna(subset=["created_at"]).sort_values("created_at")
        if df.empty:
            return profile

        def scan(series: pd.Series, metric_name: str) -> None:
            """Append changepoints of *series* to the profile if it is long enough."""
            if len(series) > 10:
                profile.changepoints.extend(
                    self._bayesian_online_cpd(
                        series.values,
                        metric_name=metric_name,
                        dates=series.index,
                        min_confidence=min_confidence,
                    )
                )

        # ── Posting rate changepoints ─────────────────
        rate_ts = self._compute_posting_rate(df, resolution)
        if len(rate_ts) > 10:
            profile.posting_rate_ts = rate_ts
        scan(rate_ts, "posting_rate")

        # ── Virulence changepoints ────────────────────
        if "composite_virulence" in df.columns:
            vir_ts = (
                df.set_index("created_at")["composite_virulence"]
                .resample(resolution)
                .mean()
                .dropna()
            )
            scan(vir_ts, "virulence")

        # ── Night activity changepoints ───────────────
        scan(self._compute_night_share(df, resolution), "night_share")

        # ── Early warning signals (critical slowing down) ──
        if len(rate_ts) > 30:
            profile.early_warnings.extend(
                self._detect_critical_slowing_down(rate_ts, metric="posting_rate")
            )

        # ── Regime identification ─────────────────────
        if profile.changepoints:
            profile.regime_periods = self._identify_regimes(
                rate_ts, profile.changepoints
            )

        # ── Cusp catastrophe events ───────────────────
        if len(rate_ts) > 30:
            profile.cusp_events = self._detect_cusp_events(rate_ts)

        return profile

    def _compute_posting_rate(self, df: pd.DataFrame, resolution: str) -> pd.Series:
        """Count posts per period, with empty periods reported as 0."""
        counts = df.set_index("created_at").resample(resolution).size()
        # Reindex over the complete range so no period is missing.
        full_range = pd.date_range(
            counts.index.min(), counts.index.max(), freq=resolution
        )
        return counts.reindex(full_range, fill_value=0)

    def _compute_night_share(self, df: pd.DataFrame, resolution: str) -> pd.Series:
        """
        Compute the share of posts made during hours 0-5 per period.

        The hour is read from ``created_at`` as stored, so this is a UTC
        night only if the timestamps are UTC. Periods without any post are
        dropped rather than reported as 0.
        """
        flagged = df.copy()
        flagged["is_night"] = flagged["created_at"].dt.hour.between(0, 5)
        by_time = flagged.set_index("created_at")
        return by_time["is_night"].resample(resolution).mean().dropna()

    def _changepoint_probabilities(self, data: np.ndarray, lag: int = 3) -> np.ndarray:
        """
        Run the BOCPD recursion and return a per-period changepoint probability.

        Uses a conjugate Normal-Gamma prior for Gaussian observations and
        tracks the run-length distribution P(r_t | x_{1:t}).

        With a constant hazard, P(r_t = 0 | x_{1:t}) is always exactly the
        hazard rate (the predictive terms cancel in the normalisation), so
        it carries no information. Instead, entry ``s`` of the result is
        P(r = lag | x_{0 : s+lag-1}): the posterior probability, after
        ``lag`` observations of the putative new regime, that the current
        run started exactly at index ``s``.

        Args:
            data: 1-D array of observations.
            lag: Number of observations of the new run required before a
                changepoint is scored (values below 1 are treated as 1).

        Returns:
            Array of ``len(data)`` probabilities. Index 0 (series start)
            and the last ``lag - 1`` indices cannot be scored and stay 0.
        """
        values = np.asarray(data, dtype=float)
        n = len(values)
        probs = np.zeros(n)
        if n == 0:
            return probs
        lag = max(1, int(lag))

        # Hyperparameters for the Normal-Gamma prior (empirical).
        mu0 = float(np.mean(values))
        kappa0 = 1.0
        alpha0 = 1.0
        spread = float(np.var(values))
        beta0 = spread if spread > 0 else 1.0
        hazard = self.hazard_rate

        # Slot r holds the state for "current run contains r observations".
        size = n + 1
        run_probs = np.zeros(size)
        run_probs[0] = 1.0
        mu = np.full(size, mu0)
        kappa = np.full(size, kappa0)
        alpha = np.full(size, alpha0)
        beta = np.full(size, beta0)

        for t, x in enumerate(values):
            # Only run lengths 0..t can carry mass before x_t is seen.
            live = slice(0, t + 1)
            mu_r, kappa_r = mu[live], kappa[live]
            alpha_r, beta_r = alpha[live], beta[live]

            # Student-t predictive probability under each run length.
            pred_var = np.maximum(
                beta_r * (kappa_r + 1) / (alpha_r * kappa_r), 1e-10
            )
            pred = stats.t.pdf(x, 2 * alpha_r, loc=mu_r, scale=np.sqrt(pred_var))
            joint = run_probs[live] * pred

            updated = np.zeros(size)
            updated[1:t + 2] = joint * (1 - hazard)  # run grows by one
            updated[0] = joint.sum() * hazard        # run resets
            total = updated.sum()
            if total > 0:
                updated /= total
            else:
                # Every predictive underflowed: restart from an empty run.
                updated[0] = 1.0
            run_probs = updated

            # A run holding `lag` observations now started at index `onset`.
            onset = t - lag + 1
            if onset >= 1:
                probs[onset] = run_probs[lag]

            # Posterior updates; compute everything before writing because
            # the *_r arrays are views into the buffers being shifted.
            mu_next = (kappa_r * mu_r + x) / (kappa_r + 1)
            beta_next = beta_r + kappa_r * (x - mu_r) ** 2 / (2 * (kappa_r + 1))
            kappa_next = kappa_r + 1
            alpha_next = alpha_r + 0.5
            mu[1:t + 2] = mu_next
            kappa[1:t + 2] = kappa_next
            alpha[1:t + 2] = alpha_next
            beta[1:t + 2] = beta_next
            # Slot 0 is never written, so it always holds the prior.

        return probs

    def _bayesian_online_cpd(
        self,
        data: np.ndarray,
        metric_name: str,
        dates: pd.DatetimeIndex,
        min_confidence: float = 0.5,
    ) -> "list[Changepoint]":
        """
        Bayesian Online Changepoint Detection (Adams & MacKay 2007).

        Scores every period with :meth:`_changepoint_probabilities`, keeps
        the local maxima that reach ``min_confidence`` and are at least
        ``_MIN_PEAK_DISTANCE`` periods apart, and describes each one by the
        means of up to ``_CONTEXT_WINDOW`` periods before and after it.

        Args:
            data: Observations, one per period.
            metric_name: Stored in each returned changepoint's ``metric``.
            dates: Period labels aligned with ``data``.
            min_confidence: Minimum probability for a reported changepoint.

        Returns:
            Detected changepoints in chronological order; empty when fewer
            than 5 observations are given.
        """
        n = len(data)
        if n < 5:
            return []
        data = np.asarray(data, dtype=float)

        changepoint_probs = self._changepoint_probabilities(data)
        peaks, _ = signal.find_peaks(
            changepoint_probs,
            height=min_confidence,
            distance=self._MIN_PEAK_DISTANCE,
        )

        overall_std = np.std(data) + 1e-10  # guard against a flat series
        margin = self._MIN_SIDE_SAMPLES
        window = self._CONTEXT_WINDOW
        changepoints = []
        for peak in peaks:
            # Need enough samples on both sides to compare means.
            if peak < margin or peak >= n - margin:
                continue
            before_mean = np.mean(data[max(0, peak - window):peak])
            after_mean = np.mean(data[peak:peak + window])
            magnitude = abs(after_mean - before_mean) / overall_std
            direction = "increase" if after_mean > before_mean else "decrease"
            changepoints.append(Changepoint(
                date=str(dates[peak].date()) if peak < len(dates) else "",
                metric=metric_name,
                direction=direction,
                magnitude=round(float(magnitude), 3),
                confidence=round(float(changepoint_probs[peak]), 3),
                before_mean=round(float(before_mean), 3),
                after_mean=round(float(after_mean), 3),
            ))
        return changepoints

    @staticmethod
    def _lag1_autocorrelation(window: np.ndarray) -> float:
        """Return the lag-1 autocorrelation of *window*, or 0 when undefined."""
        head, tail = window[:-1], window[1:]
        # A constant slice has no defined correlation.
        if np.std(head) == 0 or np.std(tail) == 0:
            return 0.0
        ac = np.corrcoef(head, tail)[0, 1]
        return float(ac) if np.isfinite(ac) else 0.0

    def _detect_critical_slowing_down(
        self,
        ts: pd.Series,
        metric: str,
        window_size: int = 30,
        step: int = 7,
    ) -> "list[EarlyWarningSignal]":
        """
        Detect critical slowing down signatures:
        - Variance increases (Var ~ sigma^2 / (2|lambda|), diverges as lambda -> 0)
        - Lag-1 autocorrelation approaches 1 (rho = exp(lambda * dt) -> 1)

        From Kepner et al.: these are derived markers of approaching
        a cusp bifurcation boundary.

        Variance and lag-1 autocorrelation are computed on rolling windows
        (including the last full window of the series) and their trends
        are measured with Kendall's tau.

        Args:
            ts: Time series indexed by period.
            metric: Stored in the returned signal's ``metric``.
            window_size: Number of periods per rolling window.
            step: Offset, in periods, between consecutive windows.

        Returns:
            A list with one EarlyWarningSignal when the combined positive
            trend exceeds 0.2, otherwise an empty list. Also empty when the
            series yields fewer than three windows.
        """
        data = np.asarray(ts.values, dtype=float)
        n = len(data)
        if n < window_size + step:
            return []

        variances = []
        autocorrs = []
        window_ends = []
        # "+ 1" so the window ending on the final observation is included.
        for start in range(0, n - window_size + 1, step):
            window = data[start:start + window_size]
            variances.append(np.var(window))
            autocorrs.append(self._lag1_autocorrelation(window))
            # Label the window by its last observation.
            window_ends.append(ts.index[start + window_size - 1])

        if len(variances) < 3:
            return []

        # Kendall's tau as a nonparametric trend measure. Tuple unpacking
        # works across SciPy versions regardless of the result field name.
        order = np.arange(len(variances))
        var_tau, _ = stats.kendalltau(order, np.array(variances))
        ac_tau, _ = stats.kendalltau(order, np.array(autocorrs))

        def positive_part(tau: float) -> float:
            """Only increasing trends count; an undefined tau counts as none."""
            return float(tau) if np.isfinite(tau) and tau > 0 else 0.0

        combined = 0.5 * positive_part(var_tau) + 0.5 * positive_part(ac_tau)
        if combined <= 0.2:  # not a meaningful signal
            return []

        return [EarlyWarningSignal(
            window_end=str(window_ends[-1].date()),
            metric=metric,
            variance_trend=round(float(var_tau), 3),
            autocorrelation_trend=round(float(ac_tau), 3),
            signal_strength=round(float(combined), 3),
        )]

    def _detect_cusp_events(
        self, ts: pd.Series, z_threshold: float = 3.0, max_events: int = 10
    ) -> list[dict]:
        """
        Detect cusp catastrophe events: sudden, discontinuous jumps
        in posting rate (like a saddle-node bifurcation).

        From Kepner et al.: dz3 = (-z3^3 + b*z3 + a) dt + sigma dW
        When a crosses a_cusp, the system jumps discontinuously.

        Args:
            ts: Time series indexed by period.
            z_threshold: A period-to-period change is an event when the
                absolute z-score of its difference exceeds this value.
            max_events: Maximum number of events to return.

        Returns:
            Up to ``max_events`` dicts with keys ``date``, ``direction``
            ("surge"/"collapse"), ``z_score``, ``before`` and ``after``,
            strongest first. Empty when there are fewer than five
            differences or the differences have no spread.
        """
        data = ts.values
        diffs = np.diff(data)
        if len(diffs) < 5:
            return []
        sigma = np.std(diffs)
        if sigma == 0:
            return []

        z_scores = (diffs - np.mean(diffs)) / sigma
        # diffs[i] = data[i + 1] - data[i], so the jump lands on period i + 1.
        events = [
            {
                "date": str(ts.index[i + 1].date()),
                "direction": "surge" if z_scores[i] > 0 else "collapse",
                "z_score": round(float(z_scores[i]), 2),
                "before": round(float(data[i]), 1),
                "after": round(float(data[i + 1]), 1),
            }
            for i in np.flatnonzero(np.abs(z_scores) > z_threshold)
        ]
        events.sort(key=lambda event: abs(event["z_score"]), reverse=True)
        return events[:max_events]

    def _identify_regimes(
        self, ts: pd.Series, changepoints: "list[Changepoint]"
    ) -> list[dict]:
        """
        Identify behavioral regimes between posting-rate changepoints.

        Each regime covers the periods from its start boundary up to, but
        not including, the next changepoint; only the final regime includes
        its end boundary. Every period therefore belongs to at most one
        regime. Boundaries are calendar dates, so this assumes a daily or
        coarser series.

        Args:
            ts: Posting-rate series indexed by period.
            changepoints: Detected changepoints; only those whose
                ``metric`` is "posting_rate" are used.

        Returns:
            One dict per regime with at least three periods, with keys
            ``start``, ``end``, ``mean_rate``, ``std_rate`` and ``days``
            (the number of periods in the regime).
        """
        if not changepoints or ts is None or ts.empty:
            return []

        cp_dates = sorted({
            cp.date for cp in changepoints
            if cp.metric == "posting_rate" and cp.date
        })
        if not cp_dates:
            return []

        # Boundaries are date strings; give them the index's timezone so
        # comparisons against a tz-aware index do not raise.
        tz = getattr(ts.index, "tz", None)

        def as_timestamp(label: str) -> pd.Timestamp:
            stamp = pd.Timestamp(label)
            return stamp.tz_localize(tz) if tz is not None else stamp

        boundaries = [
            str(ts.index[0].date()),
            *cp_dates,
            str(ts.index[-1].date()),
        ]
        final = len(boundaries) - 2

        regimes = []
        for i, (start_label, end_label) in enumerate(zip(boundaries, boundaries[1:])):
            start = as_timestamp(start_label)
            end = as_timestamp(end_label)
            # A changepoint period opens the next regime, so it is excluded
            # from the regime it terminates.
            upper = ts.index <= end if i == final else ts.index < end
            segment = ts[(ts.index >= start) & upper]
            if len(segment) < 3:
                continue
            regimes.append({
                "start": start_label,
                "end": end_label,
                "mean_rate": round(float(segment.mean()), 1),
                "std_rate": round(float(segment.std()), 1),
                "days": len(segment),
            })
        return regimes