# avinashhm's picture
# Add trading_intelligence/personalization.py
# dc5d45a verified
"""
Personalization Layer
=====================
Clusters traders by behavior and adapts strategies per user.
Detects problematic patterns:
- Overtrading (excessive frequency)
- Revenge trading (increased size after losses)
- Risk escalation patterns
- Emotional trading signals
"""
import torch
import torch.nn as nn
import numpy as np
from typing import Dict, List, Tuple
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
# Trader archetypes, keyed by integer type id. The same ids are used as the
# 'cluster' value in profile dicts built elsewhere in this module.
TRADER_TYPES: Dict[int, str] = {
    # Low risk, long holding periods, small positions
    0: 'Conservative',
    # Balanced approach, moderate position sizes
    1: 'Moderate',
    # High risk tolerance, larger positions, shorter holding
    2: 'Aggressive',
    # Very short holding periods, many trades
    3: 'Scalper',
    # Multi-day holds, trend-following
    4: 'Swing Trader',
}
class TraderProfiler:
    """
    Build comprehensive trader profiles from historical behavior.

    A trade history is reduced to a fixed-length feature vector
    (``extract_behavior_features``). That vector is mapped to a trader type
    by KMeans clustering once ``fit`` has been called, and by threshold
    rules before that.
    """

    # Length of the vector returned by ``extract_behavior_features``.
    N_FEATURES = 15

    def __init__(self, n_clusters: int = 5):
        """Create an unfitted profiler that clusters into ``n_clusters`` groups."""
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        self.is_fitted = False

    def extract_behavior_features(self, trades: List[Dict]) -> np.ndarray:
        """
        Extract behavioral features from trade history.

        Each trade dict is read with ``.get`` for the keys ``pnl``, ``size``
        and ``holding_time``; a missing key counts as 0. Trades are assumed
        to be in chronological order (the streak, after-loss/after-win and
        drawdown features depend on ordering).

        Returns a 1-D array of ``N_FEATURES`` values (all zeros when
        ``trades`` is empty), laid out as:

             0 win_rate            1 avg_win             2 avg_loss
             3 profit_factor       4 avg_position_size   5 max_position_size
             6 position_size_std   7 avg_holding_time    8 trade_frequency
             9 consecutive_losses 10 revenge_ratio      11 max_drawdown
            12 size_after_loss    13 size_after_win     14 total trades

        Indices 8 and 14 both hold ``len(trades)``.
        """
        if not trades:
            return np.zeros(self.N_FEATURES)

        pnls = [t.get('pnl', 0) for t in trades]
        sizes = [t.get('size', 0) for t in trades]
        holding_times = [t.get('holding_time', 0) for t in trades]

        # A break-even trade (pnl == 0) is grouped with the losers.
        winners = [p for p in pnls if p > 0]
        losers = [p for p in pnls if p <= 0]

        # Core metrics
        win_rate = len(winners) / len(pnls)
        avg_win = np.mean(winners) if winners else 0
        avg_loss = np.mean(losers) if losers else 0
        # The epsilon keeps the ratio finite when there are no losses; the
        # value is then very large rather than infinite.
        profit_factor = abs(sum(winners)) / (abs(sum(losers)) + 1e-8)

        # Risk metrics
        avg_position_size = np.mean(sizes)
        max_position_size = np.max(sizes)
        position_size_std = np.std(sizes) if len(sizes) > 1 else 0

        # Timing metrics
        avg_holding_time = np.mean(holding_times)
        trade_frequency = len(trades)  # trades per period

        # Behavioral patterns
        consecutive_losses = self._max_consecutive_losses(pnls)
        size_after_loss = self._avg_size_after_loss(trades)
        size_after_win = self._avg_size_after_win(trades)
        # Same epsilon guard as profit_factor: with no post-win samples the
        # ratio becomes very large instead of dividing by zero.
        revenge_ratio = size_after_loss / (size_after_win + 1e-8)

        # Max drawdown of the cumulative-PnL curve. The running peak includes
        # the starting equity of 0, so losses taken before the first new high
        # are counted as drawdown too.
        equity = np.cumsum(pnls)
        running_peak = np.maximum.accumulate(np.maximum(equity, 0))
        max_drawdown = np.max(running_peak - equity)

        return np.array([
            win_rate,            # 0
            avg_win,             # 1
            avg_loss,            # 2
            profit_factor,       # 3
            avg_position_size,   # 4
            max_position_size,   # 5
            position_size_std,   # 6
            avg_holding_time,    # 7
            trade_frequency,     # 8
            consecutive_losses,  # 9
            revenge_ratio,       # 10
            max_drawdown,        # 11
            size_after_loss,     # 12
            size_after_win,      # 13
            len(trades),         # 14 - total trades
        ])

    def _max_consecutive_losses(self, pnls: List[float]) -> int:
        """Return the longest run of consecutive trades with pnl <= 0."""
        max_streak = 0
        current_streak = 0
        for p in pnls:
            if p <= 0:
                current_streak += 1
                max_streak = max(max_streak, current_streak)
            else:
                current_streak = 0
        return max_streak

    def _avg_size_after_loss(self, trades: List[Dict]) -> float:
        """Average size of trades that directly follow a trade with pnl <= 0.

        Returns 0 when no trade follows a loss.
        """
        sizes_after_loss = [
            current.get('size', 0)
            for previous, current in zip(trades, trades[1:])
            if previous.get('pnl', 0) <= 0
        ]
        return np.mean(sizes_after_loss) if sizes_after_loss else 0

    def _avg_size_after_win(self, trades: List[Dict]) -> float:
        """Average size of trades that directly follow a trade with pnl > 0.

        Returns 0 when no trade follows a win.
        """
        sizes_after_win = [
            current.get('size', 0)
            for previous, current in zip(trades, trades[1:])
            if previous.get('pnl', 0) > 0
        ]
        return np.mean(sizes_after_win) if sizes_after_win else 0

    def fit(self, all_traders_features: np.ndarray):
        """Fit the scaler and the clustering model on many traders' features.

        ``all_traders_features`` is passed straight to the scaler, so it is
        expected to be 2-D with one row per trader.
        """
        self.scaler.fit(all_traders_features)
        scaled = self.scaler.transform(all_traders_features)
        self.kmeans.fit(scaled)
        self.is_fitted = True

    @staticmethod
    def _feature_summary(features: np.ndarray) -> Dict:
        """Pick the headline features reported alongside a classification."""
        return {
            'win_rate': float(features[0]),
            'profit_factor': float(features[3]),
            'avg_holding_time': float(features[7]),
            'revenge_ratio': float(features[10]),
            'max_drawdown': float(features[11]),
        }

    def predict_type(self, features: np.ndarray) -> Dict:
        """Predict trader type and provide analysis.

        Returns a dict with ``cluster`` (int), ``type_name`` (str) and
        ``features`` (a summary of selected feature values). Falls back to
        the rule-based classifier until ``fit`` has been called.
        """
        if not self.is_fitted:
            # Default classification based on rules
            return self._rule_based_classification(features)

        scaled = self.scaler.transform(features.reshape(1, -1))
        # Convert the label to a plain int so the result dict holds builtin types.
        cluster = int(self.kmeans.predict(scaled)[0])
        # NOTE(review): KMeans label numbers are arbitrary, so the name looked
        # up here is whatever TRADER_TYPES holds for that index - confirm the
        # intended mapping between clusters and archetypes.
        return {
            'cluster': cluster,
            'type_name': TRADER_TYPES.get(cluster, 'Unknown'),
            'features': self._feature_summary(features),
        }

    def _rule_based_classification(self, features: np.ndarray) -> Dict:
        """Rule-based trader classification when clustering isn't fitted.

        Holding time is checked first, then position size. The thresholds
        assume holding time in minutes and position size as a fraction of
        the portfolio - TODO confirm against the data feeding
        ``extract_behavior_features``.
        """
        avg_holding = features[7]
        position_size = features[4]

        if avg_holding < 5:  # under 5 minutes
            trader_type = 3  # Scalper
        elif avg_holding > 1440:  # > 1 day
            trader_type = 4  # Swing
        elif position_size > 0.1:  # > 10% of portfolio
            trader_type = 2  # Aggressive
        elif position_size < 0.02:  # < 2% of portfolio
            trader_type = 0  # Conservative
        else:
            trader_type = 1  # Moderate

        return {
            'cluster': trader_type,
            'type_name': TRADER_TYPES[trader_type],
            'features': self._feature_summary(features),
        }
