# handler.py
"""Inference Endpoint handler for OHLC chart-pattern detection.

The classifier stored at MODEL_PATH is loaded once, at import time, and shared
by every request served by this process.
"""
import math
import os

import joblib
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.cluster import DBSCAN

# Project-local helpers: the ``utils`` package must sit next to handler.py and
# contain __init__.py, eval.py and formatAndPreprocessNewPatterns.py.
from utils.eval import intersection_over_union
from utils.formatAndPreprocessNewPatterns import (
    get_patetrn_name_by_encoding,
    get_pattern_encoding_by_name,
    get_reverse_pattern_encoding,
)

# --- Global model location ---
# The repository root is taken from the MODEL_DIR environment variable and
# falls back to "/repository"; the model file is expected under "Models/".
# NOTE(review): earlier notes state that Inference Endpoints set MODEL_DIR
# automatically — confirm against the deployment platform's documentation.
_MODEL_ROOT = os.environ.get("MODEL_DIR", "/repository")
MODEL_PATH = os.path.join(_MODEL_ROOT, "Models", "Width Aug OHLC_mini_rocket_xgb.joblib")


def _load_model(path):
    """Load the joblib-serialised model at *path*; return None if that fails.

    Load errors are printed instead of raised so the module stays importable;
    EndpointHandler refuses to initialise when the resulting model is None.
    """
    try:
        print(f"Loading model from: {path}")
        model = joblib.load(path)
        print("Model loaded successfully!")
    except Exception as e:
        print(f"Error loading model: {e}")
        # Consider raising instead if the server should not start without a model.
        model = None
    return model


# Loaded exactly once, when this module is first imported.
rocket_model = _load_model(MODEL_PATH)
# --- Inference configuration (module-level constants) ---
# Reverse pattern-encoding table from the project helpers.
pattern_encoding_reversed = get_reverse_pattern_encoding()
# The model itself is the module-level ``rocket_model``.

# Ten divisors log-spaced from 1 to 20; each yields one sliding-window size
# (segment length // divisor, never below 10 rows).
win_size_proportions = np.round(np.logspace(0, np.log10(20), num=10), 2).tolist()
# Fraction of each window placed before the row the window is anchored on.
padding_proportion = 0.6
stride = 1
# A scalar applies to every pattern in cluster_windows; a list would be read as
# one threshold per pattern encoding.
probab_threshold_list = 0.5
prob_threshold_of_no_pattern_to_mark_as_no_pattern = 0.5
target_len = 30  # Not used in the current code
eps = 0.04
min_samples = 3
win_width_proportion = 10  # Not used in the current code


