| |
| |
| from pathlib import Path |
| from typing import Any |
| import logging |
| import logging.config |
| import json |
| import uuid |
| import time |
| from datetime import datetime, timezone |
|
|
| |
| from fastapi import FastAPI, HTTPException, Request |
| from fastapi.responses import RedirectResponse |
| from sklearn.pipeline import Pipeline |
| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import joblib |
| from huggingface_hub import hf_hub_download |
| import geoip2.database |
| from geoip2.errors import AddressNotFoundError |
|
|
| |
| from backend.schemas import ( |
| PipelineInput, |
| PredictionEnum, |
| PredictedProbabilities, |
| PredictionResult, |
| PredictionResponse |
| ) |
| from frontend.app import gradio_app |
| from src.custom_transformers import ( |
| MissingValueChecker, |
| MissingValueStandardizer, |
| RobustSimpleImputer, |
| SnakeCaseFormatter, |
| BooleanColumnTransformer, |
| JobStabilityTransformer, |
| CityTierTransformer, |
| StateDefaultRateTargetEncoder, |
| RobustStandardScaler, |
| RobustOneHotEncoder, |
| RobustOrdinalEncoder, |
| FeatureSelector |
| ) |
| from src.utils import get_root_directory |
|
|
| |
| |
# Directory that receives the prediction log file. It is created at import time
# so the rotating file handler configured below can open its target.
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

# Two independent logging channels:
#   * root logger ("")    -> formatted application messages on stdout
#   * "monitoring" logger -> bare message text in a size-rotated file
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
        # Message only: the record text is written to the file exactly as logged.
        "monitoring": {"format": "%(message)s"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "default",
            "stream": "ext://sys.stdout",
        },
        "monitoring_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "monitoring",
            "filename": str(log_dir / "prediction_logs.jsonl"),
            "maxBytes": 10 * 1024 * 1024,  # 10 MiB per file before rotation
            "backupCount": 3,
        },
    },
    "loggers": {
        "": {"handlers": ["console"], "level": "INFO"},
        "monitoring": {
            "handlers": ["monitoring_file"],
            "level": "INFO",
            # Keep monitoring records out of the root logger's console output.
            "propagate": False,
        },
    },
}

# Apply the configuration before any logger below is used.
logging.config.dictConfig(LOGGING_CONFIG)

# Application logger (console) and dedicated monitoring logger (file).
logger = logging.getLogger(__name__)
monitoring_logger = logging.getLogger("monitoring")
|
|
|
|
| |
| |
def get_batch_metadata(
    pipeline_input_dict_ls: list[dict[str, Any]],
    request: Request,
    geoip_reader: geoip2.database.Reader | None
) -> dict[str, Any]:
    """Build the metadata shared by all monitoring records of one prediction batch.

    The user agent and client IP are taken from the first input record when it
    provides them; otherwise they fall back to the HTTP request (``user-agent``
    header, first entry of ``x-forwarded-for``, then the connecting peer).
    The client IP is only used for the country lookup and is not part of the
    returned metadata.

    Args:
        pipeline_input_dict_ls: Input records of the batch as plain dicts.
        request: The incoming HTTP request.
        geoip_reader: Country database reader, or ``None`` to skip the lookup.

    Returns:
        A dict with the keys ``batch_id``, ``batch_size``, ``batch_timestamp``,
        ``pipeline_version``, ``client_country`` and ``user_agent``.
    """
    # Tolerate an empty batch instead of failing with IndexError.
    first_input = pipeline_input_dict_ls[0] if pipeline_input_dict_ls else {}

    user_agent = first_input.get("user_agent")
    if user_agent is None:
        user_agent = request.headers.get("user-agent", "unknown")

    client_ip = first_input.get("client_ip")
    if client_ip is None:
        x_forwarded_for = request.headers.get("x-forwarded-for")
        if x_forwarded_for:
            # The header may list several hops; the first entry is used.
            client_ip = x_forwarded_for.split(",")[0].strip()
        elif request.client is not None:
            # request.client can be None when no peer address is available.
            client_ip = request.client.host

    client_country = "unknown"
    if geoip_reader is not None and client_ip:
        try:
            # The database may hold a record without a country name.
            client_country = geoip_reader.country(client_ip).country.name or "unknown"
        except AddressNotFoundError:
            logger.debug("IP address not found in GeoLite2 country database. Likely a private or local address.")
        except ValueError:
            # The IP comes from the payload or a client-controlled header and may
            # be malformed; a failed lookup must not fail the whole request.
            logger.debug("Client IP is not a valid IP address. Client country will be logged as 'unknown'.")

    return {
        "batch_id": str(uuid.uuid4()),
        "batch_size": len(pipeline_input_dict_ls),
        "batch_timestamp": datetime.now(timezone.utc).isoformat(),
        "pipeline_version": PIPELINE_VERSION_TAG,
        "client_country": client_country,
        "user_agent": user_agent,
    }
