# regularization/backend/src/logic.py
# Commit 71bb4dd (joel-woodfield): "Make the contours bigger"
from dataclasses import dataclass
from typing import Literal
import cvxpy as cp
import numpy as np
from sympy import Expr, lambdify
@dataclass
class DataGenerationOptions:
    """How to sample synthetic (x1, x2) training points."""
    method: Literal["grid", "random"]  # regular grid over the rectangle, or i.i.d. uniform samples
    num_samples: int  # total number of samples to generate
    noise: float = 0.  # std-dev of Gaussian noise added to y; 0 disables noise
@dataclass
class Dataset:
    """Observed samples; entries of x1, x2 and y are aligned by index."""
    x1: list[float]  # first feature values
    x2: list[float]  # second feature values
    y: list[float]   # target values
@dataclass
class PlotsData:
    """Everything the frontend needs to render the loss/regularization plots."""
    W1: np.ndarray  # 2-D meshgrid of candidate w1 values
    W2: np.ndarray  # 2-D meshgrid of candidate w2 values
    loss_values: np.ndarray  # data-fit loss at each (W1, W2) grid point
    norms: np.ndarray  # regularizer norm at each grid point
    loss_levels: list[float]  # loss contour levels matching each reg_level
    reg_levels: list[float]  # regularization constraint radii
    unreg_solution: np.ndarray  # unregularized optimum: a point (2,) or points along a line (M, 2)
    path: np.ndarray  # regularization path, one (w1, w2) row per lambda
