Abeshith's picture
Upload folder using huggingface_hub
8e5f262 verified
from fastapi import APIRouter, HTTPException
from app.schemas.request import PredictionRequest, BatchPredictionRequest
from app.schemas.response import PredictionResponse, BatchPredictionResponse
from app.utils.model_loader import model_loader
from app.utils.metrics import prediction_counter, prediction_duration
from monitoring.model_monitoring.prediction_logger import PredictionLogger
from pathlib import Path
import pandas as pd
import time
router = APIRouter(prefix="/predict", tags=["prediction"])
# Initialize prediction logger
prediction_logger = PredictionLogger(Path("monitoring/predictions"))
def convert_to_original_columns(data_dict):
mapping = {
"Chest_pain_type": "Chest pain type",
"FBS_over_120": "FBS over 120",
"EKG_results": "EKG results",
"Max_HR": "Max HR",
"Exercise_angina": "Exercise angina",
"ST_depression": "ST depression",
"Slope_of_ST": "Slope of ST",
"Number_of_vessels_fluro": "Number of vessels fluro"
}
return {mapping.get(k, k): v for k, v in data_dict.items()}
def add_interaction_features(df):
df['id_x_Age'] = df['id'] * df['Age']
return df
@router.post("/", response_model=PredictionResponse)
async def predict_single(request: PredictionRequest):
start_time = time.time()
try:
pipeline = model_loader.get_pipeline()
input_dict = convert_to_original_columns(request.model_dump())
df = pd.DataFrame([input_dict])
df = add_interaction_features(df)
result = pipeline.predict(df)
prediction = result["predictions"][0]
proba_row = result.get("probabilities")[0] if result.get("probabilities") else None
# proba_row is a list [p_class0, p_class1, ...]; take the positive-class prob for logging
proba_scalar = float(proba_row[-1]) if proba_row is not None else None
# Log prediction
prediction_logger.log_prediction(
input_data=input_dict,
prediction=int(prediction),
model_version="v1",
metadata={"probability": proba_scalar}
)
# Update metrics
prediction_counter.labels(model_version="v1", status="success").inc()
prediction_duration.observe(time.time() - start_time)
return PredictionResponse(
prediction=prediction,
probability=proba_row
)
except Exception as e:
prediction_counter.labels(model_version="v1", status="error").inc()
raise HTTPException(status_code=500, detail=str(e))
@router.post("/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
start_time = time.time()
try:
pipeline = model_loader.get_pipeline()
data_list = [convert_to_original_columns(item.model_dump()) for item in request.data]
df = pd.DataFrame(data_list)
df = add_interaction_features(df)
result = pipeline.predict(df)
# Log batch predictions
for input_data, prediction in zip(data_list, result["predictions"]):
prediction_logger.log_prediction(
input_data=input_data,
prediction=int(prediction),
model_version="v1"
)
# Update metrics
prediction_counter.labels(model_version="v1", status="success").inc(len(result["predictions"]))
prediction_duration.observe(time.time() - start_time)
return BatchPredictionResponse(
predictions=result["predictions"],
probabilities=result.get("probabilities"),
num_samples=result["num_samples"]
)
except Exception as e:
prediction_counter.labels(model_version="v1", status="error").inc()
raise HTTPException(status_code=500, detail=str(e))