Spaces:
Running
Running
| """ | |
| Centralised Feature Store for TFT-ASRO. | |
| Fuses all heterogeneous data sources (price, sentiment, embeddings, LME, | |
| calendar) into a single long-format DataFrame suitable for | |
| pytorch_forecasting.TimeSeriesDataSet. | |
| TFT data categories: | |
| 1. time_varying_unknown_reals - observed only in the past | |
| 2. time_varying_known_reals - known into the future (calendar, etc.) | |
| 3. static_reals / static_categoricals - time-invariant per group | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| import numpy as np | |
| import pandas as pd | |
| from deep_learning.config import TFTASROConfig, get_tft_config | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Screener bridge: load correlated symbols from active.json / screener output | |
| # --------------------------------------------------------------------------- | |
def load_training_symbols() -> list[str]:
    """
    Return the symbol list that feeds the TFT feature store.

    Resolution order:
    1. the ``"symbols"`` entry of ``config/symbol_sets/active.json`` (located
       three directory levels above this file), when the file exists, parses
       and the entry is non-empty;
    2. ``get_settings().training_symbols`` from ``app.settings``;
    3. a hard-coded four-symbol default if the settings lookup fails.

    Reading ``active.json`` keeps the TFT pipeline on the same
    screener-maintained symbol set as the rest of the system.
    """
    import json
    from pathlib import Path

    repo_root = Path(__file__).resolve().parent.parent.parent
    manifest = repo_root.joinpath("config", "symbol_sets", "active.json")

    if manifest.exists():
        try:
            payload = json.loads(manifest.read_text(encoding="utf-8"))
            active = payload.get("symbols", [])
            if active:
                logger.info(
                    "Loaded %d training symbols from %s (v%s)",
                    len(active), manifest.name, payload.get("version", "?"),
                )
                return active
        except Exception as exc:
            # Best effort: an unreadable manifest only triggers the fallback.
            logger.warning("Failed to read %s: %s", manifest, exc)

    # Manifest missing, unreadable, or empty -> fall back to settings/defaults.
    try:
        from app.settings import get_settings
        configured = get_settings().training_symbols
    except Exception:
        configured = ["HG=F", "DX-Y.NYB", "CL=F", "FXI"]
    return configured
def load_screener_selected_symbols(
    artifacts_dir: str = "artifacts/runs/latest",
) -> list[dict]:
    """
    Load the ``"selected"`` entries of the screener's ``selected_symbols.json``.

    Args:
        artifacts_dir: Directory containing the screener output, resolved
            relative to the directory three levels above this file.

    Returns:
        The list stored under ``"selected"`` (one dict per selected symbol),
        or an empty list when the file is missing or cannot be read/parsed.
    """
    import json
    from pathlib import Path

    repo_root = Path(__file__).resolve().parent.parent.parent
    audit_file = repo_root / artifacts_dir / "selected_symbols.json"

    if audit_file.exists():
        try:
            payload = json.loads(audit_file.read_text(encoding="utf-8"))
            entries = payload.get("selected", [])
            logger.info(
                "Loaded %d screener-selected symbols (rules v%s, run %s)",
                len(entries),
                payload.get("selection_rules_version", "?"),
                payload.get("screener_run_id", "?"),
            )
            return entries
        except Exception as exc:
            # Best effort: a corrupt artifact is reported and treated as "none".
            logger.warning("Failed to read screener output: %s", exc)
            return []

    logger.info("No screener selected_symbols.json found at %s", audit_file)
    return []
| # --------------------------------------------------------------------------- | |
| # Calendar / known-future features | |
| # --------------------------------------------------------------------------- | |
def _build_calendar_features(index: pd.DatetimeIndex) -> pd.DataFrame:
    """
    Derive calendar features that depend only on the dates in *index*.

    Produces scaled day-of-week / day-of-month / month values, sine/cosine
    encodings of day-of-year and month, and 0/1 flags for Monday, Friday,
    month start, month end and quarter end.  The result is indexed by *index*.
    """
    weekday = index.dayofweek
    month = index.month
    # Angles for the cyclical encodings (period: 365.25 days / 12 months).
    year_angle = 2 * np.pi * index.dayofyear / 365.25
    month_angle = 2 * np.pi * month / 12.0

    # Insertion order of this mapping defines the output column order.
    columns = {
        "day_of_week": weekday.astype(np.float32) / 6.0,
        "day_of_month": index.day.astype(np.float32) / 31.0,
        "month": month.astype(np.float32) / 12.0,
        "cal_sin_day": np.sin(year_angle).astype(np.float32),
        "cal_cos_day": np.cos(year_angle).astype(np.float32),
        "cal_sin_month": np.sin(month_angle).astype(np.float32),
        "cal_cos_month": np.cos(month_angle).astype(np.float32),
        "is_monday": (weekday == 0).astype(np.float32),
        "is_friday": (weekday == 4).astype(np.float32),
        "is_month_start": index.is_month_start.astype(np.float32),
        "is_month_end": index.is_month_end.astype(np.float32),
        "is_quarter_end": index.is_quarter_end.astype(np.float32),
    }

    frame = pd.DataFrame(index=index)
    for name, values in columns.items():
        frame[name] = values
    return frame
| # --------------------------------------------------------------------------- | |
| # Price / technical features (reuses existing helpers) | |
| # --------------------------------------------------------------------------- | |
def _build_price_features(
    session,
    symbol: str,
    start_date,
    end_date,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Fetch price bars for *symbol* and derive its per-symbol features.

    Delegates to ``app.features.load_price_data`` and
    ``app.features.generate_symbol_features``.

    Returns:
        ``(raw_price_df, features_df)``; two empty DataFrames when no price
        data was loaded for the requested window.
    """
    from app.features import generate_symbol_features, load_price_data

    bars = load_price_data(session, symbol, start_date, end_date)
    if not bars.empty:
        return bars, generate_symbol_features(bars, symbol)
    # Nothing loaded: hand back an empty pair so callers can test `.empty`.
    return pd.DataFrame(), pd.DataFrame()
