| """ |
| models/linear_regression.py |
| All training logic, metric computation, and plot-data preparation |
| for the Linear Regression page. |
| """ |
|
|
| import numpy as np |
| from scipy import stats |
| from sklearn.linear_model import LinearRegression, Ridge, Lasso, SGDRegressor |
| from sklearn.model_selection import train_test_split, learning_curve |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error |
| from sklearn.utils import resample |
| from pydantic import BaseModel |
| from typing import Optional, List |
|
|
| from data.datasets import ( |
| SyntheticConfig, RealDatasetConfig, |
| generate_synthetic, load_real_dataset, |
| SYNTHETIC_DATASETS, |
| ) |
|
|
|
|
| |
|
|
# Request payload consumed by run_training() for the Linear Regression page.
class TrainRequest(BaseModel):
    # "synthetic" selects the generated 1-D dataset; run_training() treats any
    # other value as a request for a real dataset.
    dataset_type: str
    # Settings for the synthetic generator. When None, run_training() falls
    # back to SyntheticConfig(dataset_type="linear").
    synthetic_config: Optional[SyntheticConfig] = None
    # Real-dataset selection; run_training() reads `.dataset_name` from it.
    real_config: Optional[RealDatasetConfig] = None
    # Forwarded to train_test_split as `test_size`.
    test_size: float = 0.20
    # "ridge" or "lasso" pick the regularized estimators; any other value
    # results in a plain LinearRegression (see _build_model).
    model_type: str = "linear"
    # Regularization strength passed to Ridge / Lasso.
    alpha: float = 1.0
    # Column index, given as a digit string, of the feature shown on the
    # scatter plot for real datasets; column 0 is used when it is missing or
    # not made of digits (see _scatter_data).
    feature_x: Optional[str] = None
|
|
|
|
| |
|
|
def _build_model(model_type: str, alpha: float):
    """Instantiate the unfitted estimator selected by ``model_type``.

    ``"ridge"`` and ``"lasso"`` yield the matching regularized estimator with
    strength ``alpha`` (Lasso additionally gets ``max_iter=10_000``). Every
    other value falls back to ordinary least squares, where ``alpha`` is not
    used.
    """
    regularized = {
        "ridge": lambda: Ridge(alpha=alpha),
        "lasso": lambda: Lasso(alpha=alpha, max_iter=10_000),
    }
    make_estimator = regularized.get(model_type, LinearRegression)
    return make_estimator()
|
|
|
|
def _corr(a, b) -> float:
    """Return the Pearson correlation coefficient of two sequences.

    A tiny constant (1e-12) is added to the denominator, so a constant input
    produces 0.0 instead of a division by zero.
    """
    xs = np.asarray(a)
    ys = np.asarray(b)
    x_dev = xs - xs.mean()
    y_dev = ys - ys.mean()
    cross = (x_dev * y_dev).sum()
    scale = np.sqrt((x_dev**2).sum() * (y_dev**2).sum()) + 1e-12
    return float(cross / scale)
|
|
|
|
def _mape(y_true, y_pred) -> float:
    """Mean Absolute Percentage Error, expressed in percent.

    Targets whose magnitude is at most 1e-8 are left out of the average to
    avoid dividing by (near) zero. If no target passes that filter the result
    is ``nan``.
    """
    actual = np.asarray(y_true)
    predicted = np.asarray(y_pred)
    usable = np.abs(actual) > 1e-8
    if usable.any():
        relative_error = np.abs((actual[usable] - predicted[usable]) / actual[usable])
        return float(np.mean(relative_error) * 100)
    return float("nan")
|
|
|
|
| |
|
|
def run_training(req: TrainRequest) -> dict:
    """
    Run the full training pipeline for one request.

    Steps: load the dataset (synthetic 1-D or a named real dataset), split it
    with a fixed seed, standardize the features of real datasets (the scaler
    is fitted on the training split only), fit the requested model, then build
    every payload the frontend needs (metrics, scatter, diagnostics, plots).

    Args:
        req: Training options. ``dataset_type == "synthetic"`` selects the
            generated dataset; any other value requires ``real_config``.

    Returns:
        A dict with the keys ``ok``, ``metrics``, ``coefs``, ``coef_ci``,
        ``scatter``, ``avp``, ``rvf``, ``sl``, ``qq``, ``shapiro``, ``cooks``,
        ``leverage``, ``partial_regression``, ``learning_curve``,
        ``reg_path``, ``gradient_descent``, ``perm_importance``,
        ``feature_names`` and ``is_synthetic``.

    Raises:
        ValueError: If a real dataset is requested without ``real_config``.
    """
    is_synthetic = req.dataset_type == "synthetic"

    # ── Load data ────────────────────────────────────────────────────────
    if is_synthetic:
        cfg = req.synthetic_config or SyntheticConfig(dataset_type="linear")
        X_1d, y = generate_synthetic(cfg)
        X = X_1d.reshape(-1, 1)
        feature_names = ["x"]
        dataset_name = None
    else:
        if req.real_config is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(
                "real_config is required when dataset_type is not 'synthetic'"
            )
        dataset_name = req.real_config.dataset_name
        X, y, feature_names = load_real_dataset(dataset_name)
        X_1d = None

    # ── Train / test split (on indices, so raw rows can be recovered) ────
    idx_all = np.arange(len(y))
    idx_tr, idx_te = train_test_split(idx_all, test_size=req.test_size, random_state=42)

    X_tr_raw, X_te_raw = X[idx_tr], X[idx_te]
    y_tr, y_te = y[idx_tr], y[idx_te]

    # ── Scaling: real datasets only; synthetic data is used as generated ─
    if is_synthetic:
        X_tr, X_te = X_tr_raw, X_te_raw
    else:
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_tr_raw)
        X_te = scaler.transform(X_te_raw)

    # ── Fit ──────────────────────────────────────────────────────────────
    model = _build_model(req.model_type, req.alpha)
    model.fit(X_tr, y_tr)

    y_pred_tr = model.predict(X_tr)
    y_pred_te = model.predict(X_te)

    # ── Residuals (test set) ─────────────────────────────────────────────
    residuals = y_te - y_pred_te
    fitted = y_pred_te

    # ── Metrics: MSE is computed once per split and reused for RMSE ──────
    mse_train = float(mean_squared_error(y_tr, y_pred_tr))
    mse_test = float(mean_squared_error(y_te, y_pred_te))
    metrics = {
        "r2_train": float(r2_score(y_tr, y_pred_tr)),
        "r2_test": float(r2_score(y_te, y_pred_te)),
        "rmse_train": float(np.sqrt(mse_train)),
        "rmse_test": float(np.sqrt(mse_test)),
        "mae_train": float(mean_absolute_error(y_tr, y_pred_tr)),
        "mae_test": float(mean_absolute_error(y_te, y_pred_te)),
        "mape_train": _mape(y_tr, y_pred_tr),
        "mape_test": _mape(y_te, y_pred_te),
        "mse_train": mse_train,
        "mse_test": mse_test,
        "n_train": int(len(y_tr)),
        "n_test": int(len(y_te)),
    }

    # ── Coefficients ─────────────────────────────────────────────────────
    coef_arr = model.coef_.flatten()
    coefs = {name: float(value) for name, value in zip(feature_names, coef_arr)}
    coefs["intercept"] = float(model.intercept_)

    # ── Confidence intervals (plain OLS only) ────────────────────────────
    coef_ci = {}
    if req.model_type == "linear":
        coef_ci = _ols_confidence_intervals(X_tr, y_tr, y_pred_tr, feature_names, model)

    # ── Scatter plot data ────────────────────────────────────────────────
    scatter = _scatter_data(
        is_synthetic, X_1d, X, y, idx_tr, idx_te,
        model, feature_names, req.feature_x,
        X_tr_raw if not is_synthetic else None,
        dataset_name,
    )

    # ── Residual diagnostics ─────────────────────────────────────────────
    sorted_res = np.sort(residuals)
    n_res = len(sorted_res)
    theoretical = stats.norm.ppf(np.linspace(0.01, 0.99, n_res)).tolist()
    # Only the first 5000 residuals are passed to the Shapiro-Wilk test.
    sw_stat, sw_p = stats.shapiro(residuals[:min(5000, len(residuals))])

    rvf = {"fitted": fitted.tolist(), "residuals": residuals.tolist()}
    qq = {"theoretical": theoretical, "sample": sorted_res.tolist()}
    sl = {"fitted": fitted.tolist(), "sqrt_abs_resid": np.sqrt(np.abs(residuals)).tolist()}
    avp = {"actual": y_te.tolist(), "predicted": y_pred_te.tolist()}

    # ── Cook's distance (p counts the intercept) ─────────────────────────
    cooks = _cooks_distance(X_te, y_te, y_pred_te, len(feature_names) + 1)

    # ── Leverage ─────────────────────────────────────────────────────────
    leverage = _leverage(X_te)

    # ── Partial regression plots (multi-feature real datasets only) ──────
    partial_regression = []
    if not is_synthetic and X.shape[1] > 1:
        partial_regression = _partial_regression_plots(X_tr, y_tr, y_pred_tr, feature_names)

    # ── Learning curve (uses a fresh, unfitted model) ────────────────────
    lc = _learning_curve_data(
        _build_model(req.model_type, req.alpha),
        X_tr, y_tr, req.model_type
    )

    # ── Regularization path ──────────────────────────────────────────────
    # Always computed, whatever the selected model type (the original guard
    # ended in `or True`, so it never skipped this step).
    reg_path = _regularization_path(X_tr, y_tr, feature_names)

    # ── Gradient descent demo on a single feature ────────────────────────
    gd = _gradient_descent_path(
        X_1d[idx_tr] if is_synthetic else X_tr[:, 0],
        y_tr, is_synthetic
    )

    # ── Permutation importance (test set) ────────────────────────────────
    perm_imp = _permutation_importance(model, X_te, y_te, feature_names)

    return {
        "ok": True,
        "metrics": metrics,
        "coefs": coefs,
        "coef_ci": coef_ci,
        "scatter": scatter,
        "avp": avp,
        "rvf": rvf,
        "sl": sl,
        "qq": qq,
        "shapiro": {"stat": float(sw_stat), "p": float(sw_p), "normal": bool(sw_p > 0.05)},
        "cooks": cooks,
        "leverage": leverage,
        "partial_regression": partial_regression,
        "learning_curve": lc,
        "reg_path": reg_path,
        "gradient_descent": gd,
        "perm_importance": perm_imp,
        "feature_names": feature_names,
        "is_synthetic": is_synthetic,
    }