class BehaviorAlertSystem:
    """
    Real-time detection of problematic trading patterns.

    ``analyze`` runs four independent checks on a window of recent trades
    (overtrading, revenge trading, loss streak, drawdown). Each check that
    fires appends an alert and scales down a shared risk multiplier.
    """

    def __init__(self):
        """Set the default detection thresholds (editable via ``self.thresholds``)."""
        # 'max_consecutive_losses' is stored here but not read by ``analyze``;
        # the loss-streak check uses 'tilt_detection_loss_streak'.
        self.thresholds = {
            'overtrading_trades_per_hour': 10,
            'revenge_size_multiplier': 1.5,
            'max_consecutive_losses': 5,
            'max_drawdown_pct': 0.15,
            'tilt_detection_loss_streak': 3,
        }

    def analyze(self, recent_trades: List[Dict],
                portfolio_value: float,
                time_window_hours: float = 1.0) -> Dict:
        """
        Analyze recent trading activity for behavioral issues.

        Args:
            recent_trades: trade dicts, oldest first; ``pnl`` and ``size``
                are read with ``.get`` and default to 0.
            portfolio_value: denominator for the drawdown percentage.
            time_window_hours: length of the window the trades cover; values
                below 0.1 are treated as 0.1 when computing the trade rate.

        Returns:
            A dict with ``alerts`` (list of alert dicts), ``risk_multiplier``
            (product of the per-alert reductions, 1.0 when nothing fired),
            ``status`` ('normal', 'warning' or 'critical'),
            ``consecutive_losses`` and ``session_drawdown_pct``. The same
            keys are present when ``recent_trades`` is empty.
        """
        if not recent_trades:
            # Same shape as the normal result so callers can read every key
            # without special-casing an empty window.
            return {
                'alerts': [],
                'risk_multiplier': 1.0,
                'status': 'normal',
                'consecutive_losses': 0,
                'session_drawdown_pct': 0.0,
            }

        alerts = []
        risk_multiplier = 1.0  # Default: no adjustment

        # 1. Overtrading detection
        trade_count = len(recent_trades)
        trades_per_hour = trade_count / max(time_window_hours, 0.1)
        if trades_per_hour > self.thresholds['overtrading_trades_per_hour']:
            alerts.append({
                'type': 'OVERTRADING',
                'severity': 'HIGH',
                'message': f'Trading {trade_count} times in {time_window_hours}h exceeds safe threshold',
                'recommendation': 'Reduce trade frequency. Consider taking a break.',
            })
            risk_multiplier *= 0.5  # Halve recommended position sizes

        # 2. Revenge trading detection: only the last two trades are compared.
        pnls = [t.get('pnl', 0) for t in recent_trades]
        sizes = [t.get('size', 0) for t in recent_trades]
        if len(recent_trades) >= 2:
            last_pnl = pnls[-2]
            last_size = sizes[-2]
            current_size = sizes[-1]
            size_limit = last_size * self.thresholds['revenge_size_multiplier']
            if last_pnl < 0 and current_size > size_limit:
                alerts.append({
                    'type': 'REVENGE_TRADING',
                    'severity': 'CRITICAL',
                    'message': 'Position size increased significantly after a loss',
                    'recommendation': 'Avoid increasing size after losses. Maintain discipline.',
                })
                risk_multiplier *= 0.3

        # 3. Consecutive loss detection (tilt): count the trailing run of
        # trades with pnl <= 0, newest first.
        consecutive_losses = 0
        for p in reversed(pnls):
            if p > 0:
                break
            consecutive_losses += 1
        if consecutive_losses >= self.thresholds['tilt_detection_loss_streak']:
            alerts.append({
                'type': 'LOSS_STREAK',
                'severity': 'HIGH',
                'message': f'{consecutive_losses} consecutive losing trades detected',
                'recommendation': 'Consider pausing trading. Review strategy before next trade.',
            })
            risk_multiplier *= 0.5

        # 4. Drawdown check: net loss over the window relative to portfolio
        # value (zero when the window is net profitable). The epsilon avoids
        # division by zero for a zero portfolio value.
        total_pnl = sum(pnls)
        drawdown_pct = abs(min(total_pnl, 0)) / (portfolio_value + 1e-8)
        if drawdown_pct > self.thresholds['max_drawdown_pct']:
            alerts.append({
                'type': 'EXCESSIVE_DRAWDOWN',
                'severity': 'CRITICAL',
                'message': f'Session drawdown at {drawdown_pct*100:.1f}% exceeds {self.thresholds["max_drawdown_pct"]*100}% threshold',
                'recommendation': 'Stop trading for the day. Review risk parameters.',
            })
            risk_multiplier *= 0.1

        # Any CRITICAL alert wins; otherwise any alert at all is a warning.
        status = 'normal'
        if any(a['severity'] == 'CRITICAL' for a in alerts):
            status = 'critical'
        elif alerts:
            status = 'warning'

        return {
            'alerts': alerts,
            'risk_multiplier': risk_multiplier,
            'status': status,
            'consecutive_losses': consecutive_losses,
            'session_drawdown_pct': drawdown_pct,
        }
class PersonalizationEngine:
    """
    Ties the trader profiler and the behavior alert system together and
    turns their outputs into per-user trading parameters.
    """

    def __init__(self):
        self.profiler = TraderProfiler()
        self.alert_system = BehaviorAlertSystem()
        # Baseline strategy settings, one entry per trader type id.
        self.strategy_params = {
            # Conservative
            0: dict(
                max_position_pct=0.02,
                sl_atr_mult=1.5,
                tp_atr_mult=2.0,
                min_confidence=0.7,
                max_trades_per_day=3,
            ),
            # Moderate
            1: dict(
                max_position_pct=0.05,
                sl_atr_mult=2.0,
                tp_atr_mult=3.0,
                min_confidence=0.6,
                max_trades_per_day=5,
            ),
            # Aggressive
            2: dict(
                max_position_pct=0.10,
                sl_atr_mult=2.5,
                tp_atr_mult=4.0,
                min_confidence=0.55,
                max_trades_per_day=10,
            ),
            # Scalper
            3: dict(
                max_position_pct=0.03,
                sl_atr_mult=0.5,
                tp_atr_mult=1.0,
                min_confidence=0.55,
                max_trades_per_day=50,
            ),
            # Swing
            4: dict(
                max_position_pct=0.08,
                sl_atr_mult=3.0,
                tp_atr_mult=5.0,
                min_confidence=0.65,
                max_trades_per_day=2,
            ),
        }

    def get_personalized_params(self, trader_profile: Dict,
                                behavior_alerts: Dict) -> Dict:
        """
        Return a copy of the baseline parameters for the profile's cluster,
        adjusted for the current behavior alerts.

        An unknown or missing cluster falls back to type 1. The position cap
        is scaled by the alerts' risk multiplier, a revenge-trading alert
        raises the confidence floor (capped at 0.9), and an overtrading alert
        halves the daily trade limit (never below 1).
        """
        def has_alert(kind: str) -> bool:
            return any(
                alert['type'] == kind
                for alert in behavior_alerts.get('alerts', [])
            )

        cluster_id = trader_profile.get('cluster', 1)
        baseline = self.strategy_params.get(cluster_id, self.strategy_params[1])
        # Work on a copy so the stored baseline table is never mutated.
        tuned = baseline.copy()

        scale = behavior_alerts.get('risk_multiplier', 1.0)
        tuned['max_position_pct'] *= scale

        if has_alert('REVENGE_TRADING'):
            raised = tuned['min_confidence'] + 0.15
            tuned['min_confidence'] = min(raised, 0.9)

        if has_alert('OVERTRADING'):
            halved = tuned['max_trades_per_day'] // 2
            tuned['max_trades_per_day'] = max(1, halved)

        return tuned