# Author: Juan Parras & Patricia A. Apellániz
# Email: patricia.alonsod@upm.es
# Date: 05/08/2025
# Package imports
from kan import *
# Explicit imports for everything this module uses directly (some of these may also be
# re-exported by `from kan import *`, but we do not rely on that)
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm import tqdm
from src.models.nam.wrapper import NAMClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
#--------------------------------------------------------#
# NAM MODEL #
#--------------------------------------------------------#
class NAMModel:
"""
Class for NAM model, it includes the definition of the model and functions to run the model
"""
def __init__(self, num_epochs=1000, num_learners=20, metric='aucroc', early_stop_mode='max', n_jobs=1,
random_state=0,
num_basis_functions=64, hidden_size=[64, 32]):
# The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
self._estimator_type = 'classifier'
self.classes_ = np.array([0, 1]).astype(int)
self.hyperparameters = {'num_epochs': num_epochs,
'num_learners': num_learners,
'metric': metric,
'early_stop_mode': early_stop_mode,
'n_jobs': n_jobs,
'random_state': random_state,
'num_basis_functions': num_basis_functions,
'hidden_size': hidden_size}
def fit(self, x, y):
self.set_model()
self.model.fit(x, y)
return {}
def set_model(self):
self.model = NAMClassifier(num_epochs=self.hyperparameters["num_epochs"],
num_learners=self.hyperparameters["num_learners"],
metric=self.hyperparameters["metric"],
early_stop_mode=self.hyperparameters["early_stop_mode"],
n_jobs=self.hyperparameters["n_jobs"],
random_state=self.hyperparameters["random_state"],
num_basis_functions=self.hyperparameters["num_basis_functions"],
hidden_sizes=self.hyperparameters["hidden_size"])
def get_params(self, deep=False):
return self.hyperparameters
def set_params(self, num_epochs=1000, num_learners=20, metric='aucroc', early_stop_mode='max', n_jobs=1,
random_state=0, num_basis_functions=64, hidden_size=[64, 32]):
self.hyperparameters = {'num_epochs': num_epochs,
'num_learners': num_learners,
'metric': metric,
'early_stop_mode': early_stop_mode,
'n_jobs': n_jobs,
'random_state': random_state,
'num_basis_functions': num_basis_functions,
'hidden_size': hidden_size}
return self
    def predict_proba(self, data):
        prob_of_ones = self.model.predict_proba(data)
        prob_of_zeros = 1 - prob_of_ones
        # column_stack handles both 1-D (n,) and 2-D (n, 1) backend outputs,
        # always returning an (n, 2) array of [P(y=0), P(y=1)]
        return np.column_stack([prob_of_zeros, prob_of_ones])
    def predict(self, data, threshold=0.5):
        proba_preds = self.predict_proba(data)
        # Apply the decision threshold to the positive-class probability so the
        # threshold argument actually takes effect (argmax would ignore it)
        return (proba_preds[:, 1] >= threshold).astype(int)
def run_model(self, x_train, x_test, y_train, y_test):
self.set_model()
# Train the model
_ = self.model.fit(x_train, y_train)
return {'model': self.model,
'y_pred_proba': np.squeeze(self.predict_proba(x_test)),
'y_pred': np.squeeze(self.predict(x_test))}
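# Usage sketch (illustrative, not part of the pipeline): `x_train`, `y_train`, `x_test`
# are placeholders for binary-classification data accepted by the NAM backend.
#
#   nam = NAMModel(num_epochs=10, num_learners=2)
#   nam.fit(x_train, y_train)
#   proba = nam.predict_proba(x_test)   # (n_samples, 2) array of [P(y=0), P(y=1)]
#   preds = nam.predict(x_test)         # labels in {0, 1}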
#--------------------------------------------------------#
# RF MODEL #
#--------------------------------------------------------#
class RandomForestModel:
"""
Class for random forest model, it includes the definition of the model and functions to run the model
"""
def __init__(self, n_estimators=20, criterion='gini', max_depth=10, min_samples_split=2, min_samples_leaf=1,
class_weight='balanced', max_features='log2', bootstrap=True, random_state=0, n_jobs=1):
self.classification_flag = True
# The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
self._estimator_type = 'classifier'
self.classes_ = np.array([0, 1]).astype(int)
self.hyperparameters = {'n_estimators': n_estimators,
'criterion': criterion,
'max_depth': max_depth,
'min_samples_split': min_samples_split,
'min_samples_leaf': min_samples_leaf,
'class_weight': class_weight,
'max_features': max_features,
'bootstrap': bootstrap,
'random_state': random_state,
'n_jobs': n_jobs}
self.model = RandomForestClassifier(**self.hyperparameters)
def fit(self, x, y):
self.classes_ = np.arange(len(np.unique(y))).astype(int)
return self.model.fit(x, y)
def predict_proba(self, x):
return self.model.predict_proba(x)
def predict(self, x):
return self.model.predict(x)
def get_params(self, deep=False):
return self.hyperparameters
def set_params(self, n_estimators=20, criterion='gini', max_depth=10, min_samples_split=2, min_samples_leaf=1,
class_weight='balanced', max_features='log2', bootstrap=True, random_state=0, n_jobs=1):
self.hyperparameters = {'n_estimators': n_estimators,
'criterion': criterion,
'max_depth': max_depth,
'min_samples_split': min_samples_split,
'min_samples_leaf': min_samples_leaf,
'class_weight': class_weight,
'max_features': max_features,
'bootstrap': bootstrap,
'random_state': random_state,
'n_jobs': n_jobs}
self.model = RandomForestClassifier(**self.hyperparameters)
return self
def run_model(self, x_train, y_train, x_test, y_test):
"""
Function to run the model
:return:
"""
# Train model
y_all = np.concatenate([y_train, y_test])
self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
_ = self.model.fit(x_train, y_train)
return {'model': self.model,
'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
'y_pred': np.squeeze(self.model.predict(x_test))}
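# Grid-search sketch: because the wrapper exposes get_params/set_params, _estimator_type
# and classes_, it can be plugged into sklearn's GridSearchCV. The parameter values below
# are illustrative only.
#
#   from sklearn.model_selection import GridSearchCV
#   search = GridSearchCV(RandomForestModel(),
#                         param_grid={'n_estimators': [20, 100], 'max_depth': [5, 10]},
#                         scoring='roc_auc', cv=3)
#   search.fit(x_train, y_train)
#   best_rf = search.best_estimator_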
#--------------------------------------------------------#
# KAN MODEL #
#--------------------------------------------------------#
class Kan_model:
"""
Class for MLP model, it includes the definition of the model and functions to run the model
"""
def __init__(self, hidden_dim=0, batch_size=500, grid=1, k=1, seed=0, lr=0.01, early_stop=True, steps=10000,
lamb=0.1, lamb_entropy=0.1, weight=True, sparse_init=False, mult_kan=False, try_gpu=False):
        # Note: the code may need adaptation to run properly on GPU (only tested on CPU so far)
        self.device = torch.device("cuda" if torch.cuda.is_available() and try_gpu else "cpu")
# The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
self._estimator_type = 'classifier'
self.classes_ = np.array([0, 1]).astype(int)
self.n_classes = None
self.hyperparameters = {'hidden_dim': hidden_dim,
'batch_size': batch_size,
'grid': grid,
'k': k,
'seed': seed,
'lr': lr,
'early_stop': early_stop,
'steps': steps,
'lamb': lamb,
'lamb_entropy': lamb_entropy,
'weight': weight,
'sparse_init': sparse_init,
'mult_kan': mult_kan}
    def seed_all(self, seed):
        np.random.seed(seed)
        torch.manual_seed(seed)
        # Compare the device type rather than the torch.device object itself,
        # which never compares equal to the string 'cuda'
        if self.device.type == 'cuda':
            torch.cuda.manual_seed(seed)
            torch.cuda.manual_seed_all(seed)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
def fit(self, x, y):
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
r = self.run_model(x_train, x_test, y_train, y_test)
return r
def get_params(self, deep=False):
return self.hyperparameters
def set_params(self, hidden_dim=0, batch_size=500, grid=1, k=1, seed=0, lr=0.01, early_stop=True, steps=10000,
lamb=0.1, lamb_entropy=0.1, weight=True, sparse_init=False, mult_kan=False):
self.hyperparameters = {'hidden_dim': hidden_dim,
'batch_size': batch_size,
'grid': grid,
'k': k,
'seed': seed,
'lr': lr,
'early_stop': early_stop,
'steps': steps,
'lamb': lamb,
'lamb_entropy': lamb_entropy,
'weight': weight,
'sparse_init': sparse_init,
'mult_kan': mult_kan}
return self
def set_model(self, x_train, x_test, y_train, y_test):
self.seed_all(self.hyperparameters["seed"])
y_all = np.concatenate([y_train, y_test])
self.n_classes = len(np.unique(y_all))
self.classes_ = np.arange(self.n_classes).astype(int)
self.input_size = x_train.shape[1]
if self.hyperparameters["hidden_dim"] == 0:
self.width = [self.input_size, self.n_classes]
elif isinstance(self.hyperparameters["hidden_dim"], int):
self.width = [self.input_size, self.hyperparameters["hidden_dim"], self.n_classes]
else:
self.width = [self.input_size] + self.hyperparameters["hidden_dim"] + [self.n_classes]
if self.hyperparameters['mult_kan']: # Add multiplication nodes
new_width = []
for i in range(len(self.width)):
if i == 0 or i == len(self.width) - 1:
new_width.append(self.width[i])
else:
new_width.append([self.width[i], self.width[i]])
self.width = new_width
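        # Example (illustrative): with input_size=10, hidden_dim=5 and n_classes=2,
        # mult_kan turns the width [10, 5, 2] into [10, [5, 5], 2], i.e. pykan's
        # [n_addition_nodes, n_multiplication_nodes] notation for each hidden layer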
        # Use the configured seed (rather than a hard-coded 0) so the KAN initialization
        # follows the 'seed' hyperparameter
        self.model = KAN(width=self.width, grid=self.hyperparameters["grid"], k=self.hyperparameters["k"],
                         seed=self.hyperparameters["seed"], device=self.device,
                         sparse_init=self.hyperparameters["sparse_init"])
        if isinstance(x_train, np.ndarray):
            self.x_train = torch.from_numpy(x_train).to(self.device).float()
            self.y_train = torch.from_numpy(y_train).to(self.device).squeeze().long()
            self.x_test = torch.from_numpy(x_test).to(self.device).float()
            self.y_test = torch.from_numpy(y_test).to(self.device).squeeze().long()
        else:
            # Assume pandas objects and convert through .values
            self.x_train = torch.from_numpy(x_train.values).to(self.device).float()
            self.y_train = torch.from_numpy(y_train.values).to(self.device).squeeze().long()
            self.x_test = torch.from_numpy(x_test.values).to(self.device).float()
            self.y_test = torch.from_numpy(y_test.values).to(self.device).squeeze().long()
        # Note that the KAN interface uses "test" for what we call "val"; we follow its naming here for consistency
self.dataset = {'train_input': self.x_train,
'train_label': self.y_train,
'test_input': self.x_test,
'test_label': self.y_test}
if self.hyperparameters["weight"]:
# Weight the classes according to the number of samples of each class to correct for class imbalance
class_weigths = []
freq_per_class = torch.bincount(self.y_train)
for i in range(self.n_classes):
class_weigths.append(len(self.y_train) / (self.n_classes * freq_per_class[i].item()))
class_weights = torch.tensor(class_weigths).to(self.device)
self.criterion = nn.CrossEntropyLoss(weight=class_weights)
else:
self.criterion = nn.CrossEntropyLoss()
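        # Worked example of the weighting above (illustrative): with 80 samples of class 0
        # and 20 of class 1, the weights are 100 / (2 * 80) = 0.625 and 100 / (2 * 20) = 2.5,
        # so minority-class errors contribute proportionally more to the loss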
def predict_proba(self, data):
if isinstance(data, np.ndarray):
data = torch.from_numpy(data).to(self.device).float()
elif isinstance(data, pd.DataFrame):
data = torch.from_numpy(data.values).to(self.device).float()
else:
pass # Assume it is already a tensor
sm = nn.Softmax(dim=1)
proba = sm(self.model.forward(data)).detach().cpu().numpy()
return proba
    def predict(self, data, threshold=0.5):
        proba = self.predict_proba(data)
        if proba.shape[1] == 2:
            # Binary case: honor the decision threshold on the positive class
            return (proba[:, 1] >= threshold).astype(int)
        # Multiclass case: the threshold does not apply, fall back to argmax
        return np.argmax(proba, axis=1).astype(int)
def custom_fit(self, dataset, steps=100, log=1, lamb=0., lamb_l1=1., lamb_entropy=2., lamb_coef=0.,
lamb_coefdiff=0., update_grid=True, grid_update_num=10, lr=1., start_grid_update_step=-1,
stop_grid_update_step=50, batch=-1,
save_fig=False, in_vars=None, out_vars=None, beta=3, save_fig_freq=1,
img_folder='./video', singularity_avoiding=False, y_th=1000., reg_metric='edge_forward_spline_n',
display_metrics=None, early_stop=True, patience=30, verbose=1):
        if lamb > 0. and not self.model.save_act:
            print('lamb > 0 has no effect unless activations are saved: set self.model.save_act=True')
old_save_act, old_symbolic_enabled = self.model.disable_symbolic_in_fit(lamb)
if verbose > 0:
pbar = tqdm(range(steps), desc='description', ncols=100)
else:
pbar = range(steps)
        # Guard against a zero frequency (which would crash the modulo below) when
        # grid_update_num exceeds stop_grid_update_step
        grid_update_freq = max(1, int(stop_grid_update_step / grid_update_num))
optimizer = torch.optim.Adam(self.model.get_params(), lr=lr)
results = {}
results['train_loss'] = []
results['test_loss'] = []
results['reg'] = []
results['train_metrics'] = []
results['test_metrics'] = []
if batch == -1 or batch > dataset['train_input'].shape[0]:
batch_size = dataset['train_input'].shape[0]
else:
batch_size = batch
batch_size_test = dataset['test_input'].shape[0]
if save_fig:
if not os.path.exists(img_folder):
os.makedirs(img_folder)
best_loss = np.inf
patience_counter = 0
for _ in pbar:
if _ == steps - 1 and old_save_act:
self.model.save_act = True
if save_fig and _ % save_fig_freq == 0:
save_act = self.model.save_act
self.model.save_act = True
n_batches_train = len(dataset['train_input']) // batch_size
for ibt in range(n_batches_train):
batch_start = ibt * batch_size
batch_end = min((ibt + 1) * batch_size, len(dataset['train_input']))
train_id = np.arange(batch_start, batch_end)
if _ % grid_update_freq == 0 and _ < stop_grid_update_step and update_grid and _ >= start_grid_update_step:
self.model.update_grid(dataset['train_input'][train_id])
pred_train = self.model.forward(dataset['train_input'][train_id],
singularity_avoiding=singularity_avoiding, y_th=y_th)
train_loss = self.criterion(pred_train, dataset['train_label'][train_id])
if self.model.save_act:
if reg_metric == 'edge_backward':
self.model.attribute()
if reg_metric == 'node_backward':
self.model.node_attribute()
reg_ = self.model.get_reg(reg_metric, lamb_l1, lamb_entropy, lamb_coef, lamb_coefdiff)
else:
reg_ = torch.tensor(0.)
loss = train_loss + lamb * reg_
optimizer.zero_grad()
loss.backward()
optimizer.step()
pred_test = self.model.forward(dataset['test_input'])
test_loss = self.criterion(pred_test, dataset['test_label'])
            # For convenience, train loss and regularization are logged on the last batch only
results['train_loss'].append(train_loss.cpu().detach().numpy())
results['test_loss'].append(test_loss.cpu().detach().numpy())
results['reg'].append(reg_.cpu().detach().numpy())
            if _ % log == 0 and verbose > 0:
                if display_metrics is None:
                    pbar.set_description("| train_loss: %.2e | test_loss: %.2e | reg: %.2e | " % (
                        torch.sqrt(train_loss).cpu().detach().numpy(), torch.sqrt(test_loss).cpu().detach().numpy(),
                        reg_.cpu().detach().numpy()))
                else:
                    string = ''
                    data = ()
                    for metric in display_metrics:
                        string += f' {metric}: %.2e |'
                        # Fail fast on unknown metric names instead of swallowing all errors
                        if metric not in results:
                            raise Exception(f'{metric} not recognized')
                        data += (results[metric][-1],)
                    pbar.set_description(string % data)
if save_fig and _ % save_fig_freq == 0:
self.model.plot(folder=img_folder, in_vars=in_vars, out_vars=out_vars, title="Step {}".format(_),
beta=beta)
plt.savefig(img_folder + os.sep + str(_) + '.jpg', bbox_inches='tight', dpi=200)
plt.close()
self.model.save_act = save_act
if early_stop:
if results['test_loss'][-1] < best_loss:
best_loss = results['test_loss'][-1]
patience_counter = 0
else:
patience_counter += 1
if patience_counter > patience:
print(f'Early stopping at step {_}')
break
self.model.log_history('fit')
# revert back to original state
self.model.symbolic_enabled = old_symbolic_enabled
return results
def run_model(self, x_train, x_test, y_train, y_test):
self.set_model(x_train, x_test, y_train, y_test)
# Train the model
_ = self.custom_fit(self.dataset, batch=self.hyperparameters["batch_size"],
steps=self.hyperparameters["steps"], lamb=self.hyperparameters["lamb"],
lamb_entropy=self.hyperparameters["lamb_entropy"], lr=self.hyperparameters["lr"],
early_stop=self.hyperparameters["early_stop"], patience=30,
save_fig=False, verbose=0)
return {'model': self.model,
'y_pred_proba': np.squeeze(self.predict_proba(self.x_test)),
'y_pred': np.squeeze(self.predict(self.x_test))}
def prune(self):
self.model = self.model.prune()
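# Usage sketch (illustrative): assumes numpy arrays with binary labels.
#
#   kan_model = Kan_model(hidden_dim=5, steps=100, batch_size=128)
#   results = kan_model.run_model(x_train, x_test, y_train, y_test)
#   results['y_pred_proba']   # softmax probabilities on the test split
#   kan_model.prune()         # optionally prune the fitted KAN afterwards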
#--------------------------------------------------------#
# LR MODEL #
#--------------------------------------------------------#
class LogisticRegressionModel:
"""
Class for Logistic Regression model, including the definition and functions to train, validate, and test the model.
"""
    def __init__(self, C=1.0, penalty='l2', solver='lbfgs', max_iter=1000, class_weight='balanced', random_state=0):
        # Note: C must be strictly positive for sklearn's LogisticRegression, hence the 1.0 default
# The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
self._estimator_type = 'classifier'
self.classes_ = np.array([0, 1]).astype(int)
self.hyperparameters = {'C': C,
'penalty': penalty,
'solver': solver,
'max_iter': max_iter,
'class_weight': class_weight,
'random_state': random_state}
# Initialize the Logistic Regression model
self.model = LogisticRegression(**self.hyperparameters)
def fit(self, x, y):
self.classes_ = np.arange(len(np.unique(y))).astype(int)
return self.model.fit(x, y)
def predict_proba(self, x):
return self.model.predict_proba(x)
def predict(self, x):
return self.model.predict(x)
def get_params(self, deep=False):
return self.hyperparameters
    def set_params(self, C=1.0, penalty='l2', solver='lbfgs', max_iter=1000, class_weight='balanced', random_state=0):
self.hyperparameters = {'C': C,
'penalty': penalty,
'solver': solver,
'max_iter': max_iter,
'class_weight': class_weight,
'random_state': random_state}
self.model = LogisticRegression(**self.hyperparameters)
return self
def run_model(self, x_train, y_train, x_test, y_test):
"""
Train the Logistic Regression model using hyperparameter tuning, validate it, and test it.
:return: Trained model, validation predictions, test predictions
"""
# Train the model
y_all = np.concatenate([y_train, y_test])
self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
_ = self.model.fit(x_train, y_train)
return {'model': self.model,
'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
'y_pred': np.squeeze(self.model.predict(x_test))}
def get_coefficients(self, model):
"""
Retrieve coefficients and intercept of the trained logistic regression model.
:param model: Trained logistic regression model
:return: Coefficients and intercept as lists
"""
coefficients = model.coef_.flatten().tolist()
intercept = model.intercept_.tolist()[0]
return coefficients, intercept
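# Interpretation sketch: logistic regression is linear in the log-odds,
# log(p / (1 - p)) = intercept + sum_i coef_i * x_i, so each coefficient is the change
# in log-odds per unit increase of its feature. Hypothetical usage:
#
#   lr = LogisticRegressionModel(C=1.0)
#   out = lr.run_model(x_train, y_train, x_test, y_test)
#   coefs, intercept = lr.get_coefficients(out['model'])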
#--------------------------------------------------------#
# MLP MODEL #
#--------------------------------------------------------#
class Mlp_model:
"""
Class for MLP model, it includes the definition of the model and functions to run the model
"""
def __init__(self, hidden_layer_sizes=(32,), max_iter=10000, early_stopping=True, alpha=0.0001):
self.classification_flag = True
# The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
self._estimator_type = 'classifier'
self.classes_ = np.array([0, 1]).astype(int)
self.hyperparameters = {'hidden_layer_sizes': hidden_layer_sizes,
'max_iter': max_iter,
'early_stopping': early_stopping,
'alpha': alpha}
self.model = MLPClassifier(hidden_layer_sizes=hidden_layer_sizes,
max_iter=max_iter,
early_stopping=early_stopping,
alpha=alpha)
def fit(self, x, y):
self.classes_ = np.arange(len(np.unique(y))).astype(int)
return self.model.fit(x, y)
def predict_proba(self, x):
return self.model.predict_proba(x)
def predict(self, x):
return self.model.predict(x)
def get_params(self, deep=False):
return self.hyperparameters
    def set_params(self, hidden_layer_sizes=(32,), max_iter=10000, early_stopping=True, alpha=0.0001):
        self.hyperparameters = {'hidden_layer_sizes': hidden_layer_sizes,
                                'max_iter': max_iter,
                                'early_stopping': early_stopping,
                                'alpha': alpha}
        # Rebuild the underlying sklearn model so the new hyperparameters take effect
        self.model = MLPClassifier(**self.hyperparameters)
        return self
def run_model(self, x_train, y_train, x_test, y_test):
"""
Function to run the model
:return:
"""
# Train model
y_all = np.concatenate([y_train, y_test])
self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
_ = self.model.fit(x_train, y_train)
return {'model': self.model,
'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
'y_pred': np.squeeze(self.model.predict(x_test))}
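# Minimal smoke test (a sketch, not part of the original pipeline): exercises the pure
# sklearn wrappers on synthetic data. The NAM and KAN models are skipped here because
# they require their heavier backends.
if __name__ == '__main__':
    from sklearn.datasets import make_classification
    x, y = make_classification(n_samples=200, n_features=10, random_state=0)
    x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.2, random_state=0)
    for model_cls in (RandomForestModel, LogisticRegressionModel, Mlp_model):
        out = model_cls().run_model(x_tr, y_tr, x_te, y_te)
        print(model_cls.__name__, 'first predictions:', out['y_pred'][:5])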