"""Prediction endpoints: single-record and batch inference via the loaded model pipeline."""

import logging
import time
from pathlib import Path

import pandas as pd
from fastapi import APIRouter, HTTPException

from app.schemas.request import BatchPredictionRequest, PredictionRequest
from app.schemas.response import BatchPredictionResponse, PredictionResponse
from app.utils.metrics import prediction_counter, prediction_duration
from app.utils.model_loader import model_loader
from monitoring.model_monitoring.prediction_logger import PredictionLogger

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/predict", tags=["prediction"])

# Version label attached to every logged prediction and every metric sample.
MODEL_VERSION = "v1"

# Initialize prediction logger (rooted at monitoring/predictions).
prediction_logger = PredictionLogger(Path("monitoring/predictions"))

# Maps the API's underscored field names to the column names the pipeline
# expects (with spaces). Fields not listed here pass through unchanged.
_COLUMN_NAME_MAP = {
    "Chest_pain_type": "Chest pain type",
    "FBS_over_120": "FBS over 120",
    "EKG_results": "EKG results",
    "Max_HR": "Max HR",
    "Exercise_angina": "Exercise angina",
    "ST_depression": "ST depression",
    "Slope_of_ST": "Slope of ST",
    "Number_of_vessels_fluro": "Number of vessels fluro",
}


def convert_to_original_columns(data_dict: dict) -> dict:
    """Return a new dict with keys renamed to the pipeline's column names.

    Keys found in ``_COLUMN_NAME_MAP`` are renamed; every other key is kept
    as-is. Values are not modified.
    """
    return {_COLUMN_NAME_MAP.get(key, key): value for key, value in data_dict.items()}


def add_interaction_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add the ``id_x_Age`` column (``id * Age``) to *df*.

    Mutates *df* in place and also returns it for chaining.

    Raises:
        KeyError: if *df* has no ``id`` or no ``Age`` column (e.g. an empty
            DataFrame built from zero records).
    """
    # NOTE(review): an interaction on a row identifier is unusual; presumably
    # the pipeline was trained with this column, so it must stay — confirm.
    df["id_x_Age"] = df["id"] * df["Age"]
    return df


def _first_probability_row(probabilities):
    """Return the first row of *probabilities*, or None if it is missing or empty."""
    # Explicit None/len checks instead of truthiness: truth-testing a
    # multi-element NumPy array raises ValueError, while len() works for both
    # lists and arrays.
    if probabilities is None or len(probabilities) == 0:
        return None
    return probabilities[0]


def _positive_class_probability(proba_row):
    """Return the last entry of a per-class probability row as a float, or None.

    The row is ``[p_class0, p_class1, ...]``; its last entry is taken as the
    positive-class probability for logging.
    """
    return float(proba_row[-1]) if proba_row is not None else None


@router.post("/", response_model=PredictionResponse)
async def predict_single(request: PredictionRequest):
    """Predict a single record.

    Renames the request fields to the pipeline's column names, adds the
    interaction feature, runs the pipeline, logs the prediction (with the
    positive-class probability when available) and updates metrics.

    Raises:
        HTTPException: status 500 if any step fails; ``detail`` carries the
            underlying error message.
    """
    # NOTE(review): pipeline.predict and prediction_logger.log_prediction are
    # synchronous calls inside an ``async`` handler, so they block the event
    # loop while they run. Consider offloading them once the pipeline's
    # thread-safety is confirmed.
    start_time = time.perf_counter()
    try:
        pipeline = model_loader.get_pipeline()
        input_dict = convert_to_original_columns(request.model_dump())
        df = add_interaction_features(pd.DataFrame([input_dict]))

        result = pipeline.predict(df)
        prediction = result["predictions"][0]
        proba_row = _first_probability_row(result.get("probabilities"))

        prediction_logger.log_prediction(
            input_data=input_dict,
            prediction=int(prediction),
            model_version=MODEL_VERSION,
            metadata={"probability": _positive_class_probability(proba_row)},
        )

        prediction_counter.labels(model_version=MODEL_VERSION, status="success").inc()
        prediction_duration.observe(time.perf_counter() - start_time)

        return PredictionResponse(prediction=prediction, probability=proba_row)
    except Exception as e:
        # Top-level request boundary: record the traceback server-side, count
        # the failure, and surface it to the client as a 500.
        logger.exception("Single prediction failed")
        prediction_counter.labels(model_version=MODEL_VERSION, status="error").inc()
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
    """Predict every record in ``request.data`` in one pipeline call.

    Each record is logged individually. The success counter is incremented by
    the number of predictions returned.

    Raises:
        HTTPException: status 422 if the batch is empty; status 500 if any
            later step fails (``detail`` carries the underlying error message).
    """
    if not request.data:
        # An empty batch can never succeed: add_interaction_features would fail
        # with an opaque KeyError('id'). Reject it as a client error instead.
        # This check sits outside the try so it is not rewrapped as a 500.
        raise HTTPException(
            status_code=422,
            detail="Batch request must contain at least one record",
        )

    # NOTE(review): as in predict_single, the synchronous pipeline and logging
    # calls below block the event loop for the duration of the batch.
    start_time = time.perf_counter()
    try:
        pipeline = model_loader.get_pipeline()
        data_list = [convert_to_original_columns(item.model_dump()) for item in request.data]
        df = add_interaction_features(pd.DataFrame(data_list))

        result = pipeline.predict(df)
        predictions = result["predictions"]
        probabilities = result.get("probabilities")

        # Log per-record probabilities the same way predict_single does. Use them
        # only when there is exactly one row per prediction; otherwise log None,
        # so that zip() can never silently drop records from the log.
        if probabilities is not None and len(probabilities) == len(predictions):
            proba_rows = probabilities
        else:
            proba_rows = [None] * len(predictions)

        for input_data, prediction, proba_row in zip(data_list, predictions, proba_rows):
            prediction_logger.log_prediction(
                input_data=input_data,
                prediction=int(prediction),
                model_version=MODEL_VERSION,
                metadata={"probability": _positive_class_probability(proba_row)},
            )

        prediction_counter.labels(model_version=MODEL_VERSION, status="success").inc(
            len(predictions)
        )
        prediction_duration.observe(time.perf_counter() - start_time)

        return BatchPredictionResponse(
            predictions=predictions,
            probabilities=probabilities,
            num_samples=result["num_samples"],
        )
    except Exception as e:
        logger.exception("Batch prediction failed")
        # Counts the failed request once, whereas success counts each prediction.
        prediction_counter.labels(model_version=MODEL_VERSION, status="error").inc()
        raise HTTPException(status_code=500, detail=str(e)) from e