# ML_course/models/linear_regression.py
# Repository page residue: uploaded by livieris ("Upload 15 files", commit be64da1, verified).
"""
models/linear_regression.py
All training logic, metric computation, and plot-data preparation
for the Linear Regression page.
"""
import numpy as np
from scipy import stats
from sklearn.linear_model import LinearRegression, Ridge, Lasso, SGDRegressor
from sklearn.model_selection import train_test_split, learning_curve
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.utils import resample
from pydantic import BaseModel
from typing import Optional, List
from data.datasets import (
SyntheticConfig, RealDatasetConfig,
generate_synthetic, load_real_dataset,
SYNTHETIC_DATASETS,
)
# ── Request schema ────────────────────────────────────────────────────────────
# Request body consumed by run_training().
class TrainRequest(BaseModel):
    # Data source selector: "synthetic" | "real". run_training() treats any
    # value other than "synthetic" as a real dataset.
    dataset_type: str
    # Generator settings, read only when dataset_type == "synthetic"; when
    # omitted, run_training() falls back to SyntheticConfig(dataset_type="linear").
    synthetic_config: Optional[SyntheticConfig] = None
    # Real-dataset settings; run_training() reads its `dataset_name`.
    real_config: Optional[RealDatasetConfig] = None
    # Held-out fraction forwarded to train_test_split(test_size=...).
    test_size: float = 0.20
    # Estimator selector: "linear" | "ridge" | "lasso".
    model_type: str = "linear"
    # Regularisation strength passed to Ridge / Lasso; not used for "linear".
    alpha: float = 1.0
    # Column index (sent as a digit string) of the feature shown on the
    # scatter x-axis for real datasets; a non-numeric value selects column 0.
    feature_x: Optional[str] = None
# ── Helpers ───────────────────────────────────────────────────────────────────
def _build_model(model_type: str, alpha: float):
    """Instantiate the estimator selected by *model_type*.

    "ridge" and "lasso" receive *alpha* as their regularisation strength
    (Lasso additionally gets ``max_iter=10_000``); every other value yields
    a plain ``LinearRegression``.
    """
    regularised = {
        "ridge": lambda: Ridge(alpha=alpha),
        "lasso": lambda: Lasso(alpha=alpha, max_iter=10_000),
    }
    # Unknown model types fall through to ordinary least squares.
    factory = regularised.get(model_type, LinearRegression)
    return factory()
def _corr(a, b) -> float:
"""Pearson r between two arrays."""
a, b = np.asarray(a), np.asarray(b)
da, db = a - a.mean(), b - b.mean()
denom = np.sqrt((da**2).sum() * (db**2).sum()) + 1e-12
return float((da * db).sum() / denom)
def _mape(y_true, y_pred) -> float:
"""Mean Absolute Percentage Error (%). Returns nan when all targets are zero."""
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
mask = np.abs(y_true) > 1e-8
if not mask.any():
return float("nan")
return float(np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100)
# ── Main training function ────────────────────────────────────────────────────
def run_training(req: TrainRequest) -> dict:
    """
    Full training pipeline. Returns a dict with all data needed
    by the frontend (metrics, scatter, diagnostics, new plots).

    The data is split once (``random_state=42``); real datasets are
    standardised with a scaler fitted on the training split only, synthetic
    data is used unscaled. Residual diagnostics, Cook's distance, leverage and
    permutation importance are computed on the test split; coefficient CIs,
    partial regression, learning curve, regularization path and the gradient
    descent trace use the training split.

    Raises:
        ValueError: if a real dataset is requested but ``req.real_config``
            is missing.
    """
    # ── 1. Load data ──────────────────────────────────────────────────────────
    is_synthetic = req.dataset_type == "synthetic"
    if is_synthetic:
        cfg = req.synthetic_config or SyntheticConfig(dataset_type="linear")
        X_1d, y = generate_synthetic(cfg)
        X = X_1d.reshape(-1, 1)
        feature_names = ["x"]
        dataset_name = None
    else:
        # Fail with a clear message instead of an AttributeError on None.
        if req.real_config is None:
            raise ValueError(
                'real_config is required when dataset_type is not "synthetic"'
            )
        dataset_name = req.real_config.dataset_name
        X, y, feature_names = load_real_dataset(dataset_name)
        X_1d = None

    # ── 2. Split ──────────────────────────────────────────────────────────────
    # Split indices (not rows) so the same partition can index X, y and X_1d.
    idx_all = np.arange(len(y))
    idx_tr, idx_te = train_test_split(idx_all, test_size=req.test_size, random_state=42)
    X_tr_raw, X_te_raw = X[idx_tr], X[idx_te]
    y_tr, y_te = y[idx_tr], y[idx_te]

    # Scale for real datasets (scaler is fitted on the training split only)
    scaler = None
    if not is_synthetic:
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_tr_raw)
        X_te = scaler.transform(X_te_raw)
    else:
        X_tr, X_te = X_tr_raw, X_te_raw

    # ── 3. Train ──────────────────────────────────────────────────────────────
    model = _build_model(req.model_type, req.alpha)
    model.fit(X_tr, y_tr)
    y_pred_tr = model.predict(X_tr)
    y_pred_te = model.predict(X_te)

    # ── 4. Basic metrics ──────────────────────────────────────────────────────
    residuals = y_te - y_pred_te
    fitted = y_pred_te
    metrics = {
        "r2_train": float(r2_score(y_tr, y_pred_tr)),
        "r2_test": float(r2_score(y_te, y_pred_te)),
        "rmse_train": float(np.sqrt(mean_squared_error(y_tr, y_pred_tr))),
        "rmse_test": float(np.sqrt(mean_squared_error(y_te, y_pred_te))),
        "mae_train": float(mean_absolute_error(y_tr, y_pred_tr)),
        "mae_test": float(mean_absolute_error(y_te, y_pred_te)),
        "mape_train": _mape(y_tr, y_pred_tr),
        "mape_test": _mape(y_te, y_pred_te),
        "mse_train": float(mean_squared_error(y_tr, y_pred_tr)),
        "mse_test": float(mean_squared_error(y_te, y_pred_te)),
        "n_train": int(len(y_tr)),
        "n_test": int(len(y_te)),
    }

    # ── 5. Coefficients ───────────────────────────────────────────────────────
    coef_arr = model.coef_.flatten()
    coefs = {feature_names[i]: float(coef_arr[i]) for i in range(len(feature_names))}
    coefs["intercept"] = float(model.intercept_)

    # OLS standard errors & confidence intervals (only for plain LinearRegression)
    coef_ci = {}
    if req.model_type == "linear":
        coef_ci = _ols_confidence_intervals(X_tr, y_tr, y_pred_tr, feature_names, model)

    # ── 6. Scatter data ───────────────────────────────────────────────────────
    scatter = _scatter_data(
        is_synthetic, X_1d, X, y, idx_tr, idx_te,
        model, feature_names, req.feature_x,
        X_tr_raw if not is_synthetic else None,
        dataset_name,
    )

    # ── 7. Diagnostic plots ───────────────────────────────────────────────────
    sorted_res = np.sort(residuals)
    n_res = len(sorted_res)
    # Normal quantiles at evenly spaced probabilities in [0.01, 0.99].
    theoretical = stats.norm.ppf(np.linspace(0.01, 0.99, n_res)).tolist()
    # Shapiro-Wilk is run on at most the first 5000 residuals.
    sw_stat, sw_p = stats.shapiro(residuals[:min(5000, len(residuals))])
    rvf = {"fitted": fitted.tolist(), "residuals": residuals.tolist()}
    qq = {"theoretical": theoretical, "sample": sorted_res.tolist()}
    sl = {"fitted": fitted.tolist(), "sqrt_abs_resid": np.sqrt(np.abs(residuals)).tolist()}
    avp = {"actual": y_te.tolist(), "predicted": y_pred_te.tolist()}

    # ── 8. Cook's Distance ────────────────────────────────────────────────────
    cooks = _cooks_distance(X_te, y_te, y_pred_te, len(feature_names) + 1)

    # ── 9. Leverage (hat matrix diagonal) ────────────────────────────────────
    leverage = _leverage(X_te)

    # ── 10. Partial Regression plots (real datasets only) ────────────────────
    partial_regression = []
    if not is_synthetic and X.shape[1] > 1:
        partial_regression = _partial_regression_plots(X_tr, y_tr, y_pred_tr, feature_names)

    # ── 11. Learning Curve ────────────────────────────────────────────────────
    lc = _learning_curve_data(
        _build_model(req.model_type, req.alpha),
        X_tr, y_tr, req.model_type
    )

    # ── 12. Regularization Path ───────────────────────────────────────────────
    # Always computed for both Ridge and Lasso, whatever model was requested.
    reg_path = _regularization_path(X_tr, y_tr, feature_names)

    # ── 13. Gradient Descent animation data ──────────────────────────────────
    # Real datasets use the first (standardised) feature column.
    gd = _gradient_descent_path(
        X_1d[idx_tr] if is_synthetic else X_tr[:, 0],
        y_tr, is_synthetic
    )

    # ── 14. Permutation Feature Importance ───────────────────────────────────
    perm_imp = _permutation_importance(model, X_te, y_te, feature_names)

    return {
        "ok": True,
        "metrics": metrics,
        "coefs": coefs,
        "coef_ci": coef_ci,
        "scatter": scatter,
        "avp": avp,
        "rvf": rvf,
        "sl": sl,
        "qq": qq,
        "shapiro": {"stat": float(sw_stat), "p": float(sw_p), "normal": bool(sw_p > 0.05)},
        "cooks": cooks,
        "leverage": leverage,
        "partial_regression": partial_regression,
        "learning_curve": lc,
        "reg_path": reg_path,
        "gradient_descent": gd,
        "perm_importance": perm_imp,
        "feature_names": feature_names,
        "is_synthetic": is_synthetic,
    }
