Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import pandas as pd | |
| import numpy as np | |
| import yfinance as yf | |
| from datetime import datetime, timedelta | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import io | |
| import zipfile | |
| import urllib.request | |
| import functools | |
| import requests | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from config import Color, logger | |
| # Attempt to load the new fixed-income module dynamically | |
| try: | |
| from fixed_income import separate_universe, clean_price_from_yield | |
| _HAS_FIXED_INCOME = True | |
| except ImportError: | |
| _HAS_FIXED_INCOME = False | |
| import os | |
| import contextlib | |
| from sqlalchemy.orm import sessionmaker | |
| try: | |
| from sqlalchemy.dialects.postgresql import insert as pg_insert | |
| HAS_PG = True | |
| except ImportError: | |
| HAS_PG = False | |
| # Fallback to standard insert | |
| from sqlalchemy import insert as pg_insert | |
| from database import get_pg_engine, init_db, DailyPrice, DailyYield | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
# One-time schema bootstrap. The module-level flag prevents init_db() from being
# re-run on every data fetch once it has succeeded.
_DB_INITIALIZED = False


def _ensure_db_initialized():
    """Call ``init_db()`` on the first invocation; later invocations are no-ops."""
    global _DB_INITIALIZED
    if _DB_INITIALIZED:
        return
    init_db()
    _DB_INITIALIZED = True


# Runs at import time so the schema exists before any query in this module.
_ensure_db_initialized()
def _get_db_engine():
    """Return the database engine supplied by ``get_pg_engine()``."""
    engine = get_pg_engine()
    return engine
def _ensure_finance_schema(engine=None):
    """Make sure the schema has been initialised.

    ``engine`` is accepted but not used by this function; the work is delegated
    to ``_ensure_db_initialized()``.
    """
    _ensure_db_initialized()
    return None
| # ───────────────────────────────────────────── | |
| # CORE DATA FETCHING & SYNCHRONIZATION | |
| # ───────────────────────────────────────────── | |
def clean_price_series(series: pd.Series, max_move=0.35) -> pd.Series:
    """
    Return a repaired copy of a raw price series.

    Three kinds of suspect observations are handled:
      * points whose index falls on a Saturday/Sunday are dropped;
      * points whose one-day percentage change exceeds ``max_move`` in absolute
        value are blanked;
      * runs of more than three consecutive unchanged prices are blanked.
    Blanked points are filled by linear interpolation, then forward/backward
    filled. Series with fewer than three points are returned unchanged (as a copy).
    """
    cleaned = series.copy()
    if len(cleaned) < 3:
        return cleaned

    # 5 = Saturday, 6 = Sunday
    weekend_idx = cleaned.index[cleaned.index.dayofweek >= 5]
    if len(weekend_idx) > 0:
        logger.warning(f"Detected {len(weekend_idx)} weekend data points in series {cleaned.name}. Dropping...")
        cleaned = cleaned.drop(weekend_idx)
        if len(cleaned) < 3:
            return cleaned

    # Both detectors below work off the same change series, computed before
    # any value is blanked.
    daily_change = cleaned.pct_change()

    too_large = daily_change.abs() > max_move
    bad_idx = daily_change[too_large].index
    if len(bad_idx) > 0:
        logger.warning(f"Detected {len(bad_idx)} extreme moves (> {max_move*100}%) in series {cleaned.name}. Interpolating...")
        cleaned.loc[bad_idx] = np.nan

    # Label each run of zero-change days: the run id advances on every
    # non-zero change, so summing the zero flags per id gives the run length.
    unchanged = (daily_change == 0.0)
    run_id = (~unchanged).cumsum()
    run_length = unchanged.groupby(run_id).transform('sum')
    flat_idx = daily_change[(unchanged) & (run_length > 3)].index
    if len(flat_idx) > 0:
        logger.warning(f"Detected {len(flat_idx)} flat prices (>3 days) in series {cleaned.name}. Interpolating...")
        cleaned.loc[flat_idx] = np.nan

    if cleaned.isna().any():
        cleaned = cleaned.interpolate(method='linear')
        # interpolate leaves leading gaps untouched; pad both ends
        cleaned = cleaned.ffill().bfill()
    return cleaned
| # Thread-safe rate limiter for yfinance | |
| import threading | |
| _yf_lock = threading.Lock() | |
| _last_yf_time = [0.0] | |
| def _apply_rate_limit(): | |
| with _yf_lock: | |
| elapsed = time.time() - _last_yf_time[0] | |
| if elapsed < 0.5: | |
| time.sleep(0.5 - elapsed) | |
| _last_yf_time[0] = time.time() | |
def _fetch_yfinance_with_retry(chunk_tickers, s_date, e_date):
    """Download auto-adjusted daily data for ``chunk_tickers`` through yfinance.

    The shared rate limiter is applied before the request.

    NOTE(review): despite the name, no retry logic is applied in this function;
    a single attempt is made. Confirm whether a retry decorator is intended.

    Raises:
        ValueError: if yfinance returns an empty DataFrame.
    """
    _apply_rate_limit()
    frame = yf.download(
        chunk_tickers,
        start=s_date,
        end=e_date,
        progress=False,
        threads=False,
        auto_adjust=True,
    )
    if not frame.empty:
        return frame
    raise ValueError(f"Empty DataFrame returned for {chunk_tickers}")
