"""
data/loader.py
Loads master_data.parquet from HF Dataset.
Engineers rich feature set from raw price/macro columns.
No external pings — all data from HF Dataset only.
"""
import pandas as pd
import numpy as np
import streamlit as st
from huggingface_hub import hf_hub_download
from datetime import datetime, timedelta
import pytz
# Optional dependency: pandas_market_calendars provides the true NYSE
# schedule (holidays included). Without it, get_last_nyse_trading_day
# falls back to a weekday-only heuristic.
try:
    import pandas_market_calendars as mcal
    NYSE_CAL_AVAILABLE = True
except ImportError:
    NYSE_CAL_AVAILABLE = False

# HF Dataset repo hosting the single master parquet file.
DATASET_REPO = "P2SAMAPA/fi-etf-macro-signal-master-data"
PARQUET_FILE = "master_data.parquet"
# ETFs whose daily returns are the prediction targets.
TARGET_ETF_COLS = ["TLT", "VNQ", "SLV", "GLD", "LQD", "HYG", "VCIT"]
# Benchmarks used for comparison (returns are built but excluded from features).
BENCHMARK_COLS = ["SPY", "AGG"]
# 3-month T-bill column name (risk-free rate proxy).
TBILL_COL = "TBILL_3M"
# Macro series used for z-score / change / regime features.
MACRO_COLS = ["VIX", "DXY", "T10Y2Y", "IG_SPREAD", "HY_SPREAD"]
# Minimum non-NaN fraction a feature column must have to be included in dropna.
# Columns below this threshold are forward-filled instead of causing row drops.
MIN_COVERAGE = 0.80
# ── NYSE calendar ─────────────────────────────────────────────────────────────
def get_last_nyse_trading_day(as_of=None):
    """Return the date of the most recent NYSE trading day.

    Args:
        as_of: reference datetime; defaults to "now" in US/Eastern.

    Uses the real NYSE schedule when pandas_market_calendars is installed;
    otherwise falls back to skipping weekends only (holidays are NOT handled
    by the fallback).
    """
    if as_of is None:
        as_of = datetime.now(pytz.timezone("US/Eastern"))
    ref_date = as_of.date()

    if NYSE_CAL_AVAILABLE:
        try:
            schedule = mcal.get_calendar("NYSE").schedule(
                start_date=ref_date - timedelta(days=10), end_date=ref_date
            )
            if len(schedule) > 0:
                return schedule.index[-1].date()
        except Exception:
            pass  # calendar lookup failed — fall through to weekday heuristic

    # Fallback: walk back past Saturday (5) / Sunday (6).
    day = ref_date
    while day.weekday() >= 5:
        day -= timedelta(days=1)
    return day
