| """Backtest Engine for AlphaForge with comprehensive metrics.""" |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Optional, Callable |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class BacktestEngine:
    """Portfolio backtest engine with transaction costs and slippage.

    Tracks portfolio value, per-period returns, weight history and trades for
    a single backtest run, and computes a comprehensive set of performance
    metrics. The engine is reusable: each call to ``run_backtest`` resets all
    per-run state.
    """

    def __init__(self,
                 initial_capital: float = 1_000_000,
                 transaction_cost: float = 0.0003,
                 slippage: float = 0.0001,
                 benchmark: str = 'SPY'):
        """
        Args:
            initial_capital: starting portfolio value in currency units
            transaction_cost: proportional cost charged per unit of turnover
            slippage: proportional cost per unit of gross exposure, per period
            benchmark: benchmark ticker label (informational only here)
        """
        self.initial_capital = initial_capital
        self.transaction_cost = transaction_cost
        self.slippage = slippage
        self.benchmark = benchmark

        self._reset_state()

    def _reset_state(self) -> None:
        """Clear all per-run accumulators so the engine can be reused."""
        self.portfolio_values = []
        self.weights_history = []
        self.returns_history = []
        self.dates = []
        self.trades = []
        self.start_date = None  # first date of the aligned backtest window

    def run_backtest(self,
                     returns_df: pd.DataFrame,
                     weights_df: pd.DataFrame,
                     rebalance_dates: Optional[List[pd.Timestamp]] = None) -> Dict:
        """
        Run portfolio backtest

        Args:
            returns_df: DataFrame of asset returns (dates x assets)
            weights_df: DataFrame of portfolio weights (dates x assets)
            rebalance_dates: List of dates to rebalance (if None, rebalance daily)

        Returns:
            Dict with performance metrics (empty dict if the inputs share no dates)
        """
        # Reset accumulators so repeated calls don't mix runs. Previously a
        # second call crashed: returns_history had been replaced by an ndarray,
        # which has no .append(), and trades/dates accumulated across runs.
        self._reset_state()

        # Align returns and weights on their common date index.
        common_dates = returns_df.index.intersection(weights_df.index)
        if len(common_dates) == 0:
            return {}
        returns_df = returns_df.loc[common_dates]
        weights_df = weights_df.loc[common_dates]
        self.start_date = common_dates[0]

        # Set gives O(1) membership tests inside the loop.
        rebalance_set = set(rebalance_dates) if rebalance_dates is not None else None

        capital = self.initial_capital
        current_weights = np.zeros(len(returns_df.columns))
        portfolio_values = [capital]

        # First date is the starting point; P&L accrues from the second date on.
        for i, date in enumerate(common_dates[1:], 1):
            target_weights = weights_df.iloc[i].values

            if rebalance_set is None or date in rebalance_set:
                # Turnover = sum of absolute weight changes.
                turnover = np.sum(np.abs(target_weights - current_weights))

                # Proportional transaction cost charged against current capital.
                tc = turnover * self.transaction_cost * capital
                capital -= tc

                # Record only economically meaningful rebalances.
                if turnover > 0.001:
                    self.trades.append({
                        'date': date,
                        'turnover': turnover,
                        'cost': tc,
                        'old_weights': current_weights.copy(),
                        'new_weights': target_weights.copy()
                    })

                current_weights = target_weights.copy()

            # Portfolio return net of slippage on gross exposure.
            daily_returns = returns_df.iloc[i].values
            slippage_cost = np.sum(np.abs(current_weights)) * self.slippage

            port_return = np.dot(current_weights, daily_returns) - slippage_cost
            capital *= (1 + port_return)

            portfolio_values.append(capital)
            self.returns_history.append(port_return)
            self.weights_history.append(current_weights.copy())
            self.dates.append(date)

        self.portfolio_values = np.array(portfolio_values)
        self.returns_history = np.array(self.returns_history)

        return self.compute_metrics()

    def compute_metrics(self, benchmark_returns: Optional[np.ndarray] = None) -> Dict:
        """Compute comprehensive performance metrics.

        Args:
            benchmark_returns: optional benchmark return array; alpha/beta and
                the information ratio are only computed when its length matches
                the backtest return series (mismatches are ignored, not raised).

        Returns:
            Dict of metrics; empty dict if no returns were recorded.
        """
        returns = self.returns_history

        if len(returns) == 0:
            return {}

        # --- Return metrics (252 trading days per year assumed) ---
        total_return = (self.portfolio_values[-1] / self.initial_capital) - 1
        annualized_return = (1 + total_return) ** (252 / len(returns)) - 1

        volatility = np.std(returns) * np.sqrt(252)

        # Sharpe with a 4% annual risk-free rate (daily approximation).
        excess_returns = returns - 0.04 / 252
        sharpe = np.mean(excess_returns) / np.std(returns) * np.sqrt(252) if np.std(returns) > 0 else 0

        # Sortino: penalize downside deviation only.
        downside_returns = returns[returns < 0]
        downside_std = np.std(downside_returns) * np.sqrt(252) if len(downside_returns) > 0 else 1e-8
        sortino = (annualized_return - 0.04) / downside_std

        # Max drawdown from the compounded equity curve.
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = np.min(drawdown)

        calmar = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

        win_rate = np.sum(returns > 0) / len(returns)

        gross_profit = np.sum(returns[returns > 0])
        gross_loss = abs(np.sum(returns[returns < 0]))
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

        # --- Benchmark-relative metrics (require aligned lengths; previously
        # a length mismatch crashed on the tracking-error subtraction) ---
        alpha, beta = 0, 0
        info_ratio = 0
        if benchmark_returns is not None and len(benchmark_returns) == len(returns):
            cov = np.cov(returns, benchmark_returns)[0, 1]
            bench_var = np.var(benchmark_returns)
            beta = cov / bench_var if bench_var > 0 else 0
            alpha = (np.mean(returns) - beta * np.mean(benchmark_returns)) * 252

            tracking_error = np.std(returns - benchmark_returns) * np.sqrt(252)
            info_ratio = (annualized_return - np.mean(benchmark_returns) * 252) / tracking_error if tracking_error > 0 else 0

        # --- Trading activity ---
        avg_turnover = np.mean([t['turnover'] for t in self.trades]) if self.trades else 0
        total_cost = sum(t['cost'] for t in self.trades) if self.trades else 0

        metrics = {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe,
            'sortino_ratio': sortino,
            'max_drawdown': max_drawdown,
            'calmar_ratio': calmar,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'alpha': alpha,
            'beta': beta,
            'information_ratio': info_ratio,
            'avg_turnover': avg_turnover,
            'total_transaction_costs': total_cost,
            'final_capital': self.portfolio_values[-1],
            'n_trades': len(self.trades),
            'n_days': len(returns)
        }

        return metrics

    def get_equity_curve(self) -> pd.DataFrame:
        """Get equity curve: one row per portfolio valuation, start date included."""
        # portfolio_values has one more entry than dates (the initial capital),
        # so prepend the backtest start date rather than duplicating dates[0].
        first_date = self.start_date if self.start_date is not None else self.dates[0]
        return pd.DataFrame({
            'date': [first_date] + list(self.dates),
            'portfolio_value': self.portfolio_values,
            'cumulative_return': (self.portfolio_values / self.initial_capital) - 1
        })

    def get_drawdown_series(self) -> pd.Series:
        """Get drawdown series (fraction below the running equity peak)."""
        cumulative = np.cumprod(1 + self.returns_history)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        return pd.Series(drawdown, index=self.dates)

    def get_monthly_returns(self) -> pd.Series:
        """Get compounded monthly returns as a Series (one value per month)."""
        returns_series = pd.Series(self.returns_history, index=self.dates)
        # 'M' (month-end) kept for compatibility with pandas < 2.2.
        monthly = returns_series.resample('M').apply(lambda x: np.prod(1 + x) - 1)
        return monthly

    def get_rolling_metrics(self, window: int = 63) -> pd.DataFrame:
        """Get rolling Sharpe and annualized volatility over `window` days."""
        returns_series = pd.Series(self.returns_history, index=self.dates)

        rolling_sharpe = (
            returns_series.rolling(window).mean() /
            returns_series.rolling(window).std() * np.sqrt(252)
        )

        rolling_vol = returns_series.rolling(window).std() * np.sqrt(252)

        return pd.DataFrame({
            'rolling_sharpe': rolling_sharpe,
            'rolling_volatility': rolling_vol
        })