def _fetch_chunk_sync(chunk_tickers, s_date, e_date, max_dates, cfg):
    """Fetch close prices for one batch of tickers.

    Args:
        chunk_tickers: Tickers belonging to this batch.
        s_date: Default start of the download window.
        e_date: End of the download window.
        max_dates: Mapping ticker -> latest stored date; when non-empty, the
            batch starts at the earliest of those dates (``s_date`` for
            tickers that have no entry).
        cfg: Configuration mapping; ``extended_history`` switches to the
            proxy-stitched per-ticker path.

    Returns:
        DataFrame of close prices with one column per ticker, or an empty
        DataFrame when the window is empty or nothing was retrieved.

    Raises:
        RuntimeError: wrapping any exception raised while fetching.
    """
    try:
        chunk_start = s_date
        if max_dates:
            chunk_start = min(max_dates.get(t, s_date) for t in chunk_tickers)
        if chunk_start.date() >= e_date.date():
            # Already up to date: nothing to download.
            return pd.DataFrame()

        if cfg.get('extended_history', False):
            frames = []
            for ticker in chunk_tickers:
                t_df = fetch_stitched_ticker(ticker, chunk_start, e_date, cfg)
                if t_df is None or t_df.empty:
                    # An empty frame may have no columns at all; assigning a
                    # one-element column list to it raises and would abort the
                    # whole batch, so skip the ticker instead.
                    logger.warning(f"No extended-history data returned for {ticker}; skipping.")
                    continue
                t_df.columns = [ticker]
                frames.append(t_df)
            if frames:
                return pd.concat(frames, axis=1)
            return pd.DataFrame()

        df = _fetch_yfinance_with_retry(chunk_tickers, chunk_start, e_date)
        if isinstance(df.columns, pd.MultiIndex):
            # Multi-ticker layout: pick the 'Close' slice from whichever level holds it.
            if 'Close' in df.columns.levels[0]:
                close_df = df['Close']
            elif 'Price' in df.columns.names and 'Close' in df.columns.get_level_values('Price'):
                close_df = df.xs('Close', level='Price', axis=1)
            else:
                close_df = df
        elif 'Close' in df.columns:
            # Flat layout: relabel the single Close column with the batch tickers.
            close_df = pd.DataFrame(df['Close'])
            close_df.columns = chunk_tickers
        else:
            close_df = df
        return close_df
    except Exception as e:
        logger.error(f"Exception fetching chunk {chunk_tickers}: {e}")
        raise RuntimeError(f"Failed to fetch chunk {chunk_tickers}") from e
def _fetch_raw_data(download_batches, start_date, end_date, max_dates, cfg):
    """Pure fetcher component of the data pipeline.

    Submits every non-empty batch to a thread pool (at most 10 workers) and
    collects ``(batch, close_df)`` pairs for batches that returned data, in
    completion order. The first failing batch is logged and its exception
    re-raised.
    """
    worker_count = min(10, max(1, len(download_batches)))
    collected = []
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        pending = {}
        for batch in download_batches:
            if not batch:
                continue
            job = pool.submit(_fetch_chunk_sync, batch, start_date, end_date, max_dates, cfg)
            pending[job] = batch

        for job in as_completed(pending):
            batch = pending[job]
            try:
                frame = job.result()
                if frame is None or frame.empty:
                    continue
                collected.append((batch, frame))
            except Exception as e:
                logger.error(f"Thread failed for chunk {batch}: {e}")
                raise
    return collected
def _clean_and_prepare_data(raw_results, rfr_ticker, cfg):
    """Cleaner component of the data pipeline.

    For each fetched ticker the series is cleaned with ``clean_price_series``.
    Tickers listed in ``cfg["dead_tickers"]`` are truncated at their dead date
    and given a terminal price of 0.0 on that date.

    Returns:
        ``(records, valid_tickers, rfr_history)`` where ``records`` is a list of
        ``{'ticker', 'date', 'close_price'}`` dicts, ``valid_tickers`` is the set
        of tickers that produced data, and ``rfr_history`` is the cleaned series
        for ``rfr_ticker`` (empty if it was not fetched).
    """
    dead_tickers = cfg.get("dead_tickers", {}) if cfg else {}
    records = []
    seen_tickers = set()
    rfr_history = pd.Series(dtype=float)

    for batch, close_df in raw_results:
        for t in batch:
            if t not in close_df.columns:
                continue
            prices = close_df[t].dropna()
            if prices.empty:
                continue

            prices.name = t
            prices = clean_price_series(prices)

            if t in dead_tickers:
                dead_date = pd.to_datetime(dead_tickers[t])
                # Discard anything after the official dead date ...
                prices = prices[prices.index <= dead_date]
                # ... and mark the dead date itself as a total loss.
                prices.loc[dead_date] = 0.0

            seen_tickers.add(t)
            for stamp, px in prices.items():
                records.append({'ticker': t, 'date': stamp.date(), 'close_price': float(px)})

            if t == rfr_ticker:
                rfr_history = prices.copy()

    return records, seen_tickers, rfr_history
def _persist_data(session, chunk_records):
    """Persister component of the data pipeline.

    Upserts ``chunk_records`` into ``DailyPrice`` in slices of 5000 rows,
    overwriting ``close_price`` on a ``(ticker, date)`` conflict. Each slice is
    committed on its own.
    """
    batch_size = 5000
    for offset in range(0, len(chunk_records), batch_size):
        rows = chunk_records[offset:offset + batch_size]
        upsert = pg_insert(DailyPrice).values(rows)
        upsert = upsert.on_conflict_do_update(
            index_elements=['ticker', 'date'],
            set_={'close_price': upsert.excluded.close_price},
        )
        session.execute(upsert)
        session.commit()
def fetch_data(tickers, benchmarks=None, years=6, cfg=None):
    """
    Downloads daily price data using a rate-limited, chunked architecture.
    Handles yfinance Multi-Index formatting, respects API limits, and synchronizes
    the time series into the PostgreSQL ``daily_prices`` table via upserts.
    Uses dynamic benchmarks.
    If the fixed_income module is present, it intercepts direct bonds and constructs
    synthetic historical price series based on dynamic yield-to-maturity roll-downs.

    Args:
        tickers: Portfolio tickers to synchronize.
        benchmarks: Optional mapping with keys ``equity``, ``volatility`` and
            ``risk_free``; defaults to SPY / ^VIX / ^TNX. ``^IRX`` is always added.
        years: Look-back window in years (forced to 44 when
            ``cfg['extended_history']`` is truthy).
        cfg: Optional configuration mapping. Keys read here: ``extended_history``
            and ``bond_metadata``; the mapping is also passed to the fetch and
            clean stages.

    Returns:
        List of tickers for which data is available (already up to date, freshly
        downloaded, or synthesized).

    Raises:
        Exception: database and download errors are logged and re-raised.
    """
    from sqlalchemy import text

    if cfg is None:
        cfg = {}
    engine = _get_db_engine()
    _ensure_finance_schema(engine)
    if cfg.get('extended_history', False):
        years = 44
    valid_tickers = set()
    end_date = datetime.today()
    start_date = end_date - timedelta(days=years*365)
    if benchmarks is None:
        benchmarks = {"equity": "SPY", "volatility": "^VIX", "risk_free": "^TNX"}
    macro_tickers = [
        benchmarks.get("equity", "SPY"),
        benchmarks.get("volatility", "^VIX"),
        benchmarks.get("risk_free", "^TNX"),
        "^IRX"  # 13-week T-bill rate
    ]
    all_portfolio_tickers = list(set(t for t in tickers if t not in macro_tickers))
    if _HAS_FIXED_INCOME:
        equities, direct_bonds = separate_universe(all_portfolio_tickers, cfg)
    else:
        equities, direct_bonds = all_portfolio_tickers, []
    print(f" {Color.CYAN}ℹ Synchronizing market data for {len(equities) + len(macro_tickers)} equities/macros and {len(direct_bonds)} direct bonds...{Color.RESET}", end="", flush=True)

    Session = sessionmaker(bind=engine)

    # Latest stored date per ticker drives the incremental download window.
    max_dates = {}
    with Session() as session:
        try:
            rows = session.execute(text("SELECT ticker, MAX(date) FROM daily_prices GROUP BY ticker")).fetchall()
            max_dates = {row[0]: pd.to_datetime(row[1]) for row in rows}
        except Exception as e:
            logger.error(f"Could not fetch max dates for incremental download: {e}")
            raise

    CHUNK_SIZE = 5
    # Macro tickers are always refreshed as their own batch.
    download_batches = [macro_tickers]
    # Equities sharing the same last-stored date are batched together so that a
    # batch's common start date does not force re-downloads for fresher tickers.
    _eq_groups = {}
    for t in equities:
        d = max_dates.get(t, start_date).date()
        if d >= end_date.date():
            valid_tickers.add(t)
            continue
        _eq_groups.setdefault(d, []).append(t)
    for d, grp in _eq_groups.items():
        for i in range(0, len(grp), CHUNK_SIZE):
            download_batches.append(grp[i:i + CHUNK_SIZE])

    rfr_ticker = benchmarks.get("risk_free", "^TNX")
    # 1. Fetch
    raw_results = _fetch_raw_data(download_batches, start_date, end_date, max_dates, cfg)
    # 2. Clean
    chunk_records, batch_valid_tickers, rfr_history = _clean_and_prepare_data(raw_results, rfr_ticker, cfg)
    valid_tickers.update(batch_valid_tickers)
    # 3. Persist
    with Session() as session:
        if chunk_records:
            _persist_data(session, chunk_records)

        # GENERATE SYNTHETIC PRICES FOR DIRECT BONDS
        # Prefer the full stored risk-free history over the (possibly
        # incremental) series returned by the cleaner.
        query = text("SELECT date, close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date ASC")
        df_rfr = pd.read_sql(query, engine, params={"ticker": rfr_ticker})
        if not df_rfr.empty:
            df_rfr['date'] = pd.to_datetime(df_rfr['date'])
            rfr_history = df_rfr.set_index('date')['close_price']

        if direct_bonds and not rfr_history.empty:
            bond_meta_dict = cfg.get("bond_metadata", {})
            for t in direct_bonds:
                meta = bond_meta_dict.get(t, {})
                face = meta.get("face", 100.0)
                coupon = meta.get("coupon", 0.04)
                freq = meta.get("freq", 2)
                spread = meta.get("spread", 0.00)
                try:
                    maturity_date = pd.to_datetime(meta.get("maturity", end_date + timedelta(days=3650)))
                except Exception:
                    maturity_date = end_date + timedelta(days=3650)

                price_records = []
                yield_records = []
                for dt, rfr_rate in rfr_history.items():
                    if pd.isna(rfr_rate) or rfr_rate <= 0:
                        continue
                    current_yield = (rfr_rate / 100.0) + spread
                    if current_yield <= 0:
                        # A negative spread can push the combined yield to or
                        # below zero; the pricing formula divides by it.
                        continue
                    dt_pd = pd.to_datetime(dt)
                    # Time to maturity in years, floored at 0.1
                    ttm = max(0.1, (maturity_date - dt_pd).days / 365.25)
                    # Coupon annuity plus discounted face value
                    discount = (1 + current_yield / freq)**(-freq * ttm)
                    synth_px = face * (coupon / current_yield) * (1 - discount) + face * discount
                    price_records.append((t, dt_pd.date(), float(synth_px)))
                    yield_records.append((t, dt_pd.date(), float(current_yield)))

                if price_records:
                    valid_tickers.add(t)
                    stmt_px = pg_insert(DailyPrice).values([{'ticker': r[0], 'date': r[1], 'close_price': r[2]} for r in price_records])
                    stmt_px = stmt_px.on_conflict_do_update(index_elements=['ticker', 'date'], set_=dict(close_price=stmt_px.excluded.close_price))
                    session.execute(stmt_px)
                    stmt_yd = pg_insert(DailyYield).values([{'ticker': r[0], 'date': r[1], 'yield_pct': r[2]} for r in yield_records])
                    stmt_yd = stmt_yd.on_conflict_do_update(index_elements=['ticker', 'date'], set_=dict(yield_pct=stmt_yd.excluded.yield_pct))
                    session.execute(stmt_yd)
        elif direct_bonds and rfr_history.empty:
            logger.warning("Could not generate synthetic bond prices because the risk-free treasury benchmark failed to download.")

        try:
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"PostgreSQL commit failed: {e}")
            raise
    print(f" {Color.GREEN}done.{Color.RESET}")
    return list(valid_tickers)
