|
|
import sympy |
|
|
import numpy as np |
|
|
from sklearn.metrics import r2_score, mean_squared_error |
|
|
from sklearn.metrics import mean_absolute_error |
|
|
from scipy.optimize import minimize |
|
|
import math |
|
|
import re |
|
|
|
|
|
|
|
|
class Expression: |
|
|
SAFE_FUNCTIONS = { |
|
|
'sqrt': np.sqrt, |
|
|
'log': np.log, |
|
|
'exp': np.exp, |
|
|
'sin': np.sin, |
|
|
'cos': np.cos, |
|
|
'tan': np.tan, |
|
|
'asin': np.arcsin, |
|
|
'abs': np.abs, |
|
|
'pow': np.power, |
|
|
|
|
|
} |
|
|
|
|
|
OPERATOR_ARITY = { |
|
|
'+': 2, |
|
|
'-': 2, |
|
|
'*': 2, |
|
|
'/': 2, |
|
|
'**': 2, |
|
|
'sin': 1, |
|
|
'cos': 1, |
|
|
'tan': 1, |
|
|
'log': 1, |
|
|
'sqrt': 1, |
|
|
'exp': 1 |
|
|
} |
|
|
|
|
|
OPERATOR_FUNCS = { |
|
|
'+': sympy.Add, |
|
|
'-': lambda x, y: x - y, |
|
|
'*': sympy.Mul, |
|
|
'/': lambda x, y: x / y, |
|
|
'**': sympy.Pow, |
|
|
'sin': sympy.sin, |
|
|
'cos': sympy.cos, |
|
|
'tan': sympy.tan, |
|
|
'log': sympy.log, |
|
|
'sqrt': sympy.sqrt, |
|
|
'exp': sympy.exp |
|
|
} |
|
|
|
|
|
def parse_prefix(self, tokens): |
|
|
"""Parse prefix notation expression to SymPy. |
|
|
|
|
|
Example: ['*', 'x_1', '+', 'x_2', 'C'] -> x_1*(x_2 + C) |
|
|
""" |
|
|
if not tokens: |
|
|
raise ValueError("Empty token list") |
|
|
|
|
|
|
|
|
UNARY_OPS = {'sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'abs', 'asin'} |
|
|
BINARY_OPS = {'+', '-', '*', '/', '**', '^'} |
|
|
|
|
|
stack = [] |
|
|
|
|
|
|
|
|
for token in reversed(tokens): |
|
|
if token in BINARY_OPS or token in UNARY_OPS: |
|
|
|
|
|
if token in UNARY_OPS: |
|
|
if len(stack) < 1: |
|
|
raise ValueError(f"Not enough operands for {token}") |
|
|
arg = stack.pop() |
|
|
if token in ['sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'abs', 'asin']: |
|
|
stack.append(f"{token}({arg})") |
|
|
else: |
|
|
raise ValueError(f"Unknown unary operator: {token}") |
|
|
else: |
|
|
if len(stack) < 2: |
|
|
raise ValueError(f"Not enough operands for {token}") |
|
|
right = stack.pop() |
|
|
left = stack.pop() |
|
|
|
|
|
|
|
|
op_map = {'+': '+', '-': '-', '*': '*', '/': '/', '**': '**', '^': '**'} |
|
|
op = op_map.get(token, token) |
|
|
|
|
|
if op in ['**', '^']: |
|
|
stack.append(f"({left})**({right})") |
|
|
elif op == '/': |
|
|
stack.append(f"({left})/({right})") |
|
|
else: |
|
|
stack.append(f"({left}){op}({right})") |
|
|
else: |
|
|
|
|
|
stack.append(token) |
|
|
|
|
|
if len(stack) != 1: |
|
|
raise ValueError(f"Invalid prefix expression, {len(stack)} elements remaining") |
|
|
|
|
|
return sympy.sympify(stack[0], evaluate=False) |
|
|
|
|
|
def __init__(self, expression, is_prefix=False): |
|
|
try: |
|
|
self.original_expression = expression |
|
|
|
|
|
if is_prefix: |
|
|
|
|
|
tokens = expression.replace('^', '**').split() |
|
|
self.sympy_expression = self.parse_prefix(tokens) |
|
|
else: |
|
|
|
|
|
self.sympy_expression = sympy.sympify(expression, evaluate=False) |
|
|
except Exception as e: |
|
|
raise ValueError(f"Failed to parse expression: {e}") |
|
|
|
|
|
self.max_var = 0 |
|
|
for symbol in self.sympy_expression.free_symbols: |
|
|
if symbol.name.startswith('x_'): |
|
|
try: |
|
|
index = int(symbol.name.split('_')[1]) |
|
|
self.max_var = max(self.max_var, index) |
|
|
except ValueError: |
|
|
|
|
|
pass |
|
|
|
|
|
computable_expression = str(self.sympy_expression) |
|
|
|
|
|
for i in range(1, self.max_var + 1): |
|
|
|
|
|
computable_expression = re.sub(rf'\bx_{i}\b', f'x[{i-1}]', computable_expression) |
|
|
|
|
|
|
|
|
self.computable_expression = computable_expression.replace('**C', '**2') |
|
|
|
|
|
self.constant_count = self.computable_expression.count('C') |
|
|
self.best_constants = [1.0] * self.constant_count |
|
|
|
|
|
|
|
|
if self.constant_count > 0: |
|
|
|
|
|
split_expr = self.computable_expression.split('C') |
|
|
new_expr = split_expr[0] |
|
|
|
|
|
for i in range(1, len(split_expr)): |
|
|
|
|
|
new_expr += f'constants[{i-1}]' |
|
|
|
|
|
new_expr += split_expr[i] |
|
|
|
|
|
self.computable_expression = new_expr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __str__(self): |
|
|
return f"Expression: {self.original_expression}, Best constants: {self.best_constants}" |
|
|
def sympy_str(self): |
|
|
""" |
|
|
Returns the string representation of the sympy expression. |
|
|
""" |
|
|
return str(self.sympy_expression) |
|
|
|
|
|
def is_valid_on_dataset(self, X, test_constants_list=None): |
|
|
""" |
|
|
Checks if the expression evaluates to valid (finite) values for all rows in X, |
|
|
across one or more sets of test constants. |
|
|
|
|
|
Args: |
|
|
X (np.ndarray): Input data, shape (n_samples, n_features) |
|
|
test_constants_list (list of lists): Optional. Defaults to [[1.0]*count]. |
|
|
Example: [[1.0]*n, [0.5]*n, [2.0]*n] to test more thoroughly. |
|
|
|
|
|
Returns: |
|
|
bool: True if no evaluation returns nan/inf or crashes. False otherwise. |
|
|
""" |
|
|
if test_constants_list is None: |
|
|
test_constants_list = [[1.0] * self.constant_count] |
|
|
|
|
|
try: |
|
|
for constants in test_constants_list: |
|
|
results = self.evaluate(X, constants) |
|
|
|
|
|
if not np.all(np.isfinite(results)): |
|
|
return False |
|
|
|
|
|
return True |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
|
|
|
def evaluate(self, X, constants=None): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if constants is None: |
|
|
|
|
|
constants = self.best_constants |
|
|
|
|
|
try: |
|
|
local_env = { |
|
|
"constants": np.array(constants), |
|
|
**self.SAFE_FUNCTIONS, |
|
|
"__builtins__": None |
|
|
} |
|
|
|
|
|
if not isinstance(X, np.ndarray): |
|
|
X = np.array(X) |
|
|
|
|
|
|
|
|
if X.ndim == 1: |
|
|
X = X.reshape(1, -1) |
|
|
|
|
|
|
|
|
x_cols = [X[:, i] for i in range(X.shape[1])] |
|
|
local_env["x"] = x_cols |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
y_pred_array = eval(self.computable_expression, local_env) |
|
|
|
|
|
except FloatingPointError as e: |
|
|
|
|
|
|
|
|
|
|
|
return np.full(X.shape[0], np.nan) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
return np.full(X.shape[0], np.nan) |
|
|
|
|
|
finally: |
|
|
np.seterr(all='warn') |
|
|
|
|
|
|
|
|
return np.asarray(y_pred_array, dtype=float) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
num_samples = X.shape[0] if X.ndim > 0 else 1 |
|
|
return np.full(num_samples, np.nan) |
|
|
|
|
|
def fit_constants(self, X, y): |
|
|
X = np.array(X) |
|
|
y = np.array(y) |
|
|
|
|
|
if self.constant_count == 0: |
|
|
try: |
|
|
y_pred = self.evaluate(X) |
|
|
if not np.all(np.isfinite(y_pred)): |
|
|
return -np.inf |
|
|
if np.all(y_pred == y_pred[0]) and len(np.unique(y)) > 1: |
|
|
return 0.0 |
|
|
return r2_score(y, y_pred) |
|
|
except Exception as e: |
|
|
return -np.inf |
|
|
|
|
|
def loss(current_constants): |
|
|
|
|
|
try: |
|
|
y_pred = self.evaluate(X, current_constants) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Exception during evaluation: {e}") |
|
|
return np.inf |
|
|
|
|
|
if not np.all(np.isfinite(y_pred)): |
|
|
return np.inf |
|
|
|
|
|
|
|
|
mse = np.mean((y - y_pred) ** 2) |
|
|
|
|
|
return mse |
|
|
|
|
|
bounds = [(-2., 2.)] * self.constant_count |
|
|
|
|
|
initial_guess = ( |
|
|
self.best_constants |
|
|
if self.best_constants and len(self.best_constants) == self.constant_count |
|
|
else [.0] * self.constant_count |
|
|
) |
|
|
|
|
|
|
|
|
initial_guess = np.array(initial_guess, dtype=float).flatten() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
result = minimize(loss, |
|
|
x0=initial_guess, |
|
|
method='L-BFGS-B', |
|
|
bounds=bounds, |
|
|
|
|
|
) |
|
|
|
|
|
if result.success: |
|
|
self.best_constants = result.x.tolist() |
|
|
|
|
|
try: |
|
|
y_pred = self.evaluate(X) |
|
|
if not np.all(np.isfinite(y_pred)): |
|
|
return -np.inf |
|
|
|
|
|
if len(np.unique(y)) == 1: |
|
|
if np.allclose(y_pred, y[0]): |
|
|
return 1.0 |
|
|
else: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
return r2_score(y, y_pred) |
|
|
except Exception as e: |
|
|
return -np.inf |
|
|
else: |
|
|
return -np.inf |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|