import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler, RobustScaler
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from sklearn.model_selection import train_test_split
from sksurv.util import Surv


class Preprocessor:
    """Download, encode, resample, split and scale a churn-style dataset.

    Supports two pipelines:
      * ``process_cp``  — single binary target (classification).
      * ``process_ttcp`` — time-to-churn survival target; the target is a
        pair of columns ``(event, time)`` converted to a ``sksurv`` array.
    """

    _VALID_SCALERS = ('standard', 'robust', 'minmax')

    def __init__(self, dataset_url, target_column, target_columnn_ttc,
                 resampling=None, scaling='minmax', test_size=0.2, random_state=42):
        """
        Parameters
        ----------
        dataset_url : str
            URL (or path) of a CSV readable by ``pandas.read_csv``.
        target_column : str
            Name of the binary classification target column.
        target_columnn_ttc : sequence of str
            Survival target columns, ordered ``(event_column, time_column)``.
            (Parameter name kept as-is, typo included, for backward
            compatibility with existing keyword callers.)
        resampling : str or None
            ``'over'`` / ``'under'`` for random over/under-sampling, else None.
        scaling : str
            One of ``'standard'``, ``'robust'``, ``'minmax'``.
        test_size : float
            Fraction of data held out for testing.
        random_state : int
            Seed used for splitting and resampling.
        """
        self.url = dataset_url
        self.target_column = target_column
        self.target_column_ttc = target_columnn_ttc
        self.resampling = resampling
        self.scaling = scaling
        self.test_size = test_size
        self.random_state = random_state
        self.label_encoders = {}
        self.scaler = None

    def _download_dataset(self):
        """Read the CSV and drop identifier columns that carry no signal."""
        d = pd.read_csv(self.url)
        # 'id', 'Surname' and 'CustomerId' are row identifiers specific to
        # this dataset and would leak no useful information to a model.
        d.drop(['id', 'Surname', 'CustomerId'], axis=1, inplace=True)
        return d

    def _encode_categorical(self):
        """Label-encode every object/category column of ``self.df`` in place.

        Fitted encoders are kept in ``self.label_encoders`` so the mapping
        can be inverted later.
        """
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            le = LabelEncoder()
            self.df[col] = le.fit_transform(self.df[col].astype(str))
            self.label_encoders[col] = le

    def _make_scaler(self, name):
        """Map a scaler name to a fresh scikit-learn scaler instance."""
        if name == 'standard':
            return StandardScaler()
        if name == 'minmax':
            return MinMaxScaler(feature_range=(0, 1))
        return RobustScaler()

    def _apply_scaling(self, df, scaler, df_type):
        """Fit (train) or apply (test) ``scaler`` to ``df``.

        ``scaler`` may be a string name (train phase) or an already fitted
        scaler object (test phase). Returns the scaled DataFrame and the
        scaler used, preserving the original index and column order.
        """
        if isinstance(scaler, str):
            if scaler.lower() not in self._VALID_SCALERS:
                print("Invalid scaler. Choose one between 'standard', 'robust' or 'minmax'")
                return df, scaler
            scaler = self._make_scaler(scaler)
        # Columns come from the frame being scaled itself — it already
        # excludes the target column(s) by the time it reaches here.
        feature_cols = df.columns
        if df_type == 'train':
            dataset = scaler.fit_transform(df)
            self.scaler = scaler
        else:
            dataset = scaler.transform(df)
        return pd.DataFrame(dataset, columns=feature_cols, index=df.index), scaler

    def _scale_features(self, df, scaler, df_type='train'):
        """Scale classification features (kept for interface compatibility)."""
        return self._apply_scaling(df, scaler, df_type)

    def _scale_features_ttc(self, df, scaler, df_type='train'):
        """Scale survival features (kept for interface compatibility)."""
        return self._apply_scaling(df, scaler, df_type)

    def _split_data(self):
        """Optionally resample, then split into train/test for classification."""
        X = self.df.drop(columns=[self.target_column])
        y = self.df[self.target_column]
        if isinstance(self.resampling, str) and self.resampling.lower() == "over":
            sampler = RandomOverSampler(sampling_strategy='auto', random_state=self.random_state)
        elif isinstance(self.resampling, str) and self.resampling.lower() == "under":
            sampler = RandomUnderSampler(sampling_strategy='auto', random_state=self.random_state)
        else:
            sampler = None
        if sampler is not None:
            X, y = sampler.fit_resample(X, y)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state)
        return X_train, X_test, y_train, y_test

    def _split_data_ttc(self):
        """Split into train/test for the survival (time-to-churn) target.

        No resampling is applied: the target is a two-column frame, which
        imblearn samplers do not support.
        """
        cols = list(self.target_column_ttc)
        X = self.df.drop(cols, axis=1)
        y = self.df[self.target_column_ttc]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state)
        return X_train, X_test, y_train, y_test

    def process_cp(self):
        """Run the full classification pipeline.

        Returns
        -------
        tuple
            ``(X_train, X_test, y_train, y_test,
            X_train_df, X_test_df, y_train_df, y_test_df)`` where the
            ``*_df`` copies preserve the unscaled values.
        """
        # 1) download the dataset
        self.df = self._download_dataset()
        # 2) encode categorical features
        self._encode_categorical()
        # 3) divide into train and test sets
        X_train, X_test, y_train, y_test = self._split_data()
        # Keep unscaled copies for inspection / downstream use.
        X_train_df = X_train.copy()
        X_test_df = X_test.copy()
        y_train_df = y_train.copy()
        y_test_df = y_test.copy()
        # 4) scale features — fit on train only, then transform test.
        X_train, scaler = self._scale_features(X_train, self.scaling, 'train')
        X_test, scaler = self._scale_features(X_test, scaler, 'test')
        return X_train, X_test, y_train, y_test, X_train_df, X_test_df, y_train_df, y_test_df

    def process_ttcp(self):
        """Run the full survival (time-to-churn) pipeline.

        Returns the same 8-tuple shape as :meth:`process_cp`, but the
        ``y_train``/``y_test`` entries are ``sksurv`` structured arrays
        built from the ``(event, time)`` target columns.
        """
        # 1) download the dataset
        self.df = self._download_dataset()
        # 2) encode categorical features
        self._encode_categorical()
        # 3) divide into train and test sets
        X_train, X_test, y_train, y_test = self._split_data_ttc()
        # Keep unscaled copies for inspection / downstream use.
        X_train_df = X_train.copy()
        X_test_df = X_test.copy()
        y_train_df = y_train.copy()
        y_test_df = y_test.copy()
        # 4) scale features — fit on train only, then transform test.
        X_train, scaler = self._scale_features_ttc(X_train, self.scaling, 'train')
        X_test, scaler = self._scale_features_ttc(X_test, scaler, 'test')
        # target_column_ttc is ordered (event, time) per Surv.from_dataframe.
        y_train = Surv.from_dataframe(self.target_column_ttc[0], self.target_column_ttc[1], y_train)
        y_test = Surv.from_dataframe(self.target_column_ttc[0], self.target_column_ttc[1], y_test)
        return X_train, X_test, y_train, y_test, X_train_df, X_test_df, y_train_df, y_test_df