def check_data_freshness(valid_tickers, max_staleness_days=2):
    """
    Verifies that the most recent close prices are not stale.
    Halts execution if the newest stored date across ``valid_tickers`` is more
    than ``max_staleness_days`` business days (Mon-Fri) behind the most recent
    weekday.

    Args:
        valid_tickers: Tickers whose stored prices are inspected.
        max_staleness_days: Maximum tolerated lag, in business days.

    Returns:
        True when the data is fresh enough, or when there is nothing to check
        (no tickers given / no rows stored for them).

    Raises:
        SystemExit: when the data is staler than allowed.
    """
    from sqlalchemy import bindparam, text

    tickers = list(valid_tickers)
    if not tickers:
        # An empty IN () list is invalid SQL, and there is nothing to verify.
        return True

    engine = _get_db_engine()
    # An expanding bind parameter renders one placeholder per ticker, so the
    # query does not rely on driver-specific tuple adaptation.
    query = text(
        "SELECT MAX(date) as max_date FROM daily_prices WHERE ticker IN :tickers"
    ).bindparams(bindparam("tickers", expanding=True))
    df = pd.read_sql(query, engine, params={"tickers": tickers})
    if not df.empty and pd.notnull(df.iloc[0]['max_date']):
        last_date = pd.to_datetime(df.iloc[0]['max_date'])
        today = pd.Timestamp.today().normalize()
        # Find the most recent trading day (Mon-Fri)
        if today.weekday() >= 5:  # Weekend
            # Roll back to last Friday
            last_trading_day = today - pd.Timedelta(days=(today.weekday() - 4))
        else:
            last_trading_day = today
        # Count weekdays only: Friday data inspected on a Monday is one
        # business day old, not three calendar days.
        days_stale = int(np.busday_count(last_date.date(), last_trading_day.date()))
        if days_stale > max_staleness_days:
            raise SystemExit(f"\n{Color.RED}FATAL: Market data is {days_stale} days stale (Last date: {last_date.date()}). Halting execution to prevent trading on stale data.{Color.RESET}")
    return True
def fetch_risk_free_rate(rfr_ticker="^TNX", default_rate=0.04):
    """
    Fetches the current Risk-Free Rate proxy from the database.
    Falls back to ``default_rate`` if the engine or query fails, if no row is
    stored for ``rfr_ticker``, or if the latest stored value is missing.

    Args:
        rfr_ticker: Ticker whose latest ``close_price`` is read.
        default_rate: Value returned when no usable rate is found.

    Returns:
        The latest stored close divided by 100, or ``default_rate``.
    """
    from sqlalchemy import text
    try:
        # Engine acquisition is inside the try so a connection problem also
        # takes the documented fallback path.
        engine = _get_db_engine()
        query = text("SELECT close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date DESC LIMIT 1")
        df = pd.read_sql(query, engine, params={"ticker": rfr_ticker})
        if not df.empty:
            latest = df.iloc[0]['close_price']
            # A NULL price would otherwise be returned as NaN.
            if pd.notna(latest):
                return float(latest) / 100.0
    except Exception as e:
        logger.warning(f"Failed to fetch risk free rate from PostgreSQL: {e}")
    return default_rate