|
|
|
|
| |
|
|
def _scatter_data(is_synthetic, X_1d, X, y, idx_tr, idx_te,
                  model, feature_names, feature_x_str,
                  X_tr_raw, dataset_name):
    """Build the train/test scatter payload for the main plot.

    Synthetic data: returns the 1-D points plus a 300-point fitted line
    (``x_line`` / ``y_line``) spanning the observed x range.

    Real data: returns the points of one feature column. The column comes
    from ``feature_x_str`` when that is a string of decimal digits naming an
    existing column; otherwise (missing, malformed or out of range) column 0
    is used rather than raising.

    ``X_tr_raw`` and ``dataset_name`` are accepted for interface
    compatibility and are not used here.
    """
    if is_synthetic:
        x_range = np.linspace(X_1d.min(), X_1d.max(), 300)
        y_line = model.predict(x_range.reshape(-1, 1)).tolist()
        return {
            "x_train": X_1d[idx_tr].tolist(),
            "y_train": y[idx_tr].tolist(),
            "x_test": X_1d[idx_te].tolist(),
            "y_test": y[idx_te].tolist(),
            "x_line": x_range.tolist(),
            "y_line": y_line,
            "feature_names": feature_names,
        }

    n_features = min(X.shape[1], len(feature_names))
    fx_idx = 0
    # isdecimal() (unlike isdigit()) only accepts characters that int() can
    # parse, so inputs such as "²" no longer raise ValueError.
    if feature_x_str and feature_x_str.isdecimal():
        try:
            requested = int(feature_x_str)
        except ValueError:
            # e.g. a digit string longer than the interpreter's int limit
            requested = 0
        if requested < n_features:
            fx_idx = requested

    return {
        "x_train": X[idx_tr, fx_idx].tolist(),
        "y_train": y[idx_tr].tolist(),
        "x_test": X[idx_te, fx_idx].tolist(),
        "y_test": y[idx_te].tolist(),
        "feature_names": feature_names,
        "fx_name": feature_names[fx_idx],
        "fx_idx": fx_idx,
    }