def generate_dataset(
    function: Expr,
    x1_lim: tuple[int, int],
    x2_lim: tuple[int, int],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample (x1, x2, y) points of a symbolic function over a rectangle.

    Args:
        function: Sympy expression in the symbols ``x1`` and ``x2``.
        x1_lim, x2_lim: Inclusive sampling bounds per axis.
        generation_options: Sampling method, sample count and noise level.

    Returns:
        A Dataset with ``num_samples`` points (optionally noise-perturbed y).

    Raises:
        ValueError: If the generation method is unknown.
    """
    f = lambdify(('x1', 'x2'), function, modules='numpy')
    if generation_options.method == 'grid':
        # Smallest square grid with at least num_samples nodes, then truncate.
        side_length = int(np.ceil(np.sqrt(generation_options.num_samples)))
        x1 = np.linspace(x1_lim[0], x1_lim[1], side_length)
        x2 = np.linspace(x2_lim[0], x2_lim[1], side_length)
        X1, X2 = np.meshgrid(x1, x2)
        X1_flat = X1.flatten()[:generation_options.num_samples]
        X2_flat = X2.flatten()[:generation_options.num_samples]
    elif generation_options.method == 'random':
        X1_flat = np.random.uniform(x1_lim[0], x1_lim[1], generation_options.num_samples)
        X2_flat = np.random.uniform(x2_lim[0], x2_lim[1], generation_options.num_samples)
    else:
        raise ValueError(f"Unknown generation method: {generation_options.method}")
    # lambdify of a constant expression returns a scalar; broadcast to a float
    # array so the noise addition and .tolist() below behave uniformly.
    Y = np.asarray(f(X1_flat, X2_flat), dtype=float)
    Y = np.broadcast_to(Y, X1_flat.shape).copy()
    if generation_options.noise > 0:
        Y += np.random.normal(0, generation_options.noise, size=Y.shape)
    return Dataset(x1=X1_flat.tolist(), x2=X2_flat.tolist(), y=Y.tolist())
def load_dataset_from_csv(
    file_path: str, header: bool, x1_col: int, x2_col: int, y_col: int
) -> Dataset:
    """Load a dataset from a CSV file, dropping rows with missing values.

    Args:
        file_path: Path of the CSV file to read.
        header: Whether the first row is a header to skip.
        x1_col, x2_col, y_col: Zero-based column indices of features and target.
    """
    # genfromtxt turns unparsable cells into NaN instead of raising.
    data = np.genfromtxt(file_path, delimiter=',', skip_header=1 if header else 0)
    # A single-row file parses as a 1-D array; promote it so the row-wise
    # NaN filter and column slicing below work.
    data = np.atleast_2d(data)
    data = data[~np.isnan(data).any(axis=1)]  # remove rows with NaN values
    return Dataset(
        x1=data[:, x1_col].tolist(),
        x2=data[:, x2_col].tolist(),
        y=data[:, y_col].tolist(),
    )
def build_parameter_grid(
    w1_lim: tuple[float, float],
    w2_lim: tuple[float, float],
    min_num_points: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Build a 2-D meshgrid over weight space, guaranteeing (0, 0) is a node."""
    def axis_with_origin(lim: tuple[float, float]) -> np.ndarray:
        # Evenly spaced samples; splice an exact 0.0 in if linspace missed it.
        axis = np.linspace(lim[0], lim[1], min_num_points)
        if 0 in axis:
            return axis
        return np.insert(axis, np.searchsorted(axis, 0), 0)

    grid_w1, grid_w2 = np.meshgrid(axis_with_origin(w1_lim), axis_with_origin(w2_lim))
    return grid_w1, grid_w2
def compute_loss(
    dataset: Dataset,
    w1: np.ndarray,
    w2: np.ndarray,
    loss: Literal["l1", "l2"],
) -> np.ndarray:
    """Evaluate the data-fit loss of y ≈ w1*x1 + w2*x2 at every grid point.

    Args:
        dataset: Observed (x1, x2, y) samples.
        w1, w2: Meshgrid arrays of candidate weights (same shape).
        loss: "l2" for mean squared error, "l1" for mean absolute error.

    Returns:
        Array of loss values with the same shape as ``w1``/``w2``.

    Raises:
        ValueError: If ``loss`` is not "l1" or "l2".
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y)
    # The grid may be non-square (build_parameter_grid can insert 0 into only
    # one axis), so reshape to the actual grid shape, not (D, D).
    grid_shape = w1.shape
    W = np.stack([w1.flatten(), w2.flatten()], axis=-1)  # (G, 2)
    X = np.stack([x1, x2], axis=0)  # (2, N)
    y_pred = W @ X  # (G, N): every grid point's prediction on every sample
    y = y.reshape(1, -1)
    if loss == 'l2':
        return np.mean((y - y_pred) ** 2, axis=1).reshape(grid_shape)
    elif loss == 'l1':
        return np.mean(np.abs(y - y_pred), axis=1).reshape(grid_shape)
    # Previously fell through and silently returned None; fail loudly instead.
    raise ValueError(f"Unknown loss type: {loss}")
def compute_norms(
    w1: np.ndarray,
    w2: np.ndarray,
    norm: Literal["l1", "l2"],
) -> np.ndarray:
    """Elementwise norm of the weight vector (w1, w2) over the grid.

    Raises:
        ValueError: If ``norm`` is not "l1" or "l2" (previously this fell
        through and silently returned None).
    """
    if norm == "l2":
        return np.sqrt(w1 ** 2 + w2 ** 2)
    elif norm == "l1":
        return np.abs(w1) + np.abs(w2)
    raise ValueError(f"Unknown norm type: {norm}")
def compute_loss_levels(
    loss_values: np.ndarray,
    norms: np.ndarray,
    reg_levels: list[float],
) -> list[float]:
    """For each constraint radius, take the best loss attainable inside it.

    Returns the resulting levels sorted ascending with duplicates removed,
    ready for use as contour levels.
    """
    best_per_level = set()
    for reg_level in reg_levels:
        feasible = loss_values[norms <= reg_level]
        if feasible.size == 0:
            raise ValueError(f"No satisfying loss level for reg_level {reg_level}")
        best_per_level.add(np.min(feasible))
    # contour levels must be unique and ascending
    return sorted(best_per_level)
def compute_unregularized_solution(
    dataset: Dataset,
    w1_range: tuple[float, float],
    w2_range: tuple[float, float],
    num_dots: int = 100,
) -> np.ndarray:
    """Compute the unregularized least-squares solution(s) of y ≈ w1*x1 + w2*x2.

    Returns a (2,) array when the normal equations have a unique solution, or
    an (M, 2) array of points sampled along the solution line (clipped to the
    given w1/w2 ranges) when X^T X is exactly singular.
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y)
    X = np.stack([x1, x2], axis=-1)  # (N, 2)
    try:
        # find point solution if exists
        w_opt = np.linalg.solve(X.T @ X, X.T @ y)
    except np.linalg.LinAlgError:
        # the solutions are on a line
        # Null-space direction of X^T X: eigenvector of the smallest eigenvalue.
        eig_vals, eig_vecs = np.linalg.eigh(X.T @ X)
        line_direction = eig_vecs[:, np.argmin(eig_vals)]
        # NOTE(review): divides by line_direction[0]; a vertical solution line
        # (direction[0] == 0) would yield inf/nan — confirm callers cannot hit it.
        m = line_direction[1] / line_direction[0]
        # Any particular (minimum-norm) solution fixes the line's intercept.
        candidate_w = np.linalg.lstsq(X, y, rcond=None)[0]
        b = candidate_w[1] - m * candidate_w[0]
        w1_opt = np.linspace(w1_range[0], w1_range[1], num_dots)
        w2_opt = m * w1_opt + b
        w_opt = np.stack((w1_opt, w2_opt), axis=-1)
        # Keep only points whose w2 also lies inside the plotted range.
        mask = (w2_opt <= w2_range[1]) & (w2_opt >= w2_range[0])
        w_opt = w_opt[mask]
    return w_opt