def fetch_risk_free_series(rfr_ticker="^IRX"):
    """
    Load the stored history of the risk-free proxy as a date-indexed Series.

    Values are returned as decimals (stored close / 100). For ``^IRX`` the
    value is additionally converted from a discount yield to a bond-equivalent
    yield. An empty float Series is returned when nothing is stored or the
    query fails.
    """
    from sqlalchemy import text
    engine = _get_db_engine()
    try:
        stmt = text("SELECT date, close_price FROM daily_prices WHERE ticker = :ticker ORDER BY date ASC")
        frame = pd.read_sql(stmt, engine, params={"ticker": rfr_ticker})
        if frame.empty:
            return pd.Series(dtype=float)

        frame['date'] = pd.to_datetime(frame['date'])
        frame.set_index('date', inplace=True)
        yields = frame['close_price'] / 100.0
        if rfr_ticker == '^IRX':
            # Discount yield -> bond-equivalent yield over a 91-day bill
            yields = (365 * yields) / (360 - 91 * yields)
        return yields
    except Exception as e:
        logger.warning(f"Failed to fetch RFR series from PostgreSQL: {e}")
    return pd.Series(dtype=float)
| # ───────────────────────────────────────────── | |
| # MACRO & FACTOR DATA | |
| # ───────────────────────────────────────────── | |
def fetch_fama_french_factors():
    """
    Downloads the Fama-French 5-Factor (2x3) and Momentum daily research datasets
    directly from Kenneth French's Dartmouth data library as ZIP/CSV files.

    Returns:
        DataFrame of daily factor values in decimals, indexed by date. When the
        momentum file is available its factor is inner-joined as column ``MOM``.
        None if the 5-factor file cannot be downloaded or parsed.
    """
    FF_URLS = {
        "5factor": "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Research_Data_5_Factors_2x3_daily_CSV.zip",
        "momentum": "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Momentum_Factor_daily_CSV.zip",
    }

    def _download_ff_csv(url: str) -> pd.DataFrame | None:
        """Download one ZIP, extract its CSV and parse the daily block; None on any failure."""
        try:
            with urllib.request.urlopen(url, timeout=20) as resp:
                raw = resp.read()
        except Exception as e:
            logger.warning(f"FF download failed ({url}): {e}")
            return None
        try:
            with zipfile.ZipFile(io.BytesIO(raw)) as zf:
                csv_name = next(n for n in zf.namelist() if n.endswith(('.CSV', '.csv')))
                csv_bytes = zf.read(csv_name)
        except Exception as e:
            logger.warning(f"FF ZIP parse failed: {e}")
            return None
        try:
            lines = csv_bytes.decode("latin-1").splitlines()
            # Strict Block Extraction Strategy to avoid Annual Data junk blocks
            start_idx = -1
            end_idx = -1
            for i, line in enumerate(lines):
                stripped = line.strip()
                first_token = stripped.split(",")[0].strip()
                # The first row starting with an 8-digit date (YYYYMMDD) marks the
                # daily block; the line just above it is taken as the header.
                if first_token.isdigit() and len(first_token) == 8:
                    if start_idx == -1:
                        start_idx = max(0, i - 1)
                # The first blank line after the block has started ends it.
                elif start_idx != -1 and end_idx == -1 and not stripped:
                    end_idx = i
                    break
            if start_idx == -1:
                raise ValueError("Could not locate daily date format in CSV.")
            if end_idx == -1:
                end_idx = len(lines)
            clean_csv_text = "\n".join(lines[start_idx:end_idx])
            df = pd.read_csv(
                io.StringIO(clean_csv_text),
                header=0,
                index_col=0,
                na_values=[-99.99, -999, "NaN", "nan"],
            )
            # Robust Date Parsing
            df.index = pd.to_datetime(df.index.astype(str).str.strip(), format="%Y%m%d", errors='coerce')
            df = df.dropna(how='all')
            # Rows whose date could not be parsed carry a NaT index; drop them so
            # they cannot leak into sorting or the later join.
            df = df[df.index.notna()]
            df.columns = [c.strip() for c in df.columns]
            df = df / 100.0  # percentage points → decimal
            return df.sort_index()
        except Exception as e:
            logger.warning(f"FF CSV parse error: {e}")
            return None

    ff_df = _download_ff_csv(FF_URLS["5factor"])
    if ff_df is None:
        print(f" {Color.YELLOW}⚠ Could not download Fama-French 5-Factor data. Models 4/5 will fall back to CAPM.{Color.RESET}")
        return None
    mom_df = _download_ff_csv(FF_URLS["momentum"])
    if mom_df is not None:
        # The momentum column name is matched loosely on "mom".
        mom_col = next((c for c in mom_df.columns if "mom" in c.lower()), None)
        if mom_col:
            mom_df = mom_df[[mom_col]].rename(columns={mom_col: "MOM"})
            ff_df = ff_df.join(mom_df, how="inner")
    else:
        logger.warning("FF Momentum factor unavailable — using 5-Factor model only.")
    print(f" {Color.DIM}ℹ Fama-French factors loaded: {list(ff_df.columns)} "
          f"({len(ff_df)} daily obs){Color.RESET}")
    return ff_df
def build_monthly_returns(daily_returns_df):
    """Compound a daily returns DataFrame into month-end returns.

    ``None`` or an empty frame is handed back unchanged.
    """
    if daily_returns_df is None or daily_returns_df.empty:
        return daily_returns_df

    def _compound(block):
        # Geometric linking of simple returns within the month
        return (1 + block).prod() - 1

    # Note: Issue #11 - 'ME' is tried first; a ValueError falls back to the
    # older 'M' month-end alias.
    try:
        return daily_returns_df.resample('ME').apply(_compound)
    except ValueError:
        return daily_returns_df.resample('M').apply(_compound)
