P2SAMAPA committed on
Commit
ff640d6
·
unverified ·
1 Parent(s): 2c5ef71

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +191 -246
data_manager.py CHANGED
@@ -22,96 +22,72 @@ from utils import get_est_time
22
 
23
  REPO_ID = "P2SAMAPA/my-etf-data"
24
 
 
 
 
 
 
25
 
26
  def fetch_macro_data_robust(start_date="2008-01-01"):
27
- """Fetch macro signals from multiple sources with proper error handling"""
28
  all_data = []
29
-
30
- # 1. FRED Data
31
  if PANDAS_DATAREADER_AVAILABLE:
32
  try:
33
  fred_symbols = {
34
- "T10Y2Y": "T10Y2Y",
35
- "T10Y3M": "T10Y3M",
36
- "DTB3": "DTB3", # 3-Month T-Bill β€” correct risk-free rate
37
  "BAMLH0A0HYM2": "HY_Spread",
38
- "VIXCLS": "VIX",
39
- "DTWEXBGS": "DXY"
40
  }
41
-
42
  fred_data = web.DataReader(
43
- list(fred_symbols.keys()),
44
- "fred",
45
- start_date,
46
- datetime.now()
47
  )
48
  fred_data.columns = [fred_symbols[col] for col in fred_data.columns]
49
-
50
  if fred_data.index.tz is not None:
51
  fred_data.index = fred_data.index.tz_localize(None)
52
-
53
  all_data.append(fred_data)
54
-
55
  except Exception as e:
56
  st.warning(f"⚠️ FRED partial failure: {e}")
57
-
58
- # 2. Yahoo Finance Data
59
  try:
60
- yf_symbols = {
61
- "GC=F": "GOLD",
62
- "HG=F": "COPPER",
63
- "^VIX": "VIX_YF",
64
- }
65
-
66
  yf_data = yf.download(
67
- list(yf_symbols.keys()),
68
- start=start_date,
69
- progress=False,
70
- auto_adjust=True
71
- )['Close']
72
-
73
  if isinstance(yf_data, pd.Series):
74
  yf_data = yf_data.to_frame()
75
-
76
  yf_data.columns = [yf_symbols.get(col, col) for col in yf_data.columns]
77
-
78
  if yf_data.index.tz is not None:
79
  yf_data.index = yf_data.index.tz_localize(None)
80
-
81
  all_data.append(yf_data)
82
-
83
  except Exception as e:
84
  st.warning(f"⚠️ Yahoo Finance failed: {e}")
85
-
86
- # 3. VIX Term Structure
87
  try:
88
  vix_term = yf.download(
89
- ["^VIX", "^VIX3M"],
90
- start=start_date,
91
- progress=False,
92
- auto_adjust=True
93
- )['Close']
94
-
95
  if not vix_term.empty:
96
  if isinstance(vix_term, pd.Series):
97
  vix_term = vix_term.to_frame()
98
-
99
  vix_term.columns = ["VIX_Spot", "VIX_3M"]
100
- vix_term['VIX_Term_Slope'] = vix_term['VIX_3M'] - vix_term['VIX_Spot']
101
-
102
  if vix_term.index.tz is not None:
103
  vix_term.index = vix_term.index.tz_localize(None)
104
-
105
  all_data.append(vix_term)
106
-
107
  except Exception as e:
108
  st.warning(f"⚠️ VIX Term Structure failed: {e}")
109
-
110
- # Combine
111
  if all_data:
112
- combined = pd.concat(all_data, axis=1, join='outer')
113
  combined = combined.loc[:, ~combined.columns.duplicated()]
114
- combined = combined.fillna(method='ffill', limit=5)
115
  return combined
116
  else:
117
  st.error("❌ Failed to fetch any macro data!")
@@ -119,71 +95,40 @@ def fetch_macro_data_robust(start_date="2008-01-01"):
119
 
120
 
121
  def fetch_etf_data(etfs, start_date="2008-01-01"):
122
- """Fetch ETF price data and calculate returns + momentum features"""
 
 
 
 
 
 
 
123
  try:
124
  etf_data = yf.download(
125
- etfs,
126
- start=start_date,
127
- progress=False,
128
- auto_adjust=True
129
- )['Close']
130
-
131
  if isinstance(etf_data, pd.Series):
132
  etf_data = etf_data.to_frame()
133
-
134
  if etf_data.index.tz is not None:
135
  etf_data.index = etf_data.index.tz_localize(None)
136
-
137
  daily_rets = etf_data.pct_change()
138
 
139
- # ── Daily returns (targets will be built from these) ─────────────────
140
  etf_returns = daily_rets.copy()
141
  etf_returns.columns = [f"{col}_Ret" for col in etf_returns.columns]
142
 
143
- # ── 20-day realized volatility ────────────────────────────────────────
 
 
 
 
144
  etf_vol = daily_rets.rolling(20).std() * np.sqrt(252)
145
  etf_vol.columns = [f"{col}_Vol" for col in etf_vol.columns]
146
 