|
|
|
|
def compute_information_coefficient(predictions: pd.Series,
                                    actuals: pd.Series,
                                    by_date: bool = True) -> Dict:
    """
    Compute Information Coefficient (Spearman rank correlation).

    Args:
        predictions: Series of predicted returns
        actuals: Series of actual returns
        by_date: If True, compute IC per date and return mean/std

    Returns:
        Dict with IC metrics (per-date stats when possible, otherwise a
        single pooled IC with its p-value)
    """
    from scipy.stats import spearmanr

    if by_date and hasattr(predictions, 'index') and hasattr(actuals, 'index'):
        aligned = pd.DataFrame({'pred': predictions, 'actual': actuals}).dropna()

        # Group by calendar date when the index carries timestamps.
        date_keys = aligned.index.date if hasattr(aligned.index, 'date') else aligned.index

        daily_ics = []
        for key in np.unique(date_keys):
            selector = date_keys == key
            # Need more than 3 names for a meaningful rank correlation.
            if selector.sum() > 3:
                rho, _ = spearmanr(aligned.loc[selector, 'pred'],
                                   aligned.loc[selector, 'actual'])
                if not np.isnan(rho):
                    daily_ics.append(rho)

        if daily_ics:
            spread = np.std(daily_ics)
            return {
                'mean_ic': np.mean(daily_ics),
                'ic_std': spread,
                'ic_ir': np.mean(daily_ics) / spread if spread > 0 else 0,
                'ic_pct_positive': np.sum(np.array(daily_ics) > 0) / len(daily_ics),
                'n_periods': len(daily_ics)
            }

    # Fallback: one pooled IC over all non-NaN prediction/actual pairs.
    valid = ~(np.isnan(predictions) | np.isnan(actuals))
    rho, pvalue = spearmanr(predictions[valid], actuals[valid])

    return {
        'mean_ic': rho if not np.isnan(rho) else 0,
        'ic_std': 0,
        'ic_ir': 0,
        'ic_pct_positive': 1 if rho > 0 else 0,
        'n_periods': 1,
        'p_value': pvalue
    }
