"""
models/linear_regression.py

All training logic, metric computation, and plot-data preparation
for the Linear Regression page.
"""

import numpy as np
from scipy import stats
from sklearn.linear_model import LinearRegression, Ridge, Lasso, SGDRegressor
from sklearn.model_selection import train_test_split, learning_curve
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.utils import resample
from pydantic import BaseModel
from typing import Optional, List

from data.datasets import (
    SyntheticConfig,
    RealDatasetConfig,
    generate_synthetic,
    load_real_dataset,
    SYNTHETIC_DATASETS,
)


# ── Request schema ────────────────────────────────────────────────────────────

# Request payload consumed by `run_training`.
class TrainRequest(BaseModel):
    dataset_type: str                       # "synthetic" | "real"
    synthetic_config: Optional[SyntheticConfig] = None
    real_config: Optional[RealDatasetConfig] = None
    test_size: float = 0.20
    model_type: str = "linear"              # "linear" | "ridge" | "lasso"
    alpha: float = 1.0
    feature_x: Optional[str] = None         # index (str) for scatter x-axis


# ── Helpers ───────────────────────────────────────────────────────────────────

def _build_model(model_type: str, alpha: float):
    """Return an unfitted estimator for *model_type*.

    "ridge" and "lasso" use *alpha* as the regularization strength; any
    other value falls back to plain ``LinearRegression``.
    """
    if model_type == "ridge":
        return Ridge(alpha=alpha)
    if model_type == "lasso":
        return Lasso(alpha=alpha, max_iter=10_000)
    return LinearRegression()


def _corr(a, b) -> float:
    """Pearson r between two arrays."""
    a, b = np.asarray(a), np.asarray(b)
    da, db = a - a.mean(), b - b.mean()
    # The small epsilon keeps the ratio finite when either input is constant.
    denom = np.sqrt((da**2).sum() * (db**2).sum()) + 1e-12
    return float((da * db).sum() / denom)


def _mape(y_true, y_pred) -> float:
    """Mean Absolute Percentage Error (%).

    Targets with |y| <= 1e-8 are excluded from the average.
    Returns nan when all targets are (near) zero.
    """
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    mask = np.abs(y_true) > 1e-8
    if not mask.any():
        return float("nan")
    rel_err = np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])
    return float(np.mean(rel_err) * 100)


def _shapiro_summary(residuals, max_n: int = 5000) -> dict:
    """Shapiro-Wilk normality test on (at most *max_n*) residuals.

    Returns ``{"stat", "p", "normal"}``. With fewer than 3 residuals the
    test is not run and stat/p are nan with ``normal`` False.
    """
    sample = np.asarray(residuals)[:max_n]
    if len(sample) < 3:
        return {"stat": float("nan"), "p": float("nan"), "normal": False}
    sw_stat, sw_p = stats.shapiro(sample)
    return {"stat": float(sw_stat), "p": float(sw_p), "normal": bool(sw_p > 0.05)}


# ── Main training function ────────────────────────────────────────────────────

