AutoMLOps / mlops /trainer.py
mnoorchenar's picture
Update 2026-03-25 18:13:59
edc9558
"""Background model trainer with MLflow tracking."""
import os
import time
import uuid
import threading
import numpy as np
from datetime import datetime
# MLflow tracking URI. The MLFLOW_TRACKING_URI environment variable takes
# precedence so that processes started from a different working directory
# (e.g. Airflow tasks) can point at the same store; otherwise a SQLite file
# named mlflow.db, resolved relative to the current working directory, is used.
_MLFLOW_URI = os.getenv("MLFLOW_TRACKING_URI", "sqlite:///mlflow.db")
import mlflow
import mlflow.sklearn
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (
accuracy_score, f1_score, precision_score, recall_score,
r2_score, mean_absolute_error, mean_squared_error,
confusion_matrix, classification_report,
)
from mlops.datasets import load_dataset
from mlops.algorithms import get_algorithm, ALGORITHMS
# ── Shared job state ──────────────────────────────────────────────────────────
# In-memory job registries keyed by job_id, one for single training runs and
# one for AutoML sweeps. Every write to them in this module goes through
# _lock.
training_jobs: dict[str, dict] = dict()
automl_jobs: dict[str, dict] = dict()
_lock = threading.Lock()
# ── Internal helpers ──────────────────────────────────────────────────────────
def _get_or_create_experiment(name: str) -> str:
    """Return the id of the MLflow experiment called ``name``, creating it if needed.

    Also points MLflow at ``_MLFLOW_URI`` before the lookup.

    Training jobs in this module run on parallel threads, and other processes
    may share the same tracking store. Two callers can therefore both see the
    experiment as missing and race to create it. If creation fails, the
    lookup is retried, and the original error is re-raised only when the
    experiment still does not exist.
    """
    mlflow.set_tracking_uri(_MLFLOW_URI)
    exp = mlflow.get_experiment_by_name(name)
    if exp is not None:
        return exp.experiment_id
    try:
        return mlflow.create_experiment(name)
    except Exception:
        # Most likely lost the creation race; use the winner's experiment.
        exp = mlflow.get_experiment_by_name(name)
        if exp is None:
            raise
        return exp.experiment_id
def _update_job(store: dict, job_id: str, **kwargs):
    """Merge the keyword arguments into ``store[job_id]`` while holding ``_lock``.

    Raises ``KeyError`` if ``store`` has no record for ``job_id``.
    """
    with _lock:
        record = store[job_id]
        for field, value in kwargs.items():
            record[field] = value
def _classification_metrics(y_test, y_pred) -> dict:
    """Return accuracy and weighted F1/precision/recall, each rounded to 4 d.p.

    Weighted averaging is used for the per-class scores, and ``zero_division=0``
    makes classes with no predicted samples score 0 instead of warning.
    """
    averaged = {"average": "weighted", "zero_division": 0}
    scores = (
        ("accuracy", accuracy_score(y_test, y_pred)),
        ("f1_score", f1_score(y_test, y_pred, **averaged)),
        ("precision", precision_score(y_test, y_pred, **averaged)),
        ("recall", recall_score(y_test, y_pred, **averaged)),
    )
    return {name: round(float(value), 4) for name, value in scores}
def _regression_metrics(y_test, y_pred) -> dict:
    """Return R², MAE, MSE and RMSE for a regression prediction, rounded to 4 d.p.

    RMSE is taken from the unrounded MSE, so rounding is applied only once.
    """
    squared_error = float(mean_squared_error(y_test, y_pred))
    r2 = float(r2_score(y_test, y_pred))
    abs_error = float(mean_absolute_error(y_test, y_pred))
    root_squared_error = float(np.sqrt(squared_error))
    ordered = (
        ("r2_score", r2),
        ("mae", abs_error),
        ("mse", squared_error),
        ("rmse", root_squared_error),
    )
    return {key: round(value, 4) for key, value in ordered}
