Spaces:
Sleeping
Sleeping
| from dataclasses import dataclass | |
| from typing import Literal | |
| import numpy as np | |
| from sympy import Expr, lambdify | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
# Activation names that `_parse_architecture_string` accepts for the
# "activation" key of a layer line (compared after lower-casing).
SUPPORTED_ACTIVATIONS = {
    "elu",
    "gelu",
    "identity",
    "leaky_relu",
    "linear",
    "relu",
    "sigmoid",
    "tanh",
}

# Exact (lower-cased) text of the output-layer line; the parser skips any
# line equal to this string instead of treating it as a hidden layer.
OUTPUT_LAYER_STRING = "[output_units: 1]"
@dataclass
class DataGenerationOptions:
    """Settings that control how `generate_dataset` samples a function.

    Attributes:
        method: "Grid" samples x with `np.linspace` over the interval;
            "Random" draws x with `np.random.uniform`.
        num_samples: Number of (x, y) points to produce.
        noise: Standard deviation of the Gaussian noise added to y.
            Noise is only applied when this value is greater than zero.
    """

    method: Literal["Grid", "Random"]
    num_samples: int
    noise: float = 0.
@dataclass
class Dataset:
    """Paired samples of a one-dimensional regression problem.

    Attributes:
        x: Input values.
        y: Target values; the training code pairs `x[i]` with `y[i]`.
    """

    x: list[float]
    y: list[float]
@dataclass
class PlotData:
    """Container grouping two datasets with optional predictions.

    Attributes:
        dataset: A `Dataset` instance.
        test_dataset: A second `Dataset` instance.
        test_predictions: List of predicted values, or None.
            NOTE(review): presumably model outputs for `test_dataset.x`,
            with None meaning "no predictions yet" - confirm with callers.
    """

    dataset: Dataset
    test_dataset: Dataset
    test_predictions: list[float] | None