def run_training(req: TrainRequest) -> dict:
    """
    Full training pipeline.

    Returns a dict with all data needed by the frontend
    (metrics, scatter, diagnostics, new plots).

    Raises:
        ValueError: if a non-synthetic dataset is requested without
            ``real_config``.
    """
    # ── 1. Load data ──────────────────────────────────────────────────────────
    is_synthetic = req.dataset_type == "synthetic"
    dataset_name = None
    if is_synthetic:
        cfg = req.synthetic_config or SyntheticConfig(dataset_type="linear")
        X_1d, y = generate_synthetic(cfg)
        X = X_1d.reshape(-1, 1)
        feature_names = ["x"]
    else:
        if req.real_config is None:
            raise ValueError(
                "real_config is required when dataset_type is not 'synthetic'"
            )
        dataset_name = req.real_config.dataset_name
        X, y, feature_names = load_real_dataset(dataset_name)
        X_1d = None

    # ── 2. Split ──────────────────────────────────────────────────────────────
    # Split indices (not arrays) so the scatter plot can reuse the same split.
    idx_all = np.arange(len(y))
    idx_tr, idx_te = train_test_split(
        idx_all, test_size=req.test_size, random_state=42
    )
    X_tr_raw, X_te_raw = X[idx_tr], X[idx_te]
    y_tr, y_te = y[idx_tr], y[idx_te]

    # Scale for real datasets; the scaler is fitted on the training split only.
    scaler = None
    if not is_synthetic:
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_tr_raw)
        X_te = scaler.transform(X_te_raw)
    else:
        X_tr, X_te = X_tr_raw, X_te_raw

    # ── 3. Train ──────────────────────────────────────────────────────────────
    model = _build_model(req.model_type, req.alpha)
    model.fit(X_tr, y_tr)
    y_pred_tr = model.predict(X_tr)
    y_pred_te = model.predict(X_te)

    # ── 4. Basic metrics ──────────────────────────────────────────────────────
    residuals = y_te - y_pred_te
    fitted = y_pred_te
    mse_tr = float(mean_squared_error(y_tr, y_pred_tr))
    mse_te = float(mean_squared_error(y_te, y_pred_te))
    metrics = {
        "r2_train": float(r2_score(y_tr, y_pred_tr)),
        "r2_test": float(r2_score(y_te, y_pred_te)),
        "rmse_train": float(np.sqrt(mse_tr)),
        "rmse_test": float(np.sqrt(mse_te)),
        "mae_train": float(mean_absolute_error(y_tr, y_pred_tr)),
        "mae_test": float(mean_absolute_error(y_te, y_pred_te)),
        "mape_train": _mape(y_tr, y_pred_tr),
        "mape_test": _mape(y_te, y_pred_te),
        "mse_train": mse_tr,
        "mse_test": mse_te,
        "n_train": int(len(y_tr)),
        "n_test": int(len(y_te)),
    }

    # ── 5. Coefficients ───────────────────────────────────────────────────────
    coef_arr = model.coef_.flatten()
    coefs = {name: float(c) for name, c in zip(feature_names, coef_arr)}
    coefs["intercept"] = float(model.intercept_)

    # OLS standard errors & confidence intervals (only for plain LinearRegression)
    coef_ci = {}
    if req.model_type == "linear":
        coef_ci = _ols_confidence_intervals(
            X_tr, y_tr, y_pred_tr, feature_names, model
        )

    # ── 6. Scatter data ───────────────────────────────────────────────────────
    scatter = _scatter_data(
        is_synthetic, X_1d, X, y, idx_tr, idx_te,
        model, feature_names, req.feature_x,
        X_tr_raw if not is_synthetic else None,
        dataset_name,
    )

    # ── 7. Diagnostic plots ───────────────────────────────────────────────────
    sorted_res = np.sort(residuals)
    n_res = len(sorted_res)
    # Hazen plotting positions (i - 0.5) / n: the probability range widens
    # with the sample size, so the tails of large samples are not compared
    # against artificially truncated normal quantiles.
    probs = (np.arange(1, n_res + 1) - 0.5) / max(n_res, 1)
    theoretical = stats.norm.ppf(probs).tolist()
    shapiro = _shapiro_summary(residuals)

    rvf = {"fitted": fitted.tolist(), "residuals": residuals.tolist()}
    qq = {"theoretical": theoretical, "sample": sorted_res.tolist()}
    sl = {
        "fitted": fitted.tolist(),
        "sqrt_abs_resid": np.sqrt(np.abs(residuals)).tolist(),
    }
    avp = {"actual": y_te.tolist(), "predicted": y_pred_te.tolist()}

    # ── 8. Cook's Distance ────────────────────────────────────────────────────
    cooks = _cooks_distance(X_te, y_te, y_pred_te, len(feature_names) + 1)

    # ── 9. Leverage (hat matrix diagonal) ─────────────────────────────────────
    leverage = _leverage(X_te)

    # ── 10. Partial Regression plots (real datasets only) ─────────────────────
    partial_regression = []
    if not is_synthetic and X.shape[1] > 1:
        partial_regression = _partial_regression_plots(
            X_tr, y_tr, y_pred_tr, feature_names
        )

    # ── 11. Learning Curve ────────────────────────────────────────────────────
    lc = _learning_curve_data(
        _build_model(req.model_type, req.alpha), X_tr, y_tr, req.model_type
    )

    # ── 12. Regularization Path ───────────────────────────────────────────────
    # Always computed (for both Ridge and Lasso), whatever model was trained.
    reg_path = _regularization_path(X_tr, y_tr, feature_names)

    # ── 13. Gradient Descent animation data ───────────────────────────────────
    gd = _gradient_descent_path(
        X_1d[idx_tr] if is_synthetic else X_tr[:, 0],
        y_tr,
        is_synthetic,
    )

    # ── 14. Permutation Feature Importance ────────────────────────────────────
    perm_imp = _permutation_importance(model, X_te, y_te, feature_names)

    return {
        "ok": True,
        "metrics": metrics,
        "coefs": coefs,
        "coef_ci": coef_ci,
        "scatter": scatter,
        "avp": avp,
        "rvf": rvf,
        "sl": sl,
        "qq": qq,
        "shapiro": shapiro,
        "cooks": cooks,
        "leverage": leverage,
        "partial_regression": partial_regression,
        "learning_curve": lc,
        "reg_path": reg_path,
        "gradient_descent": gd,
        "perm_importance": perm_imp,
        "feature_names": feature_names,
        "is_synthetic": is_synthetic,
    }


