| """Risk Engine - VaR, CVaR, tail risk, and drawdown control.""" |
| import numpy as np |
| import pandas as pd |
| from scipy import stats |
| from typing import Dict, List, Optional, Tuple |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class RiskEngine:
    """Comprehensive risk analytics engine.

    All loss measures (VaR, CVaR) are reported with the sign flipped, so a
    positive number means a loss of that size. Inputs are per-period simple
    returns; annualised figures use ``TRADING_DAYS_PER_YEAR`` periods.
    """

    # Number of return periods per year used wherever a figure is annualised.
    TRADING_DAYS_PER_YEAR = 252

    def __init__(self, confidence_levels: Optional[List[float]] = None):
        """Create an engine.

        Args:
            confidence_levels: Confidence levels used by ``compute_all_var``.
                ``None`` or an empty list selects ``[0.95, 0.99]``.
        """
        self.confidence_levels = confidence_levels or [0.95, 0.99]
        # These lists are not written to by the methods of this class;
        # they are left available for callers.
        self.var_history = []
        self.cvar_history = []
        self.tail_risk_history = []

    def compute_var(self, returns: np.ndarray,
                    method: str = 'historical',
                    confidence: float = 0.95) -> float:
        """Compute Value at Risk.

        Args:
            returns: Per-period returns.
            method: One of
                - ``'historical'``: empirical quantile of ``returns``
                - ``'parametric'``: normal distribution with the sample
                  mean and standard deviation
                - ``'cornish_fisher'``: normal quantile adjusted for sample
                  skewness and excess kurtosis
            confidence: Confidence level, e.g. ``0.95``.

        Returns:
            The VaR as a sign-flipped return quantile (positive = loss).

        Raises:
            ValueError: If ``method`` is not one of the names above.
        """
        if method == 'historical':
            return -np.percentile(returns, (1 - confidence) * 100)

        if method == 'parametric':
            z = stats.norm.ppf(1 - confidence)
            return -(returns.mean() + z * returns.std())

        if method == 'cornish_fisher':
            z = stats.norm.ppf(1 - confidence)
            skew = stats.skew(returns)
            # With fisher=True SciPy already returns *excess* kurtosis
            # (0 for a normal distribution), which is exactly what the
            # Cornish-Fisher expansion expects; subtracting 3 again would
            # bias the adjusted quantile.
            excess_kurt = stats.kurtosis(returns, fisher=True)

            z_cf = (z
                    + (z**2 - 1) * skew / 6
                    + (z**3 - 3 * z) * excess_kurt / 24
                    - (2 * z**3 - 5 * z) * skew**2 / 36)

            return -(returns.mean() + z_cf * returns.std())

        raise ValueError(f"Unknown VaR method: {method}")

    def compute_cvar(self, returns: np.ndarray,
                     confidence: float = 0.95) -> float:
        """Compute Conditional Value at Risk (Expected Shortfall).

        This is the sign-flipped mean of all returns at or below the
        historical VaR threshold for ``confidence``. If no return falls in
        the tail, the historical VaR itself is returned.
        """
        var = self.compute_var(returns, 'historical', confidence)
        tail_losses = returns[returns <= -var]

        if len(tail_losses) == 0:
            return var

        return -tail_losses.mean()

    @staticmethod
    def _longest_loss_streak(returns) -> int:
        """Return the length of the longest run of strictly negative returns."""
        longest = 0
        current = 0
        for is_loss in returns < 0:
            current = current + 1 if is_loss else 0
            if current > longest:
                longest = current
        return longest

    def compute_tail_risk(self, returns: np.ndarray) -> Dict:
        """Compute comprehensive tail risk metrics.

        Returns:
            A dict with the keys
            - ``skewness``: sample skewness
            - ``kurtosis``: sample excess kurtosis (SciPy default)
            - ``tail_ratio``: 95th percentile divided by the absolute 5th
              percentile (not guarded against a zero 5th percentile)
            - ``max_consecutive_losses``: longest run of negative returns
            - ``max_drawdown``: most negative peak-to-trough decline of the
              compounded equity curve (<= 0)
            - ``pain_ratio``: annualised mean return divided by the absolute
              max drawdown, or 0 when there is no drawdown
            - ``ulcer_index``: root mean square of the drawdown series
            - ``downside_deviation``: annualised standard deviation of the
              negative returns, or 0.0 when there are none
        """
        periods = self.TRADING_DAYS_PER_YEAR

        skewness = stats.skew(returns)
        kurt = stats.kurtosis(returns)

        tail_ratio = np.percentile(returns, 95) / abs(np.percentile(returns, 5))

        max_consecutive_losses = self._longest_loss_streak(returns)

        # Equity curve of 1 unit of capital. The running peak is floored at
        # the initial capital (1.0) so that losses at the very start of the
        # series are measured as drawdown instead of being ignored.
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum(np.maximum.accumulate(cumulative), 1.0)
        drawdown = (cumulative - running_max) / running_max
        max_dd = np.min(drawdown)

        annual_return = np.mean(returns) * periods
        pain_ratio = annual_return / abs(max_dd) if max_dd != 0 else 0

        ulcer = np.sqrt(np.mean(drawdown**2))

        # np.std of an empty selection is NaN; no losing period means no
        # downside dispersion, so report 0.0 instead.
        negative_returns = returns[returns < 0]
        if len(negative_returns) > 0:
            downside_deviation = np.std(negative_returns) * np.sqrt(periods)
        else:
            downside_deviation = 0.0

        return {
            'skewness': skewness,
            'kurtosis': kurt,
            'tail_ratio': tail_ratio,
            'max_consecutive_losses': max_consecutive_losses,
            'max_drawdown': max_dd,
            'pain_ratio': pain_ratio,
            'ulcer_index': ulcer,
            'downside_deviation': downside_deviation,
        }

    def compute_all_var(self, returns: np.ndarray) -> Dict:
        """Compute VaR (all three methods) and CVaR at all confidence levels.

        Keys are ``var_<pct>_historical``, ``var_<pct>_parametric``,
        ``var_<pct>_cornish_fisher`` and ``cvar_<pct>``, where ``<pct>`` is
        ``int(confidence * 100)``.
        """
        results = {}
        for conf in self.confidence_levels:
            pct = int(conf * 100)
            results[f'var_{pct}_historical'] = self.compute_var(returns, 'historical', conf)
            results[f'var_{pct}_parametric'] = self.compute_var(returns, 'parametric', conf)
            results[f'var_{pct}_cornish_fisher'] = self.compute_var(returns, 'cornish_fisher', conf)
            results[f'cvar_{pct}'] = self.compute_cvar(returns, conf)

        return results

    def rolling_risk(self, returns: pd.Series,
                     window: int = 63) -> pd.DataFrame:
        """Compute rolling risk metrics.

        Args:
            returns: Per-period returns.
            window: Rolling window length in periods.

        Returns:
            DataFrame with columns ``rolling_var_95`` (sign-flipped 5%
            quantile), ``rolling_cvar_95`` (sign-flipped mean of returns at
            or below that quantile) and ``rolling_volatility`` (annualised
            standard deviation).
        """
        tail_prob = 0.05

        def _window_cvar(window_returns: pd.Series) -> float:
            # Evaluate the quantile once per window rather than on every use.
            cutoff = window_returns.quantile(tail_prob)
            tail = window_returns[window_returns <= cutoff]
            return -tail.mean() if len(tail) > 0 else -cutoff

        rolling_var = returns.rolling(window).quantile(tail_prob)
        rolling_cvar = returns.rolling(window).apply(_window_cvar)
        rolling_vol = (returns.rolling(window).std()
                       * np.sqrt(self.TRADING_DAYS_PER_YEAR))

        return pd.DataFrame({
            'rolling_var_95': -rolling_var,
            'rolling_cvar_95': rolling_cvar,
            'rolling_volatility': rolling_vol
        })

    def portfolio_var(self, weights: np.ndarray,
                      returns_df: pd.DataFrame,
                      method: str = 'parametric',
                      confidence: float = 0.95) -> float:
        """Compute portfolio-level VaR.

        Args:
            weights: Portfolio weights, one per column of ``returns_df``.
            returns_df: Per-period asset returns, one column per asset.
            method: ``'parametric'`` (normal, from the annualised mean
                vector and covariance matrix) or ``'historical'``
                (empirical quantile of the weighted return series).
            confidence: Confidence level, e.g. ``0.95``.

        Returns:
            The VaR as a sign-flipped return quantile (positive = loss).

        Raises:
            ValueError: If ``method`` is not recognised.

        NOTE(review): the parametric figure is annualised (mean and
        covariance are multiplied by ``TRADING_DAYS_PER_YEAR``) while the
        historical figure is per-period, so the two are not directly
        comparable. Kept as is to preserve existing results.
        """
        if method == 'parametric':
            periods = self.TRADING_DAYS_PER_YEAR
            # The covariance matrix is only needed on this path.
            cov_matrix = (returns_df.cov() * periods).values
            port_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
            z = stats.norm.ppf(1 - confidence)
            port_mean = np.dot(weights, returns_df.mean() * periods)
            return -(port_mean + z * port_vol)

        if method == 'historical':
            port_returns = returns_df.dot(weights)
            return -np.percentile(port_returns, (1 - confidence) * 100)

        raise ValueError(f"Unknown method: {method}")
