|
|
|
|
|
|
|
|
from scipy.signal import find_peaks |
|
|
|
|
|
from utils.formatAndPreprocessNewPatterns import get_pattern_encoding |
|
|
|
|
|
path = 'Datasets/OHLC data' |
|
|
pattern_encoding = get_pattern_encoding() |
|
|
|
|
|
def calc_head_and_sholder_top(row,ohlc_data_pattern_segment): |
|
|
high_prices = ohlc_data_pattern_segment['High'].values |
|
|
low_prices = ohlc_data_pattern_segment['Low'].values |
|
|
|
|
|
|
|
|
prominence_value = 0.1 |
|
|
|
|
|
|
|
|
peak_indices, _ = find_peaks(high_prices, prominence=prominence_value) |
|
|
|
|
|
valley_indices, _ = find_peaks(-low_prices, prominence=prominence_value) |
|
|
|
|
|
|
|
|
peak_dates = ohlc_data_pattern_segment['Date'].iloc[peak_indices] |
|
|
valley_dates = ohlc_data_pattern_segment['Date'].iloc[valley_indices] |
|
|
|
|
|
if len(peak_indices) < 3 or len(valley_indices) < 2: |
|
|
print("Not enough peaks and valleys to form a Head & Shoulders pattern.") |
|
|
return |
|
|
|
|
|
try: |
|
|
H_index = np.argmax(high_prices[peak_indices]) |
|
|
H = peak_indices[H_index] |
|
|
LS_index = np.argmax(high_prices[peak_indices[0:H_index]]) |
|
|
LS = peak_indices[LS_index] |
|
|
RS_index = np.argmax(high_prices[peak_indices[H_index+1:]]) + H_index + 1 |
|
|
RS = peak_indices[RS_index] |
|
|
|
|
|
vally_left = valley_indices[(valley_indices > LS) & (valley_indices < H)] |
|
|
vally_right = valley_indices[(valley_indices > H) & (valley_indices < RS)] |
|
|
NL1 = vally_left[np.argmin(low_prices[vally_left])] |
|
|
NL2 = vally_right[np.argmin(low_prices[vally_right])] |
|
|
|
|
|
|
|
|
if high_prices[H] <= max(high_prices[LS], high_prices[RS]): |
|
|
print("Not a valid Head & Shoulders pattern.") |
|
|
return |
|
|
|
|
|
LS_date = ohlc_data_pattern_segment['Date'].iloc[LS] |
|
|
H_date = ohlc_data_pattern_segment['Date'].iloc[H] |
|
|
RS_date = ohlc_data_pattern_segment['Date'].iloc[RS] |
|
|
NL1_date = ohlc_data_pattern_segment['Date'].iloc[NL1] |
|
|
NL2_date = ohlc_data_pattern_segment['Date'].iloc[NL2] |
|
|
|
|
|
|
|
|
row['HS_Left_Shoulder'] = LS_date |
|
|
row['HS_Head'] = H_date |
|
|
row['HS_Right_Shoulder'] = RS_date |
|
|
row['HS_Neckline_1'] = NL1_date |
|
|
row['HS_Neckline_2'] = NL2_date |
|
|
row['Peak_Dates'] = peak_dates |
|
|
row['Valley_Dates'] = valley_dates |
|
|
row['Calc_Start'] = LS_date |
|
|
row['Calc_End'] = RS_date |
|
|
|
|
|
return row |
|
|
except: |
|
|
print("Error in finding the peaks or valleys in the Head and Shoulders pattern") |
|
|
return |
|
|
|
|
|
def calc_head_and_shoulder_bottom(row, ohlc_data_pattern_segment): |
|
|
high_prices = ohlc_data_pattern_segment['High'].values |
|
|
low_prices = ohlc_data_pattern_segment['Low'].values |
|
|
|
|
|
|
|
|
prominence_value = 0.1 |
|
|
|
|
|
|
|
|
valley_indices, _ = find_peaks(-low_prices, prominence=prominence_value) |
|
|
|
|
|
peak_indices, _ = find_peaks(high_prices, prominence=prominence_value) |
|
|
|
|
|
|
|
|
valley_dates = ohlc_data_pattern_segment['Date'].iloc[valley_indices] |
|
|
peak_dates = ohlc_data_pattern_segment['Date'].iloc[peak_indices] |
|
|
|
|
|
if len(valley_indices) < 3 or len(peak_indices) < 2: |
|
|
print("Not enough valleys and peaks to form a Head & Shoulders Bottom pattern.") |
|
|
return |
|
|
|
|
|
try: |
|
|
H_index = np.argmin(low_prices[valley_indices]) |
|
|
H = valley_indices[H_index] |
|
|
LS_index = np.argmin(low_prices[valley_indices[0:H_index]]) |
|
|
LS = valley_indices[LS_index] |
|
|
RS_index = np.argmin(low_prices[valley_indices[H_index+1:]]) + H_index + 1 |
|
|
RS = valley_indices[RS_index] |
|
|
|
|
|
peak_left = peak_indices[(peak_indices > LS) & (peak_indices < H)] |
|
|
peak_right = peak_indices[(peak_indices > H) & (peak_indices < RS)] |
|
|
NL1 = peak_left[np.argmax(high_prices[peak_left])] |
|
|
NL2 = peak_right[np.argmax(high_prices[peak_right])] |
|
|
|
|
|
|
|
|
if low_prices[H] >= min(low_prices[LS], low_prices[RS]): |
|
|
print("Not a valid Head & Shoulders Bottom pattern.") |
|
|
return |
|
|
|
|
|
LS_date = ohlc_data_pattern_segment['Date'].iloc[LS] |
|
|
H_date = ohlc_data_pattern_segment['Date'].iloc[H] |
|
|
RS_date = ohlc_data_pattern_segment['Date'].iloc[RS] |
|
|
NL1_date = ohlc_data_pattern_segment['Date'].iloc[NL1] |
|
|
NL2_date = ohlc_data_pattern_segment['Date'].iloc[NL2] |
|
|
|
|
|
|
|
|
row['HS_Left_Shoulder'] = LS_date |
|
|
row['HS_Head'] = H_date |
|
|
row['HS_Right_Shoulder'] = RS_date |
|
|
row['HS_Neckline_1'] = NL1_date |
|
|
row['HS_Neckline_2'] = NL2_date |
|
|
row['Valley_Dates'] = valley_dates |
|
|
row['Peak_Dates'] = peak_dates |
|
|
row['Calc_Start'] = LS_date |
|
|
row['Calc_End'] = RS_date |
|
|
|
|
|
return row |
|
|
except: |
|
|
print("Error in finding the valleys or peaks in the Head and Shoulders Bottom pattern") |
|
|
return |
|
|
|
|
|
def calc_double_top_aa(row,ohlc_data_pattern_segment): |
|
|
high_prices = ohlc_data_pattern_segment['High'].values |
|
|
low_prices = ohlc_data_pattern_segment['Low'].values |
|
|
|
|
|
|
|
|
prominence_value = 0.1 |
|
|
|
|
|
|
|
|
peak_indices, _ = find_peaks(high_prices, prominence=prominence_value) |
|
|
|
|
|
valley_indices, _ = find_peaks(-low_prices, prominence=prominence_value) |
|
|
|
|
|
|
|
|
peak_dates = ohlc_data_pattern_segment['Date'].iloc[peak_indices] |
|
|
valley_dates = ohlc_data_pattern_segment['Date'].iloc[valley_indices] |
|
|
|
|
|
|
|
|
if len(peak_indices) < 2 or len(valley_indices) < 1: |
|
|
print("Not enough peaks and valleys to form a Double Top pattern.") |
|
|
return |
|
|
|
|
|
try: |
|
|
H1_index = np.argmax(high_prices[peak_indices]) |
|
|
H1 = peak_indices[H1_index] |
|
|
H2_index = np.argmax(high_prices[peak_indices[H1_index+1:]]) + H1_index + 1 |
|
|
H2 = peak_indices[H2_index] |
|
|
|
|
|
valley_indices_between_H1_H2 = valley_indices[(valley_indices > H1) & (valley_indices < H2)] |
|
|
V = valley_indices_between_H1_H2[np.argmax(low_prices[ valley_indices_between_H1_H2])] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
H1_date = ohlc_data_pattern_segment['Date'].iloc[H1] |
|
|
H2_date = ohlc_data_pattern_segment['Date'].iloc[H2] |
|
|
V_date = ohlc_data_pattern_segment['Date'].iloc[V] |
|
|
|
|
|
|
|
|
row['DT_Peak_1'] = H1_date |
|
|
row['DT_Peak_2'] = H2_date |
|
|
row['DT_Valley'] = V_date |
|
|
row['Peak_Dates'] = peak_dates |
|
|
row['Valley_Dates'] = valley_dates |
|
|
row['Calc_Start'] = H1_date |
|
|
row['Calc_End'] = H2_date |
|
|
|
|
|
return row |
|
|
except: |
|
|
print("Error in finding the peaks or valleys in the Double Top pattern") |
|
|
return |
|
|
|
|
|
def calc_double_bottom_aa(row,ohlc_data_pattern_segment): |
|
|
high_prices = ohlc_data_pattern_segment['High'].values |
|
|
low_prices = ohlc_data_pattern_segment['Low'].values |
|
|
|
|
|
|
|
|
prominence_value = 0.05 |
|
|
|
|
|
|
|
|
valley_indices, _ = find_peaks(-low_prices, prominence=prominence_value) |
|
|
|
|
|
peak_indices, _ = find_peaks(high_prices, prominence=prominence_value) |
|
|
|
|
|
|
|
|
valley_dates = ohlc_data_pattern_segment['Date'].iloc[valley_indices] |
|
|
peak_dates = ohlc_data_pattern_segment['Date'].iloc[peak_indices] |
|
|
|
|
|
if len(valley_indices) < 2 or len(peak_indices) < 1: |
|
|
print("Not enough valleys and peaks to form a Double Bottom pattern.") |
|
|
return |
|
|
|
|
|
try: |
|
|
H1_index = np.argmin(low_prices[valley_indices]) |
|
|
H1 = valley_indices[H1_index] |
|
|
H2_index = np.argmin(low_prices[valley_indices[H1_index+1:]]) + H1_index + 1 |
|
|
H2 = valley_indices[H2_index] |
|
|
|
|
|
peak_indices_between_H1_H2 = peak_indices[(peak_indices > H1) & (peak_indices < H2)] |
|
|
P = peak_indices_between_H1_H2[np.argmax(high_prices[ peak_indices_between_H1_H2])] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
H1_date = ohlc_data_pattern_segment['Date'].iloc[H1] |
|
|
H2_date = ohlc_data_pattern_segment['Date'].iloc[H2] |
|
|
P_date = ohlc_data_pattern_segment['Date'].iloc[P] |
|
|
|
|
|
|
|
|
row['DB_Valley_1'] = H1_date |
|
|
row['DB_Valley_2'] = H2_date |
|
|
row['DB_Peak'] = P_date |
|
|
row['Valley_Dates'] = valley_dates |
|
|
row['Peak_Dates'] = peak_dates |
|
|
row['Calc_Start'] = H1_date |
|
|
row['Calc_End'] = H2_date |
|
|
|
|
|
return row |
|
|
except: |
|
|
print("Error in finding the valleys or peaks in the Double Bottom pattern") |
|
|
return |
|
|
|
|
|
def calc_double_bottom_ea(row,ohlc_data_pattern_segment): |
|
|
high_prices = ohlc_data_pattern_segment['High'].values |
|
|
low_prices = ohlc_data_pattern_segment['Low'].values |
|
|
|
|
|
|
|
|
prominence_value = 0.1 |
|
|
|
|
|
|
|
|
valley_indices, _ = find_peaks(-low_prices, prominence=prominence_value) |
|
|
|
|
|
peak_indices, _ = find_peaks(high_prices, prominence=prominence_value) |
|
|
|
|
|
round_vallies,_ = find_peaks(-low_prices, prominence=0.01,width=3,threshold=0.01) |
|
|
|
|
|
|
|
|
valley_dates = ohlc_data_pattern_segment['Date'].iloc[valley_indices] |
|
|
peak_dates = ohlc_data_pattern_segment['Date'].iloc[peak_indices] |
|
|
|
|
|
if len(valley_indices) < 2 or len(peak_indices) < 1: |
|
|
print("Not enough valleys and peaks to form a Double Bottom pattern.") |
|
|
return |
|
|
|
|
|
try: |
|
|
H1_index = np.argmin(low_prices[round_vallies]) |
|
|
H1 = valley_indices[H1_index] |
|
|
H2_index = np.argmin(low_prices[valley_indices[H1_index+1:]]) + H1_index + 1 |
|
|
H2 = valley_indices[H2_index] |
|
|
|
|
|
peak_indices_between_H1_H2 = peak_indices[(peak_indices > H1) & (peak_indices < H2)] |
|
|
P = peak_indices_between_H1_H2[np.argmax(high_prices[ peak_indices_between_H1_H2])] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
H1_date = ohlc_data_pattern_segment['Date'].iloc[H1] |
|
|
H2_date = ohlc_data_pattern_segment['Date'].iloc[H2] |
|
|
P_date = ohlc_data_pattern_segment['Date'].iloc[P] |
|
|
|
|
|
|
|
|
row['DB_Valley_1'] = H1_date |
|
|
row['DB_Valley_2'] = H2_date |
|
|
row['DB_Peak'] = P_date |
|
|
row['Valley_Dates'] = valley_dates |
|
|
row['Peak_Dates'] = peak_dates |
|
|
row['Calc_Start'] = H1_date |
|
|
row['Calc_End'] = H2_date |
|
|
|
|
|
return row |
|
|
except: |
|
|
print("Error in finding the valleys or peaks in the Double Bottom pattern") |
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
import matplotlib.pyplot as plt |
|
|
import mplfinance as mpf |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import matplotlib.pyplot as plt |
|
|
import mplfinance as mpf |
|
|
from scipy.signal import argrelextrema |
|
|
from scipy.signal import find_peaks |
|
|
|
|
|
def draw_head_and_shoulders_top(ax, ohlc_data, pat_start_idx,row): |
|
|
|
|
|
Draws a Head and Shoulders pattern on an existing mplfinance plot and visualizes detected peaks and valleys. |
|
|
|
|
|
Parameters: |
|
|
ax (matplotlib.axes.Axes): The candlestick chart's axis. |
|
|
ohlc_data (pd.DataFrame): Data containing 'High' and 'Low' columns. |
|
|
|
|
|
# reset the index of the ohlc_data |
|
|
ohlc_data.reset_index(drop=True, inplace=True) |
|
|
high_prices = ohlc_data['High'].values |
|
|
low_prices = ohlc_data['Low'].values |
|
|
|
|
|
# check if 'Peak_Dates' and 'Valley_Dates' columns are present in the row |
|
|
if 'Peak_Dates' in row and 'Valley_Dates' in row: |
|
|
|
|
|
peak_days = row['Peak_Dates'] |
|
|
valley_days = row['Valley_Dates'] |
|
|
|
|
|
|
|
|
peak_indices = ohlc_data[ohlc_data['Date'].isin(peak_days)].index |
|
|
# add the pat_start_idx to the peak_indices |
|
|
peak_indices = peak_indices |
|
|
|
|
|
valley_indices = ohlc_data[ohlc_data['Date'].isin(valley_days)].index |
|
|
# add the pat_start_idx to the valley_indices |
|
|
valley_indices = valley_indices |
|
|
|
|
|
# Debugging visualization: Plot detected peaks and valleys |
|
|
ax.scatter(peak_indices , high_prices[peak_indices], color='green', marker='^', label='Peaks', zorder=3) |
|
|
ax.scatter(valley_indices, low_prices[valley_indices], color='red', marker='v', label='Valleys', zorder=3) |
|
|
|
|
|
calc_start_date = row['Calc_Start'] |
|
|
calc_end_date = row['Calc_End'] |
|
|
|
|
|
calc_start_idx = ohlc_data[ohlc_data['Date']== calc_start_date].index |
|
|
calc_end_idx = ohlc_data[ohlc_data['Date']== calc_end_date].index |
|
|
|
|
|
# drow a pink dotted vertical line at calc_start_idx and calc_end_idx |
|
|
ax.axvline(x=calc_start_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
ax.axvline(x=calc_end_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
|
|
|
LS_idx = ohlc_data[ohlc_data['Date']== row['HS_Left_Shoulder']].index |
|
|
H_idx = ohlc_data[ohlc_data['Date']== row['HS_Head']].index |
|
|
RS_idx = ohlc_data[ohlc_data['Date']== row['HS_Right_Shoulder']].index |
|
|
NL1_idx = ohlc_data[ohlc_data['Date']== row['HS_Neckline_1']].index |
|
|
NL2_idx = ohlc_data[ohlc_data['Date']== row['HS_Neckline_2']].index |
|
|
|
|
|
# Draw the head and shoulders |
|
|
ax.plot([LS_idx, H_idx, RS_idx], [high_prices[LS_idx], high_prices[H_idx], high_prices[RS_idx]], |
|
|
linestyle="solid", marker="o", color="blue", linewidth=1, label="H&S Pattern") |
|
|
|
|
|
# Use NL1_idx and NL2_idx as the x-range to keep the line within bounds |
|
|
x_min, x_max = min(NL1_idx, NL2_idx), max(NL1_idx, NL2_idx) |
|
|
|
|
|
# Compute the y-values using the line equation (y = mx + c) |
|
|
slope = (low_prices[NL2_idx] - low_prices[NL1_idx]) / (NL2_idx - NL1_idx) |
|
|
y_min = low_prices[NL1_idx] + slope * (x_min - NL1_idx) |
|
|
y_max = low_prices[NL1_idx] + slope * (x_max - NL1_idx) |
|
|
|
|
|
# Plot the line within the original graph size |
|
|
ax.plot([x_min, x_max], [y_min, y_max], |
|
|
linestyle="dashed", color="red", linewidth=1, label="Neckline") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def draw_head_and_shoulders_bottom(ax, ohlc_data, pat_start_idx,row): |
|
|
|
|
|
Draws a Head and Shoulders pattern on an existing mplfinance plot and visualizes detected peaks and valleys. |
|
|
|
|
|
Parameters: |
|
|
ax (matplotlib.axes.Axes): The candlestick chart's axis. |
|
|
ohlc_data (pd.DataFrame): Data containing 'High' and 'Low' columns. |
|
|
|
|
|
# reset the index of the ohlc_data |
|
|
ohlc_data.reset_index(drop=True, inplace=True) |
|
|
high_prices = ohlc_data['High'].values |
|
|
low_prices = ohlc_data['Low'].values |
|
|
|
|
|
# check if 'Peak_Dates' and 'Valley_Dates' columns are present in the row |
|
|
if 'Peak_Dates' in row and 'Valley_Dates' in row: |
|
|
peak_days = row['Peak_Dates'] |
|
|
valley_days = row['Valley_Dates'] |
|
|
|
|
|
|
|
|
peak_indices = ohlc_data[ohlc_data['Date'].isin(peak_days)].index |
|
|
# add the pat_start_idx to the peak_indices |
|
|
peak_indices = peak_indices |
|
|
|
|
|
valley_indices = ohlc_data[ohlc_data['Date'].isin(valley_days)].index |
|
|
# add the pat_start_idx to the valley_indices |
|
|
valley_indices = valley_indices |
|
|
|
|
|
# Debugging visualization: Plot detected peaks and valleys |
|
|
ax.scatter(peak_indices , high_prices[peak_indices], color='green', marker='^', label='Peaks', zorder=3) |
|
|
ax.scatter(valley_indices, low_prices[valley_indices], color='red', marker='v', label='Valleys', zorder=3) |
|
|
|
|
|
calc_start_date = row['Calc_Start'] |
|
|
calc_end_date = row['Calc_End'] |
|
|
|
|
|
calc_start_idx = ohlc_data[ohlc_data['Date']== calc_start_date].index |
|
|
calc_end_idx = ohlc_data[ohlc_data['Date']== calc_end_date].index |
|
|
|
|
|
# drow a pink dotted vertical line at calc_start_idx and calc_end_idx |
|
|
ax.axvline(x=calc_start_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
ax.axvline(x=calc_end_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
|
|
|
|
|
|
LS_idx = ohlc_data[ohlc_data['Date']== row['HS_Left_Shoulder']].index |
|
|
H_idx = ohlc_data[ohlc_data['Date']== row['HS_Head']].index |
|
|
RS_idx = ohlc_data[ohlc_data['Date']== row['HS_Right_Shoulder']].index |
|
|
NL1_idx = ohlc_data[ohlc_data['Date']== row['HS_Neckline_1']].index |
|
|
NL2_idx = ohlc_data[ohlc_data['Date']== row['HS_Neckline_2']].index |
|
|
|
|
|
# Draw the head and shoulders |
|
|
ax.plot([LS_idx, H_idx, RS_idx], [low_prices[LS_idx], low_prices[H_idx], low_prices[RS_idx]], |
|
|
linestyle="solid", marker="o", color="blue", linewidth=1, label="H&S Pattern") |
|
|
|
|
|
# Use NL1_idx and NL2_idx as the x-range to keep the line within bounds |
|
|
x_min, x_max = min(NL1_idx, NL2_idx), max(NL1_idx, NL2_idx) |
|
|
|
|
|
# Compute the y-values using the line equation (y = mx + c) |
|
|
slope = (high_prices[NL2_idx] - high_prices[NL1_idx]) / (NL2_idx - NL1_idx) |
|
|
y_min = high_prices[NL1_idx] + slope * (x_min - NL1_idx) |
|
|
y_max = high_prices[NL1_idx] + slope * (x_max - NL1_idx) |
|
|
|
|
|
# Plot the line within the original graph size |
|
|
ax.plot([x_min, x_max], [y_min, y_max], |
|
|
linestyle="dashed", color="red", linewidth=1, label="Neckline") |
|
|
|
|
|
|
|
|
|
|
|
def draw_double_top_aa(ax, ohlc_data, pat_start_idx,row): |
|
|
|
|
|
Draws a Double Top pattern on an existing mplfinance plot and visualizes detected peaks and valleys. |
|
|
|
|
|
Parameters: |
|
|
ax (matplotlib.axes.Axes): The candlestick chart's axis. |
|
|
ohlc_data (pd.DataFrame): Data containing 'High' and 'Low' columns. |
|
|
|
|
|
# reset the index of the ohlc_data |
|
|
ohlc_data.reset_index(drop=True, inplace=True) |
|
|
high_prices = ohlc_data['High'].values |
|
|
low_prices = ohlc_data['Low'].values |
|
|
|
|
|
# check if 'Peak_Dates' and 'Valley_Dates' columns are present in the row |
|
|
if 'Peak_Dates' in row and 'Valley_Dates' in row: |
|
|
|
|
|
|
|
|
peak_days = row['Peak_Dates'] |
|
|
valley_days = row['Valley_Dates'] |
|
|
|
|
|
|
|
|
peak_indices = ohlc_data[ohlc_data['Date'].isin(peak_days)].index |
|
|
# add the pat_start_idx to the peak_indices |
|
|
peak_indices = peak_indices |
|
|
|
|
|
valley_indices = ohlc_data[ohlc_data['Date'].isin(valley_days)].index |
|
|
# add the pat_start_idx to the valley_indices |
|
|
valley_indices = valley_indices |
|
|
|
|
|
# Debugging visualization: Plot detected peaks and valleys |
|
|
ax.scatter(peak_indices , high_prices[peak_indices], color='green', marker='^', label='Peaks', zorder=3) |
|
|
ax.scatter(valley_indices, low_prices[valley_indices], color='red', marker='v', label='Valleys', zorder=3) |
|
|
|
|
|
|
|
|
|
|
|
DT_Peak_1_idx = ohlc_data[ohlc_data['Date']== row['DT_Peak_1']].index |
|
|
DT_Peak_2_idx = ohlc_data[ohlc_data['Date']== row['DT_Peak_2']].index |
|
|
DT_Valley_idx = ohlc_data[ohlc_data['Date']== row['DT_Valley']].index |
|
|
|
|
|
# draw the double peaks |
|
|
ax.plot([DT_Peak_1_idx,DT_Valley_idx, DT_Peak_2_idx], [high_prices[DT_Peak_1_idx],high_prices[DT_Valley_idx], high_prices[DT_Peak_2_idx]], |
|
|
linestyle="solid", marker="o", color="blue", linewidth=1, label="Double Top Pattern") |
|
|
# Draw the neckline |
|
|
ax.hlines(y=low_prices[DT_Valley_idx], xmin=ax.get_xlim()[0], xmax=ax.get_xlim()[1], color='red', linestyle='dotted', linewidth=1) |
|
|
|
|
|
def draw_double_bottom_aa(ax, ohlc_data, pat_start_idx,row): |
|
|
|
|
|
Draws a Double Bottom pattern on an existing mplfinance plot and visualizes detected peaks and valleys. |
|
|
|
|
|
Parameters: |
|
|
ax (matplotlib.axes.Axes): The candlestick chart's axis. |
|
|
ohlc_data (pd.DataFrame): Data containing 'High' and 'Low' columns. |
|
|
|
|
|
# reset the index of the ohlc_data |
|
|
ohlc_data.reset_index(drop=True, inplace=True) |
|
|
high_prices = ohlc_data['High'].values |
|
|
low_prices = ohlc_data['Low'].values |
|
|
|
|
|
# check if 'Peak_Dates' and 'Valley_Dates' columns are present in the row |
|
|
if 'Peak_Dates' in row and 'Valley_Dates' in row: |
|
|
|
|
|
|
|
|
peak_days = row['Peak_Dates'] |
|
|
valley_days = row['Valley_Dates'] |
|
|
|
|
|
|
|
|
peak_indices = ohlc_data[ohlc_data['Date'].isin(peak_days)].index |
|
|
# add the pat_start_idx to the peak_indices |
|
|
peak_indices = peak_indices |
|
|
|
|
|
valley_indices = ohlc_data[ohlc_data['Date'].isin(valley_days)].index |
|
|
# add the pat_start_idx to the valley_indices |
|
|
valley_indices = valley_indices |
|
|
|
|
|
# Debugging visualization: Plot detected peaks and valleys |
|
|
ax.scatter(peak_indices , high_prices[peak_indices], color='green', marker='^', label='Peaks', zorder=3) |
|
|
ax.scatter(valley_indices, low_prices[valley_indices], color='red', marker='v', label='Valleys', zorder=3) |
|
|
|
|
|
DB_Valley_1_idx = ohlc_data[ohlc_data['Date']== row['DB_Valley_1']].index |
|
|
DB_Valley_2_idx = ohlc_data[ohlc_data['Date']== row['DB_Valley_2']].index |
|
|
DB_Peak_idx = ohlc_data[ohlc_data['Date']== row['DB_Peak']].index |
|
|
|
|
|
# draw the double peaks |
|
|
ax.plot([DB_Valley_1_idx,DB_Peak_idx, DB_Valley_2_idx], [low_prices[DB_Valley_1_idx],low_prices[DB_Peak_idx], low_prices[DB_Valley_2_idx]], |
|
|
linestyle="solid", marker="o", color="blue", linewidth=1, label="Double Bottom Pattern") |
|
|
# Draw the neckline |
|
|
ax.hlines(y=high_prices[DB_Peak_idx], xmin=ax.get_xlim()[0], xmax=ax.get_xlim()[1], color='red', linestyle='dotted', linewidth=1) |
|
|
|
|
|
def plot_pattern_clusters( test_pattern_segment_wise, ohcl_data_given=None, padding_days=0,draw_lines = False): |
|
|
colors = ["blue", "green", "red", "cyan", "magenta", "yellow", "purple", "orange", "brown", "pink", "lime", "teal"] |
|
|
|
|
|
group = test_pattern_segment_wise |
|
|
|
|
|
if ohcl_data_given is None: |
|
|
symbol = group['Symbol'].iloc[0] |
|
|
ohcl_data = pd.read_csv(path + '/' + symbol + '.csv') |
|
|
else: |
|
|
ohcl_data = ohcl_data_given |
|
|
|
|
|
ohcl_data['Date'] = pd.to_datetime(ohcl_data['Date']) |
|
|
ohcl_data['Date'] = ohcl_data['Date'].dt.tz_localize(None) |
|
|
|
|
|
seg_start = group['Seg_Start'].iloc[0] - pd.to_timedelta(padding_days, unit='D') |
|
|
seg_end = group['Seg_End'].iloc[0] + pd.to_timedelta(padding_days, unit='D') |
|
|
|
|
|
ohcl_data = ohcl_data[(ohcl_data['Date'] >= seg_start) & (ohcl_data['Date'] <= seg_end)] |
|
|
if ohcl_data.empty: |
|
|
print("OHLC Data set is empty") |
|
|
return |
|
|
|
|
|
ohlc_for_mpf = ohcl_data[['Open', 'High', 'Low', 'Close']].copy() |
|
|
ohlc_for_mpf.index = pd.to_datetime(ohcl_data['Date']) |
|
|
|
|
|
fig, axes = mpf.plot(ohlc_for_mpf, type='candle', style='charles', datetime_format='%Y-%m-%d', returnfig=True) |
|
|
ax = axes[0] |
|
|
|
|
|
for _, row in group.iterrows(): |
|
|
pattern_name = row['Chart Pattern'] |
|
|
cluster = row['Cluster'] |
|
|
color = "gray" if cluster == -1 else colors[cluster % len(colors)] |
|
|
|
|
|
pattern_start_date = pd.to_datetime(row['Start']).tz_localize(None) |
|
|
pattern_end_date = pd.to_datetime(row['End']).tz_localize(None) |
|
|
|
|
|
num_start = len(ohcl_data[ohcl_data['Date'] < pattern_start_date]) |
|
|
num_end = num_start + len(ohcl_data[(ohcl_data['Date'] >= pattern_start_date) & (ohcl_data['Date'] <= pattern_end_date)]) |
|
|
|
|
|
ax.axvspan(num_start, num_end, color=color, alpha=0.1, label=pattern_name) |
|
|
|
|
|
|
|
|
if draw_lines: |
|
|
# error = row['Error'] check only if the column is present |
|
|
error = False |
|
|
if 'Error' in row and row['Error'] != np.nan: |
|
|
error = row['Error'] |
|
|
if error != True: |
|
|
calc_start_date = row['Calc_Start'] |
|
|
calc_end_date = row['Calc_End'] |
|
|
|
|
|
# reset the index of the ohlc_data |
|
|
ohcl_data.reset_index(drop=True, inplace=True) |
|
|
|
|
|
calc_start_idx = ohcl_data[ohcl_data['Date']== calc_start_date].index |
|
|
calc_end_idx = ohcl_data[ohcl_data['Date']== calc_end_date].index |
|
|
|
|
|
# drow a pink dotted vertical line at calc_start_idx and calc_end_idx |
|
|
ax.axvline(x=calc_start_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
ax.axvline(x=calc_end_idx, color='blue', linestyle='dotted', linewidth=1) |
|
|
|
|
|
# # If detected pattern is Head and Shoulders, plot indicator lines |
|
|
# if pattern_name == "Head-and-shoulders top": |
|
|
# # get the ohlc segment of where the date is between the pattern start and end from ohlc_for_mpf data set where the index is the date |
|
|
# ohlc_segment_head_and_sholder = ohlc_for_mpf.loc[pattern_start_date:pattern_end_date] |
|
|
# draw_head_and_shoulders_top(ax, ohcl_data, num_start,row) |
|
|
# elif pattern_name == "Head-and-shoulders bottom": |
|
|
# # get the ohlc segment of where the date is between the pattern start and end from ohlc_for_mpf data set where the index is the date |
|
|
# ohlc_segment_head_and_sholder = ohlc_for_mpf.loc[pattern_start_date:pattern_end_date] |
|
|
# draw_head_and_shoulders_bottom(ax, ohcl_data, num_start,row) |
|
|
# elif pattern_name == "Double Top, Adam and Adam": |
|
|
# # get the ohlc segment of where the date is between the pattern start and end from ohlc_for_mpf data set where the index is the date |
|
|
# ohlc_segment_double_top = ohlc_for_mpf.loc[pattern_start_date:pattern_end_date] |
|
|
# draw_double_top_aa(ax, ohcl_data, num_start,row) |
|
|
# elif pattern_name == "Double Bottom, Adam and Adam": |
|
|
# ohlc_segment_double_top = ohlc_for_mpf.loc[pattern_start_date:pattern_end_date] |
|
|
# draw_double_bottom_aa(ax, ohcl_data, num_start,row) |
|
|
# elif pattern_name == "Double Bottom, Eve and Adam": |
|
|
# ohlc_segment_double_top = ohlc_for_mpf.loc[pattern_start_date:pattern_end_date] |
|
|
# draw_double_bottom_aa(ax, ohcl_data, num_start,row) |
|
|
|
|
|
|
|
|
if draw_lines: |
|
|
# Get unique legend handles and labels |
|
|
handles, labels = ax.get_legend_handles_labels() |
|
|
unique_labels = {} |
|
|
unique_handles = [] |
|
|
|
|
|
|
|
|
|
|
|
# Initialize storage for unique handles/labels |
|
|
unique_labels = {} |
|
|
unique_handles = [] |
|
|
i= 1 |
|
|
|
|
|
for handle, label in zip(handles, labels): |
|
|
# print(label) |
|
|
|
|
|
# Allow duplication if the label is in pattern_encoding |
|
|
if label in pattern_encoding or label not in unique_labels: |
|
|
if label not in unique_labels: |
|
|
unique_labels[label] = handle |
|
|
unique_handles.append(handle) |
|
|
else: |
|
|
unique_labels[label + f"_{i}"] = handle |
|
|
unique_handles.append(handle) |
|
|
i += 1 |
|
|
|
|
|
|
|
|
ax.legend(unique_handles, unique_labels.keys()) |
|
|
|
|
|
|
|
|
|
|
|
ax.grid(True) |
|
|
plt.show() |
|
|
|
|
|
def plot_pattern_groups_and_finalized_sections(located_patterns_and_other_info, cluster_labled_windows_df ,ohcl_data_given=None): |
|
|
# for each unique Chart Pattern in located_patterns_and_other_info plot the patterns |
|
|
for pattern, group in located_patterns_and_other_info.groupby('Chart Pattern'): |
|
|
# pattern = 'Head-and-shoulders top' |
|
|
print (pattern ," :") |
|
|
print(" Clustered Windows :") |
|
|
plot_pattern_clusters( cluster_labled_windows_df[cluster_labled_windows_df['Chart Pattern'] == pattern],ohcl_data_given=ohcl_data_given) |
|
|
print(" Finalized Section :") |
|
|
plot_pattern_clusters( located_patterns_and_other_info[located_patterns_and_other_info['Chart Pattern'] == pattern],draw_lines=True,ohcl_data_given=ohcl_data_given) |
|
|
""" |
|
|
|
|
|
|