|
|
|
|
class RegimeDetector:
    """Detect market regimes using Hidden Markov Model or simple heuristics"""

    def __init__(self, method: str = 'simple'):
        # method: detection approach label (only the simple heuristic is used here)
        self.method = method
        self.regimes = []

    def detect_regimes(self, returns: pd.Series,
                       volatility_window: int = 21) -> pd.Series:
        """
        Detect market regimes:
        - Bull: positive trend, low vol
        - Bear: negative trend, high vol
        - High Vol: high volatility regardless of trend
        """
        # 63-day (quarterly) rolling mean as the trend signal.
        trend = returns.rolling(63).mean()

        # Annualized rolling volatility and its full-sample median threshold base.
        vol = returns.rolling(volatility_window).std() * np.sqrt(252)
        vol_median = vol.median()

        def _classify(t, v):
            """Map one (trend, vol) observation to a regime label."""
            if pd.isna(t) or pd.isna(v):
                return 'unknown'
            if v > vol_median * 1.5:
                return 'high_vol'
            if t > 0.001:
                return 'bull'
            if t < -0.001:
                return 'bear'
            return 'neutral'

        labels = [_classify(t, v) for t, v in zip(trend, vol)]
        regimes = pd.Series(labels, index=returns.index, dtype='object')

        self.regimes = regimes
        return regimes

    def get_regime_stats(self, returns: pd.Series) -> pd.DataFrame:
        """Get performance statistics by regime"""
        if len(self.regimes) == 0:
            self.detect_regimes(returns)

        rows = []
        for label in self.regimes.unique():
            sub = returns[self.regimes == label]
            if len(sub) == 0:
                continue

            vol = sub.std()
            equity = sub.cumsum()  # additive equity proxy for drawdown
            rows.append({
                'regime': label,
                'n_days': len(sub),
                'mean_return': sub.mean() * 252,
                'volatility': vol * np.sqrt(252),
                'sharpe': (sub.mean() / vol) * np.sqrt(252) if vol > 0 else 0,
                'max_drawdown': (equity - equity.cummax()).min()
            })

        return pd.DataFrame(rows)
|
|