"""
Feature Builder — assembles all signals (price, sentiment, events, weather,
geopolitical, macro) into a single feature matrix per commodity.

CRITICAL: zero lookahead. Signal windows only look backwards from the as-of
date T; no row dated after T feeds a feature. Target variables use the first
price rows on/after T+7 and T+30 calendar days (shifted forward) and are
excluded from the features.

NOTE(review): the sentiment / event / geopolitical windows built here include
rows dated T itself (``date <= T``); only the price features are documented as
T-1 based. Confirm that same-day news is known before the close it is paired
with.

Usage:
    from model.feature_builder import build_training_data, build_prediction_features
"""
import logging
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

sys.path.insert(0, str(Path(__file__).parent.parent))

from data.db import get_conn
from signals.price_features import build_feature_matrix, ALL_SYMBOLS
from signals.weather_features import get_weather_dataframe
from signals.macro_features import build_macro_dataframe, get_macro_features

log = logging.getLogger(__name__)

# Per-commodity direction thresholds (percent) — calibrated to each asset's typical volatility.
# USDINR is a managed float (rarely moves ±2% in 7 days → extreme STABLE imbalance).
# NG=F is highly volatile → needs wider threshold to avoid noise.
DIRECTION_THRESHOLDS: dict[str, float] = {
    "CL=F": 2.0,
    "NG=F": 3.5,
    "GC=F": 1.5,
    "ZW=F": 2.0,
    "ZC=F": 2.0,
    "ZS=F": 2.0,
    "CT=F": 2.0,
    "SB=F": 2.0,
    "USDINR=X": 0.4,
    "HG=F": 2.0,
}
DIRECTION_THRESHOLD_PCT = 2.0  # fallback for symbols not listed above

# Look-back for the "7d" sentiment / event features. The inference queries select
# rows with T-7 <= date <= T (both ends inclusive), so the training-side event
# windows span the same 8 calendar days.
_SIGNAL_LOOKBACK_DAYS = 7

# |sentiment_score_1d| above this marks a day as carrying a real news signal.
_NEWS_SIGNAL_EPS = 0.01

# Event direction → numeric score, shared by the training and inference paths.
_DIRECTION_SCORES = {"BULLISH": 1, "BEARISH": -1, "NEUTRAL": 0}

# Event columns summed over the window; every other event column is max-ed.
_EVENT_SUM_COLS = ("bullish_events_7d", "bearish_events_7d", "direction_score_7d")

# Sentiment columns merged into the training matrix (order determines X's column order).
_SENTIMENT_MERGE_COLS = [
    "date",
    "sentiment_score_1d",
    "sentiment_3d",
    "sentiment_7d",
    "article_count_7d",
    "positive_ratio_7d",
]


# ── helpers ────────────────────────────────────────────────────────────────────

def _query_df(sql: str, params: list) -> pd.DataFrame:
    """Run a parameterized query on a fresh connection and return the result as a DataFrame.

    The connection is closed even if the query raises.
    """
    conn = get_conn()
    try:
        return conn.execute(sql, params).df()
    finally:
        conn.close()


def _load_prices_for_target(symbol: str) -> pd.DataFrame:
    """Load close prices with enough future rows to compute T+7 and T+30 targets."""
    df = _query_df(
        "SELECT date, close FROM prices WHERE symbol = ? ORDER BY date",
        [symbol],
    )
    df["date"] = pd.to_datetime(df["date"]).dt.date
    return df.sort_values("date").reset_index(drop=True)


