Spaces:
Sleeping
Sleeping
| """Inference utilities for the exported MLflow champion model.""" | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| import mlflow.pyfunc | |
| import mlflow.sklearn | |
| import pandas as pd | |
| from sklearn.metrics import f1_score, roc_auc_score | |
| from src import config | |
| from src.feature_engineering import apply_feature_engineering | |
| from src.preprocess import FEATURE_COLUMNS | |
# Columns a caller has to supply: every configured raw column except the target.
RAW_INPUT_COLUMNS = [
    column for column in config.RAW_COLUMNS if column != config.TARGET_COL
]

# Name of the scratch column that records each row's original position so the
# input order can be restored after feature engineering.
ORDER_COL = "__input_order"
@dataclass
class InferenceMetrics:
    """Summary metrics produced when scoring labelled raw data.

    The ``@dataclass`` decorator is required: this module builds instances
    with keyword arguments and serialises them with ``dataclasses.asdict``,
    neither of which works on a plain class that only carries annotations.
    """

    rows: int  # number of rows that were scored
    f1_macro: float  # macro-averaged F1 of the predicted labels
    roc_auc: float  # ROC-AUC of the predicted scores
def load_pyfunc_model(model_uri: str | Path = config.CHAMPION_EXPORT_DIR) -> Any:
    """Load the model at *model_uri* through MLflow's pyfunc flavor.

    Args:
        model_uri: Model location as a string or ``Path``; it is converted
            with ``str()`` before loading. Defaults to
            ``config.CHAMPION_EXPORT_DIR``.

    Returns:
        Whatever ``mlflow.pyfunc.load_model`` returns for that location.
    """
    uri_text = str(model_uri)
    return mlflow.pyfunc.load_model(uri_text)
def load_sklearn_model(model_uri: str | Path = config.CHAMPION_EXPORT_DIR) -> Any:
    """Load the model at *model_uri* through MLflow's sklearn flavor.

    Args:
        model_uri: Model location as a string or ``Path``; it is converted
            with ``str()`` before loading. Defaults to
            ``config.CHAMPION_EXPORT_DIR``.

    Returns:
        Whatever ``mlflow.sklearn.load_model`` returns for that location.
    """
    uri_text = str(model_uri)
    return mlflow.sklearn.load_model(uri_text)