# ── Plot-data helpers ─────────────────────────────────────────────────────────
def _scatter_data(is_synthetic, X_1d, X, y, idx_tr, idx_te,
model, feature_names, feature_x_str,
X_tr_raw, dataset_name):
if is_synthetic:
x_range = np.linspace(X_1d.min(), X_1d.max(), 300)
y_line = model.predict(x_range.reshape(-1, 1)).tolist()
return {
"x_train": X_1d[idx_tr].tolist(),
"y_train": y[idx_tr].tolist(),
"x_test": X_1d[idx_te].tolist(),
"y_test": y[idx_te].tolist(),
"x_line": x_range.tolist(),
"y_line": y_line,
"feature_names": feature_names,
}
else:
fx_idx = int(feature_x_str) if feature_x_str and feature_x_str.isdigit() else 0
return {
"x_train": X[idx_tr, fx_idx].tolist(),
"y_train": y[idx_tr].tolist(),
"x_test": X[idx_te, fx_idx].tolist(),
"y_test": y[idx_te].tolist(),
"feature_names": feature_names,
"fx_name": feature_names[fx_idx],
"fx_idx": fx_idx,
}
def _ols_confidence_intervals(X_tr, y_tr, y_pred_tr, feature_names, model):
"""Compute standard errors and 95% CIs for OLS coefficients."""
n, p = X_tr.shape
resid = y_tr - y_pred_tr
s2 = (resid**2).sum() / max(n - p - 1, 1)
try:
X_b = np.column_stack([np.ones(n), X_tr])
cov = s2 * np.linalg.pinv(X_b.T @ X_b)
se = np.sqrt(np.diag(cov))
t_cr = stats.t.ppf(0.975, df=max(n - p - 1, 1))
coef_full = np.concatenate([[model.intercept_], model.coef_.flatten()])
names = ["intercept"] + list(feature_names)
result = {}
for i, name in enumerate(names):
result[name] = {
"coef": float(coef_full[i]),
"se": float(se[i]),
"ci_lo": float(coef_full[i] - t_cr * se[i]),
"ci_hi": float(coef_full[i] + t_cr * se[i]),
"t_stat": float(coef_full[i] / (se[i] + 1e-12)),
"p_val": float(2 * stats.t.sf(abs(coef_full[i] / (se[i] + 1e-12)), df=max(n-p-1,1))),
}
return result
except Exception:
return {}
def _cooks_distance(X_te, y_te, y_pred_te, p):
    """Approximate Cook's Distance for each test-set point.

    Uses the mean squared test residual as the error variance and the
    leverage values from ``_leverage(X_te)`` (clipped to [1e-6, 1 - 1e-6]).
    *p* is the number of model parameters. Points whose distance exceeds
    ``4 / n`` are listed under "influential".
    """
    n_obs = len(y_te)
    errors = y_te - y_pred_te
    mean_sq_err = float(np.mean(errors**2))
    # Clip so that (1 - h) never reaches zero in the denominator below.
    hat_diag = np.clip(np.asarray(_leverage(X_te)["h"]), 1e-6, 1 - 1e-6)
    scaled_sq_resid = errors**2 / (p * mean_sq_err + 1e-12)
    distances = scaled_sq_resid * (hat_diag / (1 - hat_diag)**2)
    cutoff = 4 / max(n_obs, 1)
    flagged = [int(pos) for pos, dist in enumerate(distances) if dist > cutoff]
    return {
        "index": list(range(n_obs)),
        "distance": distances.tolist(),
        "threshold": float(cutoff),
        "influential": flagged,
    }