def _direction_labels(
    closes: np.ndarray,
    dates: np.ndarray,
    horizon_days: int,
    threshold: float,
) -> np.ndarray:
    """Return 1 / 0 / -1 direction labels for one forward horizon (NaN where unknown).

    The future price for row i is the first row whose date is >= date_i + horizon_days
    (a calendar-day offset, so weekends/holidays resolve to the next available row).
    `dates` must be sorted ascending.
    """
    n = len(closes)
    # side="left" → index of the first date >= the shifted date.
    fwd_idx = np.searchsorted(dates, dates + np.timedelta64(horizon_days, "D"), side="left")
    has_future = fwd_idx < n

    future = np.full(n, np.nan)
    future[has_future] = closes[fwd_idx[has_future]]

    with np.errstate(divide="ignore", invalid="ignore"):
        pct_change = (future - closes) / closes * 100.0
        # NaN changes (e.g. a NaN close) fail both comparisons and fall through to STABLE.
        labels = np.where(
            pct_change > threshold, 1.0, np.where(pct_change < -threshold, -1.0, 0.0)
        )

    # A zero price on either side carries no usable signal → STABLE.
    labels[(closes == 0) | (future == 0)] = 0.0
    # No price row that far ahead yet → label is unknown and must stay NaN.
    labels[~has_future] = np.nan
    return labels


def _compute_targets(price_df: pd.DataFrame, symbol: Optional[str] = None) -> pd.DataFrame:
    """
    Compute direction_7d and direction_30d target columns.

    With ``t`` = DIRECTION_THRESHOLDS[symbol] (DIRECTION_THRESHOLD_PCT if the symbol
    is missing or not given), in percent:
        1  (UP)     if future price > current * (1 + t/100)
        0  (STABLE) if within ±t%, or if either price is 0
        -1 (DOWN)   if future price < current * (1 - t/100)
    The future price is the first row on/after date + 7 (resp. 30) calendar days.
    Rows with no such future row get NaN.
    """
    df = price_df.copy().sort_values("date").reset_index(drop=True)
    threshold = DIRECTION_THRESHOLDS.get(symbol, DIRECTION_THRESHOLD_PCT)

    closes = df["close"].to_numpy(dtype=float)
    dates = pd.to_datetime(df["date"]).to_numpy()

    df["direction_7d"] = _direction_labels(closes, dates, 7, threshold)
    df["direction_30d"] = _direction_labels(closes, dates, 30, threshold)
    return df


def _load_sentiment_series(symbol: str) -> pd.DataFrame:
    """Load daily sentiment aggregates for a commodity from DuckDB.

    NOTE(review): the rolling windows count sentiment_daily *rows*, not calendar
    days, and days without a row are zero-filled when merged in
    build_training_data. The inference path averages over the calendar window
    instead. Confirm the resulting skew is intended (see has_news_signal).
    """
    df = _query_df(
        """
        SELECT date, sentiment_score, article_count, positive_count
        FROM sentiment_daily
        WHERE commodity = ?
        ORDER BY date
        """,
        [symbol],
    )
    if df.empty:
        return df

    df["date"] = pd.to_datetime(df["date"]).dt.date
    df = df.sort_values("date").reset_index(drop=True)

    # Rolling aggregates
    df["sentiment_3d"] = df["sentiment_score"].rolling(3, min_periods=1).mean()
    df["sentiment_7d"] = df["sentiment_score"].rolling(7, min_periods=1).mean()
    df["article_count_7d"] = df["article_count"].rolling(7, min_periods=1).sum()
    df["positive_ratio_7d"] = (
        df["positive_count"].rolling(7, min_periods=1).sum()
        / df["article_count_7d"].replace(0, 1)
    )

    return df.rename(columns={"sentiment_score": "sentiment_score_1d"})