| # --------------------------------------------------------------------------- | |
| # Embedding features (daily aggregated PCA vectors) | |
| # --------------------------------------------------------------------------- | |
def _build_daily_embedding_features(
    session,
    index: pd.DatetimeIndex,
    pca_dim: int = 32,
) -> pd.DataFrame:
    """
    Build one row of aggregated PCA embedding features per date in *index*.

    All stored ``embedding_pca`` blobs are read (joined to their raw news row
    for the publication date), decoded, grouped by publication date and
    reduced to a single vector per day with ``aggregate_daily_embeddings``.
    The daily frame is then reindexed onto *index*, forward-filled for at
    most 3 rows, and remaining gaps are set to 0.0.

    Returns:
        DataFrame indexed by *index* with columns ``emb_pca_<i>``.  When no
        embeddings exist, an all-zero frame with ``pca_dim`` columns.
    """
    from sqlalchemy import func as sa_func
    from app.models import NewsEmbedding, NewsProcessed, NewsRaw
    from deep_learning.data.embeddings import bytes_to_embedding, aggregate_daily_embeddings

    published_day = sa_func.date(NewsRaw.published_at).label("date")
    query = (
        session.query(published_day, NewsEmbedding.embedding_pca)
        .join(NewsProcessed, NewsEmbedding.news_processed_id == NewsProcessed.id)
        .join(NewsRaw, NewsProcessed.raw_id == NewsRaw.id)
        .order_by(NewsRaw.published_at.asc())
    )
    fetched = query.all()

    if not fetched:
        zero_cols = [f"emb_pca_{k}" for k in range(pca_dim)]
        return pd.DataFrame(0.0, index=index, columns=zero_cols)

    # Bucket decoded vectors by the string form of their publication date.
    vectors_by_day: dict[str, list[np.ndarray]] = {}
    for row in fetched:
        day_key = str(row.date)
        decoded = bytes_to_embedding(row.embedding_pca, dim=pca_dim)
        vectors_by_day.setdefault(day_key, []).append(decoded)

    # Collapse each day's stack of vectors into one record.
    daily_rows = []
    for day_key, day_vectors in vectors_by_day.items():
        pooled = aggregate_daily_embeddings(np.stack(day_vectors))
        entry = {"date": pd.Timestamp(day_key)}
        entry.update({f"emb_pca_{k}": float(val) for k, val in enumerate(pooled)})
        daily_rows.append(entry)

    daily = pd.DataFrame(daily_rows).set_index("date").sort_index()
    daily.index = pd.to_datetime(daily.index)

    # Align to the requested calendar; short gaps inherit the previous row.
    return daily.reindex(index).ffill(limit=3).fillna(0.0)
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def build_tft_dataframe(
    session,
    cfg: Optional[TFTASROConfig] = None,
) -> tuple[pd.DataFrame, list[str], list[str], list[str]]:
    """
    Build the master DataFrame for TFT training / inference.

    Price/technical, sentiment, embedding, LME, futures and calendar features
    are aligned on the target symbol's price index and concatenated
    column-wise together with the target.

    Args:
        session: Database session passed through to the price, sentiment,
            embedding and futures loaders.
        cfg: TFT-ASRO configuration; ``get_tft_config()`` is used when omitted.

    Returns:
        (df, time_varying_unknown_reals, time_varying_known_reals, target_cols)

        The returned df has:
        - "time_idx"  : integer time index (required by pytorch_forecasting)
        - "group_id"  : constant "copper" (single series)
        - "target"    : next-day simple return
        - columns for all three TFT feature categories

        Rows without a finite target (e.g. the last bar, whose next-day
        return is not yet known) are dropped; remaining missing or
        non-finite feature values are set to 0.0.

    Raises:
        ValueError: If no price data is available for the target symbol.
    """
    if cfg is None:
        cfg = get_tft_config()

    target_symbol = cfg.feature_store.target_symbol
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=cfg.training.lookback_days)

    # ---- 1. Price & technical indicators ----
    # Use screener-validated symbols from active.json
    training_symbols = load_training_symbols()
    logger.info("Building features with %d symbols: %s", len(training_symbols), training_symbols[:5])

    price_df, price_features = _build_price_features(session, target_symbol, start_date, end_date)
    if price_df.empty:
        raise ValueError(f"No price data for {target_symbol}")

    # Add correlated symbols' features (screener-validated)
    from app.features import load_price_data, generate_symbol_features, align_to_target_calendar

    other_dfs = {}
    for sym in training_symbols:
        if sym == target_symbol:
            continue
        sym_df = load_price_data(session, sym, start_date, end_date)
        if not sym_df.empty:
            other_dfs[sym] = sym_df

    if other_dfs:
        aligned = align_to_target_calendar(price_df, other_dfs, max_ffill=cfg.feature_store.max_ffill)
        joined_count = 0
        for sym, df in aligned.items():
            if df.empty:
                continue
            sym_feats = generate_symbol_features(df, sym)
            price_features = price_features.join(sym_feats, how="left")
            joined_count += 1
        # Report only the symbols whose features were actually joined.
        logger.info("Added features from %d correlated symbols", joined_count)

    target_index = price_df.index
    logger.info("Price data: %d bars for %s", len(target_index), target_symbol)

    # ---- 2. Sentiment features ----
    from app.features import load_sentiment_data
    from deep_learning.data.sentiment_features import (
        build_all_sentiment_features,
        build_event_counts_from_db,
    )

    sent_df = load_sentiment_data(session, start_date, end_date)
    if not sent_df.empty:
        sent_aligned = sent_df.reindex(target_index).ffill(limit=cfg.feature_store.max_ffill)
        sent_aligned["sentiment_index"] = sent_aligned["sentiment_index"].fillna(0.0)
        sent_aligned["news_count"] = sent_aligned["news_count"].fillna(0)
        event_counts = build_event_counts_from_db(session, start_date, end_date)
        advanced_sent = build_all_sentiment_features(sent_aligned, event_counts=event_counts, cfg=cfg.sentiment)
    else:
        # No sentiment at all: neutral placeholders so the column set is stable.
        sent_aligned = pd.DataFrame(
            {"sentiment_index": 0.0, "news_count": 0},
            index=target_index,
        )
        advanced_sent = pd.DataFrame(index=target_index)

    # ---- 3. Embedding features ----
    emb_features = _build_daily_embedding_features(session, target_index, pca_dim=cfg.embedding.pca_dim)

    # ---- 4. LME / physical market features ----
    from deep_learning.data.lme_warehouse import fetch_lme_data, compute_lme_features, compute_proxy_lme_features
    from deep_learning.data.futures_curve import build_futures_features_from_yfinance

    lme_raw = fetch_lme_data(cfg.lme)
    if not lme_raw.empty:
        lme_features = compute_lme_features(lme_raw, windows=cfg.lme.stock_change_windows)
        lme_features = lme_features.reindex(target_index).ffill(limit=cfg.lme.max_ffill_days)
    else:
        # No LME data available: derive proxy features from the price frame.
        lme_features = compute_proxy_lme_features(price_df)

    futures_features = build_futures_features_from_yfinance(session, target_symbol, cfg.training.lookback_days)
    if not futures_features.empty:
        futures_features = futures_features.reindex(target_index).ffill(limit=3)
    else:
        futures_features = pd.DataFrame(index=target_index)

    # ---- 5. Calendar (known future) ----
    calendar_features = _build_calendar_features(target_index)

    # ---- 6. Target: next-day simple return ----
    # Row t holds the return from bar t to bar t+1, so the last row is NaN.
    close = price_df["close"]
    target_ret = close.pct_change().shift(-1)
    target_ret.name = "target"

    # ---- Assemble master DataFrame ----
    parts = [
        price_features,
        sent_aligned[["sentiment_index", "news_count"]],
        advanced_sent,
        emb_features,
        lme_features,
        futures_features,
        calendar_features,
        target_ret.to_frame(),
    ]
    master = pd.concat(parts, axis=1)
    master = master.loc[target_index]

    # notna()/fillna() do not catch +/-inf (e.g. a return computed across a
    # zero close), so treat non-finite values as missing before filtering.
    master = master.replace([np.inf, -np.inf], np.nan)

    valid_mask = master["target"].notna()
    master = master[valid_mask].copy()
    master = master.fillna(0.0)

    def _sanitize(name: str) -> str:
        # pytorch_forecasting forbids '.' and '-' in column names.
        return name.replace(".", "_").replace("-", "_")

    master.columns = [_sanitize(col) for col in master.columns]

    # Sanitising can map distinct names onto the same string, and overlapping
    # parts can repeat a name; duplicates would leak into the feature lists
    # returned below, so keep only the first occurrence of each name.
    dup_mask = master.columns.duplicated()
    if dup_mask.any():
        logger.warning(
            "Dropping %d duplicate feature columns: %s",
            int(dup_mask.sum()),
            sorted(set(master.columns[dup_mask])),
        )
        master = master.loc[:, ~dup_mask].copy()

    master["time_idx"] = np.arange(len(master))
    master["group_id"] = "copper"

    # Categorise columns – use sanitized calendar col names
    calendar_cols = [_sanitize(c) for c in calendar_features.columns]
    target_cols = ["target"]
    reserved = {"time_idx", "group_id", "target"}

    present = set(master.columns)
    time_varying_known = [c for c in calendar_cols if c in present]
    known_lookup = set(time_varying_known)
    time_varying_unknown = [
        c for c in master.columns
        if c not in reserved and c not in known_lookup
    ]

    logger.info(
        "Feature store built: %d rows, %d unknown features, %d known features, %d embedding dims",
        len(master),
        len(time_varying_unknown),
        len(time_varying_known),
        len([c for c in master.columns if c.startswith("emb_pca_")]),
    )
    return master, time_varying_unknown, time_varying_known, target_cols