def _leverage(X_te):
"""Hat matrix diagonal h_ii for test set."""
n = X_te.shape[0]
X_b = np.column_stack([np.ones(n), X_te])
try:
H = X_b @ np.linalg.pinv(X_b.T @ X_b) @ X_b.T
h = np.diag(H).tolist()
except Exception:
h = [1.0 / n] * n
return {"h": h, "threshold": float(2 * (X_te.shape[1] + 1) / max(n, 1))}
def _partial_regression_plots(X_tr, y_tr, y_pred_tr, feature_names, max_plots=6):
    """Added-variable plots: residuals of y~X_{-j} vs residuals of x_j~X_{-j}.

    For each of the first *max_plots* features j, both the target and x_j
    are regressed on the remaining features; the two residual vectors, the
    slope between them and their Pearson correlation are returned. An empty
    list is returned when there are fewer than two features.

    *y_pred_tr* is accepted for call compatibility and is not used.
    *max_plots* caps the number of plots to avoid frontend overload.
    """
    p = X_tr.shape[1]
    if p < 2:
        return []
    results = []
    # Only fit the features that will actually be returned.
    for j in range(min(p, max_plots)):
        X_minus_j = np.delete(X_tr, j, axis=1)
        x_j = X_tr[:, j]
        # residuals of y on X_{-j}
        ey = y_tr - LinearRegression().fit(X_minus_j, y_tr).predict(X_minus_j)
        # residuals of x_j on X_{-j}
        ex = x_j - LinearRegression().fit(X_minus_j, x_j).predict(X_minus_j)
        # slope = partial regression coefficient. Covariance and variance come
        # from the same matrix, so both use the same ddof and the ratio is not
        # inflated by n / (n - 1).
        cov = np.cov(ex, ey)
        slope = float(cov[0, 1] / (cov[0, 0] + 1e-12))
        results.append({
            "feature": feature_names[j],
            "ex": ex.tolist(),
            "ey": ey.tolist(),
            "slope": slope,
            "r": float(_corr(ex, ey)),
        })
    return results
def _learning_curve_data(model, X_tr, y_tr, model_type):
    """Train/val R² versus training-set size.

    Up to 10 subset sizes between ``max(5, 10% of n)`` and ``n`` are tried.
    Each subset is drawn without replacement, split 80/20, and a fresh copy
    of *model* (same class and parameters) is fitted on the 80% part.
    Subsets smaller than 6 rows are skipped.

    Returns ``{"sizes", "train", "val"}`` where the three lists are aligned:
    entry *i* of each list belongs to the same subset. *model_type* is
    accepted for call compatibility and is not used.
    """
    n = len(y_tr)
    sizes = np.unique(np.linspace(max(5, int(n * 0.1)), n, 10).astype(int))
    used_sizes, train_scores, val_scores = [], [], []
    for s in sizes:
        if s < 6:
            continue
        # replace=False: bootstrap duplicates would otherwise land on both
        # sides of the split and leak training rows into validation.
        X_s, y_s = resample(X_tr, y_tr, replace=False, n_samples=int(s), random_state=42)
        X_tr2, X_va2, y_tr2, y_va2 = train_test_split(X_s, y_s, test_size=0.2, random_state=0)
        if len(X_tr2) < 3 or len(X_va2) < 2:
            continue
        m = model.__class__(**model.get_params())
        m.fit(X_tr2, y_tr2)
        # Record the size together with its scores so skipped sizes can never
        # shift the x-axis labels.
        used_sizes.append(int(s))
        train_scores.append(float(r2_score(y_tr2, m.predict(X_tr2))))
        val_scores.append(float(r2_score(y_va2, m.predict(X_va2))))
    return {"sizes": used_sizes, "train": train_scores, "val": val_scores}
