math-backend / backtest.py
engineportf's picture
Upload folder using huggingface_hub
558db1e verified
Raw
History Blame Contribute Delete
14.9 kB
import numpy as np
import pandas as pd
from scipy.linalg import cholesky
import copy
from config import Color, logger, DEFAULT_CONFIG
from core_types import PortfolioState, LotManager, CovarianceResult
from models import regime_stress_covariance
from solver import build_and_optimize
try:
from execution import estimate_market_impact
_HAS_EXECUTION = True
except ImportError:
_HAS_EXECUTION = False
def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map, initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None):
"""
Performs a rigorous out-of-sample expanding window backtest.
Inherently applies LotManager for precise HIFO tax lot tracking across time.
"""
trading_days = cfg.get("trading_days_per_year", 252)
adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
local_cfg = copy.deepcopy(cfg)
local_cfg['_is_historical_backtest'] = True
total_days = len(returns_df)
if total_days <= initial_train_days:
print(f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. Need > {initial_train_days} days.{Color.RESET}")
return None, None
equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
initial_start_date = returns_df.index[initial_train_days - 1]
equity_curve.loc[initial_start_date] = capital
equity_curve = equity_curve.sort_index()
current_capital = capital
tickers = list(returns_df.columns)
current_state = PortfolioState.empty(tickers)
lot_manager = LotManager()
optimizer_failures = 0
total_rebalances = 0
synth_prices = (1 + returns_df).cumprod()
# Pre-compute (1 + returns) once to avoid O(N^2) reallocation in the loop
one_plus_returns_arr = 1.0 + returns_df.values
print(f" {Color.DIM}ℹ Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
from core_types import OptimizationParams
opt_params = OptimizationParams(use_fast_ewm_cov=True)
# Incremental covariance state
cov_halflife = 126
alpha = 1 - np.exp(-np.log(2) / cov_halflife)
ewm_mean = None
ewm_cov = None
spreads = np.array([spread_map.get(t, 0.0008) for t in tickers]) if spread_map else np.full(len(tickers), 0.0008)
trade_cost = local_cfg.get("transaction_cost", 0.001)
cumulative_idx = 0
for t in range(initial_train_days, total_days, rebalance_freq):
total_rebalances += 1
start_idx = max(0, t - initial_train_days)
train_df = returns_df.iloc[start_idx:t]
train_spy = spy_rets.reindex(train_df.index).dropna()
train_yields = yield_df.iloc[start_idx:t] if yield_df is not None and not yield_df.empty else None
train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None
end_idx = min(t + rebalance_freq, total_days)
oos_df = returns_df.iloc[t:end_idx]
current_date = oos_df.index[0]
train_end_date = train_df.index[-1]
if ewm_mean is None:
# Initialize from scratch using incremental formula to avoid pandas EWM normalization mismatch
ewm_mean = train_df.iloc[0].values.copy()
ewm_cov = np.outer(ewm_mean, ewm_mean)
for row in train_df.iloc[1:].values:
diff = row - ewm_mean
ewm_mean += alpha * diff
ewm_cov = (1 - alpha) * ewm_cov + alpha * np.outer(diff, diff)
opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)
else:
# Incrementally update EWM cov over the new window (start_idx to t)
# We process the last rebalance_freq rows from train_df.
new_rows = returns_df.iloc[t - rebalance_freq:t].values
for row in new_rows:
# EWM update formulas
diff = row - ewm_mean
ewm_mean += alpha * diff
# outer product
ewm_cov = (1 - alpha) * ewm_cov + alpha * np.outer(diff, diff)
opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)
try:
# Ensure total_capital is properly set in the state before optimization
# This is critical for impact models and risk budget scaling
current_state.total_capital = current_capital
opt_res = build_and_optimize(
train_df, train_spy, risk_input=local_cfg.get('_risk_input', 5),
risk_factor=local_cfg.get('_risk_factor', 3.0), state=current_state, cfg=local_cfg,
model=model, allocation_engine=allocation_engine,
ff_df=train_ff, spread_map=spread_map, silent=True, yield_df=train_yields,
opt_params=opt_params
)
target_w = opt_res.weights
except Exception as e:
optimizer_failures += 1
logger.warning(f"Expanding window rebalance failed at step {t}: {e}")
target_w = pd.Series(1.0/len(tickers), index=tickers)
# Handle Delisted/Dead Assets
for col in target_w.index:
if col != 'CASH' and col in synth_prices.columns:
px = synth_prices[col].iloc[t]
if pd.isna(px) or px <= 1e-8:
target_w[col] = 0.0
w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
cash_w = float(target_w.get('CASH', 0.0))
if isinstance(rfr, pd.Series):
rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
daily_rfr = (rfr_oos / trading_days).values
cash_growth = (1 + daily_rfr).cumprod()
else:
daily_rfr = rfr / trading_days
cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)
# True Buy-and-Hold Return Computation (Instead of Daily Rebalancing Approximation)
# 1. Asset values compound organically across the period
oos_synth = synth_prices.loc[oos_df.index]
base_idx = synth_prices.index.get_indexer([train_end_date], method='ffill')[0]
base = synth_prices.iloc[base_idx]
asset_paths = oos_synth.divide(base + 1e-9, axis=1).values
allocated_capital_path = current_capital * (asset_paths @ w_arr)
# 3. Add continuous compounding cash yield
cash_path = current_capital * cash_w * cash_growth
# 4. Total Capital at each step
total_path = allocated_capital_path + cash_path
# 5. Extract the true daily portfolio returns for accounting
port_daily_rets = np.diff(total_path, prepend=current_capital) / np.concatenate(([current_capital], total_path[:-1]))
prev_w_arr = current_state.current_weights
if isinstance(prev_w_arr, pd.Series):
prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
delta = w_arr - prev_w_arr
friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)
impact = 0.0
if _HAS_EXECUTION:
vols = train_df.std().values
for i, t_val in enumerate(delta):
if abs(t_val) > 1e-4:
impact_pct = estimate_market_impact(abs(t_val * current_capital), adv_proxy, vols[i])
impact += impact_pct * abs(t_val)
# ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
tax_hit = 0.0
if t == initial_train_days:
for i, ticker in enumerate(tickers):
if w_arr[i] > 1e-5 and current_capital > 0:
curr_idx = synth_prices.index.get_indexer([current_date], method='ffill')[0]
px = synth_prices.iloc[curr_idx][ticker]
shares = (w_arr[i] * current_capital) / px
lot_manager.add_lot(ticker, current_date, px, shares)
else:
step_lt_gain = 0.0
step_st_gain = 0.0
if local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
for i, ticker in enumerate(tickers):
w_shift = delta[i]
curr_idx = synth_prices.index.get_indexer([current_date], method='ffill')[0]
px = synth_prices.iloc[curr_idx][ticker]
if w_shift < -1e-5:
shares_to_sell = abs(w_shift) * current_capital / px
_, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
ticker, shares_to_sell, px, current_date,
lt_days=local_cfg.get('lt_days', 366), method='hifo'
)
step_lt_gain += lt_gain
step_st_gain += st_gain
elif w_shift > 1e-5:
shares_to_buy = w_shift * current_capital / px
lot_manager.add_lot(ticker, current_date, px, shares_to_buy)
tax_hit_dollars = (max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)) + \
(max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35))
tax_hit = tax_hit_dollars / current_capital
else:
tax_hit = 0.0
for i in range(len(port_daily_rets)):
if i == 0:
current_capital *= (1 + port_daily_rets[i] - friction - impact - tax_hit)
else:
current_capital *= (1 + port_daily_rets[i])
current_capital = max(0.0, current_capital)
cumulative_idx += 1
if cumulative_idx < len(equity_curve):
equity_curve.iloc[cumulative_idx] = current_capital
# Extract drifted end weights for the next period's delta calculation
if current_capital > 1e-4:
drifted_values = (current_capital * w_arr) * asset_paths[-1]
drifted_weights = drifted_values / np.sum(drifted_values) if np.sum(drifted_values) > 0 else w_arr
w_arr = drifted_weights
current_state.current_weights = np.append(w_arr, cash_w) if hasattr(current_state, 'cash_weight') else w_arr
# To be safe and preserve the exact type, we construct a Series
current_state.current_weights = pd.Series(w_arr, index=oos_df.columns)
current_state.current_weights['CASH'] = cash_w
equity_curve = equity_curve.dropna()
spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
spy_eq = capital * (1 + spy_oos).cumprod()
return equity_curve, spy_eq
def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
"""
Generates thousands of future equity paths.
Properly routes HMM regime severity to stress both correlations and volatilities.
Uses dynamic trading days for drift computation.
Args:
return_paths (bool): If True, compute and return a (days, 50) array of
sample equity paths for visualisation. Defaults to False to save
memory when the caller only needs percentile bands.
"""
# Note: Use localized standard generator, no longer mutating global RNG state
rng = np.random.default_rng(seed)
trading_days = cfg.get("trading_days_per_year", 252)
sims = cfg.get("monte_carlo_sims", 5000)
years = cfg.get("monte_carlo_years", 1.0)
days = int(years * trading_days)
w_risky = weights.drop(labels=['CASH'], errors='ignore')
w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values
cash_w = float(weights.get('CASH', 0.0))
rfr = cfg.get("risk_free_rate", 0.04)
mu_daily = exp_rets.reindex(cov_mat.columns).fillna(0.0).values / trading_days
regime_severity = 1.0
if macro and "hmm_regime" in macro:
if isinstance(macro["hmm_regime"], dict):
regime_severity = macro["hmm_regime"].get("severity_score", 1.0)
else:
regime_severity = 1.0
cov_arr = cov_mat.values
vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
outer_v = np.outer(vols_mc, vols_mc)
with np.errstate(divide='ignore', invalid='ignore'):
corr_arr = cov_arr / np.maximum(outer_v, 1e-8)
corr_arr[np.isnan(corr_arr)] = 0.0
cov_res_mc = CovarianceResult(
covariance=cov_mat,
correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
volatility=pd.Series(vols_mc, index=cov_mat.columns),
shrinkage=0.0
)
stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
cov_daily = stressed_cov.covariance.values / trading_days
try:
chol = cholesky(cov_daily, lower=True)
except Exception:
chol = cholesky(cov_daily + np.eye(len(cov_daily)) * 1e-6, lower=True)
var_daily = np.diag(cov_daily)
drift = mu_daily - 0.5 * var_daily
current_port_value = np.full(sims, capital, dtype=float)
stats_5 = np.zeros(days)
stats_25 = np.zeros(days)
stats_50 = np.zeros(days)
stats_75 = np.zeros(days)
stats_95 = np.zeros(days)
# Only allocate visual_paths when the caller actually needs them
take = min(50, sims)
visual_paths = np.zeros((days, take)) if return_paths else None
daily_cash_factor = 1 + (rfr / trading_days)
for d in range(days):
Z = rng.standard_normal((sims, len(w_arr)))
daily_shocks = Z @ chol.T
asset_paths = np.exp(drift + daily_shocks)
# Cash compounds daily: each day the cash portion grows by daily_cash_factor
port_paths = np.sum(asset_paths * w_arr, axis=1) + cash_w * daily_cash_factor
current_port_value *= port_paths
p5, p25, p50, p75, p95 = np.percentile(current_port_value, [5, 25, 50, 75, 95])
stats_5[d] = p5
stats_25[d] = p25
stats_50[d] = p50
stats_75[d] = p75
stats_95[d] = p95
if visual_paths is not None:
visual_paths[d, :] = current_port_value[:take]
final_values = current_port_value
percentiles = np.percentile(final_values, [5, 25, 50, 75, 95])
stats = {
5: stats_5.tolist(),
25: stats_25.tolist(),
50: stats_50.tolist(),
75: stats_75.tolist(),
95: stats_95.tolist(),
"dates": [(pd.Timestamp.today() + pd.Timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days)]
}
return visual_paths, stats