# File size: 12,687 Bytes
# 24ff2ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""Backtest Engine for AlphaForge with comprehensive metrics."""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Callable
import warnings
warnings.filterwarnings('ignore')


class BacktestEngine:
    """Portfolio backtest engine with transaction costs and slippage.

    The engine replays a table of per-period asset returns against a table of
    target weights. At every rebalance it charges a proportional transaction
    cost on turnover, and on every period it charges a proportional slippage
    cost on gross exposure (sum of absolute weights).

    Results of the most recent run are stored on the instance:

    * ``portfolio_values`` - equity after each period, preceded by the
      initial capital (length ``n_periods + 1``).
    * ``returns_history`` - net portfolio return of each period.
    * ``weights_history`` - weights held during each period.
    * ``dates`` - index label of each period.
    * ``trades`` - one dict per rebalance whose turnover exceeded 0.001.

    Annualised figures assume 252 periods per year.
    """

    # Number of return periods per year used for every annualisation.
    _PERIODS_PER_YEAR = 252

    def __init__(self,
                 initial_capital: float = 1_000_000,
                 transaction_cost: float = 0.0003,
                 slippage: float = 0.0001,
                 benchmark: str = 'SPY'):
        """
        Configure the engine.

        Args:
            initial_capital: Starting equity of the portfolio.
            transaction_cost: Cost per unit of turnover, as a fraction of capital.
            slippage: Cost per unit of gross exposure, charged every period.
            benchmark: Benchmark label; stored on the instance but not read by
                any method of this class.
        """
        self.initial_capital = initial_capital
        self.transaction_cost = transaction_cost
        self.slippage = slippage
        self.benchmark = benchmark

        self._reset_state()

    def _reset_state(self) -> None:
        """Clear all per-run results so a new backtest starts from scratch."""
        self.portfolio_values = []
        self.weights_history = []
        self.returns_history = []
        self.dates = []
        self.trades = []
        # Index label of the first aligned row; it carries the initial capital.
        self._start_date = None

    @staticmethod
    def _drawdowns(returns: np.ndarray) -> np.ndarray:
        """Return the drawdown after each period, measured from the running peak.

        The running peak includes the starting equity (1.0), so a loss in the
        very first period is reported as a drawdown instead of zero.
        """
        returns = np.asarray(returns, dtype=float)
        if returns.size == 0:
            return np.array([], dtype=float)
        equity = np.concatenate(([1.0], np.cumprod(1 + returns)))
        peaks = np.maximum.accumulate(equity)
        # Drop the synthetic starting point so the output lines up with returns.
        return ((equity - peaks) / peaks)[1:]

    def run_backtest(self,
                     returns_df: pd.DataFrame,
                     weights_df: pd.DataFrame,
                     rebalance_dates: Optional[List[pd.Timestamp]] = None) -> Dict:
        """
        Run portfolio backtest

        The two frames are restricted to their common index. The first common
        row only marks the starting point: its returns and weights are not
        applied. Columns are matched by position, not by name.

        Any results from a previous run on this instance are discarded.

        Args:
            returns_df: DataFrame of asset returns (dates x assets)
            weights_df: DataFrame of portfolio weights (dates x assets)
            rebalance_dates: List of dates to rebalance (if None, rebalance daily)

        Returns:
            Dict with performance metrics (empty if fewer than two common dates)

        Raises:
            ValueError: If the two frames have a different number of columns.
        """
        if weights_df.shape[1] != returns_df.shape[1]:
            raise ValueError(
                "returns_df and weights_df must have the same number of columns"
            )

        # Align dates
        common_dates = returns_df.index.intersection(weights_df.index)
        returns_df = returns_df.loc[common_dates]
        weights_df = weights_df.loc[common_dates]

        # Start from a clean slate: without this a second call would try to
        # append to the ndarray left behind by the first run and would keep
        # accumulating trades from earlier runs.
        self._reset_state()
        if len(common_dates) > 0:
            self._start_date = common_dates[0]

        capital = self.initial_capital
        current_weights = np.zeros(len(returns_df.columns))
        portfolio_values = [capital]
        period_returns = []

        for i, date in enumerate(common_dates[1:], 1):
            # Get target weights
            target_weights = weights_df.iloc[i].values

            # Check if rebalance needed
            if rebalance_dates is None or date in rebalance_dates:
                # Turnover is the sum of absolute weight changes
                turnover = np.sum(np.abs(target_weights - current_weights))

                # Transaction costs are always charged, even on tiny turnover
                tc = turnover * self.transaction_cost * capital
                capital -= tc

                # Only rebalances above the 0.001 turnover threshold are logged
                if turnover > 0.001:
                    self.trades.append({
                        'date': date,
                        'turnover': turnover,
                        'cost': tc,
                        'old_weights': current_weights.copy(),
                        'new_weights': target_weights.copy()
                    })

                current_weights = target_weights.copy()

            # Slippage is charged on gross exposure every period
            daily_returns = returns_df.iloc[i].values
            slippage_cost = np.sum(np.abs(current_weights)) * self.slippage

            # Portfolio return net of slippage
            port_return = np.dot(current_weights, daily_returns) - slippage_cost
            capital *= (1 + port_return)

            portfolio_values.append(capital)
            period_returns.append(port_return)
            self.weights_history.append(current_weights.copy())
            self.dates.append(date)

        self.portfolio_values = np.array(portfolio_values)
        self.returns_history = np.array(period_returns)

        return self.compute_metrics()

    def compute_metrics(self,
                        benchmark_returns: Optional[np.ndarray] = None,
                        risk_free_rate: float = 0.04) -> Dict:
        """Compute comprehensive performance metrics for the last run.

        Args:
            benchmark_returns: Optional per-period benchmark returns. Alpha,
                beta and information ratio are only computed when its length
                equals the number of backtest periods; otherwise they are 0.
            risk_free_rate: Annual risk-free rate used by the Sharpe and
                Sortino ratios.

        Returns:
            Dict of metrics, or an empty dict when there are no returns.
            ``total_transaction_costs`` and ``avg_turnover`` cover only the
            rebalances recorded in ``trades``.
        """
        returns = np.asarray(self.returns_history, dtype=float)
        n_periods = len(returns)

        if n_periods == 0:
            return {}

        periods_per_year = self._PERIODS_PER_YEAR
        annualizer = np.sqrt(periods_per_year)

        # Basic metrics
        total_return = (self.portfolio_values[-1] / self.initial_capital) - 1
        annualized_return = (1 + total_return) ** (periods_per_year / n_periods) - 1

        # Volatility
        period_std = np.std(returns)
        volatility = period_std * annualizer

        # Sharpe ratio
        excess_returns = returns - risk_free_rate / periods_per_year
        sharpe = np.mean(excess_returns) / period_std * annualizer if period_std > 0 else 0

        # Sortino ratio: the deviation of the negative returns is floored at
        # 1e-8 so that zero or one losing period cannot divide by zero.
        downside_returns = returns[returns < 0]
        downside_std = np.std(downside_returns) * annualizer if len(downside_returns) > 0 else 0.0
        if not downside_std > 0:
            downside_std = 1e-8
        sortino = (annualized_return - risk_free_rate) / downside_std

        # Max drawdown (relative to the running peak, starting equity included)
        max_drawdown = np.min(self._drawdowns(returns))

        # Calmar ratio
        calmar = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

        # Win rate
        win_rate = np.sum(returns > 0) / n_periods

        # Profit factor
        gross_profit = np.sum(returns[returns > 0])
        gross_loss = abs(np.sum(returns[returns < 0]))
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

        # Alpha, Beta and Information ratio (vs benchmark)
        alpha, beta, info_ratio = 0, 0, 0
        if benchmark_returns is not None:
            bench = np.asarray(benchmark_returns, dtype=float).ravel()
            if len(bench) == n_periods:
                # Covariance and variance must share the same ddof, otherwise
                # beta is scaled by n / (n - 1).
                cov = np.cov(returns, bench, ddof=0)[0, 1]
                bench_var = np.var(bench)
                beta = cov / bench_var if bench_var > 0 else 0
                alpha = (np.mean(returns) - beta * np.mean(bench)) * periods_per_year

                tracking_error = np.std(returns - bench) * annualizer
                if tracking_error > 0:
                    info_ratio = (annualized_return - np.mean(bench) * periods_per_year) / tracking_error

        # Turnover statistics
        avg_turnover = np.mean([t['turnover'] for t in self.trades]) if self.trades else 0
        total_cost = sum(t['cost'] for t in self.trades) if self.trades else 0

        metrics = {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe,
            'sortino_ratio': sortino,
            'max_drawdown': max_drawdown,
            'calmar_ratio': calmar,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'alpha': alpha,
            'beta': beta,
            'information_ratio': info_ratio,
            'avg_turnover': avg_turnover,
            'total_transaction_costs': total_cost,
            'final_capital': self.portfolio_values[-1],
            'n_trades': len(self.trades),
            'n_days': n_periods
        }

        return metrics

    def get_equity_curve(self) -> pd.DataFrame:
        """Get equity curve.

        Returns:
            DataFrame with columns ``date``, ``portfolio_value`` and
            ``cumulative_return``. The first row holds the initial capital and
            is labelled with the first aligned date of the backtest. The frame
            is empty when no backtest has been run.
        """
        values = np.asarray(self.portfolio_values, dtype=float)
        dates = list(self.dates)

        if len(values) == len(dates) + 1:
            # The leading value is the starting capital; label it with the
            # starting date rather than repeating the first period's date.
            start = getattr(self, '_start_date', None)
            if start is None and dates:
                start = dates[0]
            dates = [start] + dates

        return pd.DataFrame({
            'date': dates,
            'portfolio_value': values,
            'cumulative_return': (values / self.initial_capital) - 1
        })

    def get_drawdown_series(self) -> pd.Series:
        """Get drawdown series, indexed by period date (empty before any run)."""
        drawdown = self._drawdowns(self.returns_history)
        return pd.Series(drawdown, index=list(self.dates), dtype=float)

    def get_monthly_returns(self) -> pd.Series:
        """Get compounded returns per calendar month, labelled by month end."""
        if len(self.dates) == 0:
            return pd.Series(dtype=float)

        returns_series = pd.Series(self.returns_history, index=self.dates)
        # An offset object is used instead of the 'M' string alias, which
        # newer pandas versions deprecate in favour of 'ME'.
        monthly = returns_series.resample(pd.offsets.MonthEnd()).apply(
            lambda x: np.prod(1 + x) - 1
        )
        return monthly

    def get_rolling_metrics(self, window: int = 63) -> pd.DataFrame:
        """Get rolling performance metrics.

        Args:
            window: Number of periods in each rolling window.

        Returns:
            DataFrame with ``rolling_sharpe`` (annualised mean / std, with no
            risk-free adjustment) and ``rolling_volatility`` (annualised std).
        """
        returns_series = pd.Series(self.returns_history, index=list(self.dates), dtype=float)
        annualizer = np.sqrt(self._PERIODS_PER_YEAR)

        rolling_std = returns_series.rolling(window).std()
        rolling_sharpe = returns_series.rolling(window).mean() / rolling_std * annualizer
        rolling_vol = rolling_std * annualizer

        return pd.DataFrame({
            'rolling_sharpe': rolling_sharpe,
            'rolling_volatility': rolling_vol
        })


