| """ |
| Risk Model Module |
| ================== |
| Portfolio-aware risk modeling engine. |
| |
| Takes user portfolio as input, learns trading behavior patterns, |
| and outputs risk scores, position sizing, stop-loss/take-profit levels. |
| |
| Inspired by: |
| - Deep RL for Portfolio Optimization (2412.18563): Sharpe-ratio reward |
| - Distributional Forecasting (2508.18921): VaR estimation with DNNs |
| - Modern Portfolio Theory + DL (2508.14999): Covariance estimation |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
|
|
|
|
| class PortfolioEncoder(nn.Module): |
| """ |
| Encode portfolio state into a fixed-dimensional representation. |
| |
| Portfolio state includes: |
| - Current positions (asset, size, entry price, unrealized PnL) |
| - Historical trades (win/loss ratio, avg holding period) |
| - Account metrics (equity, margin, drawdown) |
| """ |
| |
| def __init__(self, position_dim: int = 8, max_positions: int = 20, d_model: int = 64): |
| super().__init__() |
| self.max_positions = max_positions |
| |
| |
| self.position_encoder = nn.Sequential( |
| nn.Linear(position_dim, d_model), |
| nn.GELU(), |
| nn.Linear(d_model, d_model), |
| ) |
| |
| |
| self.position_attention = nn.MultiheadAttention(d_model, num_heads=4, batch_first=True) |
| self.norm = nn.LayerNorm(d_model) |
| |
| |
| self.account_encoder = nn.Sequential( |
| nn.Linear(6, d_model), |
| nn.GELU(), |
| ) |
| |
| |
| self.combine = nn.Sequential( |
| nn.Linear(d_model * 2, d_model), |
| nn.GELU(), |
| ) |
| |
| def forward(self, positions: torch.Tensor, account_features: torch.Tensor, |
| position_mask: Optional[torch.Tensor] = None) -> torch.Tensor: |
| """ |
| Args: |
| positions: (B, max_positions, position_dim) - padded position features |
| account_features: (B, 6) - account-level metrics |
| position_mask: (B, max_positions) - True for valid positions |
| |
| Returns: |
| portfolio_repr: (B, d_model) |
| """ |
| |
| pos_encoded = self.position_encoder(positions) |
| |
| |
| key_padding_mask = ~position_mask if position_mask is not None else None |
| pos_attn, _ = self.position_attention( |
| pos_encoded, pos_encoded, pos_encoded, |
| key_padding_mask=key_padding_mask |
| ) |
| pos_attn = self.norm(pos_attn + pos_encoded) |
| |
| |
| if position_mask is not None: |
| mask_expanded = position_mask.unsqueeze(-1).float() |
| pos_pooled = (pos_attn * mask_expanded).sum(dim=1) / (mask_expanded.sum(dim=1) + 1e-8) |
| else: |
| pos_pooled = pos_attn.mean(dim=1) |
| |
| |
| account_encoded = self.account_encoder(account_features) |
| |
| |
| combined = torch.cat([pos_pooled, account_encoded], dim=-1) |
| return self.combine(combined) |
|
|
|
|
| class TraderBehaviorAnalyzer(nn.Module): |
| """ |
| Learn trader behavior patterns from historical trade sequences. |
| |
| Patterns detected: |
| - Risk appetite (average position size relative to portfolio) |
| - Drawdown tolerance (max drawdown before behavior change) |
| - Win/loss ratio patterns |
| - Position sizing habits |
| - Overtrading tendency |
| - Revenge trading patterns (increased size after losses) |
| """ |
| |
| def __init__(self, trade_dim: int = 12, d_model: int = 64, n_layers: int = 2): |
| super().__init__() |
| |
| |
| self.trade_encoder = nn.LSTM( |
| input_size=trade_dim, |
| hidden_size=d_model, |
| num_layers=n_layers, |
| batch_first=True, |
| dropout=0.1 |
| ) |
| |
| |
| self.risk_appetite_head = nn.Sequential( |
| nn.Linear(d_model, 32), |
| nn.GELU(), |
| nn.Linear(32, 1), |
| nn.Sigmoid() |
| ) |
| |
| self.drawdown_tolerance_head = nn.Sequential( |
| nn.Linear(d_model, 32), |
| nn.GELU(), |
| nn.Linear(32, 1), |
| nn.Sigmoid() |
| ) |
| |
| self.overtrading_head = nn.Sequential( |
| nn.Linear(d_model, 32), |
| nn.GELU(), |
| nn.Linear(32, 1), |
| nn.Sigmoid() |
| ) |
| |
| self.revenge_trading_head = nn.Sequential( |
| nn.Linear(d_model, 32), |
| nn.GELU(), |
| nn.Linear(32, 1), |
| nn.Sigmoid() |
| ) |
| |
| |
| self.trader_type_head = nn.Sequential( |
| nn.Linear(d_model, 32), |
| nn.GELU(), |
| nn.Linear(32, 5), |
| ) |
| |
| def forward(self, trade_history: torch.Tensor) -> Dict[str, torch.Tensor]: |
| """ |
| Args: |
| trade_history: (B, num_trades, trade_dim) |
| trade_dim features: [entry_price, exit_price, size, pnl, holding_time, |
| is_winner, direction, max_drawdown, entry_hour, |
| day_of_week, time_since_last_trade, consecutive_losses] |
| |
| Returns: |
| behavior_profile: Dict of behavioral metrics |
| """ |
| _, (hidden, _) = self.trade_encoder(trade_history) |
| h = hidden[-1] |
| |
| return { |
| 'risk_appetite': self.risk_appetite_head(h).squeeze(-1), |
| 'drawdown_tolerance': self.drawdown_tolerance_head(h).squeeze(-1), |
| 'overtrading_prob': self.overtrading_head(h).squeeze(-1), |
| 'revenge_trading_prob': self.revenge_trading_head(h).squeeze(-1), |
| 'trader_type_logits': self.trader_type_head(h), |
| 'behavior_embedding': h, |
| } |
|
|
|
|
| class RiskModel(nn.Module): |
| """ |
| Complete risk modeling engine. |
| |
| Combines: |
| 1. Market state (from prediction model) |
| 2. Portfolio state (positions, account) |
| 3. Trader behavior profile |
| |
| Outputs: |
| - Risk score (0-1) |
| - Recommended position size (fraction of portfolio) |
| - Stop-loss / take-profit levels |
| - Probability of portfolio drawdown exceeding threshold |
| """ |
| |
| def __init__( |
| self, |
| market_dim: int = 128, |
| portfolio_dim: int = 64, |
| behavior_dim: int = 64, |
| d_model: int = 128, |
| num_horizons: int = 3, |
| ): |
| super().__init__() |
| |
| self.portfolio_encoder = PortfolioEncoder(d_model=portfolio_dim) |
| self.behavior_analyzer = TraderBehaviorAnalyzer(d_model=behavior_dim) |
| |
| |
| total_dim = market_dim + portfolio_dim + behavior_dim |
| self.fusion = nn.Sequential( |
| nn.Linear(total_dim, d_model), |
| nn.GELU(), |
| nn.Dropout(0.1), |
| nn.Linear(d_model, d_model), |
| nn.GELU(), |
| ) |
| |
| |
| self.risk_score_head = nn.Sequential( |
| nn.Linear(d_model, 64), |
| nn.GELU(), |
| nn.Linear(64, 1), |
| nn.Sigmoid() |
| ) |
| |
| |
| self.position_size_head = nn.Sequential( |
| nn.Linear(d_model, 64), |
| nn.GELU(), |
| nn.Linear(64, 1), |
| nn.Sigmoid() |
| ) |
| |
| |
| self.sl_tp_head = nn.Sequential( |
| nn.Linear(d_model, 64), |
| nn.GELU(), |
| nn.Linear(64, 2), |
| nn.Softplus() |
| ) |
| |
| |
| self.drawdown_head = nn.Sequential( |
| nn.Linear(d_model, 64), |
| nn.GELU(), |
| nn.Linear(64, 4), |
| nn.Sigmoid() |
| ) |
| |
| |
| self.var_head = nn.Sequential( |
| nn.Linear(d_model, 64), |
| nn.GELU(), |
| nn.Linear(64, 3), |
| ) |
| |
| def forward( |
| self, |
| market_state: torch.Tensor, |
| positions: torch.Tensor, |
| account_features: torch.Tensor, |
| trade_history: torch.Tensor, |
| position_mask: Optional[torch.Tensor] = None, |
| ) -> Dict[str, torch.Tensor]: |
| """ |
| Full risk assessment. |
| |
| Args: |
| market_state: (B, market_dim) from prediction model |
| positions: (B, max_positions, position_dim) |
| account_features: (B, 6) |
| trade_history: (B, num_trades, trade_dim) |
| position_mask: (B, max_positions) |
| |
| Returns: |
| Dict with all risk outputs |
| """ |
| |
| portfolio_repr = self.portfolio_encoder(positions, account_features, position_mask) |
| |
| |
| behavior = self.behavior_analyzer(trade_history) |
| behavior_repr = behavior['behavior_embedding'] |
| |
| |
| fused = self.fusion(torch.cat([market_state, portfolio_repr, behavior_repr], dim=-1)) |
| |
| |
| risk_score = self.risk_score_head(fused).squeeze(-1) |
| position_size = self.position_size_head(fused).squeeze(-1) |
| sl_tp = self.sl_tp_head(fused) |
| drawdown_probs = self.drawdown_head(fused) |
| var_estimates = self.var_head(fused) |
| |
| |
| adjusted_position_size = position_size * (1 - 0.5 * risk_score) |
| |
| return { |
| 'risk_score': risk_score, |
| 'raw_position_size': position_size, |
| 'adjusted_position_size': adjusted_position_size, |
| 'stop_loss_atr_mult': sl_tp[:, 0], |
| 'take_profit_atr_mult': sl_tp[:, 1], |
| 'drawdown_probs': drawdown_probs, |
| 'var_estimates': var_estimates, |
| 'behavior_profile': behavior, |
| } |
|
|
|
|
| class RiskLoss(nn.Module): |
| """Loss function for risk model training.""" |
| |
| def __init__(self): |
| super().__init__() |
| |
| def forward(self, predictions: Dict, targets: Dict) -> Dict[str, torch.Tensor]: |
| """ |
| Targets should include: |
| - actual_risk: realized risk score from hindsight |
| - actual_drawdown: realized drawdown |
| - optimal_position_size: computed from Kelly criterion or similar |
| """ |
| losses = {} |
| |
| if 'actual_risk' in targets: |
| losses['risk_loss'] = F.mse_loss(predictions['risk_score'], targets['actual_risk']) |
| |
| if 'optimal_position_size' in targets: |
| losses['position_loss'] = F.mse_loss( |
| predictions['adjusted_position_size'], targets['optimal_position_size'] |
| ) |
| |
| if 'drawdown_occurred' in targets: |
| losses['drawdown_loss'] = F.binary_cross_entropy( |
| predictions['drawdown_probs'], targets['drawdown_occurred'] |
| ) |
| |
| total = sum(losses.values()) |
| losses['total_loss'] = total |
| |
| return losses |
|
|