147
- # ── Momentum features: rolling returns over multiple windows ──────────
148
- momentum_frames = []
149
- for window in [5, 10, 21, 63]: # 1W, 2W, 1M, 3M
150
- mom = etf_data.pct_change(window)
151
- mom.columns = [f"{col}_Mom{window}d" for col in mom.columns]
152
- momentum_frames.append(mom)
153
-
154
- # ── Relative strength vs SPY ──────────────────────────────────────────
155
- rel_frames = []
156
- if 'SPY' in etf_data.columns:
157
- spy_ret = etf_data['SPY'].pct_change(21)
158
- for col in etf_data.columns:
159
- if col != 'SPY':
160
- rel = etf_data[col].pct_change(21) - spy_ret
161
- rel_frames.append(rel.rename(f"{col}_RelSPY21d"))
162
-
163
- # ── Cross-sectional momentum rank (1=worst, 5=best among universe) ───
164
- target_etfs_only = [c for c in etf_data.columns
165
- if c not in ['SPY', 'AGG']]
166
- rank_frames = []
167
- for window in [21, 63]:
168
- mom_w = etf_data[target_etfs_only].pct_change(window)
169
- ranked = mom_w.rank(axis=1, pct=True)
170
- ranked.columns = [f"{col}_Rank{window}d" for col in ranked.columns]
171
- rank_frames.append(ranked)
172
-
173
- # ── Recent trend: 5d and 10d price change ─────────────────────────────
174
- trend_frames = []
175
- for window in [5, 10]:
176
- trend = etf_data.pct_change(window)
177
- trend.columns = [f"{col}_Trend{window}d" for col in trend.columns]
178
- trend_frames.append(trend)
179
-
180
- result = pd.concat(
181
- [etf_returns, etf_vol] + momentum_frames +
182
- (rel_frames if rel_frames else []) +
183
- rank_frames + trend_frames,
184
- axis=1
185
- )
186
-
187
  return result
188
 
189
  except Exception as e:
@@ -192,37 +137,37 @@ def fetch_etf_data(etfs, start_date="2008-01-01"):
192
 
193
 
194
  def smart_update_hf_dataset(new_data, token, force_upload=False):
195
- """Smart update: Only uploads if new data exists or gaps are filled.
196
-
197
- Handles new ETFs added to ETF_LIST: detects columns present in new_data
198
- but missing from existing HF dataset, fetches their full history, and
199
- backfills before merging β€” so the full history is populated, not just
200
- recent days.
 
201
  """
202
  if not token:
203
  st.warning("⚠️ No HF_TOKEN found. Skipping dataset update.")
204
  return new_data
205
-
206
  raw_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/etf_data.csv"
207
-
208
  try:
209
  existing_df = pd.read_csv(raw_url)
210
  existing_df.columns = existing_df.columns.str.strip()
211
-
212
- date_col = next((c for c in existing_df.columns
213
- if c.lower() in ['date', 'unnamed: 0']), existing_df.columns[0])
214
-
215
  existing_df[date_col] = pd.to_datetime(existing_df[date_col])
216
  existing_df = existing_df.set_index(date_col).sort_index()
217
-
218
  if existing_df.index.tz is not None:
219
  existing_df.index = existing_df.index.tz_localize(None)
220
 
221
- # ── Detect newly added ETFs ───────────────────────────────────────────
222
- # New ETF columns will be in new_data but have all-NaN in existing_df
223
- # (or be completely absent). Fetch their full history and backfill.
 
224
  new_etf_cols = []
225
- # Infer ETF names from new_data columns ending in _Ret
226
  all_etfs = [c.replace("_Ret", "") for c in new_data.columns if c.endswith("_Ret")]
227
  for etf in all_etfs:
228
  ret_col = f"{etf}_Ret"
@@ -230,47 +175,40 @@ def smart_update_hf_dataset(new_data, token, force_upload=False):
230
  new_etf_cols.append(etf)
231
 
232
  if new_etf_cols:
233
- st.info(f"πŸ†• Detected new ETFs not in HF dataset: {new_etf_cols} β€” fetching full history...")
234
- # Fetch full history for new ETFs from 2008
235
  full_history = fetch_etf_data(new_etf_cols, start_date="2008-01-01")
236
  if not full_history.empty:
237
  if full_history.index.tz is not None:
238
  full_history.index = full_history.index.tz_localize(None)
239
- st.success(f"βœ… Full history fetched for {new_etf_cols}: "
240
- f"{len(full_history)} rows from "
241
- f"{full_history.index[0].date()} to "
242
- f"{full_history.index[-1].date()}")
243
- # Merge full history into new_data by reindexing to the union of dates
244
- combined_index = existing_df.index.union(full_history.index)
245
- existing_df = existing_df.reindex(combined_index)
246
- new_data = new_data.reindex(combined_index)
247
- # Write new ETF columns directly into new_data across full date range
248
- new_cols_only = [c for c in full_history.columns
249
- if c not in existing_df.columns
250
- or existing_df[c].isna().mean() > 0.9]
251
- for col in new_cols_only:
252
- new_data[col] = full_history.reindex(combined_index)[col]
 
 
253
  else:
