# HYPE_Churn_Analysis / preprocessing.py
# Author: cmmedoro — demo preprocessing code (commit f8da90e)
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler, RobustScaler
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from sklearn.model_selection import train_test_split
from sksurv.util import Surv
class Preprocessor:
    """End-to-end preprocessing for the churn dataset: download, encode,
    split, (optionally) resample and scale.

    Two pipelines are exposed:

    * ``process_cp``   -- standard churn classification (``target_column``).
    * ``process_ttcp`` -- time-to-churn / survival analysis, where the target
      is the column pair given in ``target_columnn_ttc``
      (assumed ``[event_column, time_column]`` -- TODO confirm against callers).
    """

    def __init__(self, dataset_url, target_column, target_columnn_ttc, resampling = None, scaling = 'minmax', test_size=0.2, random_state=42):
        """
        Parameters
        ----------
        dataset_url : str
            URL or path of the CSV file to load.
        target_column : str
            Name of the churn label column (classification pipeline).
        target_columnn_ttc : sequence of str
            Target columns for the survival pipeline.
            NOTE: parameter name keeps the original (misspelled) form so that
            existing keyword callers keep working.
        resampling : str or None
            'over', 'under' or None -- class-rebalancing strategy.
        scaling : str
            'minmax', 'standard' or 'robust' (case-insensitive).
        test_size : float
            Fraction of the data held out for testing.
        random_state : int
            Seed used for both splitting and resampling.
        """
        self.url = dataset_url
        self.target_column = target_column
        self.target_column_ttc = target_columnn_ttc
        self.resampling = resampling
        self.scaling = scaling
        self.test_size = test_size
        self.random_state = random_state
        self.label_encoders = {}   # column name -> fitted LabelEncoder
        self.scaler = None         # scaler fitted on the training partition

    def _download_dataset(self):
        """Load the CSV and drop identifier columns that carry no signal."""
        df = pd.read_csv(self.url)
        # Fix: the original also read an unused `ids = d.id` local -- removed.
        df.drop(['id', 'Surname', 'CustomerId'], axis=1, inplace=True)
        return df

    def _encode_categorical(self):
        """Label-encode every object/category column of ``self.df`` in place.

        Fitted encoders are kept in ``self.label_encoders`` so the mapping can
        be inspected or inverted later.
        """
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            le = LabelEncoder()
            self.df[col] = le.fit_transform(self.df[col].astype(str))
            self.label_encoders[col] = le

    def _make_scaler(self, name):
        """Map a (lower-cased) scaler name to a fresh scaler instance."""
        return {
            'standard': StandardScaler(),
            'minmax': MinMaxScaler(feature_range=(0, 1)),
            'robust': RobustScaler(),
        }[name]

    def _scale(self, df, scaler, df_type, feature_cols):
        """Shared scaling core for both pipelines.

        Parameters
        ----------
        df : pandas.DataFrame
            Feature matrix to scale.
        scaler : str or fitted scaler
            A scaler name on the 'train' call; the fitted scaler object on the
            'test' call.
        df_type : str
            'train' fits the scaler; anything else only transforms.
        feature_cols : sequence
            Column labels for the returned DataFrame.

        Returns
        -------
        (pandas.DataFrame, scaler)
            The scaled frame (original index preserved) and the scaler used.
            On an invalid scaler name the input is returned unscaled
            (best-effort behaviour kept from the original code).
        """
        if isinstance(scaler, str):
            key = scaler.lower()
            if key not in ('standard', 'robust', 'minmax'):
                print("Invalid scaler. Choose one between 'standard', 'robust' or 'minmax'")
                return df, scaler
            # Fix: the original validated case-insensitively but dispatched on
            # the exact string, so e.g. 'MinMax' passed validation yet never
            # produced a scaler object and later crashed on fit_transform.
            scaler = self._make_scaler(key)
        if df_type == 'train':
            scaled = scaler.fit_transform(df)
            self.scaler = scaler
        else:
            scaled = scaler.transform(df)
        return pd.DataFrame(scaled, columns=feature_cols, index=df.index), scaler

    def _scale_features(self, df, scaler, df_type = 'train'):
        """Scale features for the classification pipeline (see ``_scale``)."""
        feature_cols = self.df.drop(columns=[self.target_column]).columns
        return self._scale(df, scaler, df_type, feature_cols)

    def _scale_features_ttc(self, df, scaler, df_type = 'train'):
        """Scale features for the survival pipeline (see ``_scale``)."""
        feature_cols = self.df.drop(list(self.target_column_ttc), axis=1).columns
        return self._scale(df, scaler, df_type, feature_cols)

    def _split_data(self):
        """Train/test split for the classification target, with optional
        class rebalancing.

        Fix: resampling is now applied to the TRAINING partition only, after
        the split. The original resampled the whole dataset first, so
        oversampled duplicates of training rows could leak into the test set
        and inflate evaluation scores. Samplers now also honour
        ``self.random_state`` instead of a hard-coded 42.
        """
        X = self.df.drop(columns=[self.target_column])
        y = self.df[self.target_column]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state)
        mode = self.resampling.lower() if isinstance(self.resampling, str) else None
        if mode == 'over':
            sampler = RandomOverSampler(sampling_strategy='auto', random_state=self.random_state)
        elif mode == 'under':
            sampler = RandomUnderSampler(sampling_strategy='auto', random_state=self.random_state)
        else:
            sampler = None
        if sampler is not None:
            X_train, y_train = sampler.fit_resample(X_train, y_train)
        return X_train, X_test, y_train, y_test

    def _split_data_ttc(self):
        """Train/test split for the survival pipeline.

        No resampling is performed here: the target is the multi-column
        (event, time) pair, which the samplers do not support -- the original
        code had this path disabled (commented out) as well.
        """
        cols = list(self.target_column_ttc)
        X = self.df.drop(cols, axis=1)
        y = self.df[self.target_column_ttc]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_size, random_state=self.random_state)
        return X_train, X_test, y_train, y_test

    def process_cp(self):
        """Run the full classification pipeline.

        Returns
        -------
        tuple
            ``(X_train, X_test, y_train, y_test,
            X_train_df, X_test_df, y_train_df, y_test_df)`` -- the scaled
            splits followed by unscaled copies of the same splits.
        """
        # 1) download the dataset
        self.df = self._download_dataset()
        # 2) encode categorical features
        self._encode_categorical()
        # 3) divide into train and test sets (resampling train only)
        X_train, X_test, y_train, y_test = self._split_data()
        # keep unscaled copies for inspection / models that want raw values
        X_train_df, X_test_df = X_train.copy(), X_test.copy()
        y_train_df, y_test_df = y_train.copy(), y_test.copy()
        # 4) fit the scaler on train, then apply the same scaler to test
        X_train, scaler = self._scale_features(X_train, self.scaling, 'train')
        X_test, scaler = self._scale_features(X_test, scaler, 'test')
        return X_train, X_test, y_train, y_test, X_train_df, X_test_df, y_train_df, y_test_df

    def process_ttcp(self):
        """Run the full time-to-churn (survival) pipeline.

        Same as :meth:`process_cp` but splits on the survival target pair and
        converts the targets into sksurv structured arrays via
        ``Surv.from_dataframe(event_col, time_col, frame)``.
        """
        # 1) download the dataset
        self.df = self._download_dataset()
        # 2) encode categorical features
        self._encode_categorical()
        # 3) divide into train and test sets
        X_train, X_test, y_train, y_test = self._split_data_ttc()
        # keep unscaled copies for inspection / models that want raw values
        X_train_df, X_test_df = X_train.copy(), X_test.copy()
        y_train_df, y_test_df = y_train.copy(), y_test.copy()
        # 4) fit the scaler on train, then apply the same scaler to test
        X_train, scaler = self._scale_features_ttc(X_train, self.scaling, 'train')
        X_test, scaler = self._scale_features_ttc(X_test, scaler, 'test')
        # structured survival targets: (event_indicator, time) per sample
        y_train = Surv.from_dataframe(self.target_column_ttc[0], self.target_column_ttc[1], y_train)
        y_test = Surv.from_dataframe(self.target_column_ttc[0], self.target_column_ttc[1], y_test)
        return X_train, X_test, y_train, y_test, X_train_df, X_test_df, y_train_df, y_test_df