def prepare_features(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Turn raw input rows into the model's feature frame, in input order.

    Args:
        raw_df: Raw rows. Must contain every column listed in
            ``RAW_INPUT_COLUMNS``. The frame itself is not modified; the
            work is done on a copy.

    Returns:
        The ``FEATURE_COLUMNS`` of the engineered frame, sorted by original
        row position and carrying a fresh positional index.

    Raises:
        ValueError: If any expected raw column is absent from ``raw_df``.
    """
    absent = [name for name in RAW_INPUT_COLUMNS if name not in raw_df.columns]
    if absent:
        raise ValueError(f"Raw input missing expected columns: {absent}")

    # Tag each row with its position so the original order can be restored
    # in case apply_feature_engineering reorders rows.
    tagged = raw_df.copy()
    tagged[ORDER_COL] = range(len(tagged))

    engineered = apply_feature_engineering(tagged)
    restored = engineered.sort_values(ORDER_COL).reset_index(drop=True)
    return restored[FEATURE_COLUMNS]
def predict_dataframe(raw_df: pd.DataFrame, model: Any | None = None) -> list[int]:
    """Predict one integer label per row of a raw input frame.

    Args:
        raw_df: Raw rows; passed through ``prepare_features`` before
            prediction.
        model: Object exposing ``predict``. When ``None``, the default
            pyfunc model is loaded via ``load_pyfunc_model()``.

    Returns:
        The model's predictions, each converted with ``int()``.
    """
    predictor = load_pyfunc_model() if model is None else model
    feature_frame = prepare_features(raw_df)
    raw_predictions = predictor.predict(feature_frame)
    return list(map(int, raw_predictions))
def predict_records(records: list[dict[str, Any]], model: Any | None = None) -> list[int]:
    """Predict labels for a list of record dictionaries.

    Args:
        records: One mapping per input row; must not be empty.
        model: Optional model forwarded unchanged to ``predict_dataframe``.

    Returns:
        The predictions returned by ``predict_dataframe`` for the frame
        built from ``records``.

    Raises:
        ValueError: If ``records`` is empty.
    """
    if not records:
        raise ValueError("records must contain at least one row")

    frame = pd.DataFrame.from_records(records)
    return predict_dataframe(frame, model=model)
def score_raw_dataframe(
    raw_df: pd.DataFrame,
    model: Any | None = None,
    year: int | None = None,
) -> InferenceMetrics:
    """Score labelled raw data and report macro-F1 and ROC-AUC.

    Args:
        raw_df: Raw rows including the ``config.TARGET_COL`` column. The
            frame is copied, not modified.
        model: Object exposing ``predict`` and ``predict_proba``. When
            ``None``, the default model is loaded via
            ``load_sklearn_model()``.
        year: When given, only rows whose ``"Year"`` column equals this
            value are scored; the ``"Year"`` column is indexed directly in
            that case.

    Returns:
        An ``InferenceMetrics`` with the scored row count, macro-averaged
        F1 and ROC-AUC.

    Raises:
        ValueError: If the target column is missing, if no rows remain
            after filtering, or if the model has no ``predict_proba``.
    """
    if config.TARGET_COL not in raw_df.columns:
        raise ValueError(f"Raw input must include {config.TARGET_COL} to calculate F1/AUC")

    labelled = raw_df.copy()
    if year is not None:
        year_mask = labelled["Year"] == year
        labelled = labelled[year_mask].reset_index(drop=True)
    if labelled.empty:
        raise ValueError("No rows available for scoring after filtering")

    estimator = load_sklearn_model() if model is None else model
    feature_frame = prepare_features(labelled)
    y_true = labelled[config.TARGET_COL].astype(int)
    y_pred = estimator.predict(feature_frame)

    if not hasattr(estimator, "predict_proba"):
        raise ValueError("Model must expose predict_proba to calculate ROC-AUC")
    # Column 1 of predict_proba is used as the score -- presumably the
    # positive class of a binary classifier; confirm against the model.
    positive_scores = estimator.predict_proba(feature_frame)[:, 1]

    macro_f1 = float(f1_score(y_true, y_pred, average="macro"))
    auc = float(roc_auc_score(y_true, positive_scores))
    return InferenceMetrics(
        rows=int(len(labelled)),
        f1_macro=macro_f1,
        roc_auc=auc,
    )
def _run_cli() -> None:
    """Command-line entry point: predict on a CSV and optionally score it.

    Reads ``--input``, optionally keeps only rows matching ``--year``,
    writes the input rows plus a ``prediction`` column to ``--output``, and
    prints metrics as JSON when the input contains ``config.TARGET_COL``.
    """
    data_dir = config.DATA_DIR
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default=str(data_dir / "test.csv"))
    parser.add_argument("--output", default=str(data_dir / "test_predictions.csv"))
    parser.add_argument("--model-uri", default=str(config.CHAMPION_EXPORT_DIR))
    parser.add_argument("--year", type=int, default=None)
    args = parser.parse_args()

    raw = pd.read_csv(args.input)
    # The year filter is applied only when the file has a "Year" column;
    # otherwise --year has no effect.
    year_requested = args.year is not None
    if year_requested and "Year" in raw.columns:
        raw = raw[raw["Year"] == args.year].reset_index(drop=True)

    pyfunc_model = load_pyfunc_model(args.model_uri)
    predictions = predict_dataframe(raw, model=pyfunc_model)

    output = raw.assign(prediction=predictions)
    output.to_csv(args.output, index=False)
    print(f"Wrote {len(output):,} predictions -> {args.output}")

    if config.TARGET_COL not in raw.columns:
        print(f"Metrics skipped: {args.input} has no {config.TARGET_COL} column")
        return

    # Metrics use the sklearn flavor, which is loaded separately from the
    # pyfunc model used for the predictions above.
    sklearn_model = load_sklearn_model(args.model_uri)
    metrics = score_raw_dataframe(raw, model=sklearn_model)
    print(json.dumps(asdict(metrics), indent=2))
# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    _run_cli()