def process_window(i, ohlc_data_segment, rocket_model, probability_threshold, pattern_encoding_reversed,
                   seg_start, seg_end, window_size, padding_proportion,
                   prob_threshold_of_no_pattern_to_mark_as_no_pattern=1):
    """Classify the sliding window of OHLCV rows anchored at row ``i``.

    The window spans ``window_size`` rows beginning
    ``ceil(window_size * padding_proportion)`` rows before ``i``, clipped to
    the bounds of the segment (so windows near the edges are shorter).

    Args:
        i: Positional row index the window is anchored on.
        ohlc_data_segment: DataFrame with 'Date', 'Open', 'High', 'Low',
            'Close' and 'Volume' columns.
        rocket_model: Object exposing ``predict_proba``; it is given an array
            of shape (1, 5, window_length).
        probability_threshold: Unused here; kept for signature compatibility.
        pattern_encoding_reversed: Unused here; kept for signature compatibility.
        seg_start, seg_end: Segment boundary dates copied into the result.
        window_size: Nominal window length in rows.
        padding_proportion: Fraction of ``window_size`` placed before ``i``.
        prob_threshold_of_no_pattern_to_mark_as_no_pattern: When the
            'No Pattern' probability exceeds this value the window is labelled
            'No Pattern' regardless of the arg-max class.

    Returns:
        Dict with 'Start', 'End', 'Chart Pattern', 'Seg_Start', 'Seg_End' and
        'Probability', or None when the window is empty or prediction fails.
    """
    start_index = i - math.ceil(window_size * padding_proportion)
    end_index = start_index + window_size
    start_index = max(start_index, 0)
    end_index = min(end_index, len(ohlc_data_segment))

    # Explicitly positional, independent of the frame's index type.
    ohlc_segment = ohlc_data_segment.iloc[start_index:end_index]
    if len(ohlc_segment) == 0:
        return None  # Skip empty segments

    win_start_date = ohlc_segment['Date'].iloc[0]
    win_end_date = ohlc_segment['Date'].iloc[-1]

    # (rows, 5) -> (1, rows, 5) -> (1, 5, rows): a single sample with one
    # channel per OHLCV column and time along the last axis.
    ohlc_array_for_rocket = ohlc_segment[['Open', 'High', 'Low', 'Close', 'Volume']].to_numpy().reshape(
        1, len(ohlc_segment), 5)
    ohlc_array_for_rocket = np.transpose(ohlc_array_for_rocket, (0, 2, 1))
    try:
        pattern_probabilities = rocket_model.predict_proba(ohlc_array_for_rocket)
    except Exception as e:
        # Best effort: a failing window is skipped instead of failing the request.
        print(f"Error in prediction: {e}")
        return None

    max_probability = np.max(pattern_probabilities)
    no_pattern_proba = pattern_probabilities[0][get_pattern_encoding_by_name('No Pattern')]
    pattern_index = np.argmax(pattern_probabilities)

    pred_proba = max_probability
    pred_pattern = get_patetrn_name_by_encoding(pattern_index)
    if no_pattern_proba > prob_threshold_of_no_pattern_to_mark_as_no_pattern:
        pred_proba = no_pattern_proba
        pred_pattern = 'No Pattern'

    return {
        'Start': win_start_date,
        'End': win_end_date,
        'Chart Pattern': pred_pattern,
        'Seg_Start': seg_start,
        'Seg_End': seg_end,
        'Probability': pred_proba,
    }


def parallel_process_sliding_window(ohlc_data_segment, rocket_model, probability_threshold, stride,
                                    pattern_encoding_reversed, window_size, padding_proportion,
                                    prob_threshold_of_no_pattern_to_mark_as_no_pattern=1,
                                    parallel=True, num_cores=-1):
    """Run ``process_window`` for every ``stride``-th row of the segment.

    The joblib path (``parallel=True``) and the sequential path forward the
    same arguments — including the 'No Pattern' threshold — so both produce
    the same rows.

    Args:
        parallel: Use ``joblib.Parallel`` with ``num_cores`` jobs when True.
        num_cores: ``n_jobs`` for joblib (-1 = all available cores).

    Returns:
        DataFrame with one row per non-empty, successfully classified window
        (empty when there are none).
    """
    seg_start = ohlc_data_segment['Date'].iloc[0]
    seg_end = ohlc_data_segment['Date'].iloc[-1]
    anchors = range(0, len(ohlc_data_segment), stride)
    window_kwargs = dict(
        ohlc_data_segment=ohlc_data_segment,
        rocket_model=rocket_model,
        probability_threshold=probability_threshold,
        pattern_encoding_reversed=pattern_encoding_reversed,
        seg_start=seg_start,
        seg_end=seg_end,
        window_size=window_size,
        padding_proportion=padding_proportion,
        prob_threshold_of_no_pattern_to_mark_as_no_pattern=prob_threshold_of_no_pattern_to_mark_as_no_pattern,
    )

    if parallel:
        # NOTE(review): joblib's default backend is process-based, so the
        # segment and model may be serialised for workers on every call;
        # measure whether parallel=True actually helps on a small instance.
        with Parallel(n_jobs=num_cores, verbose=0) as executor:  # verbose=0 to reduce log spam
            results = executor(delayed(process_window)(i=i, **window_kwargs) for i in anchors)
    else:
        results = [process_window(i=i, **window_kwargs) for i in anchors]

    return pd.DataFrame([res for res in results if res is not None])


