P2SAMAPA
Update loader.py
62d5329 unverified
"""
loader.py
Loads master_data.parquet from HF Dataset.
Returns price series and daily returns for all ETFs + benchmarks.
No external API calls β€” HF Dataset only.
"""
import pandas as pd
import numpy as np
import streamlit as st
from huggingface_hub import hf_hub_download
from datetime import datetime, timedelta
import pytz
try:
import pandas_market_calendars as mcal
NYSE_CAL_AVAILABLE = True
except Exception:
NYSE_CAL_AVAILABLE = False
DATASET_REPO = "P2SAMAPA/fi-etf-macro-signal-master-data"
PARQUET_FILE = "master_data.parquet"
# ── UPDATED ETF LIST ─────────────────────────────────────────────────────────
# Removed: TBT
# Added: VCIT, LQD, HYG (Fixed Income ETFs)
TARGET_ETFS = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD"]
BENCHMARK_COLS = ["SPY", "AGG"]
TBILL_COL = "TBILL_3M"
# ── NYSE calendar ─────────────────────────────────────────────────────────────
def get_last_nyse_trading_day(as_of=None):
est = pytz.timezone("US/Eastern")
if as_of is None:
as_of = datetime.now(est)
today = as_of.date()
if NYSE_CAL_AVAILABLE:
try:
nyse = mcal.get_calendar("NYSE")
sched = nyse.schedule(
start_date=today - timedelta(days=10),
end_date=today,
)
if len(sched) > 0:
return sched.index[-1].date()
except Exception:
pass
candidate = today
while candidate.weekday() >= 5:
candidate -= timedelta(days=1)
return candidate
def get_next_trading_day():
est = pytz.timezone("US/Eastern")
now = datetime.now(est)
today = now.date()
pre_market = now.hour < 9 or (now.hour == 9 and now.minute < 30)
if NYSE_CAL_AVAILABLE:
try:
nyse = mcal.get_calendar("NYSE")
sched = nyse.schedule(
start_date=today,
end_date=today + timedelta(days=10),
)
if len(sched) == 0:
return today
first = sched.index[0].date()
if first == today and pre_market:
return today
for ts in sched.index:
if ts.date() > today:
return ts.date()
return sched.index[-1].date()
except Exception:
pass
candidate = today if pre_market else today + timedelta(days=1)
while candidate.weekday() >= 5:
candidate += timedelta(days=1)
return candidate
def get_est_time():
return datetime.now(pytz.timezone("US/Eastern"))
# ── Data loading ──────────────────────────────────────────────────────────────
@st.cache_data(ttl=3600, show_spinner=False)
def load_dataset(hf_token: str) -> pd.DataFrame:
try:
path = hf_hub_download(
repo_id=DATASET_REPO,
filename=PARQUET_FILE,
repo_type="dataset",
token=hf_token,
)
df = pd.read_parquet(path)
if not isinstance(df.index, pd.DatetimeIndex):
for col in ["Date", "date", "DATE"]:
if col in df.columns:
df = df.set_index(col)
break
df.index = pd.to_datetime(df.index)
return df.sort_index()
except Exception as e:
st.error(f"❌ Failed to load dataset: {e}")
return pd.DataFrame()
# ── Freshness check ───────────────────────────────────────────────────────────
def check_data_freshness(df: pd.DataFrame) -> dict:
if df.empty:
return {"fresh": False, "message": "Dataset is empty.", "last_date": None}
last = df.index[-1].date()
expect = get_last_nyse_trading_day()
fresh = last >= expect
msg = (
f"βœ… Dataset up to date through **{last}**." if fresh else
f"⚠️ Latest data: **{last}**. Expected **{expect}**. Updates after market close."
)
return {"fresh": fresh, "last_date": last, "message": msg}
# ── Price β†’ returns ───────────────────────────────────────────────────────────
def _to_returns(series: pd.Series) -> pd.Series:
clean = series.dropna()
if len(clean) == 0:
return series
if abs(clean.median()) > 2:
return series.pct_change()
return series
# ── Prepare sliced dataset ────────────────────────────────────────────────────
def prepare_data(df: pd.DataFrame, start_yr: int):
df = df[df.index.year >= start_yr].copy()
availability = {}
for etf in TARGET_ETFS:
if etf not in df.columns:
availability[etf] = {
"available": False,
"message": f"⚠️ {etf} not found in dataset.",
}
continue
col_data = df[etf].dropna()
if len(col_data) == 0:
availability[etf] = {
"available": False,
"message": f"⚠️ {etf} has no data from {start_yr}.",
}
continue
first = col_data.index[0].date()
last = col_data.index[-1].date()
df[f"{etf}_Ret"] = _to_returns(df[etf])
availability[etf] = {
"available": True,
"message": f"βœ… {etf}: {first} β†’ {last}",
}
for bm in BENCHMARK_COLS:
if bm in df.columns:
df[f"{bm}_Ret"] = _to_returns(df[bm])
tbill_rate = 0.045
if TBILL_COL in df.columns:
raw = df[TBILL_COL].dropna()
if len(raw) > 0:
v = float(raw.iloc[-1])
tbill_rate = v / 100 if v > 1 else v
active_etfs = [e for e in TARGET_ETFS if availability.get(e, {}).get("available")]
return df, availability, active_etfs, tbill_rate
# ── Dataset summary ───────────────────────────────────────────────────────────
def dataset_summary(df: pd.DataFrame) -> dict:
if df.empty:
return {}
return {
"rows": len(df),
"start_date": df.index[0].strftime("%Y-%m-%d"),
"end_date": df.index[-1].strftime("%Y-%m-%d"),
"etfs": [e for e in TARGET_ETFS if e in df.columns],
"benchmarks": [b for b in BENCHMARK_COLS if b in df.columns],
"tbill": TBILL_COL in df.columns,
}