""" Personalization Layer ===================== Clusters traders by behavior and adapts strategies per user. Detects problematic patterns: - Overtrading (excessive frequency) - Revenge trading (increased size after losses) - Risk escalation patterns - Emotional trading signals """ import torch import torch.nn as nn import numpy as np from typing import Dict, List, Tuple from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler # Trader archetypes TRADER_TYPES = { 0: 'Conservative', # Low risk, long holding periods, small positions 1: 'Moderate', # Balanced approach, moderate position sizes 2: 'Aggressive', # High risk tolerance, larger positions, shorter holding 3: 'Scalper', # Very short holding periods, many trades 4: 'Swing Trader', # Multi-day holds, trend-following } class TraderProfiler: """ Build comprehensive trader profiles from historical behavior. Uses both rule-based heuristics and learned embeddings. """ def __init__(self, n_clusters: int = 5): self.n_clusters = n_clusters self.scaler = StandardScaler() self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) self.is_fitted = False def extract_behavior_features(self, trades: List[Dict]) -> np.ndarray: """ Extract behavioral features from trade history. Each trade dict should have: - entry_price, exit_price, size, pnl, holding_time, timestamp, direction (1=long, -1=short) Returns: feature vector for this trader """ if not trades: return np.zeros(15) pnls = [t.get('pnl', 0) for t in trades] sizes = [t.get('size', 0) for t in trades] holding_times = [t.get('holding_time', 0) for t in trades] winners = [p for p in pnls if p > 0] losers = [p for p in pnls if p <= 0] # Core metrics win_rate = len(winners) / max(len(pnls), 1) avg_win = np.mean(winners) if winners else 0 avg_loss = np.mean(losers) if losers else 0 profit_factor = abs(sum(winners)) / (abs(sum(losers)) + 1e-8) # Risk metrics avg_position_size = np.mean(sizes) if sizes else 0 max_position_size = np.max(sizes) if sizes else 0 position_size_std = np.std(sizes) if len(sizes) > 1 else 0 # Timing metrics avg_holding_time = np.mean(holding_times) if holding_times else 0 trade_frequency = len(trades) # trades per period # Behavioral patterns consecutive_losses = self._max_consecutive_losses(pnls) size_after_loss = self._avg_size_after_loss(trades) size_after_win = self._avg_size_after_win(trades) revenge_ratio = size_after_loss / (size_after_win + 1e-8) # Max drawdown from trades cumulative_pnl = np.cumsum(pnls) running_max = np.maximum.accumulate(cumulative_pnl) if len(cumulative_pnl) > 0 else np.array([0]) drawdowns = running_max - cumulative_pnl if len(cumulative_pnl) > 0 else np.array([0]) max_drawdown = np.max(drawdowns) if len(drawdowns) > 0 else 0 return np.array([ win_rate, # 0 avg_win, # 1 avg_loss, # 2 profit_factor, # 3 avg_position_size, # 4 max_position_size, # 5 position_size_std, # 6 avg_holding_time, # 7 trade_frequency, # 8 consecutive_losses, # 9 revenge_ratio, # 10 max_drawdown, # 11 size_after_loss, # 12 size_after_win, # 13 len(trades), # 14 - total trades ]) def _max_consecutive_losses(self, pnls: List[float]) -> int: """Find maximum consecutive losing trades.""" max_streak = 0 current_streak = 0 for p in pnls: if p <= 0: current_streak += 1 max_streak = max(max_streak, current_streak) else: current_streak = 0 return max_streak def _avg_size_after_loss(self, trades: List[Dict]) -> float: """Average position size after a losing trade.""" sizes_after_loss = [] for i in range(1, len(trades)): if trades[i-1].get('pnl', 0) <= 0: sizes_after_loss.append(trades[i].get('size', 0)) return np.mean(sizes_after_loss) if sizes_after_loss else 0 def _avg_size_after_win(self, trades: List[Dict]) -> float: """Average position size after a winning trade.""" sizes_after_win = [] for i in range(1, len(trades)): if trades[i-1].get('pnl', 0) > 0: sizes_after_win.append(trades[i].get('size', 0)) return np.mean(sizes_after_win) if sizes_after_win else 0 def fit(self, all_traders_features: np.ndarray): """Fit clustering model on features from multiple traders.""" self.scaler.fit(all_traders_features) scaled = self.scaler.transform(all_traders_features) self.kmeans.fit(scaled) self.is_fitted = True def predict_type(self, features: np.ndarray) -> Dict: """Predict trader type and provide analysis.""" if not self.is_fitted: # Default classification based on rules return self._rule_based_classification(features) scaled = self.scaler.transform(features.reshape(1, -1)) cluster = self.kmeans.predict(scaled)[0] return { 'cluster': int(cluster), 'type_name': TRADER_TYPES.get(cluster, 'Unknown'), 'features': { 'win_rate': float(features[0]), 'profit_factor': float(features[3]), 'avg_holding_time': float(features[7]), 'revenge_ratio': float(features[10]), 'max_drawdown': float(features[11]), } } def _rule_based_classification(self, features: np.ndarray) -> Dict: """Rule-based trader classification when clustering isn't fitted.""" win_rate = features[0] avg_holding = features[7] position_size = features[4] revenge_ratio = features[10] # Determine type if avg_holding < 5: # Minutes trader_type = 3 # Scalper elif avg_holding > 1440: # > 1 day trader_type = 4 # Swing elif position_size > 0.1: # > 10% of portfolio trader_type = 2 # Aggressive elif position_size < 0.02: # < 2% of portfolio trader_type = 0 # Conservative else: trader_type = 1 # Moderate return { 'cluster': trader_type, 'type_name': TRADER_TYPES[trader_type], 'features': { 'win_rate': float(features[0]), 'profit_factor': float(features[3]), 'avg_holding_time': float(features[7]), 'revenge_ratio': float(features[10]), 'max_drawdown': float(features[11]), } } class BehaviorAlertSystem: """ Real-time detection of problematic trading patterns. """ def __init__(self): self.thresholds = { 'overtrading_trades_per_hour': 10, 'revenge_size_multiplier': 1.5, 'max_consecutive_losses': 5, 'max_drawdown_pct': 0.15, 'tilt_detection_loss_streak': 3, } def analyze(self, recent_trades: List[Dict], portfolio_value: float, time_window_hours: float = 1.0) -> Dict: """ Analyze recent trading activity for behavioral issues. Returns alerts and recommendations. """ alerts = [] risk_multiplier = 1.0 # Default: no adjustment if not recent_trades: return {'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'} # 1. Overtrading detection trade_count = len(recent_trades) if trade_count / max(time_window_hours, 0.1) > self.thresholds['overtrading_trades_per_hour']: alerts.append({ 'type': 'OVERTRADING', 'severity': 'HIGH', 'message': f'Trading {trade_count} times in {time_window_hours}h exceeds safe threshold', 'recommendation': 'Reduce trade frequency. Consider taking a break.', }) risk_multiplier *= 0.5 # Halve recommended position sizes # 2. Revenge trading detection pnls = [t.get('pnl', 0) for t in recent_trades] sizes = [t.get('size', 0) for t in recent_trades] if len(recent_trades) >= 2: last_pnl = pnls[-2] last_size = sizes[-2] current_size = sizes[-1] if last_pnl < 0 and current_size > last_size * self.thresholds['revenge_size_multiplier']: alerts.append({ 'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'Position size increased significantly after a loss', 'recommendation': 'Avoid increasing size after losses. Maintain discipline.', }) risk_multiplier *= 0.3 # 3. Consecutive loss detection (tilt) consecutive_losses = 0 for p in reversed(pnls): if p <= 0: consecutive_losses += 1 else: break if consecutive_losses >= self.thresholds['tilt_detection_loss_streak']: alerts.append({ 'type': 'LOSS_STREAK', 'severity': 'HIGH', 'message': f'{consecutive_losses} consecutive losing trades detected', 'recommendation': 'Consider pausing trading. Review strategy before next trade.', }) risk_multiplier *= 0.5 # 4. Drawdown check total_pnl = sum(pnls) drawdown_pct = abs(min(total_pnl, 0)) / (portfolio_value + 1e-8) if drawdown_pct > self.thresholds['max_drawdown_pct']: alerts.append({ 'type': 'EXCESSIVE_DRAWDOWN', 'severity': 'CRITICAL', 'message': f'Session drawdown at {drawdown_pct*100:.1f}% exceeds {self.thresholds["max_drawdown_pct"]*100}% threshold', 'recommendation': 'Stop trading for the day. Review risk parameters.', }) risk_multiplier *= 0.1 status = 'normal' if any(a['severity'] == 'CRITICAL' for a in alerts): status = 'critical' elif alerts: status = 'warning' return { 'alerts': alerts, 'risk_multiplier': risk_multiplier, 'status': status, 'consecutive_losses': consecutive_losses, 'session_drawdown_pct': drawdown_pct, } class PersonalizationEngine: """ Combines profiling, clustering, and alert systems to provide personalized trading recommendations. """ def __init__(self): self.profiler = TraderProfiler() self.alert_system = BehaviorAlertSystem() # Strategy adaptation rules per trader type self.strategy_params = { 0: { # Conservative 'max_position_pct': 0.02, 'sl_atr_mult': 1.5, 'tp_atr_mult': 2.0, 'min_confidence': 0.7, 'max_trades_per_day': 3, }, 1: { # Moderate 'max_position_pct': 0.05, 'sl_atr_mult': 2.0, 'tp_atr_mult': 3.0, 'min_confidence': 0.6, 'max_trades_per_day': 5, }, 2: { # Aggressive 'max_position_pct': 0.10, 'sl_atr_mult': 2.5, 'tp_atr_mult': 4.0, 'min_confidence': 0.55, 'max_trades_per_day': 10, }, 3: { # Scalper 'max_position_pct': 0.03, 'sl_atr_mult': 0.5, 'tp_atr_mult': 1.0, 'min_confidence': 0.55, 'max_trades_per_day': 50, }, 4: { # Swing 'max_position_pct': 0.08, 'sl_atr_mult': 3.0, 'tp_atr_mult': 5.0, 'min_confidence': 0.65, 'max_trades_per_day': 2, }, } def get_personalized_params(self, trader_profile: Dict, behavior_alerts: Dict) -> Dict: """ Get personalized trading parameters based on trader profile and current behavior alerts. """ trader_type = trader_profile.get('cluster', 1) params = self.strategy_params.get(trader_type, self.strategy_params[1]).copy() # Apply risk multiplier from alerts risk_mult = behavior_alerts.get('risk_multiplier', 1.0) params['max_position_pct'] *= risk_mult # If revenge trading detected, increase minimum confidence if any(a['type'] == 'REVENGE_TRADING' for a in behavior_alerts.get('alerts', [])): params['min_confidence'] = min(params['min_confidence'] + 0.15, 0.9) # If overtrading, reduce max trades if any(a['type'] == 'OVERTRADING' for a in behavior_alerts.get('alerts', [])): params['max_trades_per_day'] = max(1, params['max_trades_per_day'] // 2) return params