254
  st.warning(f"⚠️ Could not fetch full history for {new_etf_cols}")
255
 
256
- combined = new_data.combine_first(existing_df)
257
-
258
- # Count changes β€” compare against original existing_df row count
259
- # (existing_df may have been reindexed above if new ETFs were added)
260
- original_row_count = len(pd.read_csv(raw_url, nrows=1)) if False else None
261
  new_rows = len(combined) - len(existing_df)
262
  old_nulls = existing_df.isna().sum().sum()
263
  new_nulls = combined.isna().sum().sum()
264
  filled_gaps = old_nulls - new_nulls
265
-
266
- # Force upload if new ETFs were backfilled (filled_gaps may undercount
267
- # because existing_df was reindexed to match the new date union)
268
  needs_update = force_upload or new_rows > 0 or filled_gaps > 0 or len(new_etf_cols) > 0
269
-
270
  if needs_update:
271
  combined.index.name = "Date"
272
  combined.reset_index().to_csv("etf_data.csv", index=False)
273
-
274
  api = HfApi()
275
  api.upload_file(
276
  path_or_fileobj="etf_data.csv",
@@ -278,53 +216,80 @@ def smart_update_hf_dataset(new_data, token, force_upload=False):
278
  repo_id=REPO_ID,
279
  repo_type="dataset",
280
  token=token,
281
- commit_message=("FORCE " if force_upload else "") + f"Update: {get_est_time().strftime('%Y-%m-%d %H:%M EST')} | +{new_rows} rows, filled {filled_gaps} gaps" + (f", backfilled {new_etf_cols}" if new_etf_cols else ""),
 
 
 
 
 
282
  )
283
-
284
- st.success(f"βœ… Dataset updated: +{new_rows} rows, filled {filled_gaps} gaps")
285
  return combined
286
  else:
287
  st.info("πŸ“Š Dataset already up-to-date. No upload needed.")
288
  return existing_df
289
-
290
  except Exception as e:
291
  st.warning(f"⚠️ Dataset update failed: {e}. Using new data only.")
292
  return new_data
293
 
294
 
295
  def add_regime_features(df):
296
- """Add regime detection features"""
297
-
298
- # VIX Regime
299
- if 'VIX' in df.columns:
300
- df['VIX_Regime_Low'] = (df['VIX'] < 15).astype(int)
301
- df['VIX_Regime_Med'] = ((df['VIX'] >= 15) & (df['VIX'] < 25)).astype(int)
302
- df['VIX_Regime_High'] = (df['VIX'] >= 25).astype(int)
303
-
304
- # Yield Curve Regime
305
- if 'T10Y2Y' in df.columns:
306
- df['YC_Inverted'] = (df['T10Y2Y'] < 0).astype(int)
307
- df['YC_Flat'] = ((df['T10Y2Y'] >= 0) & (df['T10Y2Y'] < 0.5)).astype(int)
308
- df['YC_Steep'] = (df['T10Y2Y'] >= 0.5).astype(int)
309
-
310
- # Credit Stress Regime
311
- if 'HY_Spread' in df.columns:
312
- df['Credit_Stress_Low'] = (df['HY_Spread'] < 400).astype(int)
313
- df['Credit_Stress_Med'] = ((df['HY_Spread'] >= 400) & (df['HY_Spread'] < 600)).astype(int)
314
- df['Credit_Stress_High'] = (df['HY_Spread'] >= 600).astype(int)
315
-
316
- # VIX Term Structure Regime
317
- if 'VIX_Term_Slope' in df.columns:
318
- df['VIX_Term_Contango'] = (df['VIX_Term_Slope'] > 2).astype(int)
319
- df['VIX_Term_Backwardation'] = (df['VIX_Term_Slope'] < -2).astype(int)
320
-
321
- # Rate Environment
322
- if 'T10Y3M' in df.columns:
323
- df['Rates_VeryLow'] = (df['T10Y3M'] < 1.0).astype(int)
324
- df['Rates_Low'] = ((df['T10Y3M'] >= 1.0) & (df['T10Y3M'] < 2.0)).astype(int)
325
- df['Rates_Normal'] = ((df['T10Y3M'] >= 2.0) & (df['T10Y3M'] < 3.0)).astype(int)
326
- df['Rates_High'] = (df['T10Y3M'] >= 3.0).astype(int)
327
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
328
  return df
329
 
330
 
@@ -332,41 +297,36 @@ def get_data(start_year, force_refresh=False, clean_hf_dataset=False):
332
  """Main data fetching and processing pipeline"""
333
  raw_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/etf_data.csv"
334
  df = pd.DataFrame()
335
-
336
- # Load from HuggingFace
337
  try:
338
  df = pd.read_csv(raw_url)
339
  df.columns = df.columns.str.strip()
340
-
341
- date_col = next((c for c in df.columns if c.lower() in ['date', 'unnamed: 0']), df.columns[0])
 
342
  df[date_col] = pd.to_datetime(df[date_col])
343
  df = df.set_index(date_col).sort_index()
344
-
345
  if df.index.tz is not None:
346
  df.index = df.index.tz_localize(None)