def _load_event_series(symbol: str) -> pd.DataFrame:
    """Load per-day event features for a commodity from DuckDB.

    Events are first aggregated per day. They are then reindexed onto a
    contiguous daily calendar and rolled over the inclusive [T-7, T] window,
    which is the same window build_prediction_features queries. As a result:
      * bullish/bearish counts and the direction score are summed over the window;
      * max severity and the supply-shock / policy-change flags are max-ed over it;
      * days with no events of their own still reflect events from the prior week.

    Returns an empty DataFrame if the commodity has no events.
    """
    df = _query_df(
        """
        SELECT date, event_type, direction, severity
        FROM extracted_events
        WHERE commodity = ?
        ORDER BY date
        """,
        [symbol],
    )
    if df.empty:
        return pd.DataFrame()

    df["date"] = pd.to_datetime(df["date"]).dt.normalize()
    df["dir_score"] = df["direction"].map(_DIRECTION_SCORES).fillna(0)

    daily = df.groupby("date").agg(
        bullish_events_7d=("direction", lambda x: int((x == "BULLISH").sum())),
        bearish_events_7d=("direction", lambda x: int((x == "BEARISH").sum())),
        max_severity_7d=("severity", "max"),
        direction_score_7d=("dir_score", "sum"),
        supply_shock_flag=("event_type", lambda x: int((x == "SUPPLY_SHOCK").any())),
        policy_change_flag=("event_type", lambda x: int((x == "POLICY_CHANGE").any())),
    )

    # Contiguous calendar: the window then counts days, not event rows. It is
    # extended past the last event so that event keeps contributing for a full window.
    calendar = pd.date_range(
        daily.index.min(),
        daily.index.max() + pd.Timedelta(days=_SIGNAL_LOOKBACK_DAYS),
        freq="D",
    )
    daily = daily.reindex(calendar).fillna(0)

    window = _SIGNAL_LOOKBACK_DAYS + 1  # T-7 .. T inclusive
    rolled = pd.DataFrame(
        {
            col: (
                daily[col].rolling(window, min_periods=1).sum()
                if col in _EVENT_SUM_COLS
                else daily[col].rolling(window, min_periods=1).max()
            )
            for col in daily.columns
        }
    )
    rolled = rolled.rename_axis("date").reset_index()
    rolled["date"] = rolled["date"].dt.date
    return rolled


def _load_geo_series(symbol: str) -> pd.DataFrame:
    """Load rolling geopolitical risk scores for a commodity.

    Scores are averaged per date, then rolled over the last 7 / 30 dates that
    have events.

    NOTE(review): days with no geo events are zero-filled in training, while
    inference defaults to 0.05. Confirm which baseline is intended.
    """
    df = _query_df(
        "SELECT date, risk_score FROM geopolitical_events WHERE commodity = ? ORDER BY date",
        [symbol],
    )
    if df.empty:
        return pd.DataFrame()

    df["date"] = pd.to_datetime(df["date"]).dt.date
    agg = df.groupby("date")["risk_score"].mean().reset_index()
    agg = agg.sort_values("date").reset_index(drop=True)
    agg["risk_score_7d"] = agg["risk_score"].rolling(7, min_periods=1).mean()
    agg["risk_score_30d"] = agg["risk_score"].rolling(30, min_periods=1).mean()
    return agg[["date", "risk_score_7d", "risk_score_30d"]]


def _safe_merge(base: pd.DataFrame, other: pd.DataFrame, on: str = "date") -> pd.DataFrame:
    """Left-join `other` onto `base`, zero-filling only the columns `other` adds.

    Columns already present in `base` keep their NaNs. In particular, target
    labels are not silently turned into 0 (STABLE) when a later signal merge
    happens.
    """
    if other.empty:
        return base
    merged = base.merge(other, on=on, how="left")
    added = [c for c in merged.columns if c not in base.columns]
    if added:
        merged[added] = merged[added].fillna(0)
    return merged


# ── public API ─────────────────────────────────────────────────────────────────

