Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from scipy.linalg import cholesky | |
| import copy | |
| from config import Color, logger, DEFAULT_CONFIG | |
| from core_types import PortfolioState, LotManager, CovarianceResult | |
| from models import regime_stress_covariance | |
| from solver import build_and_optimize | |
| try: | |
| from execution import estimate_market_impact | |
| _HAS_EXECUTION = True | |
| except ImportError: | |
| _HAS_EXECUTION = False | |
def _ewm_cov_update(ewm_mean, ewm_cov, rows, alpha):
    """Fold each row of ``rows`` into an exponentially weighted mean/covariance.

    Uses the incremental exponentially weighted recurrence (Finch, 2009)::

        diff = x - mean
        mean = mean + alpha * diff
        cov  = (1 - alpha) * (cov + alpha * outer(diff, diff))

    NaN entries contribute a zero deviation, so a missing observation only
    decays the running estimate instead of turning it into NaN permanently.

    Returns:
        The updated ``(ewm_mean, ewm_cov)``; the inputs are not modified.
    """
    for row in rows:
        diff = np.nan_to_num(row - ewm_mean, nan=0.0)
        ewm_mean = ewm_mean + alpha * diff
        ewm_cov = (1.0 - alpha) * (ewm_cov + alpha * np.outer(diff, diff))
    return ewm_mean, ewm_cov


def _is_valid_price(px):
    """Return True when ``px`` is a usable (non-NaN, strictly positive) price."""
    return not pd.isna(px) and px > 1e-8


def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map, initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None):
    """Run an out-of-sample walk-forward backtest with periodic rebalancing.

    Every ``rebalance_freq`` rows, ``build_and_optimize`` is fitted on the
    preceding training rows. Its target weights are then held buy-and-hold over
    the next block of rows.

    The following costs are charged on the first day of each block:
    spread plus a flat fee, optional market impact (when the ``execution``
    module is available), and HIFO lot-level capital-gains tax (when
    ``cfg['tax_enabled']`` is set). A ``LotManager`` tracks tax lots across
    rebalances.

    NOTE(review): despite the name, the optimizer's training slice is *rolling*
    (fixed length ``initial_train_days``); only the EWMA covariance handed to it
    via ``opt_params.incremental_cov`` accumulates the full history. Confirm
    which behaviour is intended.

    Args:
        returns_df: daily simple returns, one column per ticker. The index must
            be unique and sorted (assumed to be a DatetimeIndex — TODO confirm).
        spy_rets: benchmark daily returns, aligned to ``returns_df`` by label.
        capital: starting portfolio value.
        rfr: annual risk-free rate, either a scalar or a Series aligned by date.
        cfg: configuration dict. It is deep-copied and never mutated.
        model, allocation_engine: forwarded to ``build_and_optimize``.
        spread_map: optional ticker -> spread cost (missing tickers use 0.0008).
        initial_train_days: length of the training slice, and the position of
            the first out-of-sample row.
        rebalance_freq: number of rows between rebalances; must be positive.
        ff_df, yield_df: optional factor and yield data forwarded to the
            optimizer.

    Returns:
        ``(equity_curve, spy_equity)`` Series starting at ``capital`` on the last
        training date, or ``(None, None)`` when there is not enough data.

    Raises:
        ValueError: if ``rebalance_freq`` is not positive.
    """
    trading_days = cfg.get("trading_days_per_year", 252)
    adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
    local_cfg = copy.deepcopy(cfg)
    local_cfg['_is_historical_backtest'] = True
    total_days = len(returns_df)
    if total_days <= initial_train_days:
        print(f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. Need > {initial_train_days} days.{Color.RESET}")
        return None, None
    if rebalance_freq <= 0:
        raise ValueError(f"rebalance_freq must be positive, got {rebalance_freq}")

    # Position 0 of the curve is the pre-trade starting capital at the last
    # training date; every later position is one out-of-sample day.
    equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
    equity_curve.loc[returns_df.index[initial_train_days - 1]] = capital
    equity_curve = equity_curve.sort_index()

    current_capital = capital
    tickers = list(returns_df.columns)
    n_assets = len(tickers)
    current_state = PortfolioState.empty(tickers)
    lot_manager = LotManager()
    optimizer_failures = 0
    total_rebalances = 0
    # Synthetic price index per asset, used for buy-and-hold paths and lot prices.
    synth_prices = (1 + returns_df).cumprod()

    print(f" {Color.DIM}ℹ Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
    from core_types import OptimizationParams
    opt_params = OptimizationParams(use_fast_ewm_cov=True)

    # Incremental EWMA covariance state, kept in daily units and annualised when
    # handed to the optimizer.
    cov_halflife = 126
    alpha = 1 - np.exp(-np.log(2) / cov_halflife)
    ewm_mean = None
    ewm_cov = None

    spreads = np.array([spread_map.get(tk, 0.0008) for tk in tickers]) if spread_map else np.full(n_assets, 0.0008)
    trade_cost = local_cfg.get("transaction_cost", 0.001)
    lt_days = local_cfg.get('lt_days', 366)
    cumulative_idx = 0

    for t in range(initial_train_days, total_days, rebalance_freq):
        total_rebalances += 1
        start_idx = max(0, t - initial_train_days)  # rolling training slice
        train_df = returns_df.iloc[start_idx:t]
        train_end_date = train_df.index[-1]
        train_spy = spy_rets.reindex(train_df.index).dropna()
        # Slice yields by date label rather than position, so a yield frame on a
        # different calendar than returns_df is still aligned to the window.
        train_yields = (
            yield_df.loc[train_df.index[0]:train_end_date]
            if yield_df is not None and not yield_df.empty else None
        )
        train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None
        end_idx = min(t + rebalance_freq, total_days)
        oos_df = returns_df.iloc[t:end_idx]
        current_date = oos_df.index[0]

        if ewm_mean is None:
            # Seed with the first observation and zero covariance, then fold in
            # the rest of the first training window.
            first_window = train_df.values.astype(float)
            ewm_mean = np.nan_to_num(first_window[0], nan=0.0)
            ewm_cov = np.zeros((n_assets, n_assets))
            new_rows = first_window[1:]
        else:
            # Only the rows added since the previous rebalance need folding in.
            new_rows = returns_df.iloc[t - rebalance_freq:t].values
        ewm_mean, ewm_cov = _ewm_cov_update(ewm_mean, ewm_cov, new_rows, alpha)
        opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)

        try:
            # total_capital must be current before optimisation; impact models
            # and risk-budget scaling read it.
            current_state.total_capital = current_capital
            opt_res = build_and_optimize(
                train_df, train_spy, risk_input=local_cfg.get('_risk_input', 5),
                risk_factor=local_cfg.get('_risk_factor', 3.0), state=current_state, cfg=local_cfg,
                model=model, allocation_engine=allocation_engine,
                ff_df=train_ff, spread_map=spread_map, silent=True, yield_df=train_yields,
                opt_params=opt_params
            )
            target_w = opt_res.weights
        except Exception as e:
            # Best-effort: a failed rebalance falls back to equal weight.
            optimizer_failures += 1
            logger.warning("Expanding window rebalance failed at step %s: %s", t, e)
            target_w = pd.Series(1.0 / n_assets, index=tickers)
        target_w = target_w.astype(float)  # private float copy; safe to edit below

        # Trades are valued at the close of the last training day (row t - 1).
        # An asset without a usable price there or on the first out-of-sample day
        # cannot be held. Its weight is moved to CASH: zeroing it alone would
        # silently destroy that fraction of capital on day one.
        base = synth_prices.iloc[t - 1]
        first_oos_px = synth_prices.iloc[t]
        parked = 0.0
        for col in target_w.index:
            if col == 'CASH' or col not in synth_prices.columns:
                continue
            if not (_is_valid_price(base[col]) and _is_valid_price(first_oos_px[col])):
                parked += target_w[col]
                target_w[col] = 0.0
        if parked:
            target_w['CASH'] = float(target_w.get('CASH', 0.0)) + parked

        w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        cash_w = float(target_w.get('CASH', 0.0))

        if isinstance(rfr, pd.Series):
            rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
            daily_rfr = (rfr_oos / trading_days).values
            cash_growth = (1 + daily_rfr).cumprod()
        else:
            daily_rfr = rfr / trading_days
            cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)

        # Buy-and-hold paths relative to the trade-date close. Forward-fill
        # within the block so an asset that stops trading mid-block keeps its
        # last value. Remaining gaps (only possible in zero-weight columns) are
        # treated as flat, because a single NaN column would otherwise turn the
        # matrix product — and then the whole equity curve — into NaN.
        oos_synth = synth_prices.iloc[t:end_idx].ffill()
        asset_paths = np.nan_to_num(oos_synth.divide(base + 1e-9, axis=1).values, nan=1.0)
        allocated_capital_path = current_capital * (asset_paths @ w_arr)
        cash_path = current_capital * cash_w * cash_growth
        total_path = allocated_capital_path + cash_path
        prev_totals = np.concatenate(([current_capital], total_path[:-1]))
        port_daily_rets = np.diff(total_path, prepend=current_capital) / prev_totals

        prev_w_arr = current_state.current_weights
        if isinstance(prev_w_arr, pd.Series):
            prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        delta = w_arr - prev_w_arr
        friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)

        impact = 0.0
        if _HAS_EXECUTION:
            vols = train_df.std().values
            for i, dw in enumerate(delta):
                if abs(dw) > 1e-4:
                    impact_pct = estimate_market_impact(abs(dw * current_capital), adv_proxy, vols[i])
                    impact += impact_pct * abs(dw)

        # ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
        tax_hit = 0.0
        if t == initial_train_days:
            # First rebalance: open the initial lots. Nothing is sold, so no tax.
            if current_capital > 0:
                for i, ticker in enumerate(tickers):
                    px = first_oos_px[ticker]
                    if w_arr[i] > 1e-5 and _is_valid_price(px):
                        lot_manager.add_lot(ticker, current_date, px, (w_arr[i] * current_capital) / px)
        elif local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
            step_lt_gain = 0.0
            step_st_gain = 0.0
            for i, ticker in enumerate(tickers):
                px = first_oos_px[ticker]
                if not _is_valid_price(px):
                    continue  # an asset without a price cannot be traded
                w_shift = delta[i]
                if w_shift < -1e-5:
                    _, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
                        ticker, abs(w_shift) * current_capital / px, px, current_date,
                        lt_days=lt_days, method='hifo'
                    )
                    step_lt_gain += lt_gain
                    step_st_gain += st_gain
                elif w_shift > 1e-5:
                    lot_manager.add_lot(ticker, current_date, px, w_shift * current_capital / px)
            tax_hit_dollars = (
                max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)
                + max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35)
            )
            tax_hit = tax_hit_dollars / current_capital

        # Costs and taxes are charged once, on the first out-of-sample day.
        for i, r in enumerate(port_daily_rets):
            if i == 0:
                current_capital *= (1 + r - friction - impact - tax_hit)
            else:
                current_capital *= (1 + r)
            current_capital = max(0.0, current_capital)
            cumulative_idx += 1
            if cumulative_idx < len(equity_curve):
                equity_curve.iloc[cumulative_idx] = current_capital

        # Let the weights drift with realised prices, so the next rebalance's
        # turnover is measured against what is actually held. Cash is included
        # in the normalisation, so risky + cash weights sum to one.
        if current_capital > 1e-4:
            risky_end = w_arr * asset_paths[-1]
            cash_end = cash_w * cash_growth[-1]
            total_end = np.sum(risky_end) + cash_end
            if total_end > 0:
                w_arr = risky_end / total_end
                cash_w = float(cash_end / total_end)
        current_state.current_weights = pd.Series(w_arr, index=oos_df.columns)
        current_state.current_weights['CASH'] = cash_w

    if optimizer_failures:
        logger.warning(
            "Optimizer failed on %d of %d rebalances; equal-weight fallback was used.",
            optimizer_failures, total_rebalances,
        )

    equity_curve = equity_curve.dropna()
    spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
    # The first point is the pre-trade starting capital. Its return belongs to
    # the training period, so zero it to keep the benchmark aligned with the
    # strategy curve.
    if len(spy_oos):
        spy_oos.iloc[0] = 0.0
    spy_eq = capital * (1 + spy_oos).cumprod()
    return equity_curve, spy_eq
