# Source: trading_intelligence/risk_model.py (added by avinashhm, commit e521ee3, verified)
"""
Risk Model Module
==================
Portfolio-aware risk modeling engine.
Takes user portfolio as input, learns trading behavior patterns,
and outputs risk scores, position sizing, stop-loss/take-profit levels.
Inspired by:
- Deep RL for Portfolio Optimization (2412.18563): Sharpe-ratio reward
- Distributional Forecasting (2508.18921): VaR estimation with DNNs
- Modern Portfolio Theory + DL (2508.14999): Covariance estimation
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from typing import Dict, List, Optional, Tuple
class PortfolioEncoder(nn.Module):
    """
    Encode portfolio state into a fixed-dimensional representation.

    Each position is embedded independently, mixed with the other positions
    through one self-attention layer (no positional encoding is added, so the
    result does not depend on the order of the positions), mean-pooled over
    the valid positions, and finally fused with an embedding of the
    account-level features.

    Portfolio state includes:
    - Current positions (asset, size, entry price, unrealized PnL)
    - Historical trades (win/loss ratio, avg holding period)
    - Account metrics (equity, margin, drawdown)

    Args:
        position_dim: Number of features describing a single position.
        max_positions: Stored on the instance as ``max_positions``; it is not
            enforced by ``forward``.
        d_model: Width of the embeddings and of the returned representation.
            Must be divisible by ``num_heads``.
        account_dim: Number of account-level features (default 6: equity,
            margin, drawdown, num_positions, total_exposure, cash_ratio).
        num_heads: Number of attention heads used across positions.
    """

    def __init__(self, position_dim: int = 8, max_positions: int = 20, d_model: int = 64,
                 account_dim: int = 6, num_heads: int = 4):
        super().__init__()
        self.max_positions = max_positions

        # Per-position embedding
        self.position_encoder = nn.Sequential(
            nn.Linear(position_dim, d_model),
            nn.GELU(),
            nn.Linear(d_model, d_model),
        )

        # Set-based aggregation across positions via self-attention
        self.position_attention = nn.MultiheadAttention(
            d_model, num_heads=num_heads, batch_first=True
        )
        self.norm = nn.LayerNorm(d_model)

        # Account-level features
        self.account_encoder = nn.Sequential(
            nn.Linear(account_dim, d_model),
            nn.GELU(),
        )

        # Fuse pooled positions with the account embedding
        self.combine = nn.Sequential(
            nn.Linear(d_model * 2, d_model),
            nn.GELU(),
        )

    def forward(self, positions: torch.Tensor, account_features: torch.Tensor,
                position_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Args:
            positions: (B, P, position_dim) - padded position features
            account_features: (B, account_dim) - account-level metrics
            position_mask: (B, P) - True (or non-zero) for valid positions.
                When omitted, every position is treated as valid.

        Returns:
            portfolio_repr: (B, d_model). For a sample with no valid position
            the pooled position part is all zeros, so the output depends on
            the account features only.
        """
        # Encode individual positions
        pos_encoded = self.position_encoder(positions)  # (B, P, d_model)

        key_padding_mask = None
        if position_mask is not None:
            # Accept 0/1 float or int masks as well as bool masks.
            position_mask = position_mask.to(torch.bool)
            # Masking *every* key of a sample makes the attention softmax
            # return NaN, which would survive the pooling below (NaN * 0 is
            # NaN). For samples without any valid position, leave the keys
            # unmasked; their outputs are zeroed by the pooling mask anyway.
            has_positions = position_mask.any(dim=1, keepdim=True)  # (B, 1)
            key_padding_mask = (~position_mask) & has_positions

        # Self-attention across positions with a residual connection
        pos_attn, _ = self.position_attention(
            pos_encoded, pos_encoded, pos_encoded,
            key_padding_mask=key_padding_mask,
        )
        pos_attn = self.norm(pos_attn + pos_encoded)

        # Mean-pool across valid positions
        if position_mask is not None:
            valid = position_mask.unsqueeze(-1).to(pos_attn.dtype)  # (B, P, 1)
            # clamp avoids a division by zero for empty portfolios
            counts = valid.sum(dim=1).clamp(min=1.0)
            pos_pooled = (pos_attn * valid).sum(dim=1) / counts
        else:
            pos_pooled = pos_attn.mean(dim=1)

        # Encode account features and fuse both views
        account_encoded = self.account_encoder(account_features)
        combined = torch.cat([pos_pooled, account_encoded], dim=-1)
        return self.combine(combined)
