| """ |
| Trend Analysis & Forecasting Engine |
| ───────────────────────────────────────────────────────────────────────────── |
| Problem: Teams were reacting to brand crises days after they peaked because |
| they had no way to detect sentiment inflection points in real time. |
| |
| Solution: Rolling statistical analysis on sentiment time series — detects |
| spikes, dips, and emerging trends before they become crises. Simple |
| exponential smoothing for short-horizon forecasting. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import logging |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Tuple, Optional |
| from collections import defaultdict |
|
|
| import numpy as np |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# Minimum absolute z-score at which a day is flagged as a sentiment anomaly.
SPIKE_THRESHOLD_STD = 2.0
# Rolling window length (in data points) used for the anomaly statistics.
TREND_WINDOW = 7
# Number of future days projected by the forecast.
FORECAST_HORIZON = 14
# Default smoothing factor for simple exponential smoothing.
ALPHA = 0.3
|
|
|
|
def _exponential_smoothing(series: List[float], alpha: float = ALPHA) -> List[float]:
    """Apply simple exponential smoothing (SES) to ``series``.

    The first output equals the first observation; every later output is
    ``alpha * observation + (1 - alpha) * previous_output``.

    Args:
        series: Observations in chronological order.
        alpha: Weight given to the newest observation.

    Returns:
        A list with one smoothed value per observation (empty for empty input).
    """
    if not series:
        return []

    carry_weight = 1 - alpha
    level = series[0]
    result = [level]
    for observation in series[1:]:
        level = alpha * observation + carry_weight * level
        result.append(level)
    return result
|
|
|
|
def _linear_trend(y: List[float]) -> Tuple[float, float]:
    """Fit an ordinary-least-squares line through ``y`` against its index.

    The x values are the positions 0, 1, ..., len(y) - 1.

    Args:
        y: Observations in order.

    Returns:
        ``(slope, intercept)``. With fewer than two points the slope is 0.0
        and the intercept is the single value (or 0.0 for empty input).
    """
    count = len(y)
    if count < 2:
        only_value = y[0] if y else 0.0
        return 0.0, only_value

    mean_x = sum(range(count)) / count
    mean_y = sum(y) / count

    # Deviations from the means, kept in index order.
    x_dev = [position - mean_x for position in range(count)]
    y_dev = [value - mean_y for value in y]

    ss_xy = sum(dx * dy for dx, dy in zip(x_dev, y_dev))
    ss_xx = sum(dx ** 2 for dx in x_dev)

    if ss_xx == 0:
        slope = 0.0
    else:
        slope = ss_xy / ss_xx
    return slope, mean_y - slope * mean_x
|
|
|
|
def _rolling_stats(series: List[float], window: int) -> Tuple[List[float], List[float]]:
    """Compute trailing rolling mean and sample standard deviation.

    For each position the window covers that point and up to ``window - 1``
    points before it, so early windows are shorter than ``window``.

    Args:
        series: Observations in order.
        window: Maximum number of points per window.

    Returns:
        ``(means, stds)``, each the same length as ``series``. The standard
        deviation uses the n - 1 denominator and is 0.0 for one-point windows.
    """
    means: List[float] = []
    stds: List[float] = []

    for end in range(1, len(series) + 1):
        chunk = series[max(0, end - window):end]
        size = len(chunk)
        mean = sum(chunk) / size
        means.append(mean)

        if size < 2:
            # A single point has no spread.
            stds.append(0.0)
            continue

        spread = sum((x - mean) ** 2 for x in chunk) / (size - 1)
        stds.append(math.sqrt(spread))

    return means, stds
|
|
|
|
class TrendAnalyzer:
    """
    Sentiment trend analysis engine.

    Works on a daily sentiment series (one dict per day) and returns:
      - Daily sentiment time series plus an exponentially smoothed copy
      - Overall trend direction from an OLS slope
      - Anomaly (spike / dip) flags with a severity grade
      - Short-term sentiment forecast with a widening band
      - Volume trend and 7-day vs 30-day averages

    ``aggregate_posts_to_series`` builds the daily series from raw posts.
    """

    # Lower bound for the baseline standard deviation used in z-scores.
    # Without it, a perfectly flat baseline (std == 0) could never flag the
    # day that finally breaks away from it.
    MIN_BASELINE_STD = 0.01

    def analyze_time_series(self, series_data: List[Dict]) -> Dict:
        """
        Full trend analysis from daily aggregated series data.

        Args:
            series_data: list of {date, sentiment, volume, positive, negative}.
                Assumed to be ordered oldest to newest; the last ``date`` must
                be a "YYYY-MM-DD" string because the forecast is dated from it.

        Returns:
            Comprehensive trend analysis payload for frontend with the keys
            ``time_series``, ``smoothed``, ``forecast``, ``anomalies`` and
            ``trend``. An empty dict when ``series_data`` is empty.

        Raises:
            KeyError: if an entry lacks ``date``, ``sentiment`` or ``volume``.
            ValueError: if the last date is not in "YYYY-MM-DD" format.
        """
        if not series_data:
            return {}

        dates = [d["date"] for d in series_data]
        sentiments = [d["sentiment"] for d in series_data]
        volumes = [d["volume"] for d in series_data]

        # Spike / dip detection against the trailing baseline.
        anomalies = self._detect_anomalies(dates, sentiments)

        # Overall direction from the OLS slope over the whole series.
        slope, _ = _linear_trend(sentiments)
        if slope > 0.002:
            trend_direction = "improving"
        elif slope < -0.002:
            trend_direction = "declining"
        else:
            trend_direction = "stable"

        # Forecast: start from the last smoothed level and extend the slope.
        smoothed = _exponential_smoothing(sentiments)
        last_smoothed = smoothed[-1]  # non-empty: series_data is non-empty
        last_date = datetime.strptime(dates[-1], "%Y-%m-%d")
        forecast = []
        for h in range(1, FORECAST_HORIZON + 1):
            projected = max(0.05, min(0.99, last_smoothed + slope * h))
            band = 0.08 * math.sqrt(h)  # uncertainty widens with the horizon
            forecast.append({
                "date": (last_date + timedelta(days=h)).strftime("%Y-%m-%d"),
                "sentiment": round(projected, 3),
                "lower": round(max(0.05, projected - band), 3),
                "upper": round(min(0.99, projected + band), 3),
            })

        # Volume trend over (at most) the last 14 data points.
        vol_slope, _ = _linear_trend(volumes[-14:])
        if vol_slope > 1:
            volume_trend = "growing"
        elif vol_slope < -1:
            volume_trend = "shrinking"
        else:
            volume_trend = "stable"

        # Short vs long average; both fall back to the mean of what exists.
        avg_7 = self._tail_mean(sentiments, 7)
        avg_30 = self._tail_mean(sentiments, 30)
        sentiment_delta = round(avg_7 - avg_30, 3)

        return {
            "time_series": series_data,
            "smoothed": [
                {"date": date, "sentiment": round(value, 3)}
                for date, value in zip(dates, smoothed)
            ],
            "forecast": forecast,
            "anomalies": anomalies,
            "trend": {
                "direction": trend_direction,
                "slope": round(slope, 5),
                "current_sentiment": round(sentiments[-1], 3),
                "avg_7d": round(avg_7, 3),
                "avg_30d": round(avg_30, 3),
                "delta_7d_vs_30d": sentiment_delta,
                "volume_trend": volume_trend,
                "total_volume": sum(volumes),
                "avg_daily_volume": round(sum(volumes) / len(volumes), 1),
            },
        }

    def _detect_anomalies(
        self,
        dates: List[str],
        sentiments: List[float],
        window: Optional[int] = None,
        threshold: Optional[float] = None,
    ) -> List[Dict]:
        """
        Flag days whose sentiment deviates sharply from the preceding window.

        Each day is scored against the mean and sample standard deviation of
        the ``window`` days *before* it. The day under test is deliberately
        left out of its own baseline: when it is included, |z| can never
        exceed (n - 1) / sqrt(n) (about 2.27 for a 7-point window), which
        makes the "high" severity grade (|z| > 3) unreachable.

        Args:
            dates: Date label per observation.
            sentiments: Sentiment value per observation, same order as dates.
            window: Baseline length; defaults to ``TREND_WINDOW``.
            threshold: Minimum |z| to flag; defaults to ``SPIKE_THRESHOLD_STD``.

        Returns:
            One dict per flagged day with ``date``, ``sentiment``, ``z_score``,
            ``direction`` ("spike" / "dip") and ``severity`` ("high" / "medium").
        """
        window = TREND_WINDOW if window is None else window
        threshold = SPIKE_THRESHOLD_STD if threshold is None else threshold
        # A full window of history is required (and never fewer than two
        # points, the minimum for a sample standard deviation).
        min_history = max(2, window)

        anomalies = []
        for i, value in enumerate(sentiments):
            baseline = sentiments[max(0, i - window):i]
            if len(baseline) < min_history:
                continue
            mean = sum(baseline) / len(baseline)
            variance = sum((x - mean) ** 2 for x in baseline) / (len(baseline) - 1)
            std = max(math.sqrt(variance), self.MIN_BASELINE_STD)
            z_score = (value - mean) / std
            if abs(z_score) >= threshold:
                anomalies.append({
                    "date": dates[i],
                    "sentiment": round(value, 3),
                    "z_score": round(z_score, 2),
                    "direction": "spike" if z_score > 0 else "dip",
                    "severity": "high" if abs(z_score) > 3 else "medium",
                })
        return anomalies

    @staticmethod
    def _tail_mean(values: List[float], n: int) -> float:
        """Mean of the last ``n`` values, or of all of them if fewer exist."""
        tail = values[-n:]
        return sum(tail) / len(tail)

    def aggregate_posts_to_series(self, posts: List[Dict]) -> List[Dict]:
        """
        Aggregate raw posts into daily sentiment time series.

        The day is the first 10 characters of ``timestamp``. The label is read
        from ``sentiment``, then ``true_label``, defaulting to "neutral".
        "crisis" counts as negative; any unrecognised label counts as neutral.
        Daily sentiment is the share of positive posts among all posts that day.

        Posts without a usable timestamp are skipped (best effort) and the
        number skipped is logged once per call.

        Args:
            posts: list of {text, timestamp, true_label or sentiment}

        Returns:
            One dict per day, sorted by date, with ``date``, ``sentiment``,
            ``volume``, ``positive``, ``negative`` and ``neutral``.
        """
        daily: Dict[str, Dict] = defaultdict(
            lambda: {"pos": 0, "neg": 0, "neu": 0, "total": 0}
        )
        skipped = 0

        for post in posts:
            try:
                date = post["timestamp"][:10]
                label = post.get("sentiment", post.get("true_label", "neutral"))
                bucket = daily[date]
            except (KeyError, TypeError, AttributeError):
                # Missing / non-sliceable timestamp or a non-dict post.
                skipped += 1
                continue

            if label == "positive":
                bucket["pos"] += 1
            elif label in ("negative", "crisis"):
                bucket["neg"] += 1
            else:
                bucket["neu"] += 1
            bucket["total"] += 1

        if skipped:
            logger.warning(
                "Skipped %d post(s) with a missing or malformed timestamp", skipped
            )

        series = []
        for date in sorted(daily):
            counts = daily[date]
            # A bucket only exists once a post was counted, so total >= 1.
            sentiment = counts["pos"] / counts["total"]
            series.append({
                "date": date,
                "sentiment": round(sentiment, 3),
                "volume": counts["total"],
                "positive": counts["pos"],
                "negative": counts["neg"],
                "neutral": counts["neu"],
            })

        return series
|
|
|
|
| |
# Module-level cache for the shared analyzer; filled on first request.
_analyzer: Optional[TrendAnalyzer] = None


def get_trend_analyzer() -> TrendAnalyzer:
    """Return the shared TrendAnalyzer, creating it on the first call."""
    global _analyzer
    if _analyzer is not None:
        return _analyzer
    _analyzer = TrendAnalyzer()
    return _analyzer
|
|