|
|
|
|
def _ols_confidence_intervals(X_tr, y_tr, y_pred_tr, feature_names, model):
    """Standard errors, 95% confidence intervals, t statistics and p-values
    for the intercept and each coefficient of a fitted OLS model.

    The residual variance uses ``max(n - p - 1, 1)`` degrees of freedom and
    the coefficient covariance is ``s2 * pinv(X_b.T @ X_b)``, where ``X_b`` is
    the training matrix with a leading column of ones. Returns a dict keyed by
    ``"intercept"`` and the feature names, or ``{}`` if anything fails while
    the table is being built.
    """
    n, p = X_tr.shape
    dof = max(n - p - 1, 1)
    errors = y_tr - y_pred_tr
    sigma2 = (errors**2).sum() / dof
    try:
        design = np.column_stack([np.ones(n), X_tr])
        covariance = sigma2 * np.linalg.pinv(design.T @ design)
        std_err = np.sqrt(np.diag(covariance))
        t_crit = stats.t.ppf(0.975, df=dof)
        estimates = np.concatenate([[model.intercept_], model.coef_.flatten()])
        labels = ["intercept"] + list(feature_names)
        table = {}
        for pos, label in enumerate(labels):
            beta = estimates[pos]
            err = std_err[pos]
            # 1e-12 keeps the ratio finite when the standard error is zero.
            t_value = beta / (err + 1e-12)
            table[label] = {
                "coef": float(beta),
                "se": float(err),
                "ci_lo": float(beta - t_crit * err),
                "ci_hi": float(beta + t_crit * err),
                "t_stat": float(t_value),
                "p_val": float(2 * stats.t.sf(abs(t_value), df=dof)),
            }
        return table
    except Exception:
        return {}
|
|
|
|
def _cooks_distance(X_te, y_te, y_pred_te, p):
    """Approximate Cook's distance for every test-set point.

    Uses the mean squared test residual as the error variance and the
    test-set hat diagonal from ``_leverage`` (clipped to [1e-6, 1 - 1e-6] so
    the ``(1 - h)**2`` divisor never reaches zero). Points whose distance
    exceeds ``4 / n`` are reported as influential.
    """
    n_obs = len(y_te)
    errors = y_te - y_pred_te
    mean_sq_err = float(np.mean(errors**2))

    hat_diag = np.clip(np.asarray(_leverage(X_te)["h"]), 1e-6, 1 - 1e-6)

    scaled_sq_err = errors**2 / (p * mean_sq_err + 1e-12)
    distances = scaled_sq_err * (hat_diag / (1 - hat_diag) ** 2)

    cutoff = 4 / max(n_obs, 1)
    flagged = [int(pos) for pos, dist in enumerate(distances) if dist > cutoff]
    return {
        "index": list(range(n_obs)),
        "distance": distances.tolist(),
        "threshold": float(cutoff),
        "influential": flagged,
    }
|
|
|
|
def _leverage(X_te):
    """Hat-matrix diagonal h_ii for the test set.

    The design matrix is ``X_te`` with a leading column of ones. Only the
    diagonal of ``X_b @ pinv(X_b.T @ X_b) @ X_b.T`` is needed, so it is
    computed row by row instead of materializing the full n x n hat matrix,
    which keeps memory linear in the number of rows.

    Returns:
        ``{"h": list of leverages, "threshold": 2 * (n_features + 1) / n}``.
        If the pseudo-inverse cannot be computed, every leverage falls back
        to ``1 / n``.
    """
    n = X_te.shape[0]
    X_b = np.column_stack([np.ones(n), X_te])
    try:
        gram_inv = np.linalg.pinv(X_b.T @ X_b)
        # h_ii = x_i^T G x_i: row-wise dot product of (X_b @ G) with X_b.
        h = ((X_b @ gram_inv) * X_b).sum(axis=1).tolist()
    except np.linalg.LinAlgError:
        h = [1.0 / n] * n
    return {"h": h, "threshold": float(2 * (X_te.shape[1] + 1) / max(n, 1))}
