"""HDMR-based optimisation (hdmr-opt) with an optional adaptive search domain.

Test functions are available in
https://www.sfu.ca/~ssurjano/optimization.html
"""
import argparse
import math
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import minimize, OptimizeResult

try:
    import src.functions as functions
except ImportError:
    # Fall back to a flat layout (script executed from inside ``src``).
    import functions

# Set to True by a hosting front end to get a plotly figure from the 2-D plot.
is_streamlit = False

# Figures produced by the most recent ``hdmr_opt`` run.
plt1 = plt2 = plt3 = None


def _reduction_axis_and_dim(x):
    """Return ``(axis, n)`` used by the test functions below.

    A 2-D ``x`` is treated as one sample per row (reduce over axis 1, ``n`` is
    the number of columns); anything else is reduced over axis 0 with
    ``n = len(x)``.
    """
    if len(x.shape) == 2:
        return 1, x.shape[1]
    return 0, len(x)


def rastrigin(x):
    """Rastrigin function: ``10*n + sum(x**2 - 10*cos(2*pi*x))``."""
    axis, n = _reduction_axis_and_dim(x)
    y = np.sum(x**2 - 10 * np.cos(2 * math.pi * x), axis=axis, keepdims=True)
    return y + 10 * n


def schwefel(x):
    """Schwefel function: ``418.9829*n - sum(x*sin(sqrt(|x|)))``."""
    axis, n = _reduction_axis_and_dim(x)
    return 418.9829 * n - np.sum(
        x * np.sin(np.sqrt(np.abs(x))), axis=axis, keepdims=True
    )


def griewank(x):
    """Griewank function: ``sum(x**2/4000) - prod(cos(x_i/sqrt(i))) + 1``."""
    axis, n = _reduction_axis_and_dim(x)
    quadratic = np.sum((x**2) / 4000, axis=axis, keepdims=True)
    oscillation = np.prod(
        np.cos(x / np.sqrt(np.arange(1, n + 1))), axis=axis, keepdims=True
    )
    return quadratic - oscillation + 1


def test_func(x):
    """Sum of squares of the coordinates of ``x``."""
    axis, _ = _reduction_axis_and_dim(x)
    return np.sum(x**2, axis=axis, keepdims=True)


## ------------------ BASIS FUNCTIONS START ------------------ ##
def Pn(m, x):
    """Evaluate the degree-``m`` Legendre polynomial at ``x``.

    Uses the three-term recurrence
    ``k*P_k = (2k-1)*x*P_{k-1} - (k-1)*P_{k-2}`` iteratively, so the cost is
    linear in ``m`` instead of the exponential cost of naive recursion.
    """
    previous = np.ones_like(x)
    if m == 0:
        return previous
    current = x
    for k in range(2, m + 1):
        previous, current = (
            current,
            (2 * k - 1) * x * current / k - (k - 1) * previous / k,
        )
    return current


def Legendre(a, b, m, x):
    """Degree-``m`` Legendre basis on ``[a, b]``.

    ``x`` is mapped affinely so that ``a -> -1`` and ``b -> 1``; the result is
    scaled by ``sqrt((2m+1)/(b-a))``.
    """
    return np.sqrt((2 * m + 1) / (b - a)) * Pn(m, 2 * (x - b) / (b - a) + 1)


def Cosine(a, b, m, x):
    """Order-``m`` cosine basis ``c * cos(2*pi*m*(x-a)/(b-a))`` on ``[a, b]``.

    ``m`` must be non-zero: the scale factor divides by
    ``sin(4*pi*m) + 4*pi*m``.
    """
    scale = np.sqrt(
        1 / (b - a) * 8 * math.pi * m
        / (math.sin(4 * math.pi * m) + 4 * math.pi * m)
    )
    return scale * np.cos(2 * math.pi * m * (x - a) / (b - a))
## ------------------ BASIS FUNCTIONS END ------------------ ##


