Spaces:
Sleeping
Sleeping
| '''Defines a generic data formatter for CGM data sets.''' | |
| import sys | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| import sklearn.preprocessing | |
| import data_formatter.types as types | |
| import data_formatter.utils as utils | |
# Shorthand aliases for the enum types declared in data_formatter.types.
DataTypes = types.DataTypes
InputTypes = types.InputTypes

# Maps the data-type strings used in the config's column definition to
# the corresponding DataTypes enum members.
dict_data_type = {'categorical': DataTypes.CATEGORICAL,
                  'real_valued': DataTypes.REAL_VALUED,
                  'date': DataTypes.DATE}
# Maps the input-type strings used in the config's column definition to
# the corresponding InputTypes enum members.
dict_input_type = {'target': InputTypes.TARGET,
                   'observed_input': InputTypes.OBSERVED_INPUT,
                   'known_input': InputTypes.KNOWN_INPUT,
                   'static_input': InputTypes.STATIC_INPUT,
                   'id': InputTypes.ID,
                   'time': InputTypes.TIME}
class DataFormatter:
    """Defines and formats a CGM data set.

    The constructor runs the full preprocessing pipeline driven by the
    config dict: load CSV -> drop columns/rows -> NA check -> type
    casting -> drop again -> encode -> interpolate -> split -> scale.
    After construction, `train_data` / `val_data` / `test_data` hold the
    processed splits.
    """

    def __init__(self, cnf):
        """Initialises the formatter and runs the preprocessing pipeline.

        Args:
            cnf: dict of configuration parameters. Expected keys include
                'column_definition', 'data_csv_path', 'index_col', 'drop',
                'encoding_params', 'interpolation_params',
                'observation_interval', 'split_params', 'max_length_input',
                'length_pred', 'scaling_params', and optionally
                'study_file' (path of a log file used by reshuffle()).
        """
        # load parameters from the config file
        self.params = cnf
        # optional study log file; reshuffle() appends its progress output
        # there when configured. (Previously this attribute was never set,
        # so reshuffle() raised AttributeError.)
        self.study_file = self.params.get('study_file')
        # load column definition
        print('-' * 32)
        print('Loading column definition...')
        self.__process_column_definition()
        # check that column definition is valid
        print('Checking column definition...')
        self.__check_column_definition()
        # load data
        # check if data table has index col: -1 if not, index >= 0 if yes
        print('Loading data...')
        self.params['index_col'] = False if self.params['index_col'] == -1 else self.params['index_col']
        # read data table
        self.data = pd.read_csv(self.params['data_csv_path'], index_col=self.params['index_col'])
        # drop columns / rows
        print('Dropping columns / rows...')
        self.__drop()
        # check NA values
        print('Checking for NA values...')
        self.__check_nan()
        # set data types in DataFrame to match column definition
        print('Setting data types...')
        self.__set_data_types()
        # drop again: type casting may have re-introduced values targeted
        # by the drop conditions (e.g. categorical recoding)
        print('Dropping columns / rows...')
        self.__drop()
        # encode
        print('Encoding data...')
        self._encoding_params = self.params['encoding_params']
        self.__encode()
        # interpolate
        print('Interpolating data...')
        self._interpolation_params = self.params['interpolation_params']
        self._interpolation_params['interval_length'] = self.params['observation_interval']
        self.__interpolate()
        # split data
        print('Splitting data...')
        self._split_params = self.params['split_params']
        self._split_params['max_length_input'] = self.params['max_length_input']
        self.__split_data()
        # scale
        print('Scaling data...')
        self._scaling_params = self.params['scaling_params']
        self.__scale()
        print('Data formatting complete.')
        print('-' * 32)

    def __process_column_definition(self):
        """Converts the config's column definition into a list of
        (name, DataTypes, InputTypes) tuples."""
        self._column_definition = []
        for col in self.params['column_definition']:
            self._column_definition.append((col['name'],
                                            dict_data_type[col['data_type']],
                                            dict_input_type[col['input_type']]))

    def __check_column_definition(self):
        """Validates the column definition: exactly one ID column, exactly
        one time column, and at least one target column."""
        # check that there is unique ID column
        assert len([col for col in self._column_definition if col[2] == InputTypes.ID]) == 1, 'There must be exactly one ID column.'
        # check that there is unique time column
        assert len([col for col in self._column_definition if col[2] == InputTypes.TIME]) == 1, 'There must be exactly one time column.'
        # check that there is at least one target column
        assert len([col for col in self._column_definition if col[2] == InputTypes.TARGET]) >= 1, 'There must be at least one target column.'

    def __set_data_types(self):
        """Casts each DataFrame column to the dtype implied by its
        DataTypes entry (datetime / category / float32)."""
        # set time column as datetime format in pandas
        for col in self._column_definition:
            if col[1] == DataTypes.DATE:
                self.data[col[0]] = pd.to_datetime(self.data[col[0]])
            if col[1] == DataTypes.CATEGORICAL:
                self.data[col[0]] = self.data[col[0]].astype('category')
            if col[1] == DataTypes.REAL_VALUED:
                self.data[col[0]] = self.data[col[0]].astype(np.float32)

    def __check_nan(self):
        """Drops rows with missing target / time / id values and asserts
        no other NA values remain."""
        # delete rows where target, time, or id are na
        self.data = self.data.dropna(subset=[col[0]
                                             for col in self._column_definition
                                             if col[2] in [InputTypes.TARGET, InputTypes.TIME, InputTypes.ID]])
        # assert that there are no na values in the data
        assert self.data.isna().sum().sum() == 0, 'There are NA values in the data even after dropping with missing time, glucose, or id.'

    def __drop(self):
        """Restricts the DataFrame to defined columns and applies the
        drop conditions from the config (row indices and per-column
        value blacklists)."""
        # drop columns that are not in the column definition
        self.data = self.data[[col[0] for col in self._column_definition]]
        # drop rows based on conditions set in the formatter
        if self.params['drop'] is not None:
            if self.params['drop']['rows'] is not None:
                # drop row at indices in the list self.params['drop']['rows']
                self.data = self.data.drop(self.params['drop']['rows'])
                self.data = self.data.reset_index(drop=True)
            if self.params['drop']['columns'] is not None:
                for col in self.params['drop']['columns'].keys():
                    # drop rows where specified columns have values in the list self.params['drop']['columns'][col]
                    self.data = self.data.loc[~self.data[col].isin(self.params['drop']['columns'][col])].copy()

    def __interpolate(self):
        """Interpolates the data onto a regular time grid via utils.interpolate."""
        self.data, self._column_definition = utils.interpolate(self.data,
                                                               self._column_definition,
                                                               **self._interpolation_params)

    def __split_data(self):
        """Splits the data into train / val / test (+ out-of-distribution
        test) index sets via utils.split, or skips splitting entirely when
        the split params disable it."""
        if self.params['split_params']['test_percent_subjects'] == 0 or \
           self.params['split_params']['length_segment'] == 0:
            print('\tNo splitting performed since test_percent_subjects or length_segment is 0.')
            self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = None, None, None, None
            self.train_data, self.val_data, self.test_data = self.data, None, None
        else:
            assert self.params['split_params']['length_segment'] > self.params['length_pred'], \
                'length_segment for test / val must be greater than length_pred.'
            self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = utils.split(self.data,
                                                                                         self._column_definition,
                                                                                         **self._split_params)
            self.train_data, self.val_data, self.test_data = self.data.iloc[self.train_idx], \
                                                             self.data.iloc[self.val_idx], \
                                                             self.data.iloc[self.test_idx + self.test_idx_ood]

    def __encode(self):
        """Encodes categorical columns via utils.encode, storing the
        fitted encoders on self.encoders."""
        self.data, self._column_definition, self.encoders = utils.encode(self.data,
                                                                         self._column_definition,
                                                                         **self._encoding_params)

    def __scale(self):
        """Scales the train / val / test splits via utils.scale, storing
        the fitted scalers on self.scalers."""
        self.train_data, self.val_data, self.test_data, self.scalers = utils.scale(self.train_data,
                                                                                   self.val_data,
                                                                                   self.test_data,
                                                                                   self._column_definition,
                                                                                   **self.params['scaling_params'])

    def reshuffle(self, seed):
        """Re-splits and re-scales the data with a new random seed.

        Progress output is appended to ``self.study_file`` when one is
        configured; otherwise it goes to the current stdout.

        Args:
            seed: random seed forwarded to the splitter via
                split_params['random_state'].
        """
        # BUG FIX: the original unconditionally redirected stdout to
        # self.study_file, which was never initialised -> AttributeError.
        # Redirect only when a study file is configured, and restore
        # stdout / close the file even if splitting or scaling raises.
        redirect = self.study_file is not None
        if redirect:
            stdout = sys.stdout
            f = open(self.study_file, 'a')
            sys.stdout = f
        try:
            self.params['split_params']['random_state'] = seed
            # split data
            self.train_idx, self.val_idx, self.test_idx, self.test_idx_ood = utils.split(self.data,
                                                                                         self._column_definition,
                                                                                         **self._split_params)
            self.train_data, self.val_data, self.test_data = self.data.iloc[self.train_idx], \
                                                             self.data.iloc[self.val_idx], \
                                                             self.data.iloc[self.test_idx + self.test_idx_ood]
            # re-scale data
            self.train_data, self.val_data, self.test_data, self.scalers = utils.scale(self.train_data,
                                                                                       self.val_data,
                                                                                       self.test_data,
                                                                                       self._column_definition,
                                                                                       **self.params['scaling_params'])
        finally:
            if redirect:
                sys.stdout = stdout
                f.close()

    def get_column(self, column_name):
        """Returns the column name(s) matching a semantic role.

        Args:
            column_name: one of 'time', 'id', 'sid', 'target',
                'future_covs', 'static_covs', 'dynamic_covs'.

        Returns:
            A single column name for 'time' / 'id' / 'sid'; a list of
            names for 'target'; a list of names or None (when empty) for
            the covariate roles.

        Raises:
            ValueError: if column_name is not one of the known roles.
        """
        # write cases for time, id, target, future, static, dynamic covariates
        if column_name == 'time':
            return [col[0] for col in self._column_definition if col[2] == InputTypes.TIME][0]
        elif column_name == 'id':
            return [col[0] for col in self._column_definition if col[2] == InputTypes.ID][0]
        elif column_name == 'sid':
            return [col[0] for col in self._column_definition if col[2] == InputTypes.SID][0]
        elif column_name == 'target':
            return [col[0] for col in self._column_definition if col[2] == InputTypes.TARGET]
        elif column_name == 'future_covs':
            future_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.KNOWN_INPUT]
            return future_covs if len(future_covs) > 0 else None
        elif column_name == 'static_covs':
            static_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.STATIC_INPUT]
            return static_covs if len(static_covs) > 0 else None
        elif column_name == 'dynamic_covs':
            dynamic_covs = [col[0] for col in self._column_definition if col[2] == InputTypes.OBSERVED_INPUT]
            return dynamic_covs if len(dynamic_covs) > 0 else None
        else:
            raise ValueError('Column {} not found.'.format(column_name))