|
|
|
|
def _partial_regression_plots(X_tr, y_tr, y_pred_tr, feature_names):
    """Added-variable plots: residuals of y~X_{-j} vs residuals of x_j~X_{-j}.

    For each of the first six features j, both ``y`` and ``x_j`` are regressed
    on the remaining features; the two residual series, the slope of ``ey`` on
    ``ex`` and their Pearson correlation are returned. Only the features that
    end up in the result are fitted.

    ``y_pred_tr`` is accepted for interface compatibility and is not used.

    Returns:
        A list of dicts with keys ``feature``, ``ex``, ``ey``, ``slope`` and
        ``r``; empty when there are fewer than two features.
    """
    max_plots = 6
    n_features = X_tr.shape[1]
    if n_features < 2:
        return []

    results = []
    for j in range(min(n_features, max_plots)):
        others = np.delete(X_tr, j, axis=1)
        x_j = X_tr[:, j]

        # Residuals of y after removing what the other features explain.
        ey = y_tr - LinearRegression().fit(others, y_tr).predict(others)
        # Residuals of x_j after removing what the other features explain.
        ex = x_j - LinearRegression().fit(others, x_j).predict(others)

        # Covariance and variance must share the same normalization (ddof=0);
        # mixing np.cov's default ddof=1 with np.var's ddof=0 inflates the
        # slope by n / (n - 1).
        slope = float(np.cov(ex, ey, ddof=0)[0, 1] / (np.var(ex) + 1e-12))
        results.append({
            "feature": feature_names[j],
            "ex": ex.tolist(),
            "ey": ey.tolist(),
            "slope": slope,
            "r": float(_corr(ex, ey)),
        })
    return results
|
|
|
|
def _learning_curve_data(model, X_tr, y_tr, model_type):
    """Train/validation R² as a function of training-subset size.

    Up to ten subset sizes between ``max(5, 10% of n)`` and ``n`` are tried.
    For each size a subset is drawn without replacement, so a row can never
    sit in both the inner training and validation parts. It is then split
    80/20 and a fresh copy of ``model`` is fitted. Sizes that are too small
    to split are skipped, and ``sizes`` lists exactly the sizes that produced
    a score, so the three returned lists always line up.

    ``model_type`` is accepted for interface compatibility and is not used.

    Returns:
        ``{"sizes": [...], "train": [...], "val": [...]}``.
    """
    n = len(y_tr)
    sizes = np.unique(np.linspace(max(5, int(n * 0.1)), n, 10).astype(int))
    used_sizes, train_scores, val_scores = [], [], []
    for s in sizes:
        # Checked before sampling: every size >= 6 is also <= n, which
        # sampling without replacement requires.
        if s < 6:
            continue
        X_s, y_s = resample(
            X_tr, y_tr, replace=False, n_samples=int(s), random_state=42
        )
        X_tr2, X_va2, y_tr2, y_va2 = train_test_split(
            X_s, y_s, test_size=0.2, random_state=0
        )
        if len(X_tr2) < 3 or len(X_va2) < 2:
            continue
        m = model.__class__(**model.get_params())
        m.fit(X_tr2, y_tr2)
        used_sizes.append(int(s))
        train_scores.append(float(r2_score(y_tr2, m.predict(X_tr2))))
        val_scores.append(float(r2_score(y_va2, m.predict(X_va2))))
    return {"sizes": used_sizes, "train": train_scores, "val": val_scores}
|
|
|
|
def _regularization_path(X_tr, y_tr, feature_names):
    """Ridge and Lasso coefficient paths over 60 alphas from 1e-3 to 1e3.

    Returns the alphas on a log10 scale together with one coefficient list
    per alpha for each of the two estimators.
    """
    alpha_grid = np.logspace(-3, 3, 60)

    def _fitted_coefs(estimator):
        # Fit on the training data and return the coefficients as a flat list.
        return estimator.fit(X_tr, y_tr).coef_.flatten().tolist()

    per_alpha = [
        (
            _fitted_coefs(Ridge(alpha=strength)),
            _fitted_coefs(Lasso(alpha=strength, max_iter=10_000)),
        )
        for strength in alpha_grid
    ]
    ridge_path = [ridge for ridge, _ in per_alpha]
    lasso_path = [lasso for _, lasso in per_alpha]

    return {
        "alphas": np.log10(alpha_grid).tolist(),
        "ridge_coefs": ridge_path,
        "lasso_coefs": lasso_path,
        "feature_names": feature_names,
    }