def hdmr_opt(fun, x0, args=(), jac=None, callback=None, gtol=1e-5,
             maxiter=None, disp=False, return_all=False,
             finite_diff_rel_step=None, **unknown_options):
    """Custom ``scipy.optimize.minimize`` method based on first-order HDMR.

    The objective is sampled uniformly on the search interval, projected onto
    ``m`` one-dimensional basis functions per variable, and each resulting
    one-dimensional surrogate is minimised independently with BFGS.  When the
    module-level ``is_adaptive`` flag is set, the interval is repeatedly
    narrowed around the ``k`` samples closest to the current estimate until two
    successive estimates are closer than ``epsilon``.

    Configuration is read from module globals set by ``main_function``
    (``N``, ``n``, ``m``, ``a``, ``b``, ``BasisFunction``, ``is_adaptive``,
    ``k``, ``epsilon``, ``clip``).  The figures are stored in the globals
    ``plt1``, ``plt2`` and ``plt3``.

    Parameters
    ----------
    fun : callable
        Objective. Called with an ``(N, n)`` sample matrix and with 1-D points.
    x0 : array-like
        Starting point; entry ``i`` seeds the 1-D minimisation of variable i.
    args : tuple
        Extra arguments, forwarded only when ``fun`` is evaluated at the
        starting point for the reported ``fun`` value.
    jac, callback, gtol, maxiter, disp, return_all, finite_diff_rel_step,
    **unknown_options
        Accepted so the function matches the signature SciPy uses for custom
        methods; none of them is used.

    Returns
    -------
    scipy.optimize.OptimizeResult
        ``x`` holds the per-variable minimisers.  In adaptive mode the result
        also carries ``n_iterations`` and ``nfev`` is multiplied by it.
    """
    global plt1, plt2, plt3, a, b, xs

    def calculate_alpha_coeff(samples):
        """Monte Carlo estimate of the (m, n) HDMR expansion coefficients."""
        _, n_vars = samples.shape
        coeffs = np.zeros((m, n_vars))
        y = fun(samples)
        centred = y - np.mean(y)
        for r in range(m):  # basis order (1-based when evaluated)
            for col in range(n_vars):  # input variable
                basis = BasisFunction(a, b, r + 1, np.array(samples[:, [col]]))
                coeffs[r, col] = (b - a) * np.mean(centred * basis)
        return coeffs

    def evaluate_hdmr(x, f0, coeffs):
        """Evaluate the full first-order meta-model on the rows of ``x``."""
        n_rows, n_vars = x.shape
        order, _ = coeffs.shape
        y = f0 * np.ones((n_rows, 1))
        for r in range(order):
            for col in range(n_vars):
                y = y + coeffs[r, col] * BasisFunction(
                    a, b, r + 1, np.array(x[:, [col]])
                )
        return y

    def one_dim_evaluate_hdmr(x, idx_var):
        """One-dimensional component function of variable ``idx_var``."""
        order, _ = alpha.shape
        f = 0
        for r in range(order):
            f = f + alpha[r, idx_var] * BasisFunction(a, b, r + 1, x)
        return f

    def one_dim_evaluate_hdmr_for_test(x, idx_var):
        """Same component function, evaluated on an ``(N, 1)`` column."""
        order, _ = alpha.shape
        f = np.zeros((N, 1))
        for r in range(order):
            f = f + alpha[r, idx_var] * BasisFunction(a, b, r + 1, x)
        return f

    def minimize_each_dimension(start):
        """Minimise every 1-D component with BFGS, seeded from ``start``."""
        minimizers = []
        for dim in range(n):
            status = minimize(
                one_dim_evaluate_hdmr,
                np.array(start[dim]),
                args=(dim,),
                method='BFGS',
            )
            minimizers.append(status.x[0])
        return minimizers

    def build_result(minimizers, start):
        """Wrap the per-variable minimisers in an ``OptimizeResult``."""
        # NOTE(review): ``fun`` is evaluated at the starting point, not at the
        # minimiser that is returned in ``x`` -- confirm this is intended.
        return OptimizeResult(
            x=minimizers, fun=fun(start, *args), success=True, message=" ",
            nfev=N, njev=0, nhev=0,
        )

    def plot_results():
        """Scatter the objective, the meta-model and the 1-D components."""
        global a, b
        y = fun(xs)
        y_mean = np.mean(y)
        if is_adaptive:
            # After adaptation the bounds may be per-variable sequences;
            # collapse them to one enclosing interval for the basis functions.
            a = np.min(a)
            b = np.max(b)
        f = np.zeros((N, n))
        for idx in range(n):
            f[:, [idx]] = y_mean + one_dim_evaluate_hdmr_for_test(
                xs[:, [idx]], idx
            )

        x_names = [f'x{j + 1}' for j in range(n)]
        df_xf = pd.DataFrame(
            np.concatenate([xs, f], axis=1),
            columns=x_names + [f'f{j + 1}' for j in range(n)],
        )
        columns_xy = x_names + ["y"]
        df_xy = pd.DataFrame(np.concatenate([xs, y], axis=1), columns=columns_xy)
        yhat = evaluate_hdmr(xs, y_mean, alpha)
        df_xyhat = pd.DataFrame(
            np.concatenate([xs, yhat], axis=1), columns=columns_xy
        )

        # squeeze=False keeps ``axs`` two-dimensional even when n == 1.
        fig, axs = plt.subplots(
            nrows=n, ncols=2, figsize=(8, n * 4), squeeze=False
        )
        for row in range(n):
            x_name = f"x{row + 1}"
            left, right = axs[row, 0], axs[row, 1]
            left.scatter(df_xy[x_name], df_xy["y"], color='red', label="test")
            left.set_title('Test function')
            left.set_xlabel(x_name)
            left.set_ylabel('y')
            right.scatter(
                df_xyhat[x_name], df_xyhat["y"], color='red', label="hdmr"
            )
            right.scatter(
                df_xf[x_name], df_xf[f"f{row + 1}"], color='blue', label="1d"
            )
            right.set_title('FEOM applied to function')
            right.set_xlabel(x_name)
            right.set_ylabel('y')
            left.legend()
            right.legend()
        plt.subplots_adjust(hspace=0.6, wspace=0.3)
        return fig

    def plot_alpha_squared(coeffs):
        """Plot the squared coefficients of every variable against the order."""
        order, n_vars = coeffs.shape
        fig, ax = plt.subplots(figsize=(10, 6))
        for col in range(n_vars):
            ax.plot(
                range(order), coeffs[:, col] ** 2,
                label=f'X {col + 1}', linewidth=2,
            )
        ax.set_xlabel('Degree of Legendre Polynomial', fontsize=12)
        ax.set_ylabel('Square of Alpha Coefficients', fontsize=12)
        ax.set_title('Square of Alpha Coefficients', fontsize=14)
        ax.legend(fontsize=10)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.tick_params(axis='both', labelsize=10)
        plt.tight_layout()
        return fig

    def plot_with_function():
        """3-D plot of the meta-model samples over the true surface (n == 2)."""
        Y = fun(xs)
        yhat = evaluate_hdmr(xs, np.mean(Y, axis=0), alpha)
        X1, X2 = xs[:, 0], xs[:, 1]
        x1_min, x1_max = np.min(X1), np.max(X1)
        x2_min, x2_max = np.min(X2), np.max(X2)

        if is_streamlit:
            import plotly.graph_objects as go

            scatter_trace = go.Scatter3d(
                x=X1, y=X2, z=np.array(yhat).flatten(), mode='markers',
                marker=dict(color='blue', size=2), name='Data Points',
            )
            # A coarser grid (N/10 points per axis) keeps the figure light.
            G1, G2 = np.meshgrid(
                np.linspace(x1_min, x1_max, int(N / 10)),
                np.linspace(x2_min, x2_max, int(N / 10)),
            )
            surface = fun(
                np.column_stack((G1.ravel(), G2.ravel()))
            ).reshape(G1.shape)
            surface_trace = go.Surface(
                x=G1, y=G2, z=surface, colorscale='jet', opacity=0.6,
                name='Function Surface',
            )
            fig = go.Figure(data=[scatter_trace, surface_trace])
            fig.update_layout(
                scene=dict(
                    xaxis_title='X1', yaxis_title='X2', zaxis_title='y'
                ),
                height=650, width=800,
                title='HDMR Scatter & Original Function Surface Plot',
            )
            return fig

        fig = plt.figure(figsize=(8, n * 4))
        ax = fig.add_subplot(111, projection='3d')
        ax.set_title("HDMR Scatter & Original Function Surface Plot")
        ax.scatter(X1, X2, yhat, c='b', marker='o', alpha=0.6, s=2)
        print("x1_min: ", x1_min)
        g1, g2 = np.meshgrid(
            np.linspace(x1_min, x1_max, N), np.linspace(x2_min, x2_max, N)
        )
        surface = fun(np.column_stack((g1.ravel(), g2.ravel()))).reshape(g1.shape)
        ax.plot_surface(g1, g2, surface, cmap='jet', alpha=0.6)
        ax.set_xlabel('X1')
        ax.set_ylabel('X2')
        ax.set_zlabel('y')
        return fig

    def calculate_distances(x0_, arr):
        """Euclidean distance from ``x0_`` to every row of ``arr``."""
        return np.sqrt(np.sum((x0_ - np.array(arr)) ** 2, axis=1))

    def find_closest_points(x0_, arr, k):
        """Return the ``k`` rows of ``arr`` nearest to ``x0_``."""
        nearest = np.argsort(calculate_distances(x0_, arr))[:k]
        return arr[nearest]

    if is_adaptive:
        iter_count = 1
        results = []
        xs = (b - a) * np.random.random((N, n)) + a  # Generate sampling data
        print('XS: ', xs.shape)
        alpha = calculate_alpha_coeff(xs)
        print("Alpha: ", alpha)
        result = build_result(minimize_each_dimension(x0), x0)
        results.append(result)

        if np.sqrt(np.sum((x0 - np.array(result.x)) ** 2)) < epsilon:
            print("Convergence occured at the first iteration!")
        else:
            old_x0 = x0
            old_a = [a for _ in range(n)]
            old_b = [b for _ in range(n)]
            new_x0 = np.array(result.x)
            while True:
                iter_count += 1
                print(f"old x0 = {old_x0}")
                print(f"new x0 = {new_x0}")
                print("---------------------------------------------------------------------------------")
                closest_points = find_closest_points(x0_=new_x0, arr=xs, k=k)
                new_a = np.min(closest_points, axis=0)
                new_b = np.max(closest_points, axis=0)
                for dim in range(n):
                    # Never shrink a side below ``clip`` times its old length;
                    # when the neighbours span less, use a window of that
                    # length centred on the old interval instead.
                    old_range = old_b[dim] - old_a[dim]
                    if new_b[dim] - new_a[dim] < clip * old_range:
                        middle_point = (old_b[dim] + old_a[dim]) / 2
                        new_range = clip * old_range
                        new_a[dim] = middle_point - (new_range / 2)
                        new_b[dim] = middle_point + (new_range / 2)

                print("Creating new sample...", end=" ")
                new_xs = (new_b - new_a) * np.random.random((N, n)) + new_a
                print("Done!")

                # HDMR
                # NOTE(review): the basis functions still use the global
                # ``a``/``b`` here, not ``new_a``/``new_b`` -- confirm intended.
                alpha = calculate_alpha_coeff(new_xs)
                result = build_result(minimize_each_dimension(new_x0), new_x0)

                if np.sqrt(np.sum((old_x0 - new_x0) ** 2)) < epsilon:
                    # NOTE(review): ``alpha`` now comes from ``new_xs`` while
                    # ``xs`` is still the previous sample used for plotting.
                    print("Convergence occured!")
                    break
                results.append(result)
                old_x0 = np.array(new_x0)
                xs = new_xs
                new_x0 = np.array(result.x)
                old_a = new_a
                old_b = new_b
            a = old_a
            b = old_b

        plt1 = plot_results()
        plt3 = plot_alpha_squared(alpha)
        plt2 = plot_with_function() if n == 2 else None
        result = results[-1]
        result["nfev"] = result["nfev"] * iter_count
        result['n_iterations'] = iter_count
        return result

    xs = (b - a) * np.random.random((N, n)) + a  # Generate sampling data
    print('XS: ', xs.shape)
    alpha = calculate_alpha_coeff(xs)
    print("Alpha: ", alpha)
    result = build_result(minimize_each_dimension(x0), x0)
    plt1 = plot_results()
    plt2 = plot_with_function() if n == 2 else None
    # No coefficient plot in this mode; reset so that a figure left over from
    # an earlier adaptive run is not returned by ``main_function``.
    plt3 = None
    return result


