|
|
|
|
|
from multiprocessing import Manager, Value |
|
|
import os |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from joblib import Parallel, delayed |
|
|
import math |
|
|
from scipy import interpolate |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
|
|
# Raw 'Chart Pattern' labels that filter_to_get_selected_patterns() keeps.
original_pattern_name_list = [
    # Double Top variants
    'Double Top, Adam and Adam',
    'Double Top, Adam and Eve',
    'Double Top, Eve and Eve',
    'Double Top, Eve and Adam',
    # Double Bottom variants
    'Double Bottom, Adam and Adam',
    'Double Bottom, Eve and Adam',
    'Double Bottom, Eve and Eve',
    'Double Bottom, Adam and Eve',
    # Labels without sub-variants
    'Triangle, symmetrical',
    'Head-and-shoulders top',
    'Head-and-shoulders bottom',
    'Flag, high and tight',
]
|
|
|
|
|
|
|
|
# Integer class label for each (collapsed) pattern name.
# The Adam/Eve variants of the double patterns share one code each.
pattern_encoding = {
    'Double Top': 0,
    'Double Bottom': 1,
    'Triangle, symmetrical': 2,
    'Head-and-shoulders top': 3,
    'Head-and-shoulders bottom': 4,
    'Flag, high and tight': 5,
    'No Pattern': 6,
}
|
|
|
|
|
def get_pattern_encoding():
    """
    Get the mapping from pattern name to integer encoding.

    # Returns:
    - dict: The module-level `pattern_encoding` dictionary itself (not a copy).
    """
    encoding_by_name = pattern_encoding
    return encoding_by_name
|
|
|
|
|
def get_reverse_pattern_encoding():
    """
    Get the mapping from integer encoding to pattern name.

    # Returns:
    - dict: A new dictionary built from `pattern_encoding` with keys and values swapped.
    """
    codes = pattern_encoding.values()
    names = pattern_encoding.keys()
    return dict(zip(codes, names))
|
|
|
|
|
def get_patetrn_name_by_encoding(encoding):
    """
    Look up the pattern name that belongs to an encoding.

    # Input:
    - encoding (int): The encoding of the pattern.

    # Returns:
    - str: The name of the pattern, or 'Unknown Pattern' when the encoding is not known.
    """
    name_by_code = get_reverse_pattern_encoding()
    if encoding in name_by_code:
        return name_by_code[encoding]
    return 'Unknown Pattern'
|
|
|
|
|
def get_pattern_encoding_by_name(name):
    """
    Look up the encoding that belongs to a pattern name.

    # Input:
    - name (str): The name of the pattern.

    # Returns:
    - int: The encoding of the pattern, or -1 when the name is not known.
    """
    try:
        return get_pattern_encoding()[name]
    except KeyError:
        return -1
|
|
|
|
|
def get_pattern_list():
    """
    List the pattern names known to `pattern_encoding`.

    # Returns:
    - list: The keys of `pattern_encoding`, in the dictionary's order.
    """
    return [name for name in pattern_encoding]
|
|
|
|
|
def filter_to_get_selected_patterns(df):
    """
    Keep only the selected chart patterns and merge the double-pattern variants.

    # Input:
    - df (pd.DataFrame): Pattern records with a 'Chart Pattern' column.

    # Returns:
    - pd.DataFrame: A copy containing only rows whose 'Chart Pattern' is in
      `original_pattern_name_list`, with every 'Double Top, ...' label replaced
      by 'Double Top' and every 'Double Bottom, ...' label by 'Double Bottom'.
    """
    is_selected = df['Chart Pattern'].isin(original_pattern_name_list)
    selected = df[is_selected].copy()

    # Collapse each Adam/Eve variant onto its base pattern name.
    variant_suffixes = ('Adam and Adam', 'Adam and Eve', 'Eve and Eve', 'Eve and Adam')
    pattern_mapping = {
        f'{base_name}, {suffix}': base_name
        for base_name in ('Double Top', 'Double Bottom')
        for suffix in variant_suffixes
    }

    collapsed = selected['Chart Pattern'].replace(pattern_mapping)
    selected.loc[:, 'Chart Pattern'] = collapsed
    return selected
