File size: 6,986 Bytes
b1cdbab 9245dd7 b1cdbab 62d5329 b1cdbab | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | """
loader.py
Loads master_data.parquet from HF Dataset.
Returns price series and daily returns for all ETFs + benchmarks.
No external API calls β HF Dataset only.
"""
import pandas as pd
import numpy as np
import streamlit as st
from huggingface_hub import hf_hub_download
from datetime import datetime, timedelta
import pytz
try:
import pandas_market_calendars as mcal
NYSE_CAL_AVAILABLE = True
except Exception:
NYSE_CAL_AVAILABLE = False
DATASET_REPO = "P2SAMAPA/fi-etf-macro-signal-master-data"
PARQUET_FILE = "master_data.parquet"
# ββ UPDATED ETF LIST βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Removed: TBT
# Added: VCIT, LQD, HYG (Fixed Income ETFs)
TARGET_ETFS = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD"]
BENCHMARK_COLS = ["SPY", "AGG"]
TBILL_COL = "TBILL_3M"
# ββ NYSE calendar βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_last_nyse_trading_day(as_of=None):
est = pytz.timezone("US/Eastern")
if as_of is None:
as_of = datetime.now(est)
today = as_of.date()
if NYSE_CAL_AVAILABLE:
try:
nyse = mcal.get_calendar("NYSE")
sched = nyse.schedule(
start_date=today - timedelta(days=10),
end_date=today,
)
if len(sched) > 0:
return sched.index[-1].date()
except Exception:
pass
candidate = today
while candidate.weekday() >= 5:
candidate -= timedelta(days=1)
return candidate
def get_next_trading_day():
est = pytz.timezone("US/Eastern")
now = datetime.now(est)
today = now.date()
pre_market = now.hour < 9 or (now.hour == 9 and now.minute < 30)
if NYSE_CAL_AVAILABLE:
try:
nyse = mcal.get_calendar("NYSE")
sched = nyse.schedule(
start_date=today,
end_date=today + timedelta(days=10),
)
if len(sched) == 0:
return today
first = sched.index[0].date()
if first == today and pre_market:
return today
for ts in sched.index:
if ts.date() > today:
return ts.date()
return sched.index[-1].date()
except Exception:
pass
candidate = today if pre_market else today + timedelta(days=1)
while candidate.weekday() >= 5:
candidate += timedelta(days=1)
return candidate
def get_est_time():
return datetime.now(pytz.timezone("US/Eastern"))
# ββ Data loading ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@st.cache_data(ttl=3600, show_spinner=False)
def load_dataset(hf_token: str) -> pd.DataFrame:
try:
path = hf_hub_download(
repo_id=DATASET_REPO,
filename=PARQUET_FILE,
repo_type="dataset",
token=hf_token,
)
df = pd.read_parquet(path)
if not isinstance(df.index, pd.DatetimeIndex):
for col in ["Date", "date", "DATE"]:
if col in df.columns:
df = df.set_index(col)
break
df.index = pd.to_datetime(df.index)
return df.sort_index()
except Exception as e:
st.error(f"β Failed to load dataset: {e}")
return pd.DataFrame()
# ββ Freshness check βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def check_data_freshness(df: pd.DataFrame) -> dict:
if df.empty:
return {"fresh": False, "message": "Dataset is empty.", "last_date": None}
last = df.index[-1].date()
expect = get_last_nyse_trading_day()
fresh = last >= expect
msg = (
f"β
Dataset up to date through **{last}**." if fresh else
f"β οΈ Latest data: **{last}**. Expected **{expect}**. Updates after market close."
)
return {"fresh": fresh, "last_date": last, "message": msg}
# ββ Price β returns βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _to_returns(series: pd.Series) -> pd.Series:
clean = series.dropna()
if len(clean) == 0:
return series
if abs(clean.median()) > 2:
return series.pct_change()
return series
# ββ Prepare sliced dataset ββββββββββββββββββββββββββββββββββββββββββββββββββββ
def prepare_data(df: pd.DataFrame, start_yr: int):
df = df[df.index.year >= start_yr].copy()
availability = {}
for etf in TARGET_ETFS:
if etf not in df.columns:
availability[etf] = {
"available": False,
"message": f"β οΈ {etf} not found in dataset.",
}
continue
col_data = df[etf].dropna()
if len(col_data) == 0:
availability[etf] = {
"available": False,
"message": f"β οΈ {etf} has no data from {start_yr}.",
}
continue
first = col_data.index[0].date()
last = col_data.index[-1].date()
df[f"{etf}_Ret"] = _to_returns(df[etf])
availability[etf] = {
"available": True,
"message": f"β
{etf}: {first} β {last}",
}
for bm in BENCHMARK_COLS:
if bm in df.columns:
df[f"{bm}_Ret"] = _to_returns(df[bm])
tbill_rate = 0.045
if TBILL_COL in df.columns:
raw = df[TBILL_COL].dropna()
if len(raw) > 0:
v = float(raw.iloc[-1])
tbill_rate = v / 100 if v > 1 else v
active_etfs = [e for e in TARGET_ETFS if availability.get(e, {}).get("available")]
return df, availability, active_etfs, tbill_rate
# ββ Dataset summary βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def dataset_summary(df: pd.DataFrame) -> dict:
if df.empty:
return {}
return {
"rows": len(df),
"start_date": df.index[0].strftime("%Y-%m-%d"),
"end_date": df.index[-1].strftime("%Y-%m-%d"),
"etfs": [e for e in TARGET_ETFS if e in df.columns],
"benchmarks": [b for b in BENCHMARK_COLS if b in df.columns],
"tbill": TBILL_COL in df.columns,
}
|