from dataclasses import dataclass
from typing import Literal

import numpy as np
from sympy import Expr, lambdify
import torch
import torch.nn as nn
import torch.optim as optim

# Activation names accepted in architecture strings. "linear" and "identity"
# are both aliases for "no non-linearity" (nn.Identity).
SUPPORTED_ACTIVATIONS = {
    "relu",
    "sigmoid",
    "tanh",
    "linear",
    "leaky_relu",
    "elu",
    "gelu",
    "identity",
}

# Sentinel line in the architecture string marking the fixed 1-unit output layer.
OUTPUT_LAYER_STRING = "[output_units: 1]"


@dataclass
class DataGenerationOptions:
    """Options controlling how synthetic (x, y) samples are drawn.

    Attributes:
        method: "Grid" for evenly spaced x values, "Random" for uniform sampling.
        num_samples: number of samples to generate.
        noise: standard deviation of additive Gaussian noise on y (0 = none).
    """
    method: Literal["Grid", "Random"]
    num_samples: int
    noise: float = 0.


@dataclass
class Dataset:
    """A 1-D regression dataset: parallel lists of inputs and targets."""
    x: list[float]
    y: list[float]


@dataclass
class PlotData:
    """Everything needed to render one plot frame.

    Attributes:
        dataset: the training data.
        test_dataset: held-out points (ground truth).
        test_predictions: model outputs on ``test_dataset.x``, or None if the
            model has not produced predictions yet.
    """
    dataset: Dataset
    test_dataset: Dataset
    test_predictions: list[float] | None