# ── Single training run ───────────────────────────────────────────────────────
def _do_train(job_id: str, dataset_name: str, algorithm_name: str,
              algorithm_category: str, task_type: str, custom_params: dict | None):
    """Train one model and record its progress in ``training_jobs[job_id]``.

    Executed in a daemon thread started by ``start_training``. Exceptions
    raised by the training steps are caught and recorded on the job record
    rather than propagated. The record is updated in place via ``_update_job``:

    * on success: ``status="completed"``, ``progress=100``, plus ``metrics``,
      ``extra``, ``duration`` and ``completed_at``. For classification,
      ``extra`` holds the confusion matrix and report; for regression it holds
      the first 50 true and predicted values.
    * on failure: ``status="failed"``, ``progress=0``, ``error``,
      ``error_type``, ``traceback`` and ``duration``.

    Args:
        job_id: Key of the record already created in ``training_jobs``.
        dataset_name: Dataset passed to ``load_dataset``; also the MLflow
            experiment name.
        algorithm_name: Algorithm looked up via ``get_algorithm``; also used
            for run naming and the Naive Bayes scaling decision.
        algorithm_category: Category passed to ``get_algorithm``.
        task_type: ``"classification"`` selects classification metrics; any
            other value uses regression metrics.
        custom_params: Optional overrides merged over the algorithm's default
            params (overrides win).
    """
    start_time = time.time()
    try:
        _update_job(training_jobs, job_id, status="running", progress=5)
        mlflow.set_tracking_uri(_MLFLOW_URI)

        # 1. Load data
        X_train, X_test, y_train, y_test, meta = load_dataset(dataset_name)
        _update_job(training_jobs, job_id, progress=20, dataset_meta=meta)

        # 2. Algorithm config (custom params override the defaults)
        algo_cfg = get_algorithm(task_type, algorithm_category, algorithm_name)
        params = {**algo_cfg["params"], **(custom_params or {})}

        # 3. Pre-process. Naive Bayes variants that can't take negative inputs
        # are scaled to [0, 1]; everything else is standardised. Only the
        # scaler that is actually used gets fitted.
        if "Naive Bayes" in algorithm_name or "Complement" in algorithm_name:
            from sklearn.preprocessing import MinMaxScaler
            scaler = MinMaxScaler()
        else:
            scaler = StandardScaler()
        X_train_s = scaler.fit_transform(X_train)
        X_test_s = scaler.transform(X_test)
        _update_job(training_jobs, job_id, progress=35)

        # 4. Train inside an MLflow run
        exp_id = _get_or_create_experiment(dataset_name)
        with mlflow.start_run(experiment_id=exp_id,
                              run_name=f"{algorithm_name} — {dataset_name}") as run:
            run_id = run.info.run_id
            _update_job(training_jobs, job_id, mlflow_run_id=run_id, progress=40)
            mlflow.set_tags({
                "algorithm": algorithm_name,
                "category": algorithm_category,
                "dataset": dataset_name,
                "task_type": task_type,
                "job_id": job_id,
            })
            identity = {"algorithm": algorithm_name,
                        "category": algorithm_category,
                        "dataset": dataset_name}
            # MLflow params are logged as strings. A hyper-parameter whose name
            # clashes with an identity key (e.g. sklearn's KNeighbors*
            # estimators take an ``algorithm`` argument) is prefixed so it
            # cannot overwrite the logged algorithm name.
            hyper = {(f"param_{k}" if k in identity else k): str(v)
                     for k, v in params.items()}
            mlflow.log_params({**identity, **hyper})
            _update_job(training_jobs, job_id, progress=50)

            model = algo_cfg["class"](**params)
            model.fit(X_train_s, y_train)
            _update_job(training_jobs, job_id, progress=75)

            y_pred = model.predict(X_test_s)
            if task_type == "classification":
                metrics = _classification_metrics(y_test, y_pred)
                extra = {
                    "confusion_matrix": confusion_matrix(y_test, y_pred).tolist(),
                    "report": classification_report(y_test, y_pred,
                                                    output_dict=True,
                                                    zero_division=0),
                }
            else:
                metrics = _regression_metrics(y_test, y_pred)
                extra = {"y_test_sample": y_test[:50].tolist(),
                         "y_pred_sample": y_pred[:50].tolist()}

            mlflow.log_metrics(metrics)
            mlflow.sklearn.log_model(model, "model")
            _update_job(training_jobs, job_id, progress=90)

        duration = round(time.time() - start_time, 2)
        _update_job(training_jobs, job_id,
                    status="completed", progress=100,
                    metrics=metrics, extra=extra,
                    duration=duration,
                    completed_at=datetime.utcnow().isoformat())
    except Exception as exc:
        import traceback
        # Keep the failure diagnosable from the job record alone.
        _update_job(training_jobs, job_id,
                    status="failed", error=str(exc), progress=0,
                    error_type=type(exc).__name__,
                    traceback=traceback.format_exc(),
                    duration=round(time.time() - start_time, 2))
