| | """Training entrypoint for fraud detection models with MLflow tracking.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import json |
| | from datetime import datetime, timezone |
| | from pathlib import Path |
| | from typing import Any |
| |
|
| | import joblib |
| | import mlflow |
| | import pandas as pd |
| | import yaml |
| | from sklearn.linear_model import LogisticRegression |
| |
|
| | from src.data_ingestion import load_data, run_data_validation |
| | from src.evaluate import calculate_metrics, rank_models, select_best_threshold |
| | from src.preprocessing import preprocess_for_training |
| |
|
| | try: |
| | from xgboost import XGBClassifier |
| | except Exception: |
| | XGBClassifier = None |
| |
|
| |
|
# Default filesystem locations, all relative to the working directory.
# Each one can be overridden through the matching keyword argument of
# ``run_training_pipeline``.
DEFAULT_CONFIG_PATH = Path("configs", "train.yaml")
DEFAULT_DATA_PATH = Path("data", "raw", "creditcard.csv")
DEFAULT_MODEL_PATH = Path("models", "model.pkl")
DEFAULT_PREPROCESSOR_PATH = Path("models", "preprocessor.pkl")
DEFAULT_REPORT_PATH = Path("artifacts", "model_training_report.json")
DEFAULT_MODEL_REPORT_PATH = Path("artifacts", "model_report.json")
DEFAULT_VALIDATION_REPORT_PATH = Path("artifacts", "data_validation.json")
| |
|
| |
|
def load_training_config(config_path: str | Path = DEFAULT_CONFIG_PATH) -> dict[str, Any]:
    """Load the YAML training configuration.

    Args:
        config_path: Location of the YAML configuration file.

    Returns:
        The parsed configuration mapping. The ``experiment``, ``training``
        and ``mlflow`` keys are always present and never ``None``, so callers
        can call ``.get`` on them directly.

    Raises:
        ValueError: If the top-level YAML document is not a mapping.
        OSError: If the file cannot be read.
    """
    raw_text = Path(config_path).read_text(encoding="utf-8")
    # An empty document parses to None; treat it as "no overrides".
    config = yaml.safe_load(raw_text) or {}
    if not isinstance(config, dict):
        raise ValueError(
            f"Training config must be a YAML mapping, got {type(config).__name__}: {config_path}"
        )

    for section in ("experiment", "training", "mlflow"):
        # A key written without a value (e.g. "mlflow:") parses to None.
        # setdefault() would keep that None and break later `.get` calls,
        # so missing and null sections are both normalised to an empty dict.
        if config.get(section) is None:
            config[section] = {}
    return config
| |
|
| |
|
def create_model(model_name: str, random_state: int) -> Any:
    """Build an unfitted classifier for the configured model name.

    Supported names are ``"logistic_regression"`` and ``"xgboost"``.

    Raises:
        RuntimeError: If ``"xgboost"`` is requested but the package could not
            be imported.
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "logistic_regression":
        logistic_kwargs = {
            "max_iter": 500,
            "solver": "lbfgs",
            "class_weight": "balanced",
            "random_state": random_state,
        }
        return LogisticRegression(**logistic_kwargs)

    # Anything other than xgboost from here on is unknown.
    if model_name != "xgboost":
        raise ValueError(f"Unsupported model: {model_name}")

    # XGBClassifier is None when the optional import failed at module load.
    if XGBClassifier is None:
        raise RuntimeError("xgboost is not available in the environment")

    xgboost_kwargs = {
        "n_estimators": 300,
        "max_depth": 5,
        "learning_rate": 0.05,
        "subsample": 0.9,
        "colsample_bytree": 0.9,
        "eval_metric": "logloss",
        "random_state": random_state,
        "n_jobs": 2,
    }
    return XGBClassifier(**xgboost_kwargs)
