modelx / src /utils /trending_detector.py
nivakaran's picture
Deploy from GitHub Actions
52329fa verified
"""
src/utils/trending_detector.py
Trending/Velocity Detection Module for Roger
Tracks topic mention frequency over time to detect:
- Topics gaining traction (momentum)
- Sudden volume spikes (alerts)
- Trending topics across the system
Uses SQLite for persistence.
"""
import hashlib
import json
import logging
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
# Module logger, namespaced under the "Roger" application logger.
logger = logging.getLogger("Roger.trending")

# Default database location: data/trending.db, resolved two directory levels
# above the directory that contains this module.
_MODULE_DIR = os.path.dirname(__file__)
DEFAULT_DB_PATH = os.path.join(
    _MODULE_DIR, os.pardir, os.pardir, "data", "trending.db"
)
# Stopwords: generic terms that must never trigger a trending alert.
# They are Sri Lankan context words that appear in almost every news item.
# Entries are lowercase because topics are lowercased before the lookup.
TRENDING_STOPWORDS = {
    # Country / location
    "sri", "lanka", "srilanka", "sri lanka", "colombo", "lka",
    # Generic government / political terms
    "government", "gov", "political", "politics", "minister", "ministry",
    "parliament", "president", "presidential", "cabinet",
    # Generic economy terms
    "economy", "economic", "economical", "finance", "financial",
    # Common news words
    "news", "report", "update", "latest", "breaking", "today", "announced",
    "statement", "official", "officials",
    # Time words
    "yesterday", "tomorrow", "week", "month", "year", "hour", "minute",
    "second", "time", "date",
    # Days of the week
    "monday", "tuesday", "wednesday", "thursday", "friday", "saturday",
    "sunday",
    # Months
    "january", "february", "march", "april", "may", "june", "july",
    "august", "september", "october", "november", "december",
    # Generic actions / descriptions
    # ("statement" and "report" already appear under common news words)
    "said", "says", "told", "according", "sources", "media", "press",
    "release", "general", "public", "national", "international", "local",
    "central", "department", "division", "authority", "board", "committee",
    "director", "secretary", "commission", "reports", "reported",
    # Generic location terms ("central" already appears above)
    "district", "province", "area", "region", "island", "nation", "country",
    "state", "western", "eastern", "southern", "northern",
}
class TrendingDetector:
    """
    Detects trending topics and velocity spikes.

    Features:
    - Records topic mentions with timestamps
    - Calculates momentum (current_hour / avg_last_6_hours)
    - Detects spikes (momentum >= spike_threshold)
    - Returns trending topics for dashboard display

    All hour buckets are computed in UTC. Timezone-aware timestamps are
    converted to UTC before bucketing; naive timestamps are assumed to
    already be in UTC.
    """

    # Number of full hours before the current one that form the baseline.
    BASELINE_HOURS = 6
    # Floor for the baseline average so that brand-new topics (no history)
    # do not cause a division by zero.
    MIN_BASELINE = 0.1
    # Maximum number of current-hour topics evaluated per trending query.
    CANDIDATE_LIMIT = 50

    def __init__(
        self,
        db_path: Optional[str] = None,
        spike_threshold: float = 3.0,
        momentum_threshold: float = 2.0,
    ):
        """
        Initialize the TrendingDetector.

        Args:
            db_path: Path to SQLite database (default: data/trending.db)
            spike_threshold: Multiplier for spike detection (default: 3x)
            momentum_threshold: Minimum momentum to be considered trending
                (default: 2.0)
        """
        self.db_path = db_path or DEFAULT_DB_PATH
        self.spike_threshold = spike_threshold
        self.momentum_threshold = momentum_threshold
        # Ensure the parent directory exists. A bare filename has an empty
        # dirname, and os.makedirs("") raises, so only create when needed.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        # Initialize database
        self._init_db()
        logger.info("[TrendingDetector] Initialized with db: %s", self.db_path)

    @contextmanager
    def _connect(self):
        """
        Yield a SQLite connection that is committed on success, rolled back
        on error, and always closed afterwards.

        Using the connection object itself as a context manager only scopes
        the transaction; it does not close the connection, hence the
        explicit close() here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create tables and indexes if they don't exist"""
        with self._connect() as conn:
            # Raw mention log: one row per recorded mention
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS topic_mentions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    topic TEXT NOT NULL,
                    topic_hash TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    source TEXT,
                    domain TEXT
                )
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_topic_hash ON topic_mentions(topic_hash)
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_timestamp ON topic_mentions(timestamp)
                """
            )
            # Hourly aggregates for faster queries
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS hourly_counts (
                    topic_hash TEXT NOT NULL,
                    hour_bucket TEXT NOT NULL,
                    count INTEGER DEFAULT 1,
                    topic TEXT,
                    PRIMARY KEY (topic_hash, hour_bucket)
                )
                """
            )

    def _topic_hash(self, topic: str) -> str:
        """Generate a hash for a topic (normalized lowercase)"""
        normalized = topic.lower().strip()
        # md5 is used purely as a short, stable key; it is not a security
        # measure. The first 12 hex characters are kept.
        return hashlib.md5(normalized.encode()).hexdigest()[:12]

    @staticmethod
    def _to_utc(dt: Optional[datetime] = None) -> datetime:
        """
        Return ``dt`` as a timezone-aware UTC datetime.

        ``None`` means "now". Naive datetimes are assumed to already be in
        UTC; aware datetimes are converted.
        """
        if dt is None:
            return datetime.now(timezone.utc)
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)

    def _get_hour_bucket(self, dt: Optional[datetime] = None) -> str:
        """Get the UTC hour bucket string (YYYY-MM-DD-HH)"""
        # Buckets must be in UTC because every query derives "now" in UTC.
        return self._to_utc(dt).strftime("%Y-%m-%d-%H")

    def record_mention(
        self,
        topic: str,
        source: Optional[str] = None,
        domain: Optional[str] = None,
        timestamp: Optional[datetime] = None,
    ):
        """
        Record a topic mention.

        Empty/blank topics and stopwords are silently ignored.

        Args:
            topic: The topic/keyword mentioned
            source: Source of the mention (e.g., 'twitter', 'news')
            domain: Domain (e.g., 'political', 'economical')
            timestamp: When the mention occurred (default: now, UTC)
        """
        normalized_topic = topic.lower().strip() if isinstance(topic, str) else ""
        # Skip blank topics and stopwords - neither should ever trend.
        if not normalized_topic or normalized_topic in TRENDING_STOPWORDS:
            return
        topic_hash = self._topic_hash(normalized_topic)
        # Normalize to UTC so stored timestamps and buckets are comparable.
        ts = self._to_utc(timestamp)
        hour_bucket = self._get_hour_bucket(ts)
        with self._connect() as conn:
            # Insert mention
            conn.execute(
                """
                INSERT INTO topic_mentions (topic, topic_hash, timestamp, source, domain)
                VALUES (?, ?, ?, ?, ?)
                """,
                (normalized_topic, topic_hash, ts.isoformat(), source, domain),
            )
            # Update hourly aggregate (insert the row or bump its counter)
            conn.execute(
                """
                INSERT INTO hourly_counts (topic_hash, hour_bucket, count, topic)
                VALUES (?, ?, 1, ?)
                ON CONFLICT(topic_hash, hour_bucket) DO UPDATE SET count = count + 1
                """,
                (topic_hash, hour_bucket, normalized_topic),
            )

    def record_mentions_batch(self, mentions: List[Dict[str, Any]]):
        """
        Record multiple mentions at once.

        Args:
            mentions: List of dicts with keys: topic, source, domain, timestamp
        """
        for mention in mentions:
            self.record_mention(
                topic=mention.get("topic", ""),
                source=mention.get("source"),
                domain=mention.get("domain"),
                timestamp=mention.get("timestamp"),
            )

    def _momentum_for_hash(self, conn, topic_hash: str, now: datetime) -> float:
        """Compute momentum for ``topic_hash`` at ``now`` using an open connection."""
        row = conn.execute(
            """
            SELECT count FROM hourly_counts
            WHERE topic_hash = ? AND hour_bucket = ?
            """,
            (topic_hash, self._get_hour_bucket(now)),
        ).fetchone()
        current_count = row[0] if row else 0

        past_buckets = [
            self._get_hour_bucket(now - timedelta(hours=offset))
            for offset in range(1, self.BASELINE_HOURS + 1)
        ]
        placeholders = ",".join("?" * len(past_buckets))
        # hourly_counts only has rows for hours with at least one mention,
        # so SQL AVG() would ignore silent hours. Sum the counts and divide
        # by the full window length instead.
        row = conn.execute(
            f"""
            SELECT COALESCE(SUM(count), 0) FROM hourly_counts
            WHERE topic_hash = ? AND hour_bucket IN ({placeholders})
            """,
            [topic_hash, *past_buckets],
        ).fetchone()
        baseline = max(row[0] / self.BASELINE_HOURS, self.MIN_BASELINE)
        return current_count / baseline

    def get_momentum(self, topic: str) -> float:
        """
        Calculate momentum for a topic.

        Momentum = mentions_in_current_hour / avg_mentions_in_last_6_hours,
        where the average is taken over all 6 hours (hours without any
        mention count as zero) and is floored at MIN_BASELINE.

        Returns:
            Momentum value (1.0 = normal, >2.0 = trending, >3.0 = spike)
        """
        with self._connect() as conn:
            return self._momentum_for_hash(
                conn, self._topic_hash(topic), self._to_utc()
            )

    def is_spike(self, topic: str, window_hours: int = 1) -> bool:
        """
        Check if a topic is experiencing a spike.

        A spike is when current volume >= spike_threshold * normal volume.

        Args:
            topic: Topic to check
            window_hours: Currently unused; accepted for interface
                compatibility. The comparison always uses the current hour.
        """
        return self.get_momentum(topic) >= self.spike_threshold

    def get_trending_topics(self, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Get topics with momentum above threshold.

        Only the CANDIDATE_LIMIT most-mentioned topics of the current hour
        are evaluated.

        Returns:
            List of trending topics with their momentum values, sorted by
            momentum descending and truncated to ``limit`` entries
        """
        # One reference time for the whole query so that every candidate is
        # measured against the same hour buckets.
        now = self._to_utc()
        trending = []
        with self._connect() as conn:
            # Get the most-mentioned topics of the current hour
            candidates = conn.execute(
                """
                SELECT topic, topic_hash, count
                FROM hourly_counts
                WHERE hour_bucket = ?
                ORDER BY count DESC
                LIMIT ?
                """,
                (self._get_hour_bucket(now), self.CANDIDATE_LIMIT),
            ).fetchall()
            for topic, topic_hash, count in candidates:
                # Reuse the open connection instead of reconnecting per topic
                momentum = self._momentum_for_hash(conn, topic_hash, now)
                if momentum < self.momentum_threshold:
                    continue
                if momentum >= 5:
                    severity = "high"
                elif momentum >= 3:
                    severity = "medium"
                else:
                    severity = "low"
                trending.append(
                    {
                        "topic": topic,
                        "momentum": round(momentum, 2),
                        "mentions_this_hour": count,
                        "is_spike": momentum >= self.spike_threshold,
                        "severity": severity,
                    }
                )
        # Sort by momentum descending
        trending.sort(key=lambda item: item["momentum"], reverse=True)
        return trending[: max(limit, 0)]

    def get_spike_alerts(self, limit: int = 5) -> List[Dict[str, Any]]:
        """
        Get topics with spike alerts (momentum >= spike_threshold).

        Returns:
            List of spike alerts (at most ``limit`` entries)
        """
        candidates = self.get_trending_topics(limit=self.CANDIDATE_LIMIT)
        spikes = [entry for entry in candidates if entry["is_spike"]]
        return spikes[: max(limit, 0)]

    def get_topic_history(self, topic: str, hours: int = 24) -> List[Dict[str, Any]]:
        """
        Get hourly mention counts for a topic.

        Args:
            topic: Topic to get history for
            hours: Number of hours to look back (including the current hour)

        Returns:
            List of {"hour": bucket, "count": n} dicts, oldest first; hours
            without mentions are reported with a count of 0
        """
        if hours <= 0:
            return []
        topic_hash = self._topic_hash(topic)
        now = self._to_utc()
        # Oldest bucket first, current hour last
        buckets = [
            self._get_hour_bucket(now - timedelta(hours=offset))
            for offset in range(hours - 1, -1, -1)
        ]
        with self._connect() as conn:
            # Bucket strings are zero-padded, so lexicographic order equals
            # chronological order and a single range query covers the window.
            rows = conn.execute(
                """
                SELECT hour_bucket, count FROM hourly_counts
                WHERE topic_hash = ? AND hour_bucket BETWEEN ? AND ?
                """,
                (topic_hash, buckets[0], buckets[-1]),
            ).fetchall()
        counts = dict(rows)
        return [{"hour": bucket, "count": counts.get(bucket, 0)} for bucket in buckets]

    def cleanup_old_data(self, days: int = 7):
        """
        Remove data older than specified days.

        Args:
            days: Number of days to keep
        """
        cutoff = self._to_utc() - timedelta(days=days)
        with self._connect() as conn:
            # Both columns hold sortable text, so string comparison against
            # the cutoff works.
            conn.execute(
                """
                DELETE FROM topic_mentions WHERE timestamp < ?
                """,
                (cutoff.isoformat(),),
            )
            conn.execute(
                """
                DELETE FROM hourly_counts WHERE hour_bucket < ?
                """,
                (self._get_hour_bucket(cutoff),),
            )
        logger.info("[TrendingDetector] Cleaned up data older than %s days", days)
# Process-wide detector instance, created lazily on first access
_trending_detector = None


def get_trending_detector() -> TrendingDetector:
    """Return the shared TrendingDetector, constructing it on first use"""
    global _trending_detector
    if _trending_detector is not None:
        return _trending_detector
    _trending_detector = TrendingDetector()
    return _trending_detector
# Convenience functions
def record_topic_mention(topic: str, source: str = None, domain: str = None):
    """Record one mention of ``topic`` on the shared detector"""
    detector = get_trending_detector()
    detector.record_mention(topic, source, domain)
def get_trending_now(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` currently trending topics from the shared detector"""
    detector = get_trending_detector()
    return detector.get_trending_topics(limit)
def get_spikes() -> List[Dict[str, Any]]:
    """Return the current spike alerts from the shared detector"""
    detector = get_trending_detector()
    return detector.get_spike_alerts()