def generate_dataset(
    function: Expr,
    xlim: tuple[float, float],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample a dataset from a sympy expression of ``x`` over ``xlim``.

    Args:
        function: sympy expression in the free symbol ``x``.
        xlim: inclusive (low, high) range for x.
        generation_options: sampling method, sample count, and noise level.

    Returns:
        Dataset with x and y as plain Python float lists.

    Raises:
        ValueError: if ``generation_options.method`` is not "Grid" or "Random".
    """
    f = lambdify("x", function, modules='numpy')
    if generation_options.method == 'Grid':
        x = np.linspace(xlim[0], xlim[1], generation_options.num_samples)
    elif generation_options.method == 'Random':
        x = np.random.uniform(xlim[0], xlim[1], generation_options.num_samples)
    else:
        raise ValueError(f"Unknown generation method: {generation_options.method}")
    # lambdify of a constant expression (e.g. "3") returns a scalar, not an
    # array; broadcast to x's shape so noise addition and tolist() always work.
    y = np.broadcast_to(np.asarray(f(x), dtype=float), x.shape).copy()
    if generation_options.noise > 0:
        y += np.random.normal(0, generation_options.noise, size=y.shape)
    return Dataset(x=x.tolist(), y=y.tolist())


def load_dataset_from_csv(
    file_path: str,
    header: bool,
    x_col: int,
    y_col: int
) -> Dataset:
    """Load a Dataset from a comma-separated file.

    Args:
        file_path: path to the CSV file.
        header: whether the first row is a header to skip.
        x_col: zero-based column index for x values.
        y_col: zero-based column index for y values.

    Returns:
        Dataset with rows containing any NaN (unparsable cells) removed.
    """
    data = np.genfromtxt(file_path, delimiter=',', skip_header=1 if header else 0)
    # A single-row file comes back 1-D; force 2-D so column indexing works.
    data = np.atleast_2d(data)
    data = data[~np.isnan(data).any(axis=1)]  # remove rows with NaN values
    x = data[:, x_col].tolist()
    y = data[:, y_col].tolist()
    return Dataset(x=x, y=y)


def _parse_architecture_string(architecture_string: str) -> tuple[list[int], list[str]]:
    """Parse a multi-line architecture string into hidden-layer specs.

    Each line looks like "[units: 32, activation: relu]"; the output-layer
    sentinel line (OUTPUT_LAYER_STRING) is skipped.

    Returns:
        (hidden_units, activations) — parallel lists, one entry per hidden layer.

    Raises:
        ValueError: on an invalid unit count, unsupported activation, unknown
            key, or a layer line missing either "units" or "activation".
    """
    lines = architecture_string.strip().split("\n")
    hidden_units: list[int] = []
    activations: list[str] = []
    for line in lines:
        line = line.strip().lower()
        if line == OUTPUT_LAYER_STRING:
            continue
        parts = line.strip("[]").split(",")
        units = None
        activation = None
        for part in parts:
            key, value = part.split(":")
            key = key.strip()
            value = value.strip()
            if key == "units":
                # isdigit() rejects signs/decimals; also require strictly positive.
                if value.isdigit() and int(value) > 0:
                    units = int(value)
                else:
                    raise ValueError(f"Invalid number of units: {value}")
            elif key == "activation":
                if value in SUPPORTED_ACTIVATIONS:
                    activation = value
                else:
                    raise ValueError(f"Unsupported activation: {value}")
            else:
                raise ValueError(f"Unknown key in architecture string: {key}")
        # Fail here with a clear message instead of appending None and
        # crashing later inside nn.Linear.
        if units is None or activation is None:
            raise ValueError(
                f"Layer must specify both units and activation: {line}"
            )
        hidden_units.append(units)
        activations.append(activation)
    return hidden_units, activations


def build_model_from_architecture(architecture_str: str) -> nn.Module:
    """Build a 1-in/1-out MLP from an architecture string.

    Args:
        architecture_str: see ``_parse_architecture_string`` for the format.

    Returns:
        nn.Sequential of alternating Linear and activation layers, ending in a
        final Linear projecting to a single output.

    Raises:
        ValueError: if parsing fails or an activation name is unrecognized.
    """
    hidden_units, activations = _parse_architecture_string(architecture_str)
    input_size = 1
    output_size = 1
    # Normalized activation name -> module factory. Normalization strips
    # spaces/hyphens/underscores, so "leaky_relu" matches "leakyrelu".
    activation_factories = {
        "relu": nn.ReLU,
        "sigmoid": nn.Sigmoid,
        "tanh": nn.Tanh,
        "leakyrelu": nn.LeakyReLU,
        "elu": nn.ELU,
        "gelu": nn.GELU,
        # "linear" is in SUPPORTED_ACTIVATIONS; previously it fell through to
        # the error branch. Both aliases mean "no non-linearity".
        "linear": nn.Identity,
        "identity": nn.Identity,
    }
    layers: list[nn.Module] = []
    for units, activation in zip(hidden_units, activations):
        layers.append(nn.Linear(input_size, units))
        normalized = (
            activation
            .lower()
            .replace(" ", "")
            .replace("-", "")
            .replace("_", "")
        )
        try:
            layers.append(activation_factories[normalized]())
        except KeyError:
            raise ValueError(f"Unknown activation: {normalized}") from None
        input_size = units
    layers.append(nn.Linear(input_size, output_size))
    model = nn.Sequential(*layers)
    return model


def train_step(
    model: nn.Module,
    optimizer: optim.Optimizer,
    dataset: Dataset,
    batch_size: int | None = None,
    num_steps: int = 1,
) -> float:
    """Run ``num_steps`` optimization steps of MSE regression on ``dataset``.

    Args:
        model: network mapping (N, 1) inputs to (N, 1) outputs.
        optimizer: optimizer already bound to ``model``'s parameters.
        dataset: training data.
        batch_size: samples per step; None (or larger than the dataset) uses
            the full dataset every step.
        num_steps: number of gradient steps to take.

    Returns:
        The loss of the last step, or NaN if ``num_steps`` is 0.
    """
    model.train()
    criterion = nn.MSELoss()
    x_tensor = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)
    y_tensor = torch.tensor(dataset.y, dtype=torch.float32).unsqueeze(1)
    dataset_size = x_tensor.size(0)
    if batch_size is None or batch_size > dataset_size:
        batch_size = dataset_size
    last_loss = np.nan
    for _ in range(num_steps):
        # Sample a batch without replacement each step.
        batch_indices = torch.randperm(dataset_size)[:batch_size]
        x_batch = x_tensor[batch_indices]
        y_batch = y_tensor[batch_indices]
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        last_loss = loss.item()
    return last_loss


def generate_test_predictions(
    dataset: Dataset,
    model: nn.Module,
) -> list[float]:
    """Return the model's predictions on ``dataset.x`` as a flat float list."""
    x_tensor = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)
    model.eval()
    with torch.no_grad():
        y_tensor = model(x_tensor)
    y_test = y_tensor.squeeze(1).numpy()
    return y_test.tolist()