|
|
import logging
import math

import joblib
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.cluster import DBSCAN

from utils.eval import intersection_over_union
from utils.formatAndPreprocessNewPatterns import get_patetrn_name_by_encoding, get_pattern_encoding_by_name, get_reverse_pattern_encoding
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MODEL_PATH = 'Models/Width Aug OHLC_mini_rocket_xgb.joblib'

# Load the window classifier once, at import time. A missing file is reported
# but not fatal: locate_patterns() returns an empty frame when no model is
# available, and callers can always supply their own through its `model` arg.
rocket_model_global = None
try:
    rocket_model_global = joblib.load(MODEL_PATH)
except FileNotFoundError:
    print(f"Error: Model file not found at {MODEL_PATH}. Please ensure the path is correct.")

# Reverse pattern-encoding mapping used whenever a caller does not pass one.
pattern_encoding_reversed_global = get_reverse_pattern_encoding()
|
|
|
|
|
|
|
|
# Window lengths are computed as max(10, len(segment) // proportion), so these
# ten log-spaced divisors (1 .. 20, rounded to 2 decimals) scan from the whole
# segment down to roughly a twentieth of it.
WIN_SIZE_PROPORTIONS = np.logspace(0.0, np.log10(20.0), num=10).round(2).tolist()

# Fraction of each window placed before the row it is anchored on.
PADDING_PROPORTION = 0.6

# Step, in rows, between consecutive window anchors.
STRIDE = 1

# Minimum window probability, positionally indexed by pattern encoding.
PROBABILITY_THRESHOLD_LIST = [0.8884, 0.8676, 0.5620, 0.5596, 0.5132, 0.8367, 0.7635]

# A window whose 'No Pattern' probability is >= this value is labelled 'No Pattern'.
PROB_THRESHOLD_NO_PATTERN = 0.5

# DBSCAN parameters used to cluster min-max-normalised window centres.
DBSCAN_EPS = 0.04
DBSCAN_MIN_SAMPLES = 3
|
|
|
|
|
|
|
|
|
|
|
def _process_window(i, ohlc_data_segment, rocket_model, probability_threshold, pattern_encoding_reversed, seg_start, seg_end, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1):
    """Classify the single window anchored at row ``i`` of ``ohlc_data_segment``.

    The window nominally starts ``ceil(window_size * padding_proportion)`` rows
    before ``i`` and spans ``window_size`` rows. Both ends are then clipped to
    the segment, so windows near either edge are shorter than ``window_size``.

    Args:
        i: Positional row the window is anchored on.
        ohlc_data_segment: DataFrame with 'Date', 'Open', 'High', 'Low',
            'Close' and 'Volume' columns; rows are sliced positionally.
        rocket_model: Object exposing ``predict_proba``; it receives an array
            of shape (1, 5, window_length), one channel per OHLCV column.
        probability_threshold: Not used by this function.
        pattern_encoding_reversed: Not used by this function.
        seg_start: Copied verbatim into the result as 'Seg_Start'.
        seg_end: Copied verbatim into the result as 'Seg_End'.
        window_size: Nominal window length in rows.
        padding_proportion: Fraction of ``window_size`` placed before ``i``.
        prob_threshold_of_no_pattern_to_mark_as_no_pattern: When the
            'No Pattern' class probability is >= this value the window is
            labelled 'No Pattern' regardless of the arg-max class.

    Returns:
        A dict with 'Start', 'End', 'Chart Pattern', 'Seg_Start', 'Seg_End'
        and 'Probability', or ``None`` if the window is empty or the model
        cannot score it.
    """
    start_index = i - math.ceil(window_size * padding_proportion)
    end_index = start_index + window_size

    # Clip only after end_index is fixed: windows near the start are truncated,
    # not shifted to the right.
    start_index = max(start_index, 0)
    end_index = min(end_index, len(ohlc_data_segment))

    ohlc_segment = ohlc_data_segment[start_index:end_index]
    if len(ohlc_segment) == 0:
        return None

    win_start_date = ohlc_segment['Date'].iloc[0]
    win_end_date = ohlc_segment['Date'].iloc[-1]

    # (1, length, 5) -> (1, 5, length): a single sample with one channel per column.
    ohlc_array_for_rocket = ohlc_segment[['Open', 'High', 'Low', 'Close', 'Volume']].to_numpy().reshape(1, len(ohlc_segment), 5)
    ohlc_array_for_rocket = np.transpose(ohlc_array_for_rocket, (0, 2, 1))

    try:
        pattern_probabilities = rocket_model.predict_proba(ohlc_array_for_rocket)
    except Exception as exc:
        # Best effort: a window the model cannot score is skipped, but leave a
        # trace so systematic failures do not silently yield zero detections.
        logging.getLogger(__name__).debug(
            "predict_proba failed for window rows [%d, %d): %s",
            start_index, end_index, exc,
        )
        return None

    max_probability = np.max(pattern_probabilities)

    # NOTE(review): assumes predict_proba's class columns are ordered by pattern
    # encoding (column k == encoding k) - confirm against the model's classes_.
    no_pattern_encoding = get_pattern_encoding_by_name('No Pattern')
    if no_pattern_encoding is None:
        no_pattern_proba = 0
    else:
        no_pattern_proba = pattern_probabilities[0][no_pattern_encoding]

    # Flat arg-max equals the class index because there is exactly one sample row.
    pattern_index = np.argmax(pattern_probabilities)
    pred_proba = max_probability
    pred_pattern = get_patetrn_name_by_encoding(pattern_index)

    if no_pattern_proba >= prob_threshold_of_no_pattern_to_mark_as_no_pattern:
        pred_proba = no_pattern_proba
        pred_pattern = 'No Pattern'

    return {
        'Start': win_start_date, 'End': win_end_date, 'Chart Pattern': pred_pattern,
        'Seg_Start': seg_start, 'Seg_End': seg_end, 'Probability': pred_proba
    }