| # ───────────────────────────────────────────── | |
| # MACHINE LEARNING FEATURE ENGINEERING | |
| # ───────────────────────────────────────────── | |
def build_ml_features(returns_df, benchmark_rets, ff_df=None, horizon=21, alt_data=None):
    """
    Build one feature frame per asset for the ML models.

    Each frame holds the same-day return, the forward ``horizon``-day compounded
    return (``target``), lagged momentum / reversal / volatility / beta features,
    optional rolling Fama-French factor sums, and two options-sentiment columns.
    Training rows are sampled every ``horizon`` rows (counted back from the last
    complete row) so targets do not overlap; the most recent row, whose target
    is unknown, is appended last for live inference.
    Rolling features are stored as float32 to reduce memory.

    Returns:
        dict mapping each column of ``returns_df`` to its feature DataFrame.
    """
    benchmark = benchmark_rets.reindex(returns_df.index).fillna(0)

    # Cumulative log returns make any compounded window a simple difference.
    cum_log = np.log1p(np.clip(returns_df, -0.999, None)).cumsum()
    lagged = cum_log.shift(1)

    def _trailing(window):
        # Compounded return over the `window` observations ending one row back.
        return (np.exp(lagged - cum_log.shift(window + 1)) - 1).astype(np.float32)

    # Target: compounded return over the next `horizon` rows.
    targets = np.exp(cum_log.shift(-horizon) - cum_log) - 1

    bench_var = benchmark.rolling(63).var()
    bench_cov = returns_df.rolling(63).cov(benchmark)
    price_feats = {
        'mom_1m': _trailing(21),
        'mom_3m': _trailing(63),
        'mom_6m': _trailing(126),
        'rev_5d': _trailing(5),
        'vol_21d': returns_df.rolling(21).std().shift(1).astype(np.float32),
        # small epsilon keeps the ratio finite when benchmark variance is zero
        'beta_63d': bench_cov.divide(bench_var + 1e-8, axis=0).shift(1).astype(np.float32),
    }

    # Factor features are shared by all assets: 21-row rolling sums, lagged one row.
    factor_feats = {}
    if ff_df is not None:
        ff_aligned = ff_df.reindex(returns_df.index).fillna(0)

        def _factor_sum(col):
            return ff_aligned[col].rolling(21).sum().shift(1).astype(np.float32)

        factor_feats['smb_21d'] = _factor_sum('SMB')
        factor_feats['hml_21d'] = _factor_sum('HML')
        factor_feats['mkt_rf_21d'] = _factor_sum('Mkt-RF')
        if 'RMW' in ff_aligned.columns:
            factor_feats['rmw_21d'] = _factor_sum('RMW')
        if 'CMA' in ff_aligned.columns:
            factor_feats['cma_21d'] = _factor_sum('CMA')

    features_dict = {}
    for t in returns_df.columns:
        columns = {'ret': returns_df[t], 'target': targets[t]}
        for feat_name, feat_frame in price_feats.items():
            columns[feat_name] = feat_frame[t]
        df = pd.DataFrame(columns)

        for feat_name, feat_series in factor_feats.items():
            df[feat_name] = feat_series

        # Options sentiment: only a current snapshot exists, so history is a
        # linear ramp from the neutral value (1.0 PCR / 0.0 skew) at the first
        # row to the real value at the last row.
        if alt_data and t in alt_data:
            ramp = np.linspace(0.0, 1.0, len(df))
            pcr_now = alt_data[t].get('put_call_ratio', 1.0)
            skew_now = alt_data[t].get('iv_skew', 0.0)
            df['put_call_ratio'] = 1.0 + (pcr_now - 1.0) * ramp
            df['iv_skew'] = skew_now * ramp
        else:
            df['put_call_ratio'] = 1.0
            df['iv_skew'] = 0.0

        # Keep the newest row aside before incomplete rows are dropped.
        inference_row = df.iloc[[-1]].copy()

        # Remove warm-up rows and rows without a target, then take every
        # `horizon`-th row counting back from the end, restoring time order.
        history = df.dropna().copy()
        history = history.iloc[::-horizon].iloc[::-1]

        features_dict[t] = pd.concat([history, inference_row])
    return features_dict