def _regularization_path(X_tr, y_tr, feature_names):
    """Coefficient paths vs log10(alpha) for Ridge and Lasso.

    Both estimators are refitted for 60 alphas spaced logarithmically between
    1e-3 and 1e3. Each ``*_coefs`` entry is a list of per-alpha coefficient
    lists, i.e. shape (60, n_features).
    """
    alpha_grid = np.logspace(-3, 3, 60)

    def _coef_path(make_estimator):
        # One flattened coefficient vector per alpha, in grid order.
        return [
            make_estimator(a).fit(X_tr, y_tr).coef_.flatten().tolist()
            for a in alpha_grid
        ]

    return {
        "alphas": np.log10(alpha_grid).tolist(),
        "ridge_coefs": _coef_path(lambda a: Ridge(alpha=a)),
        "lasso_coefs": _coef_path(lambda a: Lasso(alpha=a, max_iter=10_000)),
        "feature_names": feature_names,
    }
def _gradient_descent_path(X_1d, y, is_synthetic, lr=0.05, n_iter=80):
"""
Manually run gradient descent on a 1-D regression (β0, β1).
Returns the path of (β0, β1, mse) per iteration plus
the loss surface grid for the contour plot.
"""
# use at most 300 points for speed
if len(X_1d) > 300:
idx = np.random.RandomState(0).choice(len(X_1d), 300, replace=False)
X_1d, y = X_1d[idx], y[idx]
n = len(X_1d)
b0, b1 = 0.0, 0.0
path = []
for _ in range(n_iter):
y_hat = b0 + b1 * X_1d
resid = y_hat - y
mse = float(np.mean(resid**2))
path.append({"b0": round(b0, 5), "b1": round(b1, 5), "mse": round(mse, 5)})
db0 = (2 / n) * resid.sum()
db1 = (2 / n) * (resid * X_1d).sum()
b0 -= lr * db0
b1 -= lr * db1
# Loss surface: grid of (b0, b1) → MSE
b0_final = path[-1]["b0"]
b1_final = path[-1]["b1"]
b0_grid = np.linspace(b0_final - 3, b0_final + 3, 30)
b1_grid = np.linspace(b1_final - 3, b1_final + 3, 30)
Z = []
for b0v in b0_grid:
row = []
for b1v in b1_grid:
y_h = b0v + b1v * X_1d
row.append(round(float(np.mean((y_h - y)**2)), 4))
Z.append(row)
return {
"path": path,
"b0_grid": b0_grid.tolist(),
"b1_grid": b1_grid.tolist(),
"Z": Z,
"x_data": X_1d.tolist(),
"y_data": y.tolist(),
}
def _permutation_importance(model, X_te, y_te, feature_names, n_repeats=20):
    """Drop in R² when each feature is permuted.

    Every column of *X_te* is shuffled *n_repeats* times (seeded RNG, 42)
    while the other columns stay intact; the mean and standard deviation of
    the resulting R² drops are reported per feature, sorted by mean drop,
    largest first.
    """
    rng = np.random.RandomState(42)
    baseline = r2_score(y_te, model.predict(X_te))
    summary = []
    for col in range(X_te.shape[1]):
        # Working copy whose only modified column is `col`; each repeat
        # reshuffles the original column values.
        shuffled = X_te.copy()
        deltas = []
        for _ in range(n_repeats):
            shuffled[:, col] = rng.permutation(X_te[:, col])
            deltas.append(baseline - r2_score(y_te, model.predict(shuffled)))
        summary.append({
            "feature": feature_names[col],
            "mean": float(np.mean(deltas)),
            "std": float(np.std(deltas)),
        })
    return sorted(summary, key=lambda entry: entry["mean"], reverse=True)