Spaces:
Sleeping
Sleeping
| """Background model trainer with MLflow tracking.""" | |
| import os | |
| import time | |
| import uuid | |
| import threading | |
| import numpy as np | |
| from datetime import datetime | |
| # Allow override via env var so Airflow tasks (different CWD) hit the same DB | |
| _MLFLOW_URI = os.environ.get("MLFLOW_TRACKING_URI", "sqlite:///mlflow.db") | |
| import mlflow | |
| import mlflow.sklearn | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
| from sklearn.metrics import ( | |
| accuracy_score, f1_score, precision_score, recall_score, | |
| r2_score, mean_absolute_error, mean_squared_error, | |
| confusion_matrix, classification_report, | |
| ) | |
| from mlops.datasets import load_dataset | |
| from mlops.algorithms import get_algorithm, ALGORITHMS | |
# ── Shared job state ──────────────────────────────────────────────────────────
# In-memory registries mapping job_id -> job-state dict.  Entries are created
# by start_training / start_automl and then mutated by the worker threads
# through _update_job.
training_jobs: dict = {}
automl_jobs: dict = {}
# Held while a job entry is created and during every _update_job mutation.
_lock = threading.Lock()
| # ── Internal helpers ────────────────────────────────────────────────────────── | |
def _get_or_create_experiment(name: str) -> str:
    """Return the MLflow experiment id for *name*, creating it when absent.

    The module's tracking URI is (re)applied first so the lookup always goes
    to the configured backend.
    """
    mlflow.set_tracking_uri(_MLFLOW_URI)
    existing = mlflow.get_experiment_by_name(name)
    if existing is not None:
        return existing.experiment_id
    # No experiment with this name yet: create_experiment returns the new id.
    return mlflow.create_experiment(name)
def _update_job(store: dict, job_id: str, **kwargs):
    """Merge *kwargs* into ``store[job_id]`` while holding the module lock.

    Raises ``KeyError`` if *job_id* has not been registered in *store*.
    """
    with _lock:
        job_state = store[job_id]
        job_state.update(kwargs)
def _classification_metrics(y_test, y_pred) -> dict:
    """Return accuracy plus weighted F1 / precision / recall, rounded to 4 places.

    The averaged scores use ``average="weighted"`` and ``zero_division=0`` so
    classes without predictions contribute 0 instead of raising a warning.
    """
    weighted = {"average": "weighted", "zero_division": 0}
    raw_scores = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, **weighted),
        "precision": precision_score(y_test, y_pred, **weighted),
        "recall": recall_score(y_test, y_pred, **weighted),
    }
    return {name: round(float(score), 4) for name, score in raw_scores.items()}
def _regression_metrics(y_test, y_pred) -> dict:
    """Return R², MAE, MSE and RMSE for the predictions, rounded to 4 places.

    RMSE is derived from the unrounded MSE so rounding is applied only once.
    """
    squared_error = float(mean_squared_error(y_test, y_pred))
    raw_scores = {
        "r2_score": float(r2_score(y_test, y_pred)),
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "mse": squared_error,
        "rmse": float(np.sqrt(squared_error)),
    }
    return {name: round(score, 4) for name, score in raw_scores.items()}
