portfolio_opt / backtest.py
engineportf's picture
Add Quant Terminal Easter Egg
aceffd1
Raw
History Blame Contribute Delete
18.7 kB
import numpy as np
import pandas as pd
from scipy.linalg import cholesky
import copy
from config import Color, logger, DEFAULT_CONFIG
from core_types import PortfolioState, LotManager, CovarianceResult
from models import regime_stress_covariance
from solver import build_and_optimize
try:
from execution import estimate_market_impact
_HAS_EXECUTION = True
except ImportError:
_HAS_EXECUTION = False
def vectorized_backtest(weights_df, returns_df, tc=0.001, spreads=None, initial_capital=100000.0):
    """
    Vectorized backtest for a static or pre-computed weight path.

    Each row's weights are applied to the same row's returns, and a friction
    charge proportional to the absolute change in weights is subtracted from
    that row's return before compounding.

    Args:
        weights_df: DataFrame of portfolio weights, one column per ticker,
            indexed by date.
        returns_df: DataFrame of asset returns indexed by date. It must
            contain a column for every ticker in ``weights_df``; columns are
            matched by label, so ordering and extra columns do not matter.
        tc: Proportional transaction cost charged per unit of turnover.
        spreads: Optional per-ticker spread cost (array-like ordered like
            ``weights_df.columns``, or a scalar). Defaults to 0.0008 for
            every ticker.
        initial_capital: Starting equity.

    Returns:
        pd.Series of equity values indexed by the dates common to both
        inputs; an empty float Series when there are no common dates.

    Raises:
        KeyError: If ``returns_df`` lacks a column for one of the tickers.
    """
    tickers = weights_df.columns
    if spreads is None:
        spreads = np.full(len(tickers), 0.0008)
    else:
        # Accept plain lists/tuples as well as arrays (list + float would fail).
        spreads = np.asarray(spreads, dtype=float)

    common_dates = weights_df.index.intersection(returns_df.index)
    if len(common_dates) == 0:
        return pd.Series(dtype=float)

    w_arr = weights_df.loc[common_dates].values
    # Select return columns by ticker label so that a differently ordered (or
    # wider) returns frame cannot be multiplied against the wrong weights.
    r_arr = returns_df.loc[common_dates, tickers].values

    # Turnover per row; the first row is measured against an all-zero
    # (uninvested) portfolio, i.e. the initial allocation is fully charged.
    turnover = np.abs(np.diff(w_arr, axis=0, prepend=0.0))
    friction = np.sum(turnover * (spreads + tc), axis=1)

    # Gross and net per-row portfolio returns
    gross_rets = np.sum(w_arr * r_arr, axis=1)
    net_rets = gross_rets - friction

    # Equity curve
    equity_curve = initial_capital * np.cumprod(1 + net_rets)
    return pd.Series(equity_curve, index=common_dates)