def main_function(N_, n_, function_name_, basis_function_, m_, a_, b_,
                  random_init_, x0_, is_adaptive_, k_=None, epsilon_=None,
                  clip_=None):
    """Configure the module globals and run ``hdmr_opt`` on a test function.

    Parameters
    ----------
    N_ : int
        Number of samples used to estimate the coefficients.
    n_ : int
        Number of variables.
    function_name_ : str
        Attribute name of the objective inside the ``functions`` module.
    basis_function_ : {"Legendre", "Cosine"}
        Basis family.
    m_ : int
        Number of basis functions per variable.
    a_, b_ : float
        Lower and upper bound of the sampling interval.
    random_init_ : bool
        Draw the starting point uniformly from ``[a_, b_]`` instead of using
        ``x0_``.  The dimension is 2 or 10 when the second ``_``-separated
        token of the function name is ``2d`` or ``10d``, otherwise ``n_``.
    x0_ : str or array-like
        Starting point, either a comma-separated string or a sequence of
        numbers.  Ignored when ``random_init_`` is true.
    is_adaptive_ : bool
        Enable the iterative interval-shrinking variant.
    k_, epsilon_, clip_ :
        Adaptive-mode settings: number of nearest samples, convergence
        tolerance and minimum shrink ratio (must lie in ``(0, 1]``).

    Returns
    -------
    tuple
        ``(status_hdmr, runtime, plt1, plt2, plt3, file_name)``.

    Raises
    ------
    ValueError
        For an unknown basis family or an out-of-range ``clip_``.
    """
    global N, n, function_name, basis_function, BasisFunction, m, a, b, \
        random_init, is_adaptive, k, epsilon, clip

    if basis_function_ not in ["Legendre", "Cosine"]:
        raise ValueError("basis_function should be Cosine or Legendre.")

    N = N_
    n = n_
    function_name = function_name_
    basis_function = basis_function_
    m = m_
    a = a_
    b = b_
    random_init = random_init_
    is_adaptive = is_adaptive_
    BasisFunction = Legendre if basis_function == "Legendre" else Cosine

    if is_adaptive:
        k = k_
        epsilon = epsilon_
        clip = clip_
        if not (0 < clip <= 1):
            raise ValueError("Clipping value should be in the interval of (0, 1]")
        file_name = f"results/adaptive_{function_name}_a{a}_b{b}_N{N}_m{m}_k{k}_c{clip:.2f}"
    else:
        file_name = f"results/{function_name}_a{a}_b{b}_N{N}_m{m}"

    if not random_init:
        if isinstance(x0_, str):
            x0 = np.fromstring(x0_, dtype=float, sep=',')
        else:
            x0 = np.asarray(x0_, dtype=float)
    else:
        file_name += '_randomInit'
        name_parts = function_name.split('_')
        suffix = name_parts[1] if len(name_parts) > 1 else ''
        # Without a recognised suffix fall back to the configured dimension
        # instead of leaving ``x0`` undefined.
        dimension = {'2d': 2, '10d': 10}.get(suffix, n)
        x0 = np.random.rand(dimension) * (b - a) + a

    start = time.time()
    # Applying hdmr-opt method to the function
    status_hdmr = minimize(
        getattr(functions, function_name), x0, args=(), method=hdmr_opt
    )
    runtime = time.time() - start
    return status_hdmr, runtime, plt1, plt2, plt3, file_name


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='HDMR',
        description='Program applies the hdmr-opt method and plots the results.')
    parser.add_argument('--numSamples', type=int, help='Number of samples to calculate alpha coefficients.', required=True)
    parser.add_argument('--numVariables', type=int, help='Number of variable of the test function.', required=True)
    parser.add_argument('--function', help='Test function name.', required=True)
    parser.add_argument('--min', type=float, help='Lower range of the test function.', required=True)
    parser.add_argument('--max', type=float, help='Upper range of the test function.', required=True)
    parser.add_argument('--randomInit', action='store_true', help='Initializes x0 as random numbers in the range of xs. Default is initializing as 0.')
    parser.add_argument('--basisFunction', type=str, default="Cosine", choices=["Legendre", "Cosine"], help='Basis function that will be used in HDMR. Legendre or Cosine. Default is Cosine.')
    parser.add_argument('--legendreDegree', type=int, default=7, help='Number of legendre polynomial. Default is 7.')
    parser.add_argument('--adaptive', action='store_true', help='Uses iterative method when set.')
    parser.add_argument('--numClosestPoints', type=int, help='Number of closest points to x0. Default is 100.', default=100)
    parser.add_argument('--epsilon', type=float, help='Epsilon value for convergence. Default is 0.1.', default=0.1)
    parser.add_argument('--clip', type=float, help='Clipping value for updating interval (a, b). Default is 0.9.', default=0.9)

    global_args = parser.parse_args()
    print('Args: ', global_args)

    N_ = global_args.numSamples  # Number of samples to calculate alpha coefficients
    n_ = global_args.numVariables  # Number of variable
    function_name_ = global_args.function
    basis_function_ = global_args.basisFunction
    m_ = global_args.legendreDegree  # Number of basis functions per variable
    a_ = global_args.min  # Range of the function
    b_ = global_args.max  # Range of the function
    is_adaptive_ = global_args.adaptive
    random_init_ = global_args.randomInit
    k_ = global_args.numClosestPoints
    epsilon_ = global_args.epsilon
    clip_ = global_args.clip
    # Without --randomInit the search starts from the origin.
    x0_ = ",".join(["0"] * n_)

    status_hdmr, runtime, _, _, _, file_name = main_function(
        N_, n_, function_name_, basis_function_, m_, a_, b_, random_init_,
        x0_, is_adaptive_, k_, epsilon_, clip_)

    print(f"{runtime} seconds")
    print(f"hdmr_opt status: {status_hdmr}")

    output_dir = os.path.dirname(file_name)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(file_name + '.txt', 'w') as f:
        f.write("HDMR Status\n" + str(status_hdmr))

    plt.show()