347
-
348
- # Clean dataset if requested
349
  if clean_hf_dataset:
350
  st.warning("🧹 **Cleaning HF Dataset Mode Active**")
351
  original_cols = len(df.columns)
352
-
353
- nan_pct = (df.isna().sum() / len(df)) * 100
354
  bad_cols = nan_pct[nan_pct > 30].index.tolist()
355
-
356
  if bad_cols:
357
  st.write(f"πŸ“‹ Found {len(bad_cols)} columns with >30% NaNs:")
358
  for col in bad_cols[:10]:
359
  st.write(f" - {col}: {nan_pct[col]:.1f}% NaNs")
360
-
361
  df = df.drop(columns=bad_cols)
362
  st.success(f"βœ… Dropped {len(bad_cols)} columns ({original_cols} β†’ {len(df.columns)})")
363
-
364
  token = os.getenv("HF_TOKEN")
365
  if token:
366
  st.info("πŸ“€ Uploading cleaned dataset...")
367
  df.index.name = "Date"
368
  df.reset_index().to_csv("etf_data.csv", index=False)
369
-
370
  api = HfApi()
371
  api.upload_file(
372
  path_or_fileobj="etf_data.csv",
@@ -374,84 +334,69 @@ def get_data(start_year, force_refresh=False, clean_hf_dataset=False):
374
  repo_id=REPO_ID,
375
  repo_type="dataset",
376
  token=token,
377
- commit_message=f"Cleaned dataset: Removed {len(bad_cols)} columns"
378
  )
379
  st.success("βœ… HF dataset updated!")
380
-
381
  except Exception as e:
382
  st.warning(f"⚠️ Could not load from HuggingFace: {e}")
383
-
384
- # Sync fresh data if needed
385
  from utils import is_sync_window
386
  should_sync = is_sync_window() or force_refresh
387
-
388
  if should_sync:
389
  sync_reason = "πŸ”„ Manual Refresh" if force_refresh else "πŸ”„ Sync Window Active"
390
-
391
  with st.status(f"{sync_reason} - Updating Dataset...", expanded=False):
392
- etf_list = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD", "AGG", "SPY"]
393
-
394
- etf_data = fetch_etf_data(etf_list)
395
  macro_data = fetch_macro_data_robust()
396
-
397
  if not etf_data.empty and not macro_data.empty:
398
  new_df = pd.concat([etf_data, macro_data], axis=1)
399
- token = os.getenv("HF_TOKEN")
400
- df = smart_update_hf_dataset(new_df, token, force_upload=force_refresh)
401
-
402
- # Fetch fresh if still empty
403
  if df.empty:
404
  st.warning("πŸ“Š Fetching fresh data...")
405
- etf_list = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD", "AGG", "SPY"]
406
- etf_data = fetch_etf_data(etf_list)
407
  macro_data = fetch_macro_data_robust()
408
-
409
  if not etf_data.empty and not macro_data.empty:
410
  df = pd.concat([etf_data, macro_data], axis=1)
411
-
412
- # Feature Engineering: Z-Scores for macro + vol columns
413
- macro_cols = ['VIX', 'DXY', 'COPPER', 'GOLD', 'HY_Spread', 'T10Y2Y', 'T10Y3M',
414
- 'VIX_Spot', 'VIX_3M', 'VIX_Term_Slope']
415
 
 
 
 
 
 
416
  for col in df.columns:
417
- if any(m in col for m in macro_cols) or '_Vol' in col:
418
- rolling_mean = df[col].rolling(20, min_periods=5).mean()
419
- rolling_std = df[col].rolling(20, min_periods=5).std()
420
- z_col = f"{col}_Z"
421
- df[z_col] = (df[col] - rolling_mean) / (rolling_std + 1e-9)
422
-
423
- # Z-score the momentum/rank/trend features too so they're on same scale
424
- mom_pattern_cols = [c for c in df.columns if any(
425
- tag in c for tag in ['_Mom', '_RelSPY', '_Rank', '_Trend']
426
- )]
427
- for col in mom_pattern_cols:
428
- rolling_mean = df[col].rolling(60, min_periods=10).mean()
429
- rolling_std = df[col].rolling(60, min_periods=10).std()
430
- df[f"{col}_Z"] = (df[col] - rolling_mean) / (rolling_std + 1e-9)
431
-
432
- # Add regime features
433
  st.write("🎯 **Adding Regime Detection Features...**")
434
  df = add_regime_features(df)
435
-
436
- # Filter by start year
437
  df = df[df.index.year >= start_year]
438
  st.info(f"πŸ“… After year filter ({start_year}+): {len(df)} samples")
439
-
440
- # Cleaning
441
- nan_percentages = df.isna().sum() / len(df)
442
- bad_features = nan_percentages[nan_percentages > 0.5].index.tolist()
443
-
444
  if bad_features:
445
  st.warning(f"πŸ—‘οΈ Dropping {len(bad_features)} features with >50% NaNs")
446
  df = df.drop(columns=bad_features)
447
-
448
- df = df.fillna(method='ffill', limit=5)
449
- df = df.fillna(method='bfill', limit=100)
450
- df = df.fillna(method='ffill')
451
-
452
  df = df.dropna()
