# Author: Juan Parras & Patricia A. Apellániz
# Email: patricia.alonsod@upm.es
# Date: 05/08/2025

# Package imports
from kan import *
from src.models.nam.wrapper import NAMClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Explicit imports for names used below (some are also re-exported by `from kan import *`)
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm import tqdm

#--------------------------------------------------------#
#                        NAM MODEL                        #
#--------------------------------------------------------#

class NAMModel:
    """
    Class for the NAM model; it includes the definition of the model and functions to run it.
    """

    def __init__(self, num_epochs=1000, num_learners=20, metric='aucroc', early_stop_mode='max', n_jobs=1,
                 random_state=0, num_basis_functions=64, hidden_size=[64, 32]):
        # The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
        self._estimator_type = 'classifier'
        self.classes_ = np.array([0, 1]).astype(int)
        self.hyperparameters = {'num_epochs': num_epochs,
                                'num_learners': num_learners,
                                'metric': metric,
                                'early_stop_mode': early_stop_mode,
                                'n_jobs': n_jobs,
                                'random_state': random_state,
                                'num_basis_functions': num_basis_functions,
                                'hidden_size': hidden_size}

    def fit(self, x, y):
        self.set_model()
        self.model.fit(x, y)
        return {}

    def set_model(self):
        # Note: the 'hidden_size' hyperparameter maps to NAMClassifier's 'hidden_sizes' argument
        self.model = NAMClassifier(num_epochs=self.hyperparameters["num_epochs"],
                                   num_learners=self.hyperparameters["num_learners"],
                                   metric=self.hyperparameters["metric"],
                                   early_stop_mode=self.hyperparameters["early_stop_mode"],
                                   n_jobs=self.hyperparameters["n_jobs"],
                                   random_state=self.hyperparameters["random_state"],
                                   num_basis_functions=self.hyperparameters["num_basis_functions"],
                                   hidden_sizes=self.hyperparameters["hidden_size"])

    def get_params(self, deep=False):
        return self.hyperparameters

    def set_params(self, num_epochs=1000, num_learners=20, metric='aucroc', early_stop_mode='max', n_jobs=1,
                   random_state=0, num_basis_functions=64, hidden_size=[64, 32]):
        self.hyperparameters = {'num_epochs': num_epochs,
                                'num_learners': num_learners,
                                'metric': metric,
                                'early_stop_mode': early_stop_mode,
                                'n_jobs': n_jobs,
                                'random_state': random_state,
                                'num_basis_functions': num_basis_functions,
                                'hidden_size': hidden_size}
        return self
    def predict_proba(self, data):
        prob_of_ones = self.model.predict_proba(data)
        prob_of_zeros = 1 - prob_of_ones
        # column_stack builds a (n_samples, 2) array whether NAMClassifier returns shape (n,) or (n, 1)
        return np.column_stack([prob_of_zeros, prob_of_ones])

    def predict(self, data, threshold=0.5):
        proba_preds = self.predict_proba(data)
        # Binary classification: apply the decision threshold to P(y=1)
        return (proba_preds[:, 1] >= threshold).astype(int)
    def run_model(self, x_train, x_test, y_train, y_test):
        self.set_model()
        # Train the model
        _ = self.model.fit(x_train, y_train)
        return {'model': self.model,
                'y_pred_proba': np.squeeze(self.predict_proba(x_test)),
                'y_pred': np.squeeze(self.predict(x_test))}
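
# --- Usage sketch (illustrative only) --------------------------------------#
# A minimal example of fitting NAMModel on synthetic binary data, assuming
# NAMClassifier accepts NumPy arrays. The data and hyperparameter values are
# assumptions chosen to keep the sketch fast, not recommended settings.
def _example_nam_usage():
    rng = np.random.default_rng(0)
    x = rng.random((200, 5))
    y = rng.integers(0, 2, size=200)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=0)
    model = NAMModel(num_epochs=10, num_learners=2)
    results = model.run_model(x_train, x_test, y_train, y_test)
    # results['y_pred_proba'] has one column per class: [P(y=0), P(y=1)]
    return results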

#--------------------------------------------------------#
#                         RF MODEL                        #
#--------------------------------------------------------#

class RandomForestModel:
    """
    Class for the random forest model; it includes the definition of the model and functions to run it.
    """

    def __init__(self, n_estimators=20, criterion='gini', max_depth=10, min_samples_split=2, min_samples_leaf=1,
                 class_weight='balanced', max_features='log2', bootstrap=True, random_state=0, n_jobs=1):
        self.classification_flag = True
        # The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
        self._estimator_type = 'classifier'
        self.classes_ = np.array([0, 1]).astype(int)
        self.hyperparameters = {'n_estimators': n_estimators,
                                'criterion': criterion,
                                'max_depth': max_depth,
                                'min_samples_split': min_samples_split,
                                'min_samples_leaf': min_samples_leaf,
                                'class_weight': class_weight,
                                'max_features': max_features,
                                'bootstrap': bootstrap,
                                'random_state': random_state,
                                'n_jobs': n_jobs}
        self.model = RandomForestClassifier(**self.hyperparameters)

    def fit(self, x, y):
        self.classes_ = np.arange(len(np.unique(y))).astype(int)
        return self.model.fit(x, y)

    def predict_proba(self, x):
        return self.model.predict_proba(x)

    def predict(self, x):
        return self.model.predict(x)

    def get_params(self, deep=False):
        return self.hyperparameters

    def set_params(self, n_estimators=20, criterion='gini', max_depth=10, min_samples_split=2, min_samples_leaf=1,
                   class_weight='balanced', max_features='log2', bootstrap=True, random_state=0, n_jobs=1):
        self.hyperparameters = {'n_estimators': n_estimators,
                                'criterion': criterion,
                                'max_depth': max_depth,
                                'min_samples_split': min_samples_split,
                                'min_samples_leaf': min_samples_leaf,
                                'class_weight': class_weight,
                                'max_features': max_features,
                                'bootstrap': bootstrap,
                                'random_state': random_state,
                                'n_jobs': n_jobs}
        self.model = RandomForestClassifier(**self.hyperparameters)
        return self
    def run_model(self, x_train, y_train, x_test, y_test):
        """
        Function to train the model and predict on the test set
        :return: dict with the trained model, the test-set class probabilities, and the test-set predictions
        """
        # Train model
        y_all = np.concatenate([y_train, y_test])
        self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
        _ = self.model.fit(x_train, y_train)
        return {'model': self.model,
                'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
                'y_pred': np.squeeze(self.model.predict(x_test))}
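
# --- Usage sketch (illustrative only) --------------------------------------#
# A minimal example of the GridSearchCV compatibility that _estimator_type,
# classes_, get_params and set_params provide. The parameter grid below is an
# assumption, not a recommended search space; note that set_params resets any
# hyperparameter not present in the grid to its default value.
def _example_rf_grid_search():
    from sklearn.model_selection import GridSearchCV
    rng = np.random.default_rng(0)
    x = rng.random((100, 4))
    y = rng.integers(0, 2, size=100)
    search = GridSearchCV(RandomForestModel(),
                          param_grid={'n_estimators': [10, 20], 'max_depth': [5, 10]},
                          scoring='roc_auc', cv=3)
    search.fit(x, y)
    return search.best_params_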

#--------------------------------------------------------#
#                        KAN MODEL                        #
#--------------------------------------------------------#

class Kan_model:
    """
    Class for the KAN model; it includes the definition of the model and functions to run it.
    """

    def __init__(self, hidden_dim=0, batch_size=500, grid=1, k=1, seed=0, lr=0.01, early_stop=True, steps=10000,
                 lamb=0.1, lamb_entropy=0.1, weight=True, sparse_init=False, mult_kan=False, try_gpu=False):
        # Note that the code may need adaptation to work properly on GPU (only tested on CPU so far)
        self.device = torch.device("cuda" if torch.cuda.is_available() and try_gpu else "cpu")
        # The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
        self._estimator_type = 'classifier'
        self.classes_ = np.array([0, 1]).astype(int)
        self.n_classes = None
        self.hyperparameters = {'hidden_dim': hidden_dim,
                                'batch_size': batch_size,
                                'grid': grid,
                                'k': k,
                                'seed': seed,
                                'lr': lr,
                                'early_stop': early_stop,
                                'steps': steps,
                                'lamb': lamb,
                                'lamb_entropy': lamb_entropy,
                                'weight': weight,
                                'sparse_init': sparse_init,
                                'mult_kan': mult_kan}
    def seed_all(self, seed):
        np.random.seed(seed)
        torch.manual_seed(seed)
        if self.device.type == 'cuda':
            torch.cuda.manual_seed(seed)
            torch.cuda.manual_seed_all(seed)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
    def fit(self, x, y):
        x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
        r = self.run_model(x_train, x_test, y_train, y_test)
        return r

    def get_params(self, deep=False):
        return self.hyperparameters
    def set_params(self, hidden_dim=0, batch_size=500, grid=1, k=1, seed=0, lr=0.01, early_stop=True, steps=10000,
                   lamb=0.1, lamb_entropy=0.1, weight=True, sparse_init=False, mult_kan=False):
        self.hyperparameters = {'hidden_dim': hidden_dim,
                                'batch_size': batch_size,
                                'grid': grid,
                                'k': k,
                                'seed': seed,
                                'lr': lr,
                                'early_stop': early_stop,
                                'steps': steps,
                                'lamb': lamb,
                                'lamb_entropy': lamb_entropy,
                                'weight': weight,
                                'sparse_init': sparse_init,
                                'mult_kan': mult_kan}
        return self
    def set_model(self, x_train, x_test, y_train, y_test):
        self.seed_all(self.hyperparameters["seed"])
        y_all = np.concatenate([y_train, y_test])
        self.n_classes = len(np.unique(y_all))
        self.classes_ = np.arange(self.n_classes).astype(int)
        self.input_size = x_train.shape[1]
        if self.hyperparameters["hidden_dim"] == 0:
            self.width = [self.input_size, self.n_classes]
        elif isinstance(self.hyperparameters["hidden_dim"], int):
            self.width = [self.input_size, self.hyperparameters["hidden_dim"], self.n_classes]
        else:
            self.width = [self.input_size] + self.hyperparameters["hidden_dim"] + [self.n_classes]
        if self.hyperparameters['mult_kan']:  # Add multiplication nodes to every hidden layer
            new_width = []
            for i in range(len(self.width)):
                if i == 0 or i == len(self.width) - 1:
                    new_width.append(self.width[i])
                else:
                    new_width.append([self.width[i], self.width[i]])
            self.width = new_width
        self.model = KAN(width=self.width, grid=self.hyperparameters["grid"], k=self.hyperparameters["k"],
                         seed=self.hyperparameters["seed"], device=self.device,
                         sparse_init=self.hyperparameters["sparse_init"])
        if isinstance(x_train, np.ndarray):
            self.x_train = torch.from_numpy(x_train).to(self.device).float()
            self.y_train = torch.from_numpy(y_train).to(self.device).squeeze().long()
            self.x_test = torch.from_numpy(x_test).to(self.device).float()
            self.y_test = torch.from_numpy(y_test).to(self.device).squeeze().long()
        else:  # Assume pandas objects
            self.x_train = torch.from_numpy(x_train.values).to(self.device).float()
            self.y_train = torch.from_numpy(y_train.values).to(self.device).squeeze().long()
            self.x_test = torch.from_numpy(x_test.values).to(self.device).float()
            self.y_test = torch.from_numpy(y_test.values).to(self.device).squeeze().long()
        # Note that the KAN interface uses "test" for what we call "val": we reverse here for consistency
        self.dataset = {'train_input': self.x_train,
                        'train_label': self.y_train,
                        'test_input': self.x_test,
                        'test_label': self.y_test}
        if self.hyperparameters["weight"]:
            # Weight the classes inversely to their frequency to correct for class imbalance, i.e.
            # w_i = n_samples / (n_classes * n_samples_in_class_i). For example, with a 90/10 split
            # over two classes, the weights are 100 / (2 * 90) ≈ 0.56 and 100 / (2 * 10) = 5.0.
            weights = []
            freq_per_class = torch.bincount(self.y_train)
            for i in range(self.n_classes):
                weights.append(len(self.y_train) / (self.n_classes * freq_per_class[i].item()))
            class_weights = torch.tensor(weights).to(self.device)
            self.criterion = nn.CrossEntropyLoss(weight=class_weights)
        else:
            self.criterion = nn.CrossEntropyLoss()
    def predict_proba(self, data):
        if isinstance(data, np.ndarray):
            data = torch.from_numpy(data).to(self.device).float()
        elif isinstance(data, pd.DataFrame):
            data = torch.from_numpy(data.values).to(self.device).float()
        else:
            pass  # Assume it is already a tensor
        sm = nn.Softmax(dim=1)
        proba = sm(self.model.forward(data)).detach().cpu().numpy()
        return proba

    def predict(self, data, threshold=0.5):
        proba = self.predict_proba(data)
        if proba.shape[1] == 2:
            # Binary case: apply the decision threshold to P(y=1)
            return (proba[:, 1] >= threshold).astype(int)
        return np.argmax(proba, axis=1).astype(int)
    def custom_fit(self, dataset, steps=100, log=1, lamb=0., lamb_l1=1., lamb_entropy=2., lamb_coef=0.,
                   lamb_coefdiff=0., update_grid=True, grid_update_num=10, lr=1., start_grid_update_step=-1,
                   stop_grid_update_step=50, batch=-1,
                   save_fig=False, in_vars=None, out_vars=None, beta=3, save_fig_freq=1,
                   img_folder='./video', singularity_avoiding=False, y_th=1000., reg_metric='edge_forward_spline_n',
                   display_metrics=None, early_stop=True, patience=30, verbose=1):
        if lamb > 0. and not self.model.save_act:
            print('setting lamb=0. If you want to set lamb > 0, set model.save_act=True')
        old_save_act, old_symbolic_enabled = self.model.disable_symbolic_in_fit(lamb)
        if verbose > 0:
            pbar = tqdm(range(steps), desc='description', ncols=100)
        else:
            pbar = range(steps)
        grid_update_freq = int(stop_grid_update_step / grid_update_num)
        optimizer = torch.optim.Adam(self.model.get_params(), lr=lr)
        results = {}
        results['train_loss'] = []
        results['test_loss'] = []
        results['reg'] = []
        results['train_metrics'] = []
        results['test_metrics'] = []
        if batch == -1 or batch > dataset['train_input'].shape[0]:
            batch_size = dataset['train_input'].shape[0]
        else:
            batch_size = batch
        batch_size_test = dataset['test_input'].shape[0]
        if save_fig:
            if not os.path.exists(img_folder):
                os.makedirs(img_folder)
        best_loss = np.inf
        patience_counter = 0
        for step in pbar:
            if step == steps - 1 and old_save_act:
                self.model.save_act = True
            if save_fig and step % save_fig_freq == 0:
                save_act = self.model.save_act
                self.model.save_act = True
            n_batches_train = len(dataset['train_input']) // batch_size
            for ibt in range(n_batches_train):
                batch_start = ibt * batch_size
                batch_end = min((ibt + 1) * batch_size, len(dataset['train_input']))
                train_id = np.arange(batch_start, batch_end)
                if step % grid_update_freq == 0 and step < stop_grid_update_step and update_grid and step >= start_grid_update_step:
                    self.model.update_grid(dataset['train_input'][train_id])
                pred_train = self.model.forward(dataset['train_input'][train_id],
                                                singularity_avoiding=singularity_avoiding, y_th=y_th)
                train_loss = self.criterion(pred_train, dataset['train_label'][train_id])
                if self.model.save_act:
                    if reg_metric == 'edge_backward':
                        self.model.attribute()
                    if reg_metric == 'node_backward':
                        self.model.node_attribute()
                    reg_ = self.model.get_reg(reg_metric, lamb_l1, lamb_entropy, lamb_coef, lamb_coefdiff)
                else:
                    reg_ = torch.tensor(0.)
                loss = train_loss + lamb * reg_
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            pred_test = self.model.forward(dataset['test_input'])
            test_loss = self.criterion(pred_test, dataset['test_label'])
            # For convenience, we log the train loss and regularizer of the last batch only
            results['train_loss'].append(train_loss.cpu().detach().numpy())
            results['test_loss'].append(test_loss.cpu().detach().numpy())
            results['reg'].append(reg_.cpu().detach().numpy())
            if step % log == 0 and verbose > 0:
                if display_metrics is None:
                    pbar.set_description("| train_loss: %.2e | test_loss: %.2e | reg: %.2e | " % (
                        torch.sqrt(train_loss).cpu().detach().numpy(), torch.sqrt(test_loss).cpu().detach().numpy(),
                        reg_.cpu().detach().numpy()))
                else:
                    string = ''
                    data = ()
                    for metric in display_metrics:
                        string += f' {metric}: %.2e |'
                        if metric not in results:
                            raise Exception(f'{metric} not recognized')
                        data += (results[metric][-1],)
                    pbar.set_description(string % data)
            if save_fig and step % save_fig_freq == 0:
                self.model.plot(folder=img_folder, in_vars=in_vars, out_vars=out_vars, title="Step {}".format(step),
                                beta=beta)
                plt.savefig(img_folder + os.sep + str(step) + '.jpg', bbox_inches='tight', dpi=200)
                plt.close()
                self.model.save_act = save_act
            if early_stop:
                # Early stopping on the validation loss (stored under 'test_loss', see the note in set_model)
                if results['test_loss'][-1] < best_loss:
                    best_loss = results['test_loss'][-1]
                    patience_counter = 0
                else:
                    patience_counter += 1
                if patience_counter > patience:
                    print(f'Early stopping at step {step}')
                    break
        self.model.log_history('fit')
        # Revert to the original state
        self.model.symbolic_enabled = old_symbolic_enabled
        return results
    def run_model(self, x_train, x_test, y_train, y_test):
        self.set_model(x_train, x_test, y_train, y_test)
        # Train the model
        _ = self.custom_fit(self.dataset, batch=self.hyperparameters["batch_size"],
                            steps=self.hyperparameters["steps"], lamb=self.hyperparameters["lamb"],
                            lamb_entropy=self.hyperparameters["lamb_entropy"], lr=self.hyperparameters["lr"],
                            early_stop=self.hyperparameters["early_stop"], patience=30,
                            save_fig=False, verbose=0)
        return {'model': self.model,
                'y_pred_proba': np.squeeze(self.predict_proba(self.x_test)),
                'y_pred': np.squeeze(self.predict(self.x_test))}

    def prune(self):
        self.model = self.model.prune()
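
# --- Usage sketch (illustrative only) --------------------------------------#
# A minimal example of fitting Kan_model on synthetic binary data. The small
# step count, width, and batch size are assumptions chosen to keep the sketch
# cheap; run_model trains via custom_fit and returns test-set predictions.
def _example_kan_usage():
    rng = np.random.default_rng(0)
    x = rng.random((200, 5)).astype(np.float32)
    y = (x[:, 0] + x[:, 1] > 1.0).astype(int)  # simple separable target
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=0)
    model = Kan_model(hidden_dim=4, steps=50, batch_size=64)
    results = model.run_model(x_train, x_test, y_train, y_test)
    return results['y_pred']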

#--------------------------------------------------------#
#                         LR MODEL                        #
#--------------------------------------------------------#

class LogisticRegressionModel:
    """
    Class for the Logistic Regression model, including the definition and functions to train and test the model.
    """

    def __init__(self, C=1.0, penalty='l2', solver='lbfgs', max_iter=1000, class_weight='balanced', random_state=0):
        # Note: C must be strictly positive for sklearn's LogisticRegression
        # The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
        self._estimator_type = 'classifier'
        self.classes_ = np.array([0, 1]).astype(int)
        self.hyperparameters = {'C': C,
                                'penalty': penalty,
                                'solver': solver,
                                'max_iter': max_iter,
                                'class_weight': class_weight,
                                'random_state': random_state}
        # Initialize the Logistic Regression model
        self.model = LogisticRegression(**self.hyperparameters)

    def fit(self, x, y):
        self.classes_ = np.arange(len(np.unique(y))).astype(int)
        return self.model.fit(x, y)

    def predict_proba(self, x):
        return self.model.predict_proba(x)

    def predict(self, x):
        return self.model.predict(x)

    def get_params(self, deep=False):
        return self.hyperparameters

    def set_params(self, C=1.0, penalty='l2', solver='lbfgs', max_iter=1000, class_weight='balanced', random_state=0):
        self.hyperparameters = {'C': C,
                                'penalty': penalty,
                                'solver': solver,
                                'max_iter': max_iter,
                                'class_weight': class_weight,
                                'random_state': random_state}
        self.model = LogisticRegression(**self.hyperparameters)
        return self
    def run_model(self, x_train, y_train, x_test, y_test):
        """
        Train the Logistic Regression model on the training set and predict on the test set.
        :return: dict with the trained model, the test-set class probabilities, and the test-set predictions
        """
        # Train the model
        y_all = np.concatenate([y_train, y_test])
        self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
        _ = self.model.fit(x_train, y_train)
        return {'model': self.model,
                'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
                'y_pred': np.squeeze(self.model.predict(x_test))}

    def get_coefficients(self, model):
        """
        Retrieve the coefficients and intercept of the trained logistic regression model.
        :param model: Trained logistic regression model
        :return: Coefficients (list) and intercept (float)
        """
        coefficients = model.coef_.flatten().tolist()
        intercept = model.intercept_.tolist()[0]
        return coefficients, intercept
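
# --- Usage sketch (illustrative only) --------------------------------------#
# A minimal example of training LogisticRegressionModel and reading back the
# fitted parameters; each coefficient is the change in the log-odds of class 1
# per unit increase of the corresponding feature. The synthetic data below is
# an assumption for illustration.
def _example_lr_coefficients():
    rng = np.random.default_rng(0)
    x = rng.random((150, 3))
    y = rng.integers(0, 2, size=150)
    lr = LogisticRegressionModel(C=1.0)
    results = lr.run_model(x_train=x[:100], y_train=y[:100], x_test=x[100:], y_test=y[100:])
    coefficients, intercept = lr.get_coefficients(results['model'])
    return coefficients, intercept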

#--------------------------------------------------------#
#                        MLP MODEL                        #
#--------------------------------------------------------#

class Mlp_model:
    """
    Class for the MLP model; it includes the definition of the model and functions to run it.
    """

    def __init__(self, hidden_layer_sizes=(32,), max_iter=10000, early_stopping=True, alpha=0.0001):
        self.classification_flag = True
        # The next two values are needed to obtain the optimal parameters using Sklearn's GridSearchCV
        self._estimator_type = 'classifier'
        self.classes_ = np.array([0, 1]).astype(int)
        self.hyperparameters = {'hidden_layer_sizes': hidden_layer_sizes,
                                'max_iter': max_iter,
                                'early_stopping': early_stopping,
                                'alpha': alpha}
        self.model = MLPClassifier(**self.hyperparameters)

    def fit(self, x, y):
        self.classes_ = np.arange(len(np.unique(y))).astype(int)
        return self.model.fit(x, y)

    def predict_proba(self, x):
        return self.model.predict_proba(x)

    def predict(self, x):
        return self.model.predict(x)

    def get_params(self, deep=False):
        return self.hyperparameters

    def set_params(self, hidden_layer_sizes=(32,), max_iter=10000, early_stopping=True, alpha=0.0001):
        self.hyperparameters = {'hidden_layer_sizes': hidden_layer_sizes,
                                'max_iter': max_iter,
                                'early_stopping': early_stopping,
                                'alpha': alpha}
        # Rebuild the underlying model so the new hyperparameters actually take effect
        self.model = MLPClassifier(**self.hyperparameters)
        return self
    def run_model(self, x_train, y_train, x_test, y_test):
        """
        Function to train the model and predict on the test set
        :return: dict with the trained model, the test-set class probabilities, and the test-set predictions
        """
        # Train model
        y_all = np.concatenate([y_train, y_test])
        self.classes_ = np.arange(len(np.unique(y_all))).astype(int)
        _ = self.model.fit(x_train, y_train)
        return {'model': self.model,
                'y_pred_proba': np.squeeze(self.model.predict_proba(x_test)),
                'y_pred': np.squeeze(self.model.predict(x_test))}
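
# --- Usage sketch (illustrative only) --------------------------------------#
# A minimal smoke test for Mlp_model; the synthetic data and the small hidden
# layer are assumptions for illustration, so the accuracy is not meaningful.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x = rng.random((150, 4))
    y = rng.integers(0, 2, size=150)
    mlp = Mlp_model(hidden_layer_sizes=(16,), max_iter=500)
    out = mlp.run_model(x_train=x[:100], y_train=y[:100], x_test=x[100:], y_test=y[100:])
    print('MLP test accuracy:', np.mean(out['y_pred'] == y[100:]))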