# ── Plot-data helpers ─────────────────────────────────────────────────────────

def _scatter_data(is_synthetic, X_1d, X, y, idx_tr, idx_te, model, feature_names,
                  feature_x_str, X_tr_raw, dataset_name):
    """Build the train/test scatter payload.

    Synthetic data additionally gets a 300-point fitted line. For real data
    the x-axis is the column selected by *feature_x_str* (a decimal index
    given as a string); a missing, non-numeric or out-of-range value falls
    back to column 0. *X_tr_raw* and *dataset_name* are accepted for
    interface compatibility and are not used.
    """
    if is_synthetic:
        x_range = np.linspace(X_1d.min(), X_1d.max(), 300)
        y_line = model.predict(x_range.reshape(-1, 1)).tolist()
        return {
            "x_train": X_1d[idx_tr].tolist(),
            "y_train": y[idx_tr].tolist(),
            "x_test": X_1d[idx_te].tolist(),
            "y_test": y[idx_te].tolist(),
            "x_line": x_range.tolist(),
            "y_line": y_line,
            "feature_names": feature_names,
        }

    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    fx_idx = 0
    if feature_x_str and feature_x_str.isdecimal():
        fx_idx = int(feature_x_str)
    if fx_idx >= min(X.shape[1], len(feature_names)):
        fx_idx = 0

    return {
        "x_train": X[idx_tr, fx_idx].tolist(),
        "y_train": y[idx_tr].tolist(),
        "x_test": X[idx_te, fx_idx].tolist(),
        "y_test": y[idx_te].tolist(),
        "feature_names": feature_names,
        "fx_name": feature_names[fx_idx],
        "fx_idx": fx_idx,
    }


def _ols_confidence_intervals(X_tr, y_tr, y_pred_tr, feature_names, model):
    """Compute standard errors and 95% CIs for OLS coefficients.

    Returns a dict keyed by "intercept" and each feature name, holding
    ``coef``, ``se``, ``ci_lo``, ``ci_hi``, ``t_stat`` and ``p_val``.
    Returns ``{}`` when the covariance matrix cannot be computed.
    """
    n, p = X_tr.shape
    dof = max(n - p - 1, 1)
    resid = y_tr - y_pred_tr
    s2 = (resid**2).sum() / dof

    try:
        X_b = np.column_stack([np.ones(n), X_tr])
        cov = s2 * np.linalg.pinv(X_b.T @ X_b)
    except (np.linalg.LinAlgError, ValueError):
        # Best effort: the page simply shows no intervals.
        return {}

    se = np.sqrt(np.diag(cov))
    t_cr = stats.t.ppf(0.975, df=dof)
    coef_full = np.concatenate([[model.intercept_], model.coef_.flatten()])
    names = ["intercept"] + list(feature_names)

    result = {}
    for name, coef, err in zip(names, coef_full, se):
        # Epsilon avoids a division by zero for a perfectly determined fit.
        t_stat = coef / (err + 1e-12)
        result[name] = {
            "coef": float(coef),
            "se": float(err),
            "ci_lo": float(coef - t_cr * err),
            "ci_hi": float(coef + t_cr * err),
            "t_stat": float(t_stat),
            "p_val": float(2 * stats.t.sf(abs(t_stat), df=dof)),
        }
    return result