453
-
454
  if len(df) > 0:
455
- st.success(f"βœ… Final dataset: {len(df)} samples from {df.index[0].strftime('%Y-%m-%d')} to {df.index[-1].strftime('%Y-%m-%d')}")
456
-
 
 
 
457
  return df
 
22
 
23
  REPO_ID = "P2SAMAPA/my-etf-data"
24
 
25
+ # ── ETF universe ──────────────────────────────────────────────────────────────
26
+ # TBT removed (leveraged decay). Added: VCIT, LQD, HYG (investment grade + HY credit)
27
+ ETF_LIST = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD", "AGG", "SPY"]
28
+ TARGET_ETFS = ["TLT", "VCIT", "LQD", "HYG", "VNQ", "SLV", "GLD"] # excludes benchmarks
29
+
30
 
31
  def fetch_macro_data_robust(start_date="2008-01-01"):
32
+ """Fetch macro signals from FRED and Yahoo Finance"""
33
  all_data = []
34
+
35
+ # 1. FRED
36
  if PANDAS_DATAREADER_AVAILABLE:
37
  try:
38
  fred_symbols = {
39
+ "T10Y2Y": "T10Y2Y",
40
+ "T10Y3M": "T10Y3M",
41
+ "DTB3": "DTB3",
42
  "BAMLH0A0HYM2": "HY_Spread",
43
+ "VIXCLS": "VIX",
44
+ "DTWEXBGS": "DXY",
45
  }
 
46
  fred_data = web.DataReader(
47
+ list(fred_symbols.keys()), "fred", start_date, datetime.now()
 
 
 
48
  )
49
  fred_data.columns = [fred_symbols[col] for col in fred_data.columns]
 
50
  if fred_data.index.tz is not None:
51
  fred_data.index = fred_data.index.tz_localize(None)
 
52
  all_data.append(fred_data)
 
53
  except Exception as e:
54
  st.warning(f"⚠️ FRED partial failure: {e}")
55
+
56
+ # 2. Yahoo Finance β€” gold, copper, VIX
57
  try:
58
+ yf_symbols = {"GC=F": "GOLD", "HG=F": "COPPER", "^VIX": "VIX_YF"}
 
 
 
 
 
59
  yf_data = yf.download(
60
+ list(yf_symbols.keys()), start=start_date, progress=False, auto_adjust=True
61
+ )["Close"]
 
 
 
 
62
  if isinstance(yf_data, pd.Series):
63
  yf_data = yf_data.to_frame()
 
64
  yf_data.columns = [yf_symbols.get(col, col) for col in yf_data.columns]
 
65
  if yf_data.index.tz is not None:
66
  yf_data.index = yf_data.index.tz_localize(None)
 
67
  all_data.append(yf_data)
 
68
  except Exception as e:
69
  st.warning(f"⚠️ Yahoo Finance failed: {e}")
70
+
71
+ # 3. VIX term structure
72
  try:
73
  vix_term = yf.download(
74
+ ["^VIX", "^VIX3M"], start=start_date, progress=False, auto_adjust=True
75
+ )["Close"]
 
 
 
 
76
  if not vix_term.empty:
77
  if isinstance(vix_term, pd.Series):
78
  vix_term = vix_term.to_frame()
 
79
  vix_term.columns = ["VIX_Spot", "VIX_3M"]
80
+ vix_term["VIX_Term_Slope"] = vix_term["VIX_3M"] - vix_term["VIX_Spot"]
 
81
  if vix_term.index.tz is not None:
82
  vix_term.index = vix_term.index.tz_localize(None)
 
83
  all_data.append(vix_term)
 
84
  except Exception as e:
85
  st.warning(f"⚠️ VIX Term Structure failed: {e}")
86
+
 
87
  if all_data:
88
+ combined = pd.concat(all_data, axis=1, join="outer")
89
  combined = combined.loc[:, ~combined.columns.duplicated()]
90
+ combined = combined.ffill(limit=5)
91
  return combined
92
  else:
93
  st.error("❌ Failed to fetch any macro data!")
 
95
 
96
 
97
  def fetch_etf_data(etfs, start_date="2008-01-01"):
98
+ """
99
+ Fetch ETF price data and calculate features.
100
+
101
+ Produces exactly 3 columns per ETF to match HF dataset schema:
102
+ {ETF}_Ret β€” daily return
103
+ {ETF}_MA20 β€” 20-day simple moving average of price (raw)
104
+ {ETF}_Vol β€” 20-day annualised realised volatility
105
+ """
106
  try:
107
  etf_data = yf.download(
108
+ etfs, start=start_date, progress=False, auto_adjust=True
109
+ )["Close"]
110
+
 
 
 
111
  if isinstance(etf_data, pd.Series):
112
  etf_data = etf_data.to_frame()
113
+
114
  if etf_data.index.tz is not None:
115
  etf_data.index = etf_data.index.tz_localize(None)
116
+
117
  daily_rets = etf_data.pct_change()
118
 