def compute_information_coefficient(predictions: pd.Series, 
                                     actuals: pd.Series,
                                     by_date: bool = True) -> Dict:
    """
    Compute Information Coefficient (Spearman rank correlation)

    Predictions and actuals are paired (Series are aligned on their index) and
    pairs containing NaN are dropped before any correlation is computed.

    Args:
        predictions: Series of predicted returns (plain sequences and arrays
            are also accepted, paired by position)
        actuals: Series of actual returns (same forms as ``predictions``)
        by_date: If True and both inputs are Series, compute one IC per date
            and return its mean/std. Dates are taken from a DatetimeIndex
            (calendar day), from the ``date`` level of a MultiIndex (or its
            first level if none is named ``date``), or from the raw index
            values otherwise. Only dates with more than 3 pairs are used.

    Returns:
        Dict with IC metrics. When no per-date IC can be computed, a single
        overall IC is returned instead, together with its ``p_value``.
    """
    from scipy.stats import spearmanr

    pred_is_series = isinstance(predictions, pd.Series)
    actual_is_series = isinstance(actuals, pd.Series)

    # Non-Series inputs (lists, arrays) are converted up front; indexing a
    # plain list with a boolean mask is not possible.
    pairs = pd.DataFrame({
        'pred': predictions if pred_is_series else np.asarray(predictions, dtype=float),
        'actual': actuals if actual_is_series else np.asarray(actuals, dtype=float),
    }).dropna()

    if by_date and pred_is_series and actual_is_series and len(pairs) > 0:
        index = pairs.index
        if isinstance(index, pd.MultiIndex):
            level = 'date' if 'date' in index.names else 0
            date_keys = index.get_level_values(level)
        else:
            date_keys = index
        if isinstance(date_keys, pd.DatetimeIndex):
            # Collapse intraday timestamps onto their calendar day
            date_keys = date_keys.date

        # One pass over the data instead of one full boolean mask per date
        ic_by_date = []
        for _, group in pairs.groupby(np.asarray(date_keys)):
            if len(group) > 3:
                group_ic, _pvalue = spearmanr(group['pred'], group['actual'])
                if not np.isnan(group_ic):
                    ic_by_date.append(group_ic)

        if ic_by_date:
            ics = np.asarray(ic_by_date)
            mean_ic = np.mean(ics)
            ic_std = np.std(ics)
            return {
                'mean_ic': mean_ic,
                'ic_std': ic_std,
                'ic_ir': mean_ic / ic_std if ic_std > 0 else 0,
                'ic_pct_positive': np.sum(ics > 0) / len(ics),
                'n_periods': len(ics)
            }

    # Overall IC on the aligned, NaN-free pairs
    if len(pairs) < 2:
        # A rank correlation is undefined for fewer than two observations
        ic, pvalue = np.nan, np.nan
    else:
        ic, pvalue = spearmanr(pairs['pred'], pairs['actual'])

    return {
        'mean_ic': ic if not np.isnan(ic) else 0,
        'ic_std': 0,
        'ic_ir': 0,
        'ic_pct_positive': 1 if ic > 0 else 0,
        'n_periods': 1,
        'p_value': pvalue
    }


