| """ |
| Compulsive usage behavioral analyzer — temporal and metadata features. |
| |
| Analyzes posting patterns to score compulsion-like behavior: |
| - Posting frequency and volume |
| - Burst detection (clusters of rapid posts) |
| - Night activity patterns |
| - Session analysis (reactive loops) |
| - Reply/quote/RT ratios |
| - Repetition patterns |
| - Engagement distribution skew |
| """ |
| import logging |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy import stats |
|
|
| from .config import ( |
| BURST_WINDOW_MINUTES, |
| COMPULSION_WEIGHTS, |
| NIGHT_END_HOUR, |
| NIGHT_START_HOUR, |
| SESSION_GAP_MINUTES, |
| ) |
|
|
| log = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class BehavioralProfile:
    """Container for every behavioral metric computed for one account.

    All fields carry defaults, so an "empty" profile (no tweets analyzed)
    can be built from just the identifying names.
    """

    # --- Identity ---------------------------------------------------------
    senator_name: str = ""
    twitter_handle: str = ""

    # --- Volume -----------------------------------------------------------
    n_tweets: int = 0
    span_days: int = 0
    tweets_per_day: float = 0.0
    active_days_pct: float = 0.0

    # --- Inter-post gaps (minutes) ----------------------------------------
    median_gap_minutes: float = 0.0
    p_gap_lt_10m: float = 0.0  # share of gaps shorter than 10 minutes
    p_gap_lt_60m: float = 0.0  # share of gaps shorter than 60 minutes

    # --- Sessions ---------------------------------------------------------
    sessions_per_day: float = 0.0
    median_session_length: float = 0.0
    max_session_length: int = 0
    max_posts_in_hour: int = 0

    # --- Night activity ---------------------------------------------------
    night_share: float = 0.0

    # --- Tweet-type mix ---------------------------------------------------
    reply_ratio: float = 0.0
    quote_ratio: float = 0.0
    retweet_ratio: float = 0.0
    original_ratio: float = 0.0

    # --- Repetition -------------------------------------------------------
    exact_repeat_share: float = 0.0
    prefix_repeat_share: float = 0.0

    # --- Engagement distribution ------------------------------------------
    engagement_mean: float = 0.0
    engagement_median: float = 0.0
    engagement_p90: float = 0.0
    engagement_gini: float = 0.0

    # --- Composite scoring ------------------------------------------------
    compulsion_score: float = 0.0
    compulsion_subscores: dict = field(default_factory=dict)

    # --- Detected bursts --------------------------------------------------
    burst_events: list = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the profile's attributes as a new dict.

        The copy is shallow: container values (``compulsion_subscores``,
        ``burst_events``) are the same objects held by the profile.
        """
        return dict(vars(self))
|
|
|
|
class BehavioralAnalyzer:
    """Analyze temporal posting patterns for compulsion-like behavior."""

    def analyze(
        self,
        df: pd.DataFrame,
        senator_name: str = "",
        twitter_handle: str = "",
    ) -> "BehavioralProfile":
        """Build a behavioral profile from one account's tweets.

        Args:
            df: One row per tweet. Must contain ``created_at`` (datetime
                dtype, since the ``.dt`` accessor is used). Optional columns:
                ``text``, ``tweet_type`` (or ``in_reply_to_user_id`` as a
                fallback for reply detection), and any of ``like_count``,
                ``retweet_count``, ``reply_count``, ``quote_count``.
            senator_name: Display name stored on the profile.
            twitter_handle: Handle stored on the profile.

        Returns:
            A populated ``BehavioralProfile``. For an empty ``df`` the
            profile carries only the two names and default metrics.
        """
        profile = BehavioralProfile(
            senator_name=senator_name,
            twitter_handle=twitter_handle,
        )
        if df.empty:
            return profile

        # Every temporal feature below relies on chronological order.
        df = df.sort_values("created_at").reset_index(drop=True)
        ts = df["created_at"]

        # --- Volume -------------------------------------------------------
        profile.n_tweets = len(df)
        span = (ts.max() - ts.min()).days
        profile.span_days = max(span, 1)  # avoid division by zero
        profile.tweets_per_day = profile.n_tweets / profile.span_days
        unique_days = ts.dt.date.nunique()
        # span_days is a floored elapsed-time delta while unique_days counts
        # calendar dates, so the raw ratio can exceed 1 (e.g. two posts
        # straddling midnight); clamp so the value stays a valid fraction.
        profile.active_days_pct = min(1.0, unique_days / profile.span_days)

        # --- Inter-post gaps (minutes) ------------------------------------
        gaps = ts.diff().dt.total_seconds().dropna() / 60.0
        if len(gaps) > 0:
            profile.median_gap_minutes = float(gaps.median())
            profile.p_gap_lt_10m = float((gaps < 10).mean())
            profile.p_gap_lt_60m = float((gaps < 60).mean())

        # --- Sessions -----------------------------------------------------
        sessions = self._detect_sessions(ts, gap_minutes=SESSION_GAP_MINUTES)
        if sessions:
            session_lengths = [s["length"] for s in sessions]
            profile.sessions_per_day = len(sessions) / profile.span_days
            profile.median_session_length = float(np.median(session_lengths))
            profile.max_session_length = max(session_lengths)

        # --- Peak hourly volume (clock-hour bins) -------------------------
        hourly_counts = ts.dt.floor("h").value_counts()
        profile.max_posts_in_hour = (
            int(hourly_counts.max()) if len(hourly_counts) > 0 else 0
        )

        # --- Night activity -----------------------------------------------
        # NOTE(review): hours are taken in whatever timezone created_at
        # carries; confirm that matches the account's local time.
        profile.night_share = self._night_share(
            ts.dt.hour, NIGHT_START_HOUR, NIGHT_END_HOUR
        )

        # --- Tweet-type mix -----------------------------------------------
        if "tweet_type" in df.columns:
            type_counts = df["tweet_type"].str.lower().value_counts(normalize=True)
            # Each ratio accepts two alternative labels; the first one
            # present wins.
            profile.reply_ratio = float(
                type_counts.get("reply", type_counts.get("replied_to", 0))
            )
            profile.quote_ratio = float(
                type_counts.get("quote", type_counts.get("quoted", 0))
            )
            profile.retweet_ratio = float(
                type_counts.get("retweet", type_counts.get("retweeted", 0))
            )
            profile.original_ratio = float(
                type_counts.get("tweet", type_counts.get("original", 0))
            )
        elif "in_reply_to_user_id" in df.columns:
            has_reply = df["in_reply_to_user_id"].notna()
            profile.reply_ratio = float(has_reply.mean())
            profile.original_ratio = 1.0 - profile.reply_ratio

        # --- Repetition ---------------------------------------------------
        if "text" in df.columns:
            (
                profile.exact_repeat_share,
                profile.prefix_repeat_share,
            ) = self._repetition_shares(df["text"])

        # --- Engagement distribution --------------------------------------
        engagement = self._compute_engagement(df)
        if engagement is not None and len(engagement) > 0:
            profile.engagement_mean = float(engagement.mean())
            profile.engagement_median = float(engagement.median())
            profile.engagement_p90 = float(np.percentile(engagement, 90))
            profile.engagement_gini = float(self._gini(engagement.values))

        # --- Bursts -------------------------------------------------------
        profile.burst_events = self._detect_bursts(
            ts, window_minutes=BURST_WINDOW_MINUTES
        )

        # --- Composite score ----------------------------------------------
        profile.compulsion_subscores = self._compute_subscores(profile)
        profile.compulsion_score = self._weighted_score(
            profile.compulsion_subscores, COMPULSION_WEIGHTS
        )
        return profile

    @staticmethod
    def _night_share(hours: pd.Series, start_hour: int, end_hour: int) -> float:
        """Return the fraction of posts whose hour falls in the night window.

        The window is ``[start_hour, end_hour)``. When ``start_hour`` is
        greater than ``end_hour`` the window is treated as wrapping past
        midnight (e.g. 23 -> 5); a plain ``&`` of the two comparisons would
        be empty in that case.
        """
        if len(hours) == 0:
            return 0.0
        if start_hour <= end_hour:
            mask = (hours >= start_hour) & (hours < end_hour)
        else:
            mask = (hours >= start_hour) | (hours < end_hour)
        return float(mask.mean())

    @staticmethod
    def _repetition_shares(text: pd.Series) -> tuple[float, float]:
        """Return ``(exact_repeat_share, prefix_repeat_share)`` for tweet texts.

        Texts are lower-cased, stripped, and have URLs removed before
        comparison. Missing texts are dropped so they neither crash the
        normalization nor count as repeats of one another.

        * exact share: ``1 - unique_texts / texts``.
        * prefix share: fraction of non-empty texts whose first three words
          form an opening that occurs at least three times.
        """
        texts = text.dropna().astype(str).str.strip().str.lower()
        if texts.empty:
            return 0.0, 0.0

        stripped = texts.str.replace(r"https?://\S+", "", regex=True).str.strip()
        exact_share = float(1 - stripped.nunique() / len(stripped))

        first_words = stripped.str.split().str[:3].str.join(" ")
        first_words = first_words[first_words.str.len() > 0]
        if first_words.empty:
            return exact_share, 0.0

        fw_counts = first_words.value_counts()
        repeated_openings = fw_counts[fw_counts >= 3].sum()
        return exact_share, float(repeated_openings / len(first_words))

    def _detect_sessions(
        self, timestamps: pd.Series, gap_minutes: float = 30
    ) -> list[dict]:
        """Split chronologically sorted timestamps into posting sessions.

        A new session starts whenever the gap to the previous post is
        strictly greater than ``gap_minutes``. A single post forms one
        session of length 1; an empty input yields no sessions.

        Returns:
            One dict per session with keys ``start``, ``end``, ``length``
            (number of posts) and ``duration_minutes``.
        """
        if len(timestamps) == 0:
            return []

        gaps = timestamps.diff().dt.total_seconds() / 60.0
        # The first gap is NaN, which compares False, so the first post
        # always opens session 0.
        session_ids = (gaps > gap_minutes).cumsum()

        return [
            {
                "start": group.iloc[0],
                "end": group.iloc[-1],
                "length": len(group),
                "duration_minutes": (
                    group.iloc[-1] - group.iloc[0]
                ).total_seconds() / 60.0,
            }
            for _, group in timestamps.groupby(session_ids)
        ]

    def _detect_bursts(
        self, timestamps: pd.Series, window_minutes: float = 60
    ) -> list[dict]:
        """Flag fixed time windows whose post count is unusually high.

        Timestamps are floored into non-overlapping ``window_minutes`` bins;
        only bins containing at least one post enter the statistics. A bin
        is a burst when its count's z-score (against the mean/std of those
        bins) exceeds 2.0.

        Returns:
            Up to 20 dicts (``window_start``, ``count``, ``z_score``),
            highest z-score first. Empty when there are fewer than 10 posts,
            fewer than 3 active bins, or no variance between bins.
        """
        if len(timestamps) < 10:
            return []

        # Sub-minute windows would render as "0min", which is not a valid
        # frequency string; fall back to one minute.
        minutes = max(1, int(window_minutes))
        counts = timestamps.dt.floor(f"{minutes}min").value_counts().sort_index()
        if len(counts) < 3:
            return []

        std_count = counts.std()
        if std_count == 0:
            return []

        z_scores = (counts - counts.mean()) / std_count
        flagged = z_scores[z_scores > 2.0]
        bursts = [
            {
                "window_start": str(window),
                "count": int(counts.loc[window]),
                "z_score": round(float(z), 2),
            }
            for window, z in flagged.items()
        ]
        return sorted(bursts, key=lambda b: b["z_score"], reverse=True)[:20]

    def _compute_engagement(self, df: pd.DataFrame) -> Optional[pd.Series]:
        """Sum the available engagement counters per tweet.

        Returns ``None`` when none of ``like_count``, ``retweet_count``,
        ``reply_count``, ``quote_count`` is present in ``df``.
        """
        eng_cols = ["like_count", "retweet_count", "reply_count", "quote_count"]
        available = [c for c in eng_cols if c in df.columns]
        if not available:
            return None
        return df[available].sum(axis=1)

    @staticmethod
    def _gini(values: np.ndarray) -> float:
        """Compute the Gini coefficient of ``values``.

        Returns 0.0 for empty input or when the values sum to zero.
        """
        values = np.sort(np.asarray(values, dtype=float))
        n = len(values)
        total = values.sum()
        if n == 0 or total == 0:
            return 0.0
        index = np.arange(1, n + 1)
        return float(((2 * index - n - 1) * values).sum() / (n * total))

    def _compute_subscores(self, profile: "BehavioralProfile") -> dict:
        """Compute 0-100 subscores for each compulsion dimension.

        Linear dimensions saturate at 100 once the metric reaches the
        divisor used below (e.g. 50% of gaps under 10 minutes, 25% night
        share, 30% replies); the others go through ``_sigmoid_score``.
        The original author's note says the constants were calibrated so a
        very heavy poster (~37 tweets/day, 2.5 min median gap, 40% night
        share) scores ~99+ on most dimensions.

        Returns:
            Mapping of dimension name to a score rounded to one decimal.
        """
        scores = {}

        # Activity: posts per day, 50 points at 5/day.
        scores["activity"] = self._sigmoid_score(
            profile.tweets_per_day, midpoint=5, steepness=0.3
        )

        # Burstiness: share of sub-10-minute gaps.
        scores["burstiness"] = (
            min(100, profile.p_gap_lt_10m * 100 / 0.5) if profile.p_gap_lt_10m else 0
        )

        # Night activity.
        scores["night_activity"] = min(100, profile.night_share * 100 / 0.25)

        # Session intensity: equal blend of session frequency and the
        # longest session.
        session_score = self._sigmoid_score(
            profile.sessions_per_day, midpoint=3, steepness=0.5
        )
        max_session_score = self._sigmoid_score(
            profile.max_session_length, midpoint=20, steepness=0.1
        )
        scores["session_intensity"] = session_score * 0.5 + max_session_score * 0.5

        # Reply reactivity.
        scores["reply_reactivity"] = min(100, profile.reply_ratio * 100 / 0.3)

        # Repetition: repeated openings weigh more than exact duplicates.
        rep_combined = (
            profile.exact_repeat_share * 0.3
            + profile.prefix_repeat_share * 0.7
        )
        scores["repetition"] = min(100, rep_combined * 100 / 0.15)

        # NOTE(review): hard-coded placeholder; nothing in this class
        # measures emoji/media usage, so every account gets 100 here.
        scores["emoji_media_sparsity"] = 100

        return {k: round(float(min(100, max(0, v))), 1) for k, v in scores.items()}

    @staticmethod
    def _sigmoid_score(value: float, midpoint: float = 10, steepness: float = 0.2) -> float:
        """Map ``value`` onto 0-100 with a logistic curve (50 at ``midpoint``)."""
        # exp() overflows to inf for values far below the midpoint; the
        # resulting score of 0.0 is correct, so silence only that warning.
        with np.errstate(over="ignore"):
            return float(100.0 / (1.0 + np.exp(-steepness * (value - midpoint))))

    @staticmethod
    def _weighted_score(subscores: dict, weights: dict) -> float:
        """Return the weighted average of ``subscores``, rounded to 1 decimal.

        Only keys present in both mappings contribute; returns 0.0 when the
        contributing weights sum to zero.
        """
        total = 0.0
        weight_sum = 0.0
        for key, weight in weights.items():
            if key in subscores:
                total += subscores[key] * weight
                weight_sum += weight
        if weight_sum == 0:
            return 0.0
        return round(total / weight_sum, 1)
|
|