|
|
|
|
class DrawdownControl:
    """Dynamic position sizing based on drawdown state.

    Exposure is full at or above the equity peak, shrinks linearly from 1.0
    to 0.5 as the drawdown approaches the threshold, and decays
    exponentially beyond it, never dropping below ``min_scale``.
    """

    def __init__(self,
                 max_drawdown_threshold: float = -0.10,
                 risk_scaling: bool = True,
                 scaling_factor: float = 2.0,
                 min_scale: float = 0.1):
        """Create a drawdown controller.

        Args:
            max_drawdown_threshold: Drawdown at which the exponential decay
                starts. Only its magnitude matters, so ``-0.10`` and
                ``0.10`` are equivalent.
            risk_scaling: Stored on the instance.
                NOTE(review): no method of this class reads this flag, so
                it currently has no effect on the computed scale.
            scaling_factor: Decay rate of the exponential beyond the
                threshold.
            min_scale: Lower bound applied to every computed scale.
        """
        self.max_drawdown_threshold = max_drawdown_threshold
        self.risk_scaling = risk_scaling
        self.scaling_factor = scaling_factor
        self.min_scale = min_scale
        self.drawdown_history = []
        self.scale_history = []

    def compute_scale_factor(self,
                             current_equity: float,
                             peak_equity: float) -> float:
        """Compute position scaling factor based on drawdown.

        The drawdown ``(current_equity - peak_equity) / peak_equity`` and
        the resulting scale are appended to ``drawdown_history`` and
        ``scale_history``.

        Args:
            current_equity: Current account equity.
            peak_equity: Highest equity reached so far; must be non-zero
                because it is used as the divisor.

        Returns:
            The exposure multiplier, bounded below by ``min_scale``.
        """
        drawdown = (current_equity - peak_equity) / peak_equity
        self.drawdown_history.append(drawdown)

        # Work with the magnitude throughout so a threshold supplied as a
        # positive number selects the same zones as its negative form.
        threshold = abs(self.max_drawdown_threshold)

        if drawdown >= 0:
            # At or above the peak: full exposure.
            scale = 1.0
        elif drawdown > -threshold:
            # Inside the tolerated band: linear from 1.0 down to 0.5.
            scale = 1.0 + (drawdown / threshold) * 0.5
        else:
            # Beyond the threshold: exponential decay starting at 0.5,
            # which keeps the curve continuous at the threshold.
            excess_dd = abs(drawdown) - threshold
            scale = 0.5 * np.exp(-self.scaling_factor * excess_dd)

        scale = max(float(scale), self.min_scale)
        self.scale_history.append(scale)

        return scale

    def apply_to_weights(self, weights: np.ndarray, scale: float) -> np.ndarray:
        """Apply scaling factor to portfolio weights."""
        return weights * scale

    def get_drawdown_stats(self) -> Dict:
        """Get drawdown statistics.

        Returns:
            An empty dict when nothing has been recorded yet; otherwise a
            dict with ``max_drawdown`` (minimum recorded drawdown),
            ``avg_drawdown`` (mean of the negative drawdowns, 0.0 when
            there are none), ``current_drawdown``, ``avg_scale`` and
            ``current_scale`` (both 1.0 when no scale has been recorded).
        """
        if not self.drawdown_history:
            return {}

        dd = np.array(self.drawdown_history)
        underwater = dd[dd < 0]
        has_scales = bool(self.scale_history)
        return {
            'max_drawdown': np.min(dd),
            # The mean of an empty selection would be NaN; never having
            # been underwater is reported as an average drawdown of 0.0.
            'avg_drawdown': np.mean(underwater) if underwater.size else 0.0,
            'current_drawdown': dd[-1],
            'avg_scale': np.mean(self.scale_history) if has_scales else 1.0,
            'current_scale': self.scale_history[-1] if has_scales else 1.0
        }
|
|