# Spaces:
# Runtime error
# Runtime error
# (The three lines above are artifacts of the paste/export this file was
# recovered from, preserved here as comments so the module stays valid Python.)
| import time | |
| import numpy as np | |
| from sklearn.neural_network import MLPRegressor | |
| from sklearn.utils.validation import column_or_1d | |
| import Settings as settings | |
| from DataUtils import make_y_multi_safe | |
| hidden_layer_sizes = (5, 5) | |
| max_iter_mlp = 500000 | |
| class MLP_Model: | |
| def __init__(self): | |
| self.name = "MLP Model" | |
| self.short_name = "MLP" | |
| self.hidden_layer_sizes = hidden_layer_sizes | |
| self.solver = 'adam' | |
| self.max_iter = max_iter_mlp | |
| self.warm_start = False | |
| self.verbose = False | |
| self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes, | |
| solver=self.solver, | |
| activation='relu', | |
| max_iter=self.max_iter, | |
| verbose=self.verbose, | |
| tol=1e-7, | |
| warm_start=self.warm_start, | |
| n_iter_no_change=100) | |
| def get_formula_string(self): | |
| return "(neural black box)" | |
| # current_inputs = ["X{}".format(i + 1) for i in range(settings.num_features)] | |
| # # print(current_inputs) | |
| # matrices = self.est_mlp.coefs_ | |
| # vectors = self.est_mlp.intercepts_ | |
| # for i in range(len(matrices)): | |
| # current_outputs = [] | |
| # | |
| # for j in range(matrices[i].shape[1]): | |
| # current_term = [vectors[i][j]] | |
| # for k in range(matrices[i].shape[0]): | |
| # sys.stdout.flush() | |
| # current_term.append(["*", matrices[i][k][j], current_inputs[k]]) | |
| # | |
| # current_output = current_term[-1] | |
| # for k in range(len(current_term), 1, -1): | |
| # current_output = ["+", current_term[k - 2], current_output] | |
| # current_outputs.append(current_output) | |
| # current_inputs = [["max", 0, old_out] for old_out in current_outputs] | |
| # | |
| # # [-1] since we don't do relu activation on the last layer. | |
| # return current_inputs[0][-1] | |
| def get_formula(self): | |
| return "(neural black box)" | |
| # return self.get_formula_string() | |
| def train(self, X, Y): | |
| X = np.reshape(X, [X.shape[0], -1]) | |
| Y = np.reshape(Y, [-1, 1]) | |
| Y = column_or_1d(Y) | |
| self.est_mlp.fit(X, Y) | |
| return None | |
| def predict(self, X): | |
| return self.est_mlp.predict(X) | |
| # Mean square error | |
| def test(self, X, Y): | |
| X = np.reshape(X, [X.shape[0], -1]) | |
| y_hat = np.reshape(self.est_mlp.predict(X), [1, -1])[0] | |
| y_gold = np.reshape(Y, [1, -1])[0] | |
| our_sum = 0 | |
| for i in range(len(y_gold)): | |
| our_sum += (y_hat[i] - y_gold[i]) ** 2 | |
| return our_sum / len(y_gold) | |
| def reset(self): | |
| self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes, | |
| solver=self.solver, | |
| activation='relu', | |
| max_iter=self.max_iter, | |
| verbose=self.verbose, | |
| tol=1e-7, | |
| warm_start=self.warm_start, | |
| n_iter_no_change=100) | |
| def soft_reset(self): | |
| self.est_mlp = MLPRegressor(hidden_layer_sizes=self.hidden_layer_sizes, | |
| solver=self.solver, | |
| activation='relu', | |
| max_iter=self.max_iter, | |
| verbose=self.verbose, | |
| tol=1e-7, | |
| warm_start=self.warm_start, | |
| n_iter_no_change=100) | |
| def get_simple_formula(self, digits=None): | |
| full_formula = self.get_formula_string() | |
| return full_formula | |
| # return DataUtils.simplify_formula(full_formula, digits=digits) | |
| def real_repeat_train(self, x, y=None, | |
| num_repeats=settings.num_train_repeat_processes, | |
| test_x=None, test_y=None, | |
| verbose=True): | |
| # we still reduce train set size if only 1 repeat | |
| train_set_size = int(len(x) * settings.quick_train_fraction + 0.1) | |
| x = np.array(x) | |
| if y is not None: | |
| y = np.array(y) | |
| sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False) | |
| train_x = x[sample][:] | |
| if y is not None: | |
| train_y = y[sample] | |
| else: | |
| train_y = None | |
| out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample] | |
| valid_x = x[out_sample][:] | |
| if y is not None: | |
| valid_y = y[out_sample] | |
| # valid_y = self.make_y_multi_safe(valid_y) | |
| else: | |
| valid_y = None | |
| best_formula = "" | |
| best_iter = 0 | |
| best_validation = 999999 | |
| best_err = 999999 | |
| old_time = time.time() | |
| if verbose: | |
| print("Beginning {} repeat sessions of {} iterations each.".format(num_repeats, | |
| settings.num_train_steps_in_repeat_mode)) | |
| print() | |
| start_time = time.time() | |
| old_time = start_time | |
| for train_iter in range(1, 1 + num_repeats): | |
| if verbose: | |
| print("Repeated train session {} of {}.".format(train_iter, num_repeats)) | |
| self.soft_reset() | |
| self.train(train_x, train_y) | |
| valid_err = self.test(valid_x, valid_y) | |
| current_time = time.time() | |
| if verbose: | |
| # print(self.get_simple_formula()) | |
| print("Attained validation error: {:.5f}".format(valid_err)) | |
| if valid_err < best_validation: | |
| best_validation = valid_err | |
| best_formula = self.get_simple_formula() | |
| best_iter = train_iter | |
| if test_x is not None: | |
| safe_test_y = make_y_multi_safe(test_y) | |
| best_err = self.test(test_x, safe_test_y) | |
| else: | |
| best_err = valid_err | |
| if verbose: | |
| print(">>> New best model!") | |
| print(best_formula) | |
| if verbose: | |
| iters_per_minute = 60.0 / (current_time - old_time) | |
| print("Took {:.2f} minutes.".format((current_time - old_time) / 60)) | |
| print("Est. {:.2f} minutes remaining.".format((num_repeats - train_iter) / iters_per_minute)) | |
| print() | |
| old_time = current_time | |
| if verbose: | |
| print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60)) | |
| return best_formula, best_iter, best_err | |
| # Does not repeat train. sorry. | |
| def repeat_train(self, x, y=None, | |
| num_repeats=settings.num_train_repeat_processes, | |
| test_x=None, test_y=None, | |
| verbose=True): | |
| # we still reduce train set size if only 1 repeat | |
| train_set_size = int(len(x) * settings.quick_train_fraction + 0.1) | |
| x = np.array(x) | |
| if y is not None: | |
| y = np.array(y) | |
| sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False) | |
| train_x = x[sample][:] | |
| if y is not None: | |
| train_y = y[sample] | |
| else: | |
| train_y = None | |
| out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample] | |
| valid_x = x[out_sample][:] | |
| if y is not None: | |
| valid_y = y[out_sample] | |
| # valid_y = self.make_y_multi_safe(valid_y) | |
| else: | |
| valid_y = None | |
| if verbose: | |
| start_time = time.time() | |
| old_time = start_time | |
| self.soft_reset() | |
| self.train(train_x, train_y) | |
| current_time = time.time() | |
| best_formula = self.get_simple_formula() | |
| if test_x is not None: | |
| safe_test_y = make_y_multi_safe(test_y) | |
| best_err = self.test(test_x, safe_test_y) | |
| else: | |
| best_err = self.test(valid_x, valid_y) | |
| if verbose: | |
| print(">>> New best model!") | |
| print(best_formula) | |
| if verbose: | |
| print("Took {:.2f} minutes.".format((current_time - old_time) / 60)) | |
| print() | |
| if verbose: | |
| print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60)) | |
| return best_formula, 0, best_err | |