# Repository-listing scrape residue (author: bhanug2026, commit 47c6cfd "Initial commit") —
# kept as a comment so the module remains importable.
"""
src/features/sentiment.py
==========================
Sentiment feature engineering:
- Aggregates GDELT article tone into a Sentiment Score per region
- Computes Sentiment Momentum (rolling change)
- Adds lagged sentiment features for lead/lag analysis
Sentiment Score convention:
Positive values → more negative news (conflict escalation)
0 → neutral
Higher score → greater concern
"""
import pandas as pd
import numpy as np
from src.utils.logger import get_logger
from config.settings import (
PROCESSED_DIR, SENTIMENT_MOMENTUM_WINDOW, SENTIMENT_LAG_HOURS
)
logger = get_logger(__name__)
def compute_sentiment_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich a regional sentiment table with momentum and lag columns.

    Expects columns: timestamp, region, sentiment_score.
    Adds per region:
      - sentiment_momentum: change (diff) of the rolling-mean sentiment
      - sentiment_lag_<H>h: sentiment shifted back by H hours
        (assumes one row per 6h interval — TODO confirm upstream cadence)
    """
    out = df.copy()
    out["timestamp"] = pd.to_datetime(out["timestamp"], errors="coerce")
    out = out.sort_values(["region", "timestamp"]).reset_index(drop=True)
    by_region = out.groupby("region")["sentiment_score"]

    def _momentum(series: pd.Series) -> pd.Series:
        # Diff of the rolling mean captures the direction of sentiment drift.
        smoothed = series.rolling(SENTIMENT_MOMENTUM_WINDOW, min_periods=2).mean()
        return smoothed.diff()

    out["sentiment_momentum"] = by_region.transform(_momentum).round(3).fillna(0)

    # Lagged features for lead/lag analysis.
    for hours in SENTIMENT_LAG_HOURS:
        steps = max(1, hours // 6)  # rows per lag, assuming 6h intervals
        col = f"sentiment_lag_{hours}h"
        # Bind `steps` as a default to avoid the late-binding-closure pitfall.
        out[col] = by_region.transform(lambda s, k=steps: s.shift(k)).round(3)
        # Rows with no history fall back to the current score (zero change).
        out[col] = out[col].fillna(out["sentiment_score"])
    return out
def aggregate_regional_sentiment(articles_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate raw GDELT articles into regional sentiment scores.

    One row per (region, 6h bucket). Articles are assigned a region by a
    case-insensitive keyword match against the title; unmatched titles
    fall into "Global".

    Returns columns: timestamp (str), region, sentiment_score (mean),
    article_count. An empty input yields an empty DataFrame.
    """
    if articles_df.empty:
        return pd.DataFrame()
    df = articles_df.copy()
    # Prefer the article's seen date; fall back to fetch time if absent.
    ts = df["seendate"] if "seendate" in df.columns else df.get("fetched_at")
    df["seendate"] = pd.to_datetime(ts, errors="coerce", utc=True)
    # Lowercase "6h" avoids the FutureWarning for "H" in newer pandas.
    df["bucket"] = df["seendate"].dt.floor("6h")
    # Keyword → region lookup, matched against article titles.
    region_map = {
        "Iran": "Middle East", "Iraq": "Middle East", "Israel": "Middle East",
        "Yemen": "Middle East", "Syria": "Middle East", "UAE": "Middle East",
        "Ukraine": "Eastern Europe", "Russia": "Eastern Europe",
        "Pakistan": "South Asia", "India": "South Asia",
    }
    def classify_region(title):
        text = str(title).lower() if title else ""
        for keyword, region in region_map.items():
            if keyword.lower() in text:
                return region
        return "Global"
    # BUG FIX: DataFrame.get("title", "") returns the plain string "" when
    # the column is missing, and str has no .apply (AttributeError). It
    # would also break the ("title", "count") aggregation below, so create
    # the column explicitly instead.
    if "title" not in df.columns:
        df["title"] = ""
    df["region"] = df["title"].apply(classify_region)
    # GDELT tone is negative for bad news; invert and scale so that a
    # higher score means greater concern (per the module convention).
    if "tone" in df.columns:
        df["sentiment_score"] = -pd.to_numeric(df["tone"], errors="coerce") * 10
    else:
        df["sentiment_score"] = 5.0  # default mild negative
    agg = (
        df.groupby(["bucket", "region"])
        .agg(
            sentiment_score=("sentiment_score", "mean"),
            article_count=("title", "count"),
        )
        .reset_index()
        .rename(columns={"bucket": "timestamp"})
    )
    agg["timestamp"] = agg["timestamp"].astype(str)
    return agg
