# Deployment snapshot metadata (github-actions[bot], commit 4937cba):
# deploy: sync snapshot from github
"""Training entrypoint for fraud detection models with MLflow tracking."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import joblib
import mlflow
import pandas as pd
import yaml
from sklearn.linear_model import LogisticRegression
from src.data_ingestion import load_data, run_data_validation
from src.evaluate import calculate_metrics, rank_models, select_best_threshold
from src.preprocessing import preprocess_for_training
try:
from xgboost import XGBClassifier
except Exception: # pragma: no cover - handled at runtime
XGBClassifier = None
# Default filesystem locations for inputs and artifacts. Each one can be
# overridden via a keyword argument to run_training_pipeline() or, for most
# of them, via a CLI flag in _build_parser().
DEFAULT_CONFIG_PATH = Path("configs/train.yaml")
DEFAULT_DATA_PATH = Path("data/raw/creditcard.csv")
DEFAULT_MODEL_PATH = Path("models/model.pkl")
DEFAULT_PREPROCESSOR_PATH = Path("models/preprocessor.pkl")
DEFAULT_REPORT_PATH = Path("artifacts/model_training_report.json")
DEFAULT_MODEL_REPORT_PATH = Path("artifacts/model_report.json")
DEFAULT_VALIDATION_REPORT_PATH = Path("artifacts/data_validation.json")
def load_training_config(config_path: str | Path = DEFAULT_CONFIG_PATH) -> dict[str, Any]:
    """Read the training YAML file and guarantee its top-level sections exist."""
    raw_text = Path(config_path).read_text(encoding="utf-8")
    # safe_load returns None for an empty file; normalize to an empty dict.
    config: dict[str, Any] = yaml.safe_load(raw_text) or {}
    # Ensure downstream code can index these sections without key checks.
    for section in ("experiment", "training", "mlflow"):
        config.setdefault(section, {})
    return config
def create_model(model_name: str, random_state: int) -> Any:
    """Build an unfitted estimator for the configured *model_name*.

    Raises:
        RuntimeError: xgboost was requested but is not installed.
        ValueError: the model name is not recognized.
    """
    if model_name == "logistic_regression":
        lr_params: dict[str, Any] = {
            "max_iter": 500,
            "solver": "lbfgs",
            # Reweight classes to cope with the fraud/non-fraud imbalance.
            "class_weight": "balanced",
            "random_state": random_state,
        }
        return LogisticRegression(**lr_params)
    if model_name == "xgboost":
        if XGBClassifier is None:
            # Optional dependency: the top-of-file import may have failed.
            raise RuntimeError("xgboost is not available in the environment")
        xgb_params: dict[str, Any] = {
            "n_estimators": 300,
            "max_depth": 5,
            "learning_rate": 0.05,
            "subsample": 0.9,
            "colsample_bytree": 0.9,
            "eval_metric": "logloss",
            "random_state": random_state,
            "n_jobs": 2,
        }
        return XGBClassifier(**xgb_params)
    raise ValueError(f"Unsupported model: {model_name}")
def train_single_model(
    model_name: str,
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    *,
    random_state: int,
) -> tuple[Any, dict[str, Any]]:
    """Fit one estimator on the train split and score it on the test split.

    Returns the fitted model together with the metrics dict produced by
    ``calculate_metrics``.
    """
    estimator = create_model(model_name, random_state=random_state)
    estimator.fit(X_train, y_train)
    # Hard labels for default-threshold metrics, probabilities for AUC-style ones.
    predicted_labels = estimator.predict(X_test)
    positive_scores = estimator.predict_proba(X_test)[:, 1]
    return estimator, calculate_metrics(y_test, predicted_labels, positive_scores)
def log_run_to_mlflow(
    *,
    experiment_name: str,
    model_name: str,
    params: dict[str, Any],
    metrics: dict[str, Any],
    preprocessor_path: Path,
    model_temp_path: Path,
    artifact_dir: Path,
) -> str:
    """Log one training run (params, metrics, artifacts) to MLflow.

    Args:
        experiment_name: MLflow experiment to attach the run to.
        model_name: Used as the run name and in the metrics file name.
        params: Hyperparameters / config values recorded as run params.
        metrics: Metric dict; numeric entries are logged as MLflow metrics
            and the full dict is stored as a JSON artifact.
        preprocessor_path: Saved preprocessor artifact to upload.
        model_temp_path: Saved model artifact to upload.
        artifact_dir: Local directory where the metrics JSON is written.

    Returns:
        The MLflow run id of the logged run.
    """
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=model_name) as run:
        mlflow.log_params(params)
        # MLflow metrics must be numeric. The previous exact-float filter
        # silently dropped int-valued metrics (e.g. counts); accept ints and
        # floats, but exclude bool (bool is an int subclass in Python).
        metric_values = {
            k: float(v)
            for k, v in metrics.items()
            if isinstance(v, (int, float)) and not isinstance(v, bool)
        }
        mlflow.log_metrics(metric_values)
        # Persist the complete (possibly non-numeric) metrics dict as a
        # structured artifact for debugging and reproducibility.
        metrics_path = artifact_dir / f"metrics_{model_name}.json"
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        metrics_path.write_text(json.dumps(metrics, indent=2), encoding="utf-8")
        mlflow.log_artifact(str(preprocessor_path), artifact_path="preprocessor")
        mlflow.log_artifact(str(model_temp_path), artifact_path="model")
        mlflow.log_artifact(str(metrics_path), artifact_path="metrics")
    return run.info.run_id
def save_model(model: Any, output_path: str | Path = DEFAULT_MODEL_PATH) -> Path:
    """Serialize *model* with joblib, creating parent directories as needed.

    Returns the resolved output path as a ``Path`` object.
    """
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(model, target)
    return target
def run_training_pipeline(
    *,
    config_path: str | Path = DEFAULT_CONFIG_PATH,
    data_path: str | Path = DEFAULT_DATA_PATH,
    model_path: str | Path = DEFAULT_MODEL_PATH,
    preprocessor_path: str | Path = DEFAULT_PREPROCESSOR_PATH,
    report_path: str | Path = DEFAULT_REPORT_PATH,
    model_report_path: str | Path = DEFAULT_MODEL_REPORT_PATH,
    validation_report_path: str | Path = DEFAULT_VALIDATION_REPORT_PATH,
) -> dict[str, Any]:
    """Execute the end-to-end training and experiment-tracking pipeline.

    Steps: load config -> validate raw data -> preprocess -> train each
    configured model (logging every run to MLflow) -> rank models -> tune a
    decision threshold for the best model -> persist artifacts and reports.

    Args:
        config_path: YAML training configuration file.
        data_path: Raw dataset CSV.
        model_path: Where the best model artifact is written.
        preprocessor_path: Where the fitted preprocessor is written.
        report_path: Full training report (all models) JSON output.
        model_report_path: Best-model evaluation report JSON output.
        validation_report_path: Data-validation report JSON output.

    Returns:
        The training report dict (also written to ``report_path``).

    Raises:
        RuntimeError: If no configured model could be trained.
    """
    config = load_training_config(config_path)
    experiment_name = config["experiment"].get("name", "fraud-detection-baseline")
    tracking_uri = config["mlflow"].get("tracking_uri", "file:./mlruns")
    mlflow.set_tracking_uri(tracking_uri)
    training_cfg = config["training"]
    random_state = int(training_cfg.get("random_state", 42))
    test_size = float(training_cfg.get("test_size", 0.2))
    imbalance_method = str(training_cfg.get("imbalance_method", "class_weight"))
    # The "models" list takes precedence; otherwise fall back to the single
    # "model" key, defaulting to logistic regression.
    models = training_cfg.get("models") or [training_cfg.get("model", "logistic_regression")]
    # Settings for the post-training decision-threshold search.
    threshold_cfg = config.get("threshold", {})
    min_recall_target = float(threshold_cfg.get("min_recall_target", 0.90))
    threshold_grid_size = int(threshold_cfg.get("grid_size", 99))
    threshold_min = float(threshold_cfg.get("min_threshold", 0.01))
    threshold_max = float(threshold_cfg.get("max_threshold", 0.99))
    # Validate the raw file before doing any further work with it.
    run_data_validation(file_path=data_path, report_path=validation_report_path)
    raw_df = load_data(data_path)
    prep = preprocess_for_training(
        raw_df,
        test_size=test_size,
        random_state=random_state,
        imbalance_method=imbalance_method,
        preprocessor_path=preprocessor_path,
    )
    results: list[dict[str, Any]] = []
    skipped_models: list[dict[str, str]] = []
    artifact_dir = Path(report_path).parent
    artifact_dir.mkdir(parents=True, exist_ok=True)
    preprocessor_path_obj = Path(preprocessor_path)
    for model_name in models:
        try:
            model, metrics = train_single_model(
                model_name=model_name,
                X_train=prep["X_train"],
                y_train=prep["y_train"],
                X_test=prep["X_test"],
                y_test=prep["y_test"],
                random_state=random_state,
            )
        except RuntimeError as exc:
            # e.g. xgboost not installed: record the skip and keep going.
            skipped_models.append({"model_name": model_name, "reason": str(exc)})
            continue
        # Save each candidate next to the final model path so MLflow uploads
        # the exact artifact that was evaluated.
        temp_model_path = Path(model_path).parent / f"{model_name}.pkl"
        save_model(model, temp_model_path)
        run_id = log_run_to_mlflow(
            experiment_name=experiment_name,
            model_name=model_name,
            params={
                "model_name": model_name,
                "test_size": test_size,
                "random_state": random_state,
                "imbalance_method": imbalance_method,
            },
            metrics=metrics,
            preprocessor_path=preprocessor_path_obj,
            model_temp_path=temp_model_path,
            artifact_dir=artifact_dir,
        )
        results.append({"model_name": model_name, "model": model, "metrics": metrics, "run_id": run_id})
    if not results:
        raise RuntimeError("No models were successfully trained.")
    ranked = rank_models(results)
    best = ranked[0]
    # Re-score the best model so its decision threshold can be tuned on the
    # held-out test probabilities.
    y_test_proba_best = best["model"].predict_proba(prep["X_test"])[:, 1]
    threshold_selection = select_best_threshold(
        prep["y_test"],
        y_test_proba_best,
        min_recall=min_recall_target,
        min_threshold=threshold_min,
        max_threshold=threshold_max,
        grid_size=threshold_grid_size,
    )
    # Compact evaluation report focused on the winning model only.
    model_report = {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "best_model_name": best["model_name"],
        "default_threshold_metrics": best["metrics"],
        "threshold_selection": threshold_selection,
        "evaluation_summary": {
            "test_rows": int(len(prep["y_test"])),
            "min_recall_target": min_recall_target,
            "selection_reason": threshold_selection["selection_reason"],
        },
    }
    model_report_path_obj = Path(model_report_path)
    model_report_path_obj.parent.mkdir(parents=True, exist_ok=True)
    model_report_path_obj.write_text(json.dumps(model_report, indent=2), encoding="utf-8")
    # Promote the winning model to the canonical artifact path.
    final_model_path = save_model(best["model"], model_path)
    # Full pipeline report covering every trained (and skipped) model.
    report = {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "experiment_name": experiment_name,
        "tracking_uri": tracking_uri,
        "data_path": str(data_path),
        "preprocessor_path": str(preprocessor_path),
        "model_path": str(final_model_path),
        "model_report_path": str(model_report_path_obj),
        "best_model": {
            "model_name": best["model_name"],
            "run_id": best["run_id"],
            "metrics": best["metrics"],
            "selected_threshold": threshold_selection["selected_threshold"],
            "threshold_metrics": threshold_selection["selected_metrics"],
        },
        "all_results": [
            {"model_name": entry["model_name"], "run_id": entry["run_id"], "metrics": entry["metrics"]}
            for entry in ranked
        ],
        "skipped_models": skipped_models,
    }
    report_path_obj = Path(report_path)
    report_path_obj.parent.mkdir(parents=True, exist_ok=True)
    report_path_obj.write_text(json.dumps(report, indent=2), encoding="utf-8")
    return report
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; defaults mirror the module-level DEFAULT_* paths.

    Adds ``--validation-report-path`` so the CLI exposes every output path
    that ``run_training_pipeline`` accepts (previously this one was missing).
    """
    parser = argparse.ArgumentParser(description="Train fraud model and log to MLflow.")
    parser.add_argument("--config", default=str(DEFAULT_CONFIG_PATH), help="Training config YAML path.")
    parser.add_argument("--data-path", default=str(DEFAULT_DATA_PATH), help="Dataset CSV path.")
    parser.add_argument("--model-path", default=str(DEFAULT_MODEL_PATH), help="Output model artifact path.")
    parser.add_argument(
        "--preprocessor-path",
        default=str(DEFAULT_PREPROCESSOR_PATH),
        help="Output preprocessor artifact path.",
    )
    parser.add_argument("--report-path", default=str(DEFAULT_REPORT_PATH), help="Training report JSON path.")
    parser.add_argument(
        "--model-report-path",
        default=str(DEFAULT_MODEL_REPORT_PATH),
        help="Model evaluation report JSON path.",
    )
    parser.add_argument(
        "--validation-report-path",
        default=str(DEFAULT_VALIDATION_REPORT_PATH),
        help="Data validation report JSON path.",
    )
    return parser
def main() -> None:
    """CLI entrypoint: run the training pipeline and print a best-model summary."""
    args = _build_parser().parse_args()
    report = run_training_pipeline(
        config_path=args.config,
        data_path=args.data_path,
        model_path=args.model_path,
        preprocessor_path=args.preprocessor_path,
        report_path=args.report_path,
        model_report_path=args.model_report_path,
        # Forward the validation report path when the parser exposes it;
        # otherwise fall back to the module-level default.
        validation_report_path=getattr(
            args, "validation_report_path", DEFAULT_VALIDATION_REPORT_PATH
        ),
    )
    best = report["best_model"]
    print("Training completed.")
    print(f"Best model: {best['model_name']}")
    print(f"Selected threshold: {best['selected_threshold']:.4f}")
    print(json.dumps(best["threshold_metrics"], indent=2))


if __name__ == "__main__":
    main()