Spaces:
Sleeping
Sleeping
| import sys | |
| import os | |
| import yaml | |
| import random | |
| from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union | |
| from pathlib import Path | |
| import numpy as np | |
| from scipy import stats | |
| import pandas as pd | |
| import darts | |
| from darts import models | |
| from darts import metrics | |
| from darts import TimeSeries | |
| from darts.dataprocessing.transformers import Scaler | |
| from pytorch_lightning.callbacks import Callback | |
| from sympy import pprint | |
| # import data formatter | |
| sys.path.append(os.path.join(os.path.dirname(__file__), '..')) | |
| from data_formatter.base import * | |
| pd.set_option('display.width', None) # Set display width to None to avoid truncation | |
| pd.set_option('display.max_columns', None) # Display all columns | |
def make_series(data: Dict[str, pd.DataFrame],
                time_col: str,
                group_col: str,
                value_cols: Dict[str, List[str]],
                include_sid: bool = False,
                verbose: bool = False
                ) -> Tuple[Dict[str, Dict[str, Optional[List["TimeSeries"]]]],
                           Dict[str, Optional["ScalerCustom"]]]:
    """
    Build scaled Darts TimeSeries for every data split and covariate type.

    For each split and each entry of ``value_cols`` a list of TimeSeries is
    created with ``TimeSeries.from_group_dataframe``. One ``ScalerCustom`` per
    covariate type is fitted on the 'train' split and re-used to transform
    all other splits.

    Parameters
    ----------
    data
        dict mapping split name (e.g. 'train', 'val', 'test') to its dataframe
    time_col
        name of time column
    group_col
        name of group column
    value_cols
        dict with key specifying the type of covariate and value specifying
        the list of columns; a value of None means "no such covariate" and
        yields None for both the series and the scaler of that type.
    include_sid
        whether to include segment id as static covariate; when this is
        exactly ``False`` the static covariates of every created series
        are removed.
    verbose
        whether to print the first row of each dataframe together with the
        split / covariate names while building the series.

    Returns
    -------
    series: Dict[str, Dict[str, Optional[List[TimeSeries]]]]
        dict of splits, each holding the (scaled) target and covariate series
    scalers: Dict[str, Optional[ScalerCustom]]
        dict of scalers for target and covariates (None for absent types)

    Raises
    ------
    KeyError
        If a split other than 'train' has to be scaled but ``data`` contains
        no 'train' split to fit the scaler on.
    """
    series = {split: {name: None for name in value_cols} for split in data}
    scalers = {}
    # Always handle 'train' first so the scalers are fitted before any other
    # split is transformed, regardless of the key order of `data`. The sort is
    # stable, so the remaining splits keep their original relative order.
    ordered_splits = sorted(data, key=lambda split: split != 'train')
    for key in ordered_splits:
        df = data[key]
        for name, cols in value_cols.items():
            if verbose:
                print(f"DATAFRAME for key {key} in NAME {name} and COLS {cols} and GROUP_COL {group_col}")
                pprint(df.head(1))
            if cols is None:
                # nothing to build for this covariate type
                scalers[name] = None
                continue
            group_series = TimeSeries.from_group_dataframe(df=df,
                                                           group_cols=group_col,
                                                           time_col=time_col,
                                                           value_cols=cols)
            # identity check on purpose: only an explicit False strips the covariates
            if include_sid is False:
                group_series = [ts.with_static_covariates(None) for ts in group_series]
            if key == 'train':
                scalers[name] = ScalerCustom()
                group_series = scalers[name].fit_transform(group_series)
            else:
                if name not in scalers:
                    raise KeyError(
                        f"cannot scale split '{key}' for '{name}': "
                        "no 'train' split was provided to fit the scaler on"
                    )
                group_series = scalers[name].transform(group_series)
            series[key][name] = group_series
    return series, scalers