def generate_dataset(
    function: Expr,
    xlim: tuple[float, float],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample a SymPy expression in ``x`` over an interval.

    Args:
        function: SymPy expression, evaluated with ``x`` as its variable.
        xlim: ``(lower, upper)`` bounds of the sampling interval.
        generation_options: Sampling method, number of samples and noise
            level.

    Returns:
        A `Dataset` whose ``x`` holds the sampled inputs and whose ``y`` holds
        the expression values, plus Gaussian noise when
        ``generation_options.noise > 0``.

    Raises:
        ValueError: If ``generation_options.method`` is neither ``"Grid"``
            nor ``"Random"``.
    """
    f = lambdify("x", function, modules='numpy')

    method = generation_options.method
    num_samples = generation_options.num_samples
    if method == 'Grid':
        x = np.linspace(xlim[0], xlim[1], num_samples)
    elif method == 'Random':
        x = np.random.uniform(xlim[0], xlim[1], num_samples)
    else:
        raise ValueError(f"Unknown generation method: {method}")

    # A constant expression lambdifies to a function that returns a scalar
    # rather than an array, so broadcast the result to one value per sample.
    y = np.broadcast_to(np.asarray(f(x)), x.shape)
    if y.dtype.kind in "biu":
        # Integer/boolean results (e.g. the constant 5) become floats so the
        # output matches the declared list[float].
        y = y.astype(float)

    if generation_options.noise > 0:
        # Out-of-place addition: for the expression `x` the lambdified
        # function returns the input array itself, and an in-place `+=`
        # would add the noise to the x values as well.
        y = y + np.random.normal(0, generation_options.noise, size=x.shape)

    return Dataset(x=x.tolist(), y=y.tolist())
def load_dataset_from_csv(
    file_path: str, header: bool, x_col: int, y_col: int
) -> Dataset:
    """Load two numeric columns of a comma-separated file as a dataset.

    Args:
        file_path: Path of the CSV file.
        header: When true, the first line is skipped as a header row.
        x_col: Index of the column used for ``x``.
        y_col: Index of the column used for ``y``.

    Returns:
        A `Dataset` containing every row whose ``x`` and ``y`` entries both
        parsed as numbers.
    """
    # ndmin=2 keeps the result two-dimensional even for a file with a single
    # data row, which genfromtxt would otherwise squeeze to 1-D.
    data = np.genfromtxt(
        file_path,
        delimiter=',',
        skip_header=1 if header else 0,
        ndmin=2,
    )

    # Only the two requested columns decide whether a row is usable; values
    # genfromtxt could not parse (text, empty cells) show up as NaN, and a
    # NaN in an unrelated column must not discard the row.
    xy = data[:, [x_col, y_col]]
    xy = xy[~np.isnan(xy).any(axis=1)]

    return Dataset(x=xy[:, 0].tolist(), y=xy[:, 1].tolist())
| def _parse_architecture_string(architecture_string: str) -> tuple[list[int], list[str]]: | |
| lines = architecture_string.strip().split("\n") | |
| hidden_units = [] | |
| activations = [] | |
| for line in lines: | |
| line = line.strip().lower() | |
| if line == OUTPUT_LAYER_STRING: | |
| continue | |
| parts = line.strip("[]").split(",") | |
| units = None | |
| activation = None | |
| for part in parts: | |
| key, value = part.split(":") | |
| key = key.strip() | |
| value = value.strip() | |
| if key == "units": | |
| if value.isdigit() and int(value) > 0: | |
| units = int(value) | |
| else: | |
| raise ValueError(f"Invalid number of units: {value}") | |
| elif key == "activation": | |
| if value in SUPPORTED_ACTIVATIONS: | |
| activation = value | |
| else: | |
| raise ValueError(f"Unsupported activation: {value}") | |
| else: | |
| raise ValueError(f"Unknown key in architecture string: {key}") | |
| hidden_units.append(units) | |
| activations.append(activation) | |
| return hidden_units, activations | |
def build_model_from_architecture(architecture_str: str) -> nn.Module:
    """Build a fully connected network from an architecture description.

    The model takes one input feature and produces one output. Each hidden
    layer parsed from ``architecture_str`` becomes an ``nn.Linear`` followed
    by its activation module; a final ``nn.Linear`` maps to the output.

    Args:
        architecture_str: Description understood by
            `_parse_architecture_string`.

    Returns:
        An ``nn.Sequential`` containing the layers in order.

    Raises:
        ValueError: If the description is invalid, a layer lacks its unit
            count or activation, or an activation name is not recognised.
    """
    hidden_units, activations = _parse_architecture_string(architecture_str)

    # Keyed by the normalised name (lower-case, no spaces/hyphens/underscores).
    # "linear" means "no non-linearity", i.e. the same module as "identity".
    activation_layers = {
        "relu": nn.ReLU,
        "sigmoid": nn.Sigmoid,
        "tanh": nn.Tanh,
        "leakyrelu": nn.LeakyReLU,
        "elu": nn.ELU,
        "gelu": nn.GELU,
        "identity": nn.Identity,
        "linear": nn.Identity,
    }

    input_size = 1
    output_size = 1
    layers: list[nn.Module] = []
    for units, activation in zip(hidden_units, activations):
        if units is None or activation is None:
            raise ValueError(
                "Each hidden layer needs both 'units' and 'activation'"
            )
        normalized = (
            activation
            .lower()
            .replace(" ", "")
            .replace("-", "")
            .replace("_", "")
        )
        if normalized not in activation_layers:
            raise ValueError(f"Unknown activation: {normalized}")

        layers.append(nn.Linear(input_size, units))
        layers.append(activation_layers[normalized]())
        # The next layer consumes this layer's outputs.
        input_size = units

    layers.append(nn.Linear(input_size, output_size))
    return nn.Sequential(*layers)
def train_step(
    model: nn.Module,
    optimizer: optim.Optimizer,
    dataset: Dataset,
    batch_size: int | None = None,
    num_steps: int = 1,
) -> float:
    """Perform ``num_steps`` optimizer updates on random batches of ``dataset``.

    The model is switched to training mode. Every step draws a fresh random
    permutation of the sample indices, keeps the first ``batch_size`` of them,
    and minimises the mean squared error on that batch.

    Args:
        model: Network to train; called with a float32 tensor built from
            ``dataset.x`` with a trailing dimension of size 1 added.
        optimizer: Optimizer used to update the model.
        dataset: Object providing ``x`` and ``y`` value lists.
        batch_size: Samples per step. ``None`` or a value larger than the
            dataset means the whole dataset is used.
        num_steps: Number of updates to run.

    Returns:
        The loss of the last step, or ``np.nan`` when no step was run.
    """
    model.train()
    loss_fn = nn.MSELoss()

    inputs = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)
    targets = torch.tensor(dataset.y, dtype=torch.float32).unsqueeze(1)
    n_samples = inputs.size(0)

    # Clamp the batch to the number of available samples.
    effective_batch = n_samples if batch_size is None else min(batch_size, n_samples)

    final_loss = np.nan
    for _step in range(num_steps):
        picked = torch.randperm(n_samples)[:effective_batch]
        loss = loss_fn(model(inputs[picked]), targets[picked])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        final_loss = loss.item()

    return final_loss
def generate_test_predictions(
    dataset: Dataset,
    model: nn.Module,
) -> list[float]:
    """Evaluate ``model`` on the x values of ``dataset``.

    The model is switched to evaluation mode and called without gradient
    tracking on a float32 tensor built from ``dataset.x`` with a trailing
    dimension of size 1 added.

    Args:
        dataset: Object providing the ``x`` value list to evaluate.
        model: Network to evaluate.

    Returns:
        The model outputs with dimension 1 squeezed out, as a Python list.
    """
    inputs = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)

    model.eval()
    with torch.no_grad():
        outputs = model(inputs)

    predictions = outputs.squeeze(1).numpy()
    return predictions.tolist()