Spaces:
Sleeping
Sleeping
File size: 14,862 Bytes
558db1e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 | import numpy as np
import pandas as pd
from scipy.linalg import cholesky
import copy
from config import Color, logger, DEFAULT_CONFIG
from core_types import PortfolioState, LotManager, CovarianceResult
from models import regime_stress_covariance
from solver import build_and_optimize
try:
from execution import estimate_market_impact
_HAS_EXECUTION = True
except ImportError:
_HAS_EXECUTION = False
def _ewm_cov_fold(mean, cov, rows, alpha):
    """Fold `rows` into a running EWMA mean/covariance pair.

    `mean` is updated in place; the updated covariance matrix is returned.
    The deviation is taken against the mean *before* it is updated, matching
    the recursion used throughout the backtest.
    """
    for row in rows:
        diff = row - mean
        mean += alpha * diff
        cov = (1 - alpha) * cov + alpha * np.outer(diff, diff)
    return cov


def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map, initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None):
    """
    Run an out-of-sample walk-forward backtest with periodic rebalancing.

    Every `rebalance_freq` rows the optimizer is re-run on the most recent
    `initial_train_days` rows (start index is `t - initial_train_days`), the
    resulting weights are held buy-and-hold over the next block, and spread,
    transaction-cost, market-impact and (optionally) HIFO lot-level tax drag
    are charged against the first day of that block.

    Args:
        returns_df: DataFrame of periodic asset returns, one column per ticker.
        spy_rets: Benchmark return Series; reindexed by date where needed.
        capital: Starting capital.
        rfr: Annualised risk-free rate, either a scalar or a date-indexed
            Series (divided by `trading_days_per_year` to get a daily rate).
        cfg: Mapping of settings; it is deep-copied and never mutated.
        model, allocation_engine: Passed through to `build_and_optimize`.
        spread_map: Optional ticker -> spread mapping (0.0008 when missing).
        initial_train_days: Number of rows in each training slice.
        rebalance_freq: Number of rows between rebalances.
        ff_df: Optional factor DataFrame, reindexed to the training dates.
        yield_df: Optional yield DataFrame, sliced positionally like
            `returns_df`.  NOTE(review): this assumes it is row-aligned with
            `returns_df` - confirm against the caller.

    Returns:
        `(equity_curve, spy_equity)` as two Series sharing one index, or
        `(None, None)` when there are not more than `initial_train_days` rows.
    """
    trading_days = cfg.get("trading_days_per_year", 252)
    adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
    local_cfg = copy.deepcopy(cfg)
    local_cfg['_is_historical_backtest'] = True

    total_days = len(returns_df)
    if total_days <= initial_train_days:
        print(f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. Need > {initial_train_days} days.{Color.RESET}")
        return None, None

    # Slot 0 of the curve is the last training date, holding the start capital.
    equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
    initial_start_date = returns_df.index[initial_train_days - 1]
    equity_curve.loc[initial_start_date] = capital
    equity_curve = equity_curve.sort_index()

    current_capital = capital
    tickers = list(returns_df.columns)
    current_state = PortfolioState.empty(tickers)
    lot_manager = LotManager()
    optimizer_failures = 0
    total_rebalances = 0
    synth_prices = (1 + returns_df).cumprod()

    print(f" {Color.DIM}ℹ Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
    from core_types import OptimizationParams
    opt_params = OptimizationParams(use_fast_ewm_cov=True)

    # Incremental covariance state
    cov_halflife = 126
    alpha = 1 - np.exp(-np.log(2) / cov_halflife)
    ewm_mean = None
    ewm_cov = None

    spreads = np.array([spread_map.get(t, 0.0008) for t in tickers]) if spread_map else np.full(len(tickers), 0.0008)
    trade_cost = local_cfg.get("transaction_cost", 0.001)
    cumulative_idx = 0

    for t in range(initial_train_days, total_days, rebalance_freq):
        total_rebalances += 1
        start_idx = max(0, t - initial_train_days)
        train_df = returns_df.iloc[start_idx:t]
        train_spy = spy_rets.reindex(train_df.index).dropna()
        train_yields = yield_df.iloc[start_idx:t] if yield_df is not None and not yield_df.empty else None
        train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None

        end_idx = min(t + rebalance_freq, total_days)
        oos_df = returns_df.iloc[t:end_idx]
        current_date = oos_df.index[0]
        train_end_date = train_df.index[-1]

        if ewm_mean is None:
            # Seed from the first training row and run the same recursion as
            # later updates, avoiding pandas' EWM normalisation mismatch.
            ewm_mean = train_df.iloc[0].values.copy()
            ewm_cov = np.outer(ewm_mean, ewm_mean)
            ewm_cov = _ewm_cov_fold(ewm_mean, ewm_cov, train_df.iloc[1:].values, alpha)
        else:
            # Only the rows added since the previous rebalance are new.
            new_rows = returns_df.iloc[t - rebalance_freq:t].values
            ewm_cov = _ewm_cov_fold(ewm_mean, ewm_cov, new_rows, alpha)
        opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)

        try:
            # total_capital must be current before optimisation: impact models
            # and risk-budget scaling read it from the state.
            current_state.total_capital = current_capital
            opt_res = build_and_optimize(
                train_df, train_spy, risk_input=local_cfg.get('_risk_input', 5),
                risk_factor=local_cfg.get('_risk_factor', 3.0), state=current_state, cfg=local_cfg,
                model=model, allocation_engine=allocation_engine,
                ff_df=train_ff, spread_map=spread_map, silent=True, yield_df=train_yields,
                opt_params=opt_params
            )
            target_w = opt_res.weights
        except Exception as e:
            # Deliberate best-effort: fall back to equal weights and keep going.
            optimizer_failures += 1
            logger.warning(f"Expanding window rebalance failed at step {t}: {e}")
            target_w = pd.Series(1.0/len(tickers), index=tickers)

        # Handle Delisted/Dead Assets: never allocate to a missing or ~zero price.
        for col in target_w.index:
            if col != 'CASH' and col in synth_prices.columns:
                px = synth_prices[col].iloc[t]
                if pd.isna(px) or px <= 1e-8:
                    target_w[col] = 0.0

        w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        cash_w = float(target_w.get('CASH', 0.0))

        if isinstance(rfr, pd.Series):
            rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
            daily_rfr = (rfr_oos / trading_days).values
            cash_growth = (1 + daily_rfr).cumprod()
        else:
            daily_rfr = rfr / trading_days
            cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)

        # True buy-and-hold: positions compound from the last training close
        # rather than being re-weighted every day.
        oos_synth = synth_prices.loc[oos_df.index]
        base_idx = synth_prices.index.get_indexer([train_end_date], method='ffill')[0]
        base = synth_prices.iloc[base_idx]
        # A NaN price would otherwise poison the whole portfolio value even at
        # zero weight (NaN * 0 is NaN).  Carry the last known value forward and
        # zero whatever is still missing; finite data is unaffected.
        asset_paths = oos_synth.divide(base + 1e-9, axis=1).ffill().fillna(0.0).values
        allocated_capital_path = current_capital * (asset_paths @ w_arr)
        cash_path = current_capital * cash_w * cash_growth
        total_path = allocated_capital_path + cash_path
        # Daily portfolio returns implied by the total value path.
        port_daily_rets = np.diff(total_path, prepend=current_capital) / np.concatenate(([current_capital], total_path[:-1]))

        prev_w_arr = current_state.current_weights
        if isinstance(prev_w_arr, pd.Series):
            prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        delta = w_arr - prev_w_arr
        friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)

        impact = 0.0
        if _HAS_EXECUTION:
            vols = train_df.std().values
            for i, trade_w in enumerate(delta):
                if abs(trade_w) > 1e-4:
                    impact_pct = estimate_market_impact(abs(trade_w * current_capital), adv_proxy, vols[i])
                    impact += impact_pct * abs(trade_w)

        # ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
        # The execution price row is the same for every ticker; look it up once.
        curr_idx = synth_prices.index.get_indexer([current_date], method='ffill')[0]
        px_row = synth_prices.iloc[curr_idx]
        tax_hit = 0.0
        if t == initial_train_days:
            # First rebalance: open the initial lots; nothing can be sold yet.
            for i, ticker in enumerate(tickers):
                if w_arr[i] > 1e-5 and current_capital > 0:
                    px = px_row[ticker]
                    shares = (w_arr[i] * current_capital) / px
                    lot_manager.add_lot(ticker, current_date, px, shares)
        elif local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
            step_lt_gain = 0.0
            step_st_gain = 0.0
            for i, ticker in enumerate(tickers):
                w_shift = delta[i]
                px = px_row[ticker]
                if w_shift < -1e-5:
                    shares_to_sell = abs(w_shift) * current_capital / px
                    _, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
                        ticker, shares_to_sell, px, current_date,
                        lt_days=local_cfg.get('lt_days', 366), method='hifo'
                    )
                    step_lt_gain += lt_gain
                    step_st_gain += st_gain
                elif w_shift > 1e-5:
                    shares_to_buy = w_shift * current_capital / px
                    lot_manager.add_lot(ticker, current_date, px, shares_to_buy)
            # Only net positive gains are taxed; losses are not credited here.
            tax_hit_dollars = (max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)) + \
                (max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35))
            tax_hit = tax_hit_dollars / current_capital

        for i, day_ret in enumerate(port_daily_rets):
            if i == 0:
                # All trading costs are charged on the rebalance day.
                current_capital *= (1 + day_ret - friction - impact - tax_hit)
            else:
                current_capital *= (1 + day_ret)
            current_capital = max(0.0, current_capital)
            cumulative_idx += 1
            if cumulative_idx < len(equity_curve):
                equity_curve.iloc[cumulative_idx] = current_capital

        # Drifted end-of-block weights feed the next period's delta.  Cash is
        # part of the normalisation so the weights keep summing to one;
        # normalising risky assets alone overstates turnover when cash is held.
        end_cash_w = cash_w
        if current_capital > 1e-4:
            risky_rel = w_arr * asset_paths[-1]
            cash_rel = cash_w * float(cash_growth[-1])
            risky_total = np.sum(risky_rel)
            total_rel = risky_total + cash_rel
            if risky_total > 0 and total_rel > 0:
                w_arr = risky_rel / total_rel
                end_cash_w = cash_rel / total_rel

        # Stored as a Series so the next iteration can drop/reindex by ticker.
        current_state.current_weights = pd.Series(w_arr, index=oos_df.columns)
        current_state.current_weights['CASH'] = end_cash_w

    if optimizer_failures:
        # Equal-weight fallbacks change the result, so surface their count.
        logger.warning(
            f"Expanding window backtest: optimizer failed on {optimizer_failures} "
            f"of {total_rebalances} rebalances; equal-weight fallback was used."
        )

    equity_curve = equity_curve.dropna()
    spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
    spy_eq = capital * (1 + spy_oos).cumprod()
    return equity_curve, spy_eq
