Spaces:
Runtime error
Runtime error
| from scipy.optimize import minimize, OptimizeResult | |
| import numpy as np | |
| import math | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import time | |
| try: | |
| import src.functions as functions | |
| except: | |
| import functions | |
| import argparse | |
| """ | |
| Test functions are available in https://www.sfu.ca/~ssurjano/optimization.html | |
| """ | |
| is_streamlit = False | |
| def rastrigin(x): | |
| if len(x.shape) == 2: | |
| N, n = x.shape | |
| axis = 1 | |
| else: | |
| n = len(x) | |
| axis = 0 | |
| y = np.sum(x**2 -10*np.cos(2*math.pi*x), axis=axis, keepdims=True) | |
| return y + 10*n | |
| def schwefel(x): | |
| if len(x.shape) == 2: | |
| N, n = x.shape | |
| axis = 1 | |
| else: | |
| n = len(x) | |
| axis = 0 | |
| y = 418.9829*n - np.sum(x*np.sin(np.sqrt(np.abs(x))), axis=axis, keepdims=True) | |
| return y | |
| def griewank(x): | |
| if len(x.shape) == 2: | |
| N, n = x.shape | |
| axis = 1 | |
| else: | |
| n = len(x) | |
| axis = 0 | |
| y = np.sum((x**2)/4000, axis=axis, keepdims=True) - np.prod(np.cos(x / np.sqrt(np.arange(1, n+1))), axis=axis, keepdims=True) + 1 | |
| return y | |
| def test_func(x): | |
| if len(x.shape) == 2: | |
| N, n = x.shape | |
| axis = 1 | |
| else: | |
| n = len(x) | |
| axis = 0 | |
| y = np.sum(x**2, axis=axis, keepdims=True) | |
| return y | |
| ## ------------------ BASIS FUNCTIONS START ------------------ ## | |
| def Pn(m, x): | |
| if m == 0: | |
| return np.ones_like(x) | |
| elif m == 1: | |
| return x | |
| else: | |
| return (2*m-1)*x*Pn(m-1, x)/m - (m-1)*Pn(m-2, x)/m | |
| def Legendre(a,b,m,x): | |
| return np.sqrt((2*m+1)/(b-a))*Pn(m, 2*(x-b)/(b-a)+1) | |
| def Cosine(a,b,m,x): | |
| square_root_term = np.sqrt(1 / (b-a) * 8 * math.pi * m / (math.sin(4 * math.pi * m) + 4 * math.pi * m)) | |
| outer_term = np.cos(2 * math.pi * m * (x - a) / (b - a)) | |
| return square_root_term * outer_term | |
| ## ------------------ BASIS FUNCTIONS END ------------------ ## | |
| def hdmr_opt(fun, x0, args=(), jac=None, callback=None, | |
| gtol=1e-5, maxiter=None, | |
| disp=False, return_all=False, finite_diff_rel_step=None, | |
| **unknown_options): | |
| global plt1, plt2, plt3, a, b, xs | |
| def calculate_alpha_coeff(xs): | |
| N, n = xs.shape | |
| alpha = np.zeros((m, n)) | |
| y = fun(xs) | |
| f0 = np.mean(y) | |
| for r in range(m): # Iterate degree of the Legendre polynomial | |
| for i in range(n): # Iterate number of variable | |
| alpha[r, i] = (b - a) * np.mean((y-f0) * BasisFunction(a, b, r+1, np.array(xs[:, [i]]))) | |
| return alpha | |
| def evalute_hdmr(x, f0, alpha): | |
| N, n = x.shape | |
| m, _ = alpha.shape | |
| y = f0 * np.ones((N,1)) | |
| for r in range(m): | |
| for i in range(n): | |
| y = y + alpha[r, i] * BasisFunction(a, b, r+1, np.array(x[:, [i]])) # Meta-model | |
| return y | |
| def one_dim_evaluate_hdmr(x): | |
| m, _ = alpha.shape | |
| f = 0 | |
| for r in range(m): | |
| f = f + alpha[r,i] * BasisFunction(a, b, r+1, x) # One dimensional functions | |
| return f | |
| def one_dim_evaluate_hdmr_for_test(x, idx_var): | |
| m, _ = alpha.shape | |
| f = np.zeros((N,1)) | |
| for r in range(m): | |
| f = f + alpha[r,idx_var] * BasisFunction(a, b, r+1, x) | |
| return f | |
| def plot_results(): | |
| global a, b | |
| f = np.zeros((N,n)) | |
| y = fun(xs) | |
| if not is_adaptive: | |
| for idx in range(n): | |
| f[:,[idx]] = np.mean(y) + one_dim_evaluate_hdmr_for_test(xs[:,[idx]], idx) | |
| else: | |
| a = np.min(a) | |
| b = np.max(b) | |
| for idx in range(n): | |
| f[:,[idx]] = np.mean(y) + one_dim_evaluate_hdmr_for_test(xs[:,[idx]], idx) | |
| columns_xf = [f'x{id+1}' for id in range(n)] + [f'f{id+1}' for id in range(n)] | |
| xf = np.concatenate([xs,f],axis=1) | |
| df_xf = pd.DataFrame(xf, columns=columns_xf) | |
| columns_xy = [f'x{id+1}' for id in range(n)] + ["y"] | |
| xy = np.concatenate([xs,y],axis=1) | |
| df_xy = pd.DataFrame(xy, columns=columns_xy) | |
| yhat = evalute_hdmr(xs, np.mean(y), alpha) | |
| xyhat = np.concatenate([xs,yhat],axis=1) | |
| df_xyhat = pd.DataFrame(xyhat, columns=columns_xy) | |
| fig, axs = plt.subplots(nrows=n, ncols=2, figsize=(8, n*4)) | |
| for jj in range(1,n+1): | |
| axs[jj-1, 0].scatter(df_xy[f"x{jj}"], df_xy["y"], color='red' ,label="test") | |
| axs[jj-1, 0].set_title('Test function') | |
| axs[jj-1, 0].set_xlabel(f"x{jj}") | |
| axs[jj-1, 0].set_ylabel('y') | |
| axs[jj-1, 1].scatter(df_xyhat[f"x{jj}"], df_xyhat["y"], color='red', label="hdmr") | |
| axs[jj-1, 1].scatter(df_xf[f"x{jj}"], df_xf[f"f{jj}"], color='blue', label="1d") | |
| axs[jj-1, 1].set_title('FEOM applied to function') | |
| axs[jj-1, 1].set_xlabel(f"x{jj}") | |
| axs[jj-1, 1].set_ylabel('y') | |
| axs[jj-1, 0].legend() | |
| axs[jj-1, 1].legend() | |
| plt.subplots_adjust(hspace=0.6, wspace=0.3) | |
| return fig | |
| def plot_alpha_squared(alpha): | |
| m, n = alpha.shape | |
| fig, ax = plt.subplots(figsize=(10, 6)) | |
| for i in range(n): | |
| ax.plot(range(m), alpha[:, i] ** 2, label=f'X {i+1}', linewidth=2) | |
| ax.set_xlabel('Degree of Legendre Polynomial', fontsize=12) | |
| ax.set_ylabel('Square of Alpha Coefficients', fontsize=12) | |
| ax.set_title('Square of Alpha Coefficients', fontsize=14) | |
| ax.legend(fontsize=10) | |
| ax.grid(True, linestyle='--', alpha=0.7) | |
| ax.tick_params(axis='both', labelsize=10) | |
| plt.tight_layout() | |
| return fig | |
| def plot_with_function(): | |
| global is_streamlit | |
| if is_streamlit == True: | |
| import plotly.graph_objects as go | |
| Y = fun(xs) | |
| yhat = evalute_hdmr(xs, np.mean(Y, axis=0), alpha) | |
| X1, X2 = zip(*xs) | |
| X1 = np.array(X1).flatten() | |
| X2 = np.array(X2).flatten() | |
| yhat = np.array(yhat).flatten() | |
| # Scatter plot | |
| scatter_trace = go.Scatter3d(x=X1, y=X2, z=yhat, mode='markers', marker=dict(color='blue', size=2), name='Data Points') | |
| x1_min, x1_max = np.array((np.min(X1), np.max(X1))) | |
| x2_min, x2_max = np.array((np.min(X2), np.max(X2))) | |
| X1 = np.linspace(x1_min, x1_max, int(N/10)) | |
| X2 = np.linspace(x2_min, x2_max, int(N/10)) | |
| X1, X2 = np.meshgrid(X1, X2) | |
| Y = fun(np.column_stack((X1.ravel(), X2.ravel()))).reshape(X1.shape) | |
| # Surface plot | |
| surface_trace = go.Surface(x=X1, y=X2, z=Y, colorscale='jet', opacity=0.6, name='Function Surface') | |
| # Create the figure and add the traces | |
| fig = go.Figure(data=[scatter_trace, surface_trace]) | |
| # Set labels for each axis | |
| fig.update_layout(scene=dict(xaxis_title='X1', yaxis_title='X2', zaxis_title='y'), height=650, width=800, | |
| title='HDMR Scatter & Original Function Surface Plot') | |
| return fig | |
| Y = fun(xs) | |
| yhat = evalute_hdmr(xs, np.mean(Y, axis=0), alpha) | |
| X1, X2 = zip(*xs) | |
| fig = plt.figure(figsize=(8, n*4)) | |
| ax = fig.add_subplot(111, projection='3d') | |
| ax.set_title("HDMR Scatter & Original Function Surface Plot") | |
| # Scatter plot | |
| # ax.scatter(X1, X2, Y, c='r', marker='o', alpha=0.6) | |
| ax.scatter(X1, X2, yhat, c='b', marker='o', alpha=0.6, s=2) | |
| x1_min, x1_max = np.array((np.min(X1), np.max(X1))) | |
| x2_min, x2_max = np.array((np.min(X2), np.max(X2))) | |
| print("x1_min: ", x1_min) | |
| x1 = np.linspace(x1_min, x1_max, N) | |
| x2 = np.linspace(x2_min, x2_max, N) | |
| x1, x2 = np.meshgrid(x1, x2) | |
| y = fun(np.column_stack((x1.ravel(), x2.ravel()))).reshape(x1.shape) | |
| ax.plot_surface(x1, x2, y, cmap='jet', alpha=0.6) | |
| # Limit the Y axis so scatter is easier to see. | |
| # ax.set_zlim(np.min(yhat), np.max(yhat)) | |
| # Set labels for each axis | |
| ax.set_xlabel('X1') | |
| ax.set_ylabel('X2') | |
| ax.set_zlabel('y') | |
| return fig | |
| def calculate_distances(x0_, arr): | |
| return np.sqrt(np.sum((x0_ - np.array(arr)) ** 2, axis=1)) | |
| def find_closest_points(x0_, arr, k): | |
| distances = calculate_distances(x0_, arr) | |
| indexes = np.argsort(distances)[:k] | |
| return arr[indexes] | |
| if is_adaptive: | |
| iter_count = 1 | |
| results = [] | |
| xs = (b - a)*np.random.random((N,n)) + a # Generate sampling data | |
| print('XS: ', xs.shape) | |
| alpha = calculate_alpha_coeff(xs) | |
| print("Alpha: ", alpha) | |
| temp_status = [] | |
| for i in range(n): | |
| status = minimize(one_dim_evaluate_hdmr, np.array(x0[i]), method='BFGS') | |
| temp_status.append(status.x[0]) | |
| result = OptimizeResult(x=temp_status, fun=fun(x0, *args), success=True, message=" ", nfev=1, njev=0, nhev=0) | |
| result.nfev = N | |
| results.append(result) | |
| if np.sqrt(np.sum((x0 - np.array(result.x)) ** 2)) < epsilon: | |
| print("Convergence occured at the first iteration!") | |
| else: | |
| old_x0 = x0 | |
| old_a = [a for _ in range(n)] | |
| old_b = [b for _ in range(n)] | |
| new_x0 = np.array(result.x) | |
| new_a = -np.inf | |
| new_b = np.inf | |
| while True: | |
| iter_count += 1 | |
| print(f"old x0 = {old_x0}") | |
| print(f"new x0 = {new_x0}") | |
| print("---------------------------------------------------------------------------------") | |
| closest_points = find_closest_points(x0_=new_x0, arr=xs, k=k) | |
| new_a = np.min(closest_points, axis=0) | |
| new_b = np.max(closest_points, axis=0) | |
| for i in range(n): | |
| old_range = old_b[i] - old_a[i] | |
| if new_b[i] - new_a[i] < clip * old_range: | |
| # if np.abs(new_a[i]) < 0.7 * np.abs(old_a[i]): | |
| # new_a[i] = 0.7 * old_a[i] | |
| # if np.abs(new_b[i]) < 0.7 * np.abs(old_b[i]): | |
| # new_b[i] = 0.7 * old_b[i] | |
| middle_point = (old_b[i] + old_a[i]) / 2 | |
| new_range = clip * old_range | |
| new_a[i] = middle_point - (new_range / 2) | |
| new_b[i] = middle_point + (new_range / 2) | |
| print("Creating new sample...", end=" ") | |
| new_xs = (new_b - new_a)*np.random.random((N,n)) + new_a | |
| print("Done!") | |
| # HDMR | |
| alpha = calculate_alpha_coeff(new_xs) | |
| # print("Alpha: ", alpha) | |
| temp_status = [] | |
| for i in range(n): | |
| status = minimize(one_dim_evaluate_hdmr, np.array(new_x0[i]), method='BFGS') | |
| temp_status.append(status.x[0]) | |
| result = OptimizeResult(x=temp_status, fun=fun(new_x0, *args), success=True, message=" ", nfev=1, njev=0, nhev=0) | |
| result.nfev = N | |
| if np.sqrt(np.sum((old_x0 - new_x0) ** 2)) < epsilon: | |
| print("Convergence occured!") | |
| break | |
| else: | |
| results.append(result) | |
| old_x0 = np.array(new_x0) | |
| xs = new_xs | |
| new_x0 = np.array(result.x) | |
| old_a = new_a | |
| old_b = new_b | |
| a = old_a | |
| b = old_b | |
| x0 = old_x0 | |
| plt1 = plot_results() | |
| plt3 = plot_alpha_squared(alpha) | |
| if n == 2: | |
| plt2 = plot_with_function() | |
| else: | |
| plt2 = None | |
| result = results[-1] | |
| result["nfev"] = result["nfev"] * iter_count | |
| result['n_iterations'] = iter_count | |
| return result | |
| else: | |
| xs = (b-a)*np.random.random((N,n))+a # Generate sampling data | |
| print('XS: ', xs.shape) | |
| alpha = calculate_alpha_coeff(xs) | |
| print("Alpha: ", alpha) | |
| temp_status = [] | |
| for i in range(n): | |
| status = minimize(one_dim_evaluate_hdmr, np.array(x0[i]), method='BFGS') | |
| temp_status.append(status.x[0]) | |
| result = OptimizeResult(x=temp_status, fun=fun(x0, *args), success=True, message=" ", nfev=1, njev=0, nhev=0) | |
| result.nfev = N | |
| plt1 = plot_results() | |
| if n == 2: | |
| plt2 = plot_with_function() | |
| else: | |
| plt2 = None | |
| return result | |
| def main_function(N_, n_, function_name_, basis_function_, m_, a_, b_, random_init_, x0_, is_adaptive_, k_=None, epsilon_=None, clip_=None): | |
| global N, n, function_name, basis_function, BasisFunction, m, a, b, random_init, is_adaptive, k, epsilon, clip | |
| if basis_function_ not in ["Legendre", "Cosine"]: | |
| raise ValueError("basis_function should be Cosine or Legendre.") | |
| N = N_ | |
| n = n_ | |
| function_name = function_name_ | |
| basis_function = basis_function_ | |
| m = m_ | |
| a = a_ | |
| b = b_ | |
| random_init = random_init_ | |
| is_adaptive = is_adaptive_ | |
| if basis_function == "Legendre": | |
| BasisFunction = Legendre | |
| elif basis_function == "Cosine": | |
| BasisFunction = Cosine | |
| if is_adaptive: | |
| k = k_ | |
| epsilon = epsilon_ | |
| clip = clip_ | |
| if not (0 < clip <= 1): | |
| raise ValueError("Clipping value should be in the interval of (0, 1]") | |
| file_name = f"results/adaptive_{function_name}_a{a}_b{b}_N{N}_m{m}_k{k}_c{clip:.2f}" | |
| else: | |
| file_name = f"results/{function_name}_a{a}_b{b}_N{N}_m{m}" | |
| if not random_init: | |
| x0 = np.fromstring(x0_,dtype=float,sep=',') | |
| else: | |
| file_name += '_randomInit' | |
| if function_name.split('_')[1] == '2d': | |
| x0 = np.random.rand(2) * (b - a) + a # Initial value of function for optimizing process | |
| x0_ = x0 | |
| elif function_name.split('_')[1] == '10d': | |
| x0 = np.random.rand(10) * (b - a) + a | |
| x0_ = x0 | |
| # status_bfgs = minimize(test_function, x0, method="BFGS") # Applying direct optimization method to the function | |
| # print(f"BFGS status: {status_bfgs}") | |
| start=time.time() | |
| status_hdmr = minimize(getattr(functions, function_name), x0, args=(), method=hdmr_opt) # Applying hdmr-opt method to the function | |
| end=time.time() | |
| runtime=end-start | |
| return status_hdmr,runtime, plt1, plt2, plt3, file_name | |
| plt1 = plt2 = plt3 = None | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser( | |
| prog='HDMR', | |
| description='Program applies the hdmr-opt method and plots the results.') | |
| parser.add_argument('--numSamples', type=int, help='Number of samples to calculate alpha coefficients.', required=True) | |
| parser.add_argument('--numVariables', type=int, help='Number of variable of the test function.', required=True) | |
| parser.add_argument('--function', help='Test function name.', required=True) | |
| parser.add_argument('--min', type=float, help='Lower range of the test function.', required=True) | |
| parser.add_argument('--max', type=float, help='Upper range of the test function.', required=True) | |
| parser.add_argument('--randomInit', action='store_true', help='Initializes x0 as random numbers in the range of xs. Default is initializing as 0.') | |
| parser.add_argument('--basisFunction', type=int, default="Cosine", help='Basis function that will be used in HDMR. Legendre or Cosine. Default is Cosine.') | |
| parser.add_argument('--legendreDegree', type=int, default=7, help='Number of legendre polynomial. Default is 7.') | |
| parser.add_argument('--adaptive', action='store_true', help='Uses iterative method when set.') | |
| parser.add_argument('--numClosestPoints', type=int, help='Number of closest points to x0. Default is 1000.', default=100) | |
| parser.add_argument('--epsilon', type=float, help='Epsilon value for convergence. Default is 0.1.', default=0.1) | |
| parser.add_argument('--clip', type=float, help='Clipping value for updating interval (a, b). Default is 0.9.', default=0.9) | |
| global_args = parser.parse_args() | |
| print('Args: ', global_args) | |
| N_ = global_args.numSamples # Number of samples to calculate alpha coefficients | |
| n_ = global_args.numVariables # Number of variable | |
| function_name_ = global_args.function | |
| basis_function_ = global_args.basis_function | |
| m_ = global_args.legendreDegree # Degree of the Legendre polynomial | |
| a_ = global_args.min # Range of the function | |
| b_ = global_args.max # Range of the function | |
| is_adaptive_ = global_args.adaptive | |
| random_init_ = global_args.randomInit | |
| k_ = global_args.numClosestPoints | |
| epsilon_ = global_args.epsilon | |
| clip_ = global_args.clip | |
| status_hdmr, runtime, _, _, _, file_name = main_function(N_, n_, function_name_, basis_function_, m_, a_, b_, random_init_, | |
| is_adaptive_, k_, epsilon_, clip_) | |
| print(f"{runtime} seconds") | |
| print(f"hdmr_opt status: {status_hdmr}") | |
| with open(file_name + '.txt', 'w') as f: | |
| # f.write("BFGS Status\n" + str(status_bfgs) + "\n\n") | |
| f.write("HDMR Status\n" + str(status_hdmr)) | |
| f.close() | |
| plt.show() |