from dataclasses import dataclass
from typing import Literal

import cvxpy as cp
import numpy as np
from sympy import Expr, lambdify


@dataclass
class DataGenerationOptions:
    """Settings that control how a synthetic dataset is sampled."""

    # "grid" samples a regular lattice, "random" samples uniformly at random.
    method: Literal["grid", "random"]
    # Number of (x1, x2, y) samples to produce.
    num_samples: int
    # Standard deviation of the Gaussian noise added to y (0 disables noise).
    noise: float = 0.


@dataclass
class Dataset:
    """Two-feature regression data stored as parallel lists."""

    x1: list[float]
    x2: list[float]
    y: list[float]


@dataclass
class PlotsData:
    """Bundle of the arrays produced by the plotting computations."""

    W1: np.ndarray
    W2: np.ndarray
    loss_values: np.ndarray
    norms: np.ndarray
    loss_levels: list[float]
    reg_levels: list[float]
    unreg_solution: np.ndarray
    path: np.ndarray


def generate_dataset(
    function: Expr,
    x1_lim: tuple[float, float],
    x2_lim: tuple[float, float],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample ``function(x1, x2)`` over a rectangle, optionally with noise.

    Args:
        function: Expression in the symbols ``x1`` and ``x2``.
        x1_lim: ``(low, high)`` bounds for ``x1``.
        x2_lim: ``(low, high)`` bounds for ``x2``.
        generation_options: Sampling method, sample count and noise level.

    Returns:
        A ``Dataset`` with ``num_samples`` entries per field.

    Raises:
        ValueError: If ``generation_options.method`` is not recognised.
    """
    f = lambdify(('x1', 'x2'), function, modules='numpy')
    num_samples = generation_options.num_samples

    if generation_options.method == 'grid':
        # Smallest square lattice holding at least num_samples points; the
        # flattened lattice is then truncated to exactly num_samples.
        side_length = int(np.ceil(np.sqrt(num_samples)))
        x1 = np.linspace(x1_lim[0], x1_lim[1], side_length)
        x2 = np.linspace(x2_lim[0], x2_lim[1], side_length)
        X1, X2 = np.meshgrid(x1, x2)
        X1_flat = X1.flatten()[:num_samples]
        X2_flat = X2.flatten()[:num_samples]
    elif generation_options.method == 'random':
        X1_flat = np.random.uniform(x1_lim[0], x1_lim[1], num_samples)
        X2_flat = np.random.uniform(x2_lim[0], x2_lim[1], num_samples)
    else:
        raise ValueError(f"Unknown generation method: {generation_options.method}")

    # The lambdified function may return a plain scalar (constant expression)
    # or one of the input arrays itself (expression that is just `x1`/`x2`).
    # Broadcast to the sample shape and copy into a fresh float array so that
    # adding noise below can neither fail nor modify the features in place.
    Y = np.array(
        np.broadcast_to(f(X1_flat, X2_flat), X1_flat.shape),
        dtype=float,
    )

    if generation_options.noise > 0:
        Y += np.random.normal(0, generation_options.noise, size=Y.shape)

    return Dataset(x1=X1_flat.tolist(), x2=X2_flat.tolist(), y=Y.tolist())


def load_dataset_from_csv(
    file_path: str,
    header: bool,
    x1_col: int,
    x2_col: int,
    y_col: int
) -> Dataset:
    """Load a ``Dataset`` from a comma-separated file.

    Rows containing any value that parses to NaN are dropped.

    Args:
        file_path: Path of the CSV file.
        header: Whether the first line is a header that must be skipped.
        x1_col: Zero-based column index of the first feature.
        x2_col: Zero-based column index of the second feature.
        y_col: Zero-based column index of the target.

    Returns:
        The selected columns as a ``Dataset``.
    """
    data = np.genfromtxt(file_path, delimiter=',', skip_header=1 if header else 0)
    # A file with a single data row is returned as a 1-D array; promote it to
    # one row so the row-wise filtering and column indexing below still work.
    data = np.atleast_2d(data)
    data = data[~np.isnan(data).any(axis=1)]  # remove rows with NaN values

    x1 = data[:, x1_col].tolist()
    x2 = data[:, x2_col].tolist()
    y = data[:, y_col].tolist()

    return Dataset(x1=x1, x2=x2, y=y)
def build_parameter_grid(
    w1_lim: tuple[float, float],
    w2_lim: tuple[float, float],
    min_num_points: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Build a 2-D mesh over the parameter plane that contains the origin.

    Each axis starts with ``min_num_points`` evenly spaced values; 0 is
    inserted on an axis that does not already contain it, so the two axes may
    end up with different lengths.

    Args:
        w1_lim: ``(low, high)`` bounds of the first parameter.
        w2_lim: ``(low, high)`` bounds of the second parameter.
        min_num_points: Number of evenly spaced points per axis before the
            optional insertion of 0.

    Returns:
        The ``(W1, W2)`` pair produced by ``np.meshgrid``.
    """
    w1 = np.linspace(w1_lim[0], w1_lim[1], min_num_points)
    w2 = np.linspace(w2_lim[0], w2_lim[1], min_num_points)

    # make sure (0, 0) is included
    if 0 not in w1:
        w1 = np.insert(w1, np.searchsorted(w1, 0), 0)
    if 0 not in w2:
        w2 = np.insert(w2, np.searchsorted(w2, 0), 0)

    W1, W2 = np.meshgrid(w1, w2)
    return W1, W2


def compute_loss(
    dataset: Dataset,
    w1: np.ndarray,
    w2: np.ndarray,
    loss: Literal["l1", "l2"],
) -> np.ndarray:
    """Evaluate the mean loss of ``y ~ w1 * x1 + w2 * x2`` on a weight grid.

    Args:
        dataset: Samples providing ``x1``, ``x2`` and ``y``.
        w1: Grid of first-weight values.
        w2: Grid of second-weight values, same shape as ``w1``.
        loss: ``"l2"`` for mean squared error, ``"l1"`` for mean absolute
            error.

    Returns:
        Array with the same shape as ``w1`` holding the loss at each grid
        point.

    Raises:
        ValueError: If ``loss`` is not ``"l1"`` or ``"l2"``.
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y)

    W = np.stack([w1.flatten(), w2.flatten()], axis=-1)  # (G, 2)
    X = np.stack([x1, x2], axis=0)  # (2, N)
    residuals = y.reshape(1, -1) - W @ X  # (G, N)

    if loss == 'l2':
        per_point = np.mean(residuals ** 2, axis=1)
    elif loss == 'l1':
        per_point = np.mean(np.abs(residuals), axis=1)
    else:
        raise ValueError(f"Unknown loss type: {loss}")

    # Reshape to the grid's own shape: the grid is not necessarily square,
    # because 0 may have been inserted on only one of its axes.
    return per_point.reshape(w1.shape)


def compute_norms(
    w1: np.ndarray,
    w2: np.ndarray,
    norm: Literal["l1", "l2"],
) -> np.ndarray:
    """Return the element-wise norm of the weight vectors ``(w1, w2)``.

    Args:
        w1: First weight component(s).
        w2: Second weight component(s).
        norm: ``"l2"`` for the Euclidean norm, ``"l1"`` for the sum of
            absolute values.

    Raises:
        ValueError: If ``norm`` is not ``"l1"`` or ``"l2"``.
    """
    if norm == "l2":
        return np.sqrt(w1 ** 2 + w2 ** 2)
    if norm == "l1":
        return np.abs(w1) + np.abs(w2)
    raise ValueError(f"Unknown norm type: {norm}")


def compute_loss_levels(
    loss_values: np.ndarray,
    norms: np.ndarray,
    reg_levels: list[float],
) -> list[float]:
    """Find the smallest grid loss attainable under each norm budget.

    Args:
        loss_values: Loss at every grid point.
        norms: Norm at every grid point, same shape as ``loss_values``.
        reg_levels: Norm budgets; a point qualifies when its norm is less
            than or equal to the budget.

    Returns:
        The distinct minimal losses in ascending order.

    Raises:
        ValueError: If no grid point satisfies one of the budgets.
    """
    levels = []
    for reg_level in reg_levels:
        satisfying = loss_values[norms <= reg_level]
        if satisfying.size == 0:
            raise ValueError(f"No satisfying loss level for reg_level {reg_level}")
        levels.append(np.min(satisfying))

    # ensure ascending order and no duplicates
    return sorted(set(levels))


def compute_unregularized_solution(
    dataset: Dataset,
    w1_range: tuple[float, float],
    w2_range: tuple[float, float],
    num_dots: int = 100,
) -> np.ndarray:
    """Compute the least-squares solution without regularization.

    Args:
        dataset: Samples providing ``x1``, ``x2`` and ``y``.
        w1_range: ``(low, high)`` bounds of the first weight, used only when
            the solution is a line.
        w2_range: ``(low, high)`` bounds of the second weight, used only when
            the solution is a line.
        num_dots: Number of points sampled along a solution line.

    Returns:
        An array of shape ``(2,)`` when the normal equations can be solved,
        otherwise an array of shape ``(k, 2)`` with the sampled points of the
        solution line whose dependent coordinate lies inside its range.
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y)

    X = np.stack([x1, x2], axis=-1)  # (N, 2)
    gram = X.T @ X

    try:
        # find point solution if exists
        return np.linalg.solve(gram, X.T @ y)
    except np.linalg.LinAlgError:
        pass

    # the solutions are on a line: it passes through any least-squares
    # solution and runs along the eigenvector of the smallest eigenvalue.
    eig_vals, eig_vecs = np.linalg.eigh(gram)
    line_direction = eig_vecs[:, np.argmin(eig_vals)]
    candidate_w = np.linalg.lstsq(X, y, rcond=None)[0]

    if abs(line_direction[0]) >= abs(line_direction[1]):
        # Shallow line: express w2 as a function of w1.
        m = line_direction[1] / line_direction[0]
        b = candidate_w[1] - m * candidate_w[0]
        w1_opt = np.linspace(w1_range[0], w1_range[1], num_dots)
        w2_opt = m * w1_opt + b
        mask = (w2_opt <= w2_range[1]) & (w2_opt >= w2_range[0])
    else:
        # Steep or vertical line: express w1 as a function of w2 instead, so
        # the slope stays finite and the line is still sampled.
        m = line_direction[0] / line_direction[1]
        b = candidate_w[0] - m * candidate_w[1]
        w2_opt = np.linspace(w2_range[0], w2_range[1], num_dots)
        w1_opt = m * w2_opt + b
        mask = (w1_opt <= w1_range[1]) & (w1_opt >= w1_range[0])

    w_opt = np.stack((w1_opt, w2_opt), axis=-1)
    return w_opt[mask]