| # ── Single training run ─────────────────────────────────────────────────────── | |
def _do_train(job_id: str, dataset_name: str, algorithm_name: str,
              algorithm_category: str, task_type: str, custom_params: dict | None):
    """Train one model in a background thread, updating ``training_jobs[job_id]``.

    Loads the dataset, scales the features, fits the configured algorithm
    inside an MLflow run, logs tags/params/metrics/model and finally marks the
    job ``completed``.  Any exception is caught and stored on the job as
    ``status="failed"``; nothing is re-raised to the thread.

    Args:
        job_id: Key of the already-registered entry in ``training_jobs``.
        dataset_name: Passed to ``load_dataset``; also used as the MLflow
            experiment name.
        algorithm_name: Algorithm key passed to ``get_algorithm``.
        algorithm_category: Category key passed to ``get_algorithm``.
        task_type: ``"classification"`` selects the classification metrics;
            any other value is evaluated with the regression metrics.
        custom_params: Optional overrides merged over the algorithm's
            default constructor params.
    """
    start_time = time.time()
    try:
        _update_job(training_jobs, job_id, status="running", progress=5)
        mlflow.set_tracking_uri(_MLFLOW_URI)

        # 1. Load data
        X_train, X_test, y_train, y_test, meta = load_dataset(dataset_name)
        _update_job(training_jobs, job_id, progress=20, dataset_meta=meta)

        # 2. Algorithm config (custom params win over the defaults)
        algo_cfg = get_algorithm(task_type, algorithm_category, algorithm_name)
        params = {**algo_cfg["params"], **(custom_params or {})}

        # 3. Pre-process.  Naive Bayes variants can't take negative inputs, so
        #    they get MinMaxScaler; everything else is standardised.  Only the
        #    scaler that is actually used gets fitted.
        if "Naive Bayes" in algorithm_name or "Complement" in algorithm_name:
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        else:
            scaler = StandardScaler()
        X_train_s = scaler.fit_transform(X_train)
        X_test_s = scaler.transform(X_test)
        _update_job(training_jobs, job_id, progress=35)

        # 4. Train inside an MLflow run
        exp_id = _get_or_create_experiment(dataset_name)
        with mlflow.start_run(experiment_id=exp_id,
                              run_name=f"{algorithm_name} — {dataset_name}") as run:
            run_id = run.info.run_id
            _update_job(training_jobs, job_id, mlflow_run_id=run_id, progress=40)

            descriptors = {
                "algorithm": algorithm_name,
                "category": algorithm_category,
                "dataset": dataset_name,
            }
            mlflow.set_tags({**descriptors,
                             "task_type": task_type,
                             "job_id": job_id})

            # A hyperparameter may share a name with a descriptor (an
            # estimator argument literally called "algorithm", say).  Log it
            # under a "param_" prefix so it cannot overwrite the descriptor.
            hyperparams = {
                (f"param_{key}" if key in descriptors else key): str(value)
                for key, value in params.items()
            }
            mlflow.log_params({**descriptors, **hyperparams})
            _update_job(training_jobs, job_id, progress=50)

            model = algo_cfg["class"](**params)
            model.fit(X_train_s, y_train)
            _update_job(training_jobs, job_id, progress=75)

            y_pred = model.predict(X_test_s)
            if task_type == "classification":
                metrics = _classification_metrics(y_test, y_pred)
                extra = {
                    "confusion_matrix": confusion_matrix(y_test, y_pred).tolist(),
                    "report": classification_report(y_test, y_pred,
                                                    output_dict=True,
                                                    zero_division=0),
                }
            else:
                metrics = _regression_metrics(y_test, y_pred)
                # np.asarray makes the slice/tolist work for any array-like
                # target (ndarray, Series, plain list).
                extra = {
                    "y_test_sample": np.asarray(y_test)[:50].tolist(),
                    "y_pred_sample": np.asarray(y_pred)[:50].tolist(),
                }

            mlflow.log_metrics(metrics)
            mlflow.sklearn.log_model(model, "model")
            _update_job(training_jobs, job_id, progress=90)

        # The run is closed at this point; publish the final result.
        duration = round(time.time() - start_time, 2)
        _update_job(training_jobs, job_id,
                    status="completed", progress=100,
                    metrics=metrics, extra=extra,
                    duration=duration,
                    completed_at=datetime.utcnow().isoformat())
    except Exception as exc:
        # Thread boundary: record the failure on the job instead of letting
        # the exception vanish with the daemon thread.  The exception type is
        # kept separately because str(exc) alone can be uninformative.
        _update_job(training_jobs, job_id,
                    status="failed", error=str(exc),
                    error_type=type(exc).__name__,
                    duration=round(time.time() - start_time, 2),
                    progress=0)
def start_training(dataset_name: str, algorithm_name: str,
                   algorithm_category: str, task_type: str,
                   custom_params: dict | None = None) -> str:
    """Register a queued training job, launch it on a daemon thread, return its id.

    The job entry is stored in ``training_jobs`` before the worker starts, so
    the worker always finds its own record.
    """
    # First 8 hex digits of a random UUID (same as the first 8 chars of str(uuid)).
    job_id = uuid.uuid4().hex[:8]
    initial_state = dict(
        job_id=job_id,
        status="queued",
        progress=0,
        dataset=dataset_name,
        algorithm=algorithm_name,
        category=algorithm_category,
        task_type=task_type,
        created_at=datetime.utcnow().isoformat(),
    )
    with _lock:
        training_jobs[job_id] = initial_state

    worker_args = (job_id, dataset_name, algorithm_name,
                   algorithm_category, task_type, custom_params)
    worker = threading.Thread(target=_do_train, args=worker_args, daemon=True)
    worker.start()
    return job_id
