# models/pattern_recognition.py
import numpy as np
import pandas as pd
from scipy.signal import find_peaks


class PatternRecognition:
    """Flag simple chart patterns on a price DataFrame.

    The input frame must provide ``Close``, ``High`` and ``Low`` columns.
    Results are collected in ``self.patterns``, a DataFrame sharing the
    input's index with one 0/1 integer column per pattern:
    ``Double_Top``, ``Double_Bottom``, ``Head_And_Shoulders``, ``Triangle``
    and ``Channel``.

    Args:
        data: Price data to analyse.
        peak_distance: Minimum number of samples between neighbouring
            extrema; forwarded to ``scipy.signal.find_peaks`` as ``distance``.
        price_tolerance: Two extrema count as "equal" when their absolute
            price difference is below this fraction of the earlier one.
        window: Number of bars preceding a row that are used for the
            rolling straight-line fits (triangles and channels).
        slope_threshold: Absolute slope (price units per bar) below which a
            trend line is treated as flat and above which it is treated as
            sloping.
        channel_tolerance: Maximum absolute difference between the high and
            low slopes for the two lines to count as parallel.

    Raises:
        ValueError: If ``window`` is smaller than 2 (no slope can be fitted).
    """

    def __init__(
        self,
        data: pd.DataFrame,
        *,
        peak_distance: int = 20,
        price_tolerance: float = 0.02,
        window: int = 20,
        slope_threshold: float = 0.1,
        channel_tolerance: float = 0.05,
    ) -> None:
        if window < 2:
            raise ValueError("window must be at least 2 to fit a slope")
        self.data = data
        self.patterns = pd.DataFrame(index=data.index)
        self.peak_distance = peak_distance
        self.price_tolerance = price_tolerance
        self.window = window
        self.slope_threshold = slope_threshold
        self.channel_tolerance = channel_tolerance

    def detect_patterns(self) -> pd.DataFrame:
        """Run every detector and return the pattern flag frame.

        Returns:
            ``self.patterns`` (the same object, not a copy) with all five
            pattern columns populated.
        """
        self._detect_double_top_bottom()
        self._detect_head_and_shoulders()
        self._detect_triangles()
        self._detect_channels()
        return self.patterns

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _column_values(self, column: str) -> np.ndarray:
        """Return ``self.data[column]`` as a 1-D float array."""
        return np.asarray(self.data[column], dtype=float)

    def _store_flags(self, name: str, positions) -> None:
        """Write a 0/1 int64 column ``name`` with ones at ``positions``.

        ``positions`` may be an integer position array or a boolean mask
        with one entry per row.
        """
        flags = np.zeros(len(self.patterns), dtype=np.int64)
        flags[positions] = 1
        self.patterns[name] = flags

    def _similar_successors(
        self, prices: np.ndarray, extrema: np.ndarray
    ) -> np.ndarray:
        """Return the extrema whose price is close to the preceding extremum.

        The tolerance is relative to the magnitude of the earlier price;
        ``abs`` keeps it positive so negative-valued series can match too.
        """
        earlier = prices[extrema[:-1]]
        later = prices[extrema[1:]]
        alike = np.abs(earlier - later) < np.abs(earlier) * self.price_tolerance
        return extrema[1:][alike]

    def _lagged_slopes(self, column: str) -> np.ndarray:
        """Least-squares slope of the ``window`` bars preceding each row.

        Entry ``i`` holds the slope fitted to rows ``i - window`` up to, but
        not including, ``i``. Rows without a full preceding window are NaN,
        and so is any row whose window contains a NaN.
        """
        values = self._column_values(column)
        slopes = np.full(values.size, np.nan)
        if values.size <= self.window:
            return slopes

        # Closed-form slope: sum((x - mean(x)) * y) / sum((x - mean(x)) ** 2).
        # Because the centred x values sum to zero, y needs no centring, and
        # the numerator for every window is a single cross-correlation.
        offsets = np.arange(self.window, dtype=float)
        centred = offsets - offsets.mean()
        fitted = np.correlate(values, centred, mode="valid") / centred.dot(centred)

        # fitted[j] describes rows j .. j + window - 1 and is reported on the
        # following row; the final window has no following row, so drop it.
        slopes[self.window:] = fitted[:-1]
        return slopes

    # ------------------------------------------------------------------
    # Detectors
    # ------------------------------------------------------------------
    def _detect_double_top_bottom(self):
        """Flag double tops and double bottoms on the closing price.

        A row is flagged when it is a local extremum of ``Close`` whose
        price is within ``price_tolerance`` of the previous extremum of the
        same kind. The flag is placed on the second extremum of the pair.
        """
        prices = self._column_values('Close')
        peaks, _ = find_peaks(prices, distance=self.peak_distance)
        # Troughs are the peaks of the negated series.
        troughs, _ = find_peaks(-prices, distance=self.peak_distance)

        self._store_flags('Double_Top', self._similar_successors(prices, peaks))
        self._store_flags(
            'Double_Bottom', self._similar_successors(prices, troughs)
        )

    def _detect_head_and_shoulders(self):
        """Flag head-and-shoulders formations on the closing price.

        Three consecutive ``Close`` peaks qualify when the middle one is
        strictly higher than both neighbours and the two outer peaks are
        within ``price_tolerance`` of each other. The flag is placed on the
        third peak.
        """
        prices = self._column_values('Close')
        peaks, _ = find_peaks(prices, distance=self.peak_distance)

        left = prices[peaks[:-2]]
        head = prices[peaks[1:-1]]
        right = prices[peaks[2:]]
        qualifies = (
            (head > left)
            & (head > right)
            & (np.abs(left - right) < np.abs(left) * self.price_tolerance)
        )
        self._store_flags('Head_And_Shoulders', peaks[2:][qualifies])

    def _detect_triangles(self):
        """Flag ascending and descending triangles.

        Straight lines are fitted to the ``High`` and ``Low`` of the
        ``window`` bars before each row. The row is flagged when the highs
        are flat while the lows rise (ascending) or the highs fall while the
        lows are flat (descending). Both kinds share the ``Triangle`` column.
        """
        high_slope = self._lagged_slopes('High')
        low_slope = self._lagged_slopes('Low')
        limit = self.slope_threshold

        # NaN slopes compare False everywhere, leaving those rows unflagged.
        with np.errstate(invalid='ignore'):
            ascending = (np.abs(high_slope) < limit) & (low_slope > limit)
            descending = (high_slope < -limit) & (np.abs(low_slope) < limit)
        self._store_flags('Triangle', ascending | descending)

    def _detect_channels(self):
        """Flag parallel price channels.

        A row is flagged when the lines fitted to the ``High`` and ``Low``
        of the preceding ``window`` bars have slopes that differ by less
        than ``channel_tolerance``.
        """
        high_slope = self._lagged_slopes('High')
        low_slope = self._lagged_slopes('Low')

        # NaN slopes compare False, leaving those rows unflagged.
        with np.errstate(invalid='ignore'):
            parallel = np.abs(high_slope - low_slope) < self.channel_tolerance
        self._store_flags('Channel', parallel)