# copper-mind/deep_learning/data/sentiment_features.py
"""
Advanced Sentiment Feature Engineering.
Computes second-order derivatives of sentiment signals that capture
market-moving dynamics the raw sentiment_index cannot:
- Sentiment Momentum (SMA/EMA rate-of-change)
- Sentiment Surprise (Z-score anomaly detection)
- Volume-Weighted Sentiment
- Event-Type Intensity (supply_disruption counts, etc.)
"""
from __future__ import annotations
import logging
from typing import Optional, Sequence
import numpy as np
import pandas as pd
from deep_learning.config import SentimentFeatureConfig, get_tft_config
logger = logging.getLogger(__name__)
def compute_sentiment_momentum(
    sentiment_index: pd.Series,
    windows: Sequence[int] = (5, 10, 30),
) -> pd.DataFrame:
    """
    Measure how fast, and in which direction, market mood is shifting.

    For every window *w* in *windows*, three columns are produced:
    - sent_momentum_{w}d : index value minus its w-day simple moving average
    - sent_ema_{w}d      : exponential moving average (span=w) of the index
    - sent_roc_{w}d      : w-day difference of the index
    """
    out = pd.DataFrame(index=sentiment_index.index)
    for win in windows:
        # Deviation from the recent average: how far mood has drifted.
        rolling_avg = sentiment_index.rolling(window=win, min_periods=1).mean()
        out[f"sent_momentum_{win}d"] = sentiment_index - rolling_avg
        # Smoothed trend of the raw index (recursive EMA, adjust=False).
        out[f"sent_ema_{win}d"] = sentiment_index.ewm(span=win, adjust=False).mean()
        # Raw change over the window length.
        out[f"sent_roc_{win}d"] = sentiment_index.diff(win)
    return out
def compute_sentiment_surprise(
    sentiment_index: pd.Series,
    lookback: int = 30,
    threshold: float = 2.0,
) -> pd.DataFrame:
    """
    Detect anomalous sentiment readings via a rolling Z-score.

    A reading with |z| >= *threshold* is an "unexpected" signal relative
    to the last *lookback* days of history.

    Columns:
    - sent_surprise_z      : rolling Z-score of the index
    - sent_surprise_flag   : 1.0 when |z| >= threshold, else 0.0
    - sent_surprise_signed : z multiplied by the sign of the raw index
    """
    roller = sentiment_index.rolling(window=lookback, min_periods=5)
    mean = roller.mean()
    # A zero std would divide by zero; map it to NaN so z becomes NaN too.
    std = roller.std().replace(0, np.nan)
    z = (sentiment_index - mean) / std

    return pd.DataFrame(
        {
            "sent_surprise_z": z,
            "sent_surprise_flag": (z.abs() >= threshold).astype(np.float32),
            "sent_surprise_signed": z * np.sign(sentiment_index),
        },
        index=sentiment_index.index,
    )
def compute_volume_weighted_sentiment(
    sentiment_index: pd.Series,
    news_count: pd.Series,
) -> pd.DataFrame:
    """
    Scale sentiment by news volume so busy news days carry a stronger signal.

    Columns:
    - sent_vol_weighted : sentiment * log(1 + news_count)
    - sent_vol_zscore   : 30-day rolling Z-score of the weighted series
    - news_count_zscore : 30-day rolling Z-score of the raw news volume
    """
    def _rolling_z(series: pd.Series) -> pd.Series:
        # 30-day Z-score; a zero std is mapped to NaN to avoid div-by-zero.
        mean = series.rolling(30, min_periods=5).mean()
        std = series.rolling(30, min_periods=5).std().replace(0, np.nan)
        return (series - mean) / std

    # Missing counts are treated as zero-volume days before log-damping.
    weighted = sentiment_index * np.log1p(news_count.fillna(0))

    out = pd.DataFrame(index=sentiment_index.index)
    out["sent_vol_weighted"] = weighted
    out["sent_vol_zscore"] = _rolling_z(weighted)
    out["news_count_zscore"] = _rolling_z(news_count)
    return out
