# alphaforge-quant-system / risk_engine.py
# Premchan369's picture
# Upload risk_engine.py
# e7e2207 verified
# raw
# history blame
# 8.34 kB
"""Risk Engine - VaR, CVaR, tail risk, and drawdown control."""
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')
class RiskEngine:
    """Risk analytics: VaR, CVaR, tail-risk statistics and rolling metrics.

    All VaR/CVaR figures are reported as positive numbers for losses, i.e.
    the sign of the underlying return quantile is flipped.

    The methods call ``.mean()`` / ``.std()`` on ``returns`` and index it
    with boolean masks, so a 1-D ``numpy.ndarray`` of simple returns is the
    expected input.
    """

    def __init__(self, confidence_levels: Optional[List[float]] = None):
        """Create an engine.

        Args:
            confidence_levels: Confidence levels used by ``compute_all_var``.
                ``None`` or an empty sequence selects ``[0.95, 0.99]``.
        """
        if confidence_levels is None or len(confidence_levels) == 0:
            confidence_levels = [0.95, 0.99]
        self.confidence_levels = confidence_levels
        # History containers; nothing in this class appends to them.
        self.var_history = []
        self.cvar_history = []
        self.tail_risk_history = []

    def compute_var(self, returns: np.ndarray,
                    method: str = 'historical',
                    confidence: float = 0.95) -> float:
        """Compute Value at Risk as a positive loss figure.

        Args:
            returns: Return observations.
            method: One of

                - ``'historical'``: empirical ``(1 - confidence)`` quantile
                - ``'parametric'``: normal approximation ``mean + z * std``
                - ``'cornish_fisher'``: normal quantile adjusted for sample
                  skewness and excess kurtosis
            confidence: Confidence level, e.g. ``0.95``.

        Returns:
            The negated return quantile for the chosen method.

        Raises:
            ValueError: If ``method`` is not one of the names above.
        """
        if method == 'historical':
            return -np.percentile(returns, (1 - confidence) * 100)

        if method == 'parametric':
            z = stats.norm.ppf(1 - confidence)
            return -(returns.mean() + z * returns.std())

        if method == 'cornish_fisher':
            z = stats.norm.ppf(1 - confidence)
            skew = stats.skew(returns)
            # scipy's default (fisher=True) already returns *excess*
            # kurtosis (0 for a normal sample), which is exactly what the
            # Cornish-Fisher expansion expects. Subtracting 3 again would
            # bias the kurtosis correction.
            excess_kurt = stats.kurtosis(returns)
            z_cf = (z
                    + (z ** 2 - 1) * skew / 6
                    + (z ** 3 - 3 * z) * excess_kurt / 24
                    - (2 * z ** 3 - 5 * z) * skew ** 2 / 36)
            return -(returns.mean() + z_cf * returns.std())

        raise ValueError(f"Unknown VaR method: {method}")

    def compute_cvar(self, returns: np.ndarray,
                     confidence: float = 0.95) -> float:
        """Compute Conditional VaR (Expected Shortfall).

        This is the negated mean of all returns at or below the historical
        VaR threshold. If no observation falls in the tail, the historical
        VaR itself is returned.
        """
        var = self.compute_var(returns, 'historical', confidence)
        tail_losses = returns[returns <= -var]
        if len(tail_losses) == 0:
            return var
        return -tail_losses.mean()

    def compute_tail_risk(self, returns: np.ndarray) -> Dict:
        """Compute a dictionary of tail-risk statistics.

        Returns:
            Dict with keys:

            - ``skewness`` / ``kurtosis``: sample skewness and excess
              kurtosis (scipy defaults).
            - ``tail_ratio``: 95th percentile divided by the absolute value
              of the 5th percentile.
            - ``max_consecutive_losses``: longest run of negative returns.
            - ``max_drawdown``: most negative drawdown of the compounded
              equity curve (a value <= 0).
            - ``pain_ratio``: ``mean(returns) * 252 / |max_drawdown|``, or
              0 when there is no drawdown.
            - ``ulcer_index``: root-mean-square of the drawdown series.
            - ``downside_deviation``: standard deviation of the negative
              returns scaled by ``sqrt(252)``; 0.0 when there are none.
        """
        skewness = stats.skew(returns)
        kurt = stats.kurtosis(returns)

        tail_ratio = np.percentile(returns, 95) / abs(np.percentile(returns, 5))

        # Longest streak of strictly negative returns.
        max_consecutive_losses = 0
        streak = 0
        for is_loss in returns < 0:
            streak = streak + 1 if is_loss else 0
            if streak > max_consecutive_losses:
                max_consecutive_losses = streak

        # Drawdown of the compounded equity curve relative to its running peak.
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        max_dd = np.min(drawdown)

        annual_return = np.mean(returns) * 252
        pain_ratio = annual_return / abs(max_dd) if max_dd != 0 else 0

        ulcer = np.sqrt(np.mean(drawdown ** 2))

        # np.std of an empty selection is NaN; with no losing periods the
        # dispersion of losses is reported as zero instead.
        negative_returns = returns[returns < 0]
        if len(negative_returns) > 0:
            downside_deviation = np.std(negative_returns) * np.sqrt(252)
        else:
            downside_deviation = 0.0

        return {
            'skewness': skewness,
            'kurtosis': kurt,
            'tail_ratio': tail_ratio,
            'max_consecutive_losses': max_consecutive_losses,
            'max_drawdown': max_dd,
            'pain_ratio': pain_ratio,
            'ulcer_index': ulcer,
            'downside_deviation': downside_deviation
        }

    @staticmethod
    def _level_label(confidence: float) -> int:
        """Return the integer percent label used in result keys.

        Rounds away floating-point noise before truncating, so that e.g.
        ``0.29 * 100 == 28.999...`` yields 29 while 0.975 still yields 97.
        """
        return int(round(confidence * 100, 6))

    def compute_all_var(self, returns: np.ndarray) -> Dict:
        """Compute VaR (all three methods) and CVaR at every confidence level.

        Returns:
            Dict keyed ``var_<pct>_historical``, ``var_<pct>_parametric``,
            ``var_<pct>_cornish_fisher`` and ``cvar_<pct>`` for each level in
            ``self.confidence_levels``.
        """
        results = {}
        for conf in self.confidence_levels:
            pct = self._level_label(conf)
            results[f'var_{pct}_historical'] = self.compute_var(
                returns, 'historical', conf)
            results[f'var_{pct}_parametric'] = self.compute_var(
                returns, 'parametric', conf)
            results[f'var_{pct}_cornish_fisher'] = self.compute_var(
                returns, 'cornish_fisher', conf)
            results[f'cvar_{pct}'] = self.compute_cvar(returns, conf)
        return results

    @staticmethod
    def _window_cvar(window_returns: pd.Series) -> float:
        """Return the 95% CVaR of one rolling window (positive for losses)."""
        cutoff = window_returns.quantile(0.05)
        tail = window_returns[window_returns <= cutoff]
        if len(tail) == 0:
            return -cutoff
        return -tail.mean()

    def rolling_risk(self, returns: pd.Series,
                     window: int = 63) -> pd.DataFrame:
        """Compute rolling 95% VaR, 95% CVaR and annualised volatility.

        Args:
            returns: Return series.
            window: Rolling window length in observations.

        Returns:
            DataFrame with columns ``rolling_var_95``, ``rolling_cvar_95``
            and ``rolling_volatility`` (rolling std times ``sqrt(252)``).
        """
        rolling = returns.rolling(window)
        rolling_var = rolling.quantile(0.05)
        # raw=False hands each window to the helper as a Series.
        rolling_cvar = rolling.apply(self._window_cvar, raw=False)
        rolling_vol = rolling.std() * np.sqrt(252)
        return pd.DataFrame({
            'rolling_var_95': -rolling_var,
            'rolling_cvar_95': rolling_cvar,
            'rolling_volatility': rolling_vol
        })

    def portfolio_var(self, weights: np.ndarray,
                      returns_df: pd.DataFrame,
                      method: str = 'parametric',
                      confidence: float = 0.95) -> float:
        """Compute portfolio-level VaR.

        Args:
            weights: Portfolio weights, one per column of ``returns_df``.
            returns_df: Asset returns, one column per asset.
            method: ``'parametric'`` (covariance based) or ``'historical'``
                (empirical quantile of the weighted return series).
            confidence: Confidence level.

        Returns:
            VaR as a positive loss figure.

        Raises:
            ValueError: If ``method`` is not recognised.

        NOTE(review): the parametric branch scales mean and covariance by
        252, while the historical branch works on the unscaled per-period
        returns, so the two methods report on different horizons. Kept as
        is for backward compatibility; confirm the intended horizon.
        """
        if method == 'parametric':
            cov_matrix = returns_df.cov().values * 252
            port_vol = np.sqrt(np.dot(weights, np.dot(cov_matrix, weights)))
            z = stats.norm.ppf(1 - confidence)
            port_mean = np.dot(weights, returns_df.mean().values * 252)
            return -(port_mean + z * port_vol)

        if method == 'historical':
            port_returns = returns_df.dot(weights)
            return -np.percentile(port_returns, (1 - confidence) * 100)

        raise ValueError(f"Unknown method: {method}")
class DrawdownControl:
    """Dynamic position sizing based on drawdown state.

    Each call to ``compute_scale_factor`` records the observed drawdown and
    the resulting scale so that ``get_drawdown_stats`` can summarise them.
    """

    def __init__(self,
                 max_drawdown_threshold: float = -0.10,
                 risk_scaling: bool = True,
                 scaling_factor: float = 2.0):
        """Configure the controller.

        Args:
            max_drawdown_threshold: Drawdown level separating the linear and
                exponential de-risking regimes. Only its magnitude matters;
                ``-0.10`` and ``0.10`` are treated the same.
            risk_scaling: Stored on the instance.
                NOTE(review): no method in this class reads this flag.
            scaling_factor: Decay rate of the exponential regime.
        """
        self.max_drawdown_threshold = max_drawdown_threshold
        self.risk_scaling = risk_scaling
        self.scaling_factor = scaling_factor
        self.drawdown_history = []
        self.scale_history = []

    def compute_scale_factor(self,
                             current_equity: float,
                             peak_equity: float) -> float:
        """Compute the position scaling factor for the current drawdown.

        - No drawdown: full exposure (1.0).
        - Drawdown shallower than the threshold: linear reduction from 1.0
          towards 0.5 at the threshold.
        - Drawdown at or beyond the threshold: ``0.5 * exp(-scaling_factor *
          excess)``, where ``excess`` is the drawdown beyond the threshold.

        The result is floored at 0.1 (minimum 10% exposure). Both the
        drawdown and the scale are appended to the instance histories.

        Args:
            current_equity: Current account equity.
            peak_equity: Running peak equity; must be non-zero.

        Returns:
            Scale factor in ``[0.1, 1.0]``.
        """
        drawdown = (current_equity - peak_equity) / peak_equity
        self.drawdown_history.append(drawdown)

        # The formulas below use the threshold's magnitude, so the regime
        # comparison must as well; otherwise a positive threshold would
        # silently skip the linear band.
        threshold_size = abs(self.max_drawdown_threshold)

        if drawdown >= 0:
            scale = 1.0
        elif drawdown > -threshold_size:
            # Mild drawdown: linear scaling (drawdown is negative here).
            scale = 1.0 + (drawdown / threshold_size) * 0.5
        else:
            # Severe drawdown: exponential decay on the excess.
            excess_dd = abs(drawdown) - threshold_size
            scale = 0.5 * np.exp(-self.scaling_factor * excess_dd)

        scale = float(max(scale, 0.1))  # Minimum 10% exposure
        self.scale_history.append(scale)
        return scale

    def apply_to_weights(self, weights: np.ndarray, scale: float) -> np.ndarray:
        """Return ``weights`` multiplied by ``scale``."""
        return weights * scale

    def get_drawdown_stats(self) -> Dict:
        """Summarise the recorded drawdowns and scale factors.

        Returns:
            An empty dict when nothing has been recorded; otherwise a dict
            with ``max_drawdown``, ``avg_drawdown`` (mean of the strictly
            negative drawdowns, 0.0 if there are none), ``current_drawdown``,
            ``avg_scale`` and ``current_scale``.
        """
        if not self.drawdown_history:
            return {}

        dd = np.array(self.drawdown_history)
        underwater = dd[dd < 0]
        # Mean of an empty selection would be NaN; report 0.0 instead.
        avg_drawdown = np.mean(underwater) if underwater.size else 0.0

        return {
            'max_drawdown': np.min(dd),
            'avg_drawdown': avg_drawdown,
            'current_drawdown': dd[-1],
            'avg_scale': np.mean(self.scale_history),
            'current_scale': self.scale_history[-1] if self.scale_history else 1.0
        }