import copy

import numpy as np
import pandas as pd
from scipy.linalg import cholesky

from config import Color, logger, DEFAULT_CONFIG
from core_types import PortfolioState, LotManager, CovarianceResult
from models import regime_stress_covariance
from solver import build_and_optimize

try:
    from execution import estimate_market_impact
    _HAS_EXECUTION = True
except ImportError:
    _HAS_EXECUTION = False


def _update_ewm_moments(ewm_mean, ewm_cov, rows, alpha):
    """Fold ``rows`` one at a time into a running EWM mean and covariance.

    ``ewm_mean`` is updated in place; the (rebound) mean and the new
    covariance matrix are returned as a tuple.
    """
    for row in rows:
        diff = row - ewm_mean
        ewm_mean += alpha * diff
        ewm_cov = (1 - alpha) * ewm_cov + alpha * np.outer(diff, diff)
    return ewm_mean, ewm_cov


def _drifted_weights(w_arr, cash_w, asset_growth, cash_growth):
    """Return end-of-period ``(risky_weights, cash_weight)`` after buy-and-hold drift.

    Risky sleeves grow by ``asset_growth`` and the cash sleeve by
    ``cash_growth``; both are normalised by the *total* portfolio value so the
    result sums to the same total as the inputs. Falls back to the inputs
    unchanged when the total is not strictly positive (including NaN).
    """
    risky_values = w_arr * asset_growth
    cash_value = cash_w * cash_growth
    total_value = np.sum(risky_values) + cash_value
    if total_value > 0:
        return risky_values / total_value, float(cash_value / total_value)
    return w_arr, cash_w


def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map,
                              initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None):
    """
    Performs a rigorous out-of-sample expanding window backtest.
    Inherently applies LotManager for precise HIFO tax lot tracking across time.

    Every ``rebalance_freq`` rows the optimizer is re-run on the preceding
    ``initial_train_days`` rows, the resulting weights are held (buy-and-hold)
    over the next block, and spread/transaction cost, market impact (when the
    ``execution`` module is importable) and realised-gain tax are charged
    against the first day of the block.

    Args:
        returns_df: DataFrame of per-row asset returns; columns are the tickers.
        spy_rets: Benchmark return Series; reindexed onto the training and
            out-of-sample dates.
        capital: Starting capital.
        rfr: Annualised risk-free rate, either a scalar or a ``pd.Series``
            indexed by date (divided by ``trading_days_per_year`` for daily use).
        cfg: Config mapping. Keys read here: ``trading_days_per_year``,
            ``default_adv_proxy``, ``transaction_cost``, ``_risk_input``,
            ``_risk_factor``, ``tax_enabled``, ``lt_days``, ``tax_rate_lt``,
            ``tax_rate_st``. A deep copy is passed on to the optimizer.
        model: Passed through to ``build_and_optimize``.
        allocation_engine: Passed through to ``build_and_optimize``.
        spread_map: Optional mapping ticker -> spread; missing tickers (or a
            falsy map) use 0.0008.
        initial_train_days: Number of rows in each training window, and the
            number of leading rows consumed before the first rebalance.
        rebalance_freq: Number of rows between rebalances.
        ff_df: Optional frame reindexed to the training dates and passed to
            the optimizer.
        yield_df: Optional frame sliced to the training rows and passed to
            the optimizer.

    Returns:
        ``(equity_curve, spy_eq)`` as two Series on the same index, or
        ``(None, None)`` when ``returns_df`` has no more than
        ``initial_train_days`` rows.
    """
    trading_days = cfg.get("trading_days_per_year", 252)
    adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
    local_cfg = copy.deepcopy(cfg)
    local_cfg['_is_historical_backtest'] = True

    total_days = len(returns_df)
    if total_days <= initial_train_days:
        print(
            f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. "
            f"Need > {initial_train_days} days.{Color.RESET}"
        )
        return None, None

    # Slot 0 of the curve is the last training date, pinned to the starting capital.
    equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
    initial_start_date = returns_df.index[initial_train_days - 1]
    equity_curve.loc[initial_start_date] = capital
    equity_curve = equity_curve.sort_index()

    current_capital = capital
    tickers = list(returns_df.columns)
    current_state = PortfolioState.empty(tickers)
    lot_manager = LotManager()
    optimizer_failures = 0
    total_rebalances = 0

    # Synthetic price index (starts from the first return) used for drift and lot pricing.
    synth_prices = (1 + returns_df).cumprod()

    print(f" {Color.DIM}ℹ Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
    from core_types import OptimizationParams
    opt_params = OptimizationParams(use_fast_ewm_cov=True)

    # Incremental covariance state
    cov_halflife = 126
    alpha = 1 - np.exp(-np.log(2) / cov_halflife)
    ewm_mean = None
    ewm_cov = None

    if spread_map:
        spreads = np.array([spread_map.get(tk, 0.0008) for tk in tickers])
    else:
        spreads = np.full(len(tickers), 0.0008)
    trade_cost = local_cfg.get("transaction_cost", 0.001)

    cumulative_idx = 0
    for t in range(initial_train_days, total_days, rebalance_freq):
        total_rebalances += 1

        # NOTE: t >= initial_train_days here, so start_idx == t - initial_train_days
        # and the training frame is a fixed-length trailing window.
        start_idx = max(0, t - initial_train_days)
        train_df = returns_df.iloc[start_idx:t]
        train_spy = spy_rets.reindex(train_df.index).dropna()
        train_yields = yield_df.iloc[start_idx:t] if yield_df is not None and not yield_df.empty else None
        train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None

        end_idx = min(t + rebalance_freq, total_days)
        oos_df = returns_df.iloc[t:end_idx]
        current_date = oos_df.index[0]
        train_end_date = train_df.index[-1]

        if ewm_mean is None:
            # Initialize from scratch using the incremental formula to avoid
            # a pandas EWM normalization mismatch.
            ewm_mean = train_df.iloc[0].values.copy()
            ewm_cov = np.outer(ewm_mean, ewm_mean)
            new_rows = train_df.iloc[1:].values
        else:
            # Only the rows added since the previous rebalance need folding in.
            new_rows = returns_df.iloc[t - rebalance_freq:t].values
        ewm_mean, ewm_cov = _update_ewm_moments(ewm_mean, ewm_cov, new_rows, alpha)
        opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)

        try:
            # Ensure total_capital is properly set in the state before optimization.
            # This is critical for impact models and risk budget scaling.
            current_state.total_capital = current_capital
            opt_res = build_and_optimize(
                train_df,
                train_spy,
                risk_input=local_cfg.get('_risk_input', 5),
                risk_factor=local_cfg.get('_risk_factor', 3.0),
                state=current_state,
                cfg=local_cfg,
                model=model,
                allocation_engine=allocation_engine,
                ff_df=train_ff,
                spread_map=spread_map,
                silent=True,
                yield_df=train_yields,
                opt_params=opt_params
            )
            target_w = opt_res.weights
        except Exception as e:
            # Best-effort: a failed optimization falls back to equal weights.
            optimizer_failures += 1
            logger.warning(f"Expanding window rebalance failed at step {t}: {e}")
            target_w = pd.Series(1.0/len(tickers), index=tickers)

        # Handle Delisted/Dead Assets: zero the target of anything without a usable price.
        for col in target_w.index:
            if col != 'CASH' and col in synth_prices.columns:
                px = synth_prices[col].iloc[t]
                if pd.isna(px) or px <= 1e-8:
                    target_w[col] = 0.0

        w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        cash_w = float(target_w.get('CASH', 0.0))

        # Cumulative growth factor of the cash sleeve for each out-of-sample day.
        if isinstance(rfr, pd.Series):
            rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
            daily_rfr = (rfr_oos / trading_days).values
            cash_growth = (1 + daily_rfr).cumprod()
        else:
            daily_rfr = rfr / trading_days
            cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)

        # True Buy-and-Hold Return Computation (Instead of Daily Rebalancing Approximation)
        # Asset values compound from the last training-date price across the period.
        oos_synth = synth_prices.loc[oos_df.index]
        base_idx = synth_prices.index.get_indexer([train_end_date], method='ffill')[0]
        base = synth_prices.iloc[base_idx]
        asset_paths = oos_synth.divide(base + 1e-9, axis=1).values
        allocated_capital_path = current_capital * (asset_paths @ w_arr)

        # Cash sleeve compounds at the risk-free rate.
        cash_path = current_capital * cash_w * cash_growth

        # Total capital at each step, and the implied daily portfolio returns.
        total_path = allocated_capital_path + cash_path
        port_daily_rets = np.diff(total_path, prepend=current_capital) / np.concatenate(
            ([current_capital], total_path[:-1])
        )

        prev_w_arr = current_state.current_weights
        if isinstance(prev_w_arr, pd.Series):
            prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values

        # Turnover-driven costs, expressed as a fraction of capital.
        delta = w_arr - prev_w_arr
        friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)

        impact = 0.0
        if _HAS_EXECUTION:
            vols = train_df.std().values
            for i, t_val in enumerate(delta):
                if abs(t_val) > 1e-4:
                    impact_pct = estimate_market_impact(abs(t_val * current_capital), adv_proxy, vols[i])
                    impact += impact_pct * abs(t_val)

        # ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
        # Prices on the rebalance date are the same for every ticker; look them up once.
        curr_idx = synth_prices.index.get_indexer([current_date], method='ffill')[0]
        rebalance_prices = synth_prices.iloc[curr_idx]

        tax_hit = 0.0
        if t == initial_train_days:
            # First rebalance: open the initial lots; nothing is sold, so no tax.
            for i, ticker in enumerate(tickers):
                if w_arr[i] > 1e-5 and current_capital > 0:
                    px = rebalance_prices[ticker]
                    shares = (w_arr[i] * current_capital) / px
                    lot_manager.add_lot(ticker, current_date, px, shares)
        elif local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
            step_lt_gain = 0.0
            step_st_gain = 0.0
            for i, ticker in enumerate(tickers):
                w_shift = delta[i]
                px = rebalance_prices[ticker]
                if w_shift < -1e-5:
                    shares_to_sell = abs(w_shift) * current_capital / px
                    _, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
                        ticker, shares_to_sell, px, current_date,
                        lt_days=local_cfg.get('lt_days', 366), method='hifo'
                    )
                    step_lt_gain += lt_gain
                    step_st_gain += st_gain
                elif w_shift > 1e-5:
                    shares_to_buy = w_shift * current_capital / px
                    lot_manager.add_lot(ticker, current_date, px, shares_to_buy)

            # Net losses in a bucket are floored at zero rather than credited.
            tax_hit_dollars = (max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)) + \
                              (max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35))
            tax_hit = tax_hit_dollars / current_capital

        # Roll capital forward; all rebalancing costs hit the first day of the block.
        for i, day_ret in enumerate(port_daily_rets):
            if i == 0:
                current_capital *= (1 + day_ret - friction - impact - tax_hit)
            else:
                current_capital *= (1 + day_ret)
            current_capital = max(0.0, current_capital)
            cumulative_idx += 1
            if cumulative_idx < len(equity_curve):
                equity_curve.iloc[cumulative_idx] = current_capital

        # Drifted end weights feed the next period's delta calculation. Cash is
        # drifted and included in the normalisation so the stored weights keep
        # summing to the target total instead of 1 + cash weight.
        if current_capital > 1e-4:
            w_arr, cash_w = _drifted_weights(w_arr, cash_w, asset_paths[-1], cash_growth[-1])

        # Stored as a Series keyed by ticker, with the cash sleeve under 'CASH'.
        current_state.current_weights = pd.Series(w_arr, index=oos_df.columns)
        current_state.current_weights['CASH'] = cash_w

    if optimizer_failures:
        logger.warning(
            f"Expanding window backtest: {optimizer_failures}/{total_rebalances} "
            f"rebalances failed and fell back to equal weights."
        )

    equity_curve = equity_curve.dropna()
    spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
    if len(spy_oos) > 0:
        # The first index entry is the start date, where equity equals `capital`;
        # the benchmark must not earn that day's return either.
        spy_oos.iloc[0] = 0.0
    spy_eq = capital * (1 + spy_oos).cumprod()
    return equity_curve, spy_eq