def fetch_fred_credit_spreads(start_date: str, end_date: str) -> pd.Series:
    """
    Fetches the ICE BofA US High Yield Index Option-Adjusted Spread from FRED.
    Uses FRED_API_KEY from the environment if available.
    Falls back to a constant 400 bps spread when the key is missing, the request
    fails, or FRED returns no usable observations.

    Args:
        start_date: First date of the requested window.
        end_date: Last date of the requested window.

    Returns:
        Business-day-indexed Series of spreads in decimals (e.g. 0.04 for 400 bps).
    """
    api_key = os.getenv("FRED_API_KEY")
    series_id = "BAMLH0A0HYM2"
    full_idx = pd.date_range(start=start_date, end=end_date, freq='B')

    if api_key:
        try:
            # Query values go through `params` so they are URL-encoded by requests.
            resp = requests.get(
                "https://api.stlouisfed.org/fred/series/observations",
                params={
                    "series_id": series_id,
                    "api_key": api_key,
                    "file_type": "json",
                    "observation_start": start_date,
                    "observation_end": end_date,
                },
                timeout=10,
            )
            if resp.status_code == 200:
                obs = resp.json().get("observations", [])
                df = pd.DataFrame(obs)
                if not df.empty:
                    # Handle '.' for missing values
                    df = df[df['value'] != '.']
                if not df.empty:
                    df['date'] = pd.to_datetime(df['date'])
                    df['value'] = df['value'].astype(float) / 100.0  # Convert percentage to decimal
                    df.set_index('date', inplace=True)
                    # Forward fill missing days
                    series = df['value'].reindex(full_idx).ffill().bfill()
                    logger.info(f"Successfully fetched {len(series)} days of corporate credit spreads from FRED.")
                    return series
                logger.warning("FRED returned no usable observations. Falling back to a constant credit spread.")
            else:
                logger.warning(f"FRED API returned HTTP {resp.status_code}. Falling back to a constant credit spread.")
        except Exception as e:
            # requests' error messages embed the full request URL, which
            # includes the API key; log only the exception type for those.
            detail = type(e).__name__ if isinstance(e, requests.RequestException) else e
            logger.warning(f"FRED API request failed: {detail}. Falling back to a constant credit spread.")
    else:
        logger.warning("No FRED_API_KEY found in .env. Using a constant proxy for corporate credit spread.")

    # Fallback: conservative flat 400 bps spread over the requested window.
    logger.info("Using fallback empirical constant credit spread of 400 bps.")
    return pd.Series(0.04, index=full_idx)
