# chart-pattern-locator / utils/patternLocating.py
# (commented-out web-listing residue: uploader "yasirapunsith", commit "add files", c9b7b21)
import joblib
from tqdm import tqdm
from utils.eval import intersection_over_union
from utils.formatAndPreprocessNewPatterns import get_patetrn_name_by_encoding, get_pattern_encoding_by_name, get_reverse_pattern_encoding
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
import math
from sklearn.cluster import DBSCAN
# Root folder of the raw OHLC CSV datasets.
# NOTE(review): not referenced anywhere in this module's visible code — confirm callers before removing.
path = 'Datasets/OHLC data'
def process_window(i, ohlc_data_segment, rocket_model, probability_threshold, pattern_encoding_reversed, seg_start, seg_end, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1):
    """Classify one sliding window of OHLCV data with the rocket model.

    The window is anchored at row ``i`` but shifted left by
    ``ceil(window_size * padding_proportion)`` rows, then clipped to the
    segment bounds. Note the right edge is computed from the *unclipped*
    left edge, so windows near the start of the segment are shorter.

    Parameters
    ----------
    i : anchor row index within ohlc_data_segment.
    ohlc_data_segment : DataFrame with 'Date', 'Open', 'High', 'Low', 'Close', 'Volume'.
    rocket_model : classifier exposing predict_proba on a (1, 5, length) array.
    probability_threshold, pattern_encoding_reversed : accepted for interface
        compatibility; not used by the current implementation.
    seg_start, seg_end : segment boundary dates, copied into the result row.
    window_size, padding_proportion : window geometry (see above).
    prob_threshold_of_no_pattern_to_mark_as_no_pattern : if the 'No Pattern'
        class probability exceeds this, the window is forced to 'No Pattern'
        (default 1 disables the override for probabilities <= 1).

    Returns
    -------
    dict with keys Start, End, Chart Pattern, Seg_Start, Seg_End, Probability,
    or None for an empty window / prediction failure.
    """
    raw_start = i - math.ceil(window_size * padding_proportion)
    lo = max(raw_start, 0)
    # right edge uses the unclipped left edge, mirroring the original geometry
    hi = min(raw_start + window_size, len(ohlc_data_segment))
    window = ohlc_data_segment[lo:hi]
    if len(window) == 0:
        return None  # nothing to classify in an empty slice

    # Build a channel-first array for the rocket model: (1 sample, 5 channels, length).
    features = window[['Open', 'High', 'Low', 'Close', 'Volume']].to_numpy().reshape(1, len(window), 5)
    features = np.transpose(features, (0, 2, 1))

    try:
        probas = rocket_model.predict_proba(features)
    except Exception as e:
        print(f"Error in prediction: {e}")
        return None

    pred_proba = np.max(probas)
    pred_pattern = get_patetrn_name_by_encoding(np.argmax(probas))
    no_pattern_proba = probas[0][get_pattern_encoding_by_name('No Pattern')]
    # Force 'No Pattern' when its class probability is confidently high.
    if no_pattern_proba > prob_threshold_of_no_pattern_to_mark_as_no_pattern:
        pred_proba = no_pattern_proba
        pred_pattern = 'No Pattern'

    return {
        'Start': window['Date'].iloc[0],
        'End': window['Date'].iloc[-1],
        'Chart Pattern': pred_pattern,
        'Seg_Start': seg_start,
        'Seg_End': seg_end,
        'Probability': pred_proba,
    }
def parallel_process_sliding_window(ohlc_data_segment, rocket_model, probability_threshold, stride, pattern_encoding_reversed, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1, parallel=True, num_cores=-1):
    """Run process_window over every stride-th anchor row of the segment.

    Parameters
    ----------
    ohlc_data_segment : DataFrame with at least a 'Date' column; its first and
        last dates are attached to every result row as Seg_Start / Seg_End.
    rocket_model, probability_threshold, pattern_encoding_reversed,
    window_size, padding_proportion,
    prob_threshold_of_no_pattern_to_mark_as_no_pattern :
        forwarded unchanged to process_window.
    stride : step (in rows) between consecutive window anchors.
    parallel : when True, fan out over joblib workers; otherwise loop serially.
    num_cores : joblib n_jobs (-1 = all cores).

    Returns
    -------
    DataFrame of non-None process_window results (may be empty).
    """
    seg_start = ohlc_data_segment['Date'].iloc[0]
    seg_end = ohlc_data_segment['Date'].iloc[-1]
    window_starts = range(0, len(ohlc_data_segment), stride)
    if parallel:
        # Context manager ensures joblib worker cleanup. Renamed the context
        # variable from `parallel` to `executor` so it no longer shadows the
        # boolean `parallel` parameter.
        with Parallel(n_jobs=num_cores, verbose=1) as executor:
            results = executor(
                delayed(process_window)(
                    i=i,
                    ohlc_data_segment=ohlc_data_segment,
                    rocket_model=rocket_model,
                    probability_threshold=probability_threshold,
                    pattern_encoding_reversed=pattern_encoding_reversed,
                    window_size=window_size,
                    seg_start=seg_start,
                    seg_end=seg_end,
                    padding_proportion=padding_proportion,
                    prob_threshold_of_no_pattern_to_mark_as_no_pattern=prob_threshold_of_no_pattern_to_mark_as_no_pattern
                )
                for i in window_starts
            )
        # Drop windows that produced no prediction.
        return pd.DataFrame([res for res in results if res is not None])
    else:
        # Same computation without parallelism.
        results = []
        total_iterations = len(window_starts)
        for i_idx, i in enumerate(window_starts):
            # BUG FIX: the serial path previously omitted
            # prob_threshold_of_no_pattern_to_mark_as_no_pattern, silently
            # using process_window's default (1) and diverging from the
            # parallel path. It is now forwarded explicitly.
            res = process_window(i, ohlc_data_segment, rocket_model, probability_threshold, pattern_encoding_reversed, seg_start, seg_end, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern)
            if res is not None:
                results.append(res)
            # Progress print statement
            print(f"Processing window {i_idx + 1} of {total_iterations}...")
        return pd.DataFrame(results)
def prepare_dataset_for_cluster(ohlc_data_segment, win_results_df):
    """Attach positional columns to predicted windows for center-based clustering.

    For each predicted window, positions are expressed as row offsets into
    ohlc_data_segment (ordered by 'Date'):
      - 'Pattern_Start_pos': number of OHLC rows strictly before the window's Start date
      - 'Pattern_End_pos':   start position plus the number of rows the window covers
                             (Start/End inclusive)
      - 'Center':            start position + half the covered length (may be fractional)

    Parameters
    ----------
    ohlc_data_segment : DataFrame with a 'Date' column (datetime-comparable to
        the Start/End values in win_results_df).
    win_results_df : DataFrame with 'Start' and 'End' columns; not mutated.

    Returns
    -------
    A copy of win_results_df with the three positional columns added.
    """
    predicted_patterns = win_results_df.copy()
    # Removed unused local `origin_date` (ohlc_data_segment['Date'].min() was
    # computed but never read).
    dates = ohlc_data_segment['Date']
    for index, row in predicted_patterns.iterrows():
        pattern_start = row['Start']
        pattern_end = row['End']
        # Count rows instead of materializing filtered DataFrames just for len().
        start_point_index = int((dates < pattern_start).sum())
        pattern_len = int(((dates >= pattern_start) & (dates <= pattern_end)).sum())
        # Midpoint of the window in row coordinates (used as the clustering feature).
        predicted_patterns.at[index, 'Center'] = start_point_index + (pattern_len / 2)
        predicted_patterns.at[index, 'Pattern_Start_pos'] = start_point_index
        predicted_patterns.at[index, 'Pattern_End_pos'] = start_point_index + pattern_len
    return predicted_patterns
def cluster_windows(predicted_patterns , probability_threshold, window_size,eps = 0.05 , min_samples = 2):
    """Group overlapping same-pattern windows with DBSCAN over window centers.

    Parameters
    ----------
    predicted_patterns : DataFrame from prepare_dataset_for_cluster, with
        'Chart Pattern', 'Probability', 'Center', 'Start', 'End',
        'Seg_Start', 'Seg_End' columns.
    probability_threshold : float or list. A float keeps rows with
        Probability strictly greater than it; a list is indexed by pattern
        encoding and drops rows of pattern i whose Probability is below
        probability_threshold[i].
    window_size : currently unused (the eps formula that used it is commented out).
    eps, min_samples : DBSCAN parameters, applied to min-max normalised centers.

    Returns
    -------
    (cluster_labled_windows_df, interseced_clusters_df):
      - every surviving window with its DBSCAN 'Cluster' label (-1 = noise),
      - one summary row per non-noise cluster (date span where >= 2 windows
        overlap, plus mean probability).
    Returns (None, None) when nothing survives filtering / clustering.
    """
    df = predicted_patterns.copy()
    # probability_threshold may be a per-pattern list or a single float
    if isinstance(probability_threshold, list):
        # the list contains one probability threshold per chart-pattern encoding;
        # drop rows of each pattern that fall below their own threshold
        for i in range(len(probability_threshold)):
            pattern_name = get_patetrn_name_by_encoding(i)
            df.drop(df[(df['Chart Pattern'] == pattern_name) & (df['Probability'] < probability_threshold[i])].index, inplace=True)
            # print(f"Filtered {pattern_name} with probability < {probability_threshold[i]}")
    else:
        # single global threshold: keep only rows strictly above it
        df = df[df['Probability'] > probability_threshold]
    # Accumulators across all pattern groups.
    cluster_labled_windows = []
    interseced_clusters = []
    # Normalisation range is computed over the WHOLE filtered frame, then
    # applied per pattern group below, so groups share one scale.
    min_center = df['Center'].min()
    max_center = df['Center'].max()
    # Cluster each chart pattern independently.
    for pattern, group in df.groupby('Chart Pattern'):
        # 1-D feature for DBSCAN: the window center positions.
        centers = group['Center'].values.reshape(-1, 1)
        # min-max normalise the centers
        if min_center < max_center:  # Avoid division by zero
            norm_centers = (centers - min_center) / (max_center - min_center)
        else:
            # If all values are the same, set to a constant (e.g., 0 or 1)
            norm_centers = np.ones_like(centers)
        # eps =window_size/2 + 4
        db = DBSCAN(eps=eps, min_samples=min_samples).fit(norm_centers)
        # NOTE(review): `group` is a groupby slice; assigning a column here
        # relies on pandas copy semantics (may warn) — left as-is deliberately.
        group['Cluster'] = db.labels_
        cluster_labled_windows.append(group)
        # Summarise each real cluster; label -1 (noise) is excluded.
        for cluster_id, cluster_group in group[group['Cluster'] != -1].groupby('Cluster'):
            # Expand every member window into its list of calendar dates.
            expanded_dates = []
            for _, row in cluster_group.iterrows():
                dates = pd.date_range(row["Start"], row["End"])
                expanded_dates.extend(dates)
            # Count how many member windows cover each date.
            date_counts = pd.Series(expanded_dates).value_counts().sort_index()
            # Cluster span = first/last date covered by at least 2 windows.
            # NOTE(review): NaT results if no date reaches count >= 2 — confirm
            # downstream tolerates that.
            cluster_start = date_counts[date_counts >= 2].index.min()
            cluster_end = date_counts[date_counts >= 2].index.max()
            interseced_clusters.append({
                # 'Seg_ID' : cluster_group['Seg_ID'].iloc[0],
                # 'Symbol' : cluster_group['Symbol'].iloc[0],
                'Chart Pattern': pattern,
                'Cluster': cluster_id,
                'Start': cluster_start,
                'End': cluster_end,
                'Seg_Start': cluster_group['Seg_Start'].iloc[0],
                'Seg_End': cluster_group['Seg_End'].iloc[0],
                'Avg_Probability': cluster_group['Probability'].mean(),
            })
    if len(cluster_labled_windows) == 0 or len(interseced_clusters) == 0:
        return None,None
    # Combine per-pattern results into the two output frames.
    cluster_labled_windows_df = pd.concat(cluster_labled_windows)
    interseced_clusters_df = pd.DataFrame(interseced_clusters)
    # Restore the original row order of the input windows.
    cluster_labled_windows_df = cluster_labled_windows_df.sort_index()
    return cluster_labled_windows_df,interseced_clusters_df
# =========================Advance Locator ==========================
# --- Advanced-locator configuration; executed at import time ---
pattern_encoding_reversed = get_reverse_pattern_encoding()
# Pre-trained MiniRocket + XGBoost classifier loaded once for locate_patterns.
model = joblib.load('Models/Width Aug OHLC_mini_rocket_xgb.joblib')
# NOTE(review): shadowed by locate_patterns' plot_count parameter — effectively unused.
plot_count = 0
# 10 log-spaced divisors in [1, 20]; segment length // divisor = candidate window size.
win_size_proportions = np.round(np.logspace(0, np.log10(20), num=10), 2).tolist()
padding_proportion = 0.6  # left-shift fraction of each sliding window (see process_window)
stride = 1  # row step between consecutive sliding windows
probab_threshold_list = 0.5  # probability cutoff passed to cluster_windows
prob_threshold_of_no_pattern_to_mark_as_no_pattern = 0.5
# NOTE(review): target_len is never read in this module's visible code — confirm before removing.
target_len = 30
# NOTE(review): these three are not passed to cluster_windows by locate_patterns,
# which therefore runs with cluster_windows' own defaults (eps=0.05, min_samples=2) — confirm intent.
eps=0.04 # in the dbscan clustering
min_samples=3 # in the dbscan clustering
win_width_proportion=10 # in the dbscan clustering from what amount to divide the width related feature
def locate_patterns(ohlc_data, patterns_to_return= None,model = model , pattern_encoding_reversed= pattern_encoding_reversed,plot_count = 10):
    """Locate chart patterns in an OHLC series by multi-scale sliding-window voting.

    For each candidate window size (seg_len // proportion, floored at 10, each
    distinct integer size processed once), slides the classifier over the data,
    clusters overlapping detections per pattern, and finally merges detections
    across sizes: among intersecting detections of the same pattern with
    IoU > 0.6, the larger window wins unless the smaller one's average
    probability is more than 0.1 higher.

    Parameters
    ----------
    ohlc_data : DataFrame with 'Date' plus OHLCV columns; 'Date' is coerced to
        datetime on a copy, the input is not mutated.
    patterns_to_return : optional list of pattern names to filter the result by.
    model : classifier with predict_proba; default binds the module-level model
        at definition time.
    pattern_encoding_reversed : encoding map forwarded to the sliding-window step.
    plot_count : unused since plotting was removed (kept for interface compatibility).

    Returns
    -------
    DataFrame of located patterns (one row per accepted cluster, with
    Calc_Start/Calc_End/Window_Size columns), or None when no window size
    produced any result.

    Raises
    ------
    Exception when the input segment is empty.
    """
    ohlc_data_segment = ohlc_data.copy()
    # convert date to datetime
    ohlc_data_segment['Date'] = pd.to_datetime(ohlc_data_segment['Date'])
    seg_len = len(ohlc_data_segment)
    if ohlc_data_segment is None or len(ohlc_data_segment) == 0:
        print("OHLC Data segment is empty")
        raise Exception("OHLC Data segment is empty")
    win_results_for_each_size = []
    located_patterns_and_other_info_for_each_size = []
    cluster_labled_windows_list = []
    used_win_sizes = []
    # Running offset so cluster ids stay unique across window sizes.
    win_iteration = 0
    for win_size_proportion in win_size_proportions:
        window_size = seg_len // win_size_proportion
        # print(f"Win size : {window_size}")
        if window_size < 10:
            window_size = 10
        # elif window_size > 30:
        #     window_size = 30
        # floor division by a float yields a float; make it an int row count
        window_size = int(window_size)
        # different proportions can collapse to the same size; process each once
        if window_size in used_win_sizes:
            continue
        used_win_sizes.append(window_size)
        # win_results_df = parallel_process_sliding_window(ohlc_data_segment, model, probability_threshold,stride, pattern_encoding_reversed,group,test_seg_id,window_size, padding_proportion, len_norm, target_len)
        win_results_df = parallel_process_sliding_window(ohlc_data_segment, model, probab_threshold_list,stride, pattern_encoding_reversed,window_size, padding_proportion,prob_threshold_of_no_pattern_to_mark_as_no_pattern,parallel=True)
        if win_results_df is None or len(win_results_df) == 0:
            print("Window results dataframe is empty")
            continue
        win_results_df['Window_Size'] = window_size
        win_results_for_each_size.append(win_results_df)
        # plot_sliding_steps(win_results_df ,ohlc_data_segment,probability_threshold ,test_seg_id)
        predicted_patterns = prepare_dataset_for_cluster(ohlc_data_segment, win_results_df)
        # NOTE(review): this branch prints but does NOT `continue`; execution
        # falls through to cluster_windows. Likely benign since
        # prepare_dataset_for_cluster returns a row per input window — confirm.
        if predicted_patterns is None or len(predicted_patterns) == 0:
            print("Predicted patterns dataframe is empty")
        # print("Predicted Patterns :",predicted_patterns)
        # cluster_labled_windows_df , interseced_clusters_df = cluster_windows(predicted_patterns, probability_threshold, window_size)
        cluster_labled_windows_df , interseced_clusters_df = cluster_windows(predicted_patterns, probab_threshold_list, window_size)
        if cluster_labled_windows_df is None or interseced_clusters_df is None or len(cluster_labled_windows_df) == 0 or len(interseced_clusters_df) == 0:
            print("Clustered windows dataframe is empty")
            continue
        # Shift non-noise cluster ids by the running offset so ids from
        # different window sizes never collide; noise (-1) keeps its label.
        mask = cluster_labled_windows_df['Cluster'] != -1
        cluster_labled_windows_df.loc[mask, 'Cluster'] = cluster_labled_windows_df.loc[mask, 'Cluster'].astype(int) + win_iteration
        # mask2 = interseced_clusters_df['Cluster'] != -1
        # NOTE(review): unlike the windows frame, ALL cluster ids here are
        # shifted, including any -1 — confirm interseced_clusters_df can never
        # contain noise rows (cluster_windows excludes -1 before summarising).
        interseced_clusters_df['Cluster'] = interseced_clusters_df['Cluster'].astype(int) + win_iteration
        num_of_unique_clusters = interseced_clusters_df[interseced_clusters_df['Cluster']!=-1]['Cluster'].nunique()
        win_iteration += num_of_unique_clusters
        cluster_labled_windows_list.append(cluster_labled_windows_df)
        # located_patterns_and_other_info = functional_pattern_filter_and_point_recognition(interseced_clusters_df)
        # Calc_Start/Calc_End duplicate Start/End now that the functional
        # filter step above is disabled.
        interseced_clusters_df['Calc_Start'] = interseced_clusters_df['Start']
        interseced_clusters_df['Calc_End'] = interseced_clusters_df['End']
        located_patterns_and_other_info = interseced_clusters_df.copy()
        if located_patterns_and_other_info is None or len(located_patterns_and_other_info) == 0:
            print("]Located patterns and other info dataframe is empty")
            continue
        # Remove plotting call
        # plot_pattern_groups_and_finalized_sections(located_patterns_and_other_info, cluster_labled_windows_df, test_seg_id)
        located_patterns_and_other_info['Window_Size'] = window_size
        located_patterns_and_other_info_for_each_size.append(located_patterns_and_other_info)
    if located_patterns_and_other_info_for_each_size is None or len(located_patterns_and_other_info_for_each_size) == 0 or win_results_for_each_size is None or len(win_results_for_each_size) == 0:
        print("Located patterns and other info for each size is empty")
        return None
    located_patterns_and_other_info_for_each_size_df = pd.concat(located_patterns_and_other_info_for_each_size)
    # NOTE(review): win_results_for_each_size_df is built but never used after
    # the plotting code was removed — confirm before deleting.
    win_results_for_each_size_df = pd.concat(win_results_for_each_size, ignore_index=True)
    # window_results_list.append(win_results_for_each_size_df)
    # get the set of unique window sizes from located_patterns_and_other_info_for_each_size_df
    unique_window_sizes = located_patterns_and_other_info_for_each_size_df['Window_Size'].unique()
    unique_patterns = located_patterns_and_other_info_for_each_size_df['Chart Pattern'].unique()
    # sort the unique_window_sizes in descending order (largest windows first)
    unique_window_sizes = np.sort(unique_window_sizes)[::-1]
    filtered_loc_pat_and_info_rows_list = []
    # Cross-size deduplication: per pattern, per window size, decide for each
    # detection whether an intersecting detection should supersede it.
    for chart_pattern in unique_patterns:
        located_patterns_and_other_info_for_each_size_df_chart_pattern = located_patterns_and_other_info_for_each_size_df[located_patterns_and_other_info_for_each_size_df['Chart Pattern'] == chart_pattern]
        for win_size in unique_window_sizes:
            located_patterns_and_other_info_for_each_size_df_win_size_chart_pattern = located_patterns_and_other_info_for_each_size_df_chart_pattern[located_patterns_and_other_info_for_each_size_df_chart_pattern['Window_Size'] == win_size]
            for idx , row in located_patterns_and_other_info_for_each_size_df_win_size_chart_pattern.iterrows():
                start_date = row['Calc_Start']
                end_date = row['Calc_End']
                is_already_included = False
                # candidate rows of the same pattern (any window size) whose
                # date span intersects [start_date, end_date]; includes `row`
                # itself (self-IoU = 1 falls through to "keep" below)
                intersecting_rows = located_patterns_and_other_info_for_each_size_df_chart_pattern[
                    (located_patterns_and_other_info_for_each_size_df_chart_pattern['Calc_Start'] <= end_date) &
                    (located_patterns_and_other_info_for_each_size_df_chart_pattern['Calc_End'] >= start_date)
                ]
                # NOTE(review): redundant re-initialisation (already False above).
                is_already_included = False
                for idx2, row2 in intersecting_rows.iterrows():
                    iou = intersection_over_union(start_date, end_date, row2['Calc_Start'], row2['Calc_End'])
                    if iou > 0.6:
                        # Case 1: Larger window already exists
                        if row2['Window_Size'] > row['Window_Size']:
                            # Case 1A: But the smaller one has significantly higher probability, keep it instead
                            if (row['Avg_Probability'] - row2['Avg_Probability']) > 0.1:
                                is_already_included = False
                            else:
                                is_already_included = True
                                break  # Keep large, skip current(small)
                        # Case 2: Equal or smaller window exists, possibly overlapping
                        elif row['Window_Size'] >= row2['Window_Size']:
                            # If the other row has significantly better probability, drop the current one
                            if (row2['Avg_Probability'] - row['Avg_Probability']) > 0.1:
                                is_already_included = True
                                break  # remove current (large) , keep small
                            else:
                                is_already_included = False
                                # break
                if not is_already_included:
                    filtered_loc_pat_and_info_rows_list.append(row)
    # convert the filtered_loc_pat_and_info_rows_list to a dataframe
    filtered_loc_pat_and_info_df = pd.DataFrame(filtered_loc_pat_and_info_rows_list)
    # located_patterns_and_other_info_list.append(filtered_loc_pat_and_info_df)
    # NOTE(review): prints but does not return; pd.concat([]) on the next line
    # would raise ValueError if the list were actually empty — confirm this
    # path is unreachable (earlier `return None` should cover it).
    if cluster_labled_windows_list is None or len(cluster_labled_windows_list) == 0:
        print("Clustered windows list is empty")
    cluster_labled_windows_df_conc = pd.concat(cluster_labled_windows_list)
    # Remove plotting code
    """
    if plot_count > 0:
        plot_pattern_groups_and_finalized_sections(filtered_loc_pat_and_info_df, cluster_labled_windows_df_conc,ohcl_data_given=ohlc_data_segment)
        plot_count -= 1
    """
    if patterns_to_return is None or len(patterns_to_return) == 0:
        return filtered_loc_pat_and_info_df
    else:
        # filter the filtered_loc_pat_and_info_df based on the patterns_to_return
        filtered_loc_pat_and_info_df = filtered_loc_pat_and_info_df[filtered_loc_pat_and_info_df['Chart Pattern'].isin(patterns_to_return)]
        return filtered_loc_pat_and_info_df