|
|
|
|
| |
def load_pipeline_from_local(path: str | Path) -> Pipeline:
    """Load a scikit-learn pipeline from a joblib file on disk and validate it.

    Args:
        path: Location of the serialized pipeline, as a string or ``Path``.

    Returns:
        The deserialized ``Pipeline``.

    Raises:
        TypeError: If ``path`` is neither ``str`` nor ``Path``, if the loaded
            object is not a ``Pipeline``, or if it has no ``predict_proba``.
        FileNotFoundError: If nothing exists at ``path``.
        RuntimeError: If loading the file raises any exception.
    """
    if not isinstance(path, (str, Path)):
        raise TypeError(f"Error when loading pipeline: 'path' must be a string or Path object, got {type(path).__name__}")

    # Keep a string form for messages and joblib, and a Path form for the
    # existence check.
    path_str = str(path) if isinstance(path, Path) else path
    path = Path(path)

    if not path.exists():
        raise FileNotFoundError(f"Error when loading pipeline: File not found at '{path_str}'")

    # NOTE(review): joblib.load unpickles the file, which can execute arbitrary
    # code; only load pipeline files from trusted sources.
    try:
        logger.info(f"Loading pipeline from '{path_str}'...")
        loaded = joblib.load(path_str)
        logger.info("Successfully loaded pipeline.")
    except Exception as e:
        raise RuntimeError(f"Error when loading pipeline from '{path_str}'") from e

    # Validate the loaded object before handing it to callers.
    if not isinstance(loaded, Pipeline):
        raise TypeError("Error when loading pipeline: Loaded object is not a scikit-learn Pipeline")
    if not hasattr(loaded, "predict_proba"):
        raise TypeError("Error when loading pipeline: Loaded pipeline does not have a .predict_proba() method")

    return loaded
|
|
|
|
| |
def load_pipeline_from_huggingface(repo_id: str, filename: str, revision: str) -> Pipeline:
    """Download a pipeline file from the Hugging Face Hub and load it.

    Args:
        repo_id: Hub repository that hosts the pipeline file.
        filename: Name of the pipeline file inside the repository.
        revision: Revision (tag) of the repository to download from.

    Returns:
        The pipeline returned by ``load_pipeline_from_local`` for the downloaded file.

    Raises:
        RuntimeError: If downloading or loading fails for any reason; the
            original exception is attached as the cause.
    """
    try:
        download_message = (
            f"Downloading pipeline '{filename}' with tag '{revision}' from Hugging Face Hub repo '{repo_id}'. "
            "If already cached, will use local copy."
        )
        logger.info(download_message)
        local_copy = hf_hub_download(repo_id=repo_id, filename=filename, revision=revision)

        # Loading and validation are delegated to the local loader.
        return load_pipeline_from_local(local_copy)
    except Exception as e:
        raise RuntimeError(f"Error loading pipeline '{filename}' from Hugging Face Hub repository '{repo_id}': {e}") from e
|
|
|
|
| |
| |
# Country lookup database, opened once at import time. The service keeps running
# without it: when the file is missing, the reader stays None and the country
# lookup is skipped.
GEO_DB_PATH = Path("geoip_db/GeoLite2-Country.mmdb")
geoip_reader = None
try:
    geoip_reader = geoip2.database.Reader(GEO_DB_PATH)
except FileNotFoundError:
    logger.error(f"GeoLite2 country database not found at '{GEO_DB_PATH}'. Client country will not be logged. Download the database from https://www.maxmind.com.")
else:
    logger.info(f"Successfully loaded GeoLite2 country database from '{GEO_DB_PATH}'")
|
|
| |
| |
# Revision of the pipeline file to download; also recorded as "pipeline_version"
# in the monitoring records.
PIPELINE_VERSION_TAG: str = "v1.0"

# The pipeline is loaded once at import time and reused by every request.
pipeline: Pipeline = load_pipeline_from_huggingface(
    repo_id="JensBender/loan-default-prediction-pipeline",
    filename="loan_default_rf_pipeline.joblib",
    revision=PIPELINE_VERSION_TAG,
)

# FastAPI application that exposes the prediction endpoint.
fastapi_app = FastAPI(title="Loan Default Prediction API")
|
|
| |
# Request-level fields carried inside each input record. They are not model
# features, so they are removed before prediction and before inputs are
# written to the monitoring log.
_REQUEST_METADATA_FIELDS = frozenset({"client_ip", "user_agent"})


