# jimnoneill's picture
# Upload folder using huggingface_hub
# 178b774 verified
"""
Compulsive usage behavioral analyzer — temporal and metadata features.
Analyzes posting patterns to score compulsion-like behavior:
- Posting frequency and volume
- Burst detection (clusters of rapid posts)
- Night activity patterns
- Session analysis (reactive loops)
- Reply/quote/RT ratios
- Repetition patterns
- Engagement distribution skew
"""
import copy
import logging
from dataclasses import dataclass, field
from typing import Optional

import numpy as np
import pandas as pd
from scipy import stats

from .config import (
    BURST_WINDOW_MINUTES,
    COMPULSION_WEIGHTS,
    NIGHT_END_HOUR,
    NIGHT_START_HOUR,
    SESSION_GAP_MINUTES,
)
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
@dataclass
class BehavioralProfile:
    """Complete behavioral profile for a single account.

    Every field has a default, so a profile can be created with only the
    identifying names and populated afterwards.
    """

    senator_name: str = ""
    twitter_handle: str = ""

    # Volume metrics
    n_tweets: int = 0
    span_days: int = 0
    tweets_per_day: float = 0.0
    active_days_pct: float = 0.0

    # Temporal metrics (gaps between consecutive posts, in minutes)
    median_gap_minutes: float = 0.0
    p_gap_lt_10m: float = 0.0
    p_gap_lt_60m: float = 0.0

    # Session metrics
    sessions_per_day: float = 0.0
    median_session_length: float = 0.0
    max_session_length: int = 0
    max_posts_in_hour: int = 0

    # Night activity
    night_share: float = 0.0

    # Post type distribution
    reply_ratio: float = 0.0
    quote_ratio: float = 0.0
    retweet_ratio: float = 0.0
    original_ratio: float = 0.0

    # Repetition
    exact_repeat_share: float = 0.0
    prefix_repeat_share: float = 0.0

    # Engagement distribution
    engagement_mean: float = 0.0
    engagement_median: float = 0.0
    engagement_p90: float = 0.0
    engagement_gini: float = 0.0

    # Scores
    compulsion_score: float = 0.0
    compulsion_subscores: dict = field(default_factory=dict)

    # Burst events
    burst_events: list = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the profile as a plain ``dict`` keyed by attribute name.

        The result is a deep copy: mutating it, including the nested
        ``compulsion_subscores`` dict and ``burst_events`` list, never
        changes the profile it was exported from.
        """
        return copy.deepcopy(vars(self))
class BehavioralAnalyzer:
    """Analyze temporal posting patterns for compulsion-like behavior."""

    # Alternative spellings accepted in the ``tweet_type`` column, keyed by
    # the profile attribute whose ratio they contribute to.
    _TYPE_ALIASES = {
        "reply_ratio": ("reply", "replied_to"),
        "quote_ratio": ("quote", "quoted"),
        "retweet_ratio": ("retweet", "retweeted"),
        "original_ratio": ("tweet", "original"),
    }

    def analyze(
        self,
        df: pd.DataFrame,
        senator_name: str = "",
        twitter_handle: str = "",
    ) -> "BehavioralProfile":
        """
        Analyze a DataFrame of tweets from a single account.

        Expects a ``created_at`` (datetime) column. Optional columns:
        ``text``, ``tweet_type`` (or ``in_reply_to_user_id`` as a fallback
        for the reply ratio), ``like_count``, ``retweet_count``,
        ``reply_count``, ``quote_count``.

        Args:
            df: One row per tweet. Not modified; a sorted copy is used.
            senator_name: Stored verbatim on the returned profile.
            twitter_handle: Stored verbatim on the returned profile.

        Returns:
            A populated ``BehavioralProfile``; an all-defaults profile
            (apart from the two names) when ``df`` is empty.
        """
        profile = BehavioralProfile(
            senator_name=senator_name,
            twitter_handle=twitter_handle,
        )
        if df.empty:
            return profile

        # Gap, session and burst logic all assume chronological order.
        df = df.sort_values("created_at").reset_index(drop=True)
        ts = df["created_at"]

        # ── Volume ────────────────────────────────────
        profile.n_tweets = len(df)
        span = (ts.max() - ts.min()).days
        profile.span_days = max(span, 1)  # avoid division by zero below
        profile.tweets_per_day = profile.n_tweets / profile.span_days
        unique_days = ts.dt.date.nunique()
        # span_days is a floored duration while unique_days counts calendar
        # dates, so the raw ratio can exceed 1 (e.g. 30 hours touching two
        # dates); clamp so the value stays a share.
        profile.active_days_pct = min(1.0, unique_days / profile.span_days)

        # ── Inter-post gaps ───────────────────────────
        gaps = ts.diff().dt.total_seconds().dropna() / 60.0  # minutes
        if len(gaps) > 0:
            profile.median_gap_minutes = float(gaps.median())
            profile.p_gap_lt_10m = float((gaps < 10).mean())
            profile.p_gap_lt_60m = float((gaps < 60).mean())

        # ── Sessions ──────────────────────────────────
        sessions = self._detect_sessions(ts, gap_minutes=SESSION_GAP_MINUTES)
        if sessions:
            session_lengths = [s["length"] for s in sessions]
            profile.sessions_per_day = len(sessions) / profile.span_days
            profile.median_session_length = float(np.median(session_lengths))
            profile.max_session_length = max(session_lengths)

        # ── Hourly burst ──────────────────────────────
        hourly_counts = ts.dt.floor("h").value_counts()
        profile.max_posts_in_hour = (
            int(hourly_counts.max()) if len(hourly_counts) > 0 else 0
        )

        # ── Night activity ────────────────────────────
        night_mask = self._night_mask(
            ts.dt.hour, NIGHT_START_HOUR, NIGHT_END_HOUR
        )
        profile.night_share = float(night_mask.mean())

        # ── Post type distribution ────────────────────
        if "tweet_type" in df.columns:
            ratios = self._post_type_ratios(df["tweet_type"])
            for attr, share in ratios.items():
                setattr(profile, attr, share)
        elif "in_reply_to_user_id" in df.columns:
            # Fallback: without a type column only replies can be told apart.
            has_reply = df["in_reply_to_user_id"].notna()
            profile.reply_ratio = float(has_reply.mean())
            profile.original_ratio = 1.0 - profile.reply_ratio

        # ── Repetition ────────────────────────────────
        if "text" in df.columns:
            exact_share, prefix_share = self._repetition_shares(df["text"])
            profile.exact_repeat_share = exact_share
            profile.prefix_repeat_share = prefix_share

        # ── Engagement distribution ───────────────────
        engagement = self._compute_engagement(df)
        if engagement is not None and len(engagement) > 0:
            profile.engagement_mean = float(engagement.mean())
            profile.engagement_median = float(engagement.median())
            profile.engagement_p90 = float(np.percentile(engagement, 90))
            profile.engagement_gini = float(self._gini(engagement.values))

        # ── Burst detection ───────────────────────────
        profile.burst_events = self._detect_bursts(
            ts, window_minutes=BURST_WINDOW_MINUTES
        )

        # ── Compulsion scoring ────────────────────────
        profile.compulsion_subscores = self._compute_subscores(profile)
        profile.compulsion_score = self._weighted_score(
            profile.compulsion_subscores, COMPULSION_WEIGHTS
        )
        return profile

    @staticmethod
    def _night_mask(
        hours: pd.Series, start_hour: int, end_hour: int
    ) -> pd.Series:
        """Flag hours inside the half-open night window [start_hour, end_hour).

        A window whose start is later than its end (e.g. 22 -> 6) is treated
        as wrapping past midnight instead of matching nothing.
        """
        if start_hour <= end_hour:
            return (hours >= start_hour) & (hours < end_hour)
        return (hours >= start_hour) | (hours < end_hour)

    @classmethod
    def _post_type_ratios(cls, tweet_type: pd.Series) -> dict:
        """Return the share of each post type, keyed by profile attribute name.

        Labels are compared case-insensitively and missing labels are left
        out of the denominator. All aliases of a type are summed, so a
        column mixing e.g. "reply" and "replied_to" is counted in full.
        """
        labels = tweet_type.dropna().astype(str).str.lower()
        shares = labels.value_counts(normalize=True)
        return {
            attr: float(sum(shares.get(alias, 0.0) for alias in aliases))
            for attr, aliases in cls._TYPE_ALIASES.items()
        }

    @staticmethod
    def _repetition_shares(text: pd.Series) -> tuple[float, float]:
        """Return ``(exact_repeat_share, prefix_repeat_share)`` for tweet texts.

        Missing texts are ignored. Texts are stripped, lower-cased and have
        URLs removed before comparison.
        """
        texts = text.dropna().astype(str).str.strip().str.lower()
        # Strip URLs — t.co links make otherwise-identical tweets unique
        stripped = texts.str.replace(r"https?://\S+", "", regex=True).str.strip()
        if stripped.empty:
            return 0.0, 0.0

        exact_share = float(1 - stripped.nunique() / len(stripped))

        # Structural repetition: first 3 words (captures rhetorical patterns
        # like "Do you think...", "Should we...", "Raise your hand if...")
        openings = stripped.str.split().str[:3].str.join(" ")
        openings = openings[openings.str.len() > 0]
        if openings.empty:
            return exact_share, 0.0

        # Share of tweets whose opening 3-word phrase appears 3+ times
        opening_counts = openings.value_counts()
        repeated = opening_counts[opening_counts >= 3].sum()
        return exact_share, float(repeated / len(openings))

    def _detect_sessions(
        self, timestamps: pd.Series, gap_minutes: float = 30
    ) -> list[dict]:
        """Group chronologically sorted timestamps into posting sessions.

        A new session starts whenever the gap to the previous post is
        strictly greater than ``gap_minutes``.

        Returns:
            One dict per session with ``start``, ``end``, ``length`` (number
            of posts) and ``duration_minutes``. Empty when fewer than two
            timestamps are given.
        """
        if len(timestamps) < 2:
            return []
        gaps = timestamps.diff().dt.total_seconds() / 60.0
        # The first gap is NaN, which compares False, so the first post
        # opens session 0; each break bumps the running session id.
        session_ids = (gaps > gap_minutes).cumsum()
        sessions = []
        for _, group in timestamps.groupby(session_ids):
            start, end = group.iloc[0], group.iloc[-1]
            sessions.append({
                "start": start,
                "end": end,
                "length": len(group),
                "duration_minutes": (end - start).total_seconds() / 60.0,
            })
        return sessions

    def _detect_bursts(
        self, timestamps: pd.Series, window_minutes: float = 60
    ) -> list[dict]:
        """Detect unusual posting bursts via z-scores of per-window counts.

        Timestamps are floored into fixed ``window_minutes``-wide windows
        (not rolling ones). Only windows containing at least one post enter
        the mean/std, so z-scores are relative to active windows.

        Returns:
            Up to 20 windows with z-score > 2, strongest first, each as
            ``{"window_start", "count", "z_score"}``. Empty when there are
            fewer than 10 timestamps, fewer than 3 active windows, or no
            variation between windows.
        """
        if len(timestamps) < 10:
            return []
        # Count posts per window
        counts = (
            timestamps.dt.floor(f"{int(window_minutes)}min")
            .value_counts()
            .sort_index()
        )
        if len(counts) < 3:
            return []
        std_count = counts.std()
        if std_count == 0:
            return []
        z_scores = (counts - counts.mean()) / std_count
        flagged = z_scores[z_scores > 2.0]  # >2 standard deviations
        bursts = [
            {
                "window_start": str(window_start),
                "count": int(counts[window_start]),
                "z_score": round(float(z), 2),
            }
            for window_start, z in flagged.items()
        ]
        return sorted(bursts, key=lambda x: x["z_score"], reverse=True)[:20]

    def _compute_engagement(self, df: pd.DataFrame) -> Optional[pd.Series]:
        """Compute total engagement per tweet.

        Sums whichever of the like/retweet/reply/quote count columns exist;
        returns ``None`` when none of them is present.
        """
        eng_cols = ["like_count", "retweet_count", "reply_count", "quote_count"]
        available = [c for c in eng_cols if c in df.columns]
        if not available:
            return None
        return df[available].sum(axis=1)

    @staticmethod
    def _gini(values: np.ndarray) -> float:
        """Compute Gini coefficient for engagement inequality.

        Accepts any array-like. Returns 0.0 for empty input or when the
        values sum to zero.
        """
        ordered = np.sort(np.asarray(values, dtype=float))
        n = len(ordered)
        total = ordered.sum()
        if n == 0 or total == 0:
            return 0.0
        rank = np.arange(1, n + 1)
        return float(((2 * rank - n - 1) * ordered).sum() / (n * total))

    def _compute_subscores(self, profile: "BehavioralProfile") -> dict:
        """
        Compute normalized 0-100 subscores for each compulsion dimension.

        Calibrated so that Mike Lee's known profile (~37 tweets/day, 2.5min
        median gap, 40% night share) scores ~99+ on most dimensions.

        Returns:
            Mapping of dimension name to a plain ``float`` clamped to
            [0, 100] and rounded to one decimal.
        """
        scores = {}

        # Activity: tweets per day — sigmoid with 50% at 5/day (most senators do <5)
        scores["activity"] = self._sigmoid_score(
            profile.tweets_per_day, midpoint=5, steepness=0.3
        )

        # Burstiness: fraction of gaps < 10 minutes — 50% threshold = extreme
        scores["burstiness"] = (
            min(100, profile.p_gap_lt_10m * 100 / 0.5)
            if profile.p_gap_lt_10m
            else 0
        )

        # Night activity: share of posts in the configured night window
        # Expected uniform would be 25%; >30% is elevated; >35% is extreme
        # NOTE(review): this formula already saturates at a 25% share, which
        # does not match the 30%/35% thresholds above — confirm calibration.
        scores["night_activity"] = min(100, profile.night_share * 100 / 0.25)

        # Session intensity: combine sessions/day and max session length
        # High sessions/day AND long max sessions = compulsive pattern
        session_score = self._sigmoid_score(
            profile.sessions_per_day, midpoint=3, steepness=0.5
        )
        max_session_score = self._sigmoid_score(
            profile.max_session_length, midpoint=20, steepness=0.1
        )
        scores["session_intensity"] = (
            session_score * 0.5 + max_session_score * 0.5
        )

        # Reply reactivity: reply ratio (high = reactive posting)
        # Most broadcast accounts are <10% reply; >30% is reactive
        scores["reply_reactivity"] = min(100, profile.reply_ratio * 100 / 0.3)

        # Repetition: structural pattern reuse
        # prefix_repeat_share = share of tweets with a 3+ times reused opening
        # For political accounts, >15% structural repetition is very high
        # Combine with exact repeats (URL-stripped)
        rep_combined = (
            profile.exact_repeat_share * 0.3
            + profile.prefix_repeat_share * 0.7
        )
        scores["repetition"] = min(100, rep_combined * 100 / 0.15)

        # Emoji/media sparsity: low emoji/media usage = text-heavy engagement-seeking
        # (Proxy: if emoji_share < 10% and we don't have media data, score high)
        scores["emoji_media_sparsity"] = 100  # default; will refine when media data available

        # float() keeps numpy scalars and bare ints out of the result so
        # every subscore has the same plain type.
        return {
            name: round(float(min(100, max(0, raw))), 1)
            for name, raw in scores.items()
        }

    @staticmethod
    def _sigmoid_score(
        value: float, midpoint: float = 10, steepness: float = 0.2
    ) -> float:
        """Map a value to 0-100 using a sigmoid curve.

        ``midpoint`` is the input that scores exactly 50; ``steepness``
        controls how quickly the score moves away from 50 around it.
        """
        return float(100 / (1 + np.exp(-steepness * (value - midpoint))))

    @staticmethod
    def _weighted_score(subscores: dict, weights: dict) -> float:
        """Compute weighted average score.

        Only keys present in both mappings contribute. Returns 0.0 when the
        contributing weights sum to zero; otherwise the average rounded to
        one decimal.
        """
        total = 0.0
        weight_sum = 0.0
        for key, weight in weights.items():
            if key in subscores:
                total += subscores[key] * weight
                weight_sum += weight
        if weight_sum == 0:
            return 0.0
        return round(total / weight_sum, 1)