# alphaforge-quant-system / backtest_engine.py
# Author: Premchan369 — commit 24ff2ee (verified)
# Add backtest engine with Sharpe, Sortino, max drawdown, IC tracking
"""Backtest Engine for AlphaForge with comprehensive metrics."""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Callable
import warnings
warnings.filterwarnings('ignore')
class BacktestEngine:
    """Portfolio backtest engine with transaction costs and slippage.

    Simulates a long/short portfolio over aligned return and weight frames,
    charging proportional transaction costs on turnover at each rebalance and
    a proportional slippage cost on gross exposure every day.
    """

    def __init__(self,
                 initial_capital: float = 1_000_000,
                 transaction_cost: float = 0.0003,
                 slippage: float = 0.0001,
                 benchmark: str = 'SPY'):
        self.initial_capital = initial_capital
        # Proportional cost per unit of turnover (sum of |weight changes|).
        self.transaction_cost = transaction_cost
        # Proportional daily cost per unit of gross exposure (sum of |weights|).
        self.slippage = slippage
        # Benchmark ticker — stored for reference; not fetched here.
        self.benchmark = benchmark
        # Per-run state, populated by run_backtest().
        self.portfolio_values = []
        self.weights_history = []
        self.returns_history = []
        self.dates = []
        self.trades = []

    def _reset_state(self) -> None:
        """Clear per-run state so the engine can be reused across backtests.

        BUGFIX: previously state was never reset, so a second call to
        run_backtest() failed with AttributeError (returns_history had been
        rebound to an ndarray, which has no .append) and self.trades kept
        accumulating entries from earlier runs, skewing turnover/cost metrics.
        """
        self.portfolio_values = []
        self.weights_history = []
        self.returns_history = []
        self.dates = []
        self.trades = []

    def run_backtest(self,
                     returns_df: pd.DataFrame,
                     weights_df: pd.DataFrame,
                     rebalance_dates: Optional[List[pd.Timestamp]] = None) -> Dict:
        """
        Run portfolio backtest.

        Args:
            returns_df: DataFrame of asset returns (dates x assets)
            weights_df: DataFrame of portfolio weights (dates x assets)
            rebalance_dates: List of dates to rebalance (if None, rebalance daily)

        Returns:
            Dict with performance metrics (see compute_metrics).
        """
        self._reset_state()
        # Align the two frames on their common dates.
        common_dates = returns_df.index.intersection(weights_df.index)
        returns_df = returns_df.loc[common_dates]
        weights_df = weights_df.loc[common_dates]
        capital = self.initial_capital
        current_weights = np.zeros(len(returns_df.columns))
        portfolio_values = [capital]
        # Start at the second date: day i earns returns on weights set by day i's row.
        for i, date in enumerate(common_dates[1:], 1):
            target_weights = weights_df.iloc[i].values
            # Rebalance daily unless an explicit schedule is provided.
            if rebalance_dates is None or date in rebalance_dates:
                # Turnover = total absolute weight change across assets.
                turnover = np.sum(np.abs(target_weights - current_weights))
                tc = turnover * self.transaction_cost * capital
                capital -= tc
                # Only record economically meaningful rebalances.
                if turnover > 0.001:
                    self.trades.append({
                        'date': date,
                        'turnover': turnover,
                        'cost': tc,
                        'old_weights': current_weights.copy(),
                        'new_weights': target_weights.copy()
                    })
                current_weights = target_weights.copy()
            daily_returns = returns_df.iloc[i].values
            # Slippage charged daily on gross exposure.
            slippage_cost = np.sum(np.abs(current_weights)) * self.slippage
            port_return = np.dot(current_weights, daily_returns) - slippage_cost
            capital *= (1 + port_return)
            portfolio_values.append(capital)
            self.returns_history.append(port_return)
            self.weights_history.append(current_weights.copy())
            self.dates.append(date)
        self.portfolio_values = np.array(portfolio_values)
        self.returns_history = np.array(self.returns_history)
        return self.compute_metrics()

    def compute_metrics(self,
                        benchmark_returns: Optional[np.ndarray] = None,
                        risk_free_rate: float = 0.04) -> Dict:
        """Compute comprehensive performance metrics.

        Args:
            benchmark_returns: optional daily benchmark returns, same length as
                the backtest's daily returns; enables alpha/beta/IR.
            risk_free_rate: annual risk-free rate used for Sharpe and Sortino
                (default 0.04, matching the original hard-coded assumption).

        Returns:
            Dict of metrics; empty dict if no backtest has been run.
        """
        returns = self.returns_history
        if len(returns) == 0:
            return {}
        total_return = (self.portfolio_values[-1] / self.initial_capital) - 1
        annualized_return = (1 + total_return) ** (252 / len(returns)) - 1
        volatility = np.std(returns) * np.sqrt(252)
        # Sharpe ratio vs. daily risk-free rate.
        excess_returns = returns - risk_free_rate / 252
        sharpe = np.mean(excess_returns) / np.std(returns) * np.sqrt(252) if np.std(returns) > 0 else 0
        # Sortino: penalize downside deviation only; epsilon avoids div-by-zero
        # when there are no losing days (yields a very large ratio by design).
        downside_returns = returns[returns < 0]
        downside_std = np.std(downside_returns) * np.sqrt(252) if len(downside_returns) > 0 else 1e-8
        sortino = (annualized_return - risk_free_rate) / downside_std
        # Max drawdown from the compounded equity path.
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = np.min(drawdown)
        calmar = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0
        win_rate = np.sum(returns > 0) / len(returns)
        gross_profit = np.sum(returns[returns > 0])
        gross_loss = abs(np.sum(returns[returns < 0]))
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf
        # Alpha/beta vs. benchmark (only when lengths match).
        # NOTE(review): np.cov uses ddof=1 while np.var uses ddof=0, so beta is
        # slightly biased for short samples — kept as-is to preserve behavior.
        alpha, beta = 0, 0
        if benchmark_returns is not None and len(benchmark_returns) == len(returns):
            cov = np.cov(returns, benchmark_returns)[0, 1]
            bench_var = np.var(benchmark_returns)
            beta = cov / bench_var if bench_var > 0 else 0
            alpha = (np.mean(returns) - beta * np.mean(benchmark_returns)) * 252
        # Information ratio vs. benchmark.
        if benchmark_returns is not None:
            tracking_error = np.std(returns - benchmark_returns) * np.sqrt(252)
            info_ratio = (annualized_return - np.mean(benchmark_returns) * 252) / tracking_error if tracking_error > 0 else 0
        else:
            info_ratio = 0
        avg_turnover = np.mean([t['turnover'] for t in self.trades]) if self.trades else 0
        total_cost = sum([t['cost'] for t in self.trades]) if self.trades else 0
        metrics = {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe,
            'sortino_ratio': sortino,
            'max_drawdown': max_drawdown,
            'calmar_ratio': calmar,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'alpha': alpha,
            'beta': beta,
            'information_ratio': info_ratio,
            'avg_turnover': avg_turnover,
            'total_transaction_costs': total_cost,
            'final_capital': self.portfolio_values[-1],
            'n_trades': len(self.trades),
            'n_days': len(returns)
        }
        return metrics

    def get_equity_curve(self) -> pd.DataFrame:
        """Get equity curve as a DataFrame of date / value / cumulative return.

        NOTE(review): the initial capital point has no date of its own, so the
        first recorded date is duplicated to keep columns the same length.
        """
        return pd.DataFrame({
            'date': [self.dates[0]] + list(self.dates),
            'portfolio_value': self.portfolio_values,
            'cumulative_return': (self.portfolio_values / self.initial_capital) - 1
        })

    def get_drawdown_series(self) -> pd.Series:
        """Get the drawdown series (fraction below the running equity peak)."""
        cumulative = np.cumprod(1 + self.returns_history)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        return pd.Series(drawdown, index=self.dates)

    def get_monthly_returns(self) -> pd.DataFrame:
        """Get compounded monthly returns."""
        returns_series = pd.Series(self.returns_history, index=self.dates)
        monthly = returns_series.resample('M').apply(lambda x: np.prod(1 + x) - 1)
        return monthly

    def get_rolling_metrics(self, window: int = 63) -> pd.DataFrame:
        """Get rolling annualized Sharpe and volatility over `window` days."""
        returns_series = pd.Series(self.returns_history, index=self.dates)
        rolling_sharpe = (
            returns_series.rolling(window).mean() /
            returns_series.rolling(window).std() * np.sqrt(252)
        )
        rolling_vol = returns_series.rolling(window).std() * np.sqrt(252)
        return pd.DataFrame({
            'rolling_sharpe': rolling_sharpe,
            'rolling_volatility': rolling_vol
        })
def compute_information_coefficient(predictions: pd.Series,
                                    actuals: pd.Series,
                                    by_date: bool = True) -> Dict:
    """
    Compute Information Coefficient (Spearman rank correlation).

    Args:
        predictions: Series of predicted returns
        actuals: Series of actual returns
        by_date: If True, compute IC per date and return mean/std

    Returns:
        Dict with IC metrics
    """
    from scipy.stats import spearmanr

    if by_date and hasattr(predictions, 'index') and hasattr(actuals, 'index'):
        # Pair up predictions and actuals, dropping rows with missing values.
        paired = pd.DataFrame({'pred': predictions, 'actual': actuals}).dropna()
        # Group by calendar date when the index carries timestamps.
        date_keys = paired.index.date if hasattr(paired.index, 'date') else paired.index
        daily_ics = []
        for key in np.unique(date_keys):
            selector = date_keys == key
            # Rank correlation is meaningless on tiny cross-sections.
            if selector.sum() > 3:
                rho, _ = spearmanr(paired.loc[selector, 'pred'],
                                   paired.loc[selector, 'actual'])
                if not np.isnan(rho):
                    daily_ics.append(rho)
        if daily_ics:
            ics = np.array(daily_ics)
            dispersion = ics.std()
            return {
                'mean_ic': ics.mean(),
                'ic_std': dispersion,
                'ic_ir': ics.mean() / dispersion if dispersion > 0 else 0,
                'ic_pct_positive': (ics > 0).sum() / len(ics),
                'n_periods': len(ics)
            }
    # Fall back to a single pooled IC over all valid observations.
    valid = ~(np.isnan(predictions) | np.isnan(actuals))
    rho, pvalue = spearmanr(predictions[valid], actuals[valid])
    return {
        'mean_ic': rho if not np.isnan(rho) else 0,
        'ic_std': 0,
        'ic_ir': 0,
        'ic_pct_positive': 1 if rho > 0 else 0,
        'n_periods': 1,
        'p_value': pvalue
    }
class RegimeDetector:
    """Detect market regimes using simple trend/volatility heuristics.

    The `method` argument is reserved for alternative detectors (e.g. an HMM);
    only the simple rolling heuristic is implemented here.
    """

    def __init__(self, method: str = 'simple'):
        self.method = method
        # Populated by detect_regimes(); list until first detection.
        self.regimes = []

    def detect_regimes(self, returns: pd.Series,
                       volatility_window: int = 21) -> pd.Series:
        """
        Detect market regimes:
        - 'high_vol': annualized vol above 1.5x its median (checked first)
        - 'bull'/'bear': 63-day mean return above +0.001 / below -0.001
        - 'neutral': small trend with normal volatility
        - 'unknown': rolling windows not yet filled (warm-up period)
        """
        # 63-day rolling mean return as the trend signal.
        trend = returns.rolling(63).mean()
        # Annualized rolling volatility.
        vol = returns.rolling(volatility_window).std() * np.sqrt(252)
        vol_median = vol.median()
        # Vectorized classification (replaces a per-row .loc loop — same
        # precedence, O(n) in numpy). NaN comparisons evaluate False, so the
        # explicit isna check must come first to label the warm-up period.
        conditions = [
            trend.isna() | vol.isna(),
            vol > vol_median * 1.5,
            trend > 0.001,
            trend < -0.001,
        ]
        choices = ['unknown', 'high_vol', 'bull', 'bear']
        labels = np.select(conditions, choices, default='neutral')
        regimes = pd.Series(labels, index=returns.index, dtype='object')
        self.regimes = regimes
        return regimes

    def get_regime_stats(self, returns: pd.Series) -> pd.DataFrame:
        """Get performance statistics by regime (runs detection if needed)."""
        if len(self.regimes) == 0:
            self.detect_regimes(returns)
        stats = []
        for regime in self.regimes.unique():
            mask = self.regimes == regime
            regime_returns = returns[mask]
            if len(regime_returns) > 0:
                stats.append({
                    'regime': regime,
                    'n_days': len(regime_returns),
                    'mean_return': regime_returns.mean() * 252,
                    'volatility': regime_returns.std() * np.sqrt(252),
                    'sharpe': (regime_returns.mean() / regime_returns.std()) * np.sqrt(252) if regime_returns.std() > 0 else 0,
                    # NOTE(review): additive (cumsum-based) drawdown, unlike the
                    # compounded drawdown in BacktestEngine — kept as designed.
                    'max_drawdown': (regime_returns.cumsum() - regime_returns.cumsum().cummax()).min()
                })
        return pd.DataFrame(stats)