| """ |
| src/features/sentiment.py |
| ========================== |
| Sentiment feature engineering: |
| - Aggregates GDELT article tone into a Sentiment Score per region |
| - Computes Sentiment Momentum (rolling change) |
| - Adds lagged sentiment features for lead/lag analysis |
| |
| Sentiment Score convention: |
| Positive values → more negative news (conflict escalation) |
| 0 → neutral |
| Higher score → greater concern |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from src.utils.logger import get_logger |
| from config.settings import ( |
| PROCESSED_DIR, SENTIMENT_MOMENTUM_WINDOW, SENTIMENT_LAG_HOURS |
| ) |
|
|
# Module-level logger, named after this module, shared by all functions here.
logger = get_logger(__name__)
|
|
|
|
def compute_sentiment_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich a regional sentiment frame with momentum and lag columns.

    Expects columns: timestamp, region, sentiment_score.

    Adds:
      - sentiment_momentum: per-region first difference of the rolling-mean
        sentiment (window = SENTIMENT_MOMENTUM_WINDOW), 0 where undefined.
      - sentiment_lag_{h}h: per-region sentiment shifted by h/6 rows
        (data arrives in 6-hour buckets), falling back to the current
        score where no history exists.
    """
    out = df.copy()
    out["timestamp"] = pd.to_datetime(out["timestamp"], errors="coerce")
    out = out.sort_values(["region", "timestamp"]).reset_index(drop=True)

    by_region = out.groupby("region")["sentiment_score"]

    def _momentum(scores):
        # Change in the smoothed (rolling-mean) sentiment between buckets.
        smoothed = scores.rolling(SENTIMENT_MOMENTUM_WINDOW, min_periods=2).mean()
        return smoothed.diff()

    out["sentiment_momentum"] = by_region.transform(_momentum).round(3).fillna(0)

    for hours in SENTIMENT_LAG_HOURS:
        steps = max(1, hours // 6)  # one row per 6-hour bucket
        col = f"sentiment_lag_{hours}h"
        out[col] = by_region.transform(lambda s, k=steps: s.shift(k)).round(3)
        out[col] = out[col].fillna(out["sentiment_score"])

    return out
|
|
|
|
def aggregate_regional_sentiment(articles_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate raw GDELT articles into regional sentiment scores.

    One row per (region, 6h bucket). Result columns:
    timestamp (str), region, sentiment_score (mean), article_count.

    Timestamps come from "seendate" (falling back to "fetched_at");
    unparseable dates coerce to NaT and those rows drop out of the groupby.
    Sentiment is -tone * 10 so that higher scores mean more negative
    coverage (see module docstring); a flat 5.0 is used when no "tone"
    column is available.
    """
    if articles_df.empty:
        return pd.DataFrame()

    df = articles_df.copy()

    # BUG FIX: df.get("title", "") returns the plain string "" when the
    # column is missing, and str has no .apply() — guarantee the column
    # exists so both classification and the count aggregation work.
    if "title" not in df.columns:
        df["title"] = ""

    df["seendate"] = pd.to_datetime(df.get("seendate", df.get("fetched_at")),
                                    errors="coerce", utc=True)
    # Lowercase "6h": the uppercase "6H" alias is deprecated in pandas 2.2+.
    df["bucket"] = df["seendate"].dt.floor("6h")

    # Keyword → region lookup applied against article titles.
    region_map = {
        "Iran": "Middle East", "Iraq": "Middle East", "Israel": "Middle East",
        "Yemen": "Middle East", "Syria": "Middle East", "UAE": "Middle East",
        "Ukraine": "Eastern Europe", "Russia": "Eastern Europe",
        "Pakistan": "South Asia", "India": "South Asia",
    }

    def classify_region(title):
        # First keyword hit wins; unmatched titles fall back to "Global".
        haystack = str(title).lower() if title else ""
        for keyword, region in region_map.items():
            if keyword.lower() in haystack:
                return region
        return "Global"

    df["region"] = df["title"].apply(classify_region)

    # Invert GDELT tone (negative tone = bad news) into a "concern" score.
    if "tone" in df.columns:
        df["sentiment_score"] = -pd.to_numeric(df["tone"], errors="coerce") * 10
    else:
        df["sentiment_score"] = 5.0

    agg = (
        df.groupby(["bucket", "region"])
        .agg(
            sentiment_score=("sentiment_score", "mean"),
            article_count=("title", "count"),
        )
        .reset_index()
        .rename(columns={"bucket": "timestamp"})
    )
    agg["timestamp"] = agg["timestamp"].astype(str)
    return agg
|
|
|
|
def compute_lag_correlations(
    sentiment_df: pd.DataFrame,
    disruption_df: pd.DataFrame,
    region: str = "Middle East",
) -> pd.DataFrame:
    """
    Compute Pearson correlation between lagged sentiment and disruption index.

    For each lag in SENTIMENT_LAG_HOURS (plus a 0-hour baseline), sentiment
    timestamps are shifted forward by the lag and matched to disruption rows
    via a nearest-timestamp asof-merge with a 3-hour tolerance. Lags with
    fewer than 5 valid matched observations are skipped.

    Returns a DataFrame with columns: lag_hours, correlation, n_observations
    (empty when no lag produces enough data).
    """
    sent = sentiment_df[sentiment_df["region"] == region].copy()
    # The disruption frame may be global (no region column) or per-region.
    if "region" in disruption_df.columns:
        disrupt = disruption_df[disruption_df["region"] == region].copy()
    else:
        disrupt = disruption_df.copy()

    sent["timestamp"] = pd.to_datetime(sent["timestamp"], errors="coerce")
    disrupt["timestamp"] = pd.to_datetime(disrupt["timestamp"], errors="coerce")

    sent = sent.sort_values("timestamp")
    disrupt = disrupt.sort_values("timestamp")

    results = []
    for lag_h in SENTIMENT_LAG_HOURS + [0]:
        # Shift sentiment forward: a sentiment row at time t is compared
        # against the disruption index observed at t + lag.
        sent_lagged = sent.copy()
        sent_lagged["timestamp"] = sent_lagged["timestamp"] + pd.Timedelta(hours=lag_h)

        merged = pd.merge_asof(
            disrupt.sort_values("timestamp"),
            sent_lagged[["timestamp", "sentiment_score"]].sort_values("timestamp"),
            on="timestamp",
            tolerance=pd.Timedelta("3h"),
            direction="nearest",
        )

        if len(merged) < 5 or "disruption_index" not in merged.columns:
            continue

        valid = merged[["sentiment_score", "disruption_index"]].dropna()
        if len(valid) < 5:
            continue

        corr = valid["sentiment_score"].corr(valid["disruption_index"])
        results.append({"lag_hours": lag_h, "correlation": round(corr, 4),
                        "n_observations": len(valid)})

    return pd.DataFrame(results)
|
|
|
|
def run(df: pd.DataFrame = None) -> pd.DataFrame:
    """
    Load processed sentiment data (unless a frame is supplied) and
    return it enriched with momentum and lag features.
    """
    if df is None:
        # Default source: the processed sentiment CSV produced upstream.
        df = pd.read_csv(PROCESSED_DIR / "sentiment.csv", low_memory=False)
    return compute_sentiment_features(df)
|
|
|
|
if __name__ == "__main__":
    # Quick smoke-run: enrich sentiment data and preview the key columns.
    enriched = run()
    preview_cols = ["timestamp", "region", "sentiment_score", "sentiment_momentum"]
    print(enriched[preview_cols].head(20))
|