def build_training_data(
    symbol: str,
) -> tuple[pd.DataFrame, pd.Series, pd.Series]:
    """
    Assemble the full feature matrix + targets for a commodity.

    Uses all available history in DuckDB. Signal features reflect data dated up to
    and including each trading day; rows whose T+7 or T+30 target is not yet
    observable are dropped.

    Args:
        symbol: Commodity ticker, e.g. "ZW=F"

    Returns:
        (X, y_7d, y_30d) where:
            X      — DataFrame, one row per date, all feature columns
            y_7d   — Series of direction labels {-1, 0, 1} for 7-day horizon
            y_30d  — Series of direction labels {-1, 0, 1} for 30-day horizon
    """
    # Price features (covers ~5yr history)
    end_date = date.today().isoformat()
    start_date = (date.today() - timedelta(days=365 * 5)).isoformat()

    price_feat = build_feature_matrix(symbol, start_date, end_date)
    if price_feat.empty:
        log.warning("%s: no price features available", symbol)
        return pd.DataFrame(), pd.Series(dtype=int), pd.Series(dtype=int)

    # Targets — computed from raw close prices with per-commodity threshold
    prices = _load_prices_for_target(symbol)
    targets = _compute_targets(prices, symbol=symbol)[["date", "direction_7d", "direction_30d"]]

    # All signal series
    sentiment = _load_sentiment_series(symbol)
    events = _load_event_series(symbol)
    geo = _load_geo_series(symbol)

    weather = get_weather_dataframe(symbol, days=365 * 5)
    if not weather.empty:
        weather["date"] = pd.to_datetime(weather["date"]).dt.date

    macro = build_macro_dataframe(symbol, start_date, end_date)
    if not macro.empty:
        macro["date"] = pd.to_datetime(macro["date"]).dt.date

    df = price_feat.copy()
    # Targets use a plain left join (no zero-fill): a missing label must stay NaN
    # so the dropna below removes it instead of labelling the row STABLE (0).
    df = df.merge(targets, on="date", how="left")
    # Signals: left join → zero-fill days with no signal.
    df = _safe_merge(
        df,
        sentiment[_SENTIMENT_MERGE_COLS] if not sentiment.empty else pd.DataFrame(),
        on="date",
    )
    df = _safe_merge(df, events, on="date")
    df = _safe_merge(df, geo, on="date")
    df = _safe_merge(df, weather, on="date")
    df = _safe_merge(df, macro, on="date")

    # Add binary indicator: 1 on days where we have real news signal, 0 elsewhere.
    # This lets the model learn "trust sentiment when has_news_signal=1" rather than
    # treating zero-padded sentiment rows as neutral-sentiment days.
    if "sentiment_score_1d" in df.columns:
        df["has_news_signal"] = (df["sentiment_score_1d"].abs() > _NEWS_SIGNAL_EPS).astype(int)
    else:
        df["has_news_signal"] = 0

    # Drop rows where targets are unavailable (last ~7/30 days have no forward price yet)
    df = df.dropna(subset=["direction_7d", "direction_30d"])
    df = df.sort_values("date").reset_index(drop=True)

    feature_cols = [c for c in df.columns if c not in ("date", "direction_7d", "direction_30d")]
    X = df[feature_cols].fillna(0).astype(float)
    y_7d = df["direction_7d"].astype(int)
    y_30d = df["direction_30d"].astype(int)

    log.info("%s: training data shape %s, class dist 7d: %s",
             symbol, X.shape, y_7d.value_counts().to_dict())
    return X, y_7d, y_30d