class TraderBehaviorAnalyzer(nn.Module):
    """
    Learn trader behavior patterns from historical trade sequences.

    A stacked LSTM summarizes the trade sequence; the final hidden state of
    its last layer feeds four sigmoid heads and one 5-way classification head.

    Patterns detected:
    - Risk appetite (average position size relative to portfolio)
    - Drawdown tolerance (max drawdown before behavior change)
    - Win/loss ratio patterns
    - Position sizing habits
    - Overtrading tendency
    - Revenge trading patterns (increased size after losses)

    Args:
        trade_dim: Number of features per trade.
        d_model: LSTM hidden size and width of the behavior embedding.
        n_layers: Number of stacked LSTM layers.
    """

    def __init__(self, trade_dim: int = 12, d_model: int = 64, n_layers: int = 2):
        super().__init__()

        # Trade sequence encoder (LSTM for sequential behavior patterns).
        # Inter-layer dropout only exists between stacked layers; requesting
        # it for a single layer has no effect and makes PyTorch emit a
        # warning, so it is disabled in that case.
        self.trade_encoder = nn.LSTM(
            input_size=trade_dim,
            hidden_size=d_model,
            num_layers=n_layers,
            batch_first=True,
            dropout=0.1 if n_layers > 1 else 0.0,
        )

        # Behavior pattern heads, each producing one value on a 0-1 scale
        self.risk_appetite_head = self._scalar_head(d_model)
        self.drawdown_tolerance_head = self._scalar_head(d_model)
        self.overtrading_head = self._scalar_head(d_model)
        self.revenge_trading_head = self._scalar_head(d_model)

        # Trader type classifier (5 types)
        self.trader_type_head = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.GELU(),
            nn.Linear(32, 5),  # conservative, moderate, aggressive, scalper, swing
        )

    @staticmethod
    def _scalar_head(d_model: int) -> nn.Sequential:
        """Build a two-layer MLP that maps an embedding to one sigmoid output."""
        return nn.Sequential(
            nn.Linear(d_model, 32),
            nn.GELU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),  # 0-1 scale
        )

    def forward(self, trade_history: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Args:
            trade_history: (B, num_trades, trade_dim)
                trade_dim features: [entry_price, exit_price, size, pnl, holding_time,
                                     is_winner, direction, max_drawdown, entry_hour,
                                     day_of_week, time_since_last_trade, consecutive_losses]
                ``num_trades`` may be 0 (no trades yet).

        Returns:
            behavior_profile: Dict with
                'risk_appetite', 'drawdown_tolerance', 'overtrading_prob',
                'revenge_trading_prob': (B,) values in [0, 1];
                'trader_type_logits': (B, 5) unnormalized class scores;
                'behavior_embedding': (B, d_model) final LSTM hidden state.
        """
        if trade_history.size(1) == 0:
            # With no trades there is nothing to run the LSTM over; fall back
            # to an all-zero embedding (the LSTM's own default initial state).
            h = self.trade_encoder.weight_ih_l0.new_zeros(
                trade_history.size(0), self.trade_encoder.hidden_size
            )
        else:
            _, (hidden, _) = self.trade_encoder(trade_history)
            h = hidden[-1]  # Last layer hidden state: (B, d_model)

        return {
            'risk_appetite': self.risk_appetite_head(h).squeeze(-1),
            'drawdown_tolerance': self.drawdown_tolerance_head(h).squeeze(-1),
            'overtrading_prob': self.overtrading_head(h).squeeze(-1),
            'revenge_trading_prob': self.revenge_trading_head(h).squeeze(-1),
            'trader_type_logits': self.trader_type_head(h),
            'behavior_embedding': h,  # For downstream use
        }
class RiskModel(nn.Module):
    """
    Complete risk modeling engine.

    Three views are concatenated and passed through a shared fusion MLP:
    1. Market state (from prediction model)
    2. Portfolio state (positions, account), via ``PortfolioEncoder``
    3. Trader behavior profile, via ``TraderBehaviorAnalyzer``

    Five heads read the fused representation and produce:
    - Risk score (0-1)
    - Recommended position size (fraction of portfolio)
    - Stop-loss / take-profit levels (as ATR multiples)
    - Probability of portfolio drawdown exceeding several thresholds
    - Value-at-Risk estimates

    Args:
        market_dim: Dimension of market state from prediction model.
        portfolio_dim: Portfolio encoder output dim.
        behavior_dim: Behavior analyzer output dim.
        d_model: Width of the fusion network.
        num_horizons: Accepted for interface compatibility; not referenced
            anywhere in this class.
    """

    def __init__(
        self,
        market_dim: int = 128,
        portfolio_dim: int = 64,
        behavior_dim: int = 64,
        d_model: int = 128,
        num_horizons: int = 3,
    ):
        super().__init__()

        self.portfolio_encoder = PortfolioEncoder(d_model=portfolio_dim)
        self.behavior_analyzer = TraderBehaviorAnalyzer(d_model=behavior_dim)

        # Fusion network over the concatenated [market | portfolio | behavior] vector
        fused_in = market_dim + portfolio_dim + behavior_dim
        self.fusion = nn.Sequential(
            nn.Linear(fused_in, d_model),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(d_model, d_model),
            nn.GELU(),
        )

        # Risk score in 0-1
        self.risk_score_head = self._mlp_head(d_model, 1, nn.Sigmoid())
        # Position size as a 0-1 fraction of portfolio (Kelly-criterion inspired)
        self.position_size_head = self._mlp_head(d_model, 1, nn.Sigmoid())
        # [stop_loss_atr_mult, take_profit_atr_mult]; Softplus keeps both positive
        self.sl_tp_head = self._mlp_head(d_model, 2, nn.Softplus())
        # P(dd > 5%), P(dd > 10%), P(dd > 15%), P(dd > 20%)
        self.drawdown_head = self._mlp_head(d_model, 4, nn.Sigmoid())
        # VaR at 95%, 99%, 99.5% (unbounded outputs)
        self.var_head = self._mlp_head(d_model, 3)

    @staticmethod
    def _mlp_head(in_dim: int, out_dim: int,
                  activation: Optional[nn.Module] = None) -> nn.Sequential:
        """Build Linear(in_dim, 64) -> GELU -> Linear(64, out_dim) [-> activation]."""
        layers = [nn.Linear(in_dim, 64), nn.GELU(), nn.Linear(64, out_dim)]
        if activation is not None:
            layers.append(activation)
        return nn.Sequential(*layers)

    def forward(
        self,
        market_state: torch.Tensor,
        positions: torch.Tensor,
        account_features: torch.Tensor,
        trade_history: torch.Tensor,
        position_mask: Optional[torch.Tensor] = None,
    ) -> Dict[str, torch.Tensor]:
        """
        Full risk assessment.

        Args:
            market_state: (B, market_dim) from prediction model
            positions: (B, max_positions, position_dim)
            account_features: (B, 6)
            trade_history: (B, num_trades, trade_dim)
            position_mask: (B, max_positions)

        Returns:
            Dict with keys 'risk_score', 'raw_position_size',
            'adjusted_position_size', 'stop_loss_atr_mult',
            'take_profit_atr_mult', 'drawdown_probs', 'var_estimates' and
            'behavior_profile' (the full dict returned by the behavior analyzer).
        """
        # Per-view representations
        portfolio_repr = self.portfolio_encoder(positions, account_features, position_mask)
        behavior = self.behavior_analyzer(trade_history)

        # Fuse all signals
        joint = torch.cat(
            [market_state, portfolio_repr, behavior['behavior_embedding']], dim=-1
        )
        fused = self.fusion(joint)

        # Head outputs
        risk_score = self.risk_score_head(fused).squeeze(-1)
        position_size = self.position_size_head(fused).squeeze(-1)
        sl_tp = self.sl_tp_head(fused)
        drawdown_probs = self.drawdown_head(fused)
        var_estimates = self.var_head(fused)

        outputs = {
            'risk_score': risk_score,
            'raw_position_size': position_size,
            # Scale the raw size down by up to 50% as the risk score approaches 1
            'adjusted_position_size': position_size * (1 - 0.5 * risk_score),
            'stop_loss_atr_mult': sl_tp[:, 0],
            'take_profit_atr_mult': sl_tp[:, 1],
            'drawdown_probs': drawdown_probs,
            'var_estimates': var_estimates,
            'behavior_profile': behavior,
        }
        return outputs
class RiskLoss(nn.Module):
    """Loss function for risk model training.

    Each loss term is computed only when its target key is present, and
    ``total_loss`` is the unweighted sum of the computed terms.
    """

    def __init__(self):
        super().__init__()

    def forward(self, predictions: Dict, targets: Dict) -> Dict[str, torch.Tensor]:
        """
        Args:
            predictions: Model outputs. Reads 'risk_score',
                'adjusted_position_size' and 'drawdown_probs', each only when
                the matching target is supplied.
            targets: Any subset of:
                - actual_risk: realized risk score from hindsight
                  (MSE against 'risk_score')
                - optimal_position_size: computed from Kelly criterion or similar
                  (MSE against 'adjusted_position_size')
                - drawdown_occurred: 0/1 indicators (float, int or bool) with
                  the same shape as 'drawdown_probs' (binary cross-entropy)

        Returns:
            Dict with the computed terms ('risk_loss', 'position_loss',
            'drawdown_loss') plus 'total_loss'. If no recognized target is
            given, 'total_loss' is a zero scalar tensor that carries no
            gradient.
        """
        losses = {}

        if 'actual_risk' in targets:
            losses['risk_loss'] = F.mse_loss(predictions['risk_score'], targets['actual_risk'])

        if 'optimal_position_size' in targets:
            losses['position_loss'] = F.mse_loss(
                predictions['adjusted_position_size'], targets['optimal_position_size']
            )

        if 'drawdown_occurred' in targets:
            drawdown_probs = predictions['drawdown_probs']
            # binary_cross_entropy requires targets in the prediction dtype;
            # cast so that bool / integer indicator labels are accepted.
            drawdown_target = targets['drawdown_occurred'].to(dtype=drawdown_probs.dtype)
            losses['drawdown_loss'] = F.binary_cross_entropy(drawdown_probs, drawdown_target)

        if losses:
            total = sum(losses.values())
        else:
            # sum() over nothing would yield the Python int 0; keep the
            # declared tensor return type and match the predictions'
            # device/dtype when a prediction tensor is available.
            reference = next(
                (value for value in predictions.values() if isinstance(value, torch.Tensor)),
                None,
            )
            total = reference.new_zeros(()) if reference is not None else torch.zeros(())

        losses['total_loss'] = total
        return losses