"""Risk Engine - VaR, CVaR, tail risk, and drawdown control.""" import numpy as np import pandas as pd from scipy import stats from typing import Dict, List, Optional, Tuple import warnings warnings.filterwarnings('ignore') class RiskEngine: """Comprehensive risk analytics engine.""" def __init__(self, confidence_levels: List[float] = None): self.confidence_levels = confidence_levels or [0.95, 0.99] self.var_history = [] self.cvar_history = [] self.tail_risk_history = [] def compute_var(self, returns: np.ndarray, method: str = 'historical', confidence: float = 0.95) -> float: """ Compute Value at Risk. Methods: - historical: empirical quantile - parametric: assume normal distribution - cornish_fisher: adjust for skewness and kurtosis """ if method == 'historical': return -np.percentile(returns, (1 - confidence) * 100) elif method == 'parametric': z = stats.norm.ppf(1 - confidence) return -(returns.mean() + z * returns.std()) elif method == 'cornish_fisher': z = stats.norm.ppf(1 - confidence) s = stats.skew(returns) k = stats.kurtosis(returns) # Cornish-Fisher expansion z_cf = (z + (z**2 - 1) * s / 6 + (z**3 - 3*z) * (k - 3) / 24 - (2*z**3 - 5*z) * s**2 / 36) return -(returns.mean() + z_cf * returns.std()) else: raise ValueError(f"Unknown VaR method: {method}") def compute_cvar(self, returns: np.ndarray, confidence: float = 0.95) -> float: """ Compute Conditional Value at Risk (Expected Shortfall). Average loss beyond VaR threshold. """ var = self.compute_var(returns, 'historical', confidence) tail_losses = returns[returns <= -var] if len(tail_losses) == 0: return var return -tail_losses.mean() def compute_tail_risk(self, returns: np.ndarray) -> Dict: """ Compute comprehensive tail risk metrics. """ # Skewness and kurtosis skewness = stats.skew(returns) kurt = stats.kurtosis(returns) # Tail ratio: ratio of 95th percentile to 5th percentile tail_ratio = np.percentile(returns, 95) / abs(np.percentile(returns, 5)) # Maximum consecutive losses losses = returns < 0 consecutive = [] current = 0 for is_loss in losses: if is_loss: current += 1 else: if current > 0: consecutive.append(current) current = 0 if current > 0: consecutive.append(current) max_consecutive_losses = max(consecutive) if consecutive else 0 # Pain ratio: annualized return / max drawdown cumulative = np.cumprod(1 + returns) running_max = np.maximum.accumulate(cumulative) drawdown = (cumulative - running_max) / running_max max_dd = np.min(drawdown) annual_return = np.mean(returns) * 252 pain_ratio = annual_return / abs(max_dd) if max_dd != 0 else 0 # Ulcer index ulcer = np.sqrt(np.mean(drawdown**2)) return { 'skewness': skewness, 'kurtosis': kurt, 'tail_ratio': tail_ratio, 'max_consecutive_losses': max_consecutive_losses, 'max_drawdown': max_dd, 'pain_ratio': pain_ratio, 'ulcer_index': ulcer, 'downside_deviation': np.std(returns[returns < 0]) * np.sqrt(252) } def compute_all_var(self, returns: np.ndarray) -> Dict: """Compute VaR and CVaR at all confidence levels.""" results = {} for conf in self.confidence_levels: var_hist = self.compute_var(returns, 'historical', conf) var_param = self.compute_var(returns, 'parametric', conf) var_cf = self.compute_var(returns, 'cornish_fisher', conf) cvar = self.compute_cvar(returns, conf) results[f'var_{int(conf*100)}_historical'] = var_hist results[f'var_{int(conf*100)}_parametric'] = var_param results[f'var_{int(conf*100)}_cornish_fisher'] = var_cf results[f'cvar_{int(conf*100)}'] = cvar return results def rolling_risk(self, returns: pd.Series, window: int = 63) -> pd.DataFrame: """Compute rolling risk metrics.""" rolling_var = returns.rolling(window).quantile(0.05) rolling_cvar = returns.rolling(window).apply( lambda x: -x[x <= x.quantile(0.05)].mean() if len(x[x <= x.quantile(0.05)]) > 0 else -x.quantile(0.05) ) rolling_vol = returns.rolling(window).std() * np.sqrt(252) return pd.DataFrame({ 'rolling_var_95': -rolling_var, 'rolling_cvar_95': rolling_cvar, 'rolling_volatility': rolling_vol }) def portfolio_var(self, weights: np.ndarray, returns_df: pd.DataFrame, method: str = 'parametric', confidence: float = 0.95) -> float: """ Compute portfolio-level VaR using covariance matrix. """ cov_matrix = returns_df.cov() * 252 port_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights))) if method == 'parametric': z = stats.norm.ppf(1 - confidence) port_mean = np.dot(weights, returns_df.mean() * 252) return -(port_mean + z * port_vol) elif method == 'historical': port_returns = returns_df.dot(weights) return -np.percentile(port_returns, (1 - confidence) * 100) else: raise ValueError(f"Unknown method: {method}") class DrawdownControl: """Dynamic position sizing based on drawdown state.""" def __init__(self, max_drawdown_threshold: float = -0.10, risk_scaling: bool = True, scaling_factor: float = 2.0): self.max_drawdown_threshold = max_drawdown_threshold self.risk_scaling = risk_scaling self.scaling_factor = scaling_factor self.drawdown_history = [] self.scale_history = [] def compute_scale_factor(self, current_equity: float, peak_equity: float) -> float: """ Compute position scaling factor based on drawdown. As drawdown increases, reduce exposure exponentially. """ drawdown = (current_equity - peak_equity) / peak_equity self.drawdown_history.append(drawdown) if drawdown >= 0: # No drawdown, full exposure scale = 1.0 elif drawdown > self.max_drawdown_threshold: # Mild drawdown, linear scaling scale = 1.0 + (drawdown / abs(self.max_drawdown_threshold)) * 0.5 else: # Severe drawdown, exponential scaling excess_dd = abs(drawdown) - abs(self.max_drawdown_threshold) scale = 0.5 * np.exp(-self.scaling_factor * excess_dd) scale = max(scale, 0.1) # Minimum 10% exposure self.scale_history.append(scale) return scale def apply_to_weights(self, weights: np.ndarray, scale: float) -> np.ndarray: """Apply scaling factor to portfolio weights.""" return weights * scale def get_drawdown_stats(self) -> Dict: """Get drawdown statistics.""" if not self.drawdown_history: return {} dd = np.array(self.drawdown_history) return { 'max_drawdown': np.min(dd), 'avg_drawdown': np.mean(dd[dd < 0]), 'current_drawdown': dd[-1], 'avg_scale': np.mean(self.scale_history), 'current_scale': self.scale_history[-1] if self.scale_history else 1.0 }