class RegimeDetector:
    """Detect market regimes using simple trend/volatility heuristics.

    ``method`` is stored on the instance, but only the heuristic rules in
    :meth:`detect_regimes` are implemented in this class.
    """

    def __init__(self, method: str = 'simple'):
        """Store the requested method and start with no detected regimes."""
        self.method = method
        self.regimes = []

    def detect_regimes(self, returns: pd.Series, 
                       volatility_window: int = 21) -> pd.Series:
        """
        Label every observation with a market regime.

        Rules are applied in this order, first match wins:

        - unknown: the 63-period mean return or the rolling volatility is NaN
        - high_vol: annualised volatility above 1.5x its median
        - bull: 63-period mean return above 0.001
        - bear: 63-period mean return below -0.001
        - neutral: everything else

        Args:
            returns: Series of per-period returns.
            volatility_window: Window length of the rolling volatility.

        Returns:
            Series of regime labels on the index of ``returns``; it is also
            stored in ``self.regimes``.
        """
        # Trend
        trend = returns.rolling(63).mean()

        # Volatility
        vol = returns.rolling(volatility_window).std() * np.sqrt(252)
        vol_median = vol.median()

        trend_values = trend.to_numpy()
        vol_values = vol.to_numpy()

        # Vectorised rule table; np.select picks the first condition that
        # holds, which mirrors an if/elif chain and is position-based, so it
        # also works when the index contains duplicate labels.
        labels = np.select(
            [
                pd.isna(trend_values) | pd.isna(vol_values),
                vol_values > vol_median * 1.5,
                trend_values > 0.001,
                trend_values < -0.001,
            ],
            ['unknown', 'high_vol', 'bull', 'bear'],
            default='neutral',
        )

        regimes = pd.Series(labels, index=returns.index, dtype='object')

        self.regimes = regimes
        return regimes

    def get_regime_stats(self, returns: pd.Series) -> pd.DataFrame:
        """Get performance statistics by regime.

        Previously detected regimes are reused when their index covers every
        label of ``returns`` (for example regimes detected on a market series,
        statistics computed on a strategy series). Otherwise regimes are
        detected on ``returns`` itself.

        Args:
            returns: Series of per-period returns to summarise.

        Returns:
            DataFrame with one row per regime: ``regime``, ``n_days``,
            annualised ``mean_return``, ``volatility`` and ``sharpe``, and
            ``max_drawdown`` measured on the cumulative sum of the regime's
            returns.
        """
        regimes = self.regimes
        reusable = (
            isinstance(regimes, pd.Series)
            and len(regimes) > 0
            and returns.index.isin(regimes.index).all()
        )
        if not reusable:
            # Nothing detected yet, or the stored labels belong to another
            # series and cannot be aligned with these returns.
            regimes = self.detect_regimes(returns)
        elif not regimes.index.equals(returns.index):
            regimes = regimes.reindex(returns.index)

        stats = []
        for regime in regimes.unique():
            # Positional mask: avoids index alignment of a boolean Series
            mask = (regimes == regime).to_numpy()
            regime_returns = returns[mask]

            if len(regime_returns) > 0:
                mean_return = regime_returns.mean()
                std_return = regime_returns.std()

                # The running peak starts at 0 (the level before the first
                # return), so a regime that opens with losses is not
                # under-reported.
                cumulative = regime_returns.cumsum()
                running_peak = cumulative.cummax().clip(lower=0)

                stats.append({
                    'regime': regime,
                    'n_days': len(regime_returns),
                    'mean_return': mean_return * 252,
                    'volatility': std_return * np.sqrt(252),
                    'sharpe': (mean_return / std_return) * np.sqrt(252) if std_return > 0 else 0,
                    'max_drawdown': (cumulative - running_peak).min()
                })

        return pd.DataFrame(stats)