|
|
|
|
|
def normalize_dataset(dataset):
    """
    Min-max normalize every instance of a multi-instance OHLC dataset.

    # Input:
    - dataset (pd.DataFrame): Data with an index level named 'Instance' and the
      columns 'Open', 'High', 'Low', 'Close' (and optionally 'Volume').

    # Returns:
    - pd.DataFrame: A copy in which, per instance, the four price columns are
      scaled by that instance's lowest 'Low' and highest 'High', and 'Volume'
      (when present) by that instance's own min and max.

    Note: an instance whose range is zero is divided by zero here, so its
    values come out as NaN/inf; no guard is applied.
    """
    per_instance = dataset.groupby(level='Instance')
    price_columns = ['Open', 'High', 'Low', 'Close']
    scaled = dataset.copy()

    # Column vectors so that the per-row bounds broadcast over the four price columns.
    low_floor = per_instance['Low'].transform('min').values[:, None]
    high_ceiling = per_instance['High'].transform('max').values[:, None]
    price_span = high_ceiling - low_floor
    scaled[price_columns] = (scaled[price_columns] - low_floor) / price_span

    if 'Volume' in dataset.columns:
        volume_floor = per_instance['Volume'].transform('min')
        volume_ceiling = per_instance['Volume'].transform('max')
        volume_span = volume_ceiling.values - volume_floor
        scaled['Volume'] = (scaled['Volume'] - volume_floor.values) / volume_span

    return scaled
|
|
|
|
|
def normalize_ohlc_segment(dataset):
    """
    Min-max normalize a single OHLC segment.

    # Input:
    - dataset (pd.DataFrame): One segment with the columns 'Open', 'High',
      'Low', 'Close' (and optionally 'Volume').

    # Returns:
    - pd.DataFrame: A copy whose price columns are scaled by the segment's
      lowest 'Low' and highest 'High', and whose 'Volume' (when present) is
      scaled by its own min and max. When a range is zero the affected
      columns are left unchanged and an error message is printed.
    """
    price_columns = ['Open', 'High', 'Low', 'Close']
    scaled = dataset.copy()

    price_floor = dataset['Low'].min()
    price_span = dataset['High'].max() - price_floor
    if price_span == 0:
        print("Error: Max high and min low are equal")
    else:
        scaled[price_columns] = (scaled[price_columns] - price_floor) / price_span

    if 'Volume' not in dataset.columns:
        return scaled

    volume_floor = dataset['Volume'].min()
    volume_span = dataset['Volume'].max() - volume_floor
    if volume_span == 0:
        print("Error: Max volume and min volume are equal")
    else:
        scaled['Volume'] = (scaled['Volume'] - volume_floor) / volume_span

    return scaled