def compute_regularization_path(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
) -> np.ndarray:
    """Solve the regularized problem for a sweep of regularization weights.

    The objective is ``loss(y - X @ w) + lambda * reg(w)``, solved for 100
    log-spaced values of ``lambda`` between 1e-4 and 1e4.

    Args:
        dataset: Samples providing ``x1``, ``x2`` and ``y``.
        loss_type: ``"l2"`` for the sum of squared residuals, ``"l1"`` for the
            1-norm of the residuals.
        regularizer_type: ``"l2"`` for the sum of squared weights, ``"l1"``
            for the 1-norm of the weights.

    Returns:
        Array of shape ``(100, 2)`` with one solution per ``lambda``, ordered
        from the largest ``lambda`` to the smallest; a row is NaN when the
        solver produced no value.

    Raises:
        ValueError: If ``loss_type`` or ``regularizer_type`` is unknown.
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y)

    X = np.stack([x1, x2], axis=1)  # (N, 2)

    w = cp.Variable(2)
    lambd = cp.Parameter(nonneg=True)

    if loss_type == "l2":
        loss_expr = cp.sum_squares(y - X @ w)
    elif loss_type == "l1":
        loss_expr = cp.norm1(y - X @ w)
    else:
        raise ValueError(f"Unknown loss type: {loss_type}")

    if regularizer_type == "l2":
        reg_expr = cp.sum_squares(w)
    elif regularizer_type == "l1":
        reg_expr = cp.norm1(w)
    else:
        raise ValueError(f"Unknown regularizer type: {regularizer_type}")

    objective = cp.Minimize(loss_expr + lambd * reg_expr)
    problem = cp.Problem(objective)

    # todo - user defined reg levels
    reg_levels = np.logspace(-4, 4, 100)

    # solve with reg levels in descending order for using warm start
    w_solutions = []
    for reg_level in sorted(reg_levels, reverse=True):
        lambd.value = reg_level
        problem.solve(warm_start=True)

        if w.value is None:
            w_solutions.append(np.array([np.nan, np.nan]))
        else:
            w_solutions.append(w.value.copy())

    return np.array(w_solutions)


def compute_plot_values(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
    reg_levels: list[float],
    w1_range: tuple[float, float],
    w2_range: tuple[float, float],
    resolution: int,
) -> PlotsData:
    """Compute every array needed for the plots and bundle them together.

    Args:
        dataset: Samples providing ``x1``, ``x2`` and ``y``.
        loss_type: Loss kind, ``"l1"`` or ``"l2"``.
        regularizer_type: Regularizer kind, ``"l1"`` or ``"l2"``.
        reg_levels: Norm budgets for which loss levels are computed.
        w1_range: ``(low, high)`` bounds of the first weight.
        w2_range: ``(low, high)`` bounds of the second weight.
        resolution: Number of grid points per axis before the origin is added.

    Returns:
        A ``PlotsData`` holding the grid, loss values, norms, loss levels,
        the given ``reg_levels``, the unregularized solution and the
        regularization path.
    """
    W1, W2 = build_parameter_grid(w1_range, w2_range, resolution)
    loss_values = compute_loss(dataset, W1, W2, loss_type)
    norms = compute_norms(W1, W2, regularizer_type)
    loss_levels = compute_loss_levels(loss_values, norms, reg_levels)
    unreg_solution = compute_unregularized_solution(dataset, w1_range, w2_range)
    path = compute_regularization_path(
        dataset,
        loss_type,
        regularizer_type,
    )

    return PlotsData(
        W1=W1,
        W2=W2,
        loss_values=loss_values,
        norms=norms,
        loss_levels=loss_levels,
        reg_levels=reg_levels,
        unreg_solution=unreg_solution,
        path=path,
    )


def compute_suggested_settings(
    dataset: Dataset
) -> tuple[tuple[float, float], tuple[float, float], list[float]]:
    """Suggest weight ranges and regularization levels for a dataset.

    The suggestion is derived from the pseudo-inverse least-squares solution:
    both ranges are symmetric with half-width twice its largest absolute
    component, and the levels are 1/4, 2/4 and 3/4 of its Euclidean norm.

    Args:
        dataset: Samples providing ``x1``, ``x2`` and ``y``.

    Returns:
        ``(w1_range, w2_range, reg_levels)``. When the solution is close to
        zero the ranges default to ``(-10, 10)`` and no levels are suggested.
    """
    x = np.stack([dataset.x1, dataset.x2], axis=1)
    moore_penrose = np.linalg.pinv(x) @ np.array(dataset.y)

    if np.isclose(moore_penrose, 0).all():
        w1_range = (-10, 10)
        w2_range = (-10, 10)
        return w1_range, w2_range, []

    width = np.max(np.abs(moore_penrose)) * 2
    w1_range = (-width, width)
    w2_range = (-width, width)

    opt_norm = float(np.linalg.norm(moore_penrose, ord=2))
    reg_levels = [i / 4 * opt_norm for i in range(1, 4)]

    return w1_range, w2_range, reg_levels