def compute_regularization_path(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
) -> np.ndarray:
    """Trace the regularized optimum w(lambda) over a log-spaced lambda sweep.

    Returns a (100, 2) array of solutions ordered from largest lambda to
    smallest; a row is [nan, nan] where the solver produced no value.
    """
    features = np.stack([np.array(dataset.x1), np.array(dataset.x2)], axis=1)  # (N, 2)
    targets = np.array(dataset.y)
    weights = cp.Variable(2)
    lam = cp.Parameter(nonneg=True)
    residual = targets - features @ weights
    if loss_type == "l2":
        data_term = cp.sum_squares(residual)
    elif loss_type == "l1":
        data_term = cp.norm1(residual)
    else:
        raise ValueError(f"Unknown loss type: {loss_type}")
    if regularizer_type == "l2":
        penalty = cp.sum_squares(weights)
    elif regularizer_type == "l1":
        penalty = cp.norm1(weights)
    else:
        raise ValueError(f"Unknown regularizer type: {regularizer_type}")
    problem = cp.Problem(cp.Minimize(data_term + lam * penalty))
    # TODO: allow user-defined reg levels
    sweep = np.logspace(-4, 4, 100)
    # Solve in descending lambda order so each solve warm-starts from its neighbour.
    path = []
    for level in sorted(sweep, reverse=True):
        lam.value = level
        problem.solve(warm_start=True)
        if weights.value is None:
            path.append(np.array([np.nan, np.nan]))
        else:
            path.append(weights.value.copy())
    return np.array(path)
def compute_plot_values(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
    reg_levels: list[float],
    w1_range: tuple[float, float],
    w2_range: tuple[float, float],
    resolution: int,
) -> PlotsData:
    """Assemble every quantity the plots need for one (loss, regularizer) setup."""
    grid_w1, grid_w2 = build_parameter_grid(w1_range, w2_range, resolution)
    losses = compute_loss(dataset, grid_w1, grid_w2, loss_type)
    norm_surface = compute_norms(grid_w1, grid_w2, regularizer_type)
    return PlotsData(
        W1=grid_w1,
        W2=grid_w2,
        loss_values=losses,
        norms=norm_surface,
        loss_levels=compute_loss_levels(losses, norm_surface, reg_levels),
        reg_levels=reg_levels,
        unreg_solution=compute_unregularized_solution(dataset, w1_range, w2_range),
        path=compute_regularization_path(dataset, loss_type, regularizer_type),
    )
def compute_suggested_settings(
    dataset: Dataset
) -> tuple[tuple[float, float], tuple[float, float], list[float]]:
    """Suggest symmetric weight ranges and regularization levels for a dataset.

    Returns (w1_range, w2_range, reg_levels), where the ranges span twice the
    largest component of the minimum-norm solution and the levels sit at 1/4,
    1/2 and 3/4 of its L2 norm.
    """
    design = np.stack([dataset.x1, dataset.x2], axis=1)
    # Minimum-norm least-squares solution via the Moore-Penrose pseudo-inverse.
    w_star = np.linalg.pinv(design) @ np.array(dataset.y)
    if np.isclose(w_star, 0).all():
        # Degenerate fit: fall back to a fixed window with no suggested levels.
        return (-10, 10), (-10, 10), []
    half_width = np.max(np.abs(w_star)) * 2
    opt_norm = float(np.linalg.norm(w_star, ord=2))
    suggested_levels = [i / 4 * opt_norm for i in range(1, 4)]
    return (-half_width, half_width), (-half_width, half_width), suggested_levels