|
|
|
|
|
def process_row_improved(idx, row, ohlc_df, instance_counter, lock, successful_instances, instance_index_mapping):
    """
    Build the normalized OHLC segment for one pattern record.

    # Input:
    - idx: Index label of `row` in the pattern DataFrame; stored in `instance_index_mapping`.
    - row (pd.Series): Pattern record; 'Start', 'End', 'Symbol' and 'Chart Pattern' are read.
    - ohlc_df (pd.DataFrame): OHLC data with a 'Date' column that is compared against the parsed 'Start'/'End'.
    - instance_counter: Shared counter exposing `.value`; its current value becomes the instance number.
    - lock: Context manager held while `instance_counter` is read and incremented.
    - successful_instances: Shared list that receives the instance number of every segment that was built.
    - instance_index_mapping: Shared mapping that receives `instance number -> idx` for every segment that was built.

    # Returns:
    - pd.DataFrame: The segment indexed by ("Instance", "Time"), with a 'Pattern'
      column, without 'Date' / 'Adj Close', normalized by `normalize_ohlc_segment`.
    - None: When no OHLC rows fall inside the date range or any step raises
      (a message is printed in both cases).

    An instance number is registered in `successful_instances` and
    `instance_index_mapping` only after the whole segment has been built, so
    both containers describe exactly the segments that are returned. A number
    that was drawn for a task that later fails is simply left unused.
    """
    try:
        start_date = pd.to_datetime(row['Start'])
        end_date = pd.to_datetime(row['End'])

        in_range = (ohlc_df['Date'] >= start_date) & (ohlc_df['Date'] <= end_date)
        symbol_df_filtered = ohlc_df[in_range]

        if symbol_df_filtered.empty:
            print(f"Empty result for {row['Symbol']} from {start_date} to {end_date}")
            return None

        # Draw a unique instance number; the lock keeps read + increment atomic.
        with lock:
            unique_instance = instance_counter.value
            instance_counter.value += 1

        symbol_df_filtered = symbol_df_filtered.reset_index(drop=True)
        n_rows = len(symbol_df_filtered)
        symbol_df_filtered.index = pd.MultiIndex.from_arrays(
            [[unique_instance] * n_rows, range(n_rows)],
            names=["Instance", "Time"]
        )

        # Explicit casts keep both index levels integer typed.
        symbol_df_filtered.index = symbol_df_filtered.index.set_levels(
            symbol_df_filtered.index.levels[0].astype('int'), level=0
        )
        symbol_df_filtered.index = symbol_df_filtered.index.set_levels(
            symbol_df_filtered.index.levels[1].astype('int64'), level=1
        )

        symbol_df_filtered['Pattern'] = pattern_encoding[row['Chart Pattern']]
        symbol_df_filtered = symbol_df_filtered.drop(columns=['Date'])
        symbol_df_filtered = symbol_df_filtered.drop(columns=['Adj Close'], errors='ignore')

        symbol_df_filtered = normalize_ohlc_segment(symbol_df_filtered)

        # Register the instance last: everything above may raise, and a failed
        # task must not be reported as successful.
        instance_index_mapping[unique_instance] = idx
        successful_instances.append(unique_instance)

        return symbol_df_filtered

    except Exception as e:
        print(f"Error processing {row['Symbol']}: {str(e)}")
        return None