def load_data(url: str,
              config_path: Union[str, Path],
              use_covs: bool = False,
              cov_type: str = 'past',
              use_static_covs: bool = False, seed: int = 0):
    """
    Load data according to the specified config file and convert to Darts TimeSeries objects.

    Parameters
    ----------
    url: str
        Location of the data CSV. It is written into the loaded config under
        the key 'data_csv_path', replacing whatever the YAML file contains.
    config_path: Path or str
        Path to the YAML config file that is passed to DataFormatter.
    use_covs: bool
        Whether to use covariates. When False, dynamic and future covariates
        are set to None for every split.
    cov_type: str
        Type of covariates to use. Can be 'past' or 'mixed' or 'dual'.
        'past' merges the future covariates into the dynamic (past) ones and
        erases the future ones; 'dual' erases the dynamic ones; 'mixed'
        (and any other value) keeps both as built by make_series().
    use_static_covs: bool
        Whether to use static covariates (attached to the target series).
    seed: int
        Currently not read by this function; kept for backward compatibility.
        NOTE(review): presumably meant to set the split random state --
        confirm whether it should override the value from the config file.

    Returns
    -------
    formatter: DataFormatter
        Data formatter object.
    series: Dict[str, Dict[str, TimeSeries]]
        First dictionary specified the split, second dictionary specifies the type of series (target or covariate).
    scalers: Dict[str, Scaler]
        Dictionary of scalers with key indicating the type of series (target or covariate).
    """
    # validate before doing any file I/O
    assert use_covs is not None, 'use_covs must be specified in the load_data call'

    # Reference example of the config structure (carried over from an inline
    # note in the original source; not validated against DataFormatter --
    # NOTE(review): the 'date_type' keys below look like typos of 'data_type'):
    #   config = {
    #       'data_csv_path': f'{url}',
    #       'drop': None,
    #       'ds_name': 'livia_mini',
    #       'index_col': -1,
    #       'observation_interval': '5min',
    #       'column_definition': {
    #           {'data_type': 'categorical', 'input_type': 'id', 'name': 'id'},
    #           {'date_type': 'date', 'input_type': 'time', 'name': 'time'},
    #           {'date_type': 'real_valued', 'input_type': 'target', 'name': 'gl'},
    #       },
    #       'encoding_params': {'date': ['day', 'month', 'year', 'hour', 'minute', 'second']},
    #       'nan_vals': None,
    #       'interpolation_params': {'gap_threshold': 45, 'min_drop_length': 240},
    #       'scaling_params': {'scaler': None},
    #       'split_params': {'length_segment': 13, 'random_state': seed,
    #                        'test_percent_subjects': 0.1},
    #       'max_length_input': 192,
    #       'length_pred': 12,
    #       'params': {
    #           'gluformer': {'in_len': 96, 'd_model': 512, 'n_heads': 10, 'd_fcn': 1024,
    #                         'num_enc_layers': 2, 'num_dec_layers': 2, 'length_pred': 12},
    #       },
    #   }
    # Path(...) lets callers pass either a str or a Path.
    with Path(config_path).open("r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    config["data_csv_path"] = url

    formatter = DataFormatter(config)

    # convert to series
    time_col = formatter.get_column('time')
    group_col = formatter.get_column('sid')
    target_col = formatter.get_column('target')
    static_cols = formatter.get_column('static_covs')
    # the id column is always part of the static covariates
    static_cols = static_cols + [formatter.get_column('id')] if static_cols is not None else [formatter.get_column('id')]
    dynamic_cols = formatter.get_column('dynamic_covs')
    future_cols = formatter.get_column('future_covs')

    # the test data is split into in-distribution and out-of-distribution rows
    is_ood = formatter.test_data.index.isin(formatter.test_idx_ood)
    data = {'train': formatter.train_data,
            'val': formatter.val_data,
            'test': formatter.test_data.loc[~is_ood],
            'test_ood': formatter.test_data.loc[is_ood]}
    value_cols = {'target': target_col,
                  'static': static_cols,
                  'dynamic': dynamic_cols,
                  'future': future_cols}

    # build series
    series, scalers = make_series(data,
                                  time_col,
                                  group_col,
                                  value_cols)

    splits = ('train', 'val', 'test', 'test_ood')
    if not use_covs:
        # set dynamic and future covariates to None
        for split in splits:
            series[split]['dynamic'] = None
            series[split]['future'] = None
    elif cov_type == 'past':
        # use future covariates as dynamic (past) covariates
        for split in splits:
            dynamic = series[split]['dynamic']
            future = series[split]['future']
            if dynamic is None:
                series[split]['dynamic'] = future
            elif future is not None:
                # guard: without future covariates there is nothing to append
                for i, future_ts in enumerate(future):
                    dynamic[i] = dynamic[i].concatenate(future_ts, axis=1)
            # erase future covariates
            series[split]['future'] = None
    elif cov_type == 'dual':
        # erase dynamic (past) covariates
        for split in splits:
            series[split]['dynamic'] = None
    # cov_type == 'mixed' (or anything else): keep what make_series() produced

    if use_static_covs:
        # attach static covariates to series
        for split in splits:
            targets = series[split]['target']
            statics = series[split]['static']
            for i, target_ts in enumerate(targets):
                # first time step of the static series, as a dataframe
                static_covs = statics[i][0].pd_dataframe()
                targets[i] = target_ts.with_static_covariates(static_covs)

    return formatter, series, scalers
def reshuffle_data(formatter: "DataFormatter",
                   seed: int = 0,
                   use_covs: Optional[bool] = None,
                   cov_type: str = 'past',
                   use_static_covs: bool = False,):
    """
    Reshuffle data according to the seed and convert to Darts TimeSeries objects.

    Parameters
    ----------
    formatter: DataFormatter
        Data formatter object containing the data. It is modified in place
        by calling ``formatter.reshuffle(seed)``.
    seed: int
        Random seed for data splitting.
    use_covs: bool
        Whether to use covariates. Must be given explicitly (not None).
        When False, dynamic and future covariates are set to None.
    cov_type: str
        Type of covariates to use. Can be 'past' or 'mixed' or 'dual'.
        'past' merges the future covariates into the dynamic (past) ones and
        erases the future ones; 'dual' erases the dynamic ones; 'mixed'
        (and any other value) keeps both as built by make_series().
    use_static_covs: bool
        Whether to use static covariates (attached to the target series).

    Returns
    -------
    formatter: DataFormatter
        Reshuffled data formatter object (the same object that was passed in).
    series: Dict[str, Dict[str, TimeSeries]]
        First dictionary specified the split, second dictionary specifies the type of series (target or covariate).
    scalers: Dict[str, Scaler]
        Dictionary of scalers with key indicating the type of series (target or covariate).

    Raises
    ------
    AssertionError
        If ``use_covs`` is None. Raised before the formatter is touched.
    """
    # validate first so a bad call does not leave the formatter reshuffled
    assert use_covs is not None, 'use_covs must be specified in the reshuffle_data call'

    # reshuffle
    formatter.reshuffle(seed)

    # convert to series
    time_col = formatter.get_column('time')
    group_col = formatter.get_column('sid')
    target_col = formatter.get_column('target')
    static_cols = formatter.get_column('static_covs')
    # the id column is always part of the static covariates
    static_cols = static_cols + [formatter.get_column('id')] if static_cols is not None else [formatter.get_column('id')]
    dynamic_cols = formatter.get_column('dynamic_covs')
    future_cols = formatter.get_column('future_covs')

    # the test data is split into in-distribution and out-of-distribution rows
    is_ood = formatter.test_data.index.isin(formatter.test_idx_ood)
    data = {'train': formatter.train_data,
            'val': formatter.val_data,
            'test': formatter.test_data.loc[~is_ood],
            'test_ood': formatter.test_data.loc[is_ood]}
    value_cols = {'target': target_col,
                  'static': static_cols,
                  'dynamic': dynamic_cols,
                  'future': future_cols}

    # build series
    series, scalers = make_series(data,
                                  time_col,
                                  group_col,
                                  value_cols)

    splits = ('train', 'val', 'test', 'test_ood')
    if not use_covs:
        # set dynamic and future covariates to None
        for split in splits:
            series[split]['dynamic'] = None
            series[split]['future'] = None
    elif cov_type == 'past':
        # use future covariates as dynamic covariates
        for split in splits:
            dynamic = series[split]['dynamic']
            future = series[split]['future']
            if dynamic is None:
                series[split]['dynamic'] = future
            elif future is not None:
                # or attach them to dynamic covariates; the guard avoids
                # iterating over None when there are no future covariates
                for i, future_ts in enumerate(future):
                    dynamic[i] = dynamic[i].concatenate(future_ts, axis=1)
            # erase future covariates so the result has the same structure
            # as the one returned by load_data() for cov_type == 'past'
            series[split]['future'] = None
    elif cov_type == 'dual':
        # set dynamic covariates to None, because they are not supported
        for split in splits:
            series[split]['dynamic'] = None
    # cov_type == 'mixed' (or anything else): keep what make_series() produced

    if use_static_covs:
        # attach static covariates to series
        for split in splits:
            targets = series[split]['target']
            statics = series[split]['static']
            for i, target_ts in enumerate(targets):
                # first time step of the static series, as a dataframe
                static_covs = statics[i][0].pd_dataframe()
                targets[i] = target_ts.with_static_covariates(static_covs)

    return formatter, series, scalers
class ScalerCustom:
    '''
    Min-max scaler for TimeSeries that fits on all sequences simultaneously.
    Default Darts scaler fits one scaler per sequence in the list.

    Attributes
    ----------
    scaler: Scaler
        Darts scaler object.
    min_: np.ndarray
        Per feature adjustment for minimum (see Scikit-learn).
    scale_: np.ndarray
        Per feature relative scaling of the data (see Scikit-learn).
    '''
    def __init__(self):
        self.scaler = Scaler()
        self.min_ = None
        self.scale_ = None

    def fit(self, time_series: Union[List["TimeSeries"], "TimeSeries"]) -> None:
        '''
        Fit the scaler on a single series or jointly on a list of series.

        For a list, the values of all series are stacked into one long
        series so that a single set of scaling parameters is learned.
        '''
        if isinstance(time_series, list):
            # stack the values of all series into one dataframe
            stacked = pd.concat([ts.pd_dataframe() for ts in time_series])
            value_cols = list(stacked.columns)
            # Replace the (possibly overlapping / gappy) stacked index by a new
            # equally spaced time grid. Working on the index directly avoids
            # assuming that the time axis is named 'time'.
            stacked.index = pd.date_range(start=stacked.index.min(),
                                          periods=len(stacked),
                                          freq='1h')
            # fit scaler on the pooled series
            pooled = TimeSeries.from_dataframe(stacked, value_cols=value_cols)
            self.scaler.fit(pooled)
        else:
            self.scaler.fit(time_series)
        # extract min and scale
        # NOTE(review): relies on a private attribute of the Darts Scaler;
        # may need adjusting when the Darts version changes.
        self.min_ = self.scaler._fitted_params[0].min_
        self.scale_ = self.scaler._fitted_params[0].scale_

    def transform(self, time_series: Union[List["TimeSeries"], "TimeSeries"]) -> Union[List["TimeSeries"], "TimeSeries"]:
        '''Scale a single series, or every series of a list one by one.'''
        if isinstance(time_series, list):
            # transform one by one
            return [self.scaler.transform(ts) for ts in time_series]
        return self.scaler.transform(time_series)

    def inverse_transform(self, time_series: Union[List["TimeSeries"], "TimeSeries"]) -> Union[List["TimeSeries"], "TimeSeries"]:
        '''Undo the scaling of a single series, or of every series of a list one by one.'''
        if isinstance(time_series, list):
            # transform one by one
            return [self.scaler.inverse_transform(ts) for ts in time_series]
        return self.scaler.inverse_transform(time_series)

    def fit_transform(self, time_series: Union[List["TimeSeries"], "TimeSeries"]) -> Union[List["TimeSeries"], "TimeSeries"]:
        '''Fit on ``time_series`` and return its scaled version.'''
        self.fit(time_series)
        return self.transform(time_series)