def start_training(dataset_name: str, algorithm_name: str,
                   algorithm_category: str, task_type: str,
                   custom_params: dict | None = None) -> str:
    """Queue a training job, run it on a background daemon thread, and return its id.

    A ``"queued"`` record is stored in ``training_jobs`` (under ``_lock``)
    before the ``_do_train`` worker thread is started.

    Returns:
        An 8-character hexadecimal job id taken from a random UUID4.
    """
    job_id = uuid.uuid4().hex[:8]
    record = {
        "job_id": job_id,
        "status": "queued",
        "progress": 0,
        "dataset": dataset_name,
        "algorithm": algorithm_name,
        "category": algorithm_category,
        "task_type": task_type,
        "created_at": datetime.utcnow().isoformat(),
    }
    with _lock:
        training_jobs[job_id] = record

    worker = threading.Thread(
        target=_do_train,
        kwargs=dict(
            job_id=job_id,
            dataset_name=dataset_name,
            algorithm_name=algorithm_name,
            algorithm_category=algorithm_category,
            task_type=task_type,
            custom_params=custom_params,
        ),
        daemon=True,
    )
    worker.start()
    return job_id
# ── AutoML: exhaustive search across all algorithms ───────────────────────────
def _do_automl(job_id: str, dataset_name: str, task_type: str,
               optimize_metric: str, max_runs: int):
    """Train every algorithm registered for ``task_type`` and rank the results.

    Executed in a daemon thread started by ``start_automl``. Each algorithm is
    fitted with its default params inside its own MLflow run in the
    ``"AutoML — <dataset>"`` experiment. Progress is written to
    ``automl_jobs[job_id]`` via ``_update_job``.

    Args:
        job_id: Key of the record already created in ``automl_jobs``.
        dataset_name: Dataset passed to ``load_dataset``.
        task_type: Key into ``ALGORITHMS``. ``"classification"`` selects
            classification metrics; anything else uses regression metrics.
        optimize_metric: Metric used to rank the results. If the trained
            models do not report it, the task's primary metric is used
            instead (``accuracy`` for classification, ``r2_score`` otherwise).
        max_runs: If smaller than the number of candidate algorithms, a
            deterministic random subset (seed 42) of this size is trained.

    On completion the record gets:

    * ``results``: sorted and re-ranked from 1;
    * ``best``: ``results[0]``, or ``None`` if nothing succeeded;
    * ``failures``: the algorithms that raised, with their error messages;
    * ``optimized_metric``: the metric actually used for ranking;
    * ``completed_at``.

    An error outside the per-algorithm loop sets ``status="failed"`` with
    ``error`` instead.
    """
    try:
        _update_job(automl_jobs, job_id, status="running", progress=2)
        mlflow.set_tracking_uri(_MLFLOW_URI)
        X_train, X_test, y_train, y_test, meta = load_dataset(dataset_name)
        _update_job(automl_jobs, job_id, dataset_meta=meta, progress=5)

        scaler = StandardScaler()
        X_train_s = scaler.fit_transform(X_train)
        X_test_s = scaler.transform(X_test)
        exp_id = _get_or_create_experiment(f"AutoML — {dataset_name}")

        # Collect all algorithms for this task as (category, name, config).
        all_algos = [
            (cat_name, alg_name, alg_cfg)
            for cat_name, cat in ALGORITHMS[task_type].items()
            for alg_name, alg_cfg in cat.items()
        ]
        if max_runs < len(all_algos):
            import random
            # A private generator yields the same subset as seeding the global
            # one with 42, without resetting random state shared with other
            # threads.
            all_algos = random.Random(42).sample(all_algos, max_runs)

        results = []
        failures = []
        total = len(all_algos)
        for idx, (cat_name, alg_name, alg_cfg) in enumerate(all_algos):
            _update_job(automl_jobs, job_id,
                        progress=int(5 + 90 * idx / total),
                        current_algo=alg_name)
            try:
                with mlflow.start_run(experiment_id=exp_id,
                                      run_name=f"AutoML: {alg_name}") as run:
                    mlflow.set_tags({"algorithm": alg_name, "category": cat_name,
                                     "automl_job": job_id, "task_type": task_type})
                    # NB needs non-negative values
                    if "Naive Bayes" in alg_name or "Complement" in alg_name:
                        from sklearn.preprocessing import MinMaxScaler
                        mms = MinMaxScaler()
                        X_tr = mms.fit_transform(X_train)
                        X_te = mms.transform(X_test)
                    else:
                        X_tr, X_te = X_train_s, X_test_s

                    model = alg_cfg["class"](**alg_cfg["params"])
                    t0 = time.time()
                    model.fit(X_tr, y_train)
                    dur = round(time.time() - t0, 2)
                    y_pred = model.predict(X_te)
                    if task_type == "classification":
                        metrics = _classification_metrics(y_test, y_pred)
                    else:
                        metrics = _regression_metrics(y_test, y_pred)

                    mlflow.log_params({"algorithm": alg_name, "category": cat_name})
                    mlflow.log_metrics(metrics)
                    mlflow.sklearn.log_model(model, "model")
                    results.append({
                        "rank": idx + 1,
                        "algorithm": alg_name,
                        "category": cat_name,
                        "metrics": metrics,
                        "duration": dur,
                        "run_id": run.info.run_id,
                        "color": alg_cfg.get("color", "#8b5cf6"),
                    })
            except Exception as exc:
                # Best-effort sweep: one failing algorithm must not abort the
                # rest, but keep a record of why it was skipped.
                failures.append({"algorithm": alg_name, "category": cat_name,
                                 "error": str(exc)})

        # Rank by the requested metric. If the models never report it (e.g.
        # the default "accuracy" on a regression task), every score would tie
        # at 0 and the order would be arbitrary, so fall back to the task's
        # primary metric.
        metric = optimize_metric
        if results and metric not in results[0]["metrics"]:
            metric = "accuracy" if task_type == "classification" else "r2_score"
        higher_is_better = metric in ("accuracy", "f1_score", "precision",
                                      "recall", "r2_score")
        results.sort(key=lambda r: r["metrics"].get(metric, 0),
                     reverse=higher_is_better)
        for i, r in enumerate(results):
            r["rank"] = i + 1

        _update_job(automl_jobs, job_id,
                    status="completed", progress=100,
                    results=results,
                    best=results[0] if results else None,
                    failures=failures,
                    optimized_metric=metric,
                    completed_at=datetime.utcnow().isoformat())
    except Exception as exc:
        _update_job(automl_jobs, job_id, status="failed", error=str(exc))