| # ── AutoML: exhaustive search across all algorithms ─────────────────────────── | |
def _do_automl(job_id: str, dataset_name: str, task_type: str,
               optimize_metric: str, max_runs: int):
    """Sweep the registered algorithms for *task_type* and rank the results.

    Runs in a background thread and reports through ``automl_jobs[job_id]``.
    Each algorithm is trained with its default params inside its own MLflow
    run.  An algorithm that fails is skipped (the sweep is best-effort) but
    is recorded in the job's ``failures`` list rather than being dropped
    silently.  Any error outside the per-algorithm loop marks the whole job
    ``failed``.

    Args:
        job_id: Key of the already-registered entry in ``automl_jobs``.
        dataset_name: Passed to ``load_dataset``; the MLflow experiment is
            named ``"AutoML — <dataset_name>"``.
        task_type: Key into ``ALGORITHMS``; ``"classification"`` selects the
            classification metrics, anything else the regression metrics.
        optimize_metric: Metric name used to rank results.
        max_runs: Upper bound on the number of algorithms tried; when there
            are more candidates, a reproducible random subset is used.
    """
    import math
    import random

    try:
        _update_job(automl_jobs, job_id, status="running", progress=2)
        mlflow.set_tracking_uri(_MLFLOW_URI)

        X_train, X_test, y_train, y_test, meta = load_dataset(dataset_name)
        _update_job(automl_jobs, job_id, dataset_meta=meta, progress=5)

        scaler = StandardScaler()
        X_train_s = scaler.fit_transform(X_train)
        X_test_s = scaler.transform(X_test)
        # Min-max scaled copy for Naive Bayes variants; computed on first use
        # and then shared, since it does not depend on the algorithm.
        non_negative = None

        exp_id = _get_or_create_experiment(f"AutoML — {dataset_name}")

        # Collect all algorithms for this task
        all_algos = [
            (cat_name, alg_name, alg_cfg)
            for cat_name, cat in ALGORITHMS[task_type].items()
            for alg_name, alg_cfg in cat.items()
        ]
        if max_runs < len(all_algos):
            # A private, seeded generator picks the same subset as seeding the
            # global RNG would, without reseeding `random` for the whole process.
            all_algos = random.Random(42).sample(all_algos, max_runs)

        results = []
        failures = []
        total = len(all_algos)
        for idx, (cat_name, alg_name, alg_cfg) in enumerate(all_algos):
            _update_job(automl_jobs, job_id,
                        progress=int(5 + 90 * idx / total),
                        current_algo=alg_name)
            try:
                with mlflow.start_run(experiment_id=exp_id,
                                      run_name=f"AutoML: {alg_name}") as run:
                    mlflow.set_tags({"algorithm": alg_name, "category": cat_name,
                                     "automl_job": job_id, "task_type": task_type})

                    # NB needs non-negative values
                    X_tr, X_te = X_train_s, X_test_s
                    if "Naive Bayes" in alg_name or "Complement" in alg_name:
                        if non_negative is None:
                            from sklearn.preprocessing import MinMaxScaler
                            mms = MinMaxScaler()
                            non_negative = (mms.fit_transform(X_train),
                                            mms.transform(X_test))
                        X_tr, X_te = non_negative

                    model = alg_cfg["class"](**alg_cfg["params"])
                    t0 = time.time()
                    model.fit(X_tr, y_train)
                    dur = round(time.time() - t0, 2)

                    y_pred = model.predict(X_te)
                    if task_type == "classification":
                        metrics = _classification_metrics(y_test, y_pred)
                    else:
                        metrics = _regression_metrics(y_test, y_pred)

                    mlflow.log_params({"algorithm": alg_name, "category": cat_name})
                    mlflow.log_metrics(metrics)
                    mlflow.sklearn.log_model(model, "model")

                results.append({
                    "rank": idx + 1,  # provisional; reassigned after sorting
                    "algorithm": alg_name,
                    "category": cat_name,
                    "metrics": metrics,
                    "duration": dur,
                    "run_id": run.info.run_id,
                    "color": alg_cfg.get("color", "#8b5cf6"),
                })
            except Exception as exc:
                # Best-effort sweep: one broken algorithm must not abort the
                # rest, but keep a record so the failure is visible.
                failures.append({
                    "algorithm": alg_name,
                    "category": cat_name,
                    "error": f"{type(exc).__name__}: {exc}",
                })

        # Sort by optimise metric.  A result lacking the metric gets the worst
        # possible value for the sort direction, so it can never outrank a
        # real score (a default of 0 would win whenever lower is better).
        higher_is_better = optimize_metric in ("accuracy", "f1_score", "precision",
                                               "recall", "r2_score")
        worst = -math.inf if higher_is_better else math.inf
        results.sort(key=lambda r: r["metrics"].get(optimize_metric, worst),
                     reverse=higher_is_better)
        for position, result in enumerate(results, start=1):
            result["rank"] = position

        _update_job(automl_jobs, job_id,
                    status="completed", progress=100,
                    results=results,
                    failures=failures,
                    best=results[0] if results else None,
                    completed_at=datetime.utcnow().isoformat())
    except Exception as exc:
        _update_job(automl_jobs, job_id, status="failed", error=str(exc))