def compute_event_type_intensity(
    event_counts: pd.DataFrame,
    event_types: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
    """
    Per-event-type daily intensity features from NewsSentimentV2 counts.

    *event_counts* must be date-indexed with one column per event type
    holding daily occurrence counts. When *event_types* is None, the list
    is taken from the TFT config.

    Columns (per event type):
    - evt_{type}_count : raw daily count (all-zero if the column is absent)
    - evt_{type}_ma5   : 5-day moving average of the count
    - evt_{type}_spike : 1.0 when count > 2 * MA5 (MA5 floored at 0.5)
    """
    if event_types is None:
        event_types = list(get_tft_config().sentiment.event_types)

    out = pd.DataFrame(index=event_counts.index)
    for name in event_types:
        if name not in event_counts.columns:
            # Unknown category: emit all-zero columns so the schema stays stable.
            for suffix in ("count", "ma5", "spike"):
                out[f"evt_{name}_{suffix}"] = 0.0
            continue
        daily = event_counts[name].fillna(0).astype(float)
        avg5 = daily.rolling(5, min_periods=1).mean()
        out[f"evt_{name}_count"] = daily
        out[f"evt_{name}_ma5"] = avg5
        # Floor the MA at 0.5 so quiet periods still need >= 2 events to spike.
        out[f"evt_{name}_spike"] = (daily > 2.0 * avg5.clip(lower=0.5)).astype(np.float32)
    return out
def build_event_counts_from_db(
    session,
    start_date,
    end_date,
) -> pd.DataFrame:
    """
    Build a (date x event_type) daily count matrix from NewsSentimentV2.

    Sentiment rows are joined back to their raw articles so they can be
    dated by published_at, restricted to [start_date, end_date] inclusive.
    Returns an empty DataFrame when no rows match.
    """
    # Local imports keep SQLAlchemy/app out of the module import path.
    from sqlalchemy import func as sa_func
    from app.models import NewsSentimentV2, NewsProcessed, NewsRaw

    day = sa_func.date(NewsRaw.published_at)
    query = (
        session.query(
            day.label("date"),
            NewsSentimentV2.event_type,
            sa_func.count(NewsSentimentV2.id).label("cnt"),
        )
        .join(NewsProcessed, NewsSentimentV2.news_processed_id == NewsProcessed.id)
        .join(NewsRaw, NewsProcessed.raw_id == NewsRaw.id)
        .filter(NewsRaw.published_at >= start_date, NewsRaw.published_at <= end_date)
        .group_by(day, NewsSentimentV2.event_type)
    )
    rows = query.all()
    if not rows:
        return pd.DataFrame()

    frame = pd.DataFrame(
        [{"date": r.date, "event_type": r.event_type, "count": r.cnt} for r in rows]
    )
    matrix = frame.pivot_table(index="date", columns="event_type", values="count", fill_value=0)
    matrix.index = pd.to_datetime(matrix.index)
    return matrix
# ---------------------------------------------------------------------------
# Unified builder
# ---------------------------------------------------------------------------
def build_all_sentiment_features(
    daily_sentiment: pd.DataFrame,
    event_counts: Optional[pd.DataFrame] = None,
    cfg: Optional[SentimentFeatureConfig] = None,
) -> pd.DataFrame:
    """
    Assemble the full sentiment feature matrix from *daily_sentiment*.

    Required columns in *daily_sentiment*:
    - sentiment_index (float)
    - news_count (int)

    Event-type features are appended only when *event_counts* is provided
    and non-empty; it is aligned to the sentiment index first, with missing
    days filled with 0.
    """
    if cfg is None:
        cfg = get_tft_config().sentiment

    index_series = daily_sentiment["sentiment_index"]
    volume_series = daily_sentiment["news_count"]

    blocks: list[pd.DataFrame] = [
        compute_sentiment_momentum(index_series, windows=cfg.momentum_windows),
        compute_sentiment_surprise(
            index_series,
            lookback=cfg.surprise_lookback,
            threshold=cfg.surprise_threshold,
        ),
        compute_volume_weighted_sentiment(index_series, volume_series),
    ]
    if event_counts is not None and not event_counts.empty:
        aligned = event_counts.reindex(daily_sentiment.index).fillna(0)
        blocks.append(compute_event_type_intensity(aligned, event_types=cfg.event_types))

    return pd.concat(blocks, axis=1)