def expanding_window_backtest(returns_df, spy_rets, capital, rfr, cfg, model, allocation_engine, spread_map, initial_train_days=1260, rebalance_freq=63, ff_df=None, yield_df=None, raw_prices=None):
    """
    Run an out-of-sample walk-forward backtest with periodic re-optimization.

    Every ``rebalance_freq`` rows the optimizer is re-run on the preceding
    ``initial_train_days`` rows and the resulting weights are held
    (buy-and-hold, no intra-period rebalancing) until the next rebalance.
    Spread/transaction friction, optional market impact and optional
    realized-gain tax are charged on the first day of each holding period.
    Tax lots are tracked with ``LotManager`` and sold using the 'hifo' method.

    NOTE: despite the name, the training slice starts at
    ``t - initial_train_days``, so it is a fixed-length trailing window; only
    the incrementally updated EWMA covariance accumulates the full history.

    Args:
        returns_df: DataFrame of asset returns, one column per ticker.
        spy_rets: Series of benchmark returns, reindexed to the dates used.
        capital: Starting capital.
        rfr: Annual risk-free rate, either a scalar or a pd.Series indexed by
            date; it is divided by ``trading_days_per_year`` to accrue cash.
        cfg: Dict-like configuration (deep-copied, never mutated). Keys read
            here: ``trading_days_per_year``, ``default_adv_proxy``,
            ``transaction_cost``, ``_risk_input``, ``_risk_factor``,
            ``tax_enabled``, ``lt_days``, ``tax_rate_lt``, ``tax_rate_st``.
        model, allocation_engine: Forwarded unchanged to ``build_and_optimize``.
        spread_map: Mapping ticker -> spread; tickers that are missing (or a
            falsy map) fall back to 0.0008.
        initial_train_days: Number of rows in each training slice.
        rebalance_freq: Number of rows between rebalances.
        ff_df, yield_df: Optional frames sliced to the training window and
            forwarded to ``build_and_optimize``.
        raw_prices: Optional per-ticker price series used to price tax lots;
            when absent, a synthetic price (cumulative product of
            ``1 + returns``) is used instead.

    Returns:
        ``(equity_curve, spy_eq)``: the portfolio equity Series (seeded with
        ``capital`` on the last training date) and a benchmark equity Series
        on the same index, or ``(None, None)`` when ``returns_df`` has no
        rows beyond ``initial_train_days``.
    """
    trading_days = cfg.get("trading_days_per_year", 252)
    adv_proxy = cfg.get("default_adv_proxy", 50_000_000.0)
    local_cfg = copy.deepcopy(cfg)
    local_cfg['_is_historical_backtest'] = True

    total_days = len(returns_df)
    if total_days <= initial_train_days:
        print(f" {Color.YELLOW}⚠ Not enough data for expanding window backtest. Need > {initial_train_days} days.{Color.RESET}")
        return None, None

    # The curve is seeded with the starting capital on the last training day;
    # out-of-sample values are then written positionally from index 1 onward.
    equity_curve = pd.Series(index=returns_df.index[initial_train_days:], dtype=float)
    initial_start_date = returns_df.index[initial_train_days - 1]
    equity_curve.loc[initial_start_date] = capital
    equity_curve = equity_curve.sort_index()

    current_capital = capital
    tickers = list(returns_df.columns)
    current_state = PortfolioState.empty(tickers)
    lot_manager = LotManager()
    # Counted for diagnostics; not currently reported to the caller.
    optimizer_failures = 0
    total_rebalances = 0
    synth_prices = (1 + returns_df).cumprod()

    print(f" {Color.DIM}[INFO] Using trailing EWMA covariance computation for window blocks...{Color.RESET}")
    from core_types import OptimizationParams
    opt_params = OptimizationParams(use_fast_ewm_cov=True, is_backtest=True)

    # Incremental EWMA covariance state (carried across rebalances)
    cov_halflife = 126
    alpha = 1 - np.exp(-np.log(2) / cov_halflife)
    ewm_mean = None
    ewm_cov = None

    spreads = np.array([spread_map.get(t, 0.0008) for t in tickers]) if spread_map else np.full(len(tickers), 0.0008)
    trade_cost = local_cfg.get("transaction_cost", 0.001)
    cumulative_idx = 0

    def _lot_price(ticker, as_of):
        """Price used for tax-lot bookkeeping: supplied price if any, else synthetic."""
        if raw_prices is not None and ticker in raw_prices:
            return raw_prices[ticker].reindex([as_of], method='ffill').iloc[0]
        pos = synth_prices.index.get_indexer([as_of], method='ffill')[0]
        return synth_prices.iloc[pos][ticker]

    for t in range(initial_train_days, total_days, rebalance_freq):
        total_rebalances += 1
        start_idx = max(0, t - initial_train_days)
        train_df = returns_df.iloc[start_idx:t]
        train_spy = spy_rets.reindex(train_df.index).dropna()
        train_yields = yield_df.iloc[start_idx:t] if yield_df is not None and not yield_df.empty else None
        train_ff = ff_df.reindex(train_df.index).dropna() if ff_df is not None else None

        end_idx = min(t + rebalance_freq, total_days)
        oos_df = returns_df.iloc[t:end_idx]
        current_date = oos_df.index[0]
        train_end_date = train_df.index[-1]

        if ewm_mean is None:
            # Initialize from scratch using the incremental formula to avoid
            # a pandas EWM normalization mismatch.
            ewm_mean = train_df.iloc[0].values.copy()
            ewm_cov = np.outer(ewm_mean, ewm_mean)
            new_rows = train_df.iloc[1:].values
        else:
            # Only the rows that arrived since the previous rebalance.
            new_rows = returns_df.iloc[t - rebalance_freq:t].values
        for row in new_rows:
            diff = row - ewm_mean
            ewm_mean += alpha * diff
            ewm_cov = (1 - alpha) * ewm_cov + alpha * np.outer(diff, diff)
        opt_params.incremental_cov = pd.DataFrame(ewm_cov * trading_days, index=tickers, columns=tickers)

        try:
            # Ensure total_capital is set in the state before optimization
            # (the original notes this matters for impact models and risk
            # budget scaling).
            current_state.total_capital = current_capital
            opt_res = build_and_optimize(
                train_df, train_spy, risk_input=local_cfg.get('_risk_input', 5),
                risk_factor=local_cfg.get('_risk_factor', 3.0), state=current_state, cfg=local_cfg,
                model=model, allocation_engine=allocation_engine,
                ff_df=train_ff, spread_map=spread_map, silent=True, yield_df=train_yields,
                opt_params=opt_params
            )
            target_w = opt_res.weights
        except Exception as e:
            # Best-effort: fall back to equal weights rather than abort the run.
            optimizer_failures += 1
            logger.warning(f"Expanding window rebalance failed at step {t}: {e}")
            target_w = pd.Series(1.0/len(tickers), index=tickers)

        # Handle delisted/dead assets: never allocate to a ticker whose
        # synthetic price is missing or has collapsed to ~0.
        # NOTE(review): the zeroed weight is not reallocated (e.g. to CASH),
        # so the weights may sum to < 1 afterwards — confirm this is intended.
        for col in target_w.index:
            if col != 'CASH' and col in synth_prices.columns:
                px = synth_prices[col].iloc[t]
                if pd.isna(px) or px <= 1e-8:
                    target_w[col] = 0.0

        w_arr = target_w.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        cash_w = float(target_w.get('CASH', 0.0))

        if isinstance(rfr, pd.Series):
            rfr_oos = rfr.reindex(oos_df.index).ffill().bfill().fillna(0.04)
            daily_rfr = (rfr_oos / trading_days).values
            cash_growth = (1 + daily_rfr).cumprod()
        else:
            daily_rfr = rfr / trading_days
            cash_growth = (1 + daily_rfr) ** np.arange(1, len(oos_df) + 1)

        # True buy-and-hold return computation (instead of a daily
        # rebalancing approximation).
        # 1. Asset values compound from the last training-day price.
        oos_synth = synth_prices.loc[oos_df.index]
        base_idx = synth_prices.index.get_indexer([train_end_date], method='ffill')[0]
        base = synth_prices.iloc[base_idx]
        asset_paths = oos_synth.divide(base + 1e-9, axis=1).values
        allocated_capital_path = current_capital * (asset_paths @ w_arr)
        # 2. Cash accrues the risk-free rate.
        cash_path = current_capital * cash_w * cash_growth
        # 3. Total capital at each step.
        total_path = allocated_capital_path + cash_path
        # 4. Daily portfolio returns implied by the path.
        port_daily_rets = np.diff(total_path, prepend=current_capital) / np.concatenate(([current_capital], total_path[:-1]))

        # Turnover versus the (drifted) weights held at the end of last period.
        prev_w_arr = current_state.current_weights
        if isinstance(prev_w_arr, pd.Series):
            prev_w_arr = prev_w_arr.drop(labels=['CASH'], errors='ignore').reindex(oos_df.columns).fillna(0.0).values
        delta = w_arr - prev_w_arr
        friction = np.sum(np.abs(delta) * (spreads + trade_cost), axis=0)

        impact = 0.0
        if _HAS_EXECUTION:
            vols = train_df.std().values
            for i, t_val in enumerate(delta):
                if abs(t_val) > 1e-4:
                    impact_pct = estimate_market_impact(abs(t_val * current_capital), adv_proxy, vols[i])
                    impact += impact_pct * abs(t_val)

        # ── EXACT LOT-BY-LOT TAX LIQUIDATION ──
        tax_hit = 0.0
        if t == initial_train_days:
            # First rebalance: open the initial lots; nothing can be sold yet.
            for i, ticker in enumerate(tickers):
                if w_arr[i] > 1e-5 and current_capital > 0:
                    px = _lot_price(ticker, current_date)
                    shares = (w_arr[i] * current_capital) / px
                    lot_manager.add_lot(ticker, current_date, px, shares)
        elif local_cfg.get('tax_enabled', False) and current_capital > 1e-4:
            step_lt_gain = 0.0
            step_st_gain = 0.0
            for i, ticker in enumerate(tickers):
                w_shift = delta[i]
                if w_shift < -1e-5:
                    px = _lot_price(ticker, current_date)
                    shares_to_sell = abs(w_shift) * current_capital / px
                    _, lt_gain, st_gain = lot_manager.sell_shares_with_tax(
                        ticker, shares_to_sell, px, current_date,
                        lt_days=local_cfg.get('lt_days', 366), method='hifo'
                    )
                    step_lt_gain += lt_gain
                    step_st_gain += st_gain
                elif w_shift > 1e-5:
                    px = _lot_price(ticker, current_date)
                    shares_to_buy = w_shift * current_capital / px
                    lot_manager.add_lot(ticker, current_date, px, shares_to_buy)
            # Net losses within a bucket are floored at zero (no tax credit).
            tax_hit_dollars = (max(0, step_lt_gain) * local_cfg.get('tax_rate_lt', 0.20)) + \
                              (max(0, step_st_gain) * local_cfg.get('tax_rate_st', 0.35))
            tax_hit = tax_hit_dollars / current_capital

        # Compound capital through the holding period; all trading costs are
        # charged against the first day's return.
        for day, day_ret in enumerate(port_daily_rets):
            if day == 0:
                current_capital *= (1 + day_ret - friction - impact - tax_hit)
            else:
                current_capital *= (1 + day_ret)
            current_capital = max(0.0, current_capital)
            cumulative_idx += 1
            if cumulative_idx < len(equity_curve):
                equity_curve.iloc[cumulative_idx] = current_capital

        # Drift the weights to period end for the next delta calculation.
        # Risky and cash sleeves are normalised by the TOTAL end value so the
        # stored weights (including CASH) sum to 1; normalising the risky
        # sleeve alone would inflate it whenever cash is held and create
        # phantom turnover, friction and tax-lot sales at the next rebalance.
        if current_capital > 1e-4:
            risky_end = w_arr * asset_paths[-1]
            cash_end = cash_w * cash_growth[-1]
            total_end = np.sum(risky_end) + cash_end
            if total_end > 0:
                w_arr = risky_end / total_end
                cash_w = float(cash_end / total_end)

        end_weights = pd.Series(w_arr, index=oos_df.columns)
        end_weights['CASH'] = cash_w
        current_state.current_weights = end_weights

    equity_curve = equity_curve.dropna()
    spy_oos = spy_rets.reindex(equity_curve.index).fillna(0.0)
    if len(spy_oos) > 0:
        # The first index entry is the seed date (last training day) on which
        # the portfolio is worth exactly `capital`; the benchmark must not
        # earn that day's return either, or it starts one day ahead.
        spy_oos.iloc[0] = 0.0
    spy_eq = capital * (1 + spy_oos).cumprod()
    return equity_curve, spy_eq
