Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| import torch | |
| import numpy as np | |
| import pandas as pd | |
| import ta | |
| from tqdm import tqdm | |
def load_model_config(model_dir: str):
    """Read and return the pickled model configuration stored in *model_dir*.

    NOTE(review): pickle.load can execute arbitrary code — only load
    config files from trusted sources.
    """
    config_path = os.path.join(model_dir, 'config.pkl')
    with open(config_path, 'rb') as f:
        return pickle.load(f)
def load_model(model, model_dir: str, device: str = 'cuda'):
    """Load weights from ``model_dir/model.pth`` into *model* and return it.

    ``map_location`` remaps the stored tensors onto *device*, so weights
    saved on GPU can be loaded on CPU (and vice versa).
    """
    weights_path = os.path.join(model_dir, 'model.pth')
    state_dict = torch.load(weights_path, map_location=torch.device(device))
    model.load_state_dict(state_dict)
    return model
def normalize(data, min_val, max_val):
    """Scale *data* to roughly [0, 1] given precomputed min/max values.

    Non-mutating: both operations produce new arrays. The 1e-7 guards
    against division by zero when max_val is 0.

    data.shape = (bs, ts_size, z_dim) — assumed from callers; confirm.
    """
    shifted = data - min_val
    return shifted / (max_val + 1e-7)
def renormalize(data, min_val, max_val):
    """Map normalized data (roughly [0, 1]) back to the original scale.

    Approximate inverse of ``normalize`` (the 1e-7 epsilon there makes the
    round trip off by a factor of max_val / (max_val + 1e-7)).

    Fix: the previous version used in-place ``*=`` / ``+=``, which mutated
    the caller's array as a side effect (inconsistent with ``normalize``)
    and raised UFuncTypeError for integer arrays rescaled by floats.
    Now returns a fresh array and leaves the input untouched.

    data.shape = (bs, seq_len, z_dim) — assumed from callers; confirm.
    """
    return data * max_val + min_val
def train_test_split(data, ratio):
    """Randomly partition *data* along axis 0 into train and test sets.

    *ratio* is the fraction of rows assigned to the train split; rows are
    shuffled via a random permutation before splitting.
    """
    n_train = int(ratio * len(data))
    order = np.random.permutation(len(data))
    train_part = data[order[:n_train], ...]
    test_part = data[order[n_train:], ...]
    return train_part, test_part
def load_data(ts_size, data):
    """Convert a (rows, features) array into shuffled sliding windows.

    The rows are reversed first to restore chronological order (the raw
    data appears to be newest-first — TODO confirm with the data source),
    then every contiguous run of ts_size rows becomes one sample.

    NOTE(review): only len(data) - ts_size windows are produced, so the
    last possible window is dropped; kept as-is to match original behavior.

    Returns an array of shape (num_samples, ts_size, z_dim).
    """
    chronological = data[::-1]
    windows = [
        chronological[start:start + ts_size]
        for start in range(len(chronological) - ts_size)
    ]
    samples = np.array(windows)
    np.random.shuffle(samples)  # break temporal ordering, closer to i.i.d.
    return samples