| # ───────────────────────────────────────────── | |
| # EXTENDED HISTORY & BOOTSTRAPPING | |
| # ───────────────────────────────────────────── | |
| from typing import List, Tuple, Dict, Any | |
def fetch_direct(ticker: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Helper to fetch directly from yfinance without db interaction, now with rate limit.

    Args:
        ticker: Symbol to download.
        start_date: Start of the window.
        end_date: End of the window.

    Returns:
        Single-column DataFrame of closing prices (``Adj Close`` when present,
        otherwise ``Close``), or an empty DataFrame when nothing was returned.
    """
    try:
        # The module-level helper is named with a leading underscore.
        df = _fetch_yfinance_with_retry([ticker], s_date=start_date, e_date=end_date)
    except ValueError:
        # The helper raises ValueError for an empty download.
        return pd.DataFrame()
    if df.empty:
        return pd.DataFrame()
    close_col = 'Adj Close' if 'Adj Close' in df.columns else 'Close'
    res = pd.DataFrame(df[close_col])
    # Handle multi-index columns if any
    # NOTE(review): this branch labels the column with the ticker while the
    # other branch uses 'close_price' — confirm which one callers expect.
    if isinstance(res.columns, pd.MultiIndex):
        res.columns = [ticker]
    else:
        res.columns = ['close_price']
    return res
def fetch_stitched_ticker(ticker: str, start_date: datetime, end_date: datetime, cfg: dict) -> pd.DataFrame:
    """
    Fetches a ticker with automatic proxy stitching for periods before the ticker existed.

    The proxy series is rescaled by the median ratio of actual to proxy prices
    over their common dates, and actual prices take precedence wherever they
    exist.

    Args:
        ticker: Symbol to fetch.
        start_date: Start of the requested window.
        end_date: End of the requested window.
        cfg: Configuration mapping; ``proxy_mappings`` overrides the built-in
            proxy rules.

    Returns:
        DataFrame with a ``close_price`` column. Falls back to the plain ticker
        download when no rule or no proxy data exists, and to the unscaled proxy
        when the actual series is empty or the overlap is shorter than the
        rule's ``overlap_days`` (default 252).
    """
    STITCH_RULES = cfg.get('proxy_mappings', {
        'SPY': {
            'proxy': '^GSPC',  # S&P 500 index goes back to 1950
            'proxy_start': '1950-01-03',
            'overlap_days': 252
        },
        'TLT': {
            'proxy': '^TYX',  # 30-year Treasury yield (inverse for price)
            'proxy_start': '1977-01-03',
            'is_yield': True,
        },
        'GLD': {
            'proxy': 'GC=F',  # Gold futures continuous contract
            'proxy_start': '1974-12-31',
        },
        'QQQ': {
            'proxy': '^IXIC',  # NASDAQ composite
            'proxy_start': '1971-02-05',
        }
    })
    start_str = start_date.strftime('%Y-%m-%d')
    end_str = end_date.strftime('%Y-%m-%d')
    if ticker not in STITCH_RULES:
        logger.warning(f"No proxy mapping found for {ticker}. Extended history may be truncated to actual inception date.")
        return fetch_direct(ticker, start_str, end_str)

    rule = STITCH_RULES[ticker]
    # ISO date strings compare chronologically.
    proxy_start = max(start_str, rule.get('proxy_start', '1980-01-01'))
    proxy_df = fetch_direct(rule['proxy'], proxy_start, end_str)
    if proxy_df.empty:
        return fetch_direct(ticker, start_str, end_str)
    if rule.get('is_yield', False):
        # Convert a yield quoted in percent into a price-like series.
        proxy_df['close_price'] = 100 / (1 + proxy_df['close_price'] / 100)

    actual_df = fetch_direct(ticker, proxy_start, end_str)
    if actual_df.empty:
        return proxy_df

    overlap_days = rule.get('overlap_days', 252)
    common = proxy_df.index.intersection(actual_df.index)
    if len(common) < overlap_days:
        return proxy_df

    adj_ratio = (actual_df.loc[common, 'close_price'] / proxy_df.loc[common, 'close_price']).median()
    scaled_proxy = proxy_df.copy()
    scaled_proxy['close_price'] = scaled_proxy['close_price'] * adj_ratio
    # combine_first keeps every actual observation (including dates missing
    # from the proxy index) and fills the remaining dates from the scaled proxy.
    stitched = actual_df.combine_first(scaled_proxy)
    return stitched
def block_bootstrap_returns(returns_df: pd.DataFrame,
                            block_size: int = 252,
                            n_bootstrap_samples: int = 100,
                            seed: int = 42) -> List[pd.DataFrame]:
    """
    Generates bootstrap samples of returns preserving autocorrelation and cross-correlation.

    Whole rows are resampled in contiguous blocks whose lengths are drawn from a
    geometric distribution with mean ``block_size``; block starts are uniform
    over every position at which the block fits.

    Args:
        returns_df: Returns to resample (rows = observations).
        block_size: Expected block length; must be >= 1.
        n_bootstrap_samples: Number of resampled frames to produce.
        seed: Seed for the random generator.

    Returns:
        List of DataFrames shaped like ``returns_df`` and carrying its index.

    Raises:
        ValueError: if ``block_size`` is smaller than 1.
    """
    if block_size < 1:
        raise ValueError("block_size must be a positive integer")
    rng = np.random.default_rng(seed)
    n_obs = len(returns_df)
    bootstrap_samples = []
    for _ in range(n_bootstrap_samples):
        sampled_indices = []
        while len(sampled_indices) < n_obs:
            block_len = int(rng.geometric(1 / block_size))
            # `high` is exclusive: the +1 allows a block to end on the final
            # observation, which was previously unreachable.
            start_idx = int(rng.integers(0, max(1, n_obs - block_len + 1)))
            sampled_indices.extend(range(start_idx, min(n_obs, start_idx + block_len)))
        sampled_indices = sampled_indices[:n_obs]
        bootstrap_sample = returns_df.iloc[sampled_indices].copy()
        # Present the resample on the original timeline.
        bootstrap_sample.index = returns_df.index
        bootstrap_samples.append(bootstrap_sample)
    return bootstrap_samples
def regime_aware_bootstrap(returns_df: pd.DataFrame,
                           regime_labels: np.ndarray,
                           n_bootstrap_samples: int = 100,
                           seed: int = 42) -> List[pd.DataFrame]:
    """
    Bootstrap that respects regime boundaries.

    The label sequence is cut into maximal runs of identical labels. Each
    resample is assembled by repeatedly picking a regime uniformly at random,
    then one of that regime's runs uniformly at random, and appending the run
    (truncated if it would overshoot) until the original length is reached.

    Args:
        returns_df: Returns to resample (rows = observations).
        regime_labels: One label per row of ``returns_df`` (array-like).
        n_bootstrap_samples: Number of resampled frames to produce.
        seed: Seed for the random generator.

    Returns:
        List of DataFrames shaped like ``returns_df`` and carrying its index.

    Raises:
        ValueError: if ``regime_labels`` does not have one entry per row.
    """
    # Positional array: a pandas Series with a non-integer index would
    # otherwise be indexed by label.
    labels = np.asarray(regime_labels)
    n_obs = len(returns_df)
    if len(labels) != n_obs:
        raise ValueError(
            f"regime_labels has {len(labels)} entries but returns_df has {n_obs} rows"
        )
    if n_obs == 0:
        return [returns_df.copy() for _ in range(n_bootstrap_samples)]

    rng = np.random.default_rng(seed)
    unique_regimes = np.unique(labels)

    # regime -> list of (start, end) half-open runs, in chronological order
    regime_blocks = {}
    for regime in unique_regimes:
        mask = (labels == regime)
        # Pad with False so runs touching either end still yield both edges.
        edges = np.diff(np.concatenate(([False], mask, [False])).astype(np.int8))
        starts = np.flatnonzero(edges == 1).tolist()
        ends = np.flatnonzero(edges == -1).tolist()
        blocks = list(zip(starts, ends))
        if not blocks:
            # Label never matched by equality: fall back to the whole period.
            blocks = [(0, n_obs)]
        regime_blocks[regime] = blocks

    bootstrap_samples = []
    for _ in range(n_bootstrap_samples):
        sampled_indices = []
        current_regime = rng.choice(unique_regimes)
        while len(sampled_indices) < n_obs:
            blocks = regime_blocks.get(current_regime, [])
            if blocks:
                start, end = blocks[rng.integers(0, len(blocks))]
                block_len = min(end - start, n_obs - len(sampled_indices))
                sampled_indices.extend(range(start, start + block_len))
            current_regime = rng.choice(unique_regimes)
        bootstrap_sample = returns_df.iloc[sampled_indices[:n_obs]].copy()
        # Present the resample on the original timeline.
        bootstrap_sample.index = returns_df.index
        bootstrap_samples.append(bootstrap_sample)
    return bootstrap_samples
def bootstrap_iter(returns_df: pd.DataFrame, n_epochs: int, block_size: int = 252):
    """Lazily yield ``n_epochs`` block-bootstrap resamples, one per epoch.

    Epoch ``k`` (starting at 0) is drawn with seed ``42 + k``.
    """
    epoch = 0
    while epoch < n_epochs:
        drawn = block_bootstrap_returns(
            returns_df,
            block_size,
            n_bootstrap_samples=1,
            seed=42 + epoch,
        )
        yield drawn[0]
        epoch += 1