Spaces:
Running
Running
| """ | |
| LME Warehouse Stock Data Pipeline. | |
| Fetches London Metal Exchange copper warehouse inventory data and computes | |
| physical-market features that capture supply/demand dynamics invisible to | |
| pure price-based indicators: | |
| - Total stock levels and change velocity | |
| - Cancelled warrant ratio (leading indicator of physical demand) | |
| - Inventory depletion rate | |
| Data sources (in priority order): | |
| 1. Nasdaq Data Link (formerly Quandl) - LME/PR_CU dataset | |
| 2. Fallback: synthetic features derived from price-volume patterns | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional, Sequence | |
| import numpy as np | |
| import pandas as pd | |
| from deep_learning.config import LMEConfig, get_tft_config | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Data fetching | |
| # --------------------------------------------------------------------------- | |
def _normalise_lme_frame(raw: pd.DataFrame) -> pd.DataFrame:
    """
    Map a raw LME/PR_CU frame onto the canonical column layout.

    Returns a date-indexed (tz-naive, ascending) DataFrame containing
    ``total_stock_tonnes``, ``cancelled_warrants_tonnes``,
    ``on_warrant_tonnes`` and ``cancelled_ratio``. Unrecognised source
    columns are passed through unchanged. Returns an empty DataFrame when
    no date column can be identified.
    """
    df = raw.reset_index()

    col_map = {}
    for col in df.columns:
        name = str(col).lower()
        # "cancel" is tested before "stock"/"total" so that a header such as
        # "Cancelled Stock" is not mistaken for the total-stock column.
        if "date" in name:
            target = "date"
        elif "cancel" in name:
            target = "cancelled_warrants_tonnes"
        elif "stock" in name or "total" in name:
            target = "total_stock_tonnes"
        else:
            continue
        # Only the first match receives each canonical name. Renaming two
        # columns to the same label would create duplicate columns, and
        # df["total_stock_tonnes"] would then be a DataFrame, not a Series.
        if target not in col_map.values():
            col_map[col] = target
    df = df.rename(columns=col_map)

    if "date" not in df.columns:
        # reset_index() returns an unnamed DatetimeIndex as a column called
        # "index", so accept any datetime-typed column. The positional
        # RangeIndex is never a valid substitute for a date.
        dt_cols = [
            c for c in df.columns if pd.api.types.is_datetime64_any_dtype(df[c])
        ]
        if not dt_cols:
            return pd.DataFrame()
        df = df.rename(columns={dt_cols[0]: "date"})

    df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)
    df = df.set_index("date").sort_index()

    if "total_stock_tonnes" not in df.columns:
        # Fall back to the first remaining column, but never relabel the
        # cancelled-warrants column as the total.
        spare = [c for c in df.columns if c != "cancelled_warrants_tonnes"]
        if spare:
            df = df.rename(columns={spare[0]: "total_stock_tonnes"})

    if "cancelled_warrants_tonnes" in df.columns and "total_stock_tonnes" in df.columns:
        df["on_warrant_tonnes"] = df["total_stock_tonnes"] - df["cancelled_warrants_tonnes"]
        # Zero stock would divide by zero; use NaN so the ratio is undefined.
        stock = df["total_stock_tonnes"].replace(0, np.nan)
        df["cancelled_ratio"] = df["cancelled_warrants_tonnes"] / stock
    else:
        for col in ("cancelled_warrants_tonnes", "on_warrant_tonnes", "cancelled_ratio"):
            if col not in df.columns:
                df[col] = np.nan
    return df


def fetch_lme_from_nasdaq(
    api_key: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    Fetch LME copper warehouse stock data from Nasdaq Data Link.

    Uses the ``nasdaqdatalink`` client, falling back to the legacy ``quandl``
    client. If neither package is installed, if the fetch raises, or if the
    dataset has no recognisable date column, the problem is logged and an
    empty DataFrame is returned.

    Args:
        api_key: Nasdaq Data Link API key.
        start_date: Optional start date forwarded to the client.
        end_date: Optional end date forwarded to the client.

    Returns:
        A DataFrame indexed by tz-naive ``date`` (ascending), with columns
        total_stock_tonnes, cancelled_warrants_tonnes, on_warrant_tonnes and
        cancelled_ratio. It is empty on failure.
    """
    try:
        import nasdaqdatalink
    except ImportError:
        try:
            import quandl as nasdaqdatalink
        except ImportError:
            logger.error("Neither nasdaqdatalink nor quandl package installed")
            return pd.DataFrame()

    nasdaqdatalink.ApiConfig.api_key = api_key
    try:
        raw = nasdaqdatalink.get(
            "LME/PR_CU",
            start_date=start_date,
            end_date=end_date,
        )
    except Exception as exc:  # best-effort source: any client/network error degrades
        logger.warning("Nasdaq Data Link fetch failed: %s", exc)
        return pd.DataFrame()

    if raw.empty:
        return raw

    df = _normalise_lme_frame(raw)
    if df.empty:
        logger.warning(
            "LME dataset has no recognisable date column: %s", list(raw.columns)
        )
    return df
