#!/usr/bin/env python # coding: utf-8 from IPython.core.debugger import set_trace import random import matplotlib.pyplot as plt import numpy as np import math import copy import sys from scipy.stats import pearsonr # for computing correlation from functools import reduce #for flattening lists from operator import concat #for flattening lists from scipy.stats import trim_mean # for ensemble evaluation from scipy.stats import differential_entropy import warnings import time import dill import os from sklearn.cluster import KMeans #for clustering in ensemble definition from scipy.optimize import minimize #for uncertainty maximization from sympy import symbols, simplify, expand import sympy as sym try: from IPython.display import display, clear_output except: pass import signal #for timing out functions from contextlib import contextmanager #for timing out functions warnings.filterwarnings('ignore', '.*invalid value.*' ) warnings.filterwarnings('ignore', '.*overflow.*' ) warnings.filterwarnings('ignore', '.*divide by.*' ) warnings.filterwarnings('ignore', '.*is constant.*' ) warnings.filterwarnings('ignore', '.*nearly constant.*' ) warnings.filterwarnings('ignore', '.*Polyfit may be.*' ) warnings.filterwarnings('ignore', '.*Number of.*') ## Get the arrays for the models model_distributions = [] def protectDiv(a,b): if (type(b)==int or type(b)==float or type(b)==np.float64) and b==0: return a/math.nan if (type(b)==np.ndarray) and (0 in b): return a/np.where(b==0,math.nan,b) return a/b def add(a,b): return a+b def sub(a,b): return a-b def mult(a,b): return a*b def exp(a): safe_a = np.clip(a, -90.0, 90.0) return np.exp(safe_a) # def sine(a,b): # return np.sin(a) def power(a,b): if (type(a)==int or type(a)==float or type(a)==np.float64) and a==0: return a/math.nan if (type(a)==np.ndarray) and (0 in a): return a/np.where(a==0,math.nan,a) return a**b def sqrt(a): return np.sqrt(abs(a)) def sqrd(a): return a**2 def inv(a): return np.array(a).astype(float)**(-1) def neg(a): return -a def sin(a): return np.sin(a) def cos(a): return np.cos(a) def tan(a): return np.tan(a) def arccos(a): return np.arccos(a) def arcsin(a): return np.arcsin(a) def arctan(a): return np.arctan(a) def tanh(a): return np.tanh(a) def log(a): a = np.maximum(np.abs(a), 1e-9) return np.log(a) def log10(a): return np.log10(a) def log2(a): return np.log2(a) def abs1(a): return np.abs(a) def and1(a,b): return np.logical_and(a,b) def or1(a,b): return np.logical_or(a,b) def xor1(a,b): return np.logical_xor(a,b) def nand1(a,b): return np.logical_not(np.logical_and(a,b)) def nor1(a,b): return np.logical_not(np.logical_or(a,b)) def xnor1(a,b): return np.logical_not(np.logical_xor(a,b)) def not1(a): return np.logical_not(a) def defaultOps(): return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,"pop","pop","pop","pop","pop","pop"] def allOps(): return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,cos,sin,tan,arccos,arcsin,arctan,tanh,log,"pop","pop","pop","pop","pop","pop","pop","pop","pop","pop"] def customOps(): return [protectDiv,add,sub,mult,exp,sqrd,sqrt,inv,neg,cos,sin,log,"pop","pop","pop","pop","pop","pop","pop","pop"] def booleanOps(): return [and1,or1,xor1,nand1,nor1,xnor1,not1,"pop","pop","pop","pop","pop","pop","pop"] def randomInt(a=-3,b=3): return random.randint(a,b) def defaultConst(): return [np.pi, np.e, randomInt,ranReal ] def booleanConst(): return [1,0] def ranReal(a=20,b=-10): return random.random()*a-b ##### MODEL DISTRIBUTION CODE ##### import sympy as sym import sympy as sym from sympy import preorder_traversal from collections import defaultdict def extract_genotype(population_buffer): """ Extracts in terms of the geneotypes instead of using pheno """ stats = { "operators": [], "terminals": [], } for prog in population_buffer: print(prog[0], prog[1]) ops = [op.__name__ if hasattr(op, '__name__') else str(op) for op in prog[0]] stats["operators"].extend(ops) #(Features and Constants) stats["terminals"].extend([str(t) for t in prog[1]]) return stats def extract_model_stats(expr): stats = { "constants": [], "operators": [], "features": [], "powers": [] } for node in preorder_traversal(expr): if isinstance(node, sym.Number): stats["constants"].append(float(node)) elif isinstance(node, sym.Symbol): stats["features"].append(str(node)) # POWERS elif isinstance(node, sym.Pow): base, exp = node.args stats["operators"].append("pow") if exp.is_number: stats["powers"].append(float(exp)) elif getattr(node, 'is_Function', False): op_name = node.func.__name__ stats["operators"].append(op_name) elif isinstance(node, sym.Add): has_sub = any(arg.could_extract_minus_sign() for arg in node.args) stats["operators"].append("add") if has_sub: stats["operators"].append("sub") # MUL → detect * vs / elif isinstance(node, sym.Mul): has_div = False for arg in node.args: if isinstance(arg, sym.Pow): base, exp = arg.args if exp.is_number and exp < 0: has_div = True break if has_div: stats["operators"].append("div") else: stats["operators"].append("mul") return stats def get_model_distributions(models): ''' Returns list of dictionaries, where each dict is breakdown of each model from list of models ''' all_models_dist, gene_dist = [], [] for i, mod in enumerate(models): expr = printGPModel(mod) if isinstance(expr, sym.Expr): expr = sym.simplify(expr) #expr = sym.expand(expr) stats = extract_model_stats(expr) stats["DEBUG_INDEX"] = i all_models_dist.append(stats) print(f"--- MODEL INDEX {i} ---") print(stats) gene_dist.append(extract_genotype(stats)) return all_models_dist, gene_dist # def get_model_distribution(models): # # We will store the distributions in a dictionary where the key is the feature # # and the value is a list of terms across all models from pareto front # feature_distributions = {} # all_models_expanded = [] # for mod in models: # expr = printGPModel(mod) # print(expr) # if isinstance(expr, sym.Expr): # expanded_expr = sym.expand(expr) # else: # expanded_expr = expr # terms = expanded_expr.args if hasattr(expanded_expr, 'args') and expanded_expr.func == sym.Add else (expanded_expr,) # print("WE ARE PRINTING OUT THE TERMS FOR ONE MODEL") # print(terms) # all_models_expanded.append(terms) # for term in terms: # print("EACH INDIVIDUAL term FOR ONE MODEL", term) # if hasattr(term, 'free_symbols'): # print("HAS FROZEN SYMBOLS") # sig = frozenset(term.free_symbols) # print("sig", sig) # if sig not in feature_distributions: # #THIS REPRESENTS THE MODEL REPRESENTATION # feature_distributions[sig] = [] # for terms in all_models_expanded: # current_model_contributions = {sig: 0 for sig in feature_distributions.keys()} # for term in terms: # if hasattr(term, 'free_symbols'): # sig = frozenset(term.free_symbols) # current_model_contributions[sig] += term # else: # sig = frozenset() # current_model_contributions[sig] += term # # Append this model's contributions to the global buckets # for sig in feature_distributions.keys(): # feature_distributions[sig].append(current_model_contributions[sig]) # return feature_distributions ############################ #Data Subsampling Methods ############################ def randomSubsample(x,y, *args, **kwargs): n=max(int(np.ceil(len(y)**(3/5))),3) idx=np.random.choice(range(x.shape[1]),n,replace=False) return np.array([i[idx] for i in x]),y[idx] def generationProportionalSample(x,y,generation=100,generations=100): n=max(int(np.ceil(len(y)*(generation/generations)**(3/5))),3) idx=np.random.choice(range(x.shape[1]),n,replace=False) return np.array([i[idx] for i in x]),y[idx] def ordinalSample(x,y,generation=100,generations=100): n=max(int(len(y)*generation/generations),3) sortedIdx=np.argsort(y) step=len(y)/(n-1) idx=[sortedIdx[max(int(i*step)-1,0)] for i in range(n)] return np.array([i[idx] for i in x]),y[idx] def orderedSample(x,y,generation=100,generations=100): n=max(int(len(y)*generation/generations),3) idx=[i for i in range(n)] return np.array([i[idx] for i in x]),y[idx] def ordinalBalancedSample(x,y,generation=100,generations=100): n=max(int(len(y)*generation/generations),3) numBins=int(max(np.ceil(np.sqrt(n)),3)) bins=np.linspace(min(y),max(y),numBins+1) binIdx=np.digitize(y,bins)-1 samplesPerBin=max(int(n/numBins),1) idx=[] for i in range(numBins): binMembers=[j for j in range(len(y)) if binIdx[j]==i] if len(binMembers)>0: chosen=np.random.choice(binMembers,min(samplesPerBin,len(binMembers)),replace=False) idx=idx+chosen.tolist() return np.array([i[idx] for i in x]),y[idx] def balancedSample(x,y, *args, **kwargs): n=int(np.ceil(len(y)**(3/5))) numBins=max(round(n**(2/5)),3) bins=np.linspace(min(y),max(y),numBins+1) binIdx=np.digitize(y,bins)-1 samplesPerBin=max(int(n/numBins),1) idx=[] for i in range(numBins): binMembers=[j for j in range(len(y)) if binIdx[j]==i] if len(binMembers)>0: chosen=np.random.choice(binMembers,min(samplesPerBin,len(binMembers)),replace=False) idx=idx+chosen.tolist() return np.array([i[idx] for i in x]),y[idx] import inspect def getArity(func): #Returns the arity of a function: used for model evaluations if func=="pop": return 1 return len(inspect.signature(func).parameters) getArity.__doc__ = "getArity(func) takes a function and returns the function arity" def modelArity(model): #Returns the total arity of a model return 1+sum([getArity(i)-1 for i in model[0]]) modelArity.__doc__ = "modelArity(model) returns the total arity of a model" def listArity(data): #Returns arity of evaluating a list of operators if len(data)==0: return 0 return 1+sum([getArity(i)-1 for i in data]) listArity.__doc__ = "listArity(list) returns the arity of evaluating a list of operators" def buildEmptyModel(): # Generates an empty model return [[],[],[]] buildEmptyModel.__doc__ = "buildEmptyModel() takes no inputs and generates an empty GP model" def variableSelect(num): #Function that creates a function to select a specific variable return lambda variables: variables[num] variableSelect.__doc__ = "variableSelect(n) is a function that creates a function to select the nth variable" def modelToListForm(model): model[0]=model[0].tolist() def modelRestoreForm(model): model[0]=np.array(model[0],dtype=object) def generateRandomModel(variables,ops,const,maxLength): #Generates a random GP model prog = buildEmptyModel() #Generate an empty model with correct structure varChoices=[variableSelect(i) for i in range(variables)]+const #All variable and constants choices prog[0]=np.array(np.random.choice(ops,random.randint(1,maxLength)),dtype=object) #Choose random operators countVars=modelArity(prog) #Count how many variables/constants are needed prog[1]=np.random.choice(varChoices,countVars) #Choose random variables/constants # if all vars are constants then replace one random term if all(t in const for t in prog[1]): replace_idx = random.randrange(countVars) prog[1][replace_idx] = random.choice(varChoices[:variables]) #Replace with a variable prog[1]=[i() if (callable(i) and i.__name__!='' )else i for i in prog[1]] #If function then evaluate return prog generateRandomModel.__doc__ = "generateRandomModel() takes as input the variables, operators, constants, and max program length and returns a random program" def initializeGPModels(variables,ops=defaultOps(),const=defaultConst(),numberOfModels=100,maxLength=10): # generate random linear program prog=[[],[],[]] # prog stores [Operators, VarConst, QualityMetrics] models=[generateRandomModel(variables,ops,const,maxLength) for i in range(numberOfModels)] #Generate models return models initializeGPModels.__doc__ = "initializeGPModels(countOfVariables, operators, constants, numberOfModels=100, maxLength=10) returns a set of randomly generated models" def reverseList(data): #Returns a list reversed return [i for i in reversed(data)] reverseList.__doc__ = "reverseList(data) returns the data list reversed" def varReplace(data,variables): #Replaces variable references with data during model evaluation return [i(variables) if callable(i) else i for i in data] varReplace.__doc__ = "varReplace(data,variables) replaces references to variables in data with actual values" def inputLen(data): #Returns the number of data records in a data set el1=data[0] if type(el1)==list or type(el1)==np.ndarray: return len(el1) else: return 1 inputLen.__doc__ = "inputLen(data) determines the number of data records in a data set" def varCount(data): #Returns the number of variables in a data set return len(data) varCount.__doc__ = "varCount(data) determines the number of variables in a data set" def evaluateGPModel(model,inputData): #Evaluates a model numerically response=evModHelper(model[1],model[0],[],np.array(inputData).astype(float))[2][0] if not type(response)==np.ndarray and inputLen(inputData)>1: response=np.array([response for i in range(inputLen(inputData))]) return response evaluateGPModel.__doc__ = "evaluateGPModel(model,data) numerically evaluates a model using the data stored in inputData" def evModHelper(varStack,opStack,tempStack,data): #Recursive helper function for evaluateGPModel stack1=varStack stack2=opStack stack3=tempStack if len(stack2)==0: return [stack3,stack2,stack1] op=stack2[0] stack2=stack2[1:] if callable(op): patt=getArity(op) while patt>len(stack3): stack3=[stack1[0]]+stack3 stack1=stack1[1:] try: temp=op(*varReplace(reverseList(stack3[:patt]),data)) except TypeError: print("stack3: ", stack3, " patt: ", patt, " data: ", data) temp=np.nan except OverflowError: temp=np.nan stack3=stack3[patt:] stack3=[temp]+stack3 else: if len(stack1)>0: stack3=varReplace([stack1[0]],data)+stack3 stack1=stack1[1:] if len(stack2)>0: stack1,stack2,stack3=evModHelper(stack1,stack2,stack3,data) return [stack1,stack2,stack3] evModHelper.__doc__ = "evModHelper(varStack,opStack,tempStack,data) is a helper function for evaluateGPModel" def rmse(model, inputData, response): predictions = evaluateGPModel(model, inputData) if not all(np.isfinite(predictions)) or any(np.iscomplex(predictions)): return np.nan return np.sqrt(np.mean((predictions - response) ** 2)) rmse.__doc__ = "rmse(model, input, response) is a fitness objective that evaluates the root mean squared error" def binaryError(model, input, response): prediction=evaluateGPModel(model,input) error=np.mean(np.abs(prediction-response)) if np.isnan(error) or np.isinf(error) or error > 1 or error < 0: return 0.5 return min(error,1 - error) def fitness(prog,data,response): # Fitness function using correlation predicted=evaluateGPModel(prog,np.array(data)) if type(predicted)!=list and type(predicted)!=np.ndarray: predicted=np.array([predicted for i in range(inputLen(data))]) try: if np.isnan(predicted).any() or np.isinf(predicted).any(): return np.nan except TypeError: #print(predicted) return np.nan except OverflowError: return np.nan if (not all(np.isfinite(np.array(predicted,dtype=np.float32)))) or np.all(predicted==predicted[0]): return np.nan try: fit=1-pearsonr(predicted,np.array(response))[0]**2 # 1-R^2 except ValueError: return 1 if math.isnan(fit): return 1 # If nan return 1 as fitness return fit # Else return actual fitness 1-R^2 fitness.__doc__ = "fitness(program,data,response) returns the 1-R^2 value of a model" def stackGPModelComplexity(model,*args): return len(model[0])+len(model[1])-model[0].tolist().count("pop") stackGPModelComplexity.__doc__ = "stackGPModelComplexity(model) returns the complexity of the model" ###################### Timeout function for model complexity ###################### class TimeoutException(Exception): pass @contextmanager def time_limit(seconds): def signal_handler(signum, frame): raise TimeoutException("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) #################################################################################### # Compute Hess def ComputeSymbolicHess(model,vars): printedModel=sym.simplify(printGPModel(model)) if type(printedModel)==float: return sym.matrices.dense.MutableDenseMatrix(np.zeros((vars,vars))) hess=sym.hessian(printedModel, [symbols('x'+str(i)) for i in range(vars)]) return hess def EvaluateHess(hess,vars,values): numHess=hess.subs({symbols('x'+str(j)):values[j] for j in range(vars)}) hessN = np.array(numHess).astype(float) rankN=np.linalg.matrix_rank(hessN,tol=0.0001*0.0001*10) return rankN def Approx2Deriv(model,values,diff1,diff2,positions): #maybe diff should be relative to the variation of each feature term1=[values[i]+diff1 if i == positions[0] else values[i] for i in range(len(values))] term1=[term1[i]+diff2 if i == positions[1] else term1[i] for i in range(len(term1))] term2=[values[i]-diff1 if i == positions[0] else values[i] for i in range(len(values))] term2=[term2[i]+diff2 if i == positions[1] else term2[i] for i in range(len(term2))] term3=[values[i]+diff1 if i == positions[0] else values[i] for i in range(len(values))] term3=[term3[i]-diff2 if i == positions[1] else term3[i] for i in range(len(term3))] term4=[values[i]-diff1 if i == positions[0] else values[i] for i in range(len(values))] term4=[term4[i]-diff2 if i == positions[1] else term4[i] for i in range(len(term4))] return ((evaluateGPModel(model,term1)-evaluateGPModel(model,term2))/((2*diff1)) -(evaluateGPModel(model,term3)-evaluateGPModel(model,term4))/((2*diff1)))/(2*diff2) def ApproxHessRank(model,vars,values,diff1=0.001,diff2=0.001): hess=[[Approx2Deriv(model,values,diff1,diff2,[i,j]) for i in range(vars)] for j in range(vars)] hessN = np.array(hess).astype(float) rankN=np.linalg.matrix_rank(hessN,tol=0.0001*0.0001*10) return rankN #def HessRank(model,vars,values): # try: # with time_limit(.01): # hess=ComputeSymbolicHess(model,vars) # hess = EvaluateHess(hess,vars,values) # #print(hess) # return hess # except TimeoutException as e: # hess=ApproxHessRank(model,vars,values) #print(hess) # return hess def HessRank(model,vars,values): hess=ApproxHessRank(model,vars,values) return hess # Counts basis terms in a model def count_basis_terms(equation, expand=False): try: with time_limit(2): if expand: # Simplify the equation to standardize the expression simplified_eq = simplify(equation) # Expand the expression to identify additive terms clearly expanded_eq = expand(simplified_eq) # Separate the terms of the expression terms = expanded_eq.as_ordered_terms() else: terms = equation.as_ordered_terms() #print(terms) except TimeoutException as e: return 1000 return len(terms) # Determines the number of basis functions in a model by counting +s and -s def basisFunctionComplexity(model,vars, values,*args): try: # values should be max, min, and median with respect to response variable return HessRank(model,vars,values)#count_basis_terms(printGPModel(model)) except: return 1000 # Creates a lambda function to be used as a complexity metric when given a target dimensionality and deviation def basisFunctionComplexityDiff(target, deviation, vars, low, mid, high): return lambda model,*args: max(np.mean([abs(basisFunctionComplexity(model,vars,low)-target),abs(basisFunctionComplexity(model,vars,mid)-target) ,abs(basisFunctionComplexity(model,vars,high)-target)] ),(deviation))-deviation def setModelQuality(model,inputData,response,modelEvaluationMetrics=[fitness,stackGPModelComplexity]): model[2]=[i(model,inputData,response) for i in modelEvaluationMetrics] setModelQuality.__doc__ = "setModelQuality(model, inputdata, response, metrics=[r2,size]) is an inplace operator that sets a models quality" def stackPass(model,pt): i=0 t=0 p=0 s=model[0] if i 0: stack2=[stack1[0]]+stack2 stack1=stack1[1:] else: if len(newStack)==0 and pts[0]==0: tStack,stack1,stack2=stackGrab(stack1,stack2,getArity(opStack[i])) else: tStack,stack1,stack2=stackGrab(stack1,stack2,getArity(opStack[i])-1) newStack=newStack+tStack i+=1 return newStack def recombination2pt(model1,model2): #2 point recombination pts1=np.sort(random.sample(range(0,len(model1[0])+1),2)) pts2=np.sort(random.sample(range(0,len(model2[0])+1),2)) #pts1=[4,5] #pts2=[2,4] #pts1=[0,3] #pts2=[1,3] #print(pts1,pts2) child1=buildEmptyModel() child2=buildEmptyModel() parent1=copy.deepcopy(model1) parent2=copy.deepcopy(model2) parent1[0]=np.array(parent1[0],dtype=object).tolist() parent2[0]=np.array(parent2[0],dtype=object).tolist() child1[0]=np.array(parent1[0][0:pts1[0]]+parent2[0][pts2[0]:pts2[1]]+parent1[0][pts1[1]:],dtype=object) child2[0]=np.array(parent2[0][0:pts2[0]]+parent1[0][pts1[0]:pts1[1]]+parent2[0][pts2[1]:],dtype=object) varPts1=[listArity(parent1[0][:(pts1[0])])+0,listArity(parent2[0][:(pts2[0])])+0,listArity(parent2[0][pts2[0]:pts2[1]]),listArity(parent1[0][pts1[0]:pts1[1]])] if pts1[0]==0: varPts1[0]+=1 if pts2[0]==0: varPts1[1]+=1 child1[1]=parent1[1][:varPts1[0]]+parent2[1][varPts1[1]:(varPts1[1]+varPts1[2]-1)]+parent1[1][(varPts1[0]+varPts1[3]-1):] varPts2=[listArity(parent2[0][:(pts2[0])])+0,listArity(parent1[0][:(pts1[0])])+0,listArity(parent1[0][pts1[0]:pts1[1]]),listArity(parent2[0][pts2[0]:pts2[1]])] if pts1[0]==0: varPts2[1]+=1 if pts2[0]==0: varPts2[0]+=1 child2[1]=parent2[1][:varPts2[0]]+parent1[1][varPts2[1]:(varPts2[1]+varPts2[2]-1)]+parent2[1][(varPts2[0]+varPts2[3]-1):] #print(varPts1,varPts2) return [child1,child2] recombination2pt.__doc__ = "recombination2pt(model1,model2) does 2 point crossover and returns two children models" def get_numeric_indices(l): #Returns indices of list that are numeric return [i for i in range(len(l)) if type(l[i]) in [int,float]] def mutate(model,variables,ops=defaultOps(),const=defaultConst(),maxLength=10): newModel=copy.deepcopy(model) newModel[0]=np.array(newModel[0],dtype=object).tolist() mutationType=random.randint(0,7) varChoices=[variableSelect(i) for i in range(variables)]+const opChoice=0 varChoice=0 tmp=0 if mutationType==0: #single operator mutation opChoice=random.randint(0,len(newModel[0])-1) if len(newModel[0])>0: newModel[0][opChoice]=np.random.choice([i for i in ops] ) elif mutationType==1: #single variable mutation varChoice=np.random.choice(varChoices) if callable(varChoice) and varChoice.__name__!='': varChoice=varChoice() newModel[1][random.randint(0,len(newModel[1])-1)]=varChoice elif mutationType==2: #insertion mutation to top of stack opChoice=np.random.choice(ops) newModel[0]=[opChoice]+newModel[0] while modelArity(newModel)>len(newModel[1]): varChoice=np.random.choice(varChoices) if callable(varChoice) and varChoice.__name__!='': varChoice=varChoice() newModel[1]=[varChoice]+newModel[1] elif mutationType==3: #deletion mutation from top of stack if len(newModel[0])>1: opChoice=random.randint(1,len(newModel[0])-1) newModel[0]=newModel[0][-opChoice:] newModel[1]=newModel[1][-listArity(newModel[0]):] elif mutationType==4: #insertion mutation to bottom of stack opChoice=np.random.choice([i for i in ops]) newModel[0].append(opChoice) elif mutationType==5: #mutation via crossover with random model newModel=recombination2pt(newModel,generateRandomModel(variables,ops,const,maxLength))[0] elif mutationType==6: #single operator insertion mutation singleOps=[op for op in ops if getArity(op)==1 and op!='pop'] singleOps.append('pop') pos=random.randint(0,len(newModel[0])-1) newModel[0].insert(pos,np.random.choice(singleOps)) elif mutationType==7: #nudge numeric constant pos=get_numeric_indices(newModel[1]) if(len(pos)>0): #If there are numeric constants pos=random.choice(pos) newModel[1][pos]=newModel[1][pos]+np.random.normal(-1,1) if modelArity(newModel)len(newModel[1]): newModel[1]=newModel[1]+[np.random.choice(varChoices) for i in range(modelArity(newModel)-len(newModel[1]))] newModel[1]=[varChoice() if callable(varChoice) and varChoice.__name__!='' else varChoice for varChoice in newModel[1]] newModel[0]=np.array(newModel[0],dtype=object) return newModel mutate.__doc__ = "mutate(model,variableCount,ops,constants,maxLength) mutates a model" def paretoFront(fitValues): #Returns Boolean list of Pareto front elements onFront = np.ones(fitValues.shape[0], dtype = bool) for i, j in enumerate(fitValues): if onFront[i]: onFront[onFront] = np.any(fitValues[onFront]0: front=paretoTournament(tMods) paretoModels=paretoModels+front for i in front: tMods.remove(i) [modelRestoreForm(mod) for mod in paretoModels] return paretoModels selectModels.__doc__ = "selectModels(models, selectionSize=0.5) iteratively selects the Pareto front of a model population until n or n*popSize models are selected" def stackVarUsage(opStack): #Counts how many variables are used by the operator stack pos=getArity(opStack[0]) for j in range(1,len(opStack)): pos+=getArity(opStack[j])-1 if opStack[j]=='pop': pos+=1 return pos stackVarUsage.__doc__ = "stackVarUsage(opStack) is a helper function that determines how many variables/constants are needed by the operator stack" def trimModel(mod): #Removes extra pop operators that do nothing model=copy.deepcopy(mod) i=0 varStack=len(mod[1]) tempStack=0 varStack-=getArity(model[0][i]) tempStack+=1 i+=1 while varStack>0: if model[0][i]=='pop': varStack-=1 tempStack+=1 else: take=getArity(model[0][i])-tempStack if take>0: varStack-=take tempStack=1 else: tempStack-=getArity(model[0][i])-1 i+=1 model[0]=np.array(model[0][:i].tolist()+[j for j in model[0][i:] if not j=='pop'],dtype=object) return model trimModel.__doc__ = "trimModel(model) trims extra pop operators off the operator stack so that further modifications such as a model alignment aren't altered by those pop operators" def alignGPModel(model, data, response): #Aligns a model prediction=evaluateGPModel(model,data) if (not all(np.isfinite(np.array(prediction)))) or np.all(prediction==prediction[0]): return model if np.isnan(np.array(prediction)).any() or np.isnan(np.array(response)).any() or not np.isfinite(np.array(prediction,dtype=np.float32)).all(): return model # Variance guards if np.std(prediction) < 1e-12: return model if np.ptp(prediction) < 1e-12: return model try: align=np.polyfit(prediction,response,1,rcond=1e-16)#np.round(np.polyfit(prediction,response,1,rcond=1e-16),decimals=14) except np.linalg.LinAlgError: #print("Alignment failed for: ", model, " with prediction: ", prediction, "and reference data: ", response) return model newModel=trimModel(model) newModel[0]=np.array(newModel[0].tolist()+[mult,add],dtype=object) newModel[1]=newModel[1]+align.tolist() #setModelQuality(newModel,data,response) return newModel alignGPModel.__doc__ = "alignGPModel(model, input, response) aligns a model such that response-a*f(x)+b are minimized over a and b" def evolve(inputData, responseData, generations=100, ops=defaultOps(), const=defaultConst(), variableNames=[], mutationRate=79, crossoverRate=11, spawnRate=10, extinction=False,extinctionRate=10,elitismRate=10,popSize=300,maxComplexity=100,align=True,initialPop=[],timeLimit=300,capTime=False,tourneySize=5,tracking=False,returnTracking=False,liveTracking=False,liveTrackingInterval=1,modelEvaluationMetrics=[fitness,stackGPModelComplexity],dataSubsample=False,samplingMethod=randomSubsample,alternateObjectives=[],alternateObjFrequency=10,allowEarlyTermination=False,earlyTerminationThreshold=0): evolution_hisotry = [] alternatingFlag = False if callable(modelEvaluationMetrics): metrics=[modelEvaluationMetrics] allMetrics=[modelEvaluationMetrics]+alternateObjectives elif isinstance(modelEvaluationMetrics, list) and callable(modelEvaluationMetrics[0]): metrics=modelEvaluationMetrics allMetrics=modelEvaluationMetrics+alternateObjectives elif isinstance(modelEvaluationMetrics, list) and isinstance(modelEvaluationMetrics[0], list): metrics=modelEvaluationMetrics[0] allMetrics=[item for sublist in modelEvaluationMetrics for item in sublist]+alternateObjectives alternatingFlag = True else: raise ValueError("modelEvaluationMetrics must be a function, list of functions, or a list of lists of functions") fullInput,fullResponse=copy.deepcopy(inputData),copy.deepcopy(responseData) inData=copy.deepcopy(fullInput) resData=copy.deepcopy(fullResponse) variableCount=varCount(inData) models=initializeGPModels(variableCount,ops,const,popSize) models=models+initialPop startTime=time.perf_counter() bestFits=[] gene_dists = [] if liveTracking: fig, ax = plt.subplots(figsize=(20,10)) ckTime=time.perf_counter() for i in range(generations): if capTime and time.perf_counter()-startTime>timeLimit: break if len(alternateObjectives)>0 and (i+1)%alternateObjFrequency==0: metrics=modelEvaluationMetrics[:1]+alternateObjectives else: if alternatingFlag: metrics=modelEvaluationMetrics[i%len(modelEvaluationMetrics)] else: metrics=modelEvaluationMetrics if dataSubsample: inData,resData=samplingMethod(fullInput,fullResponse,generations=generations,generation=i) for mods in models: setModelQuality(mods,inData,resData,modelEvaluationMetrics=metrics) models=removeIndeterminateModels(models) if allowEarlyTermination and min([mods[2][0] for mods in models])<=earlyTerminationThreshold: print("Early termination at generation ", i) break if tracking or liveTracking or returnTracking: bestFits.append(min([mods[2][0] for mods in paretoTournament(models)])) if liveTracking and time.perf_counter()-ckTime>liveTrackingInterval: ax.clear() ax.plot(bestFits) ax.set_title(f"Best Model: {bestFits[-1]:.2f} at Generation {(i+1)}") ax.set_xlabel("Generations") ax.set_ylabel("Fitness") clear_output(wait=True) display(fig) #plt.show() plt.close(fig) ckTime=time.perf_counter() #get distribution of the models from the last generation and use the models from the Pareto Front paretoModels=selectModels(models,elitismRate/100*popSize if elitismRate/100*popSize0: models=initializeGPModels(variableCount,ops,const,popSize) for mods in models: setModelQuality(mods,inData,resData,modelEvaluationMetrics=metrics) models=tournamentModelSelection(models,popSize,tourneySize) crossoverPairs=random.sample(models,round(crossoverRate/100*popSize)) toMutate=random.sample(models,round(mutationRate/100*popSize)) childModels=paretoModels for j in range(round(len(crossoverPairs)/2)-1): childModels=childModels+recombination2pt(crossoverPairs[j],crossoverPairs[j+round(len(crossoverPairs)/2)]) for j in toMutate: childModels=childModels+[mutate(j,variableCount,ops,const)] childModels=childModels+initializeGPModels(variableCount,ops,const,round(spawnRate/100*popSize)) childModels=deleteDuplicateModels(childModels) childModels=[model for model in childModels if stackGPModelComplexity(model)= generations): evolution_hisotry.append(models) yield models for mods in models: setModelQuality(mods,fullInput,fullResponse,modelEvaluationMetrics=allMetrics) models=[trimModel(mod) for mod in models] models=deleteDuplicateModels(models) models=removeIndeterminateModels(models) models=sortModels(models) if align: models=[alignGPModel(mods,fullInput,fullResponse) for mods in models] for mods in models: setModelQuality(mods,fullInput,fullResponse,modelEvaluationMetrics=allMetrics) if tracking or returnTracking: bestFits.append(min([mods[2][0] for mods in paretoTournament(models)])) if returnTracking: return models, bestFits plt.figure() plt.plot(bestFits) plt.title("Fitness over Time") plt.xlabel("Generations") plt.ylabel("Fitness") plt.show() return models,evolution_hisotry #model_distributions def replaceFunc(stack,f1,f2): return [i if i!=f1 else f2 for i in stack] def printGPModel(mod,inputData=symbols(["x"+str(i) for i in range(100)])): #Evaluates a model algebraically def inv1(a): return a**(-1) from sympy import tan as tan1, exp as exp1, sqrt as sqrt1, sin as sin1, cos as cos1, acos, asin, atan, tanh as tanh1, log as log1 def sqrt2(a): return sqrt1(a) def log2(a): return log1(a) model = copy.deepcopy(mod) model[0] = replaceFunc(model[0],exp,exp1) model[0] = replaceFunc(model[0],tan,tan1) model[0] = replaceFunc(model[0],sqrt,sqrt2) model[0] = replaceFunc(model[0],inv,inv1) model[0] = replaceFunc(model[0],sin,sin1) model[0] = replaceFunc(model[0],cos,cos1) model[0] = replaceFunc(model[0],arccos,acos) model[0] = replaceFunc(model[0],arcsin,asin) model[0] = replaceFunc(model[0],arctan,atan) model[0] = replaceFunc(model[0],tanh,tanh1) model[0] = replaceFunc(model[0],log,log2) try: response=evModHelper(model[1],model[0],[],np.array(inputData))[2][0] except: return np.nan return response def ensembleSelect(models, inputData, responseData, numberOfClusters=10): #Generates a model ensemble using input data partitions data=np.transpose(inputData) if len(data)len(set(clusters)): numberOfClusters=len(set(clusters)) clusters=KMeans(n_clusters=numberOfClusters).fit_predict(data) dataParts=[] partsResponse=[] for i in range(numberOfClusters): dataParts.append([]) partsResponse.append([]) for i in range(len(clusters)): dataParts[clusters[i]].append(data[i]) partsResponse[clusters[i]].append(responseData[i]) modelResiduals=[] for i in range(len(models)): modelResiduals.append([]) for i in range(len(models)): for j in range(numberOfClusters): modelResiduals[i].append(fitness(models[i],np.transpose(dataParts[j]),partsResponse[j])) best=[] for i in range(numberOfClusters): ordering=np.argsort(modelResiduals[i]) j=0 while ordering[j] in best: j+=1 best.append(ordering[j]) ensemble=[models[best[i]] for i in range(numberOfClusters)] return ensemble def uncertainty(data,trim=0.3): wl=None if len(data)<=4: wl=1 h=differential_entropy(data,window_length=wl) if np.isfinite(h): return h else: return 0 def evaluateModelEnsemble(ensemble, inputData): responses=[evaluateGPModel(mod, inputData) for mod in ensemble] if type(responses[0])==np.ndarray: responses=np.transpose(responses) predictions=[np.median(res) for res in responses] else: predictions=[np.median(responses)] return predictions def evaluateModelEnsembleUncertainty(ensemble, inputData): responses=[evaluateGPModel(mod, inputData) for mod in ensemble] if type(responses[0])==np.ndarray: responses=np.transpose(responses) uncertainties=[uncertainty(res,0) for res in responses] else: uncertainties=[uncertainty(responses,0)] return uncertainties def relativeEnsembleUncertainty(ensemble,inputData): output=evaluateModelEnsembleUncertainty(ensemble,inputData) return np.array(output) def createUncertaintyFunc(ensemble): return lambda x: -relativeEnsembleUncertainty(ensemble,x) def maximizeUncertainty(ensemble,varCount,bounds=[]): #Used to select a new point of maximum uncertainty func=createUncertaintyFunc(ensemble) x0=[np.mean(bounds[i]) for i in range(varCount)] if bounds==[]: pt=minimize(func,x0).x else: pt=minimize(func,x0,bounds=bounds).x return pt def extendData(data,newPoint): return np.concatenate((data.T,np.array([newPoint]))).T def activeLearningCheckpoint(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr): path=os.path.join(str(eqNum),str(version)) file=open(path,"wb+") dill.dump([i,inputData,response,testInput,testResponse,errors,models,minerr],file) file.close() def activeLearningCheckpointLoad(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr): path=os.path.join(str(eqNum),str(version)) try: with open(path,'rb') as f: i,inputData,response,testInput,testResponse,errors,models,minerr=dill.load(f) except FileNotFoundError: return i,inputData,response,testInput,testResponse,errors,models,minerr return i,inputData,response,testInput,testResponse,errors,models,minerr def subSampleSpace(space): newSpace=copy.deepcopy(space) newSpace=list(newSpace) for i in range(len(newSpace)): pts=sorted([np.random.uniform(newSpace[i][0],newSpace[i][1]),np.random.uniform(newSpace[i][0],newSpace[i][1])]) newSpace[i]=tuple(pts) return tuple(newSpace) def activeLearning(func, dims, ranges,rangesP,eqNum=1,version=1,iterations=100): #func should be a lamda function of form lambda data: f(data[0],data[1],...) try: with open(os.path.join(str(eqNum),str(version))+".txt",'rb') as f: return -1 except FileNotFoundError: pass inputData=[] testInput=[] found=False for i in range(dims): inputData.append(np.random.uniform(ranges[i][0],ranges[i][1],3)) testInput.append(np.random.uniform(ranges[i][0],ranges[i][1],200)) inputData=np.array(inputData) testInput=np.array(testInput) response=func(inputData) testResponse=func(testInput) errors=[] models=[] minerr=1 for i in range(iterations): print("input: ",inputData) print("\n response: ",response) i,inputData,response,testInput,testResponse,errors,models,minerr=activeLearningCheckpointLoad(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr) if i>iterations-1: break i+=1 models1=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) models2=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) models3=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) models4=evolve(inputData,response,initialPop=models,generations=1000,tracking=False,popSize=300,ops=allOps(),timeLimit=120,capTime=True,align=False,elitismRate=10) models=models1+models2+models3+models4 models=selectModels(models,20) alignedModels=[alignGPModel(mods,inputData,response) for mods in models] ensemble=ensembleSelect(alignedModels,inputData,response) out=maximizeUncertainty(ensemble,dims,rangesP) while out in inputData.T: out=maximizeUncertainty(ensemble,dims,subSampleSpace(rangesP)) inputData=extendData(inputData,out) response=func(inputData) fitList=np.array([fitness(mod,testInput,testResponse) for mod in alignedModels]) errors.append(min(fitList[np.logical_not(np.isnan(fitList))])) minerr=errors[-1] if minerr<1e-14: #print("Points needed in round", j,": ",3+i, " Time needed: ", time.perf_counter()-roundTime) if not os.path.exists(str(eqNum)): os.makedirs(str(eqNum)) path=os.path.join(str(eqNum),str(version)) file=open(path,"wb+") dill.dump([i,inputData,response,testInput,testResponse,errors,models,minerr],file) file.close() file=open(path+'.txt','w+') file.write(str(i+3)+'\n') file.write(str(errors)) file.close() return 3+i found=True ptsNeeded.append(3+i) break activeLearningCheckpoint(eqNum,version,i,inputData,response,testInput,testResponse,errors,models,minerr) if found==False: #print("Points needed in round",j,": NA (model not found)") path=os.path.join(str(eqNum),str(version)) file=open(path,"wb") dill.dump([-1,inputData,response,testInput,testResponse,errors,models,minerr],file) file.close() file=open(path+'.txt',"w+") file.write(str(i+3)+"\n") file.write(str(errors)) file.close() return -1 def plotModels(models, modelExpression=False): tMods=copy.deepcopy(models) if len(tMods[0][2])<2: # add complexity as second value for mod in tMods: mod[2]=[mod[2][0],stackGPModelComplexity(mod)] [modelToListForm(mod) for mod in tMods] paretoModels=paretoTournament(tMods) for i in paretoModels: tMods.remove(i) [modelRestoreForm(mod) for mod in paretoModels] [modelRestoreForm(mod) for mod in tMods] pAccuracies=[mod[2][0] for mod in paretoModels] pComplexities=[mod[2][1] for mod in paretoModels] accuracies=[mod[2][0] for mod in tMods]+pAccuracies complexities=[mod[2][1] for mod in tMods]+pComplexities colors=['blue' for i in range(len(tMods))]+['red' for i in range(len(pAccuracies))] fig,ax = plt.subplots() sc=plt.scatter(complexities,accuracies,color=colors) plt.xlabel("Complexity") plt.ylabel("1-R**2") if modelExpression: names=[str(printGPModel(mod)) for mod in tMods]+[str(printGPModel(mod)) for mod in paretoModels] else: names = [str(mod) for mod in tMods]+[str(mod) for mod in paretoModels] label = ax.annotate("", xy=(0,0), xytext=(np.min(complexities),np.mean([np.max(accuracies),np.min(accuracies)])), bbox=dict(boxstyle="round", fc="w"), arrowprops=dict(arrowstyle="->")) label.set_visible(False) def update_labels(ind): pos = sc.get_offsets()[ind["ind"][0]] label.xy = pos text = "{}".format(" ".join([names[n] for n in [ind["ind"][0]]])) label.set_text(text) label.get_bbox_patch().set_facecolor('grey') label.get_bbox_patch().set_alpha(0.9) def hover(event): vis = label.get_visible() if event.inaxes == ax: cont, ind = sc.contains(event) if cont: update_labels(ind) label.set_visible(True) fig.canvas.draw_idle() else: if vis: label.set_visible(False) fig.canvas.draw_idle() fig.canvas.mpl_connect("motion_notify_event", hover) plt.show() def plotModelResponseComparison(model,inputData,response,sort=False): plt.scatter(range(len(response)),response,label="True Response") plt.scatter(range(len(response)),evaluateGPModel(model,inputData),label="Model Prediction") plt.legend() plt.xlabel("Data Index") plt.ylabel("Response Value") plt.show() def plotPredictionResponseCorrelation(model,inputData,response): plt.scatter(response,evaluateGPModel(model,inputData),label="Model") plt.plot(response,response,label="Perfect Correlation",color='green') plt.xlabel("True Response") plt.ylabel("Predicted Response") plt.legend() plt.show() #Plot model complexity distribution def plotModelComplexityDistribution(models): tMods=copy.deepcopy(models) [modelToListForm(mod) for mod in tMods] paretoModels=paretoTournament(tMods) for i in paretoModels: tMods.remove(i) [modelRestoreForm(mod) for mod in paretoModels] [modelRestoreForm(mod) for mod in tMods] pComplexities=[mod[2][1] for mod in paretoModels] tComplexities=[mod[2][1] for mod in tMods] plt.hist(tComplexities,label="Non-Pareto Models") plt.hist(pComplexities,label="Pareto Models") plt.xlabel("Model Complexity") plt.ylabel("Frequency") plt.legend() plt.show() #Plot model accuracy distribution def plotModelAccuracyDistribution(models): tMods=copy.deepcopy(models) [modelToListForm(mod) for mod in tMods] paretoModels=paretoTournament(tMods) for i in paretoModels: tMods.remove(i) [modelRestoreForm(mod) for mod in paretoModels] [modelRestoreForm(mod) for mod in tMods] pAccuracies=[mod[2][0] for mod in paretoModels] tAccuracies=[mod[2][0] for mod in tMods] plt.hist(tAccuracies,label="Non-Pareto Models") plt.hist(pAccuracies,label="Pareto Models") plt.xlabel("Model Accuracy") plt.ylabel("Frequency") plt.legend() plt.show() #Plot model residuals relative to response def plotModelResiduals(model,input,response): plt.scatter(response,evaluateGPModel(model,input)-response) plt.xlabel("Response") plt.ylabel("Residual") plt.show() #Plot model residual distribution def plotModelResidualDistribution(model,input,response): plt.hist(evaluateGPModel(model,input)-response) plt.xlabel("Residual") plt.ylabel("Frequency") plt.show() ##CAN USE THESE FOR UNCERTAINITY FURTHERMORE #Plot the presence of variables in a model population def plotVariablePresence(models,variables=["x"+str(i) for i in range(100)],sort=False): vars=[varReplace(model[1],variables) for model in models] #Remove all numeric entries in vars vars=[[i for i in var if type(i)!=int and type(i)!=float] for var in vars] #Merge into one list vars=[j for i in vars for j in i] #Count frequency of each variable in vars varFreqs=[vars.count(i) for i in variables] #Keep only variables that appear at least once variablesUsed=[variables[i] for i in range(len(varFreqs)) if varFreqs[i]>0] varFreqs=[varFreqs[i] for i in range(len(varFreqs)) if varFreqs[i]>0] if sort: order=np.argsort(varFreqs)[::-1] variablesUsed=[variablesUsed[i] for i in order] varFreqs=[varFreqs[i] for i in order] #Plot variable frequency plt.bar(variablesUsed,varFreqs) plt.xlabel("Variable") plt.ylabel("Frequency") plt.show() def replaceOpsWithStrings(opStack): model = copy.deepcopy(opStack) model = replaceFunc(model,exp,str("exp")) model = replaceFunc(model,tan,str("tan")) model = replaceFunc(model,sqrt,str("sqrt")) model = replaceFunc(model,inv,str("1/#")) model = replaceFunc(model,sin,str("sin")) model = replaceFunc(model,cos,str("cos")) model = replaceFunc(model,arccos,str("acos")) model = replaceFunc(model,arcsin,str("asin")) model = replaceFunc(model,arctan,str("atan")) model = replaceFunc(model,tanh,str("tanh")) model = replaceFunc(model,log,str("log")) model = replaceFunc(model,add,"+") model = replaceFunc(model,mult,"*") model = replaceFunc(model,sub,"-") model = replaceFunc(model,protectDiv,"/") model = replaceFunc(model,sqrd,"^2") return model #Plot the presence of operators in a model population def plotOperatorPresence(models,sort=False,excludePop=True): ops=[replaceOpsWithStrings(model[0]) for model in models] #Merge into one list ops=[j for i in ops for j in i] #Remove duplicates in ops uniqueOps=list(set(ops)) if excludePop: #Remove pop operator uniqueOps.remove('pop') #Count frequency of each operator in ops opFreqs=[ops.count(i) for i in uniqueOps] #Keep only operators that appear at least once opsUsed=[str(uniqueOps[i]) for i in range(len(opFreqs)) if opFreqs[i]>0] opFreqs=[opFreqs[i] for i in range(len(opFreqs)) if opFreqs[i]>0] if sort: order=np.argsort(opFreqs)[::-1] opsUsed=[opsUsed[i] for i in order] opFreqs=[opFreqs[i] for i in order] #Plot operator frequency plt.bar(opsUsed,opFreqs) #Rotate x axis labels plt.xticks(rotation=0) plt.xlabel("Operator") plt.ylabel("Frequency") plt.show() def diverse_models(best_model, filtered_model_population, trainInput, trainResponse): ### ## Residuals allow us to capture different diverse models since we can fit the models in multiple different ways ### predicted_data = evaluateGPModel(best_model, trainInput) candidate_preds = [] for model in filtered_model_population: candidate_preds.append(np.array(evaluateGPModel(model, trainInput)).flatten()) candidate_preds = np.array(candidate_preds) trainResponse = np.array(trainResponse).flatten() best_preds = np.array(evaluateGPModel(best_model, trainInput)).flatten() selected_preds = [best_preds] ensemble_cluster = [best_model] picked_indices = set() for r in range(10): #choose up to 10 best models from this ensemble if len(picked_indices) >= len(filtered_model_population): break current_ensemble_mean = np.median(selected_preds, axis=0) ensemble_residuals = trainResponse - current_ensemble_mean best_candidate_idx = -1 anti_correlation = float('inf') for i, cur_model in enumerate(candidate_preds): if i in picked_indices:continue cand_residual = trainResponse - cur_model corr_val = corr = pearsonr(cand_residuals, ensemble_residuals)[0] if np.nan(corr_val): corr_val = 1.0 if corr_val < anti_correlation: anti_correlation = corr_val best_candidate_idx = i ##add the choosen best model to our current ensemble picked_indices.add(best_candidate_idx) selected_preds.append(candidate_preds[best_candidate_idx]) ensemble_cluster.append(filtered_model_population[best_candidate_idx]) print(f"Added Model {best_candidate_idx} (Residual Correlation: {lowest_correlation:.4f})") return ensemble_cluster ############################ #Sharpness Computations ############################ def sharpnessConstants(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2): fits=[] #For each model parameter, if numeric, randomly perturb by x% and see how much the model changes for i in range(numPerturbations): tempModel=copy.deepcopy(model) newParameters=[param if callable(param) else param*(1+percentPerturbation*(np.random.uniform()-0.5)) for param in model[1]] tempModel[1]=newParameters fits.append(fitness(tempModel,inputData,responseData)) return np.std(fits) def sharpnessData(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2,preserveSign=False): fits=[] #For each vector, randomly perturb by x% of the standard deviation and see how much the model fitness changes for i in range(numPerturbations): tempData=copy.deepcopy(inputData) tempData=np.array([(vec+percentPerturbation*np.std(vec)*(np.random.uniform(size=len(vec))-0.5)) for vec in tempData]) if preserveSign: signs=[np.unique(var) for var in np.sign(inputData)] tempData=[signs[i]*abs(tempData[i]) if len(signs[i])==1 else tempData[i] for i in range(len(signs))] fits.append(fitness(model,tempData,responseData)) return np.std(fits) def totalSharpness(model,inputData,responseData,numPerturbations=10,percentPerturbation=0.2,preserveSign=False): return sharpnessConstants(model,inputData,responseData,numPerturbations=numPerturbations,percentPerturbation=percentPerturbation)+sharpnessData(model,inputData,responseData,numPerturbations=numPerturbations,percentPerturbation=percentPerturbation,preserveSign=preserveSign) ############################ #Multiple Independent Searches ############################ def runEpochs(x,y,epochs=5,**kwargs): models=[] for i in range(epochs): models+=evolve(x,y,**kwargs) return sortModels(models) ############################ #Parallelization ############################ from joblib import Parallel, delayed def parallelEvolve(*args,n_jobs=-1,avail_cores=-1, **kwargs): if avail_cores==-1: try: avail_cores=len(os.sched_getaffinity(0)) except: avail_cores=os.cpu_count() if n_jobs==-1: try: n_jobs=len(os.sched_getaffinity(0)) except: n_jobs=os.cpu_count() if "tracking" in kwargs and kwargs["tracking"]: kwargs["returnTracking"]=True print(f"Running parallel evolution with {n_jobs} jobs.") if "liveTracking" in kwargs and kwargs["liveTracking"]: print("Live tracking is not supported in parallel evolution, disabling live tracking.") kwargs["liveTracking"]=False runs = Parallel(n_jobs=avail_cores, backend="loky")(delayed(evolve)(*args, **kwargs) for _ in range(n_jobs)) runs, tracking_histories = zip(*runs) if ("tracking" in kwargs and kwargs["tracking"]): runs, tracking = zip(*runs) # plot tracking for each job plt.figure(figsize=(12, 6)) for i, track in enumerate(tracking): plt.plot(track, label=f'Job {i+1}') plt.title('Best Fitness Over Generations for Each Parallel Run') plt.xlabel('Generations') plt.ylabel('Best Fitness') if n_jobs <= 16: # Only show legend if there are a reasonable number of jobs plt.legend() plt.show() flat = [model for sublist in runs for model in sublist] return sortModels(flat), tracking_histories ############################ #Benchmarking ############################ def generateRandomBenchmark(numVars=5, numSamples=100, noiseLevel=0, opsChoices=defaultOps(), constChoices=defaultConst(), maxLength=10): # Generate random input data inputData = np.random.rand(numVars, numSamples) # Generate a random target function randomModel = generateRandomModel(numVars, opsChoices, constChoices, maxLength) # Evaluate the model to get response data responseData = evaluateGPModel(randomModel, inputData) # Add noise if specified if noiseLevel > 0: noise = np.random.normal(0, noiseLevel, size=responseData.shape) responseData += noise return inputData, responseData, randomModel