def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
    """
    Generates thousands of future equity paths.
    Properly routes HMM regime severity to stress both correlations and volatilities.
    Uses dynamic trading days for drift computation.

    Args:
        weights: Series of portfolio weights; an optional ``'CASH'`` entry is
            treated as the cash sleeve, the rest are aligned to ``cov_mat.columns``.
        exp_rets: Series of annualised expected returns, aligned to
            ``cov_mat.columns`` (missing entries become 0).
        cov_mat: Annualised covariance DataFrame (divided by
            ``trading_days_per_year`` for daily use).
        capital: Starting portfolio value of every simulated path.
        cfg: Config mapping. Keys read here: ``trading_days_per_year``,
            ``monte_carlo_sims``, ``monte_carlo_years``, ``risk_free_rate``.
        macro: Optional mapping; when ``macro["hmm_regime"]`` is a dict its
            ``severity_score`` is passed to ``regime_stress_covariance``.
        seed: Seed for the local ``numpy`` generator.
        return_paths (bool): If True, compute and return a
            ``(days, min(50, sims))`` array of sample equity paths for
            visualisation. Defaults to False to save memory when the caller
            only needs percentile bands.

    Returns:
        ``(visual_paths, stats)``. ``visual_paths`` is ``None`` unless
        ``return_paths`` is True. ``stats`` maps each of 5, 25, 50, 75, 95 to
        the per-day percentile list and ``"dates"`` to per-day date labels.
    """
    # Note: Use localized standard generator, no longer mutating global RNG state
    rng = np.random.default_rng(seed)

    trading_days = cfg.get("trading_days_per_year", 252)
    sims = cfg.get("monte_carlo_sims", 5000)
    years = cfg.get("monte_carlo_years", 1.0)
    days = int(years * trading_days)

    w_risky = weights.drop(labels=['CASH'], errors='ignore')
    w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values
    cash_w = float(weights.get('CASH', 0.0))
    rfr = cfg.get("risk_free_rate", 0.04)
    mu_daily = exp_rets.reindex(cov_mat.columns).fillna(0.0).values / trading_days

    # Severity stays neutral (1.0) unless a dict-shaped regime entry is supplied.
    regime_severity = 1.0
    if macro and "hmm_regime" in macro and isinstance(macro["hmm_regime"], dict):
        regime_severity = macro["hmm_regime"].get("severity_score", 1.0)

    # Rebuild vol/correlation from the covariance so the stress model can act on both.
    cov_arr = cov_mat.values
    vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
    outer_v = np.outer(vols_mc, vols_mc)
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_arr = cov_arr / np.maximum(outer_v, 1e-8)
    corr_arr[np.isnan(corr_arr)] = 0.0

    cov_res_mc = CovarianceResult(
        covariance=cov_mat,
        correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
        volatility=pd.Series(vols_mc, index=cov_mat.columns),
        shrinkage=0.0
    )
    stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
    cov_daily = stressed_cov.covariance.values / trading_days

    try:
        chol = cholesky(cov_daily, lower=True)
    except np.linalg.LinAlgError:
        # Not positive definite: retry with a small diagonal jitter.
        chol = cholesky(cov_daily + np.eye(len(cov_daily)) * 1e-6, lower=True)

    # Log-return drift with the usual -0.5 * variance correction.
    var_daily = np.diag(cov_daily)
    drift = mu_daily - 0.5 * var_daily

    current_port_value = np.full(sims, capital, dtype=float)

    percentile_levels = [5, 25, 50, 75, 95]
    bands = np.zeros((len(percentile_levels), days))

    # Only allocate visual_paths when the caller actually needs them
    take = min(50, sims)
    visual_paths = np.zeros((days, take)) if return_paths else None

    daily_cash_factor = 1 + (rfr / trading_days)

    for d in range(days):
        Z = rng.standard_normal((sims, len(w_arr)))
        daily_shocks = Z @ chol.T
        asset_paths = np.exp(drift + daily_shocks)

        # Cash compounds daily: each day the cash portion grows by daily_cash_factor
        port_paths = np.sum(asset_paths * w_arr, axis=1) + cash_w * daily_cash_factor
        current_port_value *= port_paths

        bands[:, d] = np.percentile(current_port_value, percentile_levels)
        if visual_paths is not None:
            visual_paths[d, :] = current_port_value[:take]

    stats = {level: bands[k].tolist() for k, level in enumerate(percentile_levels)}
    # Labels advance one calendar day per simulated step from a single "today" snapshot.
    today = pd.Timestamp.today()
    stats["dates"] = [(today + pd.Timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days)]

    return visual_paths, stats