|
|
|
|
|
def dataset_format(filteredPatternDf, give_instance_index_mapping=False, folder_path='Datasets/OHLC data/'):
    """
    Formats and preprocesses the dataset with better tracking of successful instances.

    For every row of `filteredPatternDf` whose symbol has a `<symbol>.csv` file
    in `folder_path`, `process_row_improved` is run in parallel (joblib) and
    the resulting segments are concatenated into one DataFrame.

    # Input:
    - filteredPatternDf (pd.DataFrame): Pattern records; the 'Symbol' column is read here, the rest by `process_row_improved`.
    - give_instance_index_mapping (bool): When True, also return the mapping from instance number to the row's index label.
    - folder_path (str): Folder containing one '<symbol>.csv' file per symbol, each with a 'Date' column.

    # Returns:
    - pd.DataFrame: The concatenated segments sorted by instance, with +/-inf
      replaced by NaN and NaN forward-filled; an empty DataFrame when no
      segment could be built.
    - (pd.DataFrame, dict): The same DataFrame plus the instance -> index
      mapping when `give_instance_index_mapping` is True (an empty dict
      accompanies the empty DataFrame).
    """
    available_symbols = {file[:-4] for file in os.listdir(folder_path) if file.endswith('.csv')}

    symbols_in_df = filteredPatternDf['Symbol'].unique()
    missing_symbols = set(symbols_in_df) - available_symbols
    if missing_symbols:
        print("Missing symbols: ", missing_symbols)

    # One task per pattern row; rows of the same symbol share one OHLC frame.
    tasks = []
    for symbol in symbols_in_df:
        if symbol not in available_symbols:
            continue

        rows_for_symbol = filteredPatternDf[filteredPatternDf['Symbol'] == symbol]
        file_path = os.path.join(folder_path, f"{symbol}.csv")

        try:
            symbol_df = pd.read_csv(file_path)
            symbol_df['Date'] = pd.to_datetime(symbol_df['Date'])
            # Drop timezone info so the dates compare against naive Start/End.
            symbol_df['Date'] = symbol_df['Date'].dt.tz_localize(None)

            for idx, row in rows_for_symbol.iterrows():
                tasks.append((idx, row, symbol_df))
        except Exception as e:
            print(f"Error loading {symbol}: {str(e)}")

    print(f"Processing {len(tasks)} tasks in parallel...")

    # The manager proxies are only usable inside this block, so everything
    # that reads them stays within it.
    with Manager() as manager:
        instance_counter = manager.Value('i', 0)
        lock = manager.Lock()
        successful_instances = manager.list()
        instance_index_mapping = manager.dict()

        results = Parallel(n_jobs=-1, verbose=1)(
            delayed(process_row_improved)(task_idx, row, df, instance_counter, lock, successful_instances, instance_index_mapping)
            for task_idx, row, df in tasks
        )

        results = [result for result in results if result is not None]

        print(f"Total tasks: {len(tasks)}, Successful: {len(results)}")
        print(f"Instance counter final value: {instance_counter.value}")
        print(f"Number of successful instances: {len(successful_instances)}")

        if len(successful_instances) < instance_counter.value:
            print("Warning: Some instances were assigned but their tasks failed")

        if not results:
            # Keep the return shape consistent with the flag so that callers
            # can always unpack `dataset, mapping` when they asked for both.
            if give_instance_index_mapping:
                return pd.DataFrame(), {}
            return pd.DataFrame()

        dataset = pd.concat(results)
        dataset = dataset.sort_index(level=0)

        dataset.replace([np.inf, -np.inf], np.nan, inplace=True)
        # DataFrame.ffill replaces the deprecated fillna(method='ffill').
        dataset.ffill(inplace=True)

        if give_instance_index_mapping:
            # Copy the proxy into a plain dict before the manager shuts down.
            instance_index_mapping_dict = dict(instance_index_mapping)
            print("Converted Mapping:", instance_index_mapping_dict)
            return dataset, instance_index_mapping_dict

        return dataset
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def width_augmentation(filteredPatternDf, min_aug_len, aug_len_fraction, make_duplicates=False, keep_original=False):
    """
    Perform width augmentation on the filtered pattern DataFrame.

    Each pattern's date window is widened by a random number of OHLC rows on
    both sides (clamped to the available data), using the global `np.random`
    state. OHLC data is read from 'Datasets/OHLC data/<symbol>.csv'.

    # Input:
    - filteredPatternDf (pd.DataFrame): The filtered pattern DataFrame; 'Symbol', 'Start', 'End' and 'Chart Pattern' are read.
    - min_aug_len (int): Lower bound applied to the exclusive upper limit of the random extension length.
    - aug_len_fraction (float): The fraction of the original data size used to derive that upper limit.
    - make_duplicates (bool): Flag to indicate whether to make duplicates of patterns to reduce dataset imbalance.(make this false on test data)
    - keep_original (bool): Flag to indicate whether to keep the original patterns in the augmented DataFrame.

    # Returns:
    - filteredPattern_width_aug_df (pd.DataFrame): The DataFrame with width-augmented patterns
      (same columns as the input, fresh 0..n-1 index). Rows without OHLC data
      in their date range are skipped with a printed message.
    """
    print('Performing width augmentation...')

    # Loop-invariant class statistics, computed once instead of per row.
    pattern_counts = filteredPatternDf['Chart Pattern'].value_counts()
    max_num_of_rows_for_pattern = pattern_counts.max()

    ohlc_cache = {}       # symbol -> parsed OHLC frame, so each CSV is read once
    augmented_rows = []   # collected rows; the DataFrame is built once at the end

    for index, row in tqdm(filteredPatternDf.iterrows(), total=len(filteredPatternDf), desc="Processing"):
        symbol = row['Symbol']
        pattern = row['Chart Pattern']

        if symbol not in ohlc_cache:
            loaded = pd.read_csv(f'Datasets/OHLC data/{symbol}.csv')
            loaded['Date'] = pd.to_datetime(loaded['Date']).dt.tz_localize(None)
            ohlc_cache[symbol] = loaded
        ohlc_df = ohlc_cache[symbol]

        # Compare as timezone-naive timestamps, like the 'Date' column.
        start_date = pd.to_datetime(row['Start']).tz_localize(None)
        end_date = pd.to_datetime(row['End']).tz_localize(None)

        ohlc_of_interest = ohlc_df[(ohlc_df['Date'] >= start_date) & (ohlc_df['Date'] <= end_date)]
        data_size = len(ohlc_of_interest)

        if data_size <= 0:
            print (f'No data for {symbol} between {start_date} and {end_date}')
            continue

        # The frame comes straight from read_csv, so its index labels are row positions.
        start_index = ohlc_of_interest.index[0]
        end_index = ohlc_of_interest.index[-1]
        max_possible_index = len(ohlc_df) - 1

        if make_duplicates:
            # Patterns that are rarer than the most frequent one may be
            # augmented more often.
            number_of_rows_for_pattern = pattern_counts[pattern]
            num_row_diff = (max_num_of_rows_for_pattern - number_of_rows_for_pattern) * 2
            multiplier = math.ceil(num_row_diff / number_of_rows_for_pattern) + 2
            m = np.random.randint(1, multiplier)
        else:
            m = 1

        max_aug_len = max(math.ceil(data_size * aug_len_fraction), min_aug_len)
        # randint's upper bound is exclusive and must exceed the lower bound;
        # without this clamp a limit of 1 or less raised ValueError.
        aug_len_limit = max(max_aug_len, 2)

        for _ in range(m):
            aug_len_l = np.random.randint(1, aug_len_limit)
            aug_len_r = np.random.randint(1, aug_len_limit)

            # Widen the window, but stay inside the available OHLC rows.
            start_index_aug = max(start_index - aug_len_l, 0)
            end_index_aug = min(end_index + aug_len_r, max_possible_index)

            new_row = row.copy()
            new_row['Start'] = ohlc_df['Date'].iloc[start_index_aug]
            new_row['End'] = ohlc_df['Date'].iloc[end_index_aug]
            augmented_rows.append(new_row)

        if keep_original:
            augmented_rows.append(row)

    if not augmented_rows:
        return pd.DataFrame(columns=filteredPatternDf.columns)

    filteredPattern_width_aug_df = pd.DataFrame(augmented_rows).reset_index(drop=True)
    return filteredPattern_width_aug_df
|
|
|
|
|
def _interpolation_kind(n_points):
    """Pick the highest-order interp1d kind that `n_points` samples support."""
    if n_points >= 4:
        return 'cubic'
    if n_points >= 3:
        return 'quadratic'
    if n_points >= 2:
        return 'linear'
    return 'nearest'


def normalize_ohlc_len(df, target_len=30, plot_count=0):
    """
    Resample every instance of an OHLC dataset to a fixed number of rows.

    # Input:
    - df (pd.DataFrame): Data whose first index level identifies the instance,
      with the columns 'Open', 'High', 'Low', 'Close' and optionally 'Volume'
      and 'Pattern'.
    - target_len (int): Number of rows every instance has after resampling.
    - plot_count (int): Number of instances to pick at random for plotting.
      Plotting is not implemented; values above the number of instances are
      clamped instead of raising.

    # Returns:
    - pd.DataFrame: All resampled instances stacked, indexed by
      (instance, 0..target_len-1). Prices are floored at 0.001, 'High' is at
      least max(Open, Close) and 'Low' at most min(Open, Close). 'Pattern' is
      resampled with nearest-neighbour interpolation. An empty DataFrame is
      returned for empty input.
    """
    price_columns = ['Open', 'High', 'Low', 'Close']
    # 'Volume' is optional, as in the normalization helpers of this module.
    value_columns = price_columns + (['Volume'] if 'Volume' in df.columns else [])
    has_pattern = 'Pattern' in df.columns

    instances_list = df.index.get_level_values(0).unique()
    if len(instances_list) == 0:
        # pd.concat refuses an empty list, so answer empty input directly.
        return pd.DataFrame(columns=value_columns + (['Pattern'] if has_pattern else []))

    # The random draw for the (unimplemented) plots is kept so that seeded
    # runs consume the same random numbers as before; the result is unused.
    np.random.choice(len(instances_list), min(plot_count, len(instances_list)), replace=False)

    normalized_df_list = []
    for instance in instances_list:
        pattern_df = df.loc[instance]
        n_points = len(pattern_df)
        orig_positions = np.arange(n_points)
        new_positions = np.linspace(0, n_points - 1, target_len)
        kind = _interpolation_kind(n_points)

        new_data = {}
        for col in value_columns:
            f = interpolate.interp1d(orig_positions, pattern_df[col].values,
                                     kind=kind, bounds_error=False, fill_value='extrapolate')
            new_data[col] = f(new_positions)

        # Interpolation can overshoot below zero; keep prices strictly positive.
        for col in price_columns:
            new_data[col] = np.maximum(new_data[col], 0.001)

        # Restore the OHLC invariant that interpolation may have broken.
        new_data['High'] = np.maximum.reduce([new_data['High'], new_data['Open'], new_data['Close']])
        new_data['Low'] = np.minimum.reduce([new_data['Low'], new_data['Open'], new_data['Close']])

        if has_pattern:
            f = interpolate.interp1d(orig_positions, pattern_df['Pattern'].values,
                                     kind='nearest', bounds_error=False, fill_value=pattern_df['Pattern'].iloc[0])
            new_data['Pattern'] = f(new_positions)

        result_df = pd.DataFrame(new_data)
        result_df.index = pd.MultiIndex.from_product([[instance], result_df.index])
        normalized_df_list.append(result_df)

    combined_result_df = pd.concat(normalized_df_list, axis=0)
    return combined_result_df