def build_prediction_features(symbol: str, as_of_date: Optional[str] = None) -> pd.Series:
    """
    Build a single-row feature vector for inference.

    Signal queries only read rows dated up to (and including) as_of_date.

    Args:
        symbol:     Commodity ticker
        as_of_date: ISO date string (YYYY-MM-DD). Defaults to today.

    Returns:
        pd.Series with the same feature names as build_training_data returns.
    """
    from signals.price_features import get_price_features
    from signals.weather_features import get_weather_features

    target_date = as_of_date or date.today().isoformat()
    as_of = datetime.strptime(target_date, "%Y-%m-%d").date()

    # Price features (T-1 based internally)
    price_f = get_price_features(symbol, target_date)

    # Sentiment / events window: T-7 .. T inclusive
    cutoff = (as_of - timedelta(days=_SIGNAL_LOOKBACK_DAYS)).isoformat()

    sent_rows = _query_df(
        """
        SELECT date, sentiment_score, article_count, positive_count
        FROM sentiment_daily
        WHERE commodity = ? AND date >= ? AND date <= ?
        ORDER BY date DESC
        """,
        [symbol, cutoff, target_date],
    )
    if sent_rows.empty:
        sentiment_1d = sentiment_3d = sentiment_7d = 0.0
        article_count_7d = 0
        positive_ratio_7d = 0.0
    else:
        # Rows are newest-first: iloc[0] is the most recent day in the window.
        sentiment_1d = float(sent_rows.iloc[0]["sentiment_score"])
        sentiment_3d = float(sent_rows.head(3)["sentiment_score"].mean())
        sentiment_7d = float(sent_rows["sentiment_score"].mean())
        article_count_7d = int(sent_rows["article_count"].sum())
        positive_ratio_7d = float(sent_rows["positive_count"].sum() / max(article_count_7d, 1))

    evt_rows = _query_df(
        """
        SELECT event_type, direction, severity
        FROM extracted_events
        WHERE commodity = ? AND date >= ? AND date <= ?
        """,
        [symbol, cutoff, target_date],
    )
    if evt_rows.empty:
        bullish_events_7d = bearish_events_7d = max_severity_7d = 0
        direction_score_7d = supply_shock_flag = policy_change_flag = 0
    else:
        bullish_events_7d = int((evt_rows["direction"] == "BULLISH").sum())
        bearish_events_7d = int((evt_rows["direction"] == "BEARISH").sum())
        severity_max = evt_rows["severity"].max()
        # int(NaN) would raise when every severity in the window is missing.
        max_severity_7d = int(severity_max) if pd.notna(severity_max) else 0
        direction_score_7d = int(evt_rows["direction"].map(_DIRECTION_SCORES).fillna(0).sum())
        supply_shock_flag = int((evt_rows["event_type"] == "SUPPLY_SHOCK").any())
        policy_change_flag = int((evt_rows["event_type"] == "POLICY_CHANGE").any())

    # Geopolitical risk: per-date means (as in training), ordered so tail() is deterministic.
    cutoff_30 = (as_of - timedelta(days=30)).isoformat()
    geo_rows = _query_df(
        "SELECT date, risk_score FROM geopolitical_events "
        "WHERE commodity = ? AND date >= ? AND date <= ? ORDER BY date",
        [symbol, cutoff_30, target_date],
    )
    if geo_rows.empty:
        # NOTE(review): training zero-fills days without geo events; confirm the 0.05 baseline.
        risk_score_7d = risk_score_30d = 0.05
    else:
        daily_risk = geo_rows.groupby("date")["risk_score"].mean().sort_index()
        risk_score_7d = float(daily_risk.tail(7).mean())
        risk_score_30d = float(daily_risk.tail(30).mean())

    # Weather
    # NOTE(review): get_weather_features is not passed as_of_date; if it is relative
    # to today, historical as_of_date calls may see weather after that date.
    weather_f = get_weather_features(symbol, days=90)

    macro_f = get_macro_features(symbol, target_date)

    features = {
        **price_f,
        "sentiment_score_1d": sentiment_1d,
        "sentiment_3d": sentiment_3d,
        "sentiment_7d": sentiment_7d,
        "article_count_7d": article_count_7d,
        "positive_ratio_7d": positive_ratio_7d,
        "bullish_events_7d": bullish_events_7d,
        "bearish_events_7d": bearish_events_7d,
        "max_severity_7d": max_severity_7d,
        "direction_score_7d": direction_score_7d,
        "supply_shock_flag": supply_shock_flag,
        "policy_change_flag": policy_change_flag,
        "risk_score_7d": risk_score_7d,
        "risk_score_30d": risk_score_30d,
        **weather_f,
        **macro_f,
        # Training emits this column too; without it the inference vector would
        # be missing a feature the model was fitted on.
        "has_news_signal": int(abs(sentiment_1d) > _NEWS_SIGNAL_EPS),
    }
    return pd.Series(features)