def monte_carlo(weights, exp_rets, cov_mat, capital, cfg, macro=None, seed=None, return_paths=False):
    """
    Simulate future portfolio equity paths and summarise them as percentile bands.

    Daily asset growth factors are drawn as ``exp(drift + shock)`` with
    ``drift = mu_daily - 0.5 * var_daily`` and correlated normal shocks from
    the Cholesky factor of the (regime-stressed) daily covariance. Each day's
    portfolio growth is the weight-sum of those factors plus the cash weight
    accruing the daily risk-free rate, i.e. the portfolio is implicitly
    rebalanced to ``weights`` every day.

    Args:
        weights: pd.Series of portfolio weights; an optional 'CASH' entry is
            treated as the cash weight, the rest are aligned to
            ``cov_mat.columns`` (missing tickers get weight 0).
        exp_rets: pd.Series of annual expected returns by ticker.
        cov_mat: pd.DataFrame annual covariance matrix.
        capital: Starting portfolio value.
        cfg: Dict-like config. Keys read: ``trading_days_per_year`` (252),
            ``monte_carlo_sims`` (5000), ``monte_carlo_years`` (10.0),
            ``risk_free_rate`` (0.04).
        macro: Optional mapping; if ``macro["hmm_regime"]`` is a dict, its
            ``severity_score`` is passed to ``regime_stress_covariance``.
        seed: Seed for a local ``numpy`` Generator (global RNG is untouched).
        return_paths (bool): If True, also return the first
            ``min(50, sims)`` simulated paths as a ``(days, k)`` array for
            visualisation. Defaults to False.

    Returns:
        ``(visual_paths, stats)`` where ``visual_paths`` is the sample-path
        array or None, and ``stats`` maps the integer keys 5, 25, 50, 75, 95
        to per-day percentile lists plus ``"dates"``, a list of
        'YYYY-MM-DD' business-day labels of the same length.
    """
    # Local generator: does not mutate the global RNG state.
    rng = np.random.default_rng(seed)
    trading_days = cfg.get("trading_days_per_year", 252)
    sims = cfg.get("monte_carlo_sims", 5000)
    years = cfg.get("monte_carlo_years", 10.0)
    days = int(years * trading_days)

    w_risky = weights.drop(labels=['CASH'], errors='ignore')
    w_arr = w_risky.reindex(cov_mat.columns).fillna(0.0).values
    cash_w = float(weights.get('CASH', 0.0))
    rfr = cfg.get("risk_free_rate", 0.04)
    mu_daily = exp_rets.reindex(cov_mat.columns).fillna(0.0).values / trading_days

    # Regime severity defaults to 1.0 unless a dict-valued HMM regime is given.
    regime_severity = 1.0
    hmm_regime = macro["hmm_regime"] if macro and "hmm_regime" in macro else None
    if isinstance(hmm_regime, dict):
        regime_severity = hmm_regime.get("severity_score", 1.0)

    # Decompose the covariance into vol/correlation for the stress routine.
    cov_arr = cov_mat.values
    vols_mc = np.sqrt(np.maximum(np.diag(cov_arr), 1e-12))
    outer_v = np.outer(vols_mc, vols_mc)
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_arr = cov_arr / np.maximum(outer_v, 1e-8)
    corr_arr[np.isnan(corr_arr)] = 0.0
    cov_res_mc = CovarianceResult(
        covariance=cov_mat,
        correlation=pd.DataFrame(corr_arr, index=cov_mat.index, columns=cov_mat.columns),
        volatility=pd.Series(vols_mc, index=cov_mat.columns),
        shrinkage=0.0
    )
    stressed_cov = regime_stress_covariance(cov_res_mc, regime_severity)
    cov_daily = stressed_cov.covariance.values / trading_days

    try:
        chol = cholesky(cov_daily, lower=True)
    except np.linalg.LinAlgError:
        # Not positive definite: retry once with a small diagonal jitter.
        chol = cholesky(cov_daily + np.eye(len(cov_daily)) * 1e-6, lower=True)

    var_daily = np.diag(cov_daily)
    drift = mu_daily - 0.5 * var_daily

    # Memory-efficient chunking over simulations (prevents OOM on large runs).
    # Each intermediate array in a chunk holds days * chunk_size * num_assets
    # floats; capping that at 10,000,000 elements keeps every such array at
    # roughly 80 MB of float64.
    max_elements_per_chunk = 10_000_000
    # max(1, ...) guards the division when days == 0 or there are no assets.
    elements_per_sim = max(1, days * len(w_arr))
    chunk_size = max(1, max_elements_per_chunk // elements_per_sim)

    all_port_values = []
    daily_cash_factor = 1 + (rfr / trading_days)
    for start_idx in range(0, sims, chunk_size):
        c_sims = min(start_idx + chunk_size, sims) - start_idx
        # Z shape: (days, c_sims, assets)
        Z = rng.standard_normal((days, c_sims, len(w_arr)))
        daily_shocks = Z @ chol.T
        # Per-day gross growth factor of each asset
        asset_growth = np.exp(drift + daily_shocks)
        # Per-day gross growth factor of the portfolio, shape (days, c_sims)
        port_growth = np.sum(asset_growth * w_arr, axis=2) + (cash_w * daily_cash_factor)
        all_port_values.append(capital * np.cumprod(port_growth, axis=0))

    # Combine chunks: (days, sims)
    current_port_value = np.concatenate(all_port_values, axis=1)

    # Percentile bands across the sims axis, one value per day
    levels = (5, 25, 50, 75, 95)
    bands = np.percentile(current_port_value, levels, axis=1)

    visual_paths = current_port_value[:, :min(50, sims)] if return_paths else None

    stats = {level: band.tolist() for level, band in zip(levels, bands)}
    # Each simulated step is one trading day, so label the axis with business
    # days; stepping by calendar days would compress the horizon (e.g. 2520
    # steps would span only ~6.9 calendar years instead of ~10).
    first_day = pd.Timestamp.today().normalize()
    stats["dates"] = pd.bdate_range(start=first_day, periods=days).strftime('%Y-%m-%d').tolist()
    return visual_paths, stats
# ─────────────────────────────────────────────
# STATISTICAL ARBITRAGE INTEGRATION
# ─────────────────────────────────────────────
def run_stat_arb_backtest(prices_df: pd.DataFrame, pair_ticker1: str, pair_ticker2: str, hedge_ratio: float, capital: float = 100000.0) -> dict:
    """
    Backtest a single pairs-trading candidate and package the result as a dict.

    A ``StatArbBacktester`` (entry z-score 2.0, exit z-score 0.0, 60-row
    window) is run on the two price columns. Any failure is reported through
    an ``{"error": ...}`` dict instead of an exception.

    Args:
        prices_df: Price frame containing both ticker columns.
        pair_ticker1: Column name of the first leg.
        pair_ticker2: Column name of the second leg.
        hedge_ratio: Hedge ratio forwarded to the backtester.
        capital: Starting capital forwarded to the backtester.

    Returns:
        On success, a dict with ``pair``, ``total_return``,
        ``annualized_volatility``, ``sharpe_ratio``, ``equity_curve``,
        ``z_scores``, ``spread`` and ``dates`` (the first leg's index as
        'YYYY-MM-DD' strings, minus its first entry). Otherwise a dict with
        a single ``error`` message.
    """
    # Metrics copied verbatim from the backtester's result mapping.
    passthrough_keys = (
        "total_return",
        "annualized_volatility",
        "sharpe_ratio",
        "equity_curve",
        "z_scores",
        "spread",
    )
    try:
        from stat_arb import StatArbBacktester

        backtester = StatArbBacktester(entry_z=2.0, exit_z=0.0, window=60)
        leg_one = prices_df[pair_ticker1]
        leg_two = prices_df[pair_ticker2]
        outcome = backtester.run(leg_one, leg_two, hedge_ratio, capital)
        if not outcome:
            return {"error": "Not enough data for backtest window"}

        report = {"pair": f"{pair_ticker1} / {pair_ticker2}"}
        for key in passthrough_keys:
            report[key] = outcome[key]
        # Drop the first date so the label list lines up with the series
        # (per the original "Align length" note).
        date_labels = leg_one.index.strftime('%Y-%m-%d').tolist()
        report["dates"] = date_labels[1:]
        return report
    except ImportError:
        return {"error": "Stat Arb module not found."}
    except Exception as exc:
        return {"error": str(exc)}