119
+ # Daily returns
120
  etf_returns = daily_rets.copy()
121
  etf_returns.columns = [f"{col}_Ret" for col in etf_returns.columns]
122
 
123
+ # 20-day simple moving average (raw price)
124
+ etf_ma20 = etf_data.rolling(20).mean()
125
+ etf_ma20.columns = [f"{col}_MA20" for col in etf_ma20.columns]
126
+
127
+ # 20-day annualised realised volatility
128
  etf_vol = daily_rets.rolling(20).std() * np.sqrt(252)
129
  etf_vol.columns = [f"{col}_Vol" for col in etf_vol.columns]
130
 
131
+ result = pd.concat([etf_returns, etf_ma20, etf_vol], axis=1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  return result
133
 
134
  except Exception as e:
 
137
 
138
 
139
  def smart_update_hf_dataset(new_data, token, force_upload=False):
140
+ """
141
+ Smart update: merges new_data on top of existing HF dataset and uploads
142
+ if anything changed β€” or always uploads when force_upload=True.
143
+
144
+ Also handles newly added ETFs: detects ETFs whose _Ret column is missing
145
+ or all-NaN in the existing dataset, fetches their full history back to
146
+ 2008, and backfills before uploading.
147
  """
148
  if not token:
149
  st.warning("⚠️ No HF_TOKEN found. Skipping dataset update.")
150
  return new_data
151
+
152
  raw_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/etf_data.csv"
153
+
154
  try:
155
  existing_df = pd.read_csv(raw_url)
156
  existing_df.columns = existing_df.columns.str.strip()
157
+ date_col = next(
158
+ (c for c in existing_df.columns if c.lower() in ["date", "unnamed: 0"]),
159
+ existing_df.columns[0],
160
+ )
161
  existing_df[date_col] = pd.to_datetime(existing_df[date_col])
162
  existing_df = existing_df.set_index(date_col).sort_index()
 
163
  if existing_df.index.tz is not None:
164
  existing_df.index = existing_df.index.tz_localize(None)
165
 
166
+ # ── Step 1: merge recent new_data on top of existing ─────────────────
167
+ combined = new_data.combine_first(existing_df)
168
+
169
+ # ── Step 2: detect ETFs missing / all-NaN in existing dataset ────────
170
  new_etf_cols = []
 
171
  all_etfs = [c.replace("_Ret", "") for c in new_data.columns if c.endswith("_Ret")]
172
  for etf in all_etfs:
173
  ret_col = f"{etf}_Ret"
 
175
  new_etf_cols.append(etf)
176
 
177
  if new_etf_cols:
178
+ st.info(f"πŸ†• New ETFs detected: {new_etf_cols} β€” fetching full history from 2008...")
 
179
  full_history = fetch_etf_data(new_etf_cols, start_date="2008-01-01")
180
  if not full_history.empty:
181
  if full_history.index.tz is not None:
182
  full_history.index = full_history.index.tz_localize(None)
183
+ # Expand combined to cover all dates in full_history
184
+ full_index = combined.index.union(full_history.index)
185
+ combined = combined.reindex(full_index)
186
+ # Write full history directly into combined (overwrites NaN-only columns)
187
+ cols_to_backfill = [
188
+ c for c in full_history.columns
189
+ if c not in combined.columns or combined[c].isna().mean() > 0.9
190
+ ]
191
+ for col in cols_to_backfill:
192
+ combined[col] = full_history.reindex(full_index)[col]
193
+ st.success(
194
+ f"βœ… Full history fetched for {new_etf_cols}: "
195
+ f"{len(full_history)} rows "
196
+ f"({full_history.index[0].date()} β†’ {full_history.index[-1].date()}), "
197
+ f"{len(cols_to_backfill)} columns backfilled"
198
+ )
199
  else:
200
  st.warning(f"⚠️ Could not fetch full history for {new_etf_cols}")
201
 
202
+ # ── Step 3: decide whether to upload ─────────────────────────────────
 
 
 
 
203
  new_rows = len(combined) - len(existing_df)
204
  old_nulls = existing_df.isna().sum().sum()
205
  new_nulls = combined.isna().sum().sum()
206
  filled_gaps = old_nulls - new_nulls
 
 
 
207
  needs_update = force_upload or new_rows > 0 or filled_gaps > 0 or len(new_etf_cols) > 0
208
+
209
  if needs_update:
210
  combined.index.name = "Date"
211
  combined.reset_index().to_csv("etf_data.csv", index=False)
 
212
  api = HfApi()
213
  api.upload_file(
214
  path_or_fileobj="etf_data.csv",
 
216
  repo_id=REPO_ID,
217
  repo_type="dataset",
218
  token=token,
219
+ commit_message=(
220
+ ("FORCE " if force_upload else "") +
221
+ f"Update: {get_est_time().strftime('%Y-%m-%d %H:%M EST')} | "
222
+ f"+{new_rows} rows, filled {filled_gaps} gaps" +
223
+ (f", backfilled {new_etf_cols}" if new_etf_cols else "")
224
+ ),
225
  )
226
+ st.success(f"βœ… Dataset updated: +{new_rows} rows, filled {filled_gaps} gaps"
227
+ + (f", backfilled {new_etf_cols}" if new_etf_cols else ""))
228
  return combined
229
  else:
230
  st.info("πŸ“Š Dataset already up-to-date. No upload needed.")
231
  return existing_df
232
+
233
  except Exception as e:
234
  st.warning(f"⚠️ Dataset update failed: {e}. Using new data only.")
235
  return new_data
236
 
237
 
238
  def add_regime_features(df):
239
+ """Add regime detection features using pd.concat to avoid fragmentation"""
240
+ new_cols = {}
241
+
242
+ if "VIX" in df.columns:
243
+ new_cols["VIX_Regime_Low"] = (df["VIX"] < 15).astype(int)
244
+ new_cols["VIX_Regime_Med"] = ((df["VIX"] >= 15) & (df["VIX"] < 25)).astype(int)
245
+ new_cols["VIX_Regime_High"] = (df["VIX"] >= 25).astype(int)
246
+
247
+ if "T10Y2Y" in df.columns:
248
+ new_cols["YC_Inverted"] = (df["T10Y2Y"] < 0).astype(int)
249
+ new_cols["YC_Flat"] = ((df["T10Y2Y"] >= 0) & (df["T10Y2Y"] < 0.5)).astype(int)
250
+ new_cols["YC_Steep"] = (df["T10Y2Y"] >= 0.5).astype(int)
251
+
252
+ if "HY_Spread" in df.columns:
253
+ new_cols["Credit_Stress_Low"] = (df["HY_Spread"] < 400).astype(int)
254
+ new_cols["Credit_Stress_Med"] = ((df["HY_Spread"] >= 400) & (df["HY_Spread"] < 600)).astype(int)
255
+ new_cols["Credit_Stress_High"] = (df["HY_Spread"] >= 600).astype(int)
256
+
257
+ if "VIX_Term_Slope" in df.columns:
258
+ new_cols["VIX_Term_Contango"] = (df["VIX_Term_Slope"] > 2).astype(int)
259
+ new_cols["VIX_Term_Backwardation"] = (df["VIX_Term_Slope"] < -2).astype(int)
260
+
261
+ if "T10Y3M" in df.columns:
262
+ new_cols["Rates_VeryLow"] = (df["T10Y3M"] < 1.0).astype(int)
263
+ new_cols["Rates_Low"] = ((df["T10Y3M"] >= 1.0) & (df["T10Y3M"] < 2.0)).astype(int)
264
+ new_cols["Rates_Normal"] = ((df["T10Y3M"] >= 2.0) & (df["T10Y3M"] < 3.0)).astype(int)
265
+ new_cols["Rates_High"] = (df["T10Y3M"] >= 3.0).astype(int)
266
+
267
+ if "T10Y2Y" in df.columns:
268
+ yc_mom20 = df["T10Y2Y"].diff(20)
269
+ yc_mom60 = df["T10Y2Y"].diff(60)
270
+ new_cols["YC_Mom20d"] = yc_mom20
271
+ new_cols["YC_Mom60d"] = yc_mom60
272
+ new_cols["Rates_Rising20d"] = (yc_mom20 > 0).astype(int)
273
+ new_cols["Rates_Falling20d"] = (yc_mom20 < 0).astype(int)
274
+ new_cols["Rates_Rising60d"] = (yc_mom60 > 0).astype(int)
275
+ new_cols["Rates_Falling60d"] = (yc_mom60 < 0).astype(int)
276
+ yc_accel = yc_mom20.diff(20)
277
+ new_cols["YC_Accel"] = yc_accel
278
+ new_cols["Rates_Accelerating"] = (yc_accel > 0).astype(int)
279
+
280
+ if "T10Y3M" in df.columns:
281
+ t3m_mom20 = df["T10Y3M"].diff(20)
282
+ t3m_mom60 = df["T10Y3M"].diff(60)
283
+ new_cols["T10Y3M_Mom20d"] = t3m_mom20
284
+ new_cols["T10Y3M_Mom60d"] = t3m_mom60
285
+ new_cols["T10Y3M_Rising20d"] = (t3m_mom20 > 0).astype(int)
286
+ new_cols["T10Y3M_Falling20d"] = (t3m_mom20 < 0).astype(int)
287
+ new_cols["T10Y3M_Rising60d"] = (t3m_mom60 > 0).astype(int)
288
+ new_cols["T10Y3M_Falling60d"] = (t3m_mom60 < 0).astype(int)
289
+
290
+ if new_cols:
291
+ df = pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)
292
+
293
  return df
294
 
295
 
 
297
  """Main data fetching and processing pipeline"""
298
  raw_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/etf_data.csv"
299
  df = pd.DataFrame()
300
+
301
+ # ── Load from HuggingFace ─────────────────────────────────────────────────
302
  try:
303
  df = pd.read_csv(raw_url)
304
  df.columns = df.columns.str.strip()
305
+ date_col = next(
306
+ (c for c in df.columns if c.lower() in ["date", "unnamed: 0"]), df.columns[0]
307
+ )
308
  df[date_col] = pd.to_datetime(df[date_col])
309
  df = df.set_index(date_col).sort_index()
 
310
  if df.index.tz is not None:
311
  df.index = df.index.tz_localize(None)
312
+
313
+ # Optional: clean >30% NaN columns
314
  if clean_hf_dataset:
315
  st.warning("🧹 **Cleaning HF Dataset Mode Active**")
316
  original_cols = len(df.columns)
317
+ nan_pct = (df.isna().sum() / len(df)) * 100
 
318
  bad_cols = nan_pct[nan_pct > 30].index.tolist()
 
319
  if bad_cols:
320
  st.write(f"πŸ“‹ Found {len(bad_cols)} columns with >30% NaNs:")
321
  for col in bad_cols[:10]:
322
  st.write(f" - {col}: {nan_pct[col]:.1f}% NaNs")
 
323
  df = df.drop(columns=bad_cols)
324
  st.success(f"βœ… Dropped {len(bad_cols)} columns ({original_cols} β†’ {len(df.columns)})")
 
325
  token = os.getenv("HF_TOKEN")
326
  if token:
327
  st.info("πŸ“€ Uploading cleaned dataset...")
328
  df.index.name = "Date"
329
  df.reset_index().to_csv("etf_data.csv", index=False)
 
330
  api = HfApi()
331
  api.upload_file(
332
  path_or_fileobj="etf_data.csv",
 
334
  repo_id=REPO_ID,
335
  repo_type="dataset",
336
  token=token,
337
+ commit_message=f"Cleaned dataset: Removed {len(bad_cols)} columns",
338
  )
339
  st.success("βœ… HF dataset updated!")
340
+
341
  except Exception as e:
342
  st.warning(f"⚠️ Could not load from HuggingFace: {e}")
343
+
344
+ # ── Sync / force refresh ──────────────────────────────────────────────────
345
  from utils import is_sync_window
346
  should_sync = is_sync_window() or force_refresh
347
+
348
  if should_sync:
349
  sync_reason = "πŸ”„ Manual Refresh" if force_refresh else "πŸ”„ Sync Window Active"
 
350
  with st.status(f"{sync_reason} - Updating Dataset...", expanded=False):
351
+ etf_data = fetch_etf_data(ETF_LIST)
 
 
352
  macro_data = fetch_macro_data_robust()
 
353
  if not etf_data.empty and not macro_data.empty:
354
  new_df = pd.concat([etf_data, macro_data], axis=1)
355
+ token = os.getenv("HF_TOKEN")
356
+ df = smart_update_hf_dataset(new_df, token, force_upload=force_refresh)
357
+
358
+ # ── Fallback: fetch fresh if still empty ──────────────────────────────────
359
  if df.empty:
360
  st.warning("πŸ“Š Fetching fresh data...")
361
+ etf_data = fetch_etf_data(ETF_LIST)
 
362
  macro_data = fetch_macro_data_robust()
 
363
  if not etf_data.empty and not macro_data.empty:
364
  df = pd.concat([etf_data, macro_data], axis=1)
 
 
 
 
365
 
366
 + # ── Feature engineering: Z-scores ────────────────────────────────────────
367
+ macro_cols = [
368
+ "VIX", "DXY", "COPPER", "GOLD", "HY_Spread", "T10Y2Y", "T10Y3M",
369
+ "VIX_Spot", "VIX_3M", "VIX_Term_Slope",
370
+ ]
371
  for col in df.columns:
372
+ if any(m in col for m in macro_cols) or "_Vol" in col:
373
+ roll_mean = df[col].rolling(20, min_periods=5).mean()
374
+ roll_std = df[col].rolling(20, min_periods=5).std()
375
+ df[f"{col}_Z"] = (df[col] - roll_mean) / (roll_std + 1e-9)
376
+
377
+ # ── Regime features ───────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
378
  st.write("🎯 **Adding Regime Detection Features...**")
379
  df = add_regime_features(df)
380
+
381
+ # ── Filter by start year ──────────────────────────────────────────────────
382
  df = df[df.index.year >= start_year]
383
  st.info(f"πŸ“… After year filter ({start_year}+): {len(df)} samples")
384
+
385
+ # ── Drop columns with >50% NaNs ───────────────────────────────────────────
386
+ nan_pct = df.isna().sum() / len(df)
387
+ bad_features = nan_pct[nan_pct > 0.5].index.tolist()
 
388
  if bad_features:
389
  st.warning(f"πŸ—‘οΈ Dropping {len(bad_features)} features with >50% NaNs")
390
  df = df.drop(columns=bad_features)
391
+
392
+ # ── Fill remaining NaNs ───────────────────────────────────────────────────
393
+ df = df.ffill(limit=5).bfill(limit=100).ffill()
 
 
394
  df = df.dropna()
395
+
396
  if len(df) > 0:
397
+ st.success(
398
+ f"βœ… Final dataset: {len(df)} samples from "
399
+ f"{df.index[0].strftime('%Y-%m-%d')} to {df.index[-1].strftime('%Y-%m-%d')}"
400
+ )
401
+
402
  return df