def train_for_pipeline(dataset_name: str, task_type: str, category: str,
                       algorithm: str, experiment_name: str = "pipeline") -> dict:
    """Synchronously train and evaluate one model for a pipeline task.

    Used by Airflow pipeline tasks. The function:

    1. loads ``dataset_name``;
    2. scales the features (MinMax for Naive Bayes variants, standardisation
       otherwise);
    3. fits the algorithm with its default params inside an MLflow run in
       ``experiment_name``;
    4. logs metrics and the model to that run.

    Args:
        dataset_name: Dataset passed to ``load_dataset``.
        task_type: ``"classification"`` selects classification metrics; any
            other value uses regression metrics.
        category: Category passed to ``get_algorithm``.
        algorithm: Algorithm name passed to ``get_algorithm``.
        experiment_name: MLflow experiment to log into (created if missing).

    Returns:
        The metrics dict from ``_classification_metrics`` or
        ``_regression_metrics``.

    Raises:
        Nothing is caught here. Any exception from data loading, algorithm
        lookup, training or MLflow logging propagates unchanged to the
        caller.
    """
    from sklearn.preprocessing import MinMaxScaler

    mlflow.set_tracking_uri(_MLFLOW_URI)
    X_train, X_test, y_train, y_test, _ = load_dataset(dataset_name)
    algo_cfg = get_algorithm(task_type, category, algorithm)
    params = algo_cfg["params"]

    # Naive Bayes variants can't take negative inputs, so scale to [0, 1].
    if "Naive Bayes" in algorithm or "Complement" in algorithm:
        scaler = MinMaxScaler()
    else:
        scaler = StandardScaler()
    X_tr = scaler.fit_transform(X_train)
    X_te = scaler.transform(X_test)

    exp_id = _get_or_create_experiment(experiment_name)
    with mlflow.start_run(experiment_id=exp_id,
                          run_name=f"{algorithm} — {dataset_name}"):
        mlflow.set_tags({
            "algorithm": algorithm, "category": category,
            "dataset": dataset_name, "task_type": task_type,
            "source": "airflow_pipeline",
        })
        identity = {"algorithm": algorithm, "category": category,
                    "dataset": dataset_name}
        # Log hyper-parameters as strings. A name that clashes with an
        # identity key (e.g. sklearn's ``algorithm`` argument) is prefixed so
        # it cannot overwrite the logged algorithm name.
        hyper = {(f"param_{k}" if k in identity else k): str(v)
                 for k, v in params.items()}
        mlflow.log_params({**identity, **hyper})

        model = algo_cfg["class"](**params)
        model.fit(X_tr, y_train)
        y_pred = model.predict(X_te)
        if task_type == "classification":
            metrics = _classification_metrics(y_test, y_pred)
        else:
            metrics = _regression_metrics(y_test, y_pred)
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model")
    return metrics
def start_automl(dataset_name: str, task_type: str,
                 optimize_metric: str = "accuracy",
                 max_runs: int = 20) -> str:
    """Queue an AutoML sweep, run it on a background daemon thread, and return its id.

    A ``"queued"`` record with an empty ``results`` list is stored in
    ``automl_jobs`` (under ``_lock``) before the ``_do_automl`` worker thread
    is started.

    Returns:
        An 8-character hexadecimal job id taken from a random UUID4.
    """
    job_id = uuid.uuid4().hex[:8]
    record = {
        "job_id": job_id,
        "status": "queued",
        "progress": 0,
        "dataset": dataset_name,
        "task_type": task_type,
        "metric": optimize_metric,
        "results": [],
        "created_at": datetime.utcnow().isoformat(),
    }
    with _lock:
        automl_jobs[job_id] = record

    worker = threading.Thread(
        target=_do_automl,
        kwargs=dict(
            job_id=job_id,
            dataset_name=dataset_name,
            task_type=task_type,
            optimize_metric=optimize_metric,
            max_runs=max_runs,
        ),
        daemon=True,
    )
    worker.start()
    return job_id