File size: 10,719 Bytes
02eaa3c 7455898 02eaa3c 7455898 25f0954 d1dbf91 02eaa3c d1dbf91 5b7a80a 02eaa3c 5b7a80a d1dbf91 5b7a80a 02eaa3c 5b7a80a 25f0954 d1dbf91 02eaa3c d1dbf91 02eaa3c d1dbf91 25f0954 d1dbf91 7455898 d1dbf91 7455898 25f0954 7455898 25f0954 7455898 02eaa3c 7455898 02eaa3c 7455898 02eaa3c 7455898 02eaa3c d1dbf91 7455898 02eaa3c 5b7a80a 02eaa3c 5b7a80a 7455898 02eaa3c d1dbf91 5b7a80a 7455898 d1dbf91 7455898 d1dbf91 7455898 5b7a80a 7455898 5b7a80a d1dbf91 02eaa3c d1dbf91 02eaa3c 7455898 d1dbf91 02eaa3c 5b7a80a 02eaa3c 5b7a80a d1dbf91 02eaa3c d1dbf91 02eaa3c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 | """
data/loader.py
Loads master_data.parquet from HF Dataset.
Engineers rich feature set from raw price/macro columns.
No external pings — all data from HF Dataset only.
"""
import pandas as pd
import numpy as np
import streamlit as st
from huggingface_hub import hf_hub_download
from datetime import datetime, timedelta
import pytz
try:
import pandas_market_calendars as mcal
NYSE_CAL_AVAILABLE = True
except ImportError:
NYSE_CAL_AVAILABLE = False
DATASET_REPO = "P2SAMAPA/fi-etf-macro-signal-master-data"
PARQUET_FILE = "master_data.parquet"
TARGET_ETF_COLS = ["TLT", "VNQ", "SLV", "GLD", "LQD", "HYG", "VCIT"]
BENCHMARK_COLS = ["SPY", "AGG"]
TBILL_COL = "TBILL_3M"
MACRO_COLS = ["VIX", "DXY", "T10Y2Y", "IG_SPREAD", "HY_SPREAD"]
# ββ NYSE calendar βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_last_nyse_trading_day(as_of=None):
    """Return the most recent NYSE trading day as a ``datetime.date``.

    Uses the pandas_market_calendars NYSE schedule when the package is
    available; otherwise falls back to skipping weekend days (holidays are
    not accounted for in the fallback).

    Args:
        as_of: optional datetime to evaluate against; defaults to the
            current time in US/Eastern.
    """
    eastern = pytz.timezone("US/Eastern")
    reference = datetime.now(eastern) if as_of is None else as_of
    ref_date = reference.date()

    if NYSE_CAL_AVAILABLE:
        try:
            calendar = mcal.get_calendar("NYSE")
            schedule = calendar.schedule(
                start_date=ref_date - timedelta(days=10),
                end_date=ref_date,
            )
            if len(schedule) > 0:
                return schedule.index[-1].date()
        except Exception:
            # Calendar lookup failed; fall through to the weekday heuristic.
            pass

    # Fallback: walk backwards past Saturday (5) / Sunday (6).
    day = ref_date
    while day.weekday() >= 5:
        day -= timedelta(days=1)
    return day
