# mlp_visualizer/backend/src/logic.py
from dataclasses import dataclass
from typing import Literal
import numpy as np
from sympy import Expr, lambdify
import torch
import torch.nn as nn
import torch.optim as optim

SUPPORTED_ACTIVATIONS = {
    "relu",
    "sigmoid",
    "tanh",
    "linear",
    "leaky_relu",
    "elu",
    "gelu",
    "identity",
}

OUTPUT_LAYER_STRING = "[output_units: 1]"


@dataclass
class DataGenerationOptions:
    method: Literal["Grid", "Random"]
    num_samples: int
    noise: float = 0.0


@dataclass
class Dataset:
    x: list[float]
    y: list[float]


@dataclass
class PlotData:
    dataset: Dataset
    test_dataset: Dataset
    test_predictions: list[float] | None


def generate_dataset(
    function: Expr,
    xlim: tuple[float, float],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample (x, y) pairs from a sympy expression over xlim, optionally with Gaussian noise."""
    f = lambdify("x", function, modules="numpy")
    if generation_options.method == "Grid":
        x = np.linspace(xlim[0], xlim[1], generation_options.num_samples)
    elif generation_options.method == "Random":
        x = np.random.uniform(xlim[0], xlim[1], generation_options.num_samples)
    else:
        raise ValueError(f"Unknown generation method: {generation_options.method}")
    y = np.asarray(f(x), dtype=float)
    if y.ndim == 0:
        # Constant expressions lambdify to a scalar; broadcast to match x.
        y = np.full_like(x, y)
    if generation_options.noise > 0:
        y += np.random.normal(0, generation_options.noise, size=y.shape)
    return Dataset(x=x.tolist(), y=y.tolist())
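

# Usage sketch (illustrative only, not called by the app): sample a noisy sine
# curve on a grid. The sympy expression here is an arbitrary example, not one
# the app necessarily uses.
def _example_generate_dataset() -> Dataset:
    from sympy import sin, symbols

    x = symbols("x")
    options = DataGenerationOptions(method="Grid", num_samples=50, noise=0.1)
    return generate_dataset(sin(x), (-3.0, 3.0), options)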


def load_dataset_from_csv(
    file_path: str, header: bool, x_col: int, y_col: int
) -> Dataset:
    """Load (x, y) columns from a CSV file, skipping the header row if present."""
    data = np.genfromtxt(file_path, delimiter=",", skip_header=1 if header else 0)
    data = data[~np.isnan(data).any(axis=1)]  # drop rows with NaN values
    x = data[:, x_col].tolist()
    y = data[:, y_col].tolist()
    return Dataset(x=x, y=y)


def _parse_architecture_string(
    architecture_string: str,
) -> tuple[list[int], list[str]]:
    """Parse a newline-separated architecture description into layer sizes and activations.

    Each hidden layer is a line of the form "[units: 32, activation: relu]";
    the fixed output line (OUTPUT_LAYER_STRING) is skipped here.
    """
    lines = architecture_string.strip().split("\n")
    hidden_units = []
    activations = []
    for line in lines:
        line = line.strip().lower()
        if line == OUTPUT_LAYER_STRING:
            continue
        parts = line.strip("[]").split(",")
        units = None
        activation = None
        for part in parts:
            key, value = part.split(":")
            key = key.strip()
            value = value.strip()
            if key == "units":
                if value.isdigit() and int(value) > 0:
                    units = int(value)
                else:
                    raise ValueError(f"Invalid number of units: {value}")
            elif key == "activation":
                if value in SUPPORTED_ACTIVATIONS:
                    activation = value
                else:
                    raise ValueError(f"Unsupported activation: {value}")
            else:
                raise ValueError(f"Unknown key in architecture string: {key}")
        if units is None or activation is None:
            raise ValueError(f"Layer specification missing units or activation: {line}")
        hidden_units.append(units)
        activations.append(activation)
    return hidden_units, activations


def build_model_from_architecture(architecture_str: str) -> nn.Module:
    """Build an MLP with scalar input and output from an architecture string."""
    hidden_units, activations = _parse_architecture_string(architecture_str)
    input_size = 1
    output_size = 1
    layers = []
    for units, activation in zip(hidden_units, activations):
        layers.append(nn.Linear(input_size, units))
        # Normalize the activation name, e.g. "leaky_relu" -> "leakyrelu".
        activation = (
            activation.lower().replace(" ", "").replace("-", "").replace("_", "")
        )
        if activation == "relu":
            layers.append(nn.ReLU())
        elif activation == "sigmoid":
            layers.append(nn.Sigmoid())
        elif activation == "tanh":
            layers.append(nn.Tanh())
        elif activation == "leakyrelu":
            layers.append(nn.LeakyReLU())
        elif activation == "elu":
            layers.append(nn.ELU())
        elif activation == "gelu":
            layers.append(nn.GELU())
        elif activation in ("identity", "linear"):
            # "linear" is in SUPPORTED_ACTIVATIONS, so map it to a no-op too.
            layers.append(nn.Identity())
        else:
            raise ValueError(f"Unknown activation: {activation}")
        input_size = units
    layers.append(nn.Linear(input_size, output_size))
    model = nn.Sequential(*layers)
    return model
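

# Illustrative sketch of the architecture-string format (the exact strings the
# frontend sends are an assumption here): two hidden layers, then the fixed
# output line.
def _example_build_model() -> nn.Module:
    architecture = (
        "[units: 16, activation: relu]\n"
        "[units: 16, activation: tanh]\n"
        "[output_units: 1]"
    )
    return build_model_from_architecture(architecture)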


def train_step(
    model: nn.Module,
    optimizer: optim.Optimizer,
    dataset: Dataset,
    batch_size: int | None = None,
    num_steps: int = 1,
) -> float:
    """Run num_steps of minibatch gradient descent on the MSE loss; return the last batch loss."""
    model.train()
    criterion = nn.MSELoss()
    x_tensor = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)
    y_tensor = torch.tensor(dataset.y, dtype=torch.float32).unsqueeze(1)
    dataset_size = x_tensor.size(0)
    if batch_size is None or batch_size > dataset_size:
        batch_size = dataset_size
    last_loss = float("nan")
    for _ in range(num_steps):
        # Sample a fresh random batch each step (without replacement within a batch).
        batch_indices = torch.randperm(dataset_size)[:batch_size]
        x_batch = x_tensor[batch_indices]
        y_batch = y_tensor[batch_indices]
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        last_loss = loss.item()
    return last_loss


def generate_test_predictions(
    dataset: Dataset,
    model: nn.Module,
) -> list[float]:
    """Evaluate the model on the dataset's x values without tracking gradients."""
    x_tensor = torch.tensor(dataset.x, dtype=torch.float32).unsqueeze(1)
    model.eval()
    with torch.no_grad():
        y_tensor = model(x_tensor)
    y_test = y_tensor.squeeze(1).numpy()
    return y_test.tolist()
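

# End-to-end sketch tying the pieces together (illustrative; the real app
# drives these functions from its API layer, and the learning rate, batch
# size, and step counts below are arbitrary choices, not project values).
if __name__ == "__main__":
    from sympy import symbols

    x_sym = symbols("x")
    train_ds = generate_dataset(
        x_sym**2,
        (-2.0, 2.0),
        DataGenerationOptions(method="Random", num_samples=200, noise=0.05),
    )
    test_ds = generate_dataset(
        x_sym**2,
        (-2.0, 2.0),
        DataGenerationOptions(method="Grid", num_samples=100),
    )

    model = build_model_from_architecture(
        "[units: 32, activation: tanh]\n[output_units: 1]"
    )
    optimizer = optim.Adam(model.parameters(), lr=1e-2)

    for step in range(10):
        loss = train_step(model, optimizer, train_ds, batch_size=32, num_steps=100)
        print(f"step {(step + 1) * 100}: mse={loss:.4f}")

    predictions = generate_test_predictions(test_ds, model)
    print("first 5 predictions:", predictions[:5])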