Spaces:
Sleeping
Sleeping
| # models/trading_models.py | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, Optional, Tuple | |
| import ta | |
| from sklearn.preprocessing import StandardScaler | |
| from scipy.signal import find_peaks | |
class EnhancedTradingModels:
    """Generate trading signals from OHLCV data and score them.

    The pipeline run by :meth:`generate_signals` is:

    1. technical indicators (moving averages, RSI, stochastic, Bollinger
       bands, on-balance volume) computed with the ``ta`` package;
    2. price-pattern flags (double top/bottom, head and shoulders, trend)
       derived from peaks/troughs of the closing price;
    3. a combined ``POSITION`` column (1 = buy, -1 = sell, 0 = no signal);
    4. simple long-only round-trip performance metrics.
    """

    def __init__(self, data: pd.DataFrame):
        """Initialize with DataFrame containing OHLCV data.

        Args:
            data: Frame with at least the columns ``Date``, ``Open``,
                ``High``, ``Low``, ``Close`` and ``Volume``. It is copied,
                never modified in place.

        Raises:
            ValueError: If any required column is missing.
        """
        self.data = self._validate_and_prepare_data(data)
        # One row per bar, sharing the prepared data's index.
        self.signals = pd.DataFrame(index=self.data.index)
        # Both are populated by generate_signals().
        self.technical_indicators: Optional[pd.DataFrame] = None
        self.pattern_signals: Optional[pd.DataFrame] = None

    def _validate_and_prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Validate and prepare input data.

        Returns a copy of ``data`` with ``Date`` parsed to datetime, the
        price/volume columns coerced to numeric (unparseable values become
        NaN), rows sorted chronologically and gaps filled forward, then
        backward. The original index labels are kept.

        Raises:
            ValueError: If any required column is missing.
        """
        required_columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
        missing = [col for col in required_columns if col not in data.columns]
        if missing:
            raise ValueError(f"Missing required columns. Need: {required_columns}")

        df = data.copy()
        df['Date'] = pd.to_datetime(df['Date'])

        numeric_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
        for col in numeric_columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

        # Sort BEFORE filling: forward-filling unsorted rows would copy
        # values from a later date into an earlier one (look-ahead).
        # mergesort is stable, so rows sharing a date keep their input order.
        df = df.sort_values('Date', kind='mergesort')

        # ffill()/bfill() replace the deprecated fillna(method=...) form.
        df = df.ffill().bfill()
        return df

    def generate_signals(self) -> Tuple[pd.DataFrame, Dict]:
        """Generate trading signals using multiple models.

        Returns:
            A tuple ``(signals, metrics)`` where ``signals`` holds the
            combined ``POSITION`` column and ``metrics`` is the dict built
            by :meth:`_calculate_performance_metrics`.

        Raises:
            RuntimeError: If any stage fails; the original exception is
                attached as ``__cause__``.
        """
        try:
            self._calculate_technical_indicators()
            self._generate_pattern_signals()
            self._combine_signals()
            metrics = self._calculate_performance_metrics()
            return self.signals, metrics
        except Exception as e:
            raise RuntimeError(f"Error generating signals: {str(e)}") from e

    def _calculate_technical_indicators(self):
        """Calculate technical indicators using the ``ta`` package.

        Fills ``self.technical_indicators`` with the columns ``SMA_20``,
        ``SMA_50``, ``EMA_20``, ``RSI``, ``STOCH_K``, ``STOCH_D``,
        ``BB_UPPER``, ``BB_LOWER``, ``BB_MID`` and ``OBV``.

        Raises:
            RuntimeError: If any indicator computation fails.
        """
        try:
            close = self.data['Close']
            indicators = pd.DataFrame(index=self.data.index)

            # Trend indicators
            indicators['SMA_20'] = ta.trend.sma_indicator(close=close, window=20)
            indicators['SMA_50'] = ta.trend.sma_indicator(close=close, window=50)
            indicators['EMA_20'] = ta.trend.ema_indicator(close=close, window=20)

            # Momentum indicators
            indicators['RSI'] = ta.momentum.RSIIndicator(close=close).rsi()
            stoch = ta.momentum.StochasticOscillator(
                high=self.data['High'],
                low=self.data['Low'],
                close=close,
            )
            indicators['STOCH_K'] = stoch.stoch()
            indicators['STOCH_D'] = stoch.stoch_signal()

            # Volatility indicators
            bb = ta.volatility.BollingerBands(close=close)
            indicators['BB_UPPER'] = bb.bollinger_hband()
            indicators['BB_LOWER'] = bb.bollinger_lband()
            indicators['BB_MID'] = bb.bollinger_mavg()

            # Volume indicators
            indicators['OBV'] = ta.volume.on_balance_volume(
                close=close, volume=self.data['Volume']
            )

            self.technical_indicators = indicators
        except Exception as e:
            raise RuntimeError(f"Error calculating technical indicators: {str(e)}") from e

    def _generate_pattern_signals(self):
        """Generate pattern recognition signals.

        Fills ``self.pattern_signals`` with ``DOUBLE_TOP``,
        ``DOUBLE_BOTTOM``, ``HEAD_SHOULDERS`` and ``TREND``. Requires
        ``self.technical_indicators`` to be populated first because the
        trend column is derived from the moving averages.

        Raises:
            RuntimeError: If pattern detection fails.
        """
        try:
            self.pattern_signals = pd.DataFrame(index=self.data.index)

            # Local maxima / minima of the close, at least 20 bars apart.
            prices = self.data['Close'].values
            peaks, _ = find_peaks(prices, distance=20)
            troughs, _ = find_peaks(-prices, distance=20)

            self.pattern_signals['DOUBLE_TOP'] = self._detect_double_formation(
                peaks, prices, pattern_type='top'
            )
            self.pattern_signals['DOUBLE_BOTTOM'] = self._detect_double_formation(
                troughs, prices, pattern_type='bottom'
            )
            self.pattern_signals['HEAD_SHOULDERS'] = self._detect_head_shoulders(peaks, prices)
            self.pattern_signals['TREND'] = self._detect_trend_patterns()
        except Exception as e:
            raise RuntimeError(f"Error generating pattern signals: {str(e)}") from e

    def _detect_double_formation(self, points, prices, pattern_type: str, threshold: float = 0.02) -> pd.Series:
        """Detect double top/bottom formations.

        Two consecutive extrema form a pattern when their prices differ by
        less than ``threshold`` relative to the first one.

        Args:
            points: Positional indices of the extrema, in ascending order.
            prices: Price sequence indexed positionally by ``points``.
            pattern_type: ``'top'`` marks hits with 1; any other value
                marks them with -1.
            threshold: Maximum relative price difference.

        Returns:
            Series aligned to ``self.data.index``; non-zero at the second
            extremum of each detected pair.
        """
        signals = pd.Series(0, index=self.data.index)
        marker = 1 if pattern_type == 'top' else -1
        for first, second in zip(points[:-1], points[1:]):
            reference = prices[first]
            if reference == 0:
                # Relative difference is undefined; never a match.
                continue
            price_diff = abs(reference - prices[second]) / reference
            if price_diff < threshold:
                signals.iloc[second] = marker
        return signals

    def _detect_head_shoulders(self, peaks, prices, threshold: float = 0.02) -> pd.Series:
        """Detect head and shoulders pattern.

        Looks at every run of three consecutive peaks: the middle one must
        be the highest and the outer two must differ by less than
        ``threshold`` relative to the first.

        Returns:
            Series aligned to ``self.data.index`` with -1 at the third peak
            of each detected pattern and 0 elsewhere.
        """
        signals = pd.Series(0, index=self.data.index)
        for left, head, right in zip(peaks[:-2], peaks[1:-1], peaks[2:]):
            head_is_highest = prices[head] > prices[left] and prices[head] > prices[right]
            shoulders_level = abs(prices[left] - prices[right]) < threshold * prices[left]
            if head_is_highest and shoulders_level:
                signals.iloc[right] = -1
        return signals

    def _detect_trend_patterns(self) -> pd.Series:
        """Detect trend patterns using moving averages.

        Returns:
            Series aligned to ``self.data.index``: 1 where ``SMA_20`` is
            above ``SMA_50``, -1 where it is not, and 0 while either
            average is still NaN (warm-up window).
        """
        sma_short = self.technical_indicators['SMA_20']
        sma_long = self.technical_indicators['SMA_50']

        # Comparisons against NaN are False, so without this mask the whole
        # warm-up period would be labelled as a downtrend.
        known = sma_short.notna() & sma_long.notna()
        uptrend = known & (sma_short > sma_long)

        signals = pd.Series(0, index=self.data.index)
        signals[uptrend] = 1
        signals[known & ~uptrend] = -1
        return signals

    def _combine_signals(self):
        """Combine technical and pattern signals.

        Writes ``self.signals['POSITION']``: 1 where both the technical and
        the pattern buy conditions hold, -1 where both sell conditions
        hold, 0 otherwise. Sell is applied last, so it wins if both match.
        """
        indicators = self.technical_indicators
        patterns = self.pattern_signals
        close = self.data['Close']

        self.signals['POSITION'] = 0

        tech_buy = (
            (indicators['RSI'] < 30) &              # Oversold
            (close > indicators['SMA_20']) &        # Above short MA
            (indicators['STOCH_K'] < 20)            # Stochastic oversold
        )
        tech_sell = (
            (indicators['RSI'] > 70) &              # Overbought
            (close < indicators['SMA_20']) &        # Below short MA
            (indicators['STOCH_K'] > 80)            # Stochastic overbought
        )

        # Double bottoms are encoded as -1 by _detect_double_formation.
        pattern_buy = (
            (patterns['DOUBLE_BOTTOM'] == -1) |
            (patterns['TREND'] == 1)
        )
        pattern_sell = (
            (patterns['DOUBLE_TOP'] == 1) |
            (patterns['HEAD_SHOULDERS'] == -1) |
            (patterns['TREND'] == -1)
        )

        self.signals.loc[tech_buy & pattern_buy, 'POSITION'] = 1
        self.signals.loc[tech_sell & pattern_sell, 'POSITION'] = -1

    def _calculate_performance_metrics(self) -> Dict:
        """Calculate strategy performance metrics.

        Simulates a long-only strategy: enter at the close of the first buy
        signal while flat, exit at the close of the next sell signal. Only
        closed round trips count; a position still open at the end is
        ignored.

        Returns:
            Dict with ``total_trades``, ``win_rate``, ``avg_return``,
            ``sharpe_ratio`` (mean / sample std of per-trade returns, not
            annualised; 0 when undefined) and ``max_drawdown`` (most
            negative gap between cumulative trade returns and their running
            peak, the peak starting at 0). All values are 0 when no trade
            was closed.
        """
        positions = self.signals['POSITION'].to_numpy()
        closes = self.data['Close'].to_numpy()

        trade_returns = []
        in_position = False
        entry_price = 0.0
        for signal, close in zip(positions, closes):
            if signal == 1 and not in_position:  # Buy
                in_position = True
                entry_price = close
            elif signal == -1 and in_position:  # Sell
                in_position = False
                trade_returns.append((close - entry_price) / entry_price)

        if not trade_returns:
            return {
                'total_trades': 0,
                'win_rate': 0.0,
                'avg_return': 0.0,
                'sharpe_ratio': 0.0,
                'max_drawdown': 0.0,
            }

        returns = pd.Series(trade_returns, dtype=float)
        mean_return = returns.mean()
        # Sample std is NaN for a single trade; treat that like zero spread.
        spread = returns.std()
        sharpe = mean_return / spread if pd.notna(spread) and spread != 0 else 0.0

        equity = returns.cumsum()
        # Floor the peak at 0 so a losing first trade registers as drawdown.
        running_peak = equity.cummax().clip(lower=0)

        return {
            'total_trades': int(len(returns)),
            'win_rate': float((returns > 0).mean()),
            'avg_return': float(mean_return),
            'sharpe_ratio': float(sharpe),
            'max_drawdown': float((equity - running_peak).min()),
        }