|
|
|
|
def _gradient_descent_path(X_1d, y, is_synthetic, lr=0.05, n_iter=80):
    """
    Manually run gradient descent on a 1-D regression (β0, β1).

    Returns the path of (β0, β1, mse) per iteration plus the loss surface
    grid for the contour plot.

    Args:
        X_1d: 1-D feature values (array-like).
        y: Targets, same length as ``X_1d``.
        is_synthetic: Accepted for interface compatibility; not used.
        lr: Requested step size. It is used unchanged whenever it is stable
            for this data. If it would make the iteration diverge
            (``lr * λmax >= 2``, with λmax the largest eigenvalue of the MSE
            Hessian) the step is reduced to ``1 / λmax``.
        n_iter: Number of recorded iterations.

    Returns:
        A dict with ``path``, ``b0_grid``, ``b1_grid``, ``Z`` (30 x 30 MSE
        surface centred on the last recorded point), ``x_data`` and
        ``y_data``.

    Raises:
        ValueError: If no samples are given.
    """
    X_1d = np.asarray(X_1d)
    y = np.asarray(y)

    # Cap the amount of data that is processed and shipped to the frontend.
    if len(X_1d) > 300:
        idx = np.random.RandomState(0).choice(len(X_1d), 300, replace=False)
        X_1d, y = X_1d[idx], y[idx]

    n = len(X_1d)
    if n == 0:
        raise ValueError("gradient descent needs at least one sample")

    # The MSE Hessian is 2 * [[1, mean(x)], [mean(x), mean(x^2)]]; this is the
    # closed form of its largest eigenvalue. Un-scaled features can make it
    # large enough for the requested step to overshoot and blow up.
    mean_x = float(np.mean(X_1d))
    mean_x2 = float(np.mean(X_1d * X_1d))
    lam_max = (1.0 + mean_x2) + float(
        np.sqrt((1.0 - mean_x2) ** 2 + 4.0 * mean_x**2)
    )
    step = lr
    if lam_max > 0 and lr * lam_max >= 2.0:
        step = 1.0 / lam_max

    b0, b1 = 0.0, 0.0
    path = []
    for _ in range(n_iter):
        y_hat = b0 + b1 * X_1d
        resid = y_hat - y
        mse = float(np.mean(resid**2))
        # Record the parameters *before* this iteration's update.
        path.append({"b0": round(b0, 5), "b1": round(b1, 5), "mse": round(mse, 5)})
        db0 = (2 / n) * resid.sum()
        db1 = (2 / n) * (resid * X_1d).sum()
        b0 -= step * db0
        b1 -= step * db1

    # Loss surface on a ±3 window around the last recorded point (or around
    # the starting point when no iteration was requested).
    center = path[-1] if path else {"b0": b0, "b1": b1}
    b0_grid = np.linspace(center["b0"] - 3, center["b0"] + 3, 30)
    b1_grid = np.linspace(center["b1"] - 3, center["b1"] + 3, 30)
    Z = [
        [round(float(np.mean((b0v + b1v * X_1d - y) ** 2)), 4) for b1v in b1_grid]
        for b0v in b0_grid
    ]

    return {
        "path": path,
        "b0_grid": b0_grid.tolist(),
        "b1_grid": b1_grid.tolist(),
        "Z": Z,
        "x_data": X_1d.tolist(),
        "y_data": y.tolist(),
    }
|
|
|
|
def _permutation_importance(model, X_te, y_te, feature_names, n_repeats=20):
    """Permutation importance measured as the drop in test R².

    Each feature column is shuffled ``n_repeats`` times with a fixed-seed
    generator; the mean and standard deviation of ``baseline R² - shuffled
    R²`` are reported per feature, ordered from largest to smallest mean
    drop.
    """
    baseline = r2_score(y_te, model.predict(X_te))
    shuffler = np.random.RandomState(42)

    def _shuffled_drop(column):
        # R² lost when only `column` is permuted; X_te itself is not modified.
        scrambled = X_te.copy()
        scrambled[:, column] = shuffler.permutation(scrambled[:, column])
        return baseline - r2_score(y_te, model.predict(scrambled))

    summary = []
    for column in range(X_te.shape[1]):
        drops = [_shuffled_drop(column) for _ in range(n_repeats)]
        summary.append({
            "feature": feature_names[column],
            "mean": float(np.mean(drops)),
            "std": float(np.std(drops)),
        })
    return sorted(summary, key=lambda entry: entry["mean"], reverse=True)
|
|