| import datetime |
| import json |
| import os |
| from pathlib import Path |
|
|
| import torch |
| from utils.tools import dotdict |
|
|
| import pandas as pd |
|
|
|
|
| |
|
|
|
|
| def args_from_setting(setting, args): |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| path = f"results/{setting}/args.json" |
| assert os.path.exists(path), f"{path}/args.json doesn't exist" |
|
|
| with open(path, "r") as f: |
| args = json.load(f) |
| return dotdict(args) |
|
|
|
|
| def setting_from_args(args, ii=0): |
| setting = "{}_{}_ft{}_sl{}_ll{}_pl{}_ei{}_di{}_co{}_i{}_dm{}_nh{}_el{}_dl{}_df{}_at{}_fc{}_eb{}_dt{}_mx{}_{}_{}".format( |
| args.model, |
| args.data, |
| args.features, |
| args.seq_len, |
| args.label_len, |
| args.pred_len, |
| args.enc_in, |
| args.dec_in, |
| args.c_out, |
| args.inverse, |
| args.d_model, |
| args.n_heads, |
| args.e_layers, |
| args.d_layers, |
| args.d_ff, |
| args.attn, |
| args.factor, |
| args.t_embed, |
| args.distil, |
| args.mix, |
| args.des, |
| ii, |
| ) |
|
|
| return setting |
|
|
|
|
| def bbtest_setting(args): |
| time_label = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") |
| setting = "{}_{}_sl{}_ei{}_dm{}_nh{}_el{}_eb{}".format( |
| time_label, |
| args.model, |
| args.seq_len, |
| args.enc_in, |
| args.d_model, |
| args.n_heads, |
| args.e_layers, |
| args.t_embed, |
| ) |
|
|
| return setting |
|
|
|
|
| def write_df(data, out_file, append=""): |
| |
| og_cols = data.columns.copy() |
| data.columns = data.columns.to_flat_index() |
|
|
| data.columns = pd.Index(["_".join(col) for col in data.columns]) |
|
|
| if append: |
| dot_loc = out_file.rfind(".") |
| out_file = f"{out_file[:dot_loc]}_{append}{out_file[dot_loc:]}" |
|
|
| if os.path.exists(out_file): |
| |
| data_old = "data/old" |
| if not os.path.exists(data_old): |
| os.makedirs(data_old) |
| new_file_name = f"{out_file[:out_file.rfind('.')].replace('./','').replace('/','_')}_{datetime.datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}{out_file[out_file.rfind('.'):]}" |
| os.rename(out_file, os.path.join(data_old, new_file_name)) |
| else: |
| |
| os.makedirs(Path(out_file).parent, exist_ok=True) |
| data.to_csv(out_file) |
| data.columns = og_cols |
| return out_file |
|
|
|
|
| |
| def read_data(out_file="realdata.csv", stock=True): |
| data = pd.read_csv(out_file, index_col=0) |
|
|
| if not stock: |
| |
| converter = lambda col: f"{col}_open" |
| data.columns = data.columns.map(converter) |
| for column in data.columns: |
| data[f"{column.split('_')[0]}_close"] = data[column].shift(-1) |
| data = data.reindex(sorted(data.columns), axis=1) |
|
|
| converter = lambda col: tuple(col.split("_")) |
| |
| data.columns = data.columns.map(converter) |
|
|
| data.index = pd.to_datetime(data.index) |
| if data.index.tz is None: |
| print("Warning: data did not have timestamp, adding utc") |
| data.index = pd.to_datetime(data.index, utc=True) |
|
|
| return data |
|
|
|
|
| def add_tz(data, time_zone="US/Eastern"): |
| """Add timezone to timezone-unlabled df""" |
| t = pd.to_datetime(data.index).to_series() |
| data.index = t.dt.tz_localize(time_zone) |
| return data |
|
|
|
|
| def convert_tz(data, time_zone="US/Eastern"): |
| t = data.index.to_series() |
| t = t.dt.tz_convert(time_zone) |
| data.index = t |
| return data |
|
|
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| def handle_gpu(args, gpu=None): |
| if not gpu and gpu is not None: |
| |
| args.use_gpu = False |
| args.use_multi_gpu = False |
| return |
|
|
| args.use_gpu = True if torch.cuda.is_available() else False |
|
|
| if not args.use_gpu: |
| return |
|
|
| if gpu is None: |
| |
| c = torch.cuda.device_count() |
|
|
| args.device_ids = list(map(int, range(torch.cuda.device_count()))) |
| args.devices = ",".join(map(str, args.device_ids)) |
| else: |
| |
| gpu = str(gpu) |
|
|
| args.devices = gpu.replace(" ", "") |
| args.device_ids = [int(id_) for id_ in args.devices.split(",")] |
|
|
| if len(args.device_ids) >= 1: |
| args.use_multi_gpu = len(args.device_ids) > 1 |
| args.gpu = int(args.device_ids[0]) |
|
|