def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
    """Simulate future portfolio equity paths under a regime-stressed covariance.

    Daily asset growth factors are log-normal, ``exp(mu_daily - 0.5 * var_daily + shock)``,
    where ``shock ~ N(0, cov_daily)``.

    The annual ``cov_mat`` is first passed through ``regime_stress_covariance``.
    The severity comes from ``macro["hmm_regime"]["severity_score"]``, or 1.0
    when that is absent. The portfolio is rebalanced back to ``weights`` every
    simulated day. The ``'CASH'`` weight earns ``cfg['risk_free_rate']``
    (default 0.04), compounded daily.

    Args:
        weights: target weights indexed by ticker, optionally with a 'CASH' entry.
            Tickers missing from ``cov_mat`` are ignored.
        exp_rets: annual expected returns indexed by ticker; missing tickers get 0.
        cov_mat: annual covariance DataFrame; its columns define the asset order.
        capital: starting portfolio value.
        cfg: reads ``trading_days_per_year``, ``monte_carlo_sims``,
            ``monte_carlo_years`` and ``risk_free_rate``.
        macro: optional dict; only a dict-valued ``macro["hmm_regime"]`` is used.
        seed: seed for ``numpy.random.default_rng``. A local generator is used,
            so global RNG state is untouched.
        return_paths: if True, record the first ``min(50, sims)`` simulated
            equity paths for plotting. Defaults to False to save memory.

    Returns:
        ``(visual_paths, stats)``:

        - ``visual_paths`` is a ``(days, min(50, sims))`` array, or None when
          ``return_paths`` is False.
        - ``stats`` maps 5/25/50/75/95 to per-day percentile lists of portfolio
          value.
        - ``stats["dates"]`` holds one ``'YYYY-MM-DD'`` label per simulated
          trading day.

    Raises:
        numpy.linalg.LinAlgError: if the stressed covariance is not positive
            definite even after a small diagonal jitter.
    """
    rng = np.random.default_rng(seed)
    trading_days = cfg.get("trading_days_per_year", 252)
    sims = cfg.get("monte_carlo_sims", 5000)
    years = cfg.get("monte_carlo_years", 1.0)
    days = int(years * trading_days)

    assets = cov_mat.columns
    w_arr = weights.drop(labels=['CASH'], errors='ignore').reindex(assets).fillna(0.0).values
    cash_w = float(weights.get('CASH', 0.0))
    rfr = cfg.get("risk_free_rate", 0.04)
    mu_daily = exp_rets.reindex(assets).fillna(0.0).values / trading_days

    regime_severity = 1.0
    if macro and "hmm_regime" in macro and isinstance(macro["hmm_regime"], dict):
        regime_severity = macro["hmm_regime"].get("severity_score", 1.0)

    # Decompose the covariance into vols and correlations for the stress model.
    cov_arr = cov_mat.values
    vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_arr = cov_arr / np.maximum(np.outer(vols_mc, vols_mc), 1e-8)
    corr_arr[np.isnan(corr_arr)] = 0.0
    cov_res_mc = CovarianceResult(
        covariance=cov_mat,
        correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
        volatility=pd.Series(vols_mc, index=cov_mat.columns),
        shrinkage=0.0
    )
    stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
    cov_daily = stressed_cov.covariance.values / trading_days

    try:
        chol = cholesky(cov_daily, lower=True)
    except np.linalg.LinAlgError:
        # Not numerically positive definite: retry with a small diagonal jitter.
        chol = cholesky(cov_daily + np.eye(len(cov_daily)) * 1e-6, lower=True)

    # Ito correction, so that E[exp(drift + shock)] = exp(mu_daily).
    drift = mu_daily - 0.5 * np.diag(cov_daily)

    n_assets = len(w_arr)
    pct_levels = (5, 25, 50, 75, 95)
    bands = np.zeros((len(pct_levels), days))
    take = min(50, sims)
    visual_paths = np.zeros((days, take)) if return_paths else None
    # Daily rebalancing: each day the cash slice grows by one day's risk-free rate.
    cash_term = cash_w * (1 + (rfr / trading_days))

    current_port_value = np.full(sims, capital, dtype=float)
    for d in range(days):
        shocks = rng.standard_normal((sims, n_assets)) @ chol.T
        growth = np.sum(np.exp(drift + shocks) * w_arr, axis=1) + cash_term
        current_port_value *= growth
        bands[:, d] = np.percentile(current_port_value, pct_levels)
        if visual_paths is not None:
            visual_paths[d, :] = current_port_value[:take]

    # Each simulated step is a trading day. Label steps with business days for a
    # sub-365-day calendar, and plain calendar days when every day trades.
    freq = "D" if trading_days >= 365 else "B"
    dates = pd.date_range(start=pd.Timestamp.today().normalize(), periods=days, freq=freq)

    stats = {level: bands[k].tolist() for k, level in enumerate(pct_levels)}
    stats["dates"] = [ts.strftime('%Y-%m-%d') for ts in dates]
    return visual_paths, stats