def _cooks_distance(X_te, y_te, y_pred_te, p):
    """Approximate Cook's Distance for test set points.

    Uses the mean squared test residual as the variance estimate and the
    leverage of the test design matrix; *p* is the number of fitted
    parameters. Points above ``4 / n`` are reported as influential.
    """
    n = len(y_te)
    resid = np.asarray(y_te - y_pred_te)
    mse = float(np.mean(resid**2))
    # Clip so that (1 - h) never reaches zero.
    h = np.clip(np.asarray(_leverage(X_te)["h"]), 1e-6, 1 - 1e-6)
    d = (resid**2 / (p * mse + 1e-12)) * (h / (1 - h)**2)
    threshold = 4 / max(n, 1)
    return {
        "index": list(range(n)),
        "distance": d.tolist(),
        "threshold": float(threshold),
        "influential": [int(i) for i in np.flatnonzero(d > threshold)],
    }


def _leverage(X_te):
    """Hat matrix diagonal h_ii for test set.

    Returns ``{"h": [...], "threshold": 2 * (k + 1) / n}`` where k is the
    number of columns of *X_te*.
    """
    n = X_te.shape[0]
    X_b = np.column_stack([np.ones(n), X_te])
    try:
        gram_inv = np.linalg.pinv(X_b.T @ X_b)
        # Row-wise quadratic form x_i^T G x_i: the diagonal of the hat
        # matrix without materialising the full n x n matrix.
        h = ((X_b @ gram_inv) * X_b).sum(axis=1).tolist()
    except (np.linalg.LinAlgError, ValueError):
        h = [1.0 / n] * n
    return {"h": h, "threshold": float(2 * (X_te.shape[1] + 1) / max(n, 1))}


def _partial_regression_plots(X_tr, y_tr, y_pred_tr, feature_names):
    """Added-variable plots: residuals of y~X_{-j} vs residuals of x_j~X_{-j}.

    Returns one entry per feature (first 6 only, to avoid frontend
    overload) with the two residual vectors, the partial regression slope
    and their Pearson correlation. *y_pred_tr* is unused.
    """
    n, p = X_tr.shape
    if p < 2:
        return []

    results = []
    for j in range(min(p, 6)):
        X_minus_j = np.delete(X_tr, j, axis=1)
        x_j = X_tr[:, j]

        # residuals of y on X_{-j}
        ey = y_tr - LinearRegression().fit(X_minus_j, y_tr).predict(X_minus_j)
        # residuals of x_j on X_{-j}
        ex = x_j - LinearRegression().fit(X_minus_j, x_j).predict(X_minus_j)

        # slope = partial regression coefficient. Both residual vectors are
        # zero-mean (the fits include an intercept), so the ratio of raw
        # sums is cov/var with the same normalisation on both sides.
        slope = float((ex * ey).sum() / ((ex**2).sum() + 1e-12))
        results.append({
            "feature": feature_names[j],
            "ex": ex.tolist(),
            "ey": ey.tolist(),
            "slope": slope,
            "r": float(_corr(ex, ey)),
        })
    return results


def _learning_curve_data(model, X_tr, y_tr, model_type):
    """Train/val error vs training set size.

    For up to 10 subset sizes between ~10% and 100% of the training data,
    draws a subset, holds out 20% of it for validation, refits a fresh copy
    of *model* and records R² on both parts. *model_type* is unused.

    Returns ``{"sizes", "train", "val"}`` as three equally long lists.
    """
    n = len(y_tr)
    smallest = min(max(5, int(n * 0.1)), n)
    sizes = np.unique(np.linspace(smallest, n, 10).astype(int))

    kept_sizes, train_scores, val_scores = [], [], []
    for s in sizes:
        if s < 6:
            continue
        # Sample without replacement: bootstrap duplicates would end up on
        # both sides of the split below and inflate the validation score.
        X_s, y_s = resample(
            X_tr, y_tr, n_samples=int(s), replace=False, random_state=42
        )
        X_tr2, X_va2, y_tr2, y_va2 = train_test_split(
            X_s, y_s, test_size=0.2, random_state=0
        )
        if len(X_tr2) < 3 or len(X_va2) < 2:
            continue
        m = model.__class__(**model.get_params())
        m.fit(X_tr2, y_tr2)
        # Record the size next to its scores so skipped sizes cannot shift
        # the x-axis of the curve.
        kept_sizes.append(int(s))
        train_scores.append(float(r2_score(y_tr2, m.predict(X_tr2))))
        val_scores.append(float(r2_score(y_va2, m.predict(X_va2))))

    return {"sizes": kept_sizes, "train": train_scores, "val": val_scores}