# ── Data loading ──────────────────────────────────────────────────────────────
@st.cache_data(ttl=3600, show_spinner=False)
def load_dataset(hf_token: str) -> pd.DataFrame:
    """Download master_data.parquet from the HF Dataset and return it
    sorted by a DatetimeIndex. On any failure, show a Streamlit error
    and return an empty DataFrame instead of raising.

    Args:
        hf_token: HuggingFace access token for the (possibly private) dataset.
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=PARQUET_FILE,
            repo_type="dataset",
            token=hf_token,
        )
        data = pd.read_parquet(local_path)
        if not isinstance(data.index, pd.DatetimeIndex):
            # Promote a date column to the index if one exists, then parse.
            date_col = next(
                (c for c in ("Date", "date", "DATE") if c in data.columns), None
            )
            if date_col is not None:
                data = data.set_index(date_col)
            data.index = pd.to_datetime(data.index)
        return data.sort_index()
    except Exception as e:
        st.error(f"β Failed to load dataset: {e}")
        return pd.DataFrame()
# ── Freshness check ───────────────────────────────────────────────────────────
def check_data_freshness(df: pd.DataFrame) -> dict:
    """Compare the dataset's last date against the last NYSE trading day.

    Returns a dict with keys: fresh (bool), last_date_in_data, expected_date,
    message (user-facing status string). An empty frame yields fresh=False.
    """
    if df.empty:
        return {"fresh": False, "message": "Dataset is empty."}

    last_date = df.index[-1].date()
    expected = get_last_nyse_trading_day()
    is_fresh = last_date >= expected

    if is_fresh:
        message = f"β Dataset up to date through {last_date}."
    else:
        message = (
            f"β οΈ {expected} data not yet updated. Latest: {last_date}. "
            f"Dataset updates daily after market close."
        )
    return {
        "fresh": is_fresh,
        "last_date_in_data": last_date,
        "expected_date": expected,
        "message": message,
    }
# ββ Price β returns βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _to_returns(series: pd.Series) -> pd.Series:
"""Convert price series to daily pct returns. If already returns, pass through."""
clean = series.dropna()
if len(clean) == 0:
return series
if abs(clean.median()) > 2: # price series
return series.pct_change()
return series # already returns
# ── Feature engineering ───────────────────────────────────────────────────────
def _engineer_features(df: pd.DataFrame, ret_cols: list) -> pd.DataFrame:
    """Build the feature matrix from ETF return and macro columns.

    Macro series are forward-filled before any rolling computation so that
    weekend/holiday gaps (common in FRED-style data) do not produce NaNs
    that a later dropna would use to discard rows. Rolling z-scores still
    require warmup (min_periods=63), so the earliest rows are NaN by design.

    Args:
        df: frame containing raw macro columns and *_Ret return columns.
        ret_cols: names of the return columns to derive lag/vol/mom features from.

    Returns:
        DataFrame of engineered features, aligned on df.index.
    """
    feat = pd.DataFrame(index=df.index)

    # Per-ETF return features: lags, realized volatility, momentum.
    for col in ret_cols:
        r = df[col]
        for lag in (1, 5, 21):
            feat[f"{col}_lag{lag}"] = r.shift(lag)
        for window in (5, 21):
            feat[f"{col}_vol{window}"] = r.rolling(window).std()
        for window in (5, 21):
            feat[f"{col}_mom{window}"] = r.rolling(window).sum()

    # Macro features: rolling 1y z-score, 5d change, 1d lag.
    for col in MACRO_COLS:
        if col not in df.columns:
            continue
        # ffill first — macro series often have weekend/holiday gaps.
        s = df[col].ffill()
        mean_1y = s.rolling(252, min_periods=63).mean()
        std_1y = s.rolling(252, min_periods=63).std()
        feat[f"{col}_z"] = (s - mean_1y) / (std_1y + 1e-9)
        feat[f"{col}_chg5"] = s.diff(5)
        feat[f"{col}_lag1"] = s.shift(1)

    # T-bill level and change (ffill for FRED weekend/holiday gaps).
    if TBILL_COL in df.columns:
        tbill = df[TBILL_COL].ffill()
        feat["TBILL_level"] = tbill
        feat["TBILL_chg5"] = tbill.diff(5)

    # Derived cross-asset signals.
    if "TLT_Ret" in df.columns and "AGG_Ret" in df.columns:
        tlt_mom = df["TLT_Ret"].rolling(5).sum()
        agg_mom = df["AGG_Ret"].rolling(5).sum()
        feat["TLT_AGG_spread_mom5"] = tlt_mom - agg_mom
    if "VIX" in df.columns:
        vix = df["VIX"].ffill()
        feat["VIX_regime"] = (vix > 25).astype(float)  # high-vol regime flag
        feat["VIX_mom5"] = vix.diff(5)
    if "T10Y2Y" in df.columns:
        feat["YC_inverted"] = (df["T10Y2Y"].ffill() < 0).astype(float)
    if "IG_SPREAD" in df.columns and "HY_SPREAD" in df.columns:
        hy = df["HY_SPREAD"].ffill()
        ig = df["IG_SPREAD"].ffill()
        feat["credit_ratio"] = hy / (ig + 1e-9)

    return feat
# ── Main extraction function ──────────────────────────────────────────────────
def get_features_and_targets(df: pd.DataFrame):
    """Build return columns for target ETFs and engineer the feature set.

    NaN handling: the strict dropna applies only to feature columns with at
    least MIN_COVERAGE non-NaN values; sparser columns are forward-filled
    instead, so gappy macro data cannot wipe out the row count. Row counts
    are recorded at each stage in col_info["_diagnostics"].

    Args:
        df: master dataset indexed by date; must contain all TARGET_ETF_COLS.

    Returns:
        input_features : list[str]  model input column names
        target_etfs    : list[str]  e.g. ["TLT_Ret", ...]
        tbill_rate     : float      latest 3M T-bill rate as a decimal
        df_out         : DataFrame  rows surviving NaN handling, all columns
        col_info       : dict       per-column diagnostics

    Raises:
        ValueError: if target ETF columns are missing or no rows survive.
    """
    missing = [c for c in TARGET_ETF_COLS if c not in df.columns]
    if missing:
        raise ValueError(
            f"Missing ETF columns: {missing}. "
            f"Found: {list(df.columns)}"
        )

    # FIX: operate on a copy. The original mutated the caller's frame in
    # place (adding *_Ret and feature columns) — a surprising side effect,
    # especially when the frame comes from a cache.
    df = df.copy()

    col_info = {}
    rows_start = len(df)

    # ── Build ETF return columns ──────────────────────────────────────────
    target_etfs = []
    for col in TARGET_ETF_COLS:
        ret_col = f"{col}_Ret"
        df[ret_col] = _to_returns(df[col])
        # NOTE: this median threshold mirrors the heuristic inside
        # _to_returns — keep the two in sync.
        med = abs(df[col].dropna().median())
        col_info[col] = (
            f"priceβpct_change (median={med:.2f})" if med > 2
            else f"used as-is (median={med:.4f})"
        )
        target_etfs.append(ret_col)

    # ── Build benchmark return columns (optional) ─────────────────────────
    for col in BENCHMARK_COLS:
        if col in df.columns:
            df[f"{col}_Ret"] = _to_returns(df[col])

    # Drop the NaN row produced by the first pct_change observation.
    df = df.dropna(subset=target_etfs).copy()
    rows_after_ret = len(df)

    # ── Engineer features and merge ───────────────────────────────────────
    feat_df = _engineer_features(df, target_etfs)
    # feat_df was built on df.index, so a single aligned concat is exact
    # (and avoids per-column .values insertion).
    df = pd.concat([df, feat_df], axis=1)
    feat_cols = list(feat_df.columns)

    # ── Smart dropna: only strict-drop on well-covered columns ────────────
    # Columns with sparse data (< MIN_COVERAGE) are forward-filled rather
    # than used as dropna criteria — prevents macro gaps from wiping rows.
    n = len(df)
    strict_cols = []
    ffill_cols = []
    for col in feat_cols:
        coverage = df[col].notna().sum() / n if n > 0 else 0
        if coverage >= MIN_COVERAGE:
            strict_cols.append(col)
        else:
            ffill_cols.append(col)
    if ffill_cols:
        df[ffill_cols] = df[ffill_cols].ffill()
    if strict_cols:
        # Remaining NaNs in well-covered columns are rolling warmup rows.
        df = df.dropna(subset=strict_cols).copy()
    rows_after_feat = len(df)

    # ── Diagnostic info ───────────────────────────────────────────────────
    col_info["_diagnostics"] = (
        f"rows: {rows_start} raw β {rows_after_ret} after ret dropna β "
        f"{rows_after_feat} after feature dropna | "
        f"strict_cols={len(strict_cols)} ffill_cols={len(ffill_cols)}"
    )

    # ── T-bill rate (defaults to 4.5% when unavailable) ───────────────────
    tbill_rate = 0.045
    if TBILL_COL in df.columns:
        raw = df[TBILL_COL].dropna()
        if len(raw) > 0:
            v = float(raw.iloc[-1])
            # FRED quotes percent (e.g. 4.5); values <= 1 assumed decimal.
            tbill_rate = v / 100 if v > 1 else v

    # ── Input features: engineered columns minus raw/target/macro names ───
    exclude = set(
        TARGET_ETF_COLS + BENCHMARK_COLS + target_etfs +
        [f"{c}_Ret" for c in BENCHMARK_COLS] + [TBILL_COL] +
        list(MACRO_COLS)
    )
    input_features = [c for c in feat_cols if c not in exclude]

    if len(df) == 0:
        raise ValueError(
            f"No rows remain after feature engineering. "
            f"Diagnostics: {col_info['_diagnostics']}"
        )
    return input_features, target_etfs, tbill_rate, df, col_info
# ── Dataset summary ───────────────────────────────────────────────────────────
def dataset_summary(df: pd.DataFrame) -> dict:
    """Return lightweight diagnostics for the loaded dataset.

    Empty input yields an empty dict.
    """
    if df.empty:
        return {}
    cols = df.columns
    return {
        "rows": len(df),
        "columns": len(cols),
        "start_date": df.index[0].strftime("%Y-%m-%d"),
        "end_date": df.index[-1].strftime("%Y-%m-%d"),
        "etfs_found": [c for c in TARGET_ETF_COLS if c in cols],
        "benchmarks": [c for c in BENCHMARK_COLS if c in cols],
        "macro_found": [c for c in MACRO_COLS if c in cols],
        "tbill_found": TBILL_COL in cols,
        "all_cols": list(cols),
    }