import math
import re

import numpy as np
import sympy
from scipy.optimize import minimize
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


class Expression:
    """A symbolic-regression expression with fittable constants.

    Parses an infix (SymPy) or prefix expression string containing
    variables ``x_1 .. x_n`` and free-constant placeholders ``C``,
    compiles it into a vectorized numpy-evaluable string, and can fit
    the constants to data via L-BFGS-B.
    """

    # Functions exposed to the sandboxed eval() in evaluate().
    SAFE_FUNCTIONS = {
        'sqrt': np.sqrt,
        'log': np.log,
        'exp': np.exp,
        'sin': np.sin,
        'cos': np.cos,
        'tan': np.tan,
        'asin': np.arcsin,
        'abs': np.abs,
        'pow': np.power,  # np.power vectorizes and propagates NaN
        # '**' is handled by Python's eval; numpy operands dispatch to np.power.
    }

    # Arity of each supported operator (kept for external consumers).
    OPERATOR_ARITY = {
        '+': 2, '-': 2, '*': 2, '/': 2, '**': 2,
        'sin': 1, 'cos': 1, 'tan': 1,
        'log': 1, 'sqrt': 1, 'exp': 1,
    }

    # SymPy constructor for each operator (kept for external consumers).
    OPERATOR_FUNCS = {
        '+': sympy.Add,
        '-': lambda x, y: x - y,
        '*': sympy.Mul,
        '/': lambda x, y: x / y,
        '**': sympy.Pow,
        'sin': sympy.sin,
        'cos': sympy.cos,
        'tan': sympy.tan,
        'log': sympy.log,
        'sqrt': sympy.sqrt,
        'exp': sympy.exp,
    }

    def parse_prefix(self, tokens):
        """Parse a prefix-notation token list into a SymPy expression.

        Example: ['*', 'x_1', '+', 'x_2', 'C'] -> x_1*(x_2 + C)

        Args:
            tokens (list[str]): Operators and operands in prefix order.

        Returns:
            sympy.Expr: The parsed expression (not simplified).

        Raises:
            ValueError: On an empty token list or a malformed expression.
        """
        if not tokens:
            raise ValueError("Empty token list")

        UNARY_OPS = {'sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'abs', 'asin'}
        BINARY_OPS = {'+', '-', '*', '/', '**', '^'}

        stack = []
        # Scanning right-to-left turns prefix parsing into simple stack pops:
        # an operator's operands are always already on the stack.
        for token in reversed(tokens):
            if token in UNARY_OPS:
                if len(stack) < 1:
                    raise ValueError(f"Not enough operands for {token}")
                arg = stack.pop()
                stack.append(f"{token}({arg})")
            elif token in BINARY_OPS:
                if len(stack) < 2:
                    raise ValueError(f"Not enough operands for {token}")
                right = stack.pop()
                left = stack.pop()
                op = '**' if token == '^' else token
                # Parenthesize operands so the prefix structure survives
                # Python operator precedence when re-parsed by sympify.
                if op == '**':
                    stack.append(f"({left})**({right})")
                elif op == '/':
                    stack.append(f"({left})/({right})")
                else:
                    stack.append(f"({left}){op}({right})")
            else:
                # Operand (variable, constant placeholder, or number).
                stack.append(token)

        if len(stack) != 1:
            raise ValueError(
                f"Invalid prefix expression, {len(stack)} elements remaining")
        return sympy.sympify(stack[0], evaluate=False)

    def __init__(self, expression, is_prefix=False):
        """Build an Expression from an infix (SymPy) or prefix string.

        Args:
            expression (str): Expression text using variables ``x_1..x_n``
                and the constant placeholder ``C``.
            is_prefix (bool): If True, ``expression`` is whitespace-separated
                prefix notation.

        Raises:
            ValueError: If the expression cannot be parsed.
        """
        try:
            self.original_expression = expression  # keep the raw input
            if is_prefix:
                # Normalize '^' to '**' before tokenizing.
                tokens = expression.replace('^', '**').split()
                self.sympy_expression = self.parse_prefix(tokens)
            else:
                # Parse without simplification so the structure is preserved.
                self.sympy_expression = sympy.sympify(expression, evaluate=False)
        except Exception as e:
            raise ValueError(f"Failed to parse expression: {e}")

        # Highest x_i index used; determines how many columns of X are read.
        self.max_var = 0
        for symbol in self.sympy_expression.free_symbols:
            if symbol.name.startswith('x_'):
                try:
                    index = int(symbol.name.split('_')[1])
                    self.max_var = max(self.max_var, index)
                except ValueError:
                    # Symbol looks like x_<non-number>; ignore it here.
                    pass

        computable_expression = str(self.sympy_expression)
        for i in range(1, self.max_var + 1):
            # Whole-word match so x_1 is not clobbered inside x_11.
            computable_expression = re.sub(
                rf'\bx_{i}\b', f'x[{i - 1}]', computable_expression)

        # A constant used as an exponent is pinned to 2: free exponents make
        # the constant-fitting landscape pathological.
        computable_expression = computable_expression.replace('**C', '**2')

        # FIX: match 'C' as a whole word. The previous count('C')/split('C')
        # also hit the letter C inside any other symbol name, corrupting the
        # compiled expression.
        self.constant_count = len(re.findall(r'\bC\b', computable_expression))
        self.best_constants = [1.0] * self.constant_count
        if self.constant_count > 0:
            # Replace each standalone 'C' with an indexed constants[i] access.
            index_iter = iter(range(self.constant_count))
            computable_expression = re.sub(
                r'\bC\b',
                lambda _match: f'constants[{next(index_iter)}]',
                computable_expression)
        self.computable_expression = computable_expression

    def __str__(self):
        return (f"Expression: {self.original_expression}, "
                f"Best constants: {self.best_constants}")

    def sympy_str(self):
        """Return the string representation of the sympy expression."""
        return str(self.sympy_expression)

    def is_valid_on_dataset(self, X, test_constants_list=None):
        """Check that the expression yields finite values on all rows of X.

        Args:
            X (np.ndarray): Input data, shape (n_samples, n_features).
            test_constants_list (list[list[float]], optional): Constant sets
                to try; defaults to ``[[1.0] * constant_count]``. Supplying
                several sets (e.g. 0.5s, 1.0s, 2.0s) tests more thoroughly.

        Returns:
            bool: True if no evaluation returns nan/inf or raises.
        """
        if test_constants_list is None:
            test_constants_list = [[1.0] * self.constant_count]
        try:
            for constants in test_constants_list:
                results = self.evaluate(X, constants)
                if not np.all(np.isfinite(results)):
                    return False
            return True
        except Exception:
            return False

    def evaluate(self, X, constants=None):
        """Vectorized evaluation of the expression on X.

        Args:
            X (array-like): Shape (n_samples, n_features); a 1-D input is
                treated as a single sample.
            constants (sequence, optional): Values for the free constants;
                defaults to ``self.best_constants``.

        Returns:
            np.ndarray: Shape (n_samples,) float array. All-NaN on any
            evaluation failure so callers can score it as infinite loss.
        """
        if constants is None:
            constants = self.best_constants
        try:
            local_env = {
                # Array form so constants broadcast against column vectors.
                "constants": np.array(constants),
                **self.SAFE_FUNCTIONS,
                # NOTE(review): blanking __builtins__ is best-effort
                # sandboxing, not a real security boundary for eval().
                "__builtins__": None,
            }
            X = np.asarray(X)
            if X.ndim == 1:
                X = X.reshape(1, -1)  # promote a single sample to 2-D
            # Expose columns as x[0], x[1], ... to match the compiled string.
            local_env["x"] = [X[:, i] for i in range(X.shape[1])]
            try:
                # errstate keeps divide/overflow/invalid handling local to
                # this call instead of mutating numpy's global configuration.
                with np.errstate(all='ignore'):
                    y_pred_array = eval(self.computable_expression, local_env)
            except Exception:
                # Bad math (domain errors, shape issues): NaNs -> inf loss.
                return np.full(X.shape[0], np.nan)
            # Cast so integer-valued results don't break downstream math.
            return np.asarray(y_pred_array, dtype=float)
        except Exception:
            # Catch-all: still return NaNs of the expected length.
            num_samples = X.shape[0] if getattr(X, 'ndim', 0) > 0 else 1
            return np.full(num_samples, np.nan)

    def fit_constants(self, X, y):
        """Fit the free constants to (X, y) and return the R^2 score.

        Constants are optimized with L-BFGS-B on MSE within [-2, 2];
        fitted values are stored in ``self.best_constants``.

        Args:
            X (array-like): Inputs, shape (n_samples, n_features).
            y (array-like): Targets, shape (n_samples,).

        Returns:
            float: R^2 on (X, y) with the fitted constants, or -inf on
            failure (non-finite predictions, optimizer failure).
        """
        X = np.array(X)
        y = np.array(y)

        if self.constant_count == 0:
            # Nothing to optimize; just score the fixed expression.
            try:
                y_pred = self.evaluate(X)
                if not np.all(np.isfinite(y_pred)):
                    return -np.inf
                if np.all(y_pred == y_pred[0]) and len(np.unique(y)) > 1:
                    # Constant prediction of a non-constant y: R^2 is
                    # ill-behaved, score it as 0.
                    return 0.0
                return r2_score(y, y_pred)
            except Exception:
                return -np.inf

        def loss(current_constants):
            # MSE at the given constants; inf on any failure so the
            # optimizer steers away from invalid regions.
            try:
                y_pred = self.evaluate(X, current_constants)
            except Exception as e:
                print(f"Exception during evaluation: {e}")
                return np.inf
            if not np.all(np.isfinite(y_pred)):
                return np.inf
            return np.mean((y - y_pred) ** 2)

        bounds = [(-2., 2.)] * self.constant_count
        # FIX: the fallback guess is 1.0, matching best_constants'
        # initialization and the original comment; it was 0.0, which sits at
        # a stationary point for purely multiplicative constants.
        if self.best_constants and len(self.best_constants) == self.constant_count:
            initial_guess = self.best_constants
        else:
            initial_guess = [1.0] * self.constant_count
        initial_guess = np.array(initial_guess, dtype=float).flatten()

        result = minimize(loss, x0=initial_guess,
                          method='L-BFGS-B', bounds=bounds)
        if not result.success:
            return -np.inf
        self.best_constants = result.x.tolist()

        try:
            y_pred = self.evaluate(X)  # uses the freshly fitted constants
            if not np.all(np.isfinite(y_pred)):
                return -np.inf
            if len(np.unique(y)) == 1:
                # Constant target: r2_score is undefined, grade manually.
                return 1.0 if np.allclose(y_pred, y[0]) else 0.0
            return r2_score(y, y_pred)
        except Exception:
            return -np.inf