def fetch_lme_data(
    cfg: Optional[LMEConfig] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    Load LME warehouse data from the best available source.

    The Nasdaq Data Link API key is read from the environment variable named
    by ``cfg.nasdaq_api_key_env``. When ``cfg`` is omitted, the LME section of
    ``get_tft_config()`` is used.

    Returns:
        The Nasdaq Data Link frame when a key is set and the fetch yields
        rows. Otherwise it returns a fresh empty DataFrame; this function does
        not build proxy features itself. NOTE(review): the caller presumably
        falls back to compute_proxy_lme_features.
    """
    lme_cfg = get_tft_config().lme if cfg is None else cfg
    key = os.environ.get(lme_cfg.nasdaq_api_key_env, "")

    # Without a key there is nothing to fetch; treat it like an empty result.
    records = (
        fetch_lme_from_nasdaq(key, start_date=start_date, end_date=end_date)
        if key
        else pd.DataFrame()
    )

    if records.empty:
        logger.info("LME API unavailable - generating proxy features from price data")
        return pd.DataFrame()

    logger.info("Loaded %d LME records from Nasdaq Data Link", len(records))
    return records
| # --------------------------------------------------------------------------- | |
| # Feature engineering | |
| # --------------------------------------------------------------------------- | |
def compute_lme_features(
    lme_data: pd.DataFrame,
    windows: Sequence[int] = (1, 5, 10, 20),
    depletion_window: int = 20,
) -> pd.DataFrame:
    """
    Derive physical-market features from raw LME warehouse data.

    Args:
        lme_data: Frame with a ``total_stock_tonnes`` column and, optionally,
            ``cancelled_ratio``. Rows are assumed to be in time order.
        windows: Lags (in rows) for the change / %-change features.
        depletion_window: Lag (in rows) for ``lme_depletion_rate``.

    Returns:
        A frame on ``lme_data.index`` with the features below. It has no
        columns when ``total_stock_tonnes`` is absent.

    Features:
        - lme_stock_total             : raw stock level (tonnes, not normalised)
        - lme_stock_zscore            : rolling 60-row Z-score of stock
                                        (min 10 observations)
        - lme_stock_change_{w}d       : w-row stock change (tonnes)
        - lme_stock_pct_change_{w}d   : w-row % change. Undefined (NaN) when
                                        the earlier stock is 0.
        - lme_depletion_rate          : signed average per-row stock change
                                        over ``depletion_window``
                                        (negative = inventory drawing down)
        - lme_cancelled_ratio         : ratio of cancelled warrants
        - lme_cancelled_ratio_ma5     : 5-row MA of cancelled ratio
        - lme_cancelled_ratio_zscore  : rolling 30-row Z-score of cancelled
                                        ratio (min 5 observations)
    """
    features = pd.DataFrame(index=lme_data.index)
    stock = lme_data.get("total_stock_tonnes")
    if stock is None:
        return features

    stock_mean = stock.rolling(60, min_periods=10).mean()
    # A flat window has std 0; map it to NaN so the Z-score is NaN, not inf.
    stock_std = stock.rolling(60, min_periods=10).std().replace(0, np.nan)
    features["lme_stock_total"] = stock
    features["lme_stock_zscore"] = (stock - stock_mean) / stock_std

    for w in windows:
        features[f"lme_stock_change_{w}d"] = stock.diff(w)
        # pct_change divides by the lagged value, so a zero stock yields
        # +/-inf. Non-finite features poison downstream models, so use NaN.
        pct = stock.pct_change(w, fill_method=None)
        features[f"lme_stock_pct_change_{w}d"] = pct.replace([np.inf, -np.inf], np.nan)

    features["lme_depletion_rate"] = stock.diff(depletion_window) / depletion_window

    cancelled = lme_data.get("cancelled_ratio")
    if cancelled is not None:
        features["lme_cancelled_ratio"] = cancelled
        features["lme_cancelled_ratio_ma5"] = cancelled.rolling(5, min_periods=1).mean()
        cr_mean = cancelled.rolling(30, min_periods=5).mean()
        cr_std = cancelled.rolling(30, min_periods=5).std().replace(0, np.nan)
        features["lme_cancelled_ratio_zscore"] = (cancelled - cr_mean) / cr_std
    return features
def compute_proxy_lme_features(
    price_df: pd.DataFrame,
    volume_col: str = "volume",
    close_col: str = "close",
) -> pd.DataFrame:
    """
    Build proxy physical-market signals from price/volume when LME data is missing.

    Rationale: sharp volume spikes that coincide with price increases often
    correlate with physical demand surges and inventory draws.

    Args:
        price_df: Frame holding the volume and close columns.
        volume_col: Name of the volume column (missing values treated as 0).
        close_col: Name of the close-price column.

    Returns:
        A frame on ``price_df.index`` with proxy_vol_zscore, proxy_vol_spike,
        proxy_vol_price_interaction and proxy_momentum_spread. It has no
        columns when either input column is absent.
    """
    out = pd.DataFrame(index=price_df.index)
    raw_volume = price_df.get(volume_col)
    raw_close = price_df.get(close_col)
    if raw_volume is None or raw_close is None:
        return out

    volume = raw_volume.fillna(0).astype(float)
    price = raw_close.astype(float)

    # 20-row volume Z-score; zero dispersion becomes NaN rather than inf.
    vol_window = volume.rolling(20, min_periods=1)
    vol_z = (volume - vol_window.mean()) / vol_window.std().replace(0, np.nan)
    out["proxy_vol_zscore"] = vol_z
    out["proxy_vol_spike"] = vol_z.gt(2.0).astype(np.float32)
    out["proxy_vol_price_interaction"] = vol_z * price.pct_change(fill_method=None)

    # Fast-minus-slow moving-average spread, scaled by 20-row price volatility.
    price_window = price.rolling(20)
    fast_minus_slow = price.rolling(5).mean() - price_window.mean()
    out["proxy_momentum_spread"] = fast_minus_slow / price_window.std().replace(0, np.nan)
    return out
| # --------------------------------------------------------------------------- | |
| # DB persistence | |
| # --------------------------------------------------------------------------- | |
def persist_lme_data(lme_df: pd.DataFrame) -> int:
    """
    Upsert LME warehouse rows into the database, keyed by date.

    Each index value of ``lme_df`` is used as the row's date. Naive
    timestamps are localised to UTC before lookup/insert. A row whose date
    already exists is updated in place; otherwise a new row is added. All
    changes are committed in one transaction.

    Missing/NaN optional values (cancelled warrants, on-warrant stock,
    cancelled ratio) are stored as NULL rather than NaN. Rows whose total
    stock is NaN are skipped. If the column is absent entirely, 0 is stored,
    as before.

    Returns:
        The number of rows inserted or updated (0 for empty/None input).
    """
    # Check emptiness before importing the DB layer so the no-op path does
    # not depend on the application's database package.
    if lme_df is None or lme_df.empty:
        return 0

    from app.db import SessionLocal
    from app.models import LMEWarehouseData

    def _as_optional_float(value: object) -> Optional[float]:
        # pandas marks missing data with NaN; persist it as NULL instead so
        # NaN does not leak into DB aggregates and comparisons.
        if value is None or pd.isna(value):
            return None
        return float(value)

    count = 0
    skipped = 0
    with SessionLocal() as session:
        for date_idx, row in lme_df.iterrows():
            total = _as_optional_float(row.get("total_stock_tonnes", 0))
            if total is None:
                skipped += 1
                continue

            date_val = pd.Timestamp(date_idx)
            if date_val.tzinfo is None:
                date_val = date_val.tz_localize("UTC")

            values = {
                "total_stock_tonnes": total,
                "cancelled_warrants_tonnes": _as_optional_float(row.get("cancelled_warrants_tonnes")),
                "on_warrant_tonnes": _as_optional_float(row.get("on_warrant_tonnes")),
                "cancelled_ratio": _as_optional_float(row.get("cancelled_ratio")),
            }

            existing = session.query(LMEWarehouseData).filter(
                LMEWarehouseData.date == date_val
            ).first()
            if existing:
                for attr, val in values.items():
                    setattr(existing, attr, val)
            else:
                session.add(LMEWarehouseData(date=date_val, **values))
            count += 1
        session.commit()

    if skipped:
        logger.warning("Skipped %d LME rows with missing total stock", skipped)
    logger.info("Persisted %d LME warehouse records", count)
    return count