Spaces:
Running
Running
| """ | |
| Advanced Sentiment Feature Engineering. | |
| Computes second-order derivatives of sentiment signals that capture | |
| market-moving dynamics the raw sentiment_index cannot: | |
| - Sentiment Momentum (SMA/EMA rate-of-change) | |
| - Sentiment Surprise (Z-score anomaly detection) | |
| - Volume-Weighted Sentiment | |
| - Event-Type Intensity (supply_disruption counts, etc.) | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Optional, Sequence | |
| import numpy as np | |
| import pandas as pd | |
| from deep_learning.config import SentimentFeatureConfig, get_tft_config | |
| logger = logging.getLogger(__name__) | |
def compute_sentiment_momentum(
    sentiment_index: pd.Series,
    windows: Sequence[int] = (5, 10, 30),
) -> pd.DataFrame:
    """
    Measure how quickly, and in which direction, market mood is shifting.

    For every window *w* three columns are produced:
      - sent_momentum_{w}d : current index minus its w-day simple moving average
      - sent_ema_{w}d      : w-span exponential moving average (adjust=False)
      - sent_roc_{w}d      : w-day difference of the index (rate of change)
    """
    columns: dict[str, pd.Series] = {}
    for win in windows:
        # min_periods=1 keeps the warm-up rows defined instead of NaN.
        trailing_mean = sentiment_index.rolling(window=win, min_periods=1).mean()
        columns[f"sent_momentum_{win}d"] = sentiment_index - trailing_mean
        columns[f"sent_ema_{win}d"] = sentiment_index.ewm(span=win, adjust=False).mean()
        columns[f"sent_roc_{win}d"] = sentiment_index.diff(win)
    return pd.DataFrame(columns, index=sentiment_index.index)
def compute_sentiment_surprise(
    sentiment_index: pd.Series,
    lookback: int = 30,
    threshold: float = 2.0,
) -> pd.DataFrame:
    """
    Rolling Z-score of today's sentiment against its recent history.

    A reading with |z| >= *threshold* marks an "unexpected" signal that
    historically precedes outsized price moves.

    Columns:
      - sent_surprise_z      : rolling Z-score (NaN until 5 observations)
      - sent_surprise_flag   : 1.0 when |z| >= threshold, else 0.0
      - sent_surprise_signed : z multiplied by the sign of the raw index
    """
    window = sentiment_index.rolling(window=lookback, min_periods=5)
    mu = window.mean()
    # Flat stretches have zero std; map to NaN to avoid divide-by-zero.
    sigma = window.std().replace(0, np.nan)
    z = (sentiment_index - mu) / sigma
    return pd.DataFrame(
        {
            "sent_surprise_z": z,
            "sent_surprise_flag": (z.abs() >= threshold).astype(np.float32),
            "sent_surprise_signed": z * np.sign(sentiment_index),
        },
        index=sentiment_index.index,
    )
def compute_volume_weighted_sentiment(
    sentiment_index: pd.Series,
    news_count: pd.Series,
) -> pd.DataFrame:
    """
    Scale sentiment by news volume so heavy-coverage days carry more weight.

    Columns:
      - sent_vol_weighted : sentiment * log(1 + news_count)
      - sent_vol_zscore   : 30-day rolling Z-score of the weighted series
      - news_count_zscore : 30-day rolling Z-score of raw news volume
    """
    def _rolling_z(series: pd.Series) -> pd.Series:
        # 30-day rolling Z-score; zero std mapped to NaN to avoid div-by-zero.
        mean = series.rolling(30, min_periods=5).mean()
        std = series.rolling(30, min_periods=5).std().replace(0, np.nan)
        return (series - mean) / std

    # log1p dampens extreme article counts; missing counts count as zero.
    weighted = sentiment_index * np.log1p(news_count.fillna(0))
    return pd.DataFrame(
        {
            "sent_vol_weighted": weighted,
            "sent_vol_zscore": _rolling_z(weighted),
            "news_count_zscore": _rolling_z(news_count),
        },
        index=sentiment_index.index,
    )
def compute_event_type_intensity(
    event_counts: pd.DataFrame,
    event_types: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
    """
    Per-category daily event intensity from NewsSentimentV2 counts.

    *event_counts* must carry a date index with one column per event type
    holding daily occurrence counts. When *event_types* is None the list is
    pulled from the TFT config.

    Columns (per event type):
      - evt_{type}_count : raw daily count (all-zero for absent types)
      - evt_{type}_ma5   : 5-day moving average
      - evt_{type}_spike : 1.0 when count > 2 * max(ma5, 0.5)
    """
    if event_types is None:
        event_types = list(get_tft_config().sentiment.event_types)
    features = pd.DataFrame(index=event_counts.index)
    for etype in event_types:
        if etype not in event_counts.columns:
            # Category missing for this period: emit all-zero columns so the
            # downstream feature matrix keeps a stable schema.
            for suffix in ("count", "ma5", "spike"):
                features[f"evt_{etype}_{suffix}"] = 0.0
            continue
        daily = event_counts[etype].fillna(0).astype(float)
        trailing = daily.rolling(5, min_periods=1).mean()
        features[f"evt_{etype}_count"] = daily
        features[f"evt_{etype}_ma5"] = trailing
        # Clip the baseline at 0.5 so quiet stretches don't flag every
        # single nonzero day as a spike.
        features[f"evt_{etype}_spike"] = (daily > 2.0 * trailing.clip(lower=0.5)).astype(np.float32)
    return features
def build_event_counts_from_db(
    session,
    start_date,
    end_date,
) -> pd.DataFrame:
    """
    Build a (date x event_type) daily count matrix from NewsSentimentV2.

    Joins sentiment rows back through NewsProcessed to NewsRaw to obtain the
    publish timestamp, groups by day and event type, then pivots the counts
    into one column per event type. Returns an empty DataFrame when no rows
    fall inside [start_date, end_date].
    """
    from sqlalchemy import func as sa_func
    from app.models import NewsSentimentV2, NewsProcessed, NewsRaw

    published_day = sa_func.date(NewsRaw.published_at)
    query = (
        session.query(
            published_day.label("date"),
            NewsSentimentV2.event_type,
            sa_func.count(NewsSentimentV2.id).label("cnt"),
        )
        .join(NewsProcessed, NewsSentimentV2.news_processed_id == NewsProcessed.id)
        .join(NewsRaw, NewsProcessed.raw_id == NewsRaw.id)
        .filter(NewsRaw.published_at >= start_date, NewsRaw.published_at <= end_date)
        .group_by(published_day, NewsSentimentV2.event_type)
    )
    rows = query.all()
    if not rows:
        return pd.DataFrame()
    frame = pd.DataFrame(
        [{"date": r.date, "event_type": r.event_type, "count": r.cnt} for r in rows]
    )
    matrix = frame.pivot_table(index="date", columns="event_type", values="count", fill_value=0)
    matrix.index = pd.to_datetime(matrix.index)
    return matrix
| # --------------------------------------------------------------------------- | |
| # Unified builder | |
| # --------------------------------------------------------------------------- | |
def build_all_sentiment_features(
    daily_sentiment: pd.DataFrame,
    event_counts: Optional[pd.DataFrame] = None,
    cfg: Optional[SentimentFeatureConfig] = None,
) -> pd.DataFrame:
    """
    Assemble the complete sentiment feature set into one DataFrame.

    *daily_sentiment* must contain:
      - sentiment_index : float daily sentiment score
      - news_count      : daily article count

    Event-type intensity features are appended only when *event_counts* is
    provided and non-empty; it is first reindexed to the sentiment calendar
    with missing days filled as zero. Config defaults come from the global
    TFT config when *cfg* is None.
    """
    if cfg is None:
        cfg = get_tft_config().sentiment
    index_series = daily_sentiment["sentiment_index"]
    volume_series = daily_sentiment["news_count"]
    pieces: list[pd.DataFrame] = [
        compute_sentiment_momentum(index_series, windows=cfg.momentum_windows),
        compute_sentiment_surprise(
            index_series,
            lookback=cfg.surprise_lookback,
            threshold=cfg.surprise_threshold,
        ),
        compute_volume_weighted_sentiment(index_series, volume_series),
    ]
    if event_counts is not None and not event_counts.empty:
        aligned = event_counts.reindex(daily_sentiment.index).fillna(0)
        pieces.append(compute_event_type_intensity(aligned, event_types=cfg.event_types))
    return pd.concat(pieces, axis=1)