def prepare_dataset_for_cluster(ohlc_data_segment, win_results_df):
    """Add positional 'Center', 'Pattern_Start_pos' and 'Pattern_End_pos' columns.

    For each window, ``Pattern_Start_pos`` is the number of segment rows dated
    strictly before its 'Start', the window length is the number of rows dated
    within ['Start', 'End'], and 'Center' is start + length / 2. All three are
    float64 columns. Counts are computed with binary search over the sorted
    segment dates, which gives the same counts as a per-window boolean mask.

    Returns:
        A copy of ``win_results_df`` with the new columns (unchanged copy when
        ``win_results_df`` is empty).
    """
    predicted_patterns = win_results_df.copy()
    if predicted_patterns.empty:
        return predicted_patterns

    sorted_dates = np.sort(ohlc_data_segment['Date'].to_numpy())
    starts = predicted_patterns['Start'].to_numpy()
    ends = predicted_patterns['End'].to_numpy()

    # count(date < start)
    start_point_index = np.searchsorted(sorted_dates, starts, side='left')
    # count(start <= date <= end) = count(date <= end) - count(date < start);
    # clipped at 0 for the (unsorted-input) case end < start, where it is empty.
    pattern_len = np.maximum(np.searchsorted(sorted_dates, ends, side='right') - start_point_index, 0)

    predicted_patterns['Center'] = (start_point_index + pattern_len / 2).astype(float)
    predicted_patterns['Pattern_Start_pos'] = start_point_index.astype(float)
    predicted_patterns['Pattern_End_pos'] = (start_point_index + pattern_len).astype(float)
    return predicted_patterns


def cluster_windows(predicted_patterns, probability_threshold, window_size, eps=0.05, min_samples=2):
    """Cluster windows of the same pattern by their centre position with DBSCAN.

    Args:
        predicted_patterns: Output of ``prepare_dataset_for_cluster``.
        probability_threshold: Scalar — rows are kept when 'Probability' is
            strictly greater; or a list indexed by pattern encoding — rows of
            that pattern with 'Probability' below the entry are dropped.
        window_size: Unused; kept for signature compatibility.
        eps, min_samples: DBSCAN parameters, applied to centres normalised to
            [0, 1] over all retained windows.

    Returns:
        ``(cluster_labled_windows_df, interseced_clusters_df)``: the retained
        windows with a 'Cluster' label (-1 = DBSCAN noise), and one row per
        cluster whose 'Start'/'End' span the dates covered by at least two of
        its windows. ``(None, None)`` when nothing survives filtering or no
        cluster forms.
    """
    df = predicted_patterns.copy()
    if isinstance(probability_threshold, list):
        for encoding, threshold in enumerate(probability_threshold):
            pattern_name = get_patetrn_name_by_encoding(encoding)
            df.drop(df[(df['Chart Pattern'] == pattern_name) & (df['Probability'] < threshold)].index,
                    inplace=True)
    else:
        df = df[df['Probability'] > probability_threshold]

    if df.empty:  # Nothing survived the probability filter
        return None, None

    cluster_labled_windows = []
    interseced_clusters = []
    min_center = df['Center'].min()
    max_center = df['Center'].max()
    print("Min center: ", min_center, "\nMax center: ", max_center)

    for pattern, group in df.groupby('Chart Pattern'):
        # Explicit copy: 'Cluster' is added to this group, never to ``df``.
        group = group.copy()
        centers = group['Center'].values.reshape(-1, 1)
        if min_center < max_center:
            norm_centers = (centers - min_center) / (max_center - min_center)
        else:
            # Every centre coincides, so place all windows at the same point.
            norm_centers = np.ones_like(centers)
        db = DBSCAN(eps=eps, min_samples=min_samples).fit(norm_centers)
        print("DBSCAN \n", db)
        group['Cluster'] = db.labels_
        cluster_labled_windows.append(group)

        for cluster_id, cluster_group in group[group['Cluster'] != -1].groupby('Cluster'):
            expanded_dates = []
            for _, row in cluster_group.iterrows():
                expanded_dates.extend(pd.date_range(row["Start"], row["End"]))
            date_counts = pd.Series(expanded_dates).value_counts().sort_index()
            # NOTE(review): if no date is shared by two windows these become
            # NaT; consider skipping such clusters.
            overlapping_dates = date_counts[date_counts >= 2].index
            interseced_clusters.append({
                'Chart Pattern': pattern,
                'Cluster': cluster_id,
                'Start': overlapping_dates.min(),
                'End': overlapping_dates.max(),
                'Seg_Start': cluster_group['Seg_Start'].iloc[0],
                'Seg_End': cluster_group['Seg_End'].iloc[0],
                'Avg_Probability': cluster_group['Probability'].mean(),
            })

    print("inside cluster windows")
    print(interseced_clusters)
    if not cluster_labled_windows or not interseced_clusters:
        return None, None

    cluster_labled_windows_df = pd.concat(cluster_labled_windows).sort_index()
    print("inside cluster windows before dataframe make")
    print(interseced_clusters)
    interseced_clusters_df = pd.DataFrame(interseced_clusters)
    return cluster_labled_windows_df, interseced_clusters_df


