File size: 3,425 Bytes
6ebfdfc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# models/pattern_recognition.py
import pandas as pd
import numpy as np
from scipy.signal import find_peaks

class PatternRecognition:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.patterns = pd.DataFrame(index=data.index)
    
    def detect_patterns(self) -> pd.DataFrame:
        """Detect various chart patterns."""
        self._detect_double_top_bottom()
        self._detect_head_and_shoulders()
        self._detect_triangles()
        self._detect_channels()
        return self.patterns
    
    def _detect_double_top_bottom(self):
        """Detect double top and double bottom patterns."""
        prices = self.data['Close'].values
        peaks, _ = find_peaks(prices, distance=20)
        troughs, _ = find_peaks(-prices, distance=20)
        
        self.patterns['Double_Top'] = 0
        self.patterns['Double_Bottom'] = 0
        
        # Detect double tops
        for i in range(len(peaks)-1):
            if abs(prices[peaks[i]] - prices[peaks[i+1]]) < prices[peaks[i]] * 0.02:
                self.patterns.iloc[peaks[i+1], self.patterns.columns.get_loc('Double_Top')] = 1
        
        # Detect double bottoms
        for i in range(len(troughs)-1):
            if abs(prices[troughs[i]] - prices[troughs[i+1]]) < prices[troughs[i]] * 0.02:
                self.patterns.iloc[troughs[i+1], self.patterns.columns.get_loc('Double_Bottom')] = 1
    
    def _detect_head_and_shoulders(self):
        """Detect head and shoulders pattern."""
        self.patterns['Head_And_Shoulders'] = 0
        prices = self.data['Close'].values
        peaks, _ = find_peaks(prices, distance=20)
        
        for i in range(len(peaks)-2):
            if (prices[peaks[i+1]] > prices[peaks[i]] and 
                prices[peaks[i+1]] > prices[peaks[i+2]] and
                abs(prices[peaks[i]] - prices[peaks[i+2]]) < prices[peaks[i]] * 0.02):
                self.patterns.iloc[peaks[i+2], self.patterns.columns.get_loc('Head_And_Shoulders')] = 1
    
    def _detect_triangles(self):
        """Detect ascending and descending triangles."""
        self.patterns['Triangle'] = 0
        window = 20
        
        for i in range(window, len(self.data)):
            subset = self.data.iloc[i-window:i]
            highs = subset['High']
            lows = subset['Low']
            
            high_slope = np.polyfit(range(window), highs, 1)[0]
            low_slope = np.polyfit(range(window), lows, 1)[0]
            
            if abs(high_slope) < 0.1 and low_slope > 0.1:  # Ascending triangle
                self.patterns.iloc[i, self.patterns.columns.get_loc('Triangle')] = 1
            elif high_slope < -0.1 and abs(low_slope) < 0.1:  # Descending triangle
                self.patterns.iloc[i, self.patterns.columns.get_loc('Triangle')] = 1
    
    def _detect_channels(self):
        """Detect price channels."""
        self.patterns['Channel'] = 0
        window = 20
        
        for i in range(window, len(self.data)):
            subset = self.data.iloc[i-window:i]
            highs = subset['High']
            lows = subset['Low']
            
            high_slope = np.polyfit(range(window), highs, 1)[0]
            low_slope = np.polyfit(range(window), lows, 1)[0]
            
            if abs(high_slope - low_slope) < 0.05:  # Parallel channel
                self.patterns.iloc[i, self.patterns.columns.get_loc('Channel')] = 1