def _regularization_path(X_tr, y_tr, feature_names):
    """Coefficient paths vs log10(alpha) for Ridge and Lasso.

    Fits both models at 60 log-spaced alphas in [1e-3, 1e3].
    """
    alphas = np.logspace(-3, 3, 60)
    ridge_coefs = [
        Ridge(alpha=a).fit(X_tr, y_tr).coef_.flatten().tolist()
        for a in alphas
    ]
    lasso_coefs = [
        Lasso(alpha=a, max_iter=10_000).fit(X_tr, y_tr).coef_.flatten().tolist()
        for a in alphas
    ]
    return {
        "alphas": np.log10(alphas).tolist(),
        "ridge_coefs": ridge_coefs,   # list[list[float]] shape=(60, n_features)
        "lasso_coefs": lasso_coefs,
        "feature_names": feature_names,
    }


def _gradient_descent_path(X_1d, y, is_synthetic, lr=0.05, n_iter=80):
    """
    Manually run gradient descent on a 1-D regression (β0, β1).

    Returns the path of (β0, β1, mse) per iteration plus the loss surface
    grid for the contour plot. *lr* is an upper bound on the step size: it
    is reduced to 1/λ_max of the MSE Hessian when necessary so the descent
    cannot diverge on unscaled inputs. *is_synthetic* is unused.
    """
    X_1d, y = np.asarray(X_1d), np.asarray(y)

    # use at most 300 points for speed
    if len(X_1d) > 300:
        idx = np.random.RandomState(0).choice(len(X_1d), 300, replace=False)
        X_1d, y = X_1d[idx], y[idx]
    n = len(X_1d)

    # Hessian of the MSE is 2 * [[1, mean(x)], [mean(x), mean(x²)]]; its
    # largest eigenvalue (closed form for a symmetric 2x2 matrix) bounds
    # the stable step size.
    step = lr
    if n:
        h_a, h_b, h_c = 2.0, 2.0 * float(np.mean(X_1d)), 2.0 * float(np.mean(X_1d**2))
        lam_max = (h_a + h_c) / 2 + np.sqrt(((h_a - h_c) / 2) ** 2 + h_b**2)
        if np.isfinite(lam_max) and lam_max > 0:
            step = min(lr, 1.0 / lam_max)

    b0, b1 = 0.0, 0.0
    path = []
    for _ in range(n_iter):
        resid = (b0 + b1 * X_1d) - y
        mse = float(np.mean(resid**2))
        path.append({"b0": round(b0, 5), "b1": round(b1, 5), "mse": round(mse, 5)})
        db0 = (2 / n) * resid.sum()
        db1 = (2 / n) * (resid * X_1d).sum()
        b0 -= step * db0
        b1 -= step * db1

    # Loss surface: grid of (b0, b1) → MSE, centred on the last recorded point
    b0_final, b1_final = (path[-1]["b0"], path[-1]["b1"]) if path else (0.0, 0.0)
    b0_grid = np.linspace(b0_final - 3, b0_final + 3, 30)
    b1_grid = np.linspace(b1_final - 3, b1_final + 3, 30)
    # Broadcast to (30, 30, n): rows follow b0_grid, columns follow b1_grid.
    preds = b0_grid[:, None, None] + b1_grid[None, :, None] * X_1d[None, None, :]
    surface = np.mean((preds - y) ** 2, axis=2)
    Z = [[round(float(v), 4) for v in row] for row in surface]

    return {
        "path": path,
        "b0_grid": b0_grid.tolist(),
        "b1_grid": b1_grid.tolist(),
        "Z": Z,
        "x_data": X_1d.tolist(),
        "y_data": y.tolist(),
    }


def _permutation_importance(model, X_te, y_te, feature_names, n_repeats=20):
    """Drop in R² when each feature is permuted.

    Each column of *X_te* is shuffled *n_repeats* times (seeded RNG); the
    mean and std of the R² drop are returned per feature, sorted by mean
    drop in descending order.
    """
    base_r2 = r2_score(y_te, model.predict(X_te))
    rng = np.random.RandomState(42)
    results = []
    for j in range(X_te.shape[1]):
        drops = []
        for _ in range(n_repeats):
            X_perm = X_te.copy()
            X_perm[:, j] = rng.permutation(X_perm[:, j])
            drops.append(base_r2 - r2_score(y_te, model.predict(X_perm)))
        results.append({
            "feature": feature_names[j],
            "mean": float(np.mean(drops)),
            "std": float(np.std(drops)),
        })
    results.sort(key=lambda item: item["mean"], reverse=True)
    return results