# optimization/backend/src/optimization_manager.py
# Last change by joel-woodfield (cda1627):
#   Do not reset trajectory when plot has relayout
import numpy as np
from sympy import (
lambdify,
symbols,
sin,
cos,
tan,
asin,
acos,
atan,
exp,
log,
sqrt,
pi,
Abs,
)
from sympy.parsing.sympy_parser import (
standard_transformations,
implicit_multiplication_application,
convert_xor,
parse_expr,
)
from optimization_logic import *
class OptimizationManager:
def __init__(self):
self.function_values = {"x": [], "y": []}
self.trajectory_values = {"x": [], "y": []}
self.settings = {}
def handle_update_settings(self, new_settings) -> dict[str, dict] | None:
if new_settings == self.settings:
return None
non_relayout_settings = (new_settings.keys() | self.settings.keys()) - {"xlim", "ylim"}
non_relayout_settings_changed = any(
new_settings.get(k) != self.settings.get(k)
for k in non_relayout_settings
)
self.settings = new_settings
function = new_settings.get("functionExpr", "").strip()
mode = new_settings.get("mode", "").lower().strip()
xlim = new_settings.get("xlim", [])
ylim = new_settings.get("ylim", [])
if not self._is_valid_function(function, mode, xlim, ylim):
return {
"trajectoryValues": {"x": [], "y": []},
"functionValues": {"x": [], "y": []},
}
if non_relayout_settings_changed:
self._reset_trajectory()
if not self._function_changed(function, mode):
return {
"trajectoryValues": self.trajectory_values,
}
try:
self._compute_function_values(function, mode, xlim, ylim)
except Exception as e:
self.function_values = {"x": [], "y": []}
self.trajectory_values = {"x": [], "y": []}
return {
"functionValues": self.function_values,
"trajectoryValues": self.trajectory_values,
}
def handle_reset(self) -> dict[str, list]:
self._reset_trajectory()
return {
"trajectoryValues": self.trajectory_values,
}
def handle_next_step(self) -> dict[str, list]:
current_steps = len(self.trajectory_values["x"])
self._compute_trajectory_values(self.settings, current_steps + 1)
return {
"trajectoryValues": self.trajectory_values,
}
def handle_prev_step(self) -> dict[str, list]:
current_steps = len(self.trajectory_values["x"])
if current_steps > 1:
self._compute_trajectory_values(self.settings, current_steps - 1)
return {
"trajectoryValues": self.trajectory_values,
}
def handle_play(self) -> dict[str, list]:
pass
def handle_pause(self) -> dict[str, list]:
pass
def _is_valid_function(
self, function: str, mode: str, xlim: list, ylim: list
) -> bool:
# axis limit checks
if len(xlim) != 2 or len(ylim) != 2:
return False
if xlim[0] >= xlim[1] or ylim[0] >= ylim[1]:
return False
# function expression check
try:
expr = self._parse_function(function)
except Exception as e:
return False
if mode == "univariate" and symbols("y") in expr.free_symbols:
return False
return True
def _function_changed(self, function: str, mode: str) -> bool:
function = function.strip()
previous_function = self.settings.get("function", "").strip()
previous_mode = self.settings.get("mode", "")
return function != previous_function or mode != previous_mode
def _reset_trajectory(self) -> None:
try:
self._compute_trajectory_values(self.settings, steps=1)
except Exception as e:
self.trajectory_values = {"x": [], "y": []}
def _parse_function(self, function: str) -> Expr:
if not function.strip():
raise ValueError("Function expression cannot be empty")
x, y = symbols("x y")
allowed_locals = {
'x': x,
'y': y,
'sin': sin,
'cos': cos,
'tan': tan,
'asin': asin,
'acos': acos,
'atan': atan,
'log': log,
'ln': log,
'sqrt': sqrt,
'abs': Abs,
'exp': exp,
'e': exp(1),
'pi': pi,
'π': pi,
}
try:
parsed_function = parse_expr(
function,
local_dict=allowed_locals,
transformations=standard_transformations + (
implicit_multiplication_application,
convert_xor,
),
evaluate=True,
)
except Exception as e:
raise ValueError(f"Invalid function expression: {e}")
unknown_symbols = parsed_function.free_symbols - {x, y}
if unknown_symbols:
unknown_names = ", ".join(sorted(str(s) for s in unknown_symbols))
raise ValueError(f"Unknown variable(s): {unknown_names}. Allowed: x, y")
return parsed_function
def _compute_function_values(self, function: str, mode: str, xlim: list, ylim: list) -> None:
expr = self._parse_function(function)
if mode == "univariate":
x = np.linspace(xlim[0], xlim[1], 100)
f = lambdify('x', expr, modules=['numpy'])
y = f(x)
if not isinstance(y, np.ndarray):
y = np.full_like(x, y)
self.function_values = {
"x": x.tolist(),
"y": y.tolist(),
}
elif mode == "bivariate":
x = np.linspace(xlim[0], xlim[1], 100)
y = np.linspace(ylim[0], ylim[1], 100)
X, Y = np.meshgrid(x, y)
f = lambdify(('x', 'y'), expr, modules=['numpy'])
Z = f(X, Y)
if not isinstance(Z, np.ndarray):
Z = np.full_like(X, Z)
self.function_values = {
"x": x.tolist(),
"y": y.tolist(),
"z": Z.tolist(),
}
else:
raise ValueError("Unsupported mode")
def _compute_trajectory_values(self, settings: dict, steps: int) -> None:
mode = settings.get("mode", "").lower().strip()
algorithm = settings.get("algorithm", "").lower().strip().replace(" ", "_")
function = self._parse_function(settings.get("functionExpr", "").strip())
if mode == "univariate":
if algorithm == "gradient_descent":
self.trajectory_values = gd_univariate(
function,
float(settings["x0"]),
float(settings["learningRate"]),
float(settings["momentum"]),
steps,
)
elif algorithm == "nesterov":
self.trajectory_values = nesterov_univariate(
function,
float(settings["x0"]),
float(settings["learningRate"]),
float(settings["momentum"]),
steps,
)
elif algorithm == "adam":
self.trajectory_values = adam_univariate(
function,
float(settings["x0"]),
float(settings["learningRate"]),
float(settings["beta1"]),
float(settings["beta2"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "adagrad":
self.trajectory_values = adagrad_univariate(
function,
float(settings["x0"]),
float(settings["learningRate"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "rmsprop":
self.trajectory_values = rmsprop_univariate(
function,
float(settings["x0"]),
float(settings["learningRate"]),
float(settings["beta"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "adadelta":
self.trajectory_values = adadelta_univariate(
function,
float(settings["x0"]),
float(settings["beta"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "newton":
self.trajectory_values = newton_univariate(
function,
float(settings["x0"]),
steps,
)
else:
raise ValueError("Unsupported algorithm for univariate mode")
elif mode == "bivariate":
if algorithm == "gradient_descent":
self.trajectory_values = gd_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["learningRate"]),
float(settings["momentum"]),
steps,
)
elif algorithm == "nesterov":
self.trajectory_values = nesterov_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["learningRate"]),
float(settings["momentum"]),
steps,
)
elif algorithm == "adam":
self.trajectory_values = adam_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["learningRate"]),
float(settings["beta1"]),
float(settings["beta2"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "adagrad":
self.trajectory_values = adagrad_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["learningRate"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "rmsprop":
self.trajectory_values = rmsprop_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["learningRate"]),
float(settings["beta"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "adadelta":
self.trajectory_values = adadelta_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
float(settings["beta"]),
float(settings["epsilon"]),
steps,
)
elif algorithm == "newton":
self.trajectory_values = newton_bivariate(
function,
float(settings["x0"]),
float(settings["y0"]),
steps,
)
else:
raise ValueError("Unsupported algorithm for bivariate mode")