def _cholesky_with_repair(cov):
    """Return a lower-triangular Cholesky factor of `cov`, repairing it if needed.

    Tries the matrix as given, then with a fixed 1e-6 diagonal jitter, and
    finally with its eigenvalues clipped to a small positive floor.  Only
    `LinAlgError` (not positive definite) triggers a retry; other errors,
    such as non-finite input, propagate to the caller.
    """
    try:
        return cholesky(cov, lower=True)
    except np.linalg.LinAlgError:
        pass
    try:
        return cholesky(cov + np.eye(len(cov)) * 1e-6, lower=True)
    except np.linalg.LinAlgError:
        pass
    # Last resort: project onto the positive-definite cone.
    sym = 0.5 * (cov + cov.T)
    eigvals, eigvecs = np.linalg.eigh(sym)
    floor = max(float(eigvals.max()) * 1e-8, 1e-12)
    repaired = (eigvecs * np.clip(eigvals, floor, None)) @ eigvecs.T
    return cholesky(0.5 * (repaired + repaired.T), lower=True)


def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
    """
    Simulate future portfolio equity paths and summarise them as percentile bands.

    Asset log-returns are drawn daily from a multivariate normal whose
    covariance is `cov_mat` after `regime_stress_covariance` is applied with
    the HMM regime severity.  The portfolio is re-weighted to `weights` every
    simulated day and the cash sleeve grows at the daily risk-free rate.

    Args:
        weights: Series of portfolio weights, optionally including 'CASH'.
        exp_rets: Series of annualised expected returns (divided by
            `trading_days_per_year` for the daily drift).
        cov_mat: Annualised covariance DataFrame; its columns fix asset order.
        capital: Starting portfolio value.
        cfg: Mapping read for `trading_days_per_year`, `monte_carlo_sims`,
            `monte_carlo_years` and `risk_free_rate`.
        macro: Optional mapping; `macro["hmm_regime"]["severity_score"]` is
            used as the stress severity when present, otherwise 1.0.
        seed: Seed for a local `numpy.random.Generator`; global RNG state is
            not touched.
        return_paths (bool): If True, also return a `(days, min(50, sims))`
            array of sample equity paths for visualisation.  Defaults to False
            to save memory when only percentile bands are needed.

    Returns:
        `(visual_paths, stats)`.  `visual_paths` is None unless
        `return_paths` is True.  `stats` maps 5/25/50/75/95 to per-day
        percentile lists and "dates" to 'YYYY-MM-DD' labels.
    """
    rng = np.random.default_rng(seed)
    trading_days = cfg.get("trading_days_per_year", 252)
    sims = cfg.get("monte_carlo_sims", 5000)
    years = cfg.get("monte_carlo_years", 1.0)
    days = int(years * trading_days)

    w_risky = weights.drop(labels=['CASH'], errors='ignore')
    w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values
    cash_w = float(weights.get('CASH', 0.0))
    rfr = cfg.get("risk_free_rate", 0.04)
    mu_daily = exp_rets.reindex(cov_mat.columns).fillna(0.0).values / trading_days

    regime_severity = 1.0
    if macro and "hmm_regime" in macro and isinstance(macro["hmm_regime"], dict):
        regime_severity = macro["hmm_regime"].get("severity_score", 1.0)

    # Split the covariance into vols and correlations for the stress model.
    cov_arr = cov_mat.values
    vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
    outer_v = np.outer(vols_mc, vols_mc)
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_arr = cov_arr / np.maximum(outer_v, 1e-8)
    corr_arr[np.isnan(corr_arr)] = 0.0

    cov_res_mc = CovarianceResult(
        covariance=cov_mat,
        correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
        volatility=pd.Series(vols_mc, index=cov_mat.columns),
        shrinkage=0.0
    )
    stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
    cov_daily = stressed_cov.covariance.values / trading_days

    chol = _cholesky_with_repair(cov_daily)

    # Log-return drift with the usual -0.5 * variance convexity correction.
    var_daily = np.diag(cov_daily)
    drift = mu_daily - 0.5 * var_daily

    current_port_value = np.full(sims, capital, dtype=float)
    levels = (5, 25, 50, 75, 95)
    bands = np.zeros((len(levels), days))

    # Only allocate visual_paths when the caller actually needs them
    take = min(50, sims)
    visual_paths = np.zeros((days, take)) if return_paths else None
    daily_cash_factor = 1 + (rfr / trading_days)

    for d in range(days):
        Z = rng.standard_normal((sims, len(w_arr)))
        daily_shocks = Z @ chol.T
        asset_paths = np.exp(drift + daily_shocks)
        # Gross one-day portfolio growth: weighted asset growth plus cash
        # growing by the daily risk-free factor.
        port_paths = np.sum(asset_paths * w_arr, axis=1) + cash_w * daily_cash_factor
        current_port_value *= port_paths
        bands[:, d] = np.percentile(current_port_value, levels)
        if visual_paths is not None:
            visual_paths[d, :] = current_port_value[:take]

    # Read the clock once so every label shares one anchor date.
    # NOTE(review): labels advance by calendar days although `days` counts
    # trading days - confirm this is what the chart consumer expects.
    start = pd.Timestamp.today()
    stats = {level: bands[k].tolist() for k, level in enumerate(levels)}
    stats["dates"] = [(start + pd.Timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days)]
    return visual_paths, stats
|