| | import sympy |
| | import numpy as np |
| | from sklearn.metrics import r2_score, mean_squared_error |
| | from sklearn.metrics import mean_absolute_error |
| | from scipy.optimize import minimize |
| | import math |
| | import re |
| |
|
| | |
class Expression:
    """A symbolic expression with data variables and fittable constants.

    The expression is supplied either in infix form (anything
    ``sympy.sympify`` accepts) or, with ``is_prefix=True``, as a
    whitespace-separated prefix (Polish-notation) string.  Symbols named
    ``x_1``, ``x_2``, ... are mapped to columns 0, 1, ... of the data matrix,
    and every standalone ``C`` becomes one free constant that
    :meth:`fit_constants` can optimise.

    NOTE(security): both ``sympy.sympify`` and :meth:`evaluate` run ``eval``
    on text derived from the expression string.  Removing ``__builtins__``
    is not a sandbox; do not pass expressions from untrusted sources.

    Attributes:
        original_expression: The expression exactly as passed in.
        sympy_expression: The parsed (unevaluated) SymPy expression.
        max_var: Largest ``i`` among the ``x_i`` symbols found (0 if none).
        computable_expression: Python source evaluated by :meth:`evaluate`;
            it refers to ``x[i]`` columns and ``constants[j]`` entries.
        constant_count: Number of free constants in the expression.
        best_constants: Current constant values (initially all ``1.0``).
    """

    # Names exposed to ``eval`` when the compiled expression is evaluated.
    SAFE_FUNCTIONS = {
        'sqrt': np.sqrt,
        'log': np.log,
        'exp': np.exp,
        'sin': np.sin,
        'cos': np.cos,
        'tan': np.tan,
        'asin': np.arcsin,
        'abs': np.abs,
        # SymPy prints the absolute value as ``Abs(...)``; without this
        # alias every expression containing abs would fail to evaluate.
        'Abs': np.abs,
        'pow': np.power,
    }

    OPERATOR_ARITY = {
        '+': 2,
        '-': 2,
        '*': 2,
        '/': 2,
        '**': 2,
        'sin': 1,
        'cos': 1,
        'tan': 1,
        'log': 1,
        'sqrt': 1,
        'exp': 1,
    }

    OPERATOR_FUNCS = {
        '+': sympy.Add,
        '-': lambda x, y: x - y,
        '*': sympy.Mul,
        '/': lambda x, y: x / y,
        '**': sympy.Pow,
        'sin': sympy.sin,
        'cos': sympy.cos,
        'tan': sympy.tan,
        'log': sympy.log,
        'sqrt': sympy.sqrt,
        'exp': sympy.exp,
    }

    # Operators understood by ``parse_prefix``.  Binary operators map each
    # accepted token to the infix spelling handed to SymPy.
    _UNARY_OPS = frozenset(
        {'sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'abs', 'asin'}
    )
    _BINARY_OPS = {
        '+': '+', '-': '-', '*': '*', '/': '/', '**': '**', '^': '**',
    }

    # A free constant is the standalone identifier ``C``; the word boundaries
    # keep a capital C inside some other name from being counted.
    _CONSTANT_RE = re.compile(r'\bC\b')
    _CONSTANT_EXPONENT_RE = re.compile(r'\*\*C\b')

    def parse_prefix(self, tokens):
        """Parse a prefix-notation token list into a SymPy expression.

        Examples:
            ['*', 'x_1', '+', 'x_2', 'C'] -> x_1*(x_2 + C)
            ['-', 'x_1', 'x_2']           -> x_1 - x_2

        Args:
            tokens: Sequence of string tokens in prefix order.  Anything
                that is not a known operator is treated as an operand.

        Returns:
            The unevaluated SymPy expression.

        Raises:
            ValueError: If ``tokens`` is empty, an operator lacks operands,
                or operands are left over after parsing.
        """
        if not tokens:
            raise ValueError("Empty token list")

        stack = []
        # Scanning right-to-left guarantees an operator's operands are
        # already on the stack when it is reached, with its FIRST (left)
        # operand on top.
        for token in reversed(tokens):
            if token in self._UNARY_OPS:
                if not stack:
                    raise ValueError(f"Not enough operands for {token}")
                stack.append(f"{token}({stack.pop()})")
            elif token in self._BINARY_OPS:
                if len(stack) < 2:
                    raise ValueError(f"Not enough operands for {token}")
                # Pop order matters for '-', '/' and '**'.
                left = stack.pop()
                right = stack.pop()
                stack.append(f"({left}){self._BINARY_OPS[token]}({right})")
            else:
                stack.append(token)

        if len(stack) != 1:
            raise ValueError(
                f"Invalid prefix expression, {len(stack)} elements remaining"
            )

        return sympy.sympify(stack[0], evaluate=False)

    def __init__(self, expression, is_prefix=False):
        """Parse ``expression`` and compile it into evaluable Python source.

        Args:
            expression: Expression string (infix, or prefix when
                ``is_prefix`` is true).
            is_prefix: Treat ``expression`` as whitespace-separated prefix
                notation; ``^`` is accepted as a spelling of ``**``.

        Raises:
            ValueError: If the expression cannot be parsed.
        """
        self.original_expression = expression
        try:
            if is_prefix:
                tokens = expression.replace('^', '**').split()
                self.sympy_expression = self.parse_prefix(tokens)
            else:
                self.sympy_expression = sympy.sympify(
                    expression, evaluate=False
                )
        except Exception as e:
            raise ValueError(f"Failed to parse expression: {e}") from e

        # Highest variable index in use, taken from symbols named ``x_<int>``.
        self.max_var = 0
        for symbol in self.sympy_expression.free_symbols:
            if not symbol.name.startswith('x_'):
                continue
            try:
                index = int(symbol.name.split('_')[1])
            except ValueError:
                continue  # e.g. "x_a": not a numbered data column
            self.max_var = max(self.max_var, index)

        # 1-based symbol x_i -> 0-based column lookup x[i-1].
        computable = str(self.sympy_expression)
        for i in range(1, self.max_var + 1):
            computable = re.sub(rf'\bx_{i}\b', f'x[{i-1}]', computable)

        # A constant used directly as an exponent is replaced by the fixed
        # exponent 2, so it is not counted as a free constant below.
        computable = self._CONSTANT_EXPONENT_RE.sub('**2', computable)

        # Number the remaining constants left to right:
        # C ... C ... -> constants[0] ... constants[1] ...
        pieces = self._CONSTANT_RE.split(computable)
        self.constant_count = len(pieces) - 1
        self.best_constants = [1.0] * self.constant_count

        parts = [pieces[0]]
        for index, tail in enumerate(pieces[1:]):
            parts.append(f'constants[{index}]')
            parts.append(tail)
        self.computable_expression = ''.join(parts)

    def __str__(self):
        return (
            f"Expression: {self.original_expression}, "
            f"Best constants: {self.best_constants}"
        )

    def sympy_str(self):
        """Return the string representation of the SymPy expression."""
        return str(self.sympy_expression)

    def is_valid_on_dataset(self, X, test_constants_list=None):
        """Check that the expression is finite on every row of ``X``.

        Args:
            X: Input data, one row per sample and one column per variable.
            test_constants_list: Optional list of constant vectors to try.
                Defaults to a single vector of ones.  Example:
                ``[[1.0]*n, [0.5]*n, [2.0]*n]`` to test more thoroughly.

        Returns:
            bool: True if every evaluation is free of nan/inf and does not
            crash, False otherwise.
        """
        if test_constants_list is None:
            test_constants_list = [[1.0] * self.constant_count]

        try:
            return all(
                np.all(np.isfinite(self.evaluate(X, constants)))
                for constants in test_constants_list
            )
        except Exception:
            return False

    def evaluate(self, X, constants=None):
        """Evaluate the expression on ``X`` and return one value per sample.

        This method does not raise for evaluation problems: any failure is
        reported as an all-NaN result so callers can test with
        ``np.isfinite``.

        Args:
            X: Input data, one row per sample.  A 1-D input is treated as a
                single sample.
            constants: Constant values to use; defaults to
                ``self.best_constants``.

        Returns:
            np.ndarray: Float array of predictions.  It is all-NaN if the
            expression could not be evaluated, and a length-1 NaN array if
            ``X`` itself could not be interpreted as a 2-D array.
        """
        if constants is None:
            constants = self.best_constants

        # Normalise X first so the failure paths below always know how many
        # samples to report.
        try:
            X = np.asarray(X)
            if X.ndim == 1:
                X = X.reshape(1, -1)
            num_samples = X.shape[0]
            columns = [X[:, i] for i in range(X.shape[1])]
        except Exception:
            return np.full(1, np.nan)

        try:
            env = {
                **self.SAFE_FUNCTIONS,
                "constants": np.array(constants),
                "x": columns,
                "__builtins__": None,
            }
            # Scope the floating-point error policy to this evaluation
            # instead of overwriting the caller's global numpy settings.
            # Invalid operations surface as nan/inf in the result.
            with np.errstate(all='ignore'):
                # NOTE(security): eval of expression-derived source; see the
                # class docstring.
                result = eval(self.computable_expression, env)

            result = np.asarray(result, dtype=float)
            if result.ndim == 0:
                # Expression without variables (e.g. "C"): repeat the scalar
                # so callers always get one prediction per sample.
                result = np.full(num_samples, float(result))
            return result
        except Exception:
            return np.full(num_samples, np.nan)

    def fit_constants(self, X, y, constant_bounds=(-2.0, 2.0)):
        """Fit the free constants to ``(X, y)`` and return the R^2 score.

        Constants are optimised with L-BFGS-B on the mean squared error,
        starting from ``self.best_constants``.  On success
        ``self.best_constants`` is updated in place.

        Args:
            X: Input data, one row per sample.
            y: Target values, one per sample.
            constant_bounds: ``(low, high)`` box applied to every constant.

        Returns:
            float: R^2 of the fitted expression, or ``-inf`` if the
            expression produces non-finite values, scoring fails, or the
            optimiser does not report success.
        """
        X = np.asarray(X)
        # Flatten so a column-vector y cannot broadcast against the 1-D
        # predictions into an (n, n) matrix inside the loss.
        y = np.asarray(y).ravel()

        if self.constant_count == 0:
            # Nothing to optimise: just score the expression as it is.
            try:
                y_pred = self.evaluate(X)
                if not np.all(np.isfinite(y_pred)):
                    return -np.inf
                if np.all(y_pred == y_pred[0]) and len(np.unique(y)) > 1:
                    return 0.0
                return r2_score(y, y_pred)
            except Exception:
                return -np.inf

        def loss(current_constants):
            """Mean squared error, or +inf where the expression is invalid."""
            y_pred = self.evaluate(X, current_constants)
            if not np.all(np.isfinite(y_pred)):
                return np.inf
            return np.mean((y - y_pred) ** 2)

        bounds = [tuple(constant_bounds)] * self.constant_count

        if (self.best_constants
                and len(self.best_constants) == self.constant_count):
            initial_guess = self.best_constants
        else:
            initial_guess = [0.0] * self.constant_count
        initial_guess = np.array(initial_guess, dtype=float).flatten()

        result = minimize(
            loss,
            x0=initial_guess,
            method='L-BFGS-B',
            bounds=bounds,
        )

        if not result.success:
            return -np.inf

        self.best_constants = result.x.tolist()

        try:
            y_pred = self.evaluate(X)
            if not np.all(np.isfinite(y_pred)):
                return -np.inf

            # Constant target: score 1.0 for a matching prediction and 0.0
            # otherwise, without calling r2_score.
            if len(np.unique(y)) == 1:
                return 1.0 if np.allclose(y_pred, y[0]) else 0.0

            return r2_score(y, y_pred)
        except Exception:
            return -np.inf
| |
|
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |