Spaces:
Sleeping
Sleeping
| from dataclasses import dataclass | |
| from typing import Literal | |
| import cvxpy as cp | |
| import numpy as np | |
| from sympy import Expr, lambdify | |
@dataclass
class DataGenerationOptions:
    """How to sample (x1, x2) points for a synthetic dataset."""

    # 'grid': evenly spaced lattice; 'random': i.i.d. uniform samples
    method: Literal["grid", "random"]
    # total number of points to generate
    num_samples: int
    # std of additive gaussian noise on y; 0 disables noise
    noise: float = 0.
@dataclass
class Dataset:
    """A two-feature regression dataset stored as plain Python lists."""

    # first feature values, one per sample
    x1: list[float]
    # second feature values, one per sample
    x2: list[float]
    # regression targets, one per sample
    y: list[float]
@dataclass
class PlotsData:
    """Everything needed to render the loss-contour / constraint plots."""

    W1: np.ndarray  # parameter grid, first coordinate
    W2: np.ndarray  # parameter grid, second coordinate
    loss_values: np.ndarray  # loss evaluated at each grid point
    norms: np.ndarray  # regularizer norm at each grid point
    loss_levels: list[float]  # contour levels for the loss surface
    reg_levels: list[float]  # constraint radii for the regularizer
    unreg_solution: np.ndarray  # unregularized minimizer(s)
    path: np.ndarray  # regularization path, one row per lambda
def generate_dataset(
    function: Expr,
    x1_lim: tuple[int, int],
    x2_lim: tuple[int, int],
    generation_options: DataGenerationOptions,
) -> Dataset:
    """Sample a dataset y = f(x1, x2) (+ optional gaussian noise) inside a box.

    Args:
        function: sympy expression in the symbols ``x1`` and ``x2``.
        x1_lim: inclusive sampling bounds for x1.
        x2_lim: inclusive sampling bounds for x2.
        generation_options: sampling method, sample count, and noise level.

    Returns:
        Dataset with exactly ``num_samples`` points. Grid sampling builds the
        smallest square lattice covering ``num_samples`` and truncates.

    Raises:
        ValueError: if the generation method is unknown.
    """
    f = lambdify(('x1', 'x2'), function, modules='numpy')
    if generation_options.method == 'grid':
        # smallest square lattice with at least num_samples points
        side_length = int(np.ceil(np.sqrt(generation_options.num_samples)))
        x1 = np.linspace(x1_lim[0], x1_lim[1], side_length)
        x2 = np.linspace(x2_lim[0], x2_lim[1], side_length)
        X1, X2 = np.meshgrid(x1, x2)
        X1_flat = X1.flatten()[:generation_options.num_samples]
        X2_flat = X2.flatten()[:generation_options.num_samples]
    elif generation_options.method == 'random':
        X1_flat = np.random.uniform(x1_lim[0], x1_lim[1], generation_options.num_samples)
        X2_flat = np.random.uniform(x2_lim[0], x2_lim[1], generation_options.num_samples)
    else:
        raise ValueError(f"Unknown generation method: {generation_options.method}")
    # lambdify returns a bare scalar for constant expressions, which breaks
    # Y.shape / Y.tolist(); broadcast to one value per sample. The float dtype
    # also makes the in-place noise addition safe for integer-valued functions.
    Y = np.broadcast_to(
        np.asarray(f(X1_flat, X2_flat), dtype=float), X1_flat.shape
    ).copy()
    if generation_options.noise > 0:
        Y += np.random.normal(0, generation_options.noise, size=Y.shape)
    return Dataset(x1=X1_flat.tolist(), x2=X2_flat.tolist(), y=Y.tolist())
def load_dataset_from_csv(
    file_path: str, header: bool, x1_col: int, x2_col: int, y_col: int
) -> Dataset:
    """Read a comma-separated file and pull three columns into a Dataset.

    Rows containing any non-numeric / missing cell are dropped entirely.
    """
    skip = 1 if header else 0
    raw = np.genfromtxt(file_path, delimiter=',', skip_header=skip)
    # genfromtxt encodes unparseable cells as NaN; keep only fully valid rows
    valid = raw[~np.isnan(raw).any(axis=1)]
    return Dataset(
        x1=valid[:, x1_col].tolist(),
        x2=valid[:, x2_col].tolist(),
        y=valid[:, y_col].tolist(),
    )
def build_parameter_grid(
    w1_lim: tuple[float, float],
    w2_lim: tuple[float, float],
    min_num_points: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Mesh the (w1, w2) box, guaranteeing 0 appears on each axis.

    Each axis gets ``min_num_points`` evenly spaced values; if 0 is not among
    them it is spliced in at its sorted position, so the axis may end up one
    point longer than requested.
    """
    def _axis(lim: tuple[float, float]) -> np.ndarray:
        pts = np.linspace(lim[0], lim[1], min_num_points)
        if 0 not in pts:
            pts = np.insert(pts, np.searchsorted(pts, 0), 0)
        return pts

    W1, W2 = np.meshgrid(_axis(w1_lim), _axis(w2_lim))
    return W1, W2
def compute_loss(
    dataset: Dataset,
    w1: np.ndarray,
    w2: np.ndarray,
    loss: Literal["l1", "l2"],
) -> np.ndarray:
    """Evaluate the empirical loss of y_hat = w1*x1 + w2*x2 at every grid point.

    Args:
        dataset: samples (x1, x2, y) to average the loss over.
        w1, w2: meshgrid coordinate arrays of equal shape.
        loss: 'l2' for mean squared error, 'l1' for mean absolute error.

    Returns:
        Array of losses with the same shape as ``w1``/``w2``.

    Raises:
        ValueError: if the loss type is unknown.
    """
    x1 = np.array(dataset.x1)
    x2 = np.array(dataset.x2)
    y = np.array(dataset.y).reshape(1, -1)
    W = np.stack([w1.flatten(), w2.flatten()], axis=-1)  # (G, 2)
    X = np.stack([x1, x2], axis=0)  # (2, N)
    y_pred = W @ X  # (G, N), one prediction row per grid point
    if loss == 'l2':
        per_point = np.mean((y - y_pred) ** 2, axis=1)
    elif loss == 'l1':
        per_point = np.mean(np.abs(y - y_pred), axis=1)
    else:
        raise ValueError(f"Unknown loss type: {loss}")
    # Reshape to the grid's own shape instead of assuming a square grid:
    # build_parameter_grid may insert 0 into only one axis, making the
    # grid rectangular, which broke the old (grid_size, grid_size) reshape.
    return per_point.reshape(w1.shape)
def compute_norms(
    w1: np.ndarray,
    w2: np.ndarray,
    norm: Literal["l1", "l2"],
) -> np.ndarray:
    """Return the element-wise norm of the vector (w1, w2) over the grid.

    Args:
        w1, w2: coordinate arrays of equal shape.
        norm: 'l2' for the Euclidean norm, 'l1' for the Manhattan norm.

    Raises:
        ValueError: if the norm type is unknown (previously returned None).
    """
    if norm == "l2":
        return np.sqrt(w1 ** 2 + w2 ** 2)
    elif norm == "l1":
        return np.abs(w1) + np.abs(w2)
    else:
        raise ValueError(f"Unknown norm type: {norm}")
def compute_loss_levels(
    loss_values: np.ndarray,
    norms: np.ndarray,
    reg_levels: list[float],
) -> list[float]:
    """For each constraint radius, find the best loss attainable inside it.

    Returns the deduplicated, ascending list of those optimal losses — one
    contour level per distinct constrained optimum.

    Raises:
        ValueError: if no grid point satisfies a given radius.
    """
    best_losses = set()
    for reg_level in reg_levels:
        feasible = loss_values[norms <= reg_level]
        if feasible.size == 0:
            raise ValueError(f"No satisfying loss level for reg_level {reg_level}")
        best_losses.add(np.min(feasible))
    # set membership already removed duplicates; sort ascending for contouring
    return sorted(best_losses)
| def compute_unregularized_solution( | |
| dataset: Dataset, | |
| w1_range: tuple[float, float], | |
| w2_range: tuple[float, float], | |
| num_dots: int = 100, | |
| ) -> np.ndarray: | |
| x1 = np.array(dataset.x1) | |
| x2 = np.array(dataset.x2) | |
| y = np.array(dataset.y) | |
| X = np.stack([x1, x2], axis=-1) # (N, 2) | |
| try: | |
| # find point solution if exists | |
| w_opt = np.linalg.solve(X.T @ X, X.T @ y) | |
| except np.linalg.LinAlgError: | |
| # the solutions are on a line | |
| eig_vals, eig_vecs = np.linalg.eigh(X.T @ X) | |
| line_direction = eig_vecs[:, np.argmin(eig_vals)] | |
| m = line_direction[1] / line_direction[0] | |
| candidate_w = np.linalg.lstsq(X, y, rcond=None)[0] | |
| b = candidate_w[1] - m * candidate_w[0] | |
| w1_opt = np.linspace(w1_range[0], w1_range[1], num_dots) | |
| w2_opt = m * w1_opt + b | |
| w_opt = np.stack((w1_opt, w2_opt), axis=-1) | |
| mask = (w2_opt <= w2_range[1]) & (w2_opt >= w2_range[0]) | |
| w_opt = w_opt[mask] | |
| return w_opt | |
def compute_regularization_path(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
) -> np.ndarray:
    """Trace the regularization path w*(lambda) with cvxpy.

    Solves min_w loss(y - Xw) + lambda * reg(w) over a log-spaced sweep of
    lambda values and stacks the solutions, one row per lambda, ordered by
    descending lambda. A row is (nan, nan) when the solver yields no value.

    Raises:
        ValueError: for an unknown loss or regularizer type.
    """
    design = np.stack([np.array(dataset.x1), np.array(dataset.x2)], axis=1)  # (N, 2)
    targets = np.array(dataset.y)
    weights = cp.Variable(2)
    strength = cp.Parameter(nonneg=True)
    residual = targets - design @ weights
    if loss_type == "l2":
        data_term = cp.sum_squares(residual)
    elif loss_type == "l1":
        data_term = cp.norm1(residual)
    else:
        raise ValueError(f"Unknown loss type: {loss_type}")
    if regularizer_type == "l2":
        penalty_term = cp.sum_squares(weights)
    elif regularizer_type == "l1":
        penalty_term = cp.norm1(weights)
    else:
        raise ValueError(f"Unknown regularizer type: {regularizer_type}")
    problem = cp.Problem(cp.Minimize(data_term + strength * penalty_term))
    # TODO: allow user-defined reg levels
    lambdas = np.logspace(-4, 4, 100)
    # Descending lambda lets each solve reuse the previous solution (warm start).
    solutions = []
    for value in sorted(lambdas, reverse=True):
        strength.value = value
        problem.solve(warm_start=True)
        if weights.value is None:
            solutions.append(np.array([np.nan, np.nan]))
        else:
            solutions.append(weights.value.copy())
    return np.array(solutions)
def compute_plot_values(
    dataset: Dataset,
    loss_type: Literal["l1", "l2"],
    regularizer_type: Literal["l1", "l2"],
    reg_levels: list[float],
    w1_range: tuple[float, float],
    w2_range: tuple[float, float],
    resolution: int,
) -> PlotsData:
    """Assemble every array the plots need for one (loss, regularizer) setup.

    Builds the parameter grid, evaluates loss and norm surfaces on it, derives
    contour levels for the given constraint radii, and computes both the
    unregularized solution and the full regularization path.
    """
    grid_w1, grid_w2 = build_parameter_grid(w1_range, w2_range, resolution)
    loss_surface = compute_loss(dataset, grid_w1, grid_w2, loss_type)
    norm_surface = compute_norms(grid_w1, grid_w2, regularizer_type)
    return PlotsData(
        W1=grid_w1,
        W2=grid_w2,
        loss_values=loss_surface,
        norms=norm_surface,
        loss_levels=compute_loss_levels(loss_surface, norm_surface, reg_levels),
        reg_levels=reg_levels,
        unreg_solution=compute_unregularized_solution(dataset, w1_range, w2_range),
        path=compute_regularization_path(dataset, loss_type, regularizer_type),
    )
def compute_suggested_settings(
    dataset: Dataset
) -> tuple[tuple[float, float], tuple[float, float], list[float]]:
    """Suggest plot ranges and constraint radii from the min-norm solution.

    Uses the Moore-Penrose pseudoinverse solution to size a symmetric window
    (twice the largest coefficient magnitude) and proposes three radii at
    1/4, 1/2 and 3/4 of the solution's Euclidean norm.
    """
    design = np.stack([dataset.x1, dataset.x2], axis=1)
    min_norm_w = np.linalg.pinv(design) @ np.array(dataset.y)
    if np.isclose(min_norm_w, 0).all():
        # degenerate (all-zero) solution: fall back to a default window
        return (-10, 10), (-10, 10), []
    half_width = np.max(np.abs(min_norm_w)) * 2
    window = (-half_width, half_width)
    solution_norm = float(np.linalg.norm(min_norm_w, ord=2))
    suggested_radii = [k / 4 * solution_norm for k in range(1, 4)]
    return window, window, suggested_radii