| |
|
| |
|
def train_single_model(
    model_name: str,
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    *,
    random_state: int,
) -> tuple[Any, dict[str, Any]]:
    """Fit one model on the training split and score it on the test split.

    Returns:
        A ``(fitted_model, metrics)`` pair, where ``metrics`` is whatever
        ``calculate_metrics`` returns for the test labels, the predicted
        labels and the second column of ``predict_proba``.
    """
    estimator = create_model(model_name, random_state=random_state)
    estimator.fit(X_train, y_train)

    predicted_labels = estimator.predict(X_test)
    # Column 1 of predict_proba is taken as the score passed to the metrics.
    positive_scores = estimator.predict_proba(X_test)[:, 1]

    return estimator, calculate_metrics(y_test, predicted_labels, positive_scores)
| |
|
| |
|
def log_run_to_mlflow(
    *,
    experiment_name: str,
    model_name: str,
    params: dict[str, Any],
    metrics: dict[str, Any],
    preprocessor_path: Path,
    model_temp_path: Path,
    artifact_dir: Path,
) -> str:
    """Log one training run to MLflow and return its run id.

    Args:
        experiment_name: MLflow experiment to log under.
        model_name: Used as the run name and in the metrics file name.
        params: Parameters passed to ``mlflow.log_params``.
        metrics: Metrics for the run. Every real-valued scalar entry is
            logged as an MLflow metric; the full dict is also written to
            ``metrics_<model_name>.json`` and attached as an artifact.
        preprocessor_path: Existing preprocessor file to attach.
        model_temp_path: Existing serialized model file to attach.
        artifact_dir: Directory where the metrics JSON file is written.

    Returns:
        The id of the MLflow run that was created.
    """
    from numbers import Real

    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=model_name) as run:
        mlflow.log_params(params)

        # Log every real-valued scalar. A plain `isinstance(v, float)` check
        # drops integer metrics and non-float numeric scalars. bool is
        # excluded explicitly because it is a subclass of int.
        scalar_metrics = {
            name: float(value)
            for name, value in metrics.items()
            if isinstance(value, Real) and not isinstance(value, bool)
        }
        mlflow.log_metrics(scalar_metrics)

        # Keep the complete metrics dict (including non-scalar entries) as a
        # JSON artifact next to the run.
        metrics_path = artifact_dir / f"metrics_{model_name}.json"
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        metrics_path.write_text(json.dumps(metrics, indent=2), encoding="utf-8")

        mlflow.log_artifact(str(preprocessor_path), artifact_path="preprocessor")
        mlflow.log_artifact(str(model_temp_path), artifact_path="model")
        mlflow.log_artifact(str(metrics_path), artifact_path="metrics")

    return run.info.run_id
| |
|
| |
|
def save_model(model: Any, output_path: str | Path = DEFAULT_MODEL_PATH) -> Path:
    """Serialize ``model`` with joblib and return the path it was written to.

    The parent directory of ``output_path`` is created when it does not exist.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(model, destination)
    return destination
| |
|
| |
|
def run_training_pipeline(
    *,
    config_path: str | Path = DEFAULT_CONFIG_PATH,
    data_path: str | Path = DEFAULT_DATA_PATH,
    model_path: str | Path = DEFAULT_MODEL_PATH,
    preprocessor_path: str | Path = DEFAULT_PREPROCESSOR_PATH,
    report_path: str | Path = DEFAULT_REPORT_PATH,
    model_report_path: str | Path = DEFAULT_MODEL_REPORT_PATH,
    validation_report_path: str | Path = DEFAULT_VALIDATION_REPORT_PATH,
) -> dict[str, Any]:
    """Execute the end-to-end training and experiment tracking pipeline.

    Steps, in order: load the config, run data validation, load and
    preprocess the data, train and log each configured model to MLflow, rank
    the results, select a decision threshold for the best model, write the
    model report and the training report as JSON, and save the best model.

    Args:
        config_path: YAML training configuration.
        data_path: Input dataset, passed to validation and loading.
        model_path: Where the best model is saved. Per-model files named
            ``<model_name>.pkl`` are written to the same directory.
        preprocessor_path: Passed to preprocessing and attached to each
            MLflow run.
        report_path: Output path of the training report JSON. Its directory
            also receives the per-model metrics files.
        model_report_path: Output path of the model evaluation report JSON.
        validation_report_path: Output path passed to data validation.

    Returns:
        The training report dict that is also written to ``report_path``.

    Raises:
        RuntimeError: If no model could be trained.
        ValueError: If a configured model name is not supported.
    """
    config = load_training_config(config_path)

    # `or {}` covers sections that are missing or present without a value
    # (which YAML parses to None), so the `.get` calls below cannot fail.
    experiment_cfg = config.get("experiment") or {}
    mlflow_cfg = config.get("mlflow") or {}
    training_cfg = config.get("training") or {}
    threshold_cfg = config.get("threshold") or {}

    experiment_name = experiment_cfg.get("name", "fraud-detection-baseline")
    tracking_uri = mlflow_cfg.get("tracking_uri", "file:./mlruns")
    mlflow.set_tracking_uri(tracking_uri)

    random_state = int(training_cfg.get("random_state", 42))
    test_size = float(training_cfg.get("test_size", 0.2))
    imbalance_method = str(training_cfg.get("imbalance_method", "class_weight"))

    configured_models = training_cfg.get("models") or [
        training_cfg.get("model", "logistic_regression")
    ]
    # A scalar entry such as "models: xgboost" would otherwise be iterated
    # character by character.
    if isinstance(configured_models, str):
        models = [configured_models]
    else:
        models = list(configured_models)

    min_recall_target = float(threshold_cfg.get("min_recall_target", 0.90))
    threshold_grid_size = int(threshold_cfg.get("grid_size", 99))
    threshold_min = float(threshold_cfg.get("min_threshold", 0.01))
    threshold_max = float(threshold_cfg.get("max_threshold", 0.99))

    run_data_validation(file_path=data_path, report_path=validation_report_path)
    raw_df = load_data(data_path)
    prep = preprocess_for_training(
        raw_df,
        test_size=test_size,
        random_state=random_state,
        imbalance_method=imbalance_method,
        preprocessor_path=preprocessor_path,
    )

    results: list[dict[str, Any]] = []
    skipped_models: list[dict[str, str]] = []
    report_path_obj = Path(report_path)
    artifact_dir = report_path_obj.parent
    artifact_dir.mkdir(parents=True, exist_ok=True)
    preprocessor_path_obj = Path(preprocessor_path)
    model_dir = Path(model_path).parent

    for model_name in models:
        try:
            model, metrics = train_single_model(
                model_name=model_name,
                X_train=prep["X_train"],
                y_train=prep["y_train"],
                X_test=prep["X_test"],
                y_test=prep["y_test"],
                random_state=random_state,
            )
        except RuntimeError as exc:
            # RuntimeError (e.g. xgboost not installed) skips this model only;
            # other exception types still abort the pipeline.
            skipped_models.append({"model_name": model_name, "reason": str(exc)})
            continue

        # Persist each candidate so it can be attached to its MLflow run.
        temp_model_path = model_dir / f"{model_name}.pkl"
        save_model(model, temp_model_path)

        run_id = log_run_to_mlflow(
            experiment_name=experiment_name,
            model_name=model_name,
            params={
                "model_name": model_name,
                "test_size": test_size,
                "random_state": random_state,
                "imbalance_method": imbalance_method,
            },
            metrics=metrics,
            preprocessor_path=preprocessor_path_obj,
            model_temp_path=temp_model_path,
            artifact_dir=artifact_dir,
        )

        results.append(
            {"model_name": model_name, "model": model, "metrics": metrics, "run_id": run_id}
        )

    if not results:
        raise RuntimeError("No models were successfully trained.")

    ranked = rank_models(results)
    best = ranked[0]

    # NOTE(review): the threshold is selected on the same test split the
    # metrics are reported on, so the threshold metrics may be optimistic -
    # confirm this is intended.
    y_test_proba_best = best["model"].predict_proba(prep["X_test"])[:, 1]
    threshold_selection = select_best_threshold(
        prep["y_test"],
        y_test_proba_best,
        min_recall=min_recall_target,
        min_threshold=threshold_min,
        max_threshold=threshold_max,
        grid_size=threshold_grid_size,
    )

    model_report = {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "best_model_name": best["model_name"],
        "default_threshold_metrics": best["metrics"],
        "threshold_selection": threshold_selection,
        "evaluation_summary": {
            "test_rows": len(prep["y_test"]),
            "min_recall_target": min_recall_target,
            "selection_reason": threshold_selection["selection_reason"],
        },
    }
    model_report_path_obj = Path(model_report_path)
    model_report_path_obj.parent.mkdir(parents=True, exist_ok=True)
    model_report_path_obj.write_text(json.dumps(model_report, indent=2), encoding="utf-8")

    final_model_path = save_model(best["model"], model_path)

    report = {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "experiment_name": experiment_name,
        "tracking_uri": tracking_uri,
        "data_path": str(data_path),
        "preprocessor_path": str(preprocessor_path),
        "model_path": str(final_model_path),
        "model_report_path": str(model_report_path_obj),
        "best_model": {
            "model_name": best["model_name"],
            "run_id": best["run_id"],
            "metrics": best["metrics"],
            "selected_threshold": threshold_selection["selected_threshold"],
            "threshold_metrics": threshold_selection["selected_metrics"],
        },
        # The fitted model objects are left out; only serializable fields are kept.
        "all_results": [
            {
                "model_name": entry["model_name"],
                "run_id": entry["run_id"],
                "metrics": entry["metrics"],
            }
            for entry in ranked
        ],
        "skipped_models": skipped_models,
    }

    # The report directory was already created above as `artifact_dir`.
    report_path_obj.write_text(json.dumps(report, indent=2), encoding="utf-8")

    return report
| |
|
| |
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the training entrypoint.

    Every path accepted by ``run_training_pipeline`` has a matching option,
    each defaulting to the corresponding module-level ``DEFAULT_*`` constant.
    """
    parser = argparse.ArgumentParser(description="Train fraud model and log to MLflow.")
    parser.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help="Training config YAML path.")
    parser.add_argument("--data-path", default=str(DEFAULT_DATA_PATH), help="Dataset CSV path.")
    parser.add_argument("--model-path", default=str(DEFAULT_MODEL_PATH), help="Output model artifact path.")
    parser.add_argument(
        "--preprocessor-path",
        default=str(DEFAULT_PREPROCESSOR_PATH),
        help="Output preprocessor artifact path.",
    )
    parser.add_argument("--report-path", default=str(DEFAULT_REPORT_PATH), help="Training report JSON path.")
    parser.add_argument(
        "--model-report-path",
        default=str(DEFAULT_MODEL_REPORT_PATH),
        help="Model evaluation report JSON path.",
    )
    # Exposed so the validation report location can be overridden from the
    # CLI like every other pipeline path.
    parser.add_argument(
        "--validation-report-path",
        default=str(DEFAULT_VALIDATION_REPORT_PATH),
        help="Data validation report JSON path.",
    )
    return parser


def main() -> None:
    """Parse CLI arguments, run the training pipeline and print a summary."""
    args = _build_parser().parse_args()
    report = run_training_pipeline(
        config_path=args.config,
        data_path=args.data_path,
        model_path=args.model_path,
        preprocessor_path=args.preprocessor_path,
        report_path=args.report_path,
        model_report_path=args.model_report_path,
        validation_report_path=args.validation_report_path,
    )

    best = report["best_model"]
    print("Training completed.")
    print(f"Best model: {best['model_name']}")
    print(f"Selected threshold: {best['selected_threshold']:.4f}")
    print(json.dumps(best["threshold_metrics"], indent=2))


if __name__ == "__main__":
    main()
| |
|