def calculate_technical_indicators(df_passed: pd.DataFrame, rolling_window=50, handle_nan=True):
    """Append technical-indicator columns to a copy of *df_passed*.

    Expects OHLCV columns 'High', 'Low', 'Close', 'Volume' (as used by the
    `ta` calls below). The input frame is copied, never mutated.

    Args:
        df_passed: price history, assumed sorted chronologically — confirm
            with callers (create_batches sorts by 'Date' first).
        rolling_window: window for the rolling std; also how many warm-up
            rows are dropped when handle_nan is True.
        handle_nan: when True, fill/trim indicator warm-up NaNs and raise
            if any NaN survives.

    Raises:
        Exception: if NaNs remain after fill/trim (handle_nan=True only).
    """
    df = df_passed.copy()

    def generate_indicators(df, rolling_window=50):
        # Momentum / trend / volatility / volume indicators from `ta`.
        df['stoch'] = ta.momentum.stoch(df['High'], df['Low'], df['Close'])
        df['adx'] = ta.trend.adx(df['High'], df['Low'], df['Close'])
        df['bollinger_hband'] = ta.volatility.bollinger_hband(df['Close'])
        df['mfi'] = ta.volume.money_flow_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['rsi'] = ta.momentum.rsi(df['Close'])
        df['ma'] = ta.trend.sma_indicator(df['Close'])
        df['std'] = df['Close'].rolling(window=rolling_window).std()
        df['adl'] = ta.volume.acc_dist_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['williams'] = ta.momentum.williams_r(df['High'], df['Low'], df['Close'])
        df['macd'] = ta.trend.macd(df['Close'])
        df['obv'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['sar'] = ta.trend.psar_down(df['High'], df['Low'], df['Close'])
        df['ichimoku_a'] = ta.trend.ichimoku_a(df['High'], df['Low'])
        df['ichimoku_b'] = ta.trend.ichimoku_b(df['High'], df['Low'])
        return df

    df = generate_indicators(df=df, rolling_window=rolling_window)
    if not handle_nan:
        return df
    # Indicators need a warm-up period and start with NaNs: forward-fill,
    # drop the warm-up rows, then back-fill anything left at the head.
    # Fix: fillna(method='ffill'/'bfill') is deprecated since pandas 2.0
    # and removed in 3.0 — use DataFrame.ffill()/bfill() instead.
    df = df.ffill()
    df = df.iloc[rolling_window + 1:]
    df = df.bfill()
    if df.isna().sum().sum() > 0:
        raise Exception('NaN values found')
    return df
def create_batches(all_symbols_df: pd.DataFrame,
                   trainer_config: dict,
                   model_config: dict):
    """Build normalized train/val sliding-window tensors for every symbol.

    For each symbol: sort by date, optionally append technical indicators,
    keep model_config['stock_features'], split rows train/val, window each
    split to (num_samples, ts_size, z_dim), normalize, and accumulate.

    Fix: the old loop ran torch.cat once per symbol, recopying the whole
    accumulated tensor each iteration (accidental O(n^2)). Parts are now
    collected in lists and concatenated once at the end; an empty float
    tensor is still returned when no symbol produced any windows.

    Returns:
        (train_batches, val_batches) tensors.
    """
    train_parts = []
    val_parts = []
    for symbol in tqdm(all_symbols_df['Symbol'].unique()):
        df = all_symbols_df[all_symbols_df['Symbol'] == symbol]
        df = df.sort_values(by='Date')
        if trainer_config['calculate_technical_indicators']:
            df = calculate_technical_indicators(df, rolling_window=model_config['ts_size'])
        data = df[model_config['stock_features']].values
        train_data, val_data = train_test_split(data=data, ratio=trainer_config['split_ratio'])
        # Sliding-window each split into (num_samples, ts_size, z_dim).
        train_data = load_data(ts_size=model_config['ts_size'], data=train_data)
        val_data = load_data(ts_size=model_config['ts_size'], data=val_data)
        # Splits shorter than ts_size yield no windows — skip them.
        if len(train_data) > 0:
            train_data = normalize(train_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            train_parts.append(torch.tensor(train_data))
        if len(val_data) > 0:
            val_data = normalize(val_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            val_parts.append(torch.tensor(val_data))
    train_batches = torch.cat(train_parts) if train_parts else torch.tensor([])
    val_batches = torch.cat(val_parts) if val_parts else torch.tensor([])
    return train_batches, val_batches
def get_mini_batch(batch_size, data):
    """Sample up to *batch_size* rows from *data* without replacement.

    Rows are drawn uniformly via a random permutation of axis 0; the
    result keeps shape (bs, seq_len, z_dim) for 3-D inputs.
    """
    order = np.random.permutation(len(data))
    chosen = order[:batch_size]
    return data[chosen, ...]
def generate_random_masks(num_samples, ts_size, mask_size, num_masks):
    """Build boolean masks with random masked patches, one row per sample.

    Each row of length ts_size has num_masks distinct patches of
    mask_size consecutive True positions, chosen uniformly, e.g.
    (x = masked, o = visible):
        xxxo
        oxxx
        xxox

    Returns a bool tensor of shape (num_samples, ts_size). Positions past
    the last full patch (when ts_size % mask_size != 0) are never masked.
    """
    num_patches = int(ts_size // mask_size)

    def one_row():
        picked = np.random.permutation(num_patches)[:num_masks]
        row = np.zeros(ts_size, dtype=bool)
        for patch in picked:
            row[patch * mask_size:(patch + 1) * mask_size] = True
        return torch.tensor(row)

    return torch.stack([one_row() for _ in range(num_samples)], dim=0)
def generate_pseudo_masks(ts_size, num_samples):
    """Return all-False masks (nothing masked) of shape (num_samples, ts_size).

    Counterpart to generate_random_masks for the no-masking case, e.g.
    (x = visible):
        xxxx
        xxxx
        xxxx
    Note: returns a NumPy bool array (not a torch tensor).
    """
    return np.full((num_samples, ts_size), False, dtype=bool)