# ========================= EndpointHandler ==========================
# Primary inference entry point called by the endpoint runtime.
class EndpointHandler:
    """Endpoint entry point: OHLCV records in, detected chart patterns out."""

    def __init__(self, path):
        """Bind the module-level model and inference settings.

        Args:
            path: Required by the endpoint interface; unused because the model
                is loaded at module import time.

        Raises:
            ValueError: If the module-level model failed to load.
        """
        self.model = rocket_model
        if self.model is None:
            raise ValueError("ML model failed to load during initialization.")

        self.pattern_encoding_reversed = pattern_encoding_reversed
        self.win_size_proportions = win_size_proportions
        self.padding_proportion = padding_proportion
        self.stride = stride
        self.probab_threshold_list = probab_threshold_list
        self.prob_threshold_of_no_pattern_to_mark_as_no_pattern = prob_threshold_of_no_pattern_to_mark_as_no_pattern
        self.eps = eps
        self.min_samples = min_samples

    def __call__(self, inputs):
        """Detect chart patterns in the posted OHLCV series.

        Args:
            inputs: Either ``{"inputs": [record, ...]}`` or a bare list of
                records. Each record needs a 'Date' formatted YYYY-MM-DD plus
                'Open', 'High', 'Low', 'Close' and 'Volume' (column names are
                matched case-insensitively).

        Returns:
            List of dicts, one per retained detection, with keys 'Chart
            Pattern', 'Cluster', 'Start', 'End', 'Seg_Start', 'Seg_End',
            'Avg_Probability', 'Calc_Start', 'Calc_End' and 'Window_Size';
            dates are 'YYYY-MM-DD' strings. Empty list when nothing is found.

        Raises:
            ValueError: If the model is missing or the payload is malformed.
        """
        if self.model is None:
            raise ValueError("ML model is not loaded. Cannot perform inference.")

        raw_ohlc_list = self._extract_ohlc_records(inputs)
        ohlc_data_segment = self._build_ohlc_frame(raw_ohlc_list)
        seg_len = len(ohlc_data_segment)
        print(seg_len)
        if ohlc_data_segment.empty:
            raise ValueError("OHLC Data segment is empty or invalid after processing.")

        located_per_size = self._locate_patterns_per_window_size(ohlc_data_segment)
        if not located_per_size:
            print("Located patterns and other info for each size is empty")
            return []  # Return empty list if no patterns found

        filtered_df = self._filter_overlapping_patterns(pd.concat(located_per_size))
        # Dates -> strings so the records are JSON serialisable.
        return self._format_dates_for_json(filtered_df).to_dict('records')

    @staticmethod
    def _extract_ohlc_records(inputs):
        """Return the list of OHLCV records carried by the request payload."""
        if isinstance(inputs, dict) and "inputs" in inputs:
            raw_ohlc_list = inputs["inputs"]
            if not isinstance(raw_ohlc_list, list):
                raise ValueError("Payload 'inputs' key must contain a list of OHLC data.")
            return raw_ohlc_list
        if isinstance(inputs, list):
            # Fallback for a bare list payload.
            return inputs
        raise ValueError(
            f"Invalid top-level input format. Expected a dict with 'inputs' key or a list. Got: {type(inputs)}")

    @staticmethod
    def _build_ohlc_frame(raw_ohlc_list):
        """Build a DataFrame with a datetime 'Date' column and numeric OHLCV columns.

        Column names are matched case-insensitively and renamed to their
        canonical casing. Non-numeric OHLCV values are coerced to NaN and
        reported (rows are never dropped).

        Raises:
            ValueError: On any problem building the frame (original error chained).
        """
        required_numeric_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
        try:
            ohlc_data = pd.DataFrame(raw_ohlc_list)

            # --- Step 1: 'Date' column, matched case-insensitively ---
            if 'Date' not in ohlc_data.columns:
                found_date_col = next(
                    (col for col in ohlc_data.columns if str(col).lower() == 'date'), None)
                if found_date_col is None:
                    raise ValueError("Input data must contain a 'Date' column (e.g., 'Date', 'date').")
                ohlc_data.rename(columns={found_date_col: 'Date'}, inplace=True)
                print(f"HANDLER: Renamed '{found_date_col}' to 'Date'. New columns: {ohlc_data.columns.tolist()}")
            # The backend sends YYYY-MM-DD; anything else is rejected.
            ohlc_data['Date'] = pd.to_datetime(ohlc_data['Date'], format='%Y-%m-%d', errors='raise')

            # --- Step 2: OHLCV columns, matched case-insensitively, made numeric ---
            for col in required_numeric_cols:
                if col not in ohlc_data.columns:
                    found_alt_col = next(
                        (df_col for df_col in ohlc_data.columns if str(df_col).lower() == col.lower()), None)
                    if found_alt_col is None:
                        raise ValueError(
                            f"Missing required numeric column: '{col}'. Available: {ohlc_data.columns.tolist()}")
                    ohlc_data.rename(columns={found_alt_col: col}, inplace=True)
                    print(f"HANDLER: Renamed '{found_alt_col}' to '{col}'. New columns: {ohlc_data.columns.tolist()}")
                ohlc_data[col] = pd.to_numeric(ohlc_data[col], errors='coerce')

            # NaNs are reported, not dropped: rows may not be removed, and the
            # model may or may not tolerate them.
            nan_rows = ohlc_data[required_numeric_cols].isnull().any(axis=1)
            if nan_rows.any():
                print("HANDLER: Warning! NaN values detected in critical OHLCV columns after conversion. "
                      "Your model might require clean data.")
                print(ohlc_data[nan_rows].to_string())
        except Exception as e:
            print(f"Error processing input data: {e}")
            raise ValueError(f"Invalid input data format: {e}") from e
        return ohlc_data

    def _locate_patterns_per_window_size(self, ohlc_data_segment):
        """Detect and cluster patterns once per distinct window size.

        Returns:
            List of DataFrames, one per window size that produced clusters,
            each with 'Calc_Start', 'Calc_End' and 'Window_Size' added. Cluster
            ids are shifted by the number of distinct ids seen for earlier
            window sizes.
        """
        seg_len = len(ohlc_data_segment)
        located_per_size = []
        used_win_sizes = set()
        win_iteration = 0

        for win_size_proportion in self.win_size_proportions:
            window_size = int(max(seg_len // win_size_proportion, 10))  # at least 10 rows
            if window_size in used_win_sizes:
                continue
            used_win_sizes.add(window_size)

            win_results_df = parallel_process_sliding_window(
                ohlc_data_segment,
                self.model,
                self.probab_threshold_list,
                self.stride,
                self.pattern_encoding_reversed,
                window_size,
                self.padding_proportion,
                self.prob_threshold_of_no_pattern_to_mark_as_no_pattern,
                parallel=True,  # consider parallel=False / num_cores=1 on small instances
                num_cores=-1,
            )
            if win_results_df is None or win_results_df.empty:
                print(f"Window results dataframe is empty for window size {window_size}")
                continue
            win_results_df['Window_Size'] = window_size

            predicted_patterns = prepare_dataset_for_cluster(ohlc_data_segment, win_results_df)
            if predicted_patterns is None or predicted_patterns.empty:
                print("Predicted patterns dataframe is empty")
                continue

            cluster_labled_windows_df, interseced_clusters_df = cluster_windows(
                predicted_patterns, self.probab_threshold_list, window_size,
                eps=self.eps, min_samples=self.min_samples,
            )
            if (cluster_labled_windows_df is None or interseced_clusters_df is None
                    or cluster_labled_windows_df.empty or interseced_clusters_df.empty):
                print("Clustered windows dataframe is empty")
                continue

            interseced_clusters_df['Cluster'] = interseced_clusters_df['Cluster'].astype(int) + win_iteration
            win_iteration += interseced_clusters_df.loc[
                interseced_clusters_df['Cluster'] != -1, 'Cluster'].nunique()

            located = interseced_clusters_df.copy()
            located['Calc_Start'] = located['Start']
            located['Calc_End'] = located['End']
            located['Window_Size'] = window_size
            located_per_size.append(located)

        return located_per_size

    def _filter_overlapping_patterns(self, located_df):
        """Drop detections dominated by an overlapping detection of the same pattern.

        Rows are visited pattern by pattern (first-appearance order) and, within
        a pattern, from the largest window size down; survivors are returned in
        that order.
        """
        unique_window_sizes = np.sort(located_df['Window_Size'].unique())[::-1]
        kept_rows = []
        for chart_pattern in located_df['Chart Pattern'].unique():
            same_pattern = located_df[located_df['Chart Pattern'] == chart_pattern]
            for win_size in unique_window_sizes:
                for _, row in same_pattern[same_pattern['Window_Size'] == win_size].iterrows():
                    if not self._is_dominated(row, same_pattern):
                        kept_rows.append(row)
        return pd.DataFrame(kept_rows)

    @staticmethod
    def _is_dominated(row, same_pattern):
        """Return True if another detection should replace ``row``.

        A candidate competes only when its Calc_Start..Calc_End span overlaps
        ``row``'s with IoU > 0.6. ``row`` is dominated by a competitor that
          * uses a larger window, unless ``row``'s Avg_Probability is more
            than 0.1 higher, or
          * uses a window no larger than ``row``'s and has an Avg_Probability
            more than 0.1 higher.
        ``row`` itself is among the candidates but never dominates itself.
        """
        start_date = row['Calc_Start']
        end_date = row['Calc_End']
        intersecting_rows = same_pattern[
            (same_pattern['Calc_Start'] <= end_date) & (same_pattern['Calc_End'] >= start_date)
        ]
        for _, other in intersecting_rows.iterrows():
            iou = intersection_over_union(start_date, end_date, other['Calc_Start'], other['Calc_End'])
            if not iou > 0.6:  # written this way so a NaN IoU is skipped
                continue
            if other['Window_Size'] > row['Window_Size']:
                if not (row['Avg_Probability'] - other['Avg_Probability']) > 0.1:
                    return True
            elif row['Window_Size'] >= other['Window_Size']:
                if (other['Avg_Probability'] - row['Avg_Probability']) > 0.1:
                    return True
        return False

    @staticmethod
    def _format_dates_for_json(df):
        """Render the date columns of ``df`` as 'YYYY-MM-DD' strings (in place)."""

        def _to_date_string(value):
            # Timestamps stored in object columns get the same format as the
            # datetime64 path; strings pass through; anything else is str()'d.
            if isinstance(value, pd.Timestamp):
                return value.strftime('%Y-%m-%d')
            if isinstance(value, str):
                return value
            return str(value)

        datetime_columns = ['Start', 'End', 'Seg_Start', 'Seg_End', 'Calc_Start', 'Calc_End']
        for col in datetime_columns:
            if col not in df.columns:
                continue
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d')
            else:
                df[col] = df[col].map(_to_date_string)
        return df