|
|
|
|
|
def _parallel_process_sliding_window(ohlc_data_segment, rocket_model, probability_threshold, stride, pattern_encoding_reversed, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1, parallel=True, num_cores=16, verbose_level=1):
    """Score a window anchored at every ``stride``-th row and collect the results.

    Each anchor is handed to ``_process_window`` with the same window geometry,
    model and thresholds. The segment's first and last 'Date' values are
    attached to every result as 'Seg_Start' / 'Seg_End'.

    Args:
        parallel: If true, windows are scored through joblib's ``Parallel``
            with ``num_cores`` jobs and ``verbose_level`` verbosity.
            Otherwise they are scored in a plain loop that prints a progress
            line per window when ``verbose_level > 0``.

    Returns:
        DataFrame with one row per window that produced a result (windows for
        which ``_process_window`` returned ``None`` are dropped).
    """
    shared_kwargs = dict(
        ohlc_data_segment=ohlc_data_segment,
        rocket_model=rocket_model,
        probability_threshold=probability_threshold,
        pattern_encoding_reversed=pattern_encoding_reversed,
        window_size=window_size,
        seg_start=ohlc_data_segment['Date'].iloc[0],
        seg_end=ohlc_data_segment['Date'].iloc[-1],
        padding_proportion=padding_proportion,
        prob_threshold_of_no_pattern_to_mark_as_no_pattern=prob_threshold_of_no_pattern_to_mark_as_no_pattern,
    )
    anchors = range(0, len(ohlc_data_segment), stride)

    if parallel:
        with Parallel(n_jobs=num_cores, verbose=verbose_level) as executor:
            outputs = executor(
                delayed(_process_window)(i=anchor, **shared_kwargs)
                for anchor in anchors
            )
    else:
        outputs = []
        n_anchors = len(anchors)
        for position, anchor in enumerate(anchors, start=1):
            outcome = _process_window(i=anchor, **shared_kwargs)
            if outcome is not None:
                outputs.append(outcome)
            if verbose_level > 0:
                print(f"Processing window {position} of {n_anchors}...")

    return pd.DataFrame([item for item in outputs if item is not None])
|
|
|
|
|
def _prepare_dataset_for_cluster(ohlc_data_segment, win_results_df): |
|
|
"""Adds position-based features to window results for clustering.""" |
|
|
predicted_patterns = win_results_df.copy() |
|
|
|
|
|
for index, row in predicted_patterns.iterrows(): |
|
|
pattern_start_date = row['Start'] |
|
|
pattern_end_date = row['End'] |
|
|
|
|
|
start_point_index = len(ohlc_data_segment[ohlc_data_segment['Date'] < pattern_start_date]) |
|
|
pattern_len = len(ohlc_data_segment[(ohlc_data_segment['Date'] >= pattern_start_date) & (ohlc_data_segment['Date'] <= pattern_end_date)]) |
|
|
|
|
|
pattern_mid_index = start_point_index + (pattern_len / 2.0) |
|
|
|
|
|
predicted_patterns.at[index, 'Center'] = pattern_mid_index |
|
|
predicted_patterns.at[index, 'Pattern_Start_pos'] = start_point_index |
|
|
predicted_patterns.at[index, 'Pattern_End_pos'] = start_point_index + pattern_len |
|
|
return predicted_patterns |
|
|
|
|
|
def _cluster_windows(predicted_patterns, probability_threshold, eps=0.05, min_samples_dbscan=2): |
|
|
"""Clusters detected pattern windows using DBSCAN. |
|
|
min_samples_dbscan is the min_samples for DBSCAN algorithm itself. |
|
|
The overlap check for intersected_clusters will also use this value. |
|
|
""" |
|
|
df = predicted_patterns.copy() |
|
|
|
|
|
if isinstance(probability_threshold, list): |
|
|
temp_dfs = [] |
|
|
|
|
|
|
|
|
|
|
|
for i, p_thresh in enumerate(probability_threshold): |
|
|
pattern_name = get_patetrn_name_by_encoding(i) |
|
|
if pattern_name: |
|
|
temp_dfs.append(df[(df['Chart Pattern'] == pattern_name) & (df['Probability'] >= p_thresh)]) |
|
|
if temp_dfs: |
|
|
df = pd.concat(temp_dfs) if temp_dfs else pd.DataFrame(columns=df.columns) |
|
|
else: |
|
|
df = pd.DataFrame(columns=df.columns) |
|
|
else: |
|
|
df = df[df['Probability'] >= probability_threshold] |
|
|
|
|
|
if df.empty: |
|
|
return pd.DataFrame(), pd.DataFrame() |
|
|
|
|
|
cluster_labled_windows_list = [] |
|
|
interseced_clusters_list = [] |
|
|
|
|
|
|
|
|
min_center_val = df['Center'].min() |
|
|
max_center_val = df['Center'].max() |
|
|
|
|
|
for pattern, group in df.groupby('Chart Pattern'): |
|
|
if group.empty: |
|
|
continue |
|
|
|
|
|
centers = group['Center'].values.reshape(-1, 1) |
|
|
|
|
|
if min_center_val < max_center_val: |
|
|
norm_centers = (centers - min_center_val) / (max_center_val - min_center_val) |
|
|
elif len(centers) > 0 : |
|
|
norm_centers = np.zeros_like(centers) |
|
|
else: |
|
|
norm_centers = np.array([]) |
|
|
|
|
|
if len(norm_centers) == 0: |
|
|
group['Cluster'] = -1 |
|
|
cluster_labled_windows_list.append(group) |
|
|
continue |
|
|
|
|
|
current_min_samples_for_dbscan = min(min_samples_dbscan, len(norm_centers)) |
|
|
if current_min_samples_for_dbscan < 1 and len(norm_centers) > 0 : |
|
|
current_min_samples_for_dbscan = 1 |
|
|
elif len(norm_centers) == 0: |
|
|
group['Cluster'] = -1 |
|
|
cluster_labled_windows_list.append(group) |
|
|
continue |
|
|
|
|
|
db = DBSCAN(eps=eps, min_samples=current_min_samples_for_dbscan).fit(norm_centers) |
|
|
group['Cluster'] = db.labels_ |
|
|
cluster_labled_windows_list.append(group) |
|
|
|
|
|
for cluster_id, cluster_group in group[group['Cluster'] != -1].groupby('Cluster'): |
|
|
expanded_dates = [] |
|
|
for _, row_cg in cluster_group.iterrows(): |
|
|
|
|
|
try: |
|
|
dates = pd.date_range(start=pd.to_datetime(row_cg["Start"]), end=pd.to_datetime(row_cg["End"])) |
|
|
expanded_dates.extend(dates) |
|
|
except Exception as e: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
if not expanded_dates: |
|
|
continue |
|
|
|
|
|
date_counts = pd.Series(expanded_dates).value_counts().sort_index() |
|
|
|
|
|
|
|
|
overlapping_dates = date_counts[date_counts >= min_samples_dbscan] |
|
|
if overlapping_dates.empty: |
|
|
continue |
|
|
|
|
|
cluster_start = overlapping_dates.index.min() |
|
|
cluster_end = overlapping_dates.index.max() |
|
|
|
|
|
interseced_clusters_list.append({ |
|
|
'Chart Pattern': pattern, |
|
|
'Cluster': cluster_id, |
|
|
'Start': cluster_start, |
|
|
'End': cluster_end, |
|
|
'Seg_Start': cluster_group['Seg_Start'].iloc[0], |
|
|
'Seg_End': cluster_group['Seg_End'].iloc[0], |
|
|
'Avg_Probability': cluster_group['Probability'].mean(), |
|
|
}) |
|
|
|
|
|
final_cluster_labled_df = pd.concat(cluster_labled_windows_list) if cluster_labled_windows_list else pd.DataFrame(columns=df.columns if not df.empty else []) |
|
|
if 'Cluster' not in final_cluster_labled_df.columns and not final_cluster_labled_df.empty: |
|
|
final_cluster_labled_df['Cluster'] = -1 |
|
|
|
|
|
final_interseced_df = pd.DataFrame(interseced_clusters_list) |
|
|
|
|
|
return final_cluster_labled_df, final_interseced_df |
|
|
|
|
|
|
|
|
|
|
|
def _keep_dominant_patterns(candidates):
    """Greedily thin overlapping detections of the same chart pattern.

    Pattern names are processed in order of first appearance and, within a
    name, from the largest window size down. Each still-unclaimed candidate is
    kept. Every other unclaimed candidate of the same pattern whose
    [Calc_Start, Calc_End] span intersects it with IoU > 0.6 is then discarded,
    unless that rival comes from a smaller window and its Avg_Probability
    exceeds the kept candidate's by more than 0.1.

    Returns:
        List of kept rows (Series, without the internal 'taken' flag).
    """
    pool = candidates.copy()
    pool['taken'] = False
    kept = []

    sizes_desc = np.sort(candidates['Window_Size'].unique())[::-1]
    for pattern in candidates['Chart Pattern'].unique():
        for size in sizes_desc:
            batch = pool[
                (pool['Chart Pattern'] == pattern)
                & (pool['Window_Size'] == size)
                & (~pool['taken'])
            ].index

            for idx in batch:
                # An earlier winner in this batch may already have claimed it.
                if pool.loc[idx, 'taken']:
                    continue

                champion = pool.loc[idx]
                kept.append(champion.drop('taken'))
                pool.loc[idx, 'taken'] = True

                rivals = pool[
                    (pool.index != idx)
                    & (pool['Chart Pattern'] == pattern)
                    & (~pool['taken'])
                    & (pool['Calc_Start'] <= champion['Calc_End'])
                    & (pool['Calc_End'] >= champion['Calc_Start'])
                ].index

                for rival_idx in rivals:
                    rival = pool.loc[rival_idx]
                    iou = intersection_over_union(champion['Calc_Start'], champion['Calc_End'],
                                                  rival['Calc_Start'], rival['Calc_End'])
                    # Positive form on purpose: a NaN IoU never suppresses.
                    if iou > 0.6:
                        rival_wins = (rival['Window_Size'] < champion['Window_Size']) and \
                            ((rival['Avg_Probability'] - champion['Avg_Probability']) > 0.1)
                        if not rival_wins:
                            pool.loc[rival_idx, 'taken'] = True
    return kept


def locate_patterns(ohlc_data: pd.DataFrame,
                    patterns_to_return: list = None,
                    model=None,
                    pattern_encoding_reversed=None,
                    win_size_proportions: list = None,
                    padding_proportion: float = PADDING_PROPORTION,
                    stride: int = STRIDE,
                    probability_threshold = None,
                    prob_threshold_of_no_pattern_to_mark_as_no_pattern: float = PROB_THRESHOLD_NO_PATTERN,
                    dbscan_eps: float = DBSCAN_EPS,
                    dbscan_min_samples: int = DBSCAN_MIN_SAMPLES,
                    enable_plotting: bool = False,
                    parallel_processing: bool = True,
                    num_cores_parallel: int = 16,
                    parallel_verbose_level: int = 1
                    ):
    """Locate chart patterns in OHLC data with multi-scale sliding windows and clustering.

    For each window-size proportion ``p``, windows of ``max(10, len(ohlc_data) // p)``
    rows are used; duplicate sizes are skipped. Every window is classified.
    Confident windows of each pattern are clustered by position, and each
    cluster yields a located pattern spanning the days covered by enough of
    its windows. Overlapping detections of the same pattern across window
    sizes are then thinned greedily, larger windows first.

    Args:
        ohlc_data: Frame with a 'Date' column (parsed with ``pd.to_datetime``)
            plus the OHLCV columns the classifier consumes.
        patterns_to_return: If non-empty, only these pattern names are returned.
        model: Classifier to use; defaults to the module-level loaded model.
        pattern_encoding_reversed: Forwarded to the window scorer; defaults to
            the module-level mapping.
        win_size_proportions: Divisors of the series length that give window
            sizes; defaults to ``WIN_SIZE_PROPORTIONS``.
        padding_proportion: Fraction of each window placed before its anchor row.
        stride: Step, in rows, between window anchors.
        probability_threshold: Scalar or per-encoding list of minimum window
            probabilities; defaults to ``PROBABILITY_THRESHOLD_LIST``.
        prob_threshold_of_no_pattern_to_mark_as_no_pattern: 'No Pattern'
            probability at which a window is relabelled 'No Pattern'.
        dbscan_eps: DBSCAN ``eps`` used when clustering window centres.
        dbscan_min_samples: DBSCAN ``min_samples`` used when clustering window
            centres.
        enable_plotting: Accepted but currently has no effect.
        parallel_processing: Whether windows are scored via joblib.
        num_cores_parallel: Number of joblib jobs.
        parallel_verbose_level: Verbosity / progress output level.

    Returns:
        DataFrame with one row per located pattern ('Chart Pattern', 'Cluster',
        'Start', 'End', 'Seg_Start', 'Seg_End', 'Avg_Probability',
        'Calc_Start', 'Calc_End', 'Window_Size'), sorted by 'Calc_Start'.
        An empty DataFrame is returned when no model is available, the input
        is empty, or nothing is found.
    """
    active_model = rocket_model_global if model is None else model
    if active_model is None:
        print("Error: Pattern detection model is not loaded. Cannot proceed.")
        return pd.DataFrame()

    encoding_rev = pattern_encoding_reversed_global if pattern_encoding_reversed is None else pattern_encoding_reversed
    proportions = WIN_SIZE_PROPORTIONS if win_size_proportions is None else win_size_proportions
    thresholds = PROBABILITY_THRESHOLD_LIST if probability_threshold is None else probability_threshold

    segment = ohlc_data.copy()
    segment['Date'] = pd.to_datetime(segment['Date'])
    if segment.empty:
        return pd.DataFrame()
    seg_len = len(segment)

    per_size_results = []
    labeled_batches = []  # kept for the (not yet implemented) plotting hook
    seen_sizes = set()
    next_cluster_id = 0

    for win_prop in proportions:
        nominal = seg_len // win_prop if win_prop > 0 else seg_len
        window_size = int(max(10, nominal))
        if window_size in seen_sizes:
            continue
        seen_sizes.add(window_size)

        windows = _parallel_process_sliding_window(
            segment, active_model, thresholds, stride,
            encoding_rev, window_size, padding_proportion,
            prob_threshold_of_no_pattern_to_mark_as_no_pattern,
            parallel=parallel_processing, num_cores=num_cores_parallel,
            verbose_level=parallel_verbose_level,
        )
        if windows.empty:
            continue
        windows['Window_Size'] = window_size

        prepared = _prepare_dataset_for_cluster(segment, windows)
        if prepared.empty:
            continue

        labeled, clusters = _cluster_windows(
            prepared, thresholds,
            eps=dbscan_eps, min_samples_dbscan=dbscan_min_samples,
        )
        if labeled.empty or clusters.empty:
            continue

        # Shift this batch's cluster ids past those already handed out.
        non_noise = labeled['Cluster'] != -1
        if non_noise.any():
            labeled.loc[non_noise, 'Cluster'] = labeled.loc[non_noise, 'Cluster'].astype(int) + next_cluster_id
        clusters['Cluster'] = clusters['Cluster'].astype(int) + next_cluster_id

        real_ids = clusters.loc[clusters['Cluster'] != -1, 'Cluster']
        batch_max_id = real_ids.max() if not real_ids.empty else -1

        labeled_batches.append(labeled)
        per_size_results.append(clusters.assign(
            Calc_Start=clusters['Start'],
            Calc_End=clusters['End'],
            Window_Size=window_size,
        ))

        if batch_max_id > -1:
            next_cluster_id = batch_max_id + 1
        elif non_noise.any():
            next_cluster_id = labeled.loc[non_noise, 'Cluster'].max() + 1

    if not per_size_results:
        return pd.DataFrame()

    candidates = pd.concat(per_size_results, ignore_index=True)
    if candidates.empty:
        return pd.DataFrame()

    result = pd.DataFrame(_keep_dominant_patterns(candidates))
    if not result.empty:
        ranked = result.sort_values(
            by=['Chart Pattern', 'Calc_Start', 'Window_Size', 'Avg_Probability'],
            ascending=[True, True, False, False],
        )
        deduped = ranked.drop_duplicates(
            subset=['Chart Pattern', 'Calc_Start', 'Calc_End'],
            keep='first',
        )
        result = deduped.sort_values(by='Calc_Start').reset_index(drop=True)

    if enable_plotting and not result.empty and labeled_batches:
        pass  # plotting is not implemented

    if patterns_to_return and not result.empty:
        return result[result['Chart Pattern'].isin(patterns_to_return)]
    return result
|
|
|
|
|
|