chart-pattern-locator / utils /patternLocatingGemni.py
yasirapunsith's picture
add files
c9b7b21
import joblib
from utils.eval import intersection_over_union
from utils.formatAndPreprocessNewPatterns import get_patetrn_name_by_encoding, get_pattern_encoding_by_name, get_reverse_pattern_encoding
import pandas as pd
import numpy as np
import math
from sklearn.cluster import DBSCAN
from joblib import Parallel, delayed
# Remove matplotlib imports and plotting function import
# import matplotlib.pyplot as plt
# from utils.functionalPatternLocateAndPlot import plot_pattern_groups_and_finalized_sections
# --- Global Configuration & Model Loading ---
# Load the pre-trained model and pattern encodings
# It's assumed 'Models/Width Aug OHLC_mini_rocket_xgb.joblib' is in the correct path
MODEL_PATH = 'Models/Width Aug OHLC_mini_rocket_xgb.joblib'
try:
rocket_model_global = joblib.load(MODEL_PATH)
except FileNotFoundError:
print(f"Error: Model file not found at {MODEL_PATH}. Please ensure the path is correct.")
# You might want to exit or raise an exception here if the model is critical
rocket_model_global = None
pattern_encoding_reversed_global = get_reverse_pattern_encoding()
# Default parameters for the pattern location logic
WIN_SIZE_PROPORTIONS = np.round(np.logspace(0, np.log10(20), num=10), 2).tolist()
PADDING_PROPORTION = 0.6
STRIDE = 1
# Default probability thresholds for pattern identification.
PROBABILITY_THRESHOLD_LIST = [0.8884, 0.8676, 0.5620, 0.5596, 0.5132, 0.8367, 0.7635]
PROB_THRESHOLD_NO_PATTERN = 0.5 # Threshold to mark as 'No Pattern'
# DBSCAN Clustering parameters
DBSCAN_EPS = 0.04
DBSCAN_MIN_SAMPLES = 3
# --- Private Helper Functions ---
def _process_window(i, ohlc_data_segment, rocket_model, probability_threshold, pattern_encoding_reversed, seg_start, seg_end, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1):
"""Processes a single window of OHLC data to predict patterns."""
start_index = i - math.ceil(window_size * padding_proportion)
end_index = start_index + window_size
start_index = max(start_index, 0)
end_index = min(end_index, len(ohlc_data_segment))
ohlc_segment = ohlc_data_segment[start_index:end_index]
if len(ohlc_segment) == 0:
return None
win_start_date = ohlc_segment['Date'].iloc[0]
win_end_date = ohlc_segment['Date'].iloc[-1]
# Prepare data for Rocket model (reshape and transpose)
ohlc_array_for_rocket = ohlc_segment[['Open', 'High', 'Low', 'Close', 'Volume']].to_numpy().reshape(1, len(ohlc_segment), 5)
ohlc_array_for_rocket = np.transpose(ohlc_array_for_rocket, (0, 2, 1))
try:
pattern_probabilities = rocket_model.predict_proba(ohlc_array_for_rocket)
except Exception as e:
# print(f"Error in prediction for window {i}: {e}") # Optional: for debugging
return None
max_probability = np.max(pattern_probabilities)
# Assuming get_pattern_encoding_by_name returns a valid index or handles errors
no_pattern_encoding = get_pattern_encoding_by_name('No Pattern')
if no_pattern_encoding is None: # Handle case where 'No Pattern' is not in encoding
# print("Warning: 'No Pattern' encoding not found.") # Optional warning
no_pattern_proba = 0
else:
no_pattern_proba = pattern_probabilities[0][no_pattern_encoding]
pattern_index = np.argmax(pattern_probabilities)
pred_proba = max_probability
pred_pattern = get_patetrn_name_by_encoding(pattern_index)
if no_pattern_proba >= prob_threshold_of_no_pattern_to_mark_as_no_pattern: # Use >= for consistency
pred_proba = no_pattern_proba
pred_pattern = 'No Pattern'
return {
'Start': win_start_date, 'End': win_end_date, 'Chart Pattern': pred_pattern,
'Seg_Start': seg_start, 'Seg_End': seg_end, 'Probability': pred_proba
}
def _parallel_process_sliding_window(ohlc_data_segment, rocket_model, probability_threshold, stride, pattern_encoding_reversed, window_size, padding_proportion, prob_threshold_of_no_pattern_to_mark_as_no_pattern=1, parallel=True, num_cores=16, verbose_level=1):
"""Applies sliding window pattern detection in parallel or sequentially."""
seg_start = ohlc_data_segment['Date'].iloc[0]
seg_end = ohlc_data_segment['Date'].iloc[-1]
common_args = {
'ohlc_data_segment': ohlc_data_segment,
'rocket_model': rocket_model,
'probability_threshold': probability_threshold,
'pattern_encoding_reversed': pattern_encoding_reversed,
'window_size': window_size,
'seg_start': seg_start,
'seg_end': seg_end,
'padding_proportion': padding_proportion,
'prob_threshold_of_no_pattern_to_mark_as_no_pattern': prob_threshold_of_no_pattern_to_mark_as_no_pattern
}
if parallel:
with Parallel(n_jobs=num_cores, verbose=verbose_level) as parallel_executor: # User requested verbose
results = parallel_executor(
delayed(_process_window)(i=i, **common_args)
for i in range(0, len(ohlc_data_segment), stride)
)
else:
results = []
total_iterations = len(range(0, len(ohlc_data_segment), stride)) # Optional: for progress
for i_idx, i in enumerate(range(0, len(ohlc_data_segment), stride)):
res = _process_window(i=i, **common_args)
if res is not None:
results.append(res)
if verbose_level > 0: # Basic progress for sequential
print(f"Processing window {i_idx + 1} of {total_iterations}...")
return pd.DataFrame([res for res in results if res is not None])
def _prepare_dataset_for_cluster(ohlc_data_segment, win_results_df):
"""Adds position-based features to window results for clustering."""
predicted_patterns = win_results_df.copy()
for index, row in predicted_patterns.iterrows():
pattern_start_date = row['Start']
pattern_end_date = row['End']
start_point_index = len(ohlc_data_segment[ohlc_data_segment['Date'] < pattern_start_date])
pattern_len = len(ohlc_data_segment[(ohlc_data_segment['Date'] >= pattern_start_date) & (ohlc_data_segment['Date'] <= pattern_end_date)])
pattern_mid_index = start_point_index + (pattern_len / 2.0) # Use float division
predicted_patterns.at[index, 'Center'] = pattern_mid_index
predicted_patterns.at[index, 'Pattern_Start_pos'] = start_point_index
predicted_patterns.at[index, 'Pattern_End_pos'] = start_point_index + pattern_len
return predicted_patterns
def _cluster_windows(predicted_patterns, probability_threshold, eps=0.05, min_samples_dbscan=2):
"""Clusters detected pattern windows using DBSCAN.
min_samples_dbscan is the min_samples for DBSCAN algorithm itself.
The overlap check for intersected_clusters will also use this value.
"""
df = predicted_patterns.copy()
if isinstance(probability_threshold, list):
temp_dfs = []
# Ensure probability_threshold list length matches number of encodable patterns if used directly with get_patetrn_name_by_encoding(i)
# Or, better, iterate through unique patterns present in df if threshold list is a dict or structured differently.
# Assuming probability_threshold list is indexed corresponding to pattern encodings from 0 to N-1
for i, p_thresh in enumerate(probability_threshold):
pattern_name = get_patetrn_name_by_encoding(i)
if pattern_name:
temp_dfs.append(df[(df['Chart Pattern'] == pattern_name) & (df['Probability'] >= p_thresh)])
if temp_dfs:
df = pd.concat(temp_dfs) if temp_dfs else pd.DataFrame(columns=df.columns)
else:
df = pd.DataFrame(columns=df.columns)
else: # single float threshold
df = df[df['Probability'] >= probability_threshold] # Changed > to >=
if df.empty:
return pd.DataFrame(), pd.DataFrame()
cluster_labled_windows_list = []
interseced_clusters_list = []
# Normalize 'Center' for DBSCAN if there's variance
min_center_val = df['Center'].min()
max_center_val = df['Center'].max()
for pattern, group in df.groupby('Chart Pattern'):
if group.empty:
continue
centers = group['Center'].values.reshape(-1, 1)
if min_center_val < max_center_val: # Avoid division by zero if all centers are same
norm_centers = (centers - min_center_val) / (max_center_val - min_center_val)
elif len(centers) > 0 : # All centers are the same, no real distance variance
norm_centers = np.zeros_like(centers) # Treat as single point for clustering
else: # Empty group after filtering, should not happen if group.empty() check passed
norm_centers = np.array([])
if len(norm_centers) == 0:
group['Cluster'] = -1
cluster_labled_windows_list.append(group)
continue
current_min_samples_for_dbscan = min(min_samples_dbscan, len(norm_centers))
if current_min_samples_for_dbscan < 1 and len(norm_centers) > 0 :
current_min_samples_for_dbscan = 1
elif len(norm_centers) == 0:
group['Cluster'] = -1
cluster_labled_windows_list.append(group)
continue
db = DBSCAN(eps=eps, min_samples=current_min_samples_for_dbscan).fit(norm_centers)
group['Cluster'] = db.labels_
cluster_labled_windows_list.append(group)
for cluster_id, cluster_group in group[group['Cluster'] != -1].groupby('Cluster'):
expanded_dates = []
for _, row_cg in cluster_group.iterrows(): # Renamed 'row' to 'row_cg' to avoid conflict
# Ensure Start and End are valid datetime objects
try:
dates = pd.date_range(start=pd.to_datetime(row_cg["Start"]), end=pd.to_datetime(row_cg["End"]))
expanded_dates.extend(dates)
except Exception as e:
# print(f"Warning: Could not create date range for row: {row_cg}. Error: {e}") # Optional
continue
if not expanded_dates:
continue
date_counts = pd.Series(expanded_dates).value_counts().sort_index()
# Use min_samples_dbscan for defining a significant overlap
overlapping_dates = date_counts[date_counts >= min_samples_dbscan]
if overlapping_dates.empty:
continue
cluster_start = overlapping_dates.index.min()
cluster_end = overlapping_dates.index.max()
interseced_clusters_list.append({
'Chart Pattern': pattern,
'Cluster': cluster_id, # This ID is local to the (pattern, window_size) batch
'Start': cluster_start,
'End': cluster_end,
'Seg_Start': cluster_group['Seg_Start'].iloc[0],
'Seg_End': cluster_group['Seg_End'].iloc[0],
'Avg_Probability': cluster_group['Probability'].mean(),
})
final_cluster_labled_df = pd.concat(cluster_labled_windows_list) if cluster_labled_windows_list else pd.DataFrame(columns=df.columns if not df.empty else [])
if 'Cluster' not in final_cluster_labled_df.columns and not final_cluster_labled_df.empty:
final_cluster_labled_df['Cluster'] = -1 # Default if no clusters formed but df had data
final_interseced_df = pd.DataFrame(interseced_clusters_list)
return final_cluster_labled_df, final_interseced_df
# --- Public API Function ---
def locate_patterns(ohlc_data: pd.DataFrame,
patterns_to_return: list = None,
model=None,
pattern_encoding_reversed=None,
win_size_proportions: list = None,
padding_proportion: float = PADDING_PROPORTION,
stride: int = STRIDE,
probability_threshold = None,
prob_threshold_of_no_pattern_to_mark_as_no_pattern: float = PROB_THRESHOLD_NO_PATTERN,
dbscan_eps: float = DBSCAN_EPS,
dbscan_min_samples: int = DBSCAN_MIN_SAMPLES,
enable_plotting: bool = False, # Keep parameter but ignore it
parallel_processing: bool = True,
num_cores_parallel: int = 16,
parallel_verbose_level: int = 1
):
"""
Locates financial chart patterns in OHLC data using a sliding window approach and clustering.
"""
active_model = model if model is not None else rocket_model_global
active_pattern_encoding_rev = pattern_encoding_reversed if pattern_encoding_reversed is not None else pattern_encoding_reversed_global
active_win_size_proportions = win_size_proportions if win_size_proportions is not None else WIN_SIZE_PROPORTIONS
active_probability_threshold = probability_threshold if probability_threshold is not None else PROBABILITY_THRESHOLD_LIST
if active_model is None:
print("Error: Pattern detection model is not loaded. Cannot proceed.")
return pd.DataFrame()
ohlc_data_segment = ohlc_data.copy()
ohlc_data_segment['Date'] = pd.to_datetime(ohlc_data_segment['Date'])
seg_len = len(ohlc_data_segment)
if ohlc_data_segment.empty:
return pd.DataFrame()
win_results_for_each_size = []
located_patterns_and_other_info_for_each_size = []
cluster_labled_windows_list = [] # Stores all clustered windows from all iterations
used_win_sizes = []
global_cluster_id_offset = 0 # To ensure cluster IDs are unique across all window sizes and patterns
for win_prop in active_win_size_proportions:
window_size = seg_len // win_prop if win_prop > 0 else seg_len # Avoid division by zero
window_size = int(max(10, window_size))
if window_size in used_win_sizes:
continue
used_win_sizes.append(window_size)
win_results_df = _parallel_process_sliding_window(
ohlc_data_segment, active_model, active_probability_threshold, stride,
active_pattern_encoding_rev, window_size, padding_proportion,
prob_threshold_of_no_pattern_to_mark_as_no_pattern,
parallel=parallel_processing, num_cores=num_cores_parallel,
verbose_level=parallel_verbose_level # Pass verbosity
)
if win_results_df.empty:
continue
win_results_df['Window_Size'] = window_size
# win_results_for_each_size.append(win_results_df) # Not directly used later, can be omitted if not needed for debugging
predicted_patterns_for_cluster = _prepare_dataset_for_cluster(ohlc_data_segment, win_results_df)
if predicted_patterns_for_cluster.empty:
continue
# Pass dbscan_min_samples to _cluster_windows
temp_cluster_labled_windows_df, temp_interseced_clusters_df = _cluster_windows(
predicted_patterns_for_cluster, active_probability_threshold,
eps=dbscan_eps, min_samples_dbscan=dbscan_min_samples # Pass the parameter
)
if temp_cluster_labled_windows_df.empty or temp_interseced_clusters_df.empty:
continue
# Adjust cluster IDs to be globally unique before appending
# For temp_cluster_labled_windows_df
non_noise_clusters_mask_labeled = temp_cluster_labled_windows_df['Cluster'] != -1
if non_noise_clusters_mask_labeled.any():
temp_cluster_labled_windows_df.loc[non_noise_clusters_mask_labeled, 'Cluster'] = \
temp_cluster_labled_windows_df.loc[non_noise_clusters_mask_labeled, 'Cluster'].astype(int) + global_cluster_id_offset
# For temp_interseced_clusters_df
# Note: 'Cluster' in temp_interseced_clusters_df is already filtered for non-noise by its creation logic
if not temp_interseced_clusters_df.empty:
temp_interseced_clusters_df['Cluster'] = temp_interseced_clusters_df['Cluster'].astype(int) + global_cluster_id_offset
current_max_cluster_id_in_batch = -1
if not temp_interseced_clusters_df.empty and 'Cluster' in temp_interseced_clusters_df.columns:
valid_clusters = temp_interseced_clusters_df[temp_interseced_clusters_df['Cluster'] != -1]['Cluster']
if not valid_clusters.empty:
current_max_cluster_id_in_batch = valid_clusters.max()
cluster_labled_windows_list.append(temp_cluster_labled_windows_df)
temp_interseced_clusters_df['Calc_Start'] = temp_interseced_clusters_df['Start']
temp_interseced_clusters_df['Calc_End'] = temp_interseced_clusters_df['End']
located_patterns_info = temp_interseced_clusters_df.copy()
located_patterns_info['Window_Size'] = window_size
located_patterns_and_other_info_for_each_size.append(located_patterns_info)
if current_max_cluster_id_in_batch > -1 :
global_cluster_id_offset = current_max_cluster_id_in_batch + 1
elif non_noise_clusters_mask_labeled.any(): # If intersected was empty but labeled had clusters
max_labeled_cluster = temp_cluster_labled_windows_df.loc[non_noise_clusters_mask_labeled, 'Cluster'].max()
global_cluster_id_offset = max_labeled_cluster + 1
if not located_patterns_and_other_info_for_each_size:
return pd.DataFrame()
all_located_patterns_df = pd.concat(located_patterns_and_other_info_for_each_size, ignore_index=True)
if all_located_patterns_df.empty:
return pd.DataFrame()
# Filter overlapping patterns (logic remains similar to previous version)
unique_chart_patterns = all_located_patterns_df['Chart Pattern'].unique()
# Sort window sizes descending to prioritize larger windows
sorted_unique_window_sizes = np.sort(all_located_patterns_df['Window_Size'].unique())[::-1]
final_filtered_patterns_list = []
# Use a copy and mark 'taken' to handle overlaps systematically
candidate_patterns_df = all_located_patterns_df.copy()
# Ensure 'taken' column exists, default to False
if 'taken' not in candidate_patterns_df.columns:
candidate_patterns_df['taken'] = False
else: # if it somehow exists from a previous run (unlikely with .copy()), reset it
candidate_patterns_df['taken'] = False
for cp_val in unique_chart_patterns:
for ws_val in sorted_unique_window_sizes:
# Select current batch of patterns to consider
current_batch_indices = candidate_patterns_df[
(candidate_patterns_df['Chart Pattern'] == cp_val) &
(candidate_patterns_df['Window_Size'] == ws_val) &
(~candidate_patterns_df['taken'])
].index
for current_idx in current_batch_indices:
if candidate_patterns_df.loc[current_idx, 'taken']: # Already claimed by a higher priority pattern
continue
current_row_data = candidate_patterns_df.loc[current_idx]
final_filtered_patterns_list.append(current_row_data.drop('taken')) # Add to final list
candidate_patterns_df.loc[current_idx, 'taken'] = True # Mark as taken
# Now, check for overlaps with other non-taken patterns and invalidate lower-priority ones
# Lower priority: smaller window, or same window but processed later (which this loop structure handles),
# or significantly lower probability.
overlapping_candidates_indices = candidate_patterns_df[
(candidate_patterns_df.index != current_idx) & # Don't compare with itself
(candidate_patterns_df['Chart Pattern'] == cp_val) &
(~candidate_patterns_df['taken']) &
(candidate_patterns_df['Calc_Start'] <= current_row_data['Calc_End']) &
(candidate_patterns_df['Calc_End'] >= current_row_data['Calc_Start'])
].index
for ov_idx in overlapping_candidates_indices:
ov_row_data = candidate_patterns_df.loc[ov_idx]
iou = intersection_over_union(current_row_data['Calc_Start'], current_row_data['Calc_End'],
ov_row_data['Calc_Start'], ov_row_data['Calc_End'])
if iou > 0.6: # Significant overlap
# current_row_data (from larger/earlier window) is preferred by default.
# ov_row_data (overlapping candidate) is discarded UNLESS:
# it's from a smaller window AND has significantly higher probability.
is_ov_preferred = (ov_row_data['Window_Size'] < current_row_data['Window_Size']) and \
((ov_row_data['Avg_Probability'] - current_row_data['Avg_Probability']) > 0.1)
if not is_ov_preferred:
candidate_patterns_df.loc[ov_idx, 'taken'] = True
# If ov_preferred, current_row_data was already added. The ov_row will be considered
# when its (smaller) window size turn comes, if not already taken.
# This implies a potential issue: if current_row is added, and a smaller, much better ov_row exists,
# current_row should ideally be removed. The current logic adds current_row first.
# For a more robust selection, decisions might need to be deferred or involve pairwise ranking.
# However, given the descending window size iteration, this greedy choice is often sufficient.
# Re-evaluating this complex interaction:
# If current_row (larger window) is chosen, and an ov_row (smaller window, much higher prob) exists,
# the current logic keeps current_row and marks ov_row as NOT taken, so ov_row can be picked later.
# This might lead to both being in the list if their IoU with *other* patterns doesn't disqualify them.
# The final drop_duplicates will handle exact overlaps.
filtered_loc_pat_and_info_df = pd.DataFrame(final_filtered_patterns_list)
if not filtered_loc_pat_and_info_df.empty:
# Drop duplicates based on the defining characteristics of a pattern instance
filtered_loc_pat_and_info_df = filtered_loc_pat_and_info_df.sort_values(
by=['Chart Pattern', 'Calc_Start', 'Window_Size', 'Avg_Probability'],
ascending=[True, True, False, False] # Prioritize larger window, then higher prob for duplicates
).drop_duplicates(
subset=['Chart Pattern', 'Calc_Start', 'Calc_End'],
keep='first' # Keep the one that came first after sorting (best according to sort)
).sort_values(by='Calc_Start').reset_index(drop=True)
if enable_plotting and not filtered_loc_pat_and_info_df.empty and cluster_labled_windows_list:
# Remove plotting code
pass
if patterns_to_return and not filtered_loc_pat_and_info_df.empty:
return filtered_loc_pat_and_info_df[filtered_loc_pat_and_info_df['Chart Pattern'].isin(patterns_to_return)]
return filtered_loc_pat_and_info_df