Spaces:
Running
Running
| from fastapi import APIRouter, HTTPException | |
| from app.schemas.request import PredictionRequest, BatchPredictionRequest | |
| from app.schemas.response import PredictionResponse, BatchPredictionResponse | |
| from app.utils.model_loader import model_loader | |
| from app.utils.metrics import prediction_counter, prediction_duration | |
| from monitoring.model_monitoring.prediction_logger import PredictionLogger | |
| from pathlib import Path | |
| import pandas as pd | |
| import time | |
| router = APIRouter(prefix="/predict", tags=["prediction"]) | |
| # Initialize prediction logger | |
| prediction_logger = PredictionLogger(Path("monitoring/predictions")) | |
| def convert_to_original_columns(data_dict): | |
| mapping = { | |
| "Chest_pain_type": "Chest pain type", | |
| "FBS_over_120": "FBS over 120", | |
| "EKG_results": "EKG results", | |
| "Max_HR": "Max HR", | |
| "Exercise_angina": "Exercise angina", | |
| "ST_depression": "ST depression", | |
| "Slope_of_ST": "Slope of ST", | |
| "Number_of_vessels_fluro": "Number of vessels fluro" | |
| } | |
| return {mapping.get(k, k): v for k, v in data_dict.items()} | |
| def add_interaction_features(df): | |
| df['id_x_Age'] = df['id'] * df['Age'] | |
| return df | |
| async def predict_single(request: PredictionRequest): | |
| start_time = time.time() | |
| try: | |
| pipeline = model_loader.get_pipeline() | |
| input_dict = convert_to_original_columns(request.model_dump()) | |
| df = pd.DataFrame([input_dict]) | |
| df = add_interaction_features(df) | |
| result = pipeline.predict(df) | |
| prediction = result["predictions"][0] | |
| proba_row = result.get("probabilities")[0] if result.get("probabilities") else None | |
| # proba_row is a list [p_class0, p_class1, ...]; take the positive-class prob for logging | |
| proba_scalar = float(proba_row[-1]) if proba_row is not None else None | |
| # Log prediction | |
| prediction_logger.log_prediction( | |
| input_data=input_dict, | |
| prediction=int(prediction), | |
| model_version="v1", | |
| metadata={"probability": proba_scalar} | |
| ) | |
| # Update metrics | |
| prediction_counter.labels(model_version="v1", status="success").inc() | |
| prediction_duration.observe(time.time() - start_time) | |
| return PredictionResponse( | |
| prediction=prediction, | |
| probability=proba_row | |
| ) | |
| except Exception as e: | |
| prediction_counter.labels(model_version="v1", status="error").inc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def predict_batch(request: BatchPredictionRequest): | |
| start_time = time.time() | |
| try: | |
| pipeline = model_loader.get_pipeline() | |
| data_list = [convert_to_original_columns(item.model_dump()) for item in request.data] | |
| df = pd.DataFrame(data_list) | |
| df = add_interaction_features(df) | |
| result = pipeline.predict(df) | |
| # Log batch predictions | |
| for input_data, prediction in zip(data_list, result["predictions"]): | |
| prediction_logger.log_prediction( | |
| input_data=input_data, | |
| prediction=int(prediction), | |
| model_version="v1" | |
| ) | |
| # Update metrics | |
| prediction_counter.labels(model_version="v1", status="success").inc(len(result["predictions"])) | |
| prediction_duration.observe(time.time() - start_time) | |
| return BatchPredictionResponse( | |
| predictions=result["predictions"], | |
| probabilities=result.get("probabilities"), | |
| num_samples=result["num_samples"] | |
| ) | |
| except Exception as e: | |
| prediction_counter.labels(model_version="v1", status="error").inc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |