"""Market-data pipeline: download, clean, persist and feature-engineer prices.

The module pulls daily closes from yfinance, cleans them, upserts them into
the ``daily_prices`` / ``daily_yields`` tables exposed by ``database``, and
offers helpers for factor data, ML feature matrices and bootstrap resampling.
"""
import contextlib
import functools
import io
import os
import sqlite3
import threading
import time
import urllib.request
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Any

import numpy as np
import pandas as pd
import requests
import yfinance as yf
from dotenv import load_dotenv
from sqlalchemy import bindparam, text
from sqlalchemy.orm import sessionmaker
from tenacity import RetryError, retry, stop_after_attempt, wait_exponential

try:
    from sqlalchemy.dialects.postgresql import insert as pg_insert
    HAS_PG = True
except ImportError:
    HAS_PG = False
    # Fallback to standard insert
    # NOTE(review): the generic insert has no on_conflict_do_update(); the
    # upserts below presumably fail in that configuration - confirm.
    from sqlalchemy import insert as pg_insert

# Environment variables must be loaded before the project modules are imported.
load_dotenv()

from config import Color, logger

# Attempt to load the new fixed-income module dynamically
try:
    from fixed_income import separate_universe, clean_price_from_yield
    _HAS_FIXED_INCOME = True
except ImportError:
    _HAS_FIXED_INCOME = False

from database import get_pg_engine, init_db, DailyPrice, DailyYield

# Initialize the schema once on first load to prevent missing table errors.
# Guarded by a flag to avoid redundant Base.metadata.create_all() on every data fetch.
_DB_INITIALIZED = False


def _ensure_db_initialized():
    """Run ``init_db()`` the first time it is called; later calls are no-ops."""
    global _DB_INITIALIZED
    if not _DB_INITIALIZED:
        init_db()
        _DB_INITIALIZED = True


_ensure_db_initialized()


def _get_db_engine():
    """Return the engine provided by ``database.get_pg_engine()``."""
    return get_pg_engine()


def _ensure_finance_schema(engine=None):
    """Make sure the schema exists. ``engine`` is accepted but not used."""
    _ensure_db_initialized()


# ─────────────────────────────────────────────
# CORE DATA FETCHING & SYNCHRONIZATION
# ─────────────────────────────────────────────

def clean_price_series(series: pd.Series, max_move=0.35) -> pd.Series:
    """
    Cleans a raw price series by detecting and interpolating extreme daily moves
    (e.g., >35% or <-35%) that are typically data vendor errors (splits, bad ticks).

    Also drops Saturday/Sunday observations and interpolates runs of more than
    three consecutive unchanged prices. Series shorter than three points are
    returned as an unmodified copy. The index must expose ``dayofweek``.

    Args:
        series: Price series to clean; it is not mutated.
        max_move: Absolute one-day fractional change treated as an error.

    Returns:
        A cleaned copy of ``series``.
    """
    s = series.copy()
    if len(s) < 3:
        return s

    weekend_idx = s.index[s.index.dayofweek >= 5]
    if len(weekend_idx) > 0:
        logger.warning(f"Detected {len(weekend_idx)} weekend data points in series {s.name}. Dropping...")
        s = s.drop(weekend_idx)
        if len(s) < 3:
            return s

    # Changes are computed once, before any point is blanked out.
    pct = s.pct_change()

    bad_idx = pct[pct.abs() > max_move].index
    if len(bad_idx) > 0:
        logger.warning(f"Detected {len(bad_idx)} extreme moves (> {max_move*100}%) in series {s.name}. Interpolating...")
        s.loc[bad_idx] = np.nan

    # Label each run of zero-change days and measure its length so that only
    # runs longer than three days are treated as stale prices.
    is_zero = (pct == 0.0)
    group_id = (~is_zero).cumsum()
    group_sizes = is_zero.groupby(group_id).transform('sum')
    flat_idx = pct[(is_zero) & (group_sizes > 3)].index
    if len(flat_idx) > 0:
        logger.warning(f"Detected {len(flat_idx)} flat prices (>3 days) in series {s.name}. Interpolating...")
        s.loc[flat_idx] = np.nan

    if s.isna().any():
        s = s.interpolate(method='linear')
        # Linear interpolation cannot fill leading gaps; pad the edges.
        s = s.ffill().bfill()
    return s


# Thread-safe rate limiter for yfinance
_yf_lock = threading.Lock()
_last_yf_time = [0.0]


def _apply_rate_limit():
    """Block until at least 0.5 s has passed since the previous call.

    The sleep happens while the lock is held, so concurrent callers are
    released one at a time.
    """
    with _yf_lock:
        elapsed = time.time() - _last_yf_time[0]
        if elapsed < 0.5:
            time.sleep(0.5 - elapsed)
        _last_yf_time[0] = time.time()


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def _fetch_yfinance_with_retry(chunk_tickers, s_date, e_date):
    """Download auto-adjusted history for ``chunk_tickers`` via yfinance.

    An empty result raises ``ValueError`` so the retry decorator tries again
    (three attempts in total).
    """
    _apply_rate_limit()
    df = yf.download(chunk_tickers, start=s_date, end=e_date, progress=False, threads=False, auto_adjust=True)
    if df.empty:
        raise ValueError(f"Empty DataFrame returned for {chunk_tickers}")
    return df


def _fetch_chunk_sync(chunk_tickers, s_date, e_date, max_dates, cfg):
    """Fetch close prices for one chunk of tickers.

    Args:
        chunk_tickers: Tickers to download together.
        s_date: Default start date for tickers missing from ``max_dates``.
        e_date: End date of the download window.
        max_dates: Mapping ticker -> last stored date (may be empty).
        cfg: Configuration dict; ``extended_history`` selects proxy stitching.

    Returns:
        DataFrame of closes with one column per ticker, or an empty frame when
        the chunk is already up to date.

    Raises:
        RuntimeError: Wrapping any exception raised while fetching.
    """
    try:
        chunk_start = s_date
        if max_dates:
            # Start from the oldest "last stored" date in the chunk.
            ticker_starts = [max_dates.get(t, s_date) for t in chunk_tickers]
            chunk_start = min(ticker_starts)
        if chunk_start.date() >= e_date.date():
            return pd.DataFrame()

        if cfg.get('extended_history', False):
            dfs = []
            for ticker in chunk_tickers:
                t_df = fetch_stitched_ticker(ticker, chunk_start, e_date, cfg)
                if t_df.empty:
                    # A frame without columns cannot be relabelled; skipping it
                    # keeps one data-less ticker from failing the whole chunk.
                    continue
                t_df.columns = [ticker]
                dfs.append(t_df)
            if dfs:
                return pd.concat(dfs, axis=1)
            return pd.DataFrame()

        df = _fetch_yfinance_with_retry(chunk_tickers, chunk_start, e_date)
        close_df = pd.DataFrame()
        if isinstance(df.columns, pd.MultiIndex):
            if 'Close' in df.columns.levels[0]:
                close_df = df['Close']
            elif 'Price' in df.columns.names and 'Close' in df.columns.get_level_values('Price'):
                close_df = df.xs('Close', level='Price', axis=1)
            else:
                close_df = df
        else:
            if 'Close' in df.columns:
                close_df = pd.DataFrame(df['Close'])
                close_df.columns = chunk_tickers
            else:
                close_df = df
        return close_df
    except Exception as e:
        logger.error(f"Exception fetching chunk {chunk_tickers}: {e}")
        raise RuntimeError(f"Failed to fetch chunk {chunk_tickers}") from e


def _fetch_raw_data(download_batches, start_date, end_date, max_dates, cfg):
    """Pure fetcher component of the data pipeline.

    Runs ``_fetch_chunk_sync`` for every non-empty batch on a thread pool of
    at most 10 workers and returns ``(chunk, close_df)`` pairs for the batches
    that produced data. The first failing batch re-raises its exception.
    """
    raw_results = []
    with ThreadPoolExecutor(max_workers=min(10, max(1, len(download_batches)))) as executor:
        future_to_chunk = {
            executor.submit(_fetch_chunk_sync, chunk, start_date, end_date, max_dates, cfg): chunk
            for chunk in download_batches if chunk
        }
        for future in as_completed(future_to_chunk):
            chunk = future_to_chunk[future]
            try:
                close_df = future.result()
                if close_df is not None and not close_df.empty:
                    raw_results.append((chunk, close_df))
            except Exception as e:
                logger.error(f"Thread failed for chunk {chunk}: {e}")
                raise
    return raw_results


def _clean_and_prepare_data(raw_results, rfr_ticker, cfg):
    """Cleaner component of the data pipeline.

    Args:
        raw_results: ``(chunk, close_df)`` pairs from ``_fetch_raw_data``.
        rfr_ticker: Ticker whose cleaned series is also returned separately.
        cfg: Configuration dict; ``dead_tickers`` maps ticker -> delisting date.

    Returns:
        Tuple ``(records, valid_tickers, rfr_history)`` where ``records`` is a
        list of ``{'ticker', 'date', 'close_price'}`` dicts ready for upsert.
    """
    chunk_records = []
    valid_tickers = set()
    rfr_history = pd.Series(dtype=float)
    dead_tickers = cfg.get("dead_tickers", {}) if cfg else {}

    for chunk, close_df in raw_results:
        for t in chunk:
            if t not in close_df.columns:
                continue
            ts = close_df[t].dropna()
            if ts.empty:
                continue
            ts.name = t
            ts = clean_price_series(ts)
            if t in dead_tickers:
                dead_date = pd.to_datetime(dead_tickers[t])
                # Filter out any data after the official dead date
                ts = ts[ts.index <= dead_date]
                # Inject terminal 0.0 price on the dead date to represent a total loss
                ts.loc[dead_date] = 0.0
            valid_tickers.add(t)
            chunk_records.extend(
                [{'ticker': t, 'date': date.date(), 'close_price': float(price)} for date, price in ts.items()]
            )
            if t == rfr_ticker:
                rfr_history = ts.copy()
    return chunk_records, valid_tickers, rfr_history


def _persist_data(session, chunk_records):
    """Persister component of the data pipeline.

    Upserts ``chunk_records`` into ``DailyPrice`` in batches of 5000 rows,
    committing after each batch. Each batch is attempted up to three times.
    """
    batch_size = 5000

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def db_upsert_batch(b):
        stmt = pg_insert(DailyPrice).values(b)
        stmt = stmt.on_conflict_do_update(
            index_elements=['ticker', 'date'],
            set_=dict(close_price=stmt.excluded.close_price)
        )
        try:
            session.execute(stmt)
            session.commit()
        except Exception:
            # Reset the failed transaction so the next retry attempt starts
            # from a usable session instead of an invalidated one.
            session.rollback()
            raise

    for i in range(0, len(chunk_records), batch_size):
        batch = chunk_records[i:i + batch_size]
        db_upsert_batch(batch)


def fetch_data(tickers, benchmarks=None, years=6, cfg=None):
    """
    Downloads daily price data using a rate-limited, chunked architecture.
    Handles yfinance Multi-Index formatting, respects API limits, and synchronizes
    the time series into the PostgreSQL database. Uses dynamic benchmarks.

    If the fixed_income module is present, it intercepts direct bonds and constructs
    synthetic historical price series based on dynamic yield-to-maturity roll-downs.

    Args:
        tickers: Portfolio tickers to synchronize.
        benchmarks: Optional mapping with ``equity``, ``volatility`` and
            ``risk_free`` tickers; defaults to SPY / ^VIX / ^TNX.
        years: Look-back in years (forced to 44 when ``extended_history`` is set).
        cfg: Optional configuration dict.

    Returns:
        List of tickers that have data available after the sync.
    """
    if cfg is None:
        cfg = {}
    engine = _get_db_engine()
    _ensure_finance_schema(engine)

    if cfg.get('extended_history', False):
        years = 44

    valid_tickers = set()
    end_date = datetime.today()
    start_date = end_date - timedelta(days=years*365)

    if benchmarks is None:
        benchmarks = {"equity": "SPY", "volatility": "^VIX", "risk_free": "^TNX"}

    macro_tickers = [
        benchmarks.get("equity", "SPY"),
        benchmarks.get("volatility", "^VIX"),
        benchmarks.get("risk_free", "^TNX"),
        "^IRX"  # 13-week T-bill rate
    ]
    all_portfolio_tickers = list(set(t for t in tickers if t not in macro_tickers))

    if _HAS_FIXED_INCOME:
        equities, direct_bonds = separate_universe(all_portfolio_tickers, cfg)
    else:
        equities, direct_bonds = all_portfolio_tickers, []

    print(f" {Color.CYAN}ℹ Synchronizing market data for {len(equities) + len(macro_tickers)} equities/macros and {len(direct_bonds)} direct bonds...{Color.RESET}", end="", flush=True)

    Session = sessionmaker(bind=engine)

    # Last stored date per ticker drives the incremental download window.
    max_dates = {}
    with Session() as session:
        try:
            rows = session.execute(text("SELECT ticker, MAX(date) FROM daily_prices GROUP BY ticker")).fetchall()
            max_dates = {row[0]: pd.to_datetime(row[1]) for row in rows}
        except Exception as e:
            logger.error(f"Could not fetch max dates for incremental download: {e}")
            raise

    CHUNK_SIZE = 5
    download_batches = [macro_tickers]

    # Group equities by their last stored date so every chunk shares one start.
    _eq_groups = {}
    for t in equities:
        d = max_dates.get(t, start_date).date()
        if d >= end_date.date():
            # Already current: nothing to download.
            valid_tickers.add(t)
            continue
        _eq_groups.setdefault(d, []).append(t)

    for grp in _eq_groups.values():
        for i in range(0, len(grp), CHUNK_SIZE):
            download_batches.append(grp[i:i + CHUNK_SIZE])

    rfr_ticker = benchmarks.get("risk_free", "^TNX")

    # 1. Fetch
    raw_results = _fetch_raw_data(download_batches, start_date, end_date, max_dates, cfg)

    # 2. Clean
    chunk_records, batch_valid_tickers, rfr_history = _clean_and_prepare_data(raw_results, rfr_ticker, cfg)
    valid_tickers.update(batch_valid_tickers)

    # 3. Persist
    with Session() as session:
        if chunk_records:
            _persist_data(session, chunk_records)

        # GENERATE SYNTHETIC PRICES FOR DIRECT BONDS
        # Prefer the full stored risk-free history over this run's increment.
        query = text("SELECT date, close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date ASC")
        df_rfr = pd.read_sql(query, engine, params={"ticker": rfr_ticker})
        if not df_rfr.empty:
            df_rfr['date'] = pd.to_datetime(df_rfr['date'])
            rfr_history = df_rfr.set_index('date')['close_price']

        if direct_bonds and not rfr_history.empty:
            bond_meta_dict = cfg.get("bond_metadata", {})
            for t in direct_bonds:
                meta = bond_meta_dict.get(t, {})
                face = meta.get("face", 100.0)
                coupon = meta.get("coupon", 0.04)
                freq = meta.get("freq", 2)
                spread = meta.get("spread", 0.00)
                try:
                    maturity_date = pd.to_datetime(meta.get("maturity", end_date + timedelta(days=3650)))
                except Exception:
                    maturity_date = end_date + timedelta(days=3650)

                price_records = []
                yield_records = []
                for dt, rfr_rate in rfr_history.items():
                    if pd.isna(rfr_rate) or rfr_rate <= 0:
                        continue
                    current_yield = (rfr_rate / 100.0) + spread
                    dt_pd = pd.to_datetime(dt)
                    # Time to maturity in years, floored at 0.1.
                    ttm = max(0.1, (maturity_date - dt_pd).days / 365.25)
                    # Annuity value of the coupons plus discounted face value.
                    synth_px = face * (coupon / current_yield) * (1 - (1 + current_yield / freq)**(-freq * ttm)) + face * (1 + current_yield / freq)**(-freq * ttm)
                    price_records.append((t, dt_pd.date(), float(synth_px)))
                    yield_records.append((t, dt_pd.date(), float(current_yield)))

                if price_records:
                    valid_tickers.add(t)
                    stmt_px = pg_insert(DailyPrice).values([{'ticker': r[0], 'date': r[1], 'close_price': r[2]} for r in price_records])
                    stmt_px = stmt_px.on_conflict_do_update(index_elements=['ticker', 'date'], set_=dict(close_price=stmt_px.excluded.close_price))
                    session.execute(stmt_px)

                    stmt_yd = pg_insert(DailyYield).values([{'ticker': r[0], 'date': r[1], 'yield_pct': r[2]} for r in yield_records])
                    stmt_yd = stmt_yd.on_conflict_do_update(index_elements=['ticker', 'date'], set_=dict(yield_pct=stmt_yd.excluded.yield_pct))
                    session.execute(stmt_yd)
        elif direct_bonds and rfr_history.empty:
            logger.warning("Could not generate synthetic bond prices because the risk-free treasury benchmark failed to download.")

        try:
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"PostgreSQL commit failed: {e}")
            raise

    print(f" {Color.GREEN}done.{Color.RESET}")
    return list(valid_tickers)


def check_data_freshness(valid_tickers, max_staleness_days=2):
    """
    Verifies that the most recent close prices are not stale.
    Halts execution if the data is older than max_staleness_days (accounting for weekends).

    Args:
        valid_tickers: Tickers whose latest stored date is checked.
        max_staleness_days: Maximum tolerated age in days.

    Returns:
        True when the data is fresh enough or there is nothing to check.

    Raises:
        SystemExit: When the newest stored date is too old.
    """
    tickers = list(valid_tickers)
    if not tickers:
        # An empty IN () list is not valid SQL and there is nothing to verify.
        logger.warning("check_data_freshness called with no tickers; skipping staleness check.")
        return True

    engine = _get_db_engine()
    # An expanding bind parameter renders one placeholder per element, so the
    # IN clause does not depend on driver-specific tuple adaptation.
    query = text("SELECT MAX(date) as max_date FROM daily_prices WHERE ticker IN :tickers").bindparams(
        bindparam("tickers", expanding=True)
    )
    df = pd.read_sql(query, engine, params={"tickers": tickers})

    if not df.empty and pd.notnull(df.iloc[0]['max_date']):
        last_date = pd.to_datetime(df.iloc[0]['max_date'])
        today = pd.Timestamp.today().normalize()

        # Find the most recent trading day (Mon-Fri)
        if today.weekday() >= 5:  # Weekend
            # Roll back to last Friday
            last_trading_day = today - pd.Timedelta(days=(today.weekday() - 4))
        else:
            last_trading_day = today

        days_stale = (last_trading_day - last_date).days
        if days_stale > max_staleness_days:
            raise SystemExit(f"\n{Color.RED}FATAL: Market data is {days_stale} days stale (Last date: {last_date.date()}). Halting execution to prevent trading on stale data.{Color.RESET}")
    return True


def fetch_risk_free_rate(rfr_ticker="^TNX", default_rate=0.04):
    """
    Fetches the current Risk-Free Rate proxy from the local database.
    Falls back to the config default if unavailable or if the table is missing.

    Returns:
        The latest stored close divided by 100, or ``default_rate``.
    """
    engine = _get_db_engine()
    try:
        query = text("SELECT close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date DESC LIMIT 1")
        df = pd.read_sql(query, engine, params={"ticker": rfr_ticker})
        if not df.empty:
            return float(df.iloc[0]['close_price']) / 100.0
    except Exception as e:
        logger.warning(f"Failed to fetch risk free rate from PostgreSQL: {e}")
    return default_rate


def fetch_risk_free_series(rfr_ticker="^IRX"):
    """
    Fetches the historical Risk-Free Rate proxy series from the local database.
    Returns a pandas Series of daily yields (as decimals, e.g. 0.04 for 4%).

    An empty float Series is returned when no rows exist or the query fails.
    """
    engine = _get_db_engine()
    try:
        query = text("SELECT date, close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date ASC")
        df = pd.read_sql(query, engine, params={"ticker": rfr_ticker})
        if not df.empty:
            df['date'] = pd.to_datetime(df['date'])
            df.set_index('date', inplace=True)
            ts = df['close_price'] / 100.0
            if rfr_ticker == '^IRX':
                # Convert Discount Yield to Bond Equivalent Yield (BEY)
                ts = (365 * ts) / (360 - 91 * ts)
            return ts
    except Exception as e:
        logger.warning(f"Failed to fetch RFR series from PostgreSQL: {e}")
    return pd.Series(dtype=float)


# ─────────────────────────────────────────────
# MACRO & FACTOR DATA
# ─────────────────────────────────────────────

def fetch_fama_french_factors():
    """
    Downloads the Fama-French 5-Factor + Momentum (daily) research datasets
    directly from Kenneth French's Dartmouth data library as ZIP/CSV files.

    Returns:
        DataFrame of daily factor returns in decimals (with a ``MOM`` column
        when the momentum file is usable), or None if the 5-factor download
        or parse fails.
    """
    FF_URLS = {
        "5factor": "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Research_Data_5_Factors_2x3_daily_CSV.zip",
        "momentum": "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Momentum_Factor_daily_CSV.zip",
    }

    def _download_ff_csv(url: str) -> pd.DataFrame | None:
        """Download one ZIP, extract its CSV and parse the daily block."""
        try:
            with urllib.request.urlopen(url, timeout=20) as resp:
                raw = resp.read()
        except Exception as e:
            logger.warning(f"FF download failed ({url}): {e}")
            return None

        try:
            with zipfile.ZipFile(io.BytesIO(raw)) as zf:
                csv_name = next(n for n in zf.namelist() if n.endswith(('.CSV', '.csv')))
                csv_bytes = zf.read(csv_name)
        except Exception as e:
            logger.warning(f"FF ZIP parse failed: {e}")
            return None

        try:
            csv_text = csv_bytes.decode("latin-1")
            lines = csv_text.splitlines()

            # Strict Block Extraction Strategy to avoid Annual Data junk blocks
            start_idx = -1
            end_idx = -1
            for i, line in enumerate(lines):
                stripped = line.strip()
                first_token = stripped.split(",")[0].strip()
                # Identify the exact start of the daily data block
                if first_token.isdigit() and len(first_token) == 8:
                    if start_idx == -1:
                        # The line before the first YYYYMMDD row is the header.
                        start_idx = max(0, i - 1)
                # Identify the exact end of the daily data block
                elif start_idx != -1 and end_idx == -1 and not stripped:
                    end_idx = i
                    break

            if start_idx == -1:
                raise ValueError("Could not locate daily date format in CSV.")
            if end_idx == -1:
                end_idx = len(lines)

            clean_csv_text = "\n".join(lines[start_idx:end_idx])
            df = pd.read_csv(
                io.StringIO(clean_csv_text),
                header=0,
                index_col=0,
                na_values=[-99.99, -999, "NaN", "nan"],
            )
            # Robust Date Parsing
            df.index = pd.to_datetime(df.index.astype(str).str.strip(), format="%Y%m%d", errors='coerce')
            df = df.dropna(how='all')
            df.columns = [c.strip() for c in df.columns]
            df = df / 100.0  # percentage points → decimal
            return df.sort_index()
        except Exception as e:
            logger.warning(f"FF CSV parse error: {e}")
            return None

    ff_df = _download_ff_csv(FF_URLS["5factor"])
    if ff_df is None:
        print(f" {Color.YELLOW}⚠ Could not download Fama-French 5-Factor data. Models 4/5 will fall back to CAPM.{Color.RESET}")
        return None

    mom_df = _download_ff_csv(FF_URLS["momentum"])
    if mom_df is not None:
        mom_col = next((c for c in mom_df.columns if "mom" in c.lower()), None)
        if mom_col:
            mom_df = mom_df[[mom_col]].rename(columns={mom_col: "MOM"})
            ff_df = ff_df.join(mom_df, how="inner")
    else:
        logger.warning("FF Momentum factor unavailable — using 5-Factor model only.")

    print(f" {Color.DIM}ℹ Fama-French factors loaded: {list(ff_df.columns)} "
          f"({len(ff_df)} daily obs){Color.RESET}")
    return ff_df


def build_monthly_returns(daily_returns_df):
    """Aggregates a daily returns DataFrame into a monthly returns DataFrame.

    Daily returns are compounded within each month-end bucket. ``None`` or an
    empty frame is returned unchanged.
    """
    if daily_returns_df is None or daily_returns_df.empty:
        return daily_returns_df
    # Note: Issue #11 - Pandas 2.2 compatibility shim for 'ME' vs 'M' deprecation
    try:
        monthly_df = daily_returns_df.resample('ME').apply(lambda x: (1 + x).prod() - 1)
    except ValueError:
        monthly_df = daily_returns_df.resample('M').apply(lambda x: (1 + x).prod() - 1)
    return monthly_df


# ─────────────────────────────────────────────
# MACHINE LEARNING FEATURE ENGINEERING
# ─────────────────────────────────────────────

def build_ml_features(returns_df, benchmark_rets, ff_df=None, horizon=21, alt_data=None):
    """
    Constructs a feature matrix for ML models utilizing strictly non-overlapping targets
    to prevent serial correlation and in-sample leakage.
    Optimized for memory efficiency by downcasting features to float32.

    Args:
        returns_df: Daily returns, one column per asset.
        benchmark_rets: Daily benchmark returns, reindexed onto ``returns_df``.
        ff_df: Optional factor frame; ``SMB``, ``HML`` and ``Mkt-RF`` are
            required when given, ``RMW`` and ``CMA`` are used if present.
        horizon: Forward-return horizon and sampling stride, in rows.
        alt_data: Optional mapping ticker -> {'put_call_ratio', 'iv_skew'}.

    Returns:
        Dict ticker -> DataFrame whose last row is the latest (target-less)
        inference row and whose earlier rows are the training samples.
    """
    features_dict = {}
    bench_aligned = benchmark_rets.reindex(returns_df.index).fillna(0)

    # ── Vectorized Mathematical Base ──
    # Clip so log1p stays finite for returns at or below -100%.
    safe_returns = np.clip(returns_df, -0.999, None)
    log_ret = np.log1p(safe_returns)
    cum_log = log_ret.cumsum()

    # ── 1. THE TARGET (Forward Return) ──
    targets = np.exp(cum_log.shift(-horizon) - cum_log) - 1

    # ── 2. PRICE MOMENTUM FEATURES (O(N) exact geometric) ──
    mom_1m = np.exp(cum_log.shift(1) - cum_log.shift(22)) - 1
    mom_3m = np.exp(cum_log.shift(1) - cum_log.shift(64)) - 1
    mom_6m = np.exp(cum_log.shift(1) - cum_log.shift(127)) - 1

    # ── 3. MEAN REVERSION FEATURES ──
    rev_5d = np.exp(cum_log.shift(1) - cum_log.shift(6)) - 1

    # ── 4. VOLATILITY & RISK FEATURES ──
    vol_21d = returns_df.rolling(21).std().shift(1)
    var_63 = bench_aligned.rolling(63).var()
    # Native vectorized rolling covariance
    cov_63 = returns_df.rolling(63).cov(bench_aligned)
    beta_63d = cov_63.divide(var_63 + 1e-8, axis=0).shift(1)

    # ── 5. FACTOR EXPOSURE FEATURES ──
    smb_21d = hml_21d = mkt_rf_21d = rmw_21d = cma_21d = None
    if ff_df is not None:
        ff_aligned = ff_df.reindex(returns_df.index).fillna(0)
        smb_21d = ff_aligned['SMB'].rolling(21).sum().shift(1)
        hml_21d = ff_aligned['HML'].rolling(21).sum().shift(1)
        mkt_rf_21d = ff_aligned['Mkt-RF'].rolling(21).sum().shift(1)
        if 'RMW' in ff_aligned.columns:
            rmw_21d = ff_aligned['RMW'].rolling(21).sum().shift(1)
        if 'CMA' in ff_aligned.columns:
            cma_21d = ff_aligned['CMA'].rolling(21).sum().shift(1)

    # Memory Footprint Optimization: Globally Downcast all rolling features to float32 BEFORE per-asset segregation.
    # This halves the memory overhead of the dense matrices dynamically created above.
    mom_1m = mom_1m.astype(np.float32)
    mom_3m = mom_3m.astype(np.float32)
    mom_6m = mom_6m.astype(np.float32)
    rev_5d = rev_5d.astype(np.float32)
    vol_21d = vol_21d.astype(np.float32)
    beta_63d = beta_63d.astype(np.float32)
    if ff_df is not None:
        smb_21d = smb_21d.astype(np.float32)
        hml_21d = hml_21d.astype(np.float32)
        mkt_rf_21d = mkt_rf_21d.astype(np.float32)
        if rmw_21d is not None:
            rmw_21d = rmw_21d.astype(np.float32)
        if cma_21d is not None:
            cma_21d = cma_21d.astype(np.float32)

    for t in returns_df.columns:
        df = pd.DataFrame({
            'ret': returns_df[t],
            'target': targets[t],
            'mom_1m': mom_1m[t],
            'mom_3m': mom_3m[t],
            'mom_6m': mom_6m[t],
            'rev_5d': rev_5d[t],
            'vol_21d': vol_21d[t],
            'beta_63d': beta_63d[t]
        })

        if ff_df is not None:
            df['smb_21d'] = smb_21d
            df['hml_21d'] = hml_21d
            df['mkt_rf_21d'] = mkt_rf_21d
            if rmw_21d is not None:
                df['rmw_21d'] = rmw_21d
            if cma_21d is not None:
                df['cma_21d'] = cma_21d

        # ── 5.5 INJECT ALTERNATIVE DATA (OPTIONS SENTIMENT) ──
        if alt_data and t in alt_data:
            # We broadcast the current point-in-time alternative data backwards with synthetic decay
            # to train the model, but lock the exact real value into the latest inference row.
            curr_pcr = alt_data[t].get('put_call_ratio', 1.0)
            curr_skew = alt_data[t].get('iv_skew', 0.0)

            # Synthetic historical proxy: revert to mean (1.0 for PCR, 0.0 for Skew)
            decay = np.linspace(0.0, 1.0, len(df))
            df['put_call_ratio'] = 1.0 + (curr_pcr - 1.0) * decay
            df['iv_skew'] = curr_skew * decay
        else:
            df['put_call_ratio'] = 1.0
            df['iv_skew'] = 0.0

        # Explicit Segregation. Extract the latest known features for Live Inference.
        latest_inference_row = df.iloc[[-1]].copy()

        # Drop rows where target is NaN (the last `horizon` days) or features are warming up.
        df = df.dropna().copy()

        # ── 6. NON-OVERLAPPING SAMPLING MATRICES ──
        # Step backward from the end in chunks equal to `horizon` to prevent overlapping serial correlation
        df = df.iloc[::-horizon].iloc[::-1]

        # Append the inference row back onto the bottom so it can be extracted cleanly by the ML Engine
        df = pd.concat([df, latest_inference_row])
        features_dict[t] = df

    return features_dict


def fetch_fred_credit_spreads(start_date: str, end_date: str) -> pd.Series:
    """
    Fetches the ICE BofA US High Yield Index Option-Adjusted Spread from FRED.
    Uses FRED_API_KEY from the environment if available. Falls back to a
    constant 400 bps spread when the key is missing, the request fails, or
    FRED returns no usable observations.

    Returns daily spread in decimals (e.g. 0.04 for 400 bps), indexed by
    business day between ``start_date`` and ``end_date``.
    """
    api_key = os.getenv("FRED_API_KEY")
    series_id = "BAMLH0A0HYM2"

    if api_key:
        try:
            # Passing the query as params lets requests URL-encode each value.
            resp = requests.get(
                "https://api.stlouisfed.org/fred/series/observations",
                params={
                    "series_id": series_id,
                    "api_key": api_key,
                    "file_type": "json",
                    "observation_start": start_date,
                    "observation_end": end_date,
                },
                timeout=10,
            )
            if resp.status_code == 200:
                obs = resp.json().get("observations", [])
                if obs:
                    df = pd.DataFrame(obs)
                    # Handle '.' for missing values
                    df = df[df['value'] != '.'].copy()
                    # If every observation is missing, use the fallback below
                    # rather than returning an all-NaN series.
                    if not df.empty:
                        df['date'] = pd.to_datetime(df['date'])
                        df['value'] = df['value'].astype(float) / 100.0  # Convert percentage to decimal
                        df.set_index('date', inplace=True)
                        # Forward fill missing days
                        full_idx = pd.date_range(start=start_date, end=end_date, freq='B')
                        series = df['value'].reindex(full_idx).ffill().bfill()
                        logger.info(f"Successfully fetched {len(series)} days of corporate credit spreads from FRED.")
                        return series
        except Exception as e:
            # Log only the exception type: the message of a requests error can
            # contain the request URL, which carries the API key.
            logger.warning(f"FRED API request failed: {type(e).__name__}. Falling back to yfinance empirical spread.")
    else:
        logger.warning("No FRED_API_KEY found in .env. Using yfinance (HYG - IEF) empirical proxy for corporate credit spread.")

    # Fallback: no dynamic HYG/IEF proxy is computed; a conservative flat
    # 400 bps spread is returned for every business day in the window.
    full_idx = pd.date_range(start=start_date, end=end_date, freq='B')
    series = pd.Series(0.04, index=full_idx)
    logger.info("Using fallback empirical constant credit spread of 400 bps.")
    return series


# ─────────────────────────────────────────────
# EXTENDED HISTORY & BOOTSTRAPPING
# ─────────────────────────────────────────────

def fetch_direct(ticker: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Helper to fetch directly from yfinance without db interaction, now with rate limit

    Returns:
        Single-column frame of closes, or an empty frame when yfinance
        returned no rows for ``ticker``.
    """
    try:
        df = _fetch_yfinance_with_retry([ticker], s_date=start_date, e_date=end_date)
    except ValueError:
        df = pd.DataFrame()
    except RetryError as err:
        # Once retries are exhausted, tenacity wraps the last failure in
        # RetryError. Only "empty download" means no data; re-raise the rest.
        if not isinstance(err.last_attempt.exception(), ValueError):
            raise
        df = pd.DataFrame()

    if df.empty:
        return pd.DataFrame()

    close_col = 'Adj Close' if 'Adj Close' in df.columns else 'Close'
    res = pd.DataFrame(df[close_col])
    # Handle multi-index columns if any
    if isinstance(res.columns, pd.MultiIndex):
        res.columns = [ticker]
    else:
        res.columns = ['close_price']
    return res


def fetch_stitched_ticker(ticker: str, start_date: datetime, end_date: datetime, cfg: dict) -> pd.DataFrame:
    """
    Fetches a ticker with automatic proxy stitching for periods before the ticker existed.

    The proxy series is rescaled by the median actual/proxy price ratio over
    the dates both series share, then overwritten with the actual prices
    wherever they exist. The unscaled proxy is returned when the actual
    series is empty or the overlap is shorter than ``overlap_days``.
    """
    STITCH_RULES = cfg.get('proxy_mappings', {
        'SPY': {
            'proxy': '^GSPC',  # S&P 500 index goes back to 1950
            'proxy_start': '1950-01-03',
            'overlap_days': 252
        },
        'TLT': {
            'proxy': '^TYX',  # 30-year Treasury yield (inverse for price)
            'proxy_start': '1977-01-03',
            'is_yield': True,
        },
        'GLD': {
            'proxy': 'GC=F',  # Gold futures continuous contract
            'proxy_start': '1974-12-31',
        },
        'QQQ': {
            'proxy': '^IXIC',  # NASDAQ composite
            'proxy_start': '1971-02-05',
        }
    })

    start_str = start_date.strftime('%Y-%m-%d')
    end_str = end_date.strftime('%Y-%m-%d')

    if ticker not in STITCH_RULES:
        logger.warning(f"No proxy mapping found for {ticker}. Extended history may be truncated to actual inception date.")
        return fetch_direct(ticker, start_str, end_str)

    rule = STITCH_RULES[ticker]
    # ISO-formatted dates compare correctly as plain strings.
    proxy_start = max(start_str, rule.get('proxy_start', '1980-01-01'))

    proxy_df = fetch_direct(rule['proxy'], proxy_start, end_str)
    if proxy_df.empty:
        return fetch_direct(ticker, start_str, end_str)

    if rule.get('is_yield', False):
        # Map a yield quote (in percent) to a price-like level.
        proxy_df['close_price'] = 100 / (1 + proxy_df['close_price'] / 100)

    actual_df = fetch_direct(ticker, proxy_start, end_str)
    if actual_df.empty:
        return proxy_df

    overlap_days = rule.get('overlap_days', 252)
    common = proxy_df.index.intersection(actual_df.index)
    if len(common) < overlap_days:
        return proxy_df

    overlap_proxy = proxy_df.loc[common]
    overlap_actual = actual_df.loc[common]
    adj_ratio = (overlap_actual['close_price'] / overlap_proxy['close_price']).median()

    stitched = proxy_df.copy()
    stitched['close_price'] = stitched['close_price'] * adj_ratio
    stitched.update(actual_df)
    return stitched


def block_bootstrap_returns(returns_df: pd.DataFrame, block_size: int = 252, n_bootstrap_samples: int = 100, seed: int = 42) -> List[pd.DataFrame]:
    """
    Generates bootstrap samples of returns preserving autocorrelation and cross-correlation.

    Whole rows are resampled in contiguous blocks whose lengths are drawn from
    a geometric distribution with mean ``block_size``.

    Args:
        returns_df: Returns to resample.
        block_size: Expected block length in rows.
        n_bootstrap_samples: Number of samples to generate.
        seed: Seed for the random generator.

    Returns:
        List of frames, each shaped and indexed like ``returns_df``.
    """
    rng = np.random.default_rng(seed)
    n_obs = len(returns_df)

    bootstrap_samples = []
    for _ in range(n_bootstrap_samples):
        sampled_indices = []
        current_pos = 0
        while current_pos < n_obs:
            block_len = rng.geometric(1 / block_size)
            # The upper bound of integers() is exclusive, so +1 is required
            # for a block to be able to end on the final observation.
            start_idx = rng.integers(0, max(1, n_obs - block_len + 1))
            sampled_indices.extend(range(start_idx, min(n_obs, start_idx + block_len)))
            current_pos += block_len

        sampled_indices = sampled_indices[:n_obs]
        bootstrap_sample = returns_df.iloc[sampled_indices].copy()
        bootstrap_sample.index = returns_df.index
        bootstrap_samples.append(bootstrap_sample)
    return bootstrap_samples


def regime_aware_bootstrap(returns_df: pd.DataFrame, regime_labels: np.ndarray, n_bootstrap_samples: int = 100, seed: int = 42) -> List[pd.DataFrame]:
    """
    Bootstrap that respects regime boundaries.

    The label sequence is cut into maximal runs of equal labels. Each sample
    is assembled by repeatedly picking a regime at random and appending one of
    that regime's runs until ``len(returns_df)`` rows have been collected.

    Args:
        returns_df: Returns to resample.
        regime_labels: One regime label per row of ``returns_df``.
        n_bootstrap_samples: Number of samples to generate.
        seed: Seed for the random generator (default matches the value that
            used to be hard-coded).

    Returns:
        List of frames, each shaped and indexed like ``returns_df``.
    """
    rng = np.random.default_rng(seed)
    n_obs = len(returns_df)
    # Element-wise comparison below needs an array, not a list or Series.
    regime_labels = np.asarray(regime_labels)
    unique_regimes = np.unique(regime_labels)

    regime_blocks = {}
    for regime in unique_regimes:
        mask = (regime_labels == regime)
        # Positions where membership flips mark the run boundaries.
        boundaries = np.where(np.diff(mask.astype(int)) != 0)[0] + 1
        blocks = []
        splits = np.split(np.arange(len(mask)), boundaries)
        for split in splits:
            if len(split) > 0 and mask[split[0]]:
                blocks.append((split[0], split[-1] + 1))

        if not blocks:
            # Fallback: create a block from the entire regime period
            if mask.any():
                indices = np.where(mask)[0]
                blocks = [(indices[0], indices[-1] + 1)]
            else:
                blocks = [(0, len(regime_labels))]
        regime_blocks[regime] = blocks

    bootstrap_samples = []
    for _ in range(n_bootstrap_samples):
        sampled_indices = []
        current_regime = rng.choice(unique_regimes)
        while len(sampled_indices) < n_obs:
            blocks = regime_blocks.get(current_regime, [])
            if blocks:
                block_idx = rng.integers(0, len(blocks))
                start, end = blocks[block_idx]
                block_len = min(end - start, n_obs - len(sampled_indices))
                sampled_indices.extend(range(start, start + block_len))
            current_regime = rng.choice(unique_regimes)

        sampled_indices = sampled_indices[:n_obs]
        bootstrap_sample = returns_df.iloc[sampled_indices].copy()
        bootstrap_sample.index = returns_df.index
        bootstrap_samples.append(bootstrap_sample)
    return bootstrap_samples


def bootstrap_iter(returns_df: pd.DataFrame, n_epochs: int, block_size: int = 252):
    """Generator that yields a fresh bootstrap sample each iteration.

    Epoch ``k`` uses seed ``42 + k``, so the sequence is reproducible.
    """
    for epoch in range(n_epochs):
        yield block_bootstrap_returns(returns_df, block_size, n_bootstrap_samples=1, seed=42+epoch)[0]