# ββ Data loading ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@st.cache_data(ttl=3600, show_spinner=False)
def load_dataset(hf_token: str) -> pd.DataFrame:
    """Download ``master_data.parquet`` from the HF dataset repo.

    Ensures the result is sorted by a DatetimeIndex (promoting a
    Date/date/DATE column to the index when needed). On any failure an
    error is shown in the Streamlit UI and an empty DataFrame is returned.
    Cached for one hour via ``st.cache_data``.
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=PARQUET_FILE,
            repo_type="dataset",
            token=hf_token,
        )
        frame = pd.read_parquet(local_path)
        if not isinstance(frame.index, pd.DatetimeIndex):
            # Promote the first recognised date column to the index.
            date_col = next(
                (c for c in ("Date", "date", "DATE") if c in frame.columns),
                None,
            )
            if date_col is not None:
                frame = frame.set_index(date_col)
            frame.index = pd.to_datetime(frame.index)
        return frame.sort_index()
    except Exception as e:
        st.error(f"β Failed to load dataset: {e}")
        return pd.DataFrame()
# ββ Freshness check βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def check_data_freshness(df: pd.DataFrame) -> dict:
    """Compare the dataset's last index date to the last NYSE trading day.

    Args:
        df: master DataFrame indexed by DatetimeIndex (may be empty).

    Returns:
        dict with keys:
            fresh (bool)             - True when data covers the expected day
            last_date_in_data (date) - last date present in ``df``
            expected_date (date)     - last NYSE trading day
            message (str)            - user-facing status line
        (only ``fresh`` and ``message`` when ``df`` is empty)
    """
    if df.empty:
        return {"fresh": False, "message": "Dataset is empty."}
    last = df.index[-1].date()
    expect = get_last_nyse_trading_day()
    fresh = last >= expect
    # Bug fix: the original message was a single-quoted f-string broken
    # across physical lines (a syntax error); rebuilt as valid literals
    # with the garbled status glyphs restored.
    if fresh:
        msg = f"✅ Dataset up to date through {last}."
    else:
        msg = (
            f"⚠️ {expect} data not yet updated. Latest: {last}. "
            f"Dataset updates daily after market close."
        )
    return {
        "fresh": fresh,
        "last_date_in_data": last,
        "expected_date": expect,
        "message": msg,
    }
# ββ Price β returns βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _to_returns(series: pd.Series) -> pd.Series:
"""Convert price series to daily pct returns. If already returns, pass through."""
clean = series.dropna()
if len(clean) == 0:
return series
if abs(clean.median()) > 2: # price series
return series.pct_change()
return series # already returns
# ββ Feature engineering βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _engineer_features(df: pd.DataFrame, ret_cols: list) -> pd.DataFrame:
    """
    Build the feature matrix from raw macro + ETF return columns.

    Per ETF return column: 1/5/21-day lags, 5/21-day rolling volatility,
    5/21-day cumulative momentum. Per macro column present: rolling
    252-day z-score (min 63 obs), 5-day change, 1-day lag. Additionally,
    when the inputs exist: T-bill level and 5-day change, a VIX>25 regime
    flag and 5-day VIX change, a yield-curve inversion flag, the HY/IG
    credit-spread ratio, and the TLT-AGG 5-day momentum spread.
    """
    out = pd.DataFrame(index=df.index)

    # ETF-return derived features (column order preserved per input col).
    for name in ret_cols:
        series = df[name]
        for lag in (1, 5, 21):
            out[f"{name}_lag{lag}"] = series.shift(lag)
        for window in (5, 21):
            out[f"{name}_vol{window}"] = series.rolling(window).std()
        for window in (5, 21):
            out[f"{name}_mom{window}"] = series.rolling(window).sum()

    # Macro-column derived features.
    for name in MACRO_COLS:
        if name not in df.columns:
            continue
        series = df[name]
        mean_252 = series.rolling(252, min_periods=63).mean()
        std_252 = series.rolling(252, min_periods=63).std()
        # Small epsilon guards against division by a zero rolling std.
        out[f"{name}_z"] = (series - mean_252) / (std_252 + 1e-9)
        out[f"{name}_chg5"] = series.diff(5)
        out[f"{name}_lag1"] = series.shift(1)

    # Short-rate level and 5-day change.
    if TBILL_COL in df.columns:
        short_rate = df[TBILL_COL]
        out["TBILL_level"] = short_rate
        out["TBILL_chg5"] = short_rate.diff(5)

    # Derived cross-asset signals.
    if "TLT_Ret" in df.columns and "AGG_Ret" in df.columns:
        tlt_mom = df["TLT_Ret"].rolling(5).sum()
        agg_mom = df["AGG_Ret"].rolling(5).sum()
        out["TLT_AGG_spread_mom5"] = tlt_mom - agg_mom
    if "VIX" in df.columns:
        out["VIX_regime"] = (df["VIX"] > 25).astype(float)
        out["VIX_mom5"] = df["VIX"].diff(5)
    if "T10Y2Y" in df.columns:
        out["YC_inverted"] = (df["T10Y2Y"] < 0).astype(float)
    if "IG_SPREAD" in df.columns and "HY_SPREAD" in df.columns:
        out["credit_ratio"] = df["HY_SPREAD"] / (df["IG_SPREAD"] + 1e-9)

    return out
# ββ Main extraction function ββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_features_and_targets(df: pd.DataFrame):
    """
    Build return columns for target ETFs and engineer the feature set.

    Args:
        df: raw master DataFrame (DatetimeIndex, ETF price/return and
            macro columns). The caller's frame is NOT modified.

    Returns:
        input_features : list[str] engineered feature column names
        target_etfs    : list[str] e.g. ["TLT_Ret", ...]
        tbill_rate     : float, latest 3M T-bill rate as a decimal
        df_out         : DataFrame with raw + engineered columns,
                         NaN warm-up rows dropped
        col_info       : dict of per-ETF conversion diagnostics

    Raises:
        ValueError: if any target ETF column is missing from ``df``.
    """
    missing = [c for c in TARGET_ETF_COLS if c not in df.columns]
    if missing:
        raise ValueError(
            f"Missing ETF columns: {missing}. "
            f"Found: {list(df.columns)}"
        )

    # Bug fix: work on a copy so the caller's DataFrame (the st.cache_data
    # object returned by load_dataset) is never mutated in place; in-place
    # column assignment would corrupt the shared cached frame.
    df = df.copy()

    col_info = {}

    # Build ETF return columns.
    target_etfs = []
    for col in TARGET_ETF_COLS:
        ret_col = f"{col}_Ret"
        df[ret_col] = _to_returns(df[col])
        # Same median>2 heuristic as _to_returns, recorded for diagnostics.
        med = abs(df[col].dropna().median())
        col_info[col] = (
            f"priceβpct_change (median={med:.2f})" if med > 2
            else f"used as-is (median={med:.4f})"
        )
        target_etfs.append(ret_col)

    # Build benchmark return columns (optional).
    for col in BENCHMARK_COLS:
        if col in df.columns:
            df[f"{col}_Ret"] = _to_returns(df[col])

    # Drop the NaN row introduced by the first pct_change observation.
    df = df.dropna(subset=target_etfs).copy()

    # Engineer features and merge them into the frame.
    feat_df = _engineer_features(df, target_etfs)
    for col in feat_df.columns:
        df[col] = feat_df[col].values

    # Drop warm-up rows made NaN by lags / rolling windows.
    feat_cols = list(feat_df.columns)
    df = df.dropna(subset=feat_cols).copy()

    # Latest T-bill rate, normalised to a decimal (e.g. 4.5 -> 0.045).
    tbill_rate = 0.045  # fallback when the column is missing or empty
    if TBILL_COL in df.columns:
        raw = df[TBILL_COL].dropna()
        if len(raw) > 0:
            v = float(raw.iloc[-1])
            tbill_rate = v / 100 if v > 1 else v

    # Input features = engineered columns minus raw/target/macro columns.
    exclude = set(
        TARGET_ETF_COLS + BENCHMARK_COLS + target_etfs +
        [f"{c}_Ret" for c in BENCHMARK_COLS] + [TBILL_COL] +
        list(MACRO_COLS)
    )
    input_features = [c for c in feat_cols if c not in exclude]

    return input_features, target_etfs, tbill_rate, df, col_info
# ββ Dataset summary βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def dataset_summary(df: pd.DataFrame) -> dict:
    """Return lightweight diagnostics about the loaded master DataFrame.

    An empty input yields an empty dict; otherwise row/column counts, the
    date span of the index, and which expected ETF/benchmark/macro columns
    are actually present.
    """
    if df.empty:
        return {}

    def present(names):
        # Columns from *names* that exist in df, preserving the given order.
        return [c for c in names if c in df.columns]

    date_fmt = "%Y-%m-%d"
    return {
        "rows": len(df),
        "columns": len(df.columns),
        "start_date": df.index[0].strftime(date_fmt),
        "end_date": df.index[-1].strftime(date_fmt),
        "etfs_found": present(TARGET_ETF_COLS),
        "benchmarks": present(BENCHMARK_COLS),
        "macro_found": present(MACRO_COLS),
        "tbill_found": TBILL_COL in df.columns,
        "all_cols": list(df.columns),
    }
|