def train_for_pipeline(dataset_name: str, task_type: str, category: str,
                       algorithm: str, experiment_name: str = "pipeline") -> dict:
    """
    Synchronous training helper used by Airflow pipeline tasks.

    Runs the full train/eval loop (load, scale, fit, predict, log to MLflow)
    with the algorithm's default params and returns the metrics dict.

    Args:
        dataset_name: Passed to ``load_dataset``.
        task_type: ``"classification"`` selects the classification metrics;
            any other value is evaluated with the regression metrics.
        category: Category key passed to ``get_algorithm``.
        algorithm: Algorithm key passed to ``get_algorithm``.
        experiment_name: MLflow experiment to log the run under.

    Returns:
        The metrics dict produced for the held-out test split.

    Raises:
        RuntimeError: if any step fails.  The underlying exception is chained
            as ``__cause__`` so the original traceback is preserved.
    """
    from sklearn.preprocessing import StandardScaler, MinMaxScaler

    try:
        mlflow.set_tracking_uri(_MLFLOW_URI)
        X_train, X_test, y_train, y_test, _ = load_dataset(dataset_name)
        algo_cfg = get_algorithm(task_type, category, algorithm)
        params = algo_cfg["params"]

        # Naive Bayes variants need non-negative features.
        if "Naive Bayes" in algorithm or "Complement" in algorithm:
            scaler = MinMaxScaler()
        else:
            scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_train)
        X_te = scaler.transform(X_test)

        exp_id = _get_or_create_experiment(experiment_name)
        with mlflow.start_run(experiment_id=exp_id,
                              run_name=f"{algorithm} — {dataset_name}"):
            mlflow.set_tags({
                "algorithm": algorithm, "category": category,
                "dataset": dataset_name, "source": "airflow_pipeline",
            })
            mlflow.log_params({"algorithm": algorithm, "category": category,
                               "dataset": dataset_name})

            model = algo_cfg["class"](**params)
            model.fit(X_tr, y_train)
            y_pred = model.predict(X_te)

            if task_type == "classification":
                metrics = _classification_metrics(y_test, y_pred)
            else:
                metrics = _regression_metrics(y_test, y_pred)

            mlflow.log_metrics(metrics)
            mlflow.sklearn.log_model(model, "model")
    except Exception as exc:
        # Honour the documented contract: callers only need to handle
        # RuntimeError, whatever the underlying library raised.
        raise RuntimeError(
            f"Pipeline training failed for {algorithm!r} on {dataset_name!r}: {exc}"
        ) from exc
    return metrics
def start_automl(dataset_name: str, task_type: str,
                 optimize_metric: str = "accuracy",
                 max_runs: int = 20) -> str:
    """Register a queued AutoML sweep, launch it on a daemon thread, return its id.

    The job entry is stored in ``automl_jobs`` before the worker starts, so
    the worker always finds its own record.
    """
    # First 8 hex digits of a random UUID (same as the first 8 chars of str(uuid)).
    job_id = uuid.uuid4().hex[:8]
    initial_state = dict(
        job_id=job_id,
        status="queued",
        progress=0,
        dataset=dataset_name,
        task_type=task_type,
        metric=optimize_metric,
        results=[],
        created_at=datetime.utcnow().isoformat(),
    )
    with _lock:
        automl_jobs[job_id] = initial_state

    worker_args = (job_id, dataset_name, task_type, optimize_metric, max_runs)
    worker = threading.Thread(target=_do_automl, args=worker_args, daemon=True)
    worker.start()
    return job_id