def compute_lag_correlations(
    sentiment_df: pd.DataFrame,
    disruption_df: pd.DataFrame,
    region: str = "Middle East",
) -> pd.DataFrame:
    """
    Correlate lagged sentiment with the disruption index for one region.

    For each lag in SENTIMENT_LAG_HOURS (plus lag 0), sentiment timestamps
    are shifted forward by the lag and merged onto the disruption series
    (nearest timestamp within a 3h tolerance) before computing the Pearson
    correlation.

    Returns a DataFrame with columns: lag_hours, correlation,
    n_observations. Lags with fewer than 5 paired observations are
    skipped, so the result may be empty.
    """
    sent = sentiment_df[sentiment_df["region"] == region].copy()
    if "region" in disruption_df.columns:
        disrupt = disruption_df[disruption_df["region"] == region].copy()
    else:
        disrupt = disruption_df.copy()
    sent["timestamp"] = pd.to_datetime(sent["timestamp"], errors="coerce")
    disrupt["timestamp"] = pd.to_datetime(disrupt["timestamp"], errors="coerce")
    sent = sent.sort_values("timestamp")
    disrupt = disrupt.sort_values("timestamp")
    results = []
    for lag_h in SENTIMENT_LAG_HOURS + [0]:
        # Shifting sentiment forward by lag_h aligns "sentiment lag_h ago"
        # with each disruption observation. (Removed the unused row-step
        # computation `lag_h // 6` from the original.)
        sent_lagged = sent.copy()
        sent_lagged["timestamp"] = sent_lagged["timestamp"] + pd.Timedelta(hours=lag_h)
        merged = pd.merge_asof(
            disrupt.sort_values("timestamp"),
            sent_lagged[["timestamp", "sentiment_score"]].sort_values("timestamp"),
            on="timestamp",
            tolerance=pd.Timedelta("3h"),
            direction="nearest",
        )
        if len(merged) < 5 or "disruption_index" not in merged.columns:
            continue
        valid = merged[["sentiment_score", "disruption_index"]].dropna()
        if len(valid) < 5:
            continue  # too few pairs for a meaningful correlation
        corr = valid["sentiment_score"].corr(valid["disruption_index"])
        results.append({
            "lag_hours": lag_h,
            "correlation": round(corr, 4),
            "n_observations": len(valid),
        })
    return pd.DataFrame(results)
def run(df: "pd.DataFrame | None" = None) -> pd.DataFrame:
    """
    Load processed sentiment data and enrich it with derived features.

    Parameters
    ----------
    df : pd.DataFrame, optional
        Sentiment data (timestamp, region, sentiment_score). When None,
        the processed CSV at PROCESSED_DIR / "sentiment.csv" is loaded.

    Returns
    -------
    pd.DataFrame
        Input data with momentum and lag columns added by
        compute_sentiment_features.
    """
    # Annotation fixed: the parameter defaults to None, so it is Optional.
    if df is None:
        df = pd.read_csv(PROCESSED_DIR / "sentiment.csv", low_memory=False)
    return compute_sentiment_features(df)
if __name__ == "__main__":
    # Ad-hoc smoke check: enrich the processed sentiment file and preview it.
    enriched = run()
    preview_cols = ["timestamp", "region", "sentiment_score", "sentiment_momentum"]
    print(enriched[preview_cols].head(20))