def _strip_request_metadata(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return copies of *records* without the request-metadata fields."""
    return [
        {key: value for key, value in record.items() if key not in _REQUEST_METADATA_FIELDS}
        for record in records
    ]


def _log_failed_batch(
    input_records: list[dict[str, Any]] | None,
    batch_metadata: dict[str, Any] | None,
    error: Exception,
) -> None:
    """Write error records for a failed batch to the monitoring log.

    One record is written per input; when no inputs are available, a single
    record with ``inputs`` set to ``None`` is written instead.
    """
    if batch_metadata is None:
        # The failure happened before the batch metadata could be built.
        batch_metadata = {
            "batch_id": str(uuid.uuid4()),
            "batch_size": len(input_records) if input_records else None,
            "batch_timestamp": datetime.now(timezone.utc).isoformat(),
            "pipeline_version": PIPELINE_VERSION_TAG,
            "client_country": None,
            "user_agent": None,
        }

    # Log the same stripped inputs as the success path so that client_ip and
    # user_agent never reach the monitoring file.
    logged_inputs = _strip_request_metadata(input_records) if input_records else [None]
    for logged_input in logged_inputs:
        monitoring_logger.error(json.dumps({
            **batch_metadata,
            "prediction_id": str(uuid.uuid4()),
            "inputs": logged_input,
            "prediction": None,
            "probabilities": None,
            "error_message": str(error),
        }))


@fastapi_app.post("/api/predict", response_model=PredictionResponse)
def predict(pipeline_input: PipelineInput | list[PipelineInput], request: Request) -> PredictionResponse:
    """Predict loan default for a single input or a batch of inputs.

    An empty list returns an empty result list. An input is labelled as
    default when its predicted default probability is at least 0.29.
    One monitoring record is written per prediction. If anything fails,
    error records are written instead and a generic HTTP 500 is returned.
    """
    batch_metadata = None
    pipeline_input_dict_ls = None
    try:
        # Normalize single input and batch input to a list of plain dicts.
        if isinstance(pipeline_input, list):
            if not pipeline_input:
                return PredictionResponse(results=[])
            pipeline_input_dict_ls = [item.model_dump() for item in pipeline_input]
        else:
            pipeline_input_dict_ls = [pipeline_input.model_dump()]

        batch_metadata = get_batch_metadata(pipeline_input_dict_ls, request, geoip_reader)

        pipeline_input_cleaned = _strip_request_metadata(pipeline_input_dict_ls)
        pipeline_input_df: pd.DataFrame = pd.DataFrame(pipeline_input_cleaned)

        # Time only the model call so the latency reflects the pipeline itself.
        start_time = time.perf_counter()
        predicted_probabilities: np.ndarray = pipeline.predict_proba(pipeline_input_df)
        pipeline_prediction_latency_ms = round((time.perf_counter() - start_time) * 1000)

        # Column 1 holds the probability of default.
        optimized_threshold: float = 0.29
        predictions: np.ndarray = (predicted_probabilities[:, 1] >= optimized_threshold)

        batch_metadata.update({
            "batch_latency_ms": pipeline_prediction_latency_ms,
            "avg_prediction_latency_ms": round(pipeline_prediction_latency_ms / len(pipeline_input_dict_ls)),
        })

        results: list[PredictionResult] = []
        # strict=True surfaces a mismatch between input rows and model output
        # rows as an error instead of silently dropping predictions.
        for cleaned_input, pred, pred_proba in zip(
            pipeline_input_cleaned, predictions, predicted_probabilities, strict=True
        ):
            prediction_enum = PredictionEnum.DEFAULT if pred else PredictionEnum.NO_DEFAULT
            # Convert NumPy scalars once; reused for the response and the log record.
            probabilities = {
                "default": float(pred_proba[1]),
                "no_default": float(pred_proba[0]),
            }
            results.append(
                PredictionResult(
                    prediction=prediction_enum,
                    probabilities=PredictedProbabilities(**probabilities),
                )
            )

            prediction_monitoring_record = {
                **batch_metadata,
                "prediction_id": str(uuid.uuid4()),
                "inputs": cleaned_input,
                "prediction": prediction_enum.value,
                "probabilities": probabilities,
            }
            monitoring_logger.info(json.dumps(prediction_monitoring_record))

        return PredictionResponse(results=results)

    except Exception as e:
        logger.exception("Error during predict: %s", e)
        _log_failed_batch(pipeline_input_dict_ls, batch_metadata, e)

        # Internal details stay in the logs; the client gets a generic message.
        raise HTTPException(status_code=500, detail="Internal server error during loan default prediction") from e
|
|
| |
# Serve the Gradio UI from the same application as the API, under /gradio.
# `app` is the object that wraps both and is the one to hand to the ASGI server.
app = gr.mount_gradio_app(fastapi_app, gradio_app, path="/gradio", show_api=False)
|
|
| |
@app.get("/")
def root():
    # The UI is mounted under /gradio/, so the bare root URL forwards there.
    redirect_to_ui = RedirectResponse(url="/gradio/")
    return redirect_to_ui
|
|