portfolio-engine / backtest.py
engineportf's picture
Initial Deployment from Local Engine
208fbf8 verified
Raw
History Blame Contribute Delete
15.5 kB
import numpy as np
import pandas as pd
from scipy.linalg import cholesky
import copy
from config import Color, logger, DEFAULT_CONFIG
from core_types import PortfolioState, LotManager, CovarianceResult
from models import regime_stress_covariance
from solver import build_and_optimize
try:
from execution import estimate_market_impact
_HAS_EXECUTION = True
except ImportError:
_HAS_EXECUTION = False
def _ewm_cov_update(ewm_mean, ewm_cov, rows, alpha):
    """Fold each row of ``rows`` into a running EWM mean and covariance.

    Returns the updated ``(ewm_mean, ewm_cov)`` pair. The deviation of each row
    is taken against the mean *before* that row is absorbed, matching the
    recursive update used throughout the backtest.
    """
    for row in rows:
        diff = row - ewm_mean
        ewm_mean = ewm_mean + alpha * diff
        ewm_cov = (1 - alpha) * ewm_cov + alpha * np.outer(diff, diff)
    return ewm_mean, ewm_cov


def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map, initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None):
    """
    Run an out-of-sample walk-forward backtest with periodic re-optimisation.

    Every ``rebalance_freq`` rows the optimiser is re-run on the trailing
    ``initial_train_days`` rows of ``returns_df`` (the slice start advances with
    ``t``, so the optimiser input has a fixed length), while the EWM covariance
    handed to it via ``opt_params.incremental_cov`` is updated incrementally and
    therefore carries all history seen so far. The resulting weights are held
    buy-and-hold over the next out-of-sample block. Spread/commission friction,
    optional market impact and optional realised-gain tax (HIFO lots through
    ``LotManager``) are charged on the first day of each block.

    Args:
        returns_df: Per-asset periodic returns, one column per ticker.
        spy_rets: Benchmark return series, reindexed onto the needed dates.
        capital: Starting capital.
        rfr: Annualised risk-free rate, either a scalar or a ``pd.Series``
            (it is divided by ``trading_days_per_year`` to get a per-row rate).
        cfg: Config mapping. Keys read here: ``trading_days_per_year``,
            ``default_adv_proxy``, ``transaction_cost``, ``_risk_input``,
            ``_risk_factor``, ``tax_enabled``, ``lt_days``, ``tax_rate_lt``,
            ``tax_rate_st``. A deep copy is flagged with
            ``_is_historical_backtest`` and passed to the optimiser.
        model: Passed through to ``build_and_optimize``.
        allocation_engine: Passed through to ``build_and_optimize``.
        spread_map: Optional ``{ticker: spread}`` mapping; missing tickers (or a
            falsy map) use 0.0008.
        initial_train_days: Rows required before the first rebalance.
        rebalance_freq: Rows between rebalances.
        ff_df: Optional factor frame, reindexed onto the training dates.
        yield_df: Optional yield frame, sliced *positionally* with the same row
            offsets as ``returns_df``.
            NOTE(review): this assumes ``yield_df`` is row-aligned with
            ``returns_df`` — confirm against the caller.

    Returns:
        ``(equity_curve, spy_eq)`` as two ``pd.Series`` on the same index, or
        ``(None, None)`` when ``returns_df`` has no rows beyond the training
        window.
    """
    trading_days = cfg.get("trading_days_per_year", 252)
    adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
    local_cfg = copy.deepcopy(cfg)
    local_cfg['_is_historical_backtest'] = True

    total_days = len(returns_df)
    if total_days <= initial_train_days:
        print(f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. Need > {initial_train_days} days.{Color.RESET}")
        return None, None

    # The curve is seeded with the starting capital on the last training date,
    # so position 0 is the seed and out-of-sample days are written from 1 on.
    equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
    initial_start_date = returns_df.index[initial_train_days - 1]
    equity_curve.loc[initial_start_date] = capital
    equity_curve = equity_curve.sort_index()

    current_capital = capital
    tickers = list(returns_df.columns)
    current_state = PortfolioState.empty(tickers)
    lot_manager = LotManager()
    optimizer_failures = 0
    total_rebalances = 0

    # Synthetic price index (starts from the first return) used for
    # buy-and-hold paths and for lot prices.
    synth_prices = (1 + returns_df).cumprod()

    print(f" {Color.DIM}[INFO] Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
    from core_types import OptimizationParams
    opt_params = OptimizationParams(use_fast_ewm_cov=True)

    # Incremental covariance state
    cov_halflife = 126
    alpha = 1 - np.exp(-np.log(2) / cov_halflife)
    ewm_mean = None
    ewm_cov = None

    spreads = np.array([spread_map.get(tk, 0.0008) for tk in tickers]) if spread_map else np.full(len(tickers), 0.0008)
    trade_cost = local_cfg.get("transaction_cost", 0.001)
    cumulative_idx = 0

    for t in range(initial_train_days, total_days, rebalance_freq):
        total_rebalances += 1
        start_idx = max(0, t - initial_train_days)
        train_df = returns_df.iloc[start_idx:t]
        train_spy = spy_rets.reindex(train_df.index).dropna()
        train_yields = yield_df.iloc[start_idx:t] if yield_df is not None and not yield_df.empty else None
        train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None
        end_idx = min(t + rebalance_freq, total_days)
        oos_df = returns_df.iloc[t:end_idx]
        current_date = oos_df.index[0]
        train_end_date = train_df.index[-1]

        if ewm_mean is None:
            # Initialise with the recursive formula (not pandas .ewm) so that
            # later incremental updates use exactly the same normalisation.
            first_row = train_df.iloc[0].values.copy()
            ewm_mean, ewm_cov = _ewm_cov_update(
                first_row, np.outer(first_row, first_row), train_df.iloc[1:].values, alpha
            )
        else:
            # Only the rows added since the previous rebalance are new.
            ewm_mean, ewm_cov = _ewm_cov_update(
                ewm_mean, ewm_cov, returns_df.iloc[t - rebalance_freq:t].values, alpha
            )
        opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)

        try:
            # Ensure total_capital is properly set in the state before optimization
            # This is critical for impact models and risk budget scaling
            current_state.total_capital = current_capital
            opt_res = build_and_optimize(
                train_df, train_spy, risk_input=local_cfg.get('_risk_input', 5),
                risk_factor=local_cfg.get('_risk_factor', 3.0), state=current_state, cfg=local_cfg,
                model=model, allocation_engine=allocation_engine,
                ff_df=train_ff, spread_map=spread_map, silent=True, yield_df=train_yields,
                opt_params=opt_params
            )
            target_w = opt_res.weights
        except Exception as e:
            # Deliberate best-effort: fall back to equal weight and keep going.
            optimizer_failures += 1
            logger.warning(f"Expanding window rebalance failed at step {t}: {e}")
            target_w = pd.Series(1.0/len(tickers), index=tickers)

        # Handle Delisted/Dead Assets: never allocate to an asset whose
        # synthetic price at the rebalance row is missing or ~zero.
        # NOTE(review): the zeroed weight is not redistributed, so the weights
        # may sum to less than 1 for this block — confirm this is intended.
        px_at_rebalance = synth_prices.iloc[t]
        for col in target_w.index:
            if col != 'CASH' and col in synth_prices.columns:
                px = px_at_rebalance[col]
                if pd.isna(px) or px <= 1e-8:
                    target_w[col] = 0.0

        w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        cash_w = float(target_w.get('CASH', 0.0))

        if isinstance(rfr, pd.Series):
            rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
            daily_rfr = (rfr_oos / trading_days).values
            cash_growth = (1 + daily_rfr).cumprod()
        else:
            daily_rfr = rfr / trading_days
            cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)

        # True Buy-and-Hold Return Computation (Instead of Daily Rebalancing Approximation)
        # 1. Asset values compound organically from the last training date.
        oos_synth = synth_prices.loc[oos_df.index]
        base_idx = synth_prices.index.get_indexer([train_end_date], method='ffill')[0]
        base = synth_prices.iloc[base_idx]
        asset_paths = oos_synth.divide(base + 1e-9, axis=1).values
        allocated_capital_path = current_capital * (asset_paths @ w_arr)
        # 2. Cash compounds at the risk-free rate.
        cash_path = current_capital * cash_w * cash_growth
        # 3. Total capital at each step.
        total_path = allocated_capital_path + cash_path
        # 4. Daily portfolio returns implied by the path (first day is measured
        #    against the capital at the start of the block).
        port_daily_rets = np.diff(total_path, prepend=current_capital) / np.concatenate(([current_capital], total_path[:-1]))

        prev_w_arr = current_state.current_weights
        if isinstance(prev_w_arr, pd.Series):
            prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        delta = w_arr - prev_w_arr

        # One-off trading costs, expressed as a fraction of capital.
        friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)
        impact = 0.0
        if _HAS_EXECUTION:
            vols = train_df.std().values
            for i, trade_w in enumerate(delta):
                if abs(trade_w) > 1e-4:
                    impact_pct = estimate_market_impact(abs(trade_w * current_capital), adv_proxy, vols[i])
                    impact += impact_pct * abs(trade_w)

        # ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
        # The price row for the trade date is the same for every ticker, so it
        # is looked up once per rebalance rather than once per ticker.
        curr_idx = synth_prices.index.get_indexer([current_date], method='ffill')[0]
        trade_prices = synth_prices.iloc[curr_idx]
        tax_hit = 0.0
        if t == initial_train_days:
            # First rebalance: open the initial lots; nothing can be sold yet.
            for i, ticker in enumerate(tickers):
                if w_arr[i] > 1e-5 and current_capital > 0:
                    px = trade_prices[ticker]
                    shares = (w_arr[i] * current_capital) / px
                    lot_manager.add_lot(ticker, current_date, px, shares)
        elif local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
            step_lt_gain = 0.0
            step_st_gain = 0.0
            for i, ticker in enumerate(tickers):
                w_shift = delta[i]
                px = trade_prices[ticker]
                if w_shift < -1e-5:
                    shares_to_sell = abs(w_shift) * current_capital / px
                    _, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
                        ticker, shares_to_sell, px, current_date,
                        lt_days=local_cfg.get('lt_days', 366), method='hifo'
                    )
                    step_lt_gain += lt_gain
                    step_st_gain += st_gain
                elif w_shift > 1e-5:
                    shares_to_buy = w_shift * current_capital / px
                    lot_manager.add_lot(ticker, current_date, px, shares_to_buy)
            # Net losses in a bucket are floored at zero (no tax credit).
            tax_hit_dollars = (max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)) + \
                              (max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35))
            tax_hit = tax_hit_dollars / current_capital

        # Roll capital forward; all one-off costs hit the first day of the block.
        for i, day_ret in enumerate(port_daily_rets):
            if i == 0:
                current_capital *= (1 + day_ret - friction - impact - tax_hit)
            else:
                current_capital *= (1 + day_ret)
            current_capital = max(0.0, current_capital)
            cumulative_idx += 1
            if cumulative_idx < len(equity_curve):
                equity_curve.iloc[cumulative_idx] = current_capital

        # Drifted end-of-block weights for the next period's delta calculation.
        # They are normalised over the whole portfolio (risky assets plus
        # compounded cash) so that risky weights and CASH sum to 1; normalising
        # over risky assets only would overstate every holding whenever cash is
        # held and inflate the next rebalance's turnover.
        if current_capital > 1e-4:
            risky_end = w_arr * asset_paths[-1]
            cash_end = cash_w * cash_growth[-1]
            total_end = np.sum(risky_end) + cash_end
            if total_end > 0:
                w_arr = risky_end / total_end
                cash_w = float(cash_end / total_end)

        current_state.current_weights = pd.Series(w_arr, index=oos_df.columns)
        current_state.current_weights['CASH'] = cash_w

    if optimizer_failures:
        logger.warning(
            "Expanding window backtest: %d of %d rebalances fell back to equal weight.",
            optimizer_failures, total_rebalances,
        )

    equity_curve = equity_curve.dropna()
    spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
    spy_eq = capital * (1 + spy_oos).cumprod()
    return equity_curve, spy_eq
def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
    """
    Simulate future portfolio equity paths and summarise them as percentile bands.

    Per-asset daily gross returns are drawn as ``exp(drift + shock)`` with
    correlated normal shocks from the Cholesky factor of the (regime-stressed)
    daily covariance. Each day's portfolio gross return is the weight-blended
    asset gross return plus the cash sleeve growing at the daily risk-free
    rate, and the path is the cumulative product of those daily returns.

    Args:
        weights (pd.Series): Portfolio weights; an optional ``'CASH'`` entry is
            treated as the cash sleeve, the rest are aligned to
            ``cov_mat.columns`` (missing assets get weight 0).
        exp_rets (pd.Series): Annualised expected returns, aligned the same way.
        cov_mat (pd.DataFrame): Annualised covariance matrix.
        capital (float): Starting portfolio value.
        cfg (dict): Keys read: ``trading_days_per_year`` (default 252),
            ``monte_carlo_sims`` (5000), ``monte_carlo_years`` (1.0),
            ``risk_free_rate`` (0.04).
        macro (dict | None): If it holds a dict under ``"hmm_regime"``, its
            ``"severity_score"`` is passed to ``regime_stress_covariance``;
            otherwise a severity of 1.0 is used.
        seed: Seed for a local ``numpy`` generator (global RNG state is untouched).
        return_paths (bool): If True, also return the first ``min(50, sims)``
            simulated paths as a ``(days, n)`` array for visualisation.

    Returns:
        ``(visual_paths, stats)`` where ``visual_paths`` is the sample-path
        array or ``None``, and ``stats`` maps each of 5/25/50/75/95 to a
        per-day list of that percentile across simulations, plus ``"dates"``
        (one ``YYYY-MM-DD`` label per simulated day, stepping one calendar day
        at a time from today).
    """
    # Note: Use localized standard generator, no longer mutating global RNG state
    rng = np.random.default_rng(seed)
    trading_days = cfg.get("trading_days_per_year", 252)
    sims = cfg.get("monte_carlo_sims", 5000)
    years = cfg.get("monte_carlo_years", 1.0)
    days = int(years * trading_days)

    w_risky = weights.drop(labels=['CASH'], errors='ignore')
    w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values
    cash_w = float(weights.get('CASH', 0.0))
    rfr = cfg.get("risk_free_rate", 0.04)
    mu_daily = exp_rets.reindex(cov_mat.columns).fillna(0.0).values / trading_days

    regime_severity = 1.0
    if macro and "hmm_regime" in macro and isinstance(macro["hmm_regime"], dict):
        regime_severity = macro["hmm_regime"].get("severity_score", 1.0)

    # Rebuild correlation/volatility from the covariance so the stress helper
    # receives a complete CovarianceResult.
    cov_arr = cov_mat.values
    vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
    outer_v = np.outer(vols_mc, vols_mc)
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_arr = cov_arr / np.maximum(outer_v, 1e-8)
    corr_arr[np.isnan(corr_arr)] = 0.0
    cov_res_mc = CovarianceResult(
        covariance=cov_mat,
        correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
        volatility=pd.Series(vols_mc, index=cov_mat.columns),
        shrinkage=0.0
    )
    stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
    cov_daily = stressed_cov.covariance.values / trading_days

    try:
        chol = cholesky(cov_daily, lower=True)
    except np.linalg.LinAlgError:
        # Not positive definite: retry once with a small diagonal jitter.
        chol = cholesky(cov_daily + np.eye(len(cov_daily)) * 1e-6, lower=True)

    var_daily = np.diag(cov_daily)
    # Log-return drift with the -0.5*variance convexity correction.
    drift = mu_daily - 0.5 * var_daily

    # Simulate in chunks so the (days, chunk, assets) shock tensor stays bounded.
    # The per-day portfolio values of every simulation are still kept, because
    # the percentile bands need all paths at once.
    chunk_size = 1000
    all_port_values = []
    take = min(50, sims)
    daily_cash_factor = 1 + (rfr / trading_days)
    for start_idx in range(0, sims, chunk_size):
        end_idx = min(start_idx + chunk_size, sims)
        c_sims = end_idx - start_idx
        # Z shape: (days, c_sims, assets)
        Z = rng.standard_normal((days, c_sims, len(w_arr)))
        daily_shocks = Z @ chol.T
        asset_paths = np.exp(drift + daily_shocks)
        # port_paths shape: (days, c_sims) — daily gross portfolio return
        port_paths = np.sum(asset_paths * w_arr, axis=2) + cash_w * daily_cash_factor
        # chunk_port_value shape: (days, c_sims)
        chunk_port_value = capital * np.cumprod(port_paths, axis=0)
        all_port_values.append(chunk_port_value)

    # Combine chunks: (days, sims)
    current_port_value = np.concatenate(all_port_values, axis=1)

    # Percentile bands across the sims axis, one value per simulated day.
    levels = [5, 25, 50, 75, 95]
    bands = np.percentile(current_port_value, levels, axis=1)
    stats = {level: band.tolist() for level, band in zip(levels, bands)}

    # Take "today" once so every label is offset from the same instant.
    today = pd.Timestamp.today()
    stats["dates"] = [(today + pd.Timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days)]

    visual_paths = current_port_value[:, :take] if return_paths else None
    return visual_paths, stats