loan / app.py
Sunny2902's picture
Upload 17 files
4af1c21 verified
# --- Imports ---
# Standard library imports
from pathlib import Path
from typing import Any
import logging
import logging.config
import json
import uuid
import time
from datetime import datetime, timezone
# Third-party library imports
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
from sklearn.pipeline import Pipeline
import gradio as gr
import pandas as pd
import numpy as np
import joblib
from huggingface_hub import hf_hub_download
import geoip2.database
from geoip2.errors import AddressNotFoundError
# Local imports
from backend.schemas import (
PipelineInput,
PredictionEnum,
PredictedProbabilities,
PredictionResult,
PredictionResponse
)
from frontend.app import gradio_app
from src.custom_transformers import (
MissingValueChecker,
MissingValueStandardizer,
RobustSimpleImputer,
SnakeCaseFormatter,
BooleanColumnTransformer,
JobStabilityTransformer,
CityTierTransformer,
StateDefaultRateTargetEncoder,
RobustStandardScaler,
RobustOneHotEncoder,
RobustOrdinalEncoder,
FeatureSelector
)
from src.utils import get_root_directory
# --- Logging ---
# Create logs directory if it doesn't exist
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
# Define logging configuration
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
},
"monitoring": {
"format": "%(message)s",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
"stream": "ext://sys.stdout", # write to Python standard output stream, which goes to Docker container's standard output, which goes to Hugging Face host server, which goes to Hugging Face Space Logs tab
},
"monitoring_file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "monitoring",
"filename": str(log_dir / "prediction_logs.jsonl"), # JSON Lines format: one JSON object per line
"maxBytes": 10485760, # 10 MB
"backupCount": 3, # will create prediction_logs.jsonl.1, .2, and .3 for max 4 log files (40 MB), then overwrite
},
},
"loggers": {
"": { # root logger for general logs
"handlers": ["console"],
"level": "INFO",
},
"monitoring": { # prediction records logger for model monitoring
"handlers": ["monitoring_file"],
"level": "INFO",
"propagate": False,
},
},
}
# Apply logging configuration
logging.config.dictConfig(LOGGING_CONFIG)
# Get loggers
logger = logging.getLogger(__name__)
monitoring_logger = logging.getLogger("monitoring")
# --- Helper Functions ---
# Function to get batch-level metadata for logging
def get_batch_metadata(
pipeline_input_dict_ls: list[dict[str, Any]],
request: Request,
geoip_reader: geoip2.database.Reader | None
) -> dict[str, Any]:
# Get user agent and IP from frontend
user_agent = pipeline_input_dict_ls[0].get("user_agent", None) # use first input in list of inputs
if user_agent is None: # fall back for direct API request to backend
user_agent = request.headers.get("user-agent", "unknown") # get from request headers of FastAPI backend
client_ip = pipeline_input_dict_ls[0].get("client_ip", None)
if client_ip is None:
x_forwarded_for = request.headers.get("x-forwarded-for") # single str with one or more comma-separated IP addresses
client_ip = x_forwarded_for.split(",")[0].strip() if x_forwarded_for else request.client.host # first IP is client IP address
# Get client country from IP address
client_country = "unknown"
if geoip_reader and client_ip:
try:
response = geoip_reader.country(client_ip)
client_country = response.country.name
except AddressNotFoundError: # this occurs for unknown or private or reserved IPs (e.g., 127.0.0.1)
logger.debug("IP address not found in GeoLite2 country database. Likely a private or local address.")
# Create dictionary with batch-level metadata
metadata = {
"batch_id": str(uuid.uuid4()),
"batch_size": len(pipeline_input_dict_ls),
"batch_timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline_version": PIPELINE_VERSION_TAG,
"client_country": client_country,
"user_agent": user_agent,
}
return metadata
# Function to load a scikit-learn pipeline from the local machine
def load_pipeline_from_local(path: str | Path) -> Pipeline:
# Input type validation
if not isinstance(path, (str, Path)):
raise TypeError(f"Error when loading pipeline: 'path' must be a string or Path object, got {type(path).__name__}")
# Get path as both string and Path object
if isinstance(path, Path):
path_str = str(path)
else: # isinstance(path, str)
path_str = path
path = Path(path)
# Ensure file exists
if not path.exists():
raise FileNotFoundError(f"Error when loading pipeline: File not found at '{path_str}'")
# Load pipeline
try:
logger.info(f"Loading pipeline from '{path_str}'...")
pipeline = joblib.load(path_str)
logger.info("Successfully loaded pipeline.")
except Exception as e:
raise RuntimeError(f"Error when loading pipeline from '{path_str}'") from e
# Ensure loaded object is a scikit-learn Pipeline
if not isinstance(pipeline, Pipeline):
raise TypeError("Error when loading pipeline: Loaded object is not a scikit-learn Pipeline")
# Ensure pipeline has .predict_proba() method
if not hasattr(pipeline, "predict_proba"):
raise TypeError("Error when loading pipeline: Loaded pipeline does not have a .predict_proba() method")
return pipeline
# Function to download and load a scikit-learn pipeline from a Hugging Face Hub repository
def load_pipeline_from_huggingface(repo_id: str, filename: str, revision: str) -> Pipeline:
try:
# .hf_hub_download() downloads the pipeline file and returns its local file path (inside the Docker container)
# if the pipeline file was already downloaded, it will use the cached pipeline that is already stored inside the Docker container
logger.info(
f"Downloading pipeline '{filename}' with tag '{revision}' from Hugging Face Hub repo '{repo_id}'. "
"If already cached, will use local copy."
)
pipeline_path = hf_hub_download(repo_id=repo_id, filename=filename, revision=revision)
# Load pipeline from file inside the Docker container
pipeline = load_pipeline_from_local(pipeline_path)
return pipeline
except Exception as e:
raise RuntimeError(f"Error loading pipeline '{filename}' from Hugging Face Hub repository '{repo_id}': {e}") from e
# --- Geolocation Database ---
# Load the GeoLite2 country database to log client country for model monitoring (download database from https://www.maxmind.com to the "geoip_db/" directory)
GEO_DB_PATH = Path("geoip_db/GeoLite2-Country.mmdb")
try:
geoip_reader = geoip2.database.Reader(GEO_DB_PATH)
logger.info(f"Successfully loaded GeoLite2 country database from '{GEO_DB_PATH}'")
except FileNotFoundError:
logger.error(f"GeoLite2 country database not found at '{GEO_DB_PATH}'. Client country will not be logged. Download the database from https://www.maxmind.com.")
geoip_reader = None
# --- ML Pipeline ---
# Load loan default prediction pipeline (including data preprocessing and Random Forest Classifier model) from Hugging Face Hub
PIPELINE_VERSION_TAG = "v1.0"
pipeline = load_pipeline_from_huggingface(
repo_id="JensBender/loan-default-prediction-pipeline",
filename="loan_default_rf_pipeline.joblib",
revision=PIPELINE_VERSION_TAG
)
# Load pipeline from local machine (use for local setup without Hugging Face Hub)
# root_dir = get_root_directory() # get path to root directory
# pipeline_path = root_dir / "models" / "loan_default_rf_pipeline.joblib" # get path to pipeline file
# pipeline = load_pipeline_from_local(pipeline_path)
# --- API ---
# Create FastAPI app
fastapi_app = FastAPI(title="Loan Default Prediction API")
# Prediction endpoint
@fastapi_app.post("/api/predict", response_model=PredictionResponse)
def predict(pipeline_input: PipelineInput | list[PipelineInput], request: Request) -> PredictionResponse: # JSON object -> PipelineInput | JSON array -> list[PipelineInput]
batch_metadata = None
pipeline_input_dict_ls = None
try:
# Standardize input
if isinstance(pipeline_input, list):
if pipeline_input == []: # handle empty batch input
return PredictionResponse(results=[])
pipeline_input_dict_ls = [input.model_dump() for input in pipeline_input]
else: # isinstance(pipeline_input, PipelineInput)
pipeline_input_dict_ls = [pipeline_input.model_dump()]
# Get batch metadata for logging
batch_metadata = get_batch_metadata(pipeline_input_dict_ls, request, geoip_reader)
# Remove "client_ip" and "user_agent" from pipeline input
pipeline_input_cleaned = [
{k: v for k, v in d.items() if k not in {"client_ip", "user_agent"}}
for d in pipeline_input_dict_ls
]
# Create DataFrame
pipeline_input_df: pd.DataFrame = pd.DataFrame(pipeline_input_cleaned)
# Use pipeline to batch predict probabilities (and measure latency)
start_time = time.perf_counter() # use .perf_counter() for latency measurement and .time() for timestamps
predicted_probabilities: np.ndarray = pipeline.predict_proba(pipeline_input_df)
pipeline_prediction_latency_ms = round((time.perf_counter() - start_time) * 1000) # rounded to milliseconds
# Apply optimized threshold to convert probabilities to binary predictions
optimized_threshold: float = 0.29 # see threshold optimization in training script "loan_default_prediction.ipynb"
predictions: np.ndarray = (predicted_probabilities[:, 1] >= optimized_threshold) # bool 1d-array based on class 1 "Default"
# Add latency to batch metadata for logging
batch_metadata.update({
"batch_latency_ms": pipeline_prediction_latency_ms,
"avg_prediction_latency_ms": round(pipeline_prediction_latency_ms / len(pipeline_input_dict_ls)),
})
# --- Create prediction response ---
results: list[PredictionResult] = []
# Iterate over each prediction
for i, (pred, pred_proba) in enumerate(zip(predictions, predicted_probabilities)):
# Create prediction result
prediction_enum = PredictionEnum.DEFAULT if pred else PredictionEnum.NO_DEFAULT
prediction_result = PredictionResult(
prediction=prediction_enum,
probabilities=PredictedProbabilities(
default=float(pred_proba[1]),
no_default=float(pred_proba[0])
)
)
results.append(prediction_result)
# Log single prediction record for model monitoring (including batch metadata)
prediction_monitoring_record = {
**batch_metadata,
"prediction_id": str(uuid.uuid4()),
"inputs": pipeline_input_cleaned[i],
"prediction": prediction_enum.value,
"probabilities": {
"default": float(pred_proba[1]),
"no_default": float(pred_proba[0])
},
}
monitoring_logger.info(json.dumps(prediction_monitoring_record)) # converts record to JSON string for log
return PredictionResponse(results=results)
except Exception as e:
# Log error to console
logger.error("Error during predict: %s", e, exc_info=True)
# Log prediction error record to file for model monitoring (including batch metadata)
if pipeline_input_dict_ls:
if batch_metadata is None: # error before .get_batch_metadata()
batch_metadata = {
"batch_id": str(uuid.uuid4()),
"batch_size": len(pipeline_input_dict_ls),
"batch_timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline_version": PIPELINE_VERSION_TAG,
"client_country": None,
"user_agent": None
}
# Iterate over each input in batch
for input in pipeline_input_dict_ls:
prediction_monitoring_record = {
**batch_metadata,
"prediction_id": str(uuid.uuid4()),
"inputs": input,
"prediction": None,
"probabilities": None,
"error_message": str(e)
}
monitoring_logger.error(json.dumps(prediction_monitoring_record))
else:
prediction_monitoring_record = {
"batch_id": str(uuid.uuid4()),
"batch_size": None,
"batch_timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline_version": PIPELINE_VERSION_TAG,
"client_country": None,
"user_agent": None,
"prediction_id": str(uuid.uuid4()),
"inputs": None,
"prediction": None,
"probabilities": None,
"error_message": str(e)
}
monitoring_logger.error(json.dumps(prediction_monitoring_record))
raise HTTPException(status_code=500, detail="Internal server error during loan default prediction")
# Mount Gradio frontend onto FastAPI backend
app = gr.mount_gradio_app(
fastapi_app,
gradio_app,
path="/gradio", # at "/gradio" not "/" due to known Gradio bug (redirect loop)
show_api=False # disable Gradio's auto-generated API
)
# Home route redirects to Gradio UI
@app.get("/")
def root():
return RedirectResponse(url="/gradio/")