Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python | |
| # coding: utf-8 | |
| from IPython.core.debugger import set_trace | |
| import random | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import math | |
| import copy | |
| import sys | |
| from scipy.stats import pearsonr # for computing correlation | |
| from functools import reduce #for flattening lists | |
| from operator import concat #for flattening lists | |
| from scipy.stats import trim_mean # for ensemble evaluation | |
| from scipy.stats import differential_entropy | |
| import warnings | |
| import time | |
| import dill | |
| import os | |
| from sklearn.cluster import KMeans #for clustering in ensemble definition | |
| from scipy.optimize import minimize #for uncertainty maximization | |
| from sympy import symbols, simplify, expand | |
| import sympy as sym | |
| try: | |
| from IPython.display import display, clear_output | |
| except: | |
| pass | |
| import signal #for timing out functions | |
| from contextlib import contextmanager #for timing out functions | |
| warnings.filterwarnings('ignore', '.*invalid value.*' ) | |
| warnings.filterwarnings('ignore', '.*overflow.*' ) | |
| warnings.filterwarnings('ignore', '.*divide by.*' ) | |
| warnings.filterwarnings('ignore', '.*is constant.*' ) | |
| warnings.filterwarnings('ignore', '.*nearly constant.*' ) | |
| warnings.filterwarnings('ignore', '.*Polyfit may be.*' ) | |
| warnings.filterwarnings('ignore', '.*Number of.*') | |
| ## Get the arrays for the models | |
| model_distributions = [] | |
| def protectDiv(a,b): | |
| if (type(b)==int or type(b)==float or type(b)==np.float64) and b==0: | |
| return a/math.nan | |
| if (type(b)==np.ndarray) and (0 in b): | |
| return a/np.where(b==0,math.nan,b) | |
| return a/b | |
| def add(a,b): | |
| return a+b | |
| def sub(a,b): | |
| return a-b | |
| def mult(a,b): | |
| return a*b | |
| def exp(a): | |
| safe_a = np.clip(a, -90.0, 90.0) | |
| return np.exp(safe_a) | |
| # def sine(a,b): | |
| # return np.sin(a) | |
| def power(a,b): | |
| if (type(a)==int or type(a)==float or type(a)==np.float64) and a==0: | |
| return a/math.nan | |
| if (type(a)==np.ndarray) and (0 in a): | |
| return a/np.where(a==0,math.nan,a) | |
| return a**b | |
| def sqrt(a): | |
| return np.sqrt(abs(a)) | |
| def sqrd(a): | |
| return a**2 | |
| def inv(a): | |
| return np.array(a).astype(float)**(-1) | |
| def neg(a): | |
| return -a | |
| def sin(a): | |
| return np.sin(a) | |
| def cos(a): | |
| return np.cos(a) | |
| def tan(a): | |
| return np.tan(a) | |
| def arccos(a): | |
| return np.arccos(a) | |
| def arcsin(a): | |
| return np.arcsin(a) | |
| def arctan(a): | |
| return np.arctan(a) | |
| def tanh(a): | |
| return np.tanh(a) | |
| def log(a): | |
| a = np.maximum(np.abs(a), 1e-9) | |
| return np.log(a) | |
| def log10(a): | |
| return np.log10(a) | |
| def log2(a): | |
| return np.log2(a) | |
| def abs1(a): | |
| return np.abs(a) | |
| def and1(a,b): | |
| return np.logical_and(a,b) | |
| def or1(a,b): | |
| return np.logical_or(a,b) | |
| def xor1(a,b): | |
| return np.logical_xor(a,b) | |
| def nand1(a,b): | |
| return np.logical_not(np.logical_and(a,b)) | |
| def nor1(a,b): | |
| return np.logical_not(np.logical_or(a,b)) | |
| def xnor1(a,b): | |
| return np.logical_not(np.logical_xor(a,b)) | |
| def not1(a): | |
| return np.logical_not(a) | |
| def defaultOps(): | |
| return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,"pop","pop","pop","pop","pop","pop"] | |
| def allOps(): | |
| return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,cos,sin,tan,arccos,arcsin,arctan,tanh,log,"pop","pop","pop","pop","pop","pop","pop","pop","pop","pop"] | |
| def customOps(): | |
| return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,cos,sin,log,"pop","pop","pop","pop","pop","pop","pop","pop"] | |
| def booleanOps(): | |
| return [and1,or1,xor1,nand1,nor1,xnor1,not1,"pop","pop","pop","pop","pop","pop","pop"] | |
| def randomInt(a=-3,b=3): | |
| return random.randint(a,b) | |
| def defaultConst(): | |
| return [np.pi, np.e, randomInt,ranReal ] | |
| def booleanConst(): | |
| return [1,0] | |
| def ranReal(a=20,b=-10): | |
| return random.random()*a-b | |
| ##### MODEL DISTRIBUTION CODE ##### | |
| import sympy as sym | |
| import sympy as sym | |
| from sympy import preorder_traversal | |
| from collections import defaultdict | |
| def extract_genotype(population_buffer): | |
| """ | |
| Extracts in terms of the geneotypes instead of using pheno | |
| """ | |
| stats = { | |
| "operators": [], | |
| "terminals": [], | |
| } | |
| for prog in population_buffer: | |
| print(prog[0], prog[1]) | |
| ops = [op.__name__ if hasattr(op, '__name__') else str(op) for op in prog[0]] | |
| stats["operators"].extend(ops) | |
| #(Features and Constants) | |
| stats["terminals"].extend([str(t) for t in prog[1]]) | |
| return stats | |
| def extract_model_stats(expr): | |
| stats = { | |
| "constants": [], | |
| "operators": [], | |
| "features": [], | |
| "powers": [] | |
| } | |
| for node in preorder_traversal(expr): | |
| if isinstance(node, sym.Number): | |
| stats["constants"].append(float(node)) | |
| elif isinstance(node, sym.Symbol): | |
| stats["features"].append(str(node)) | |
| # POWERS | |
| elif isinstance(node, sym.Pow): | |
| base, exp = node.args | |
| stats["operators"].append("pow") | |
| if exp.is_number: | |
| stats["powers"].append(float(exp)) | |
| elif getattr(node, 'is_Function', False): | |
| op_name = node.func.__name__ | |
| stats["operators"].append(op_name) | |
| elif isinstance(node, sym.Add): | |
| has_sub = any(arg.could_extract_minus_sign() for arg in node.args) | |
| stats["operators"].append("add") | |
| if has_sub: | |
| stats["operators"].append("sub") | |
| # MUL → detect * vs / | |
| elif isinstance(node, sym.Mul): | |
| has_div = False | |
| for arg in node.args: | |
| if isinstance(arg, sym.Pow): | |
| base, exp = arg.args | |
| if exp.is_number and exp < 0: | |
| has_div = True | |
| break | |
| if has_div: | |
| stats["operators"].append("div") | |
| else: | |
| stats["operators"].append("mul") | |
| return stats | |
| def get_model_distributions(models): | |
| ''' | |
| Returns list of dictionaries, where each dict is breakdown of each model from list of models | |
| ''' | |
| all_models_dist, gene_dist = [], [] | |
| for i, mod in enumerate(models): | |
| expr = printGPModel(mod) | |
| if isinstance(expr, sym.Expr): | |
| expr = sym.simplify(expr) | |
| #expr = sym.expand(expr) | |
| stats = extract_model_stats(expr) | |
| stats["DEBUG_INDEX"] = i | |
| all_models_dist.append(stats) | |
| print(f"--- MODEL INDEX {i} ---") | |
| print(stats) | |
| gene_dist.append(extract_genotype(stats)) | |
| return all_models_dist, gene_dist | |
| # def get_model_distribution(models): | |
| # # We will store the distributions in a dictionary where the key is the feature | |
| # # and the value is a list of terms across all models from pareto front | |
| # feature_distributions = {} | |
| # all_models_expanded = [] | |
| # for mod in models: | |
| # expr = printGPModel(mod) | |
| # print(expr) | |
| # if isinstance(expr, sym.Expr): | |
| # expanded_expr = sym.expand(expr) | |
| # else: | |
| # expanded_expr = expr | |
| # terms = expanded_expr.args if hasattr(expanded_expr, 'args') and expanded_expr.func == sym.Add else (expanded_expr,) | |
| # print("WE ARE PRINTING OUT THE TERMS FOR ONE MODEL") | |
| # print(terms) | |
| # all_models_expanded.append(terms) | |
| # for term in terms: | |
| # print("EACH INDIVIDUAL term FOR ONE MODEL", term) | |
| # if hasattr(term, 'free_symbols'): | |
| # print("HAS FROZEN SYMBOLS") | |
| # sig = frozenset(term.free_symbols) | |
| # print("sig", sig) | |
| # if sig not in feature_distributions: | |
| # #THIS REPRESENTS THE MODEL REPRESENTATION | |
| # feature_distributions[sig] = [] | |
| # for terms in all_models_expanded: | |
| # current_model_contributions = {sig: 0 for sig in feature_distributions.keys()} | |
| # for term in terms: | |
| # if hasattr(term, 'free_symbols'): | |
| # sig = frozenset(term.free_symbols) | |
| # current_model_contributions[sig] += term | |
| # else: | |
| # sig = frozenset() | |
| # current_model_contributions[sig] += term | |
| # # Append this model's contributions to the global buckets | |
| # for sig in feature_distributions.keys(): | |
| # feature_distributions[sig].append(current_model_contributions[sig]) | |
| # return feature_distributions | |
| ############################ | |
| #Data Subsampling Methods | |
| ############################ | |
| def randomSubsample(x,y, *args, **kwargs): | |
| n=max(int(np.ceil(len(y)**(3/5))),3) | |
| idx=np.random.choice(range(x.shape[1]),n,replace=False) | |
| return np.array([i[idx] for i in x]),y[idx] | |
| def generationProportionalSample(x,y,generation=100,generations=100): | |
| n=max(int(np.ceil(len(y)*(generation/generations)**(3/5))),3) | |
| idx=np.random.choice(range(x.shape[1]),n,replace=False) | |
| return np.array([i[idx] for i in x]),y[idx] | |
| def ordinalSample(x,y,generation=100,generations=100): | |
| n=max(int(len(y)*generation/generations),3) | |
| sortedIdx=np.argsort(y) | |
| step=len(y)/(n-1) | |
| idx=[sortedIdx[max(int(i*step)-1,0)] for i in range(n)] | |
| return np.array([i[idx] for i in x]),y[idx] | |
| def orderedSample(x,y,generation=100,generations=100): | |
| n=max(int(len(y)*generation/generations),3) | |
| idx=[i for i in range(n)] | |
| return np.array([i[idx] for i in x]),y[idx] | |
| def ordinalBalancedSample(x,y,generation=100,generations=100): | |
| n=max(int(len(y)*generation/generations),3) | |
| numBins=int(max(np.ceil(np.sqrt(n)),3)) | |
| bins=np.linspace(min(y),max(y),numBins+1) | |
| binIdx=np.digitize(y,bins)-1 | |
| samplesPerBin=max(int(n/numBins),1) | |
| idx=[] | |
| for i in range(numBins): | |
| binMembers=[j for j in range(len(y)) if binIdx[j]==i] | |
| if len(binMembers)>0: | |
| chosen=np.random.choice(binMembers,min(samplesPerBin,len(binMembers)),replace=False) | |
| idx=idx+chosen.tolist() | |
| return np.array([i[idx] for i in x]),y[idx] | |
| def balancedSample(x,y, *args, **kwargs): | |
| n=int(np.ceil(len(y)**(3/5))) | |
| numBins=max(round(n**(2/5)),3) | |
| bins=np.linspace(min(y),max(y),numBins+1) | |
| binIdx=np.digitize(y,bins)-1 | |
| samplesPerBin=max(int(n/numBins),1) | |
| idx=[] | |
| for i in range(numBins): | |
| binMembers=[j for j in range(len(y)) if binIdx[j]==i] | |
| if len(binMembers)>0: | |
| chosen=np.random.choice(binMembers,min(samplesPerBin,len(binMembers)),replace=False) | |
| idx=idx+chosen.tolist() | |
| return np.array([i[idx] for i in x]),y[idx] | |
| import inspect | |
| def getArity(func): #Returns the arity of a function: used for model evaluations | |
| if func=="pop": | |
| return 1 | |
| return len(inspect.signature(func).parameters) | |
| getArity.__doc__ = "getArity(func) takes a function and returns the function arity" | |
| def modelArity(model): #Returns the total arity of a model | |
| return 1+sum([getArity(i)-1 for i in model[0]]) | |
| modelArity.__doc__ = "modelArity(model) returns the total arity of a model" | |
| def listArity(data): #Returns arity of evaluating a list of operators | |
| if len(data)==0: | |
| return 0 | |
| return 1+sum([getArity(i)-1 for i in data]) | |
| listArity.__doc__ = "listArity(list) returns the arity of evaluating a list of operators" | |
| def buildEmptyModel(): # Generates an empty model | |
| return [[],[],[]] | |
| buildEmptyModel.__doc__ = "buildEmptyModel() takes no inputs and generates an empty GP model" | |
| def variableSelect(num): #Function that creates a function to select a specific variable | |
| return lambda variables: variables[num] | |
| variableSelect.__doc__ = "variableSelect(n) is a function that creates a function to select the nth variable" | |
| def modelToListForm(model): | |
| model[0]=model[0].tolist() | |
| def modelRestoreForm(model): | |
| model[0]=np.array(model[0],dtype=object) | |
| def generateRandomModel(variables,ops,const,maxLength): #Generates a random GP model | |
| prog = buildEmptyModel() #Generate an empty model with correct structure | |
| varChoices=[variableSelect(i) for i in range(variables)]+const #All variable and constants choices | |
| prog[0]=np.array(np.random.choice(ops,random.randint(1,maxLength)),dtype=object) #Choose random operators | |
| countVars=modelArity(prog) #Count how many variables/constants are needed | |
| prog[1]=np.random.choice(varChoices,countVars) #Choose random variables/constants | |
| # if all vars are constants then replace one random term | |
| if all(t in const for t in prog[1]): | |
| replace_idx = random.randrange(countVars) | |
| prog[1][replace_idx] = random.choice(varChoices[:variables]) #Replace with a variable | |
| prog[1]=[i() if (callable(i) and i.__name__!='<lambda>' )else i for i in prog[1]] #If function then evaluate | |
| return prog | |
| generateRandomModel.__doc__ = "generateRandomModel() takes as input the variables, operators, constants, and max program length and returns a random program" | |
| def initializeGPModels(variables,ops=defaultOps(),const=defaultConst(),numberOfModels=100,maxLength=10): # generate random linear program | |
| prog=[[],[],[]] | |
| # prog stores [Operators, VarConst, QualityMetrics] | |
| models=[generateRandomModel(variables,ops,const,maxLength) for i in range(numberOfModels)] #Generate models | |
| return models | |
| initializeGPModels.__doc__ = "initializeGPModels(countOfVariables, operators, constants, numberOfModels=100, maxLength=10) returns a set of randomly generated models" | |
| def reverseList(data): #Returns a list reversed | |
| return [i for i in reversed(data)] | |
| reverseList.__doc__ = "reverseList(data) returns the data list reversed" | |
| def varReplace(data,variables): #Replaces variable references with data during model evaluation | |
| return [i(variables) if callable(i) else i for i in data] | |
| varReplace.__doc__ = "varReplace(data,variables) replaces references to variables in data with actual values" | |
| def inputLen(data): #Returns the number of data records in a data set | |
| el1=data[0] | |
| if type(el1)==list or type(el1)==np.ndarray: | |
| return len(el1) | |
| else: | |
| return 1 | |
| inputLen.__doc__ = "inputLen(data) determines the number of data records in a data set" | |
| def varCount(data): #Returns the number of variables in a data set | |
| return len(data) | |
| varCount.__doc__ = "varCount(data) determines the number of variables in a data set" | |
| def evaluateGPModel(model,inputData): #Evaluates a model numerically | |
| response=evModHelper(model[1],model[0],[],np.array(inputData).astype(float))[2][0] | |
| if not type(response)==np.ndarray and inputLen(inputData)>1: | |
| response=np.array([response for i in range(inputLen(inputData))]) | |
| return response | |
| evaluateGPModel.__doc__ = "evaluateGPModel(model,data) numerically evaluates a model using the data stored in inputData" | |
| def evModHelper(varStack,opStack,tempStack,data): #Recursive helper function for evaluateGPModel | |
| stack1=varStack | |
| stack2=opStack | |
| stack3=tempStack | |
| if len(stack2)==0: | |
| return [stack3,stack2,stack1] | |
| op=stack2[0] | |
| stack2=stack2[1:] | |
| if callable(op): | |
| patt=getArity(op) | |
| while patt>len(stack3): | |
| stack3=[stack1[0]]+stack3 | |
| stack1=stack1[1:] | |
| try: | |
| temp=op(*varReplace(reverseList(stack3[:patt]),data)) | |
| except TypeError: | |
| print("stack3: ", stack3, " patt: ", patt, " data: ", data) | |
| temp=np.nan | |
| except OverflowError: | |
| temp=np.nan | |
| stack3=stack3[patt:] | |
| stack3=[temp]+stack3 | |
| else: | |
| if len(stack1)>0: | |
| stack3=varReplace([stack1[0]],data)+stack3 | |
| stack1=stack1[1:] | |
| if len(stack2)>0: | |
| stack1,stack2,stack3=evModHelper(stack1,stack2,stack3,data) | |
| return [stack1,stack2,stack3] | |
| evModHelper.__doc__ = "evModHelper(varStack,opStack,tempStack,data) is a helper function for evaluateGPModel" | |
| def rmse(model, inputData, response): | |
| predictions = evaluateGPModel(model, inputData) | |
| if not all(np.isfinite(predictions)) or any(np.iscomplex(predictions)): | |
| return np.nan | |
| return np.sqrt(np.mean((predictions - response) ** 2)) | |
| rmse.__doc__ = "rmse(model, input, response) is a fitness objective that evaluates the root mean squared error" | |
| def binaryError(model, input, response): | |
| prediction=evaluateGPModel(model,input) | |
| error=np.mean(np.abs(prediction-response)) | |
| if np.isnan(error) or np.isinf(error) or error > 1 or error < 0: | |
| return 0.5 | |
| return min(error,1 - error) | |
| def fitness(prog,data,response): # Fitness function using correlation | |
| predicted=evaluateGPModel(prog,np.array(data)) | |
| if type(predicted)!=list and type(predicted)!=np.ndarray: | |
| predicted=np.array([predicted for i in range(inputLen(data))]) | |
| try: | |
| if np.isnan(predicted).any() or np.isinf(predicted).any(): | |
| return np.nan | |
| except TypeError: | |
| #print(predicted) | |
| return np.nan | |
| except OverflowError: | |
| return np.nan | |
| if (not all(np.isfinite(np.array(predicted,dtype=np.float32)))) or np.all(predicted==predicted[0]): | |
| return np.nan | |
| try: | |
| fit=1-pearsonr(predicted,np.array(response))[0]**2 # 1-R^2 | |
| except ValueError: | |
| return 1 | |
| if math.isnan(fit): | |
| return 1 # If nan return 1 as fitness | |
| return fit # Else return actual fitness 1-R^2 | |
| fitness.__doc__ = "fitness(program,data,response) returns the 1-R^2 value of a model" | |
| def stackGPModelComplexity(model,*args): | |
| return len(model[0])+len(model[1])-model[0].tolist().count("pop") | |
| stackGPModelComplexity.__doc__ = "stackGPModelComplexity(model) returns the complexity of the model" | |
| ###################### Timeout function for model complexity ###################### | |
| class TimeoutException(Exception): pass | |
| def time_limit(seconds): | |
| def signal_handler(signum, frame): | |
| raise TimeoutException("Timed out!") | |
| signal.signal(signal.SIGALRM, signal_handler) | |
| signal.alarm(seconds) | |
| try: | |
| yield | |
| finally: | |
| signal.alarm(0) | |
| #################################################################################### | |
| # Compute Hess | |
| def ComputeSymbolicHess(model,vars): | |
| printedModel=sym.simplify(printGPModel(model)) | |
| if type(printedModel)==float: | |
| return sym.matrices.dense.MutableDenseMatrix(np.zeros((vars,vars))) | |
| hess=sym.hessian(printedModel, [symbols('x'+str(i)) for i in range(vars)]) | |
| return hess | |
| def EvaluateHess(hess,vars,values): | |
| numHess=hess.subs({symbols('x'+str(j)):values[j] for j in range(vars)}) | |
| hessN = np.array(numHess).astype(float) | |
| rankN=np.linalg.matrix_rank(hessN,tol=0.0001*0.0001*10) | |
| return rankN | |
| def Approx2Deriv(model,values,diff1,diff2,positions): #maybe diff should be relative to the variation of each feature | |
| term1=[values[i]+diff1 if i == positions[0] else values[i] for i in range(len(values))] | |
| term1=[term1[i]+diff2 if i == positions[1] else term1[i] for i in range(len(term1))] | |
| term2=[values[i]-diff1 if i == positions[0] else values[i] for i in range(len(values))] | |
| term2=[term2[i]+diff2 if i == positions[1] else term2[i] for i in range(len(term2))] | |
| term3=[values[i]+diff1 if i == positions[0] else values[i] for i in range(len(values))] | |
| term3=[term3[i]-diff2 if i == positions[1] else term3[i] for i in range(len(term3))] | |
| term4=[values[i]-diff1 if i == positions[0] else values[i] for i in range(len(values))] | |
| term4=[term4[i]-diff2 if i == positions[1] else term4[i] for i in range(len(term4))] | |
| return ((evaluateGPModel(model,term1)-evaluateGPModel(model,term2))/((2*diff1)) | |
| -(evaluateGPModel(model,term3)-evaluateGPModel(model,term4))/((2*diff1)))/(2*diff2) | |
| def ApproxHessRank(model,vars,values,diff1=0.001,diff2=0.001): | |
| hess=[[Approx2Deriv(model,values,diff1,diff2,[i,j]) for i in range(vars)] for j in range(vars)] | |
| hessN = np.array(hess).astype(float) | |
| rankN=np.linalg.matrix_rank(hessN,tol=0.0001*0.0001*10) | |
| return rankN | |
| #def HessRank(model,vars,values): | |
| # try: | |
| # with time_limit(.01): | |
| # hess=ComputeSymbolicHess(model,vars) | |
| # hess = EvaluateHess(hess,vars,values) | |
| # #print(hess) | |
| # return hess | |
| # except TimeoutException as e: | |
| # hess=ApproxHessRank(model,vars,values) | |
| #print(hess) | |
| # return hess | |
| def HessRank(model,vars,values): | |
| hess=ApproxHessRank(model,vars,values) | |
| return hess | |
| # Counts basis terms in a model | |
| def count_basis_terms(equation, expand=False): | |
| try: | |
| with time_limit(2): | |
| if expand: | |
| # Simplify the equation to standardize the expression | |
| simplified_eq = simplify(equation) | |
| # Expand the expression to identify additive terms clearly | |
| expanded_eq = expand(simplified_eq) | |
| # Separate the terms of the expression | |
| terms = expanded_eq.as_ordered_terms() | |
| else: | |
| terms = equation.as_ordered_terms() | |
| #print(terms) | |
| except TimeoutException as e: | |
| return 1000 | |
| return len(terms) | |
| # Determines the number of basis functions in a model by counting +s and -s | |
| def basisFunctionComplexity(model,vars, values,*args): | |
| try: # values should be max, min, and median with respect to response variable | |
| return HessRank(model,vars,values)#count_basis_terms(printGPModel(model)) | |
| except: | |
| return 1000 | |
| # Creates a lambda function to be used as a complexity metric when given a target dimensionality and deviation | |
| def basisFunctionComplexityDiff(target, deviation, vars, low, mid, high): | |
| return lambda model,*args: max(np.mean([abs(basisFunctionComplexity(model,vars,low)-target),abs(basisFunctionComplexity(model,vars,mid)-target) ,abs(basisFunctionComplexity(model,vars,high)-target)] ),(deviation))-deviation | |
| def setModelQuality(model,inputData,response,modelEvaluationMetrics=[fitness,stackGPModelComplexity]): | |
| model[2]=[i(model,inputData,response) for i in modelEvaluationMetrics] | |
| setModelQuality.__doc__ = "setModelQuality(model, inputdata, response, metrics=[r2,size]) is an inplace operator that sets a models quality" | |
| def stackPass(model,pt): | |
| i=0 | |
| t=0 | |
| p=0 | |
| s=model[0] | |
| if i <pt: | |
| t+=1 | |
| while i<pt: | |
| if s[i]=="pop": | |
| t+=1 | |
| p+=1 | |
| else: | |
| p+=max(0,getArity(s[i])-t) | |
| t=max(1,t-getArity(s[i])+1) | |
| i+=1 | |
| stack1=model[1][p:] | |
| stack2=reverseList(model[1][:p])[:t+1] | |
| return [stack1,stack2] | |
| def stackGrab(stack1, stack2, num): | |
| tStack1=copy.deepcopy(stack1) | |
| tStack2=copy.deepcopy(stack2) | |
| newStack=[] | |
| if len(stack2)<num: | |
| newStack=stack2+stack1[:(num-len(stack2))] | |
| tStack1=tStack1[num-len(tStack2):] | |
| tStack2=[] | |
| else: | |
| newStack=stack2[:num] | |
| tStack2=tStack2[num:] | |
| return [newStack,tStack1,tStack2] | |
| def fragmentVariables(model,pts): | |
| stack1,stack2=stackPass(model,pts[0]) | |
| opStack=model[0] | |
| newStack=[] | |
| i=pts[0] | |
| while i<=pts[1]: | |
| if opStack[i]=="pop" and len(stack1)>0: | |
| stack2=[stack1[0]]+stack2 | |
| stack1=stack1[1:] | |
| else: | |
| if len(newStack)==0 and pts[0]==0: | |
| tStack,stack1,stack2=stackGrab(stack1,stack2,getArity(opStack[i])) | |
| else: | |
| tStack,stack1,stack2=stackGrab(stack1,stack2,getArity(opStack[i])-1) | |
| newStack=newStack+tStack | |
| i+=1 | |
| return newStack | |
| def recombination2pt(model1,model2): #2 point recombination | |
| pts1=np.sort(random.sample(range(0,len(model1[0])+1),2)) | |
| pts2=np.sort(random.sample(range(0,len(model2[0])+1),2)) | |
| #pts1=[4,5] | |
| #pts2=[2,4] | |
| #pts1=[0,3] | |
| #pts2=[1,3] | |
| #print(pts1,pts2) | |
| child1=buildEmptyModel() | |
| child2=buildEmptyModel() | |
| parent1=copy.deepcopy(model1) | |
| parent2=copy.deepcopy(model2) | |
| parent1[0]=np.array(parent1[0],dtype=object).tolist() | |
| parent2[0]=np.array(parent2[0],dtype=object).tolist() | |
| child1[0]=np.array(parent1[0][0:pts1[0]]+parent2[0][pts2[0]:pts2[1]]+parent1[0][pts1[1]:],dtype=object) | |
| child2[0]=np.array(parent2[0][0:pts2[0]]+parent1[0][pts1[0]:pts1[1]]+parent2[0][pts2[1]:],dtype=object) | |
| varPts1=[listArity(parent1[0][:(pts1[0])])+0,listArity(parent2[0][:(pts2[0])])+0,listArity(parent2[0][pts2[0]:pts2[1]]),listArity(parent1[0][pts1[0]:pts1[1]])] | |
| if pts1[0]==0: | |
| varPts1[0]+=1 | |
| if pts2[0]==0: | |
| varPts1[1]+=1 | |
| child1[1]=parent1[1][:varPts1[0]]+parent2[1][varPts1[1]:(varPts1[1]+varPts1[2]-1)]+parent1[1][(varPts1[0]+varPts1[3]-1):] | |
| varPts2=[listArity(parent2[0][:(pts2[0])])+0,listArity(parent1[0][:(pts1[0])])+0,listArity(parent1[0][pts1[0]:pts1[1]]),listArity(parent2[0][pts2[0]:pts2[1]])] | |
| if pts1[0]==0: | |
| varPts2[1]+=1 | |
| if pts2[0]==0: | |
| varPts2[0]+=1 | |
| child2[1]=parent2[1][:varPts2[0]]+parent1[1][varPts2[1]:(varPts2[1]+varPts2[2]-1)]+parent2[1][(varPts2[0]+varPts2[3]-1):] | |
| #print(varPts1,varPts2) | |
| return [child1,child2] | |
| recombination2pt.__doc__ = "recombination2pt(model1,model2) does 2 point crossover and returns two children models" | |
| def get_numeric_indices(l): #Returns indices of list that are numeric | |
| return [i for i in range(len(l)) if type(l[i]) in [int,float]] | |
| def mutate(model,variables,ops=defaultOps(),const=defaultConst(),maxLength=10): | |
| newModel=copy.deepcopy(model) | |
| newModel[0]=np.array(newModel[0],dtype=object).tolist() | |
| mutationType=random.randint(0,7) | |
| varChoices=[variableSelect(i) for i in range(variables)]+const | |
| opChoice=0 | |
| varChoice=0 | |
| tmp=0 | |
| if mutationType==0: #single operator mutation | |
| opChoice=random.randint(0,len(newModel[0])-1) | |
| if len(newModel[0])>0: | |
| newModel[0][opChoice]=np.random.choice([i for i in ops] ) | |
| elif mutationType==1: #single variable mutation | |
| varChoice=np.random.choice(varChoices) | |
| if callable(varChoice) and varChoice.__name__!='<lambda>': | |
| varChoice=varChoice() | |
| newModel[1][random.randint(0,len(newModel[1])-1)]=varChoice | |
| elif mutationType==2: #insertion mutation to top of stack | |
| opChoice=np.random.choice(ops) | |
| newModel[0]=[opChoice]+newModel[0] | |
| while modelArity(newModel)>len(newModel[1]): | |
| varChoice=np.random.choice(varChoices) | |
| if callable(varChoice) and varChoice.__name__!='<lambda>': | |
| varChoice=varChoice() | |
| newModel[1]=[varChoice]+newModel[1] | |
| elif mutationType==3: #deletion mutation from top of stack | |
| if len(newModel[0])>1: | |
| opChoice=random.randint(1,len(newModel[0])-1) | |
| newModel[0]=newModel[0][-opChoice:] | |
| newModel[1]=newModel[1][-listArity(newModel[0]):] | |
| elif mutationType==4: #insertion mutation to bottom of stack | |
| opChoice=np.random.choice([i for i in ops]) | |
| newModel[0].append(opChoice) | |
| elif mutationType==5: #mutation via crossover with random model | |
| newModel=recombination2pt(newModel,generateRandomModel(variables,ops,const,maxLength))[0] | |
| elif mutationType==6: #single operator insertion mutation | |
| singleOps=[op for op in ops if getArity(op)==1 and op!='pop'] | |
| singleOps.append('pop') | |
| pos=random.randint(0,len(newModel[0])-1) | |
| newModel[0].insert(pos,np.random.choice(singleOps)) | |
| elif mutationType==7: #nudge numeric constant | |
| pos=get_numeric_indices(newModel[1]) | |
| if(len(pos)>0): #If there are numeric constants | |
| pos=random.choice(pos) | |
| newModel[1][pos]=newModel[1][pos]+np.random.normal(-1,1) | |
| if modelArity(newModel)<len(newModel[1]): | |
| newModel[1]=newModel[1][:modelArity(newModel)] | |
| elif modelArity(newModel)>len(newModel[1]): | |
| newModel[1]=newModel[1]+[np.random.choice(varChoices) for i in range(modelArity(newModel)-len(newModel[1]))] | |
| newModel[1]=[varChoice() if callable(varChoice) and varChoice.__name__!='<lambda>' else varChoice for varChoice in newModel[1]] | |
| newModel[0]=np.array(newModel[0],dtype=object) | |
| return newModel | |
| mutate.__doc__ = "mutate(model,variableCount,ops,constants,maxLength) mutates a model" | |
| def paretoFront(fitValues): #Returns Boolean list of Pareto front elements | |
| onFront = np.ones(fitValues.shape[0], dtype = bool) | |
| for i, j in enumerate(fitValues): | |
| if onFront[i]: | |
| onFront[onFront] = np.any(fitValues[onFront]<j, axis=1) | |
| onFront[i] = True | |
| return onFront | |
| def paretoTournament(pop): # selects the Pareto front of a model set | |
| fitnessValues=np.array([mod[2] for mod in pop]) | |
| return (np.array(pop,dtype=object)[paretoFront(fitnessValues)]).tolist() | |
| def tournamentModelSelection(models, popSize=100,tourneySize=5): | |
| selectedModels=[] | |
| selectionSize=popSize | |
| while len(selectedModels)<popSize: | |
| tournament=random.sample(models,tourneySize) | |
| winners=paretoTournament(tournament) | |
| selectedModels=selectedModels+winners | |
| return selectedModels | |
| paretoTournament.__doc__ = "paretoTournament(models, inputData, responseData) returns the Pareto front of a model set" | |
| def modelSameQ(model1,model2): #Checks if two models are the same | |
| return len(model1[0])==len(model2[0]) and len(model1[1]) == len(model2[1]) and all(model1[0]==model2[0]) and model1[1]==model2[1] | |
| modelSameQ.__doc__ = "modelSameQ(model1,model2) checks if model1 and model2 are the same and returns True if so, else False" | |
| def deleteDuplicateModels(models): #Removes any models that are the same, does not consider simplified form | |
| uniqueMods = [models[0]] | |
| for mod in models: | |
| test=False | |
| for checkMod in uniqueMods: | |
| if modelSameQ(mod,checkMod): | |
| test=True | |
| if not test: | |
| uniqueMods.append(mod) | |
| return uniqueMods | |
| deleteDuplicateModels.__doc__ = "deleteDuplicateModels(models) deletes models that have the same form without simplifying" | |
| def deleteDuplicateModelsPhenotype(models): #Removes any models that are the same regarding phenotype, does not consider simplified form | |
| uniqueMods = [printGPModel(models[0])] | |
| remainingMods=[printGPModel(mod) for mod in models[1:]] | |
| uniquePos = [0] | |
| currPos=1 | |
| for mod in remainingMods: | |
| test=False | |
| for checkMod in uniqueMods: | |
| if mod==checkMod: | |
| test=True | |
| if not test: | |
| uniqueMods.append(mod) | |
| uniquePos.append(currPos) | |
| currPos+=1 | |
| return [models[i] for i in uniquePos] | |
| def removeIndeterminateModels(models): #Removes models from the population that evaluate to nonreal values | |
| return [i for i in models if (not any(np.isnan(i[2]))) and all(np.isfinite(np.isnan(i[2])))] | |
| removeIndeterminateModels.__doc__ = "removeIndeterminateModels(models) removes models that have a fitness that results from inf or nan values" | |
| def sortModels(models): | |
| return sorted(models, key=lambda m:m[2]) | |
| sortModels.__doc__ = "sortModels(models) sorts a model population by the models' accuracies" | |
| def selectModels(models, selectionSize=0.5, thresholds=None): | |
| tMods=copy.deepcopy(models) | |
| [modelToListForm(mod) for mod in tMods] | |
| if thresholds is not None: | |
| tMods=[mod for mod in tMods if all([mod[2][i]<=thresholds[i] for i in range(len(thresholds))])] | |
| paretoModels=[] | |
| if selectionSize<=1: | |
| selection=selectionSize*len(models) | |
| else: | |
| selection=selectionSize | |
| while len(paretoModels)<selection and len(tMods)>0: | |
| front=paretoTournament(tMods) | |
| paretoModels=paretoModels+front | |
| for i in front: | |
| tMods.remove(i) | |
| [modelRestoreForm(mod) for mod in paretoModels] | |
| return paretoModels | |
| selectModels.__doc__ = "selectModels(models, selectionSize=0.5) iteratively selects the Pareto front of a model population until n or n*popSize models are selected" | |
| def stackVarUsage(opStack): #Counts how many variables are used by the operator stack | |
| pos=getArity(opStack[0]) | |
| for j in range(1,len(opStack)): | |
| pos+=getArity(opStack[j])-1 | |
| if opStack[j]=='pop': | |
| pos+=1 | |
| return pos | |
| stackVarUsage.__doc__ = "stackVarUsage(opStack) is a helper function that determines how many variables/constants are needed by the operator stack" | |
| def trimModel(mod): #Removes extra pop operators that do nothing | |
| model=copy.deepcopy(mod) | |
| i=0 | |
| varStack=len(mod[1]) | |
| tempStack=0 | |
| varStack-=getArity(model[0][i]) | |
| tempStack+=1 | |
| i+=1 | |
| while varStack>0: | |
| if model[0][i]=='pop': | |
| varStack-=1 | |
| tempStack+=1 | |
| else: | |
| take=getArity(model[0][i])-tempStack | |
| if take>0: | |
| varStack-=take | |
| tempStack=1 | |
| else: | |
| tempStack-=getArity(model[0][i])-1 | |
| i+=1 | |
| model[0]=np.array(model[0][:i].tolist()+[j for j in model[0][i:] if not j=='pop'],dtype=object) | |
| return model | |
| trimModel.__doc__ = "trimModel(model) trims extra pop operators off the operator stack so that further modifications such as a model alignment aren't altered by those pop operators" | |
| def alignGPModel(model, data, response): #Aligns a model | |
| prediction=evaluateGPModel(model,data) | |
| if (not all(np.isfinite(np.array(prediction)))) or np.all(prediction==prediction[0]): | |
| return model | |
| if np.isnan(np.array(prediction)).any() or np.isnan(np.array(response)).any() or not np.isfinite(np.array(prediction,dtype=np.float32)).all(): | |
| return model | |
| # Variance guards | |
| if np.std(prediction) < 1e-12: | |
| return model | |
| if np.ptp(prediction) < 1e-12: | |
| return model | |
| try: | |
| align=np.polyfit(prediction,response,1,rcond=1e-16)#np.round(np.polyfit(prediction,response,1,rcond=1e-16),decimals=14) | |
| except np.linalg.LinAlgError: | |
| #print("Alignment failed for: ", model, " with prediction: ", prediction, "and reference data: ", response) | |
| return model | |
| newModel=trimModel(model) | |
| newModel[0]=np.array(newModel[0].tolist()+[mult,add],dtype=object) | |
| newModel[1]=newModel[1]+align.tolist() | |
| #setModelQuality(newModel,data,response) | |
| return newModel | |
| alignGPModel.__doc__ = "alignGPModel(model, input, response) aligns a model such that response-a*f(x)+b are minimized over a and b" | |
| def evolve(inputData, responseData, generations=100, ops=defaultOps(), const=defaultConst(), variableNames=[], mutationRate=79, crossoverRate=11, spawnRate=10, extinction=False,extinctionRate=10,elitismRate=10,popSize=300,maxComplexity=100,align=True,initialPop=[],timeLimit=300,capTime=False,tourneySize=5,tracking=False,returnTracking=False,liveTracking=False,liveTrackingInterval=1,modelEvaluationMetrics=[fitness,stackGPModelComplexity],dataSubsample=False,samplingMethod=randomSubsample,alternateObjectives=[],alternateObjFrequency=10,allowEarlyTermination=False,earlyTerminationThreshold=0): | |
| evolution_hisotry = [] | |
| alternatingFlag = False | |
| if callable(modelEvaluationMetrics): | |
| metrics=[modelEvaluationMetrics] | |
| allMetrics=[modelEvaluationMetrics]+alternateObjectives | |
| elif isinstance(modelEvaluationMetrics, list) and callable(modelEvaluationMetrics[0]): | |
| metrics=modelEvaluationMetrics | |
| allMetrics=modelEvaluationMetrics+alternateObjectives | |
| elif isinstance(modelEvaluationMetrics, list) and isinstance(modelEvaluationMetrics[0], list): | |
| metrics=modelEvaluationMetrics[0] | |
| allMetrics=[item for sublist in modelEvaluationMetrics for item in sublist]+alternateObjectives | |
| alternatingFlag = True | |
| else: | |
| raise ValueError("modelEvaluationMetrics must be a function, list of functions, or a list of lists of functions") | |
| fullInput,fullResponse=copy.deepcopy(inputData),copy.deepcopy(responseData) | |
| inData=copy.deepcopy(fullInput) | |
| resData=copy.deepcopy(fullResponse) | |
| variableCount=varCount(inData) | |
| models=initializeGPModels(variableCount,ops,const,popSize) | |
| models=models+initialPop | |
| startTime=time.perf_counter() | |
| bestFits=[] | |
| gene_dists = [] | |
| if liveTracking: | |
| fig, ax = plt.subplots(figsize=(20,10)) | |
| ckTime=time.perf_counter() | |
| for i in range(generations): | |
| if capTime and time.perf_counter()-startTime>timeLimit: | |
| break | |
| if len(alternateObjectives)>0 and (i+1)%alternateObjFrequency==0: | |
| metrics=modelEvaluationMetrics[:1]+alternateObjectives | |
| else: | |
| if alternatingFlag: | |
| metrics=modelEvaluationMetrics[i%len(modelEvaluationMetrics)] | |
| else: | |
| metrics=modelEvaluationMetrics | |
| if dataSubsample: | |
| inData,resData=samplingMethod(fullInput,fullResponse,generations=generations,generation=i) | |
| for mods in models: | |
| setModelQuality(mods,inData,resData,modelEvaluationMetrics=metrics) | |
| models=removeIndeterminateModels(models) | |
| if allowEarlyTermination and min([mods[2][0] for mods in models])<=earlyTerminationThreshold: | |
| print("Early termination at generation ", i) | |
| break | |
| if tracking or liveTracking or returnTracking: | |
| bestFits.append(min([mods[2][0] for mods in paretoTournament(models)])) | |
| if liveTracking and time.perf_counter()-ckTime>liveTrackingInterval: | |
| ax.clear() | |
| ax.plot(bestFits) | |
| ax.set_title(f"Best Model: {bestFits[-1]:.2f} at Generation {(i+1)}") | |
| ax.set_xlabel("Generations") | |
| ax.set_ylabel("Fitness") | |
| clear_output(wait=True) | |
| display(fig) | |
| #plt.show() | |
| plt.close(fig) | |
| ckTime=time.perf_counter() | |
| #get distribution of the models from the last generation and use the models from the Pareto Front | |
| paretoModels=selectModels(models,elitismRate/100*popSize if elitismRate/100*popSize<len(models) else len(models)) | |
| if extinction and i%extinctionRate==0 and i>0: | |
| models=initializeGPModels(variableCount,ops,const,popSize) | |
| for mods in models: | |
| setModelQuality(mods,inData,resData,modelEvaluationMetrics=metrics) | |
| models=tournamentModelSelection(models,popSize,tourneySize) | |
| crossoverPairs=random.sample(models,round(crossoverRate/100*popSize)) | |
| toMutate=random.sample(models,round(mutationRate/100*popSize)) | |
| childModels=paretoModels | |
| for j in range(round(len(crossoverPairs)/2)-1): | |
| childModels=childModels+recombination2pt(crossoverPairs[j],crossoverPairs[j+round(len(crossoverPairs)/2)]) | |
| for j in toMutate: | |
| childModels=childModels+[mutate(j,variableCount,ops,const)] | |
| childModels=childModels+initializeGPModels(variableCount,ops,const,round(spawnRate/100*popSize)) | |
| childModels=deleteDuplicateModels(childModels) | |
| childModels=[model for model in childModels if stackGPModelComplexity(model)<maxComplexity] | |
| #for mods in childModels: | |
| # setModelQuality(mods,inData,resData,modelEvaluationMetrics=modelEvaluationMetrics) | |
| #childModels=removeIndeterminateModels(childModels) | |
| if len(childModels)<popSize: | |
| childModels=childModels+initializeGPModels(variableCount,ops,const,popSize-len(childModels)) | |
| models=copy.deepcopy(childModels) | |
| if ( i + 5 >= generations): | |
| evolution_hisotry.append(models) | |
| yield models | |
| for mods in models: | |
| setModelQuality(mods,fullInput,fullResponse,modelEvaluationMetrics=allMetrics) | |
| models=[trimModel(mod) for mod in models] | |
| models=deleteDuplicateModels(models) | |
| models=removeIndeterminateModels(models) | |
| models=sortModels(models) | |
| if align: | |
| models=[alignGPModel(mods,fullInput,fullResponse) for mods in models] | |
| for mods in models: | |
| setModelQuality(mods,fullInput,fullResponse,modelEvaluationMetrics=allMetrics) | |
| if tracking or returnTracking: | |
| bestFits.append(min([mods[2][0] for mods in paretoTournament(models)])) | |
| if returnTracking: | |
| return models, bestFits | |
| plt.figure() | |
| plt.plot(bestFits) | |
| plt.title("Fitness over Time") | |
| plt.xlabel("Generations") | |
| plt.ylabel("Fitness") | |
| plt.show() | |
| return models,evolution_hisotry | |
| #model_distributions | |
| def replaceFunc(stack,f1,f2): | |
| return [i if i!=f1 else f2 for i in stack] | |
| def printGPModel(mod,inputData=symbols(["x"+str(i) for i in range(100)])): #Evaluates a model algebraically | |
| def inv1(a): | |
| return a**(-1) | |
| from sympy import tan as tan1, exp as exp1, sqrt as sqrt1, sin as sin1, cos as cos1, acos, asin, atan, tanh as tanh1, log as log1 | |
| def sqrt2(a): | |
| return sqrt1(a) | |
| def log2(a): | |
| return log1(a) | |
| model = copy.deepcopy(mod) | |
| model[0] = replaceFunc(model[0],exp,exp1) | |
| model[0] = replaceFunc(model[0],tan,tan1) | |
| model[0] = replaceFunc(model[0],sqrt,sqrt2) | |
| model[0] = replaceFunc(model[0],inv,inv1) | |
| model[0] = replaceFunc(model[0],sin,sin1) | |
| model[0] = replaceFunc(model[0],cos,cos1) | |
| model[0] = replaceFunc(model[0],arccos,acos) | |
| model[0] = replaceFunc(model[0],arcsin,asin) | |
| model[0] = replaceFunc(model[0],arctan,atan) | |
| model[0] = replaceFunc(model[0],tanh,tanh1) | |
| model[0] = replaceFunc(model[0],log,log2) | |
| try: | |
| response=evModHelper(model[1],model[0],[],np.array(inputData))[2][0] | |
| except: | |
| return np.nan | |
| return response | |
| def ensembleSelect(models, inputData, responseData, numberOfClusters=10): #Generates a model ensemble using input data partitions | |
| data=np.transpose(inputData) | |
| if len(data)<numberOfClusters: | |
| numberOfClusters=len(data) | |
| clusters=KMeans(n_clusters=numberOfClusters).fit_predict(data) | |
| if numberOfClusters>len(set(clusters)): | |
| numberOfClusters=len(set(clusters)) | |
| clusters=KMeans(n_clusters=numberOfClusters).fit_predict(data) | |
| dataParts=[] | |
| partsResponse=[] | |
| for i in range(numberOfClusters): | |
| dataParts.append([]) | |
| partsResponse.append([]) | |
| for i in range(len(clusters)): | |
| dataParts[clusters[i]].append(data[i]) | |
| partsResponse[clusters[i]].append(responseData[i]) | |
| modelResiduals=[] | |
| for i in range(len(models)): | |
| modelResiduals.append([]) | |
| for i in range(len(models)): | |
| for j in range(numberOfClusters): | |
| modelResiduals[i].append(fitness(models[i],np.transpose(dataParts[j]),partsResponse[j])) | |
| best=[] | |
| for i in range(numberOfClusters): | |
| ordering=np.argsort(modelResiduals[i]) | |
| j=0 | |
| while ordering[j] in best: | |
| j+=1 | |
| best.append(ordering[j]) | |
| ensemble=[models[best[i]] for i in range(numberOfClusters)] | |
| return ensemble | |
| def uncertainty(data,trim=0.3): | |
| wl=None | |
| if len(data)<=4: | |
| wl=1 | |
| h=differential_entropy(data,window_length=wl) | |
| if np.isfinite(h): | |
| return h | |
| else: | |
| return 0 | |
| def evaluateModelEnsemble(ensemble, inputData): | |
| responses=[evaluateGPModel(mod, inputData) for mod in ensemble] | |
| if type(responses[0])==np.ndarray: | |
| responses=np.transpose(responses) | |
| predictions=[np.median(res) for res in responses] | |
| else: | |
| predictions=[np.median(responses)] | |
| return predictions | |
| def evaluateModelEnsembleUncertainty(ensemble, inputData): | |
| responses=[evaluateGPModel(mod, inputData) for mod in ensemble] | |
| if type(responses[0])==np.ndarray: | |
| responses=np.transpose(responses) | |
| uncertainties=[uncertainty(res,0) for res in responses] | |
| else: | |
| uncertainties=[uncertainty(responses,0)] | |
| return uncertainties | |
| def relativeEnsembleUncertainty(ensemble,inputData): | |
| output=evaluateModelEnsembleUncertainty(ensemble,inputData) | |
| return np.array(output) | |
| def createUncertaintyFunc(ensemble): | |
| return lambda x: -relativeEnsembleUncertainty(ensemble,x) | |
| def maximizeUncertainty(ensemble,varCount,bounds=[]): #Used to select a new point of maximum uncertainty | |
| func=createUncertaintyFunc(ensemble) | |
| x0=[np.mean(bounds[i]) for i in range(varCount)] | |
| if bounds==[]: | |
| pt=minimize(func,x0).x | |
| else: | |
| pt=minimize(func,x0,bounds=bounds).x | |
| return pt | |
| def extendData(data,newPoint): | |
| return np.concatenate((data.T,np.array([newPoint]))).T | |
| def activeLearningCheckpoint(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr): | |
| path=os.path.join(str(eqNum),str(version)) | |
| file=open(path,"wb+") | |
| dill.dump([i,inputData,response,testInput,testResponse,errors,models,minerr],file) | |
| file.close() | |
| def activeLearningCheckpointLoad(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr): | |
| path=os.path.join(str(eqNum),str(version)) | |
| try: | |
| with open(path,'rb') as f: | |
| i,inputData,response,testInput,testResponse,errors,models,minerr=dill.load(f) | |
| except FileNotFoundError: | |
| return i,inputData,response,testInput,testResponse,errors,models,minerr | |
| return i,inputData,response,testInput,testResponse,errors,models,minerr | |
| def subSampleSpace(space): | |
| newSpace=copy.deepcopy(space) | |
| newSpace=list(newSpace) | |
| for i in range(len(newSpace)): | |
| pts=sorted([np.random.uniform(newSpace[i][0],newSpace[i][1]),np.random.uniform(newSpace[i][0],newSpace[i][1])]) | |
| newSpace[i]=tuple(pts) | |
| return tuple(newSpace) | |
| def activeLearning(func, dims, ranges,rangesP,eqNum=1,version=1,iterations=100): #func should be a lamda function of form lambda data: f(data[0],data[1],...) | |
| try: | |
| with open(os.path.join(str(eqNum),str(version))+".txt",'rb') as f: | |
| return -1 | |
| except FileNotFoundError: | |
| pass | |
| inputData=[] | |
| testInput=[] | |
| found=False | |
| for i in range(dims): | |
| inputData.append(np.random.uniform(ranges[i][0],ranges[i][1],3)) | |
| testInput.append(np.random.uniform(ranges[i][0],ranges[i][1],200)) | |
| inputData=np.array(inputData) | |
| testInput=np.array(testInput) | |
| response=func(inputData) | |
| testResponse=func(testInput) | |
| errors=[] | |
| models=[] | |
| minerr=1 | |
| for i in range(iterations): | |
| print("input: ",inputData) | |
| print("\n response: ",response) | |
| i,inputData,response,testInput,testResponse,errors,models,minerr=activeLearningCheckpointLoad(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr) | |
| if i>iterations-1: | |
| break | |
| i+=1 | |
| models1=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) | |
| models2=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) | |
| models3=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) | |
| models4=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) | |
| models=models1+models2+models3+models4 | |
| models=selectModels(models,20) | |
| alignedModels=[alignGPModel(mods,inputData,response) for mods in models] | |
| ensemble=ensembleSelect(alignedModels,inputData,response) | |
| out=maximizeUncertainty(ensemble,dims,rangesP) | |
| while out in inputData.T: | |
| out=maximizeUncertainty(ensemble,dims,subSampleSpace(rangesP)) | |
| inputData=extendData(inputData,out) | |
| response=func(inputData) | |
| fitList=np.array([fitness(mod,testInput,testResponse) for mod in alignedModels]) | |
| errors.append(min(fitList[np.logical_not(np.isnan(fitList))])) | |
| minerr=errors[-1] | |
| if minerr<1e-14: | |
| #print("Points needed in round", j,": ",3+i, " Time needed: ", time.perf_counter()-roundTime) | |
| if not os.path.exists(str(eqNum)): | |
| os.makedirs(str(eqNum)) | |
| path=os.path.join(str(eqNum),str(version)) | |
| file=open(path,"wb+") | |
| dill.dump([i,inputData,response,testInput,testResponse,errors,models,minerr],file) | |
| file.close() | |
| file=open(path+'.txt','w+') | |
| file.write(str(i+3)+'\n') | |
| file.write(str(errors)) | |
| file.close() | |
| return 3+i | |
| found=True | |
| ptsNeeded.append(3+i) | |
| break | |
| activeLearningCheckpoint(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr) | |
| if found==False: | |
| #print("Points needed in round",j,": NA (model not found)") | |
| path=os.path.join(str(eqNum),str(version)) | |
| file=open(path,"wb") | |
| dill.dump([-1,inputData,response,testInput,testResponse,errors,models,minerr],file) | |
| file.close() | |
| file=open(path+'.txt',"w+") | |
| file.write(str(i+3)+"\n") | |
| file.write(str(errors)) | |
| file.close() | |
| return -1 | |
| def plotModels(models, modelExpression=False): | |
| tMods=copy.deepcopy(models) | |
| if len(tMods[0][2])<2: | |
| # add complexity as second value | |
| for mod in tMods: | |
| mod[2]=[mod[2][0],stackGPModelComplexity(mod)] | |
| [modelToListForm(mod) for mod in tMods] | |
| paretoModels=paretoTournament(tMods) | |
| for i in paretoModels: | |
| tMods.remove(i) | |
| [modelRestoreForm(mod) for mod in paretoModels] | |
| [modelRestoreForm(mod) for mod in tMods] | |
| pAccuracies=[mod[2][0] for mod in paretoModels] | |
| pComplexities=[mod[2][1] for mod in paretoModels] | |
| accuracies=[mod[2][0] for mod in tMods]+pAccuracies | |
| complexities=[mod[2][1] for mod in tMods]+pComplexities | |
| colors=['blue' for i in range(len(tMods))]+['red' for i in range(len(pAccuracies))] | |
| fig,ax = plt.subplots() | |
| sc=plt.scatter(complexities,accuracies,color=colors) | |
| plt.xlabel("Complexity") | |
| plt.ylabel("1-R**2") | |
| if modelExpression: | |
| names=[str(printGPModel(mod)) for mod in tMods]+[str(printGPModel(mod)) for mod in paretoModels] | |
| else: | |
| names = [str(mod) for mod in tMods]+[str(mod) for mod in paretoModels] | |
| label = ax.annotate("", xy=(0,0), xytext=(np.min(complexities),np.mean([np.max(accuracies),np.min(accuracies)])), | |
| bbox=dict(boxstyle="round", fc="w"), | |
| arrowprops=dict(arrowstyle="->")) | |
| label.set_visible(False) | |
| def update_labels(ind): | |
| pos = sc.get_offsets()[ind["ind"][0]] | |
| label.xy = pos | |
| text = "{}".format(" ".join([names[n] for n in [ind["ind"][0]]])) | |
| label.set_text(text) | |
| label.get_bbox_patch().set_facecolor('grey') | |
| label.get_bbox_patch().set_alpha(0.9) | |
| def hover(event): | |
| vis = label.get_visible() | |
| if event.inaxes == ax: | |
| cont, ind = sc.contains(event) | |
| if cont: | |
| update_labels(ind) | |
| label.set_visible(True) | |
| fig.canvas.draw_idle() | |
| else: | |
| if vis: | |
| label.set_visible(False) | |
| fig.canvas.draw_idle() | |
| fig.canvas.mpl_connect("motion_notify_event", hover) | |
| plt.show() | |
| def plotModelResponseComparison(model,inputData,response,sort=False): | |
| plt.scatter(range(len(response)),response,label="True Response") | |
| plt.scatter(range(len(response)),evaluateGPModel(model,inputData),label="Model Prediction") | |
| plt.legend() | |
| plt.xlabel("Data Index") | |
| plt.ylabel("Response Value") | |
| plt.show() | |
| def plotPredictionResponseCorrelation(model,inputData,response): | |
| plt.scatter(response,evaluateGPModel(model,inputData),label="Model") | |
| plt.plot(response,response,label="Perfect Correlation",color='green') | |
| plt.xlabel("True Response") | |
| plt.ylabel("Predicted Response") | |
| plt.legend() | |
| plt.show() | |
| #Plot model complexity distribution | |
| def plotModelComplexityDistribution(models): | |
| tMods=copy.deepcopy(models) | |
| [modelToListForm(mod) for mod in tMods] | |
| paretoModels=paretoTournament(tMods) | |
| for i in paretoModels: | |
| tMods.remove(i) | |
| [modelRestoreForm(mod) for mod in paretoModels] | |
| [modelRestoreForm(mod) for mod in tMods] | |
| pComplexities=[mod[2][1] for mod in paretoModels] | |
| tComplexities=[mod[2][1] for mod in tMods] | |
| plt.hist(tComplexities,label="Non-Pareto Models") | |
| plt.hist(pComplexities,label="Pareto Models") | |
| plt.xlabel("Model Complexity") | |
| plt.ylabel("Frequency") | |
| plt.legend() | |
| plt.show() | |
| #Plot model accuracy distribution | |
| def plotModelAccuracyDistribution(models): | |
| tMods=copy.deepcopy(models) | |
| [modelToListForm(mod) for mod in tMods] | |
| paretoModels=paretoTournament(tMods) | |
| for i in paretoModels: | |
| tMods.remove(i) | |
| [modelRestoreForm(mod) for mod in paretoModels] | |
| [modelRestoreForm(mod) for mod in tMods] | |
| pAccuracies=[mod[2][0] for mod in paretoModels] | |
| tAccuracies=[mod[2][0] for mod in tMods] | |
| plt.hist(tAccuracies,label="Non-Pareto Models") | |
| plt.hist(pAccuracies,label="Pareto Models") | |
| plt.xlabel("Model Accuracy") | |
| plt.ylabel("Frequency") | |
| plt.legend() | |
| plt.show() | |
| #Plot model residuals relative to response | |
| def plotModelResiduals(model,input,response): | |
| plt.scatter(response,evaluateGPModel(model,input)-response) | |
| plt.xlabel("Response") | |
| plt.ylabel("Residual") | |
| plt.show() | |
| #Plot model residual distribution | |
| def plotModelResidualDistribution(model,input,response): | |
| plt.hist(evaluateGPModel(model,input)-response) | |
| plt.xlabel("Residual") | |
| plt.ylabel("Frequency") | |
| plt.show() | |
| ##CAN USE THESE FOR UNCERTAINITY FURTHERMORE | |
| #Plot the presence of variables in a model population | |
| def plotVariablePresence(models,variables=["x"+str(i) for i in range(100)],sort=False): | |
| vars=[varReplace(model[1],variables) for model in models] | |
| #Remove all numeric entries in vars | |
| vars=[[i for i in var if type(i)!=int and type(i)!=float] for var in vars] | |
| #Merge into one list | |
| vars=[j for i in vars for j in i] | |
| #Count frequency of each variable in vars | |
| varFreqs=[vars.count(i) for i in variables] | |
| #Keep only variables that appear at least once | |
| variablesUsed=[variables[i] for i in range(len(varFreqs)) if varFreqs[i]>0] | |
| varFreqs=[varFreqs[i] for i in range(len(varFreqs)) if varFreqs[i]>0] | |
| if sort: | |
| order=np.argsort(varFreqs)[::-1] | |
| variablesUsed=[variablesUsed[i] for i in order] | |
| varFreqs=[varFreqs[i] for i in order] | |
| #Plot variable frequency | |
| plt.bar(variablesUsed,varFreqs) | |
| plt.xlabel("Variable") | |
| plt.ylabel("Frequency") | |
| plt.show() | |
| def replaceOpsWithStrings(opStack): | |
| model = copy.deepcopy(opStack) | |
| model = replaceFunc(model,exp,str("exp")) | |
| model = replaceFunc(model,tan,str("tan")) | |
| model = replaceFunc(model,sqrt,str("sqrt")) | |
| model = replaceFunc(model,inv,str("1/#")) | |
| model = replaceFunc(model,sin,str("sin")) | |
| model = replaceFunc(model,cos,str("cos")) | |
| model = replaceFunc(model,arccos,str("acos")) | |
| model = replaceFunc(model,arcsin,str("asin")) | |
| model = replaceFunc(model,arctan,str("atan")) | |
| model = replaceFunc(model,tanh,str("tanh")) | |
| model = replaceFunc(model,log,str("log")) | |
| model = replaceFunc(model,add,"+") | |
| model = replaceFunc(model,mult,"*") | |
| model = replaceFunc(model,sub,"-") | |
| model = replaceFunc(model,protectDiv,"/") | |
| model = replaceFunc(model,sqrd,"^2") | |
| return model | |
| #Plot the presence of operators in a model population | |
| def plotOperatorPresence(models,sort=False,excludePop=True): | |
| ops=[replaceOpsWithStrings(model[0]) for model in models] | |
| #Merge into one list | |
| ops=[j for i in ops for j in i] | |
| #Remove duplicates in ops | |
| uniqueOps=list(set(ops)) | |
| if excludePop: | |
| #Remove pop operator | |
| uniqueOps.remove('pop') | |
| #Count frequency of each operator in ops | |
| opFreqs=[ops.count(i) for i in uniqueOps] | |
| #Keep only operators that appear at least once | |
| opsUsed=[str(uniqueOps[i]) for i in range(len(opFreqs)) if opFreqs[i]>0] | |
| opFreqs=[opFreqs[i] for i in range(len(opFreqs)) if opFreqs[i]>0] | |
| if sort: | |
| order=np.argsort(opFreqs)[::-1] | |
| opsUsed=[opsUsed[i] for i in order] | |
| opFreqs=[opFreqs[i] for i in order] | |
| #Plot operator frequency | |
| plt.bar(opsUsed,opFreqs) | |
| #Rotate x axis labels | |
| plt.xticks(rotation=0) | |
| plt.xlabel("Operator") | |
| plt.ylabel("Frequency") | |
| plt.show() | |
| def diverse_models(best_model, filtered_model_population, trainInput, trainResponse): | |
| ### | |
| ## Residuals allow us to capture different diverse models since we can fit the models in multiple different ways | |
| ### | |
| predicted_data = evaluateGPModel(best_model, trainInput) | |
| candidate_preds = [] | |
| for model in filtered_model_population: | |
| candidate_preds.append(np.array(evaluateGPModel(model, trainInput)).flatten()) | |
| candidate_preds = np.array(candidate_preds) | |
| trainResponse = np.array(trainResponse).flatten() | |
| best_preds = np.array(evaluateGPModel(best_model, trainInput)).flatten() | |
| selected_preds = [best_preds] | |
| ensemble_cluster = [best_model] | |
| picked_indices = set() | |
| for r in range(10): | |
| #choose up to 10 best models from this ensemble | |
| if len(picked_indices) >= len(filtered_model_population): | |
| break | |
| current_ensemble_mean = np.median(selected_preds, axis=0) | |
| ensemble_residuals = trainResponse - current_ensemble_mean | |
| best_candidate_idx = -1 | |
| anti_correlation = float('inf') | |
| for i, cur_model in enumerate(candidate_preds): | |
| if i in picked_indices:continue | |
| cand_residual = trainResponse - cur_model | |
| corr_val = corr = pearsonr(cand_residuals, ensemble_residuals)[0] | |
| if np.nan(corr_val): | |
| corr_val = 1.0 | |
| if corr_val < anti_correlation: | |
| anti_correlation = corr_val | |
| best_candidate_idx = i | |
| ##add the choosen best model to our current ensemble | |
| picked_indices.add(best_candidate_idx) | |
| selected_preds.append(candidate_preds[best_candidate_idx]) | |
| ensemble_cluster.append(filtered_model_population[best_candidate_idx]) | |
| print(f"Added Model {best_candidate_idx} (Residual Correlation: {lowest_correlation:.4f})") | |
| return ensemble_cluster | |
| ############################ | |
| #Sharpness Computations | |
| ############################ | |
| def sharpnessConstants(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2): | |
| fits=[] | |
| #For each model parameter, if numeric, randomly perturb by x% and see how much the model changes | |
| for i in range(numPerturbations): | |
| tempModel=copy.deepcopy(model) | |
| newParameters=[param if callable(param) else param*(1+percentPerturbation*(np.random.uniform()-0.5)) for param in model[1]] | |
| tempModel[1]=newParameters | |
| fits.append(fitness(tempModel,inputData,responseData)) | |
| return np.std(fits) | |
| def sharpnessData(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2,preserveSign=False): | |
| fits=[] | |
| #For each vector, randomly perturb by x% of the standard deviation and see how much the model fitness changes | |
| for i in range(numPerturbations): | |
| tempData=copy.deepcopy(inputData) | |
| tempData=np.array([(vec+percentPerturbation*np.std(vec)*(np.random.uniform(size=len(vec))-0.5)) for vec in tempData]) | |
| if preserveSign: | |
| signs=[np.unique(var) for var in np.sign(inputData)] | |
| tempData=[signs[i]*abs(tempData[i]) if len(signs[i])==1 else tempData[i] for i in range(len(signs))] | |
| fits.append(fitness(model,tempData,responseData)) | |
| return np.std(fits) | |
| def totalSharpness(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2,preserveSign=False): | |
| return sharpnessConstants(model,inputData,responseData,numPerturbations=numPerturbations,percentPerturbation=percentPerturbation)+sharpnessData(model,inputData,responseData,numPerturbations=numPerturbations,percentPerturbation=percentPerturbation,preserveSign=preserveSign) | |
| ############################ | |
| #Multiple Independent Searches | |
| ############################ | |
| def runEpochs(x,y,epochs=5,**kwargs): | |
| models=[] | |
| for i in range(epochs): | |
| models+=evolve(x,y,**kwargs) | |
| return sortModels(models) | |
| ############################ | |
| #Parallelization | |
| ############################ | |
| from joblib import Parallel, delayed | |
| def parallelEvolve(*args,n_jobs=-1,avail_cores=-1, **kwargs): | |
| if avail_cores==-1: | |
| try: | |
| avail_cores=len(os.sched_getaffinity(0)) | |
| except: | |
| avail_cores=os.cpu_count() | |
| if n_jobs==-1: | |
| try: | |
| n_jobs=len(os.sched_getaffinity(0)) | |
| except: | |
| n_jobs=os.cpu_count() | |
| if "tracking" in kwargs and kwargs["tracking"]: | |
| kwargs["returnTracking"]=True | |
| print(f"Running parallel evolution with {n_jobs} jobs.") | |
| if "liveTracking" in kwargs and kwargs["liveTracking"]: | |
| print("Live tracking is not supported in parallel evolution, disabling live tracking.") | |
| kwargs["liveTracking"]=False | |
| runs = Parallel(n_jobs=avail_cores, backend="loky")(delayed(evolve)(*args, **kwargs) for _ in range(n_jobs)) | |
| runs, tracking_histories = zip(*runs) | |
| if ("tracking" in kwargs and kwargs["tracking"]): | |
| runs, tracking = zip(*runs) | |
| # plot tracking for each job | |
| plt.figure(figsize=(12, 6)) | |
| for i, track in enumerate(tracking): | |
| plt.plot(track, label=f'Job {i+1}') | |
| plt.title('Best Fitness Over Generations for Each Parallel Run') | |
| plt.xlabel('Generations') | |
| plt.ylabel('Best Fitness') | |
| if n_jobs <= 16: # Only show legend if there are a reasonable number of jobs | |
| plt.legend() | |
| plt.show() | |
| flat = [model for sublist in runs for model in sublist] | |
| return sortModels(flat), tracking_histories | |
| ############################ | |
| #Benchmarking | |
| ############################ | |
| def generateRandomBenchmark(numVars=5, numSamples=100, noiseLevel=0, opsChoices=defaultOps(), constChoices=defaultConst(), maxLength=10): | |
| # Generate random input data | |
| inputData = np.random.rand(numVars, numSamples) | |
| # Generate a random target function | |
| randomModel = generateRandomModel(numVars, opsChoices, constChoices, maxLength) | |
| # Evaluate the model to get response data | |
| responseData = evaluateGPModel(randomModel, inputData) | |
| # Add noise if specified | |
| if noiseLevel > 0: | |
| noise = np.random.normal(0, noiseLevel, size=responseData.shape) | |
| responseData += noise | |
| return inputData, responseData, randomModel | |