|
|
|
|
|
|
|
|
# Defaults used by prepare_rocket_data().
features = [
    'Open',
    'High',
    'Low',
    'Close',
    'Volume',
]                      # columns that become the channels of each series
target = 'Pattern'     # column that holds the class label
series_length = 100    # number of time steps every series is cut or padded to
|
|
|
|
|
|
|
|
|
|
|
def prepare_rocket_data(dataset, features=features, target=target, series_length=series_length):
    """
    Convert a multi-instance dataset into fixed-length arrays.

    # Input:
    - dataset (pd.DataFrame): Data grouped by its first index level (one group per instance).
    - features (list): Columns that become the channels of each series.
    - target (str): Column holding the label; the first non-null value per instance is used.
    - series_length (int): Length every series is truncated or zero-padded to.

    # Returns:
    - X (np.ndarray): Array of shape (instances, len(features), series_length).
    - y (np.ndarray): One label per instance, in the same group order.
    """
    def to_fixed_length(group):
        values = group[features].values
        n_rows = len(values)
        if n_rows > series_length:
            return values[:series_length]
        # Shorter (or equally long) series are padded with rows of zeros at the end.
        zero_rows = np.zeros((series_length - n_rows, values.shape[1]))
        return np.concatenate((values, zero_rows), axis=0)

    per_instance = dataset.groupby(level=0).apply(to_fixed_length)
    stacked = np.stack(per_instance.values)
    # (instances, time, channels) -> (instances, channels, time)
    X = stacked.transpose(0, 2, 1)

    labels = dataset.groupby(level=0)[target].first()
    y = labels.values
    return X, y