PneumoniaAPI / api /main.py
GitHub Actions
Auto-deploy from GitHub: 495db78a06be79166200269bb14d9e9b1e8906d6
af59988
"""
FastAPI application for Pneumonia Detection API.
Run with: uvicorn api.main:app --reload
"""
import io
import time
import base64
from pathlib import Path
import torch
from PIL import Image
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from .schemas import (
HealthResponse,
PredictionResponse,
GradCAMResponse,
ErrorResponse
)
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config import CHECKPOINT_PATH, CLASS_NAMES, CONFIDENCE_THRESHOLD
from src.model import create_model, get_device
from src.predict import load_model, predict_image
from src.gradcam import generate_gradcam
# =============================================================================
# App Configuration
# =============================================================================
app = FastAPI(
title="Pneumonia Detection API",
description="Deep learning API for detecting pneumonia from chest X-ray images using EfficientNet-B0",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# CORS middleware for frontend access
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# =============================================================================
# Model Loading (on startup)
# =============================================================================
model = None
device = None
@app.on_event("startup")
async def load_model_on_startup():
"""Load model when the API starts."""
global model, device
device = get_device()
print(f"Using device: {device}")
if not CHECKPOINT_PATH.exists():
print(f"Warning: Model checkpoint not found at {CHECKPOINT_PATH}")
return
model = create_model(pretrained=False, freeze_backbone=False, device=device)
model = load_model(model, CHECKPOINT_PATH, device)
print(f"Model loaded from {CHECKPOINT_PATH}")
# =============================================================================
# Helper Functions
# =============================================================================
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"}
def validate_image(file: UploadFile) -> None:
"""Validate uploaded image file."""
if not file.content_type.startswith("image/"):
raise HTTPException(
status_code=400,
detail=f"Invalid content type: {file.content_type}. Expected image/*"
)
ext = Path(file.filename).suffix.lower() if file.filename else ""
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Invalid file extension: {ext}. Allowed: {ALLOWED_EXTENSIONS}"
)
async def read_image(file: UploadFile) -> Image.Image:
"""Read uploaded file as PIL Image."""
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
return image
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Failed to read image: {str(e)}"
)
# =============================================================================
# API Endpoints
# =============================================================================
@app.get("/", include_in_schema=False)
async def root():
"""Redirect to docs."""
return {"message": "Pneumonia Detection API", "docs": "/docs"}
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
"""
Health check endpoint.
Returns the API status and model loading state.
"""
return HealthResponse(
status="healthy" if model is not None else "model_not_loaded",
model_loaded=model is not None,
model_path=str(CHECKPOINT_PATH)
)
@app.post(
"/predict",
response_model=PredictionResponse,
responses={400: {"model": ErrorResponse}, 503: {"model": ErrorResponse}},
tags=["Prediction"]
)
async def predict(file: UploadFile = File(..., description="Chest X-ray image (JPEG/PNG)")):
"""
Predict pneumonia from chest X-ray image.
Upload a chest X-ray image and get the prediction (NORMAL or PNEUMONIA)
with confidence score.
"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
validate_image(file)
image = await read_image(file)
# Run inference
start_time = time.time()
pred_class, confidence = predict_image(model, image, device)
processing_time = (time.time() - start_time) * 1000 # Convert to ms
# Calculate raw probability
probability = confidence if pred_class == "PNEUMONIA" else 1 - confidence
return PredictionResponse(
prediction=pred_class,
confidence=confidence,
probability=probability,
processing_time_ms=round(processing_time, 2)
)
@app.post(
"/predict/gradcam",
response_model=GradCAMResponse,
responses={400: {"model": ErrorResponse}, 503: {"model": ErrorResponse}},
tags=["Prediction"]
)
async def predict_with_gradcam(file: UploadFile = File(..., description="Chest X-ray image (JPEG/PNG)")):
"""
Predict with Grad-CAM visualization.
Returns prediction along with a Grad-CAM heatmap overlay showing
which regions of the image influenced the prediction.
"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
validate_image(file)
image = await read_image(file)
# Run inference with Grad-CAM
start_time = time.time()
cam_image, pred_class, confidence, _ = generate_gradcam(model, image, device)
processing_time = (time.time() - start_time) * 1000
# Convert Grad-CAM image to base64
cam_pil = Image.fromarray(cam_image)
buffer = io.BytesIO()
cam_pil.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
# Calculate raw probability
probability = confidence if pred_class == "PNEUMONIA" else 1 - confidence
return GradCAMResponse(
prediction=pred_class,
confidence=confidence,
probability=probability,
processing_time_ms=round(processing_time, 2),
gradcam_image=f"data:image/png;base64,{img_base64}"
)
# =============================================================================
# Error Handlers
# =============================================================================
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
"""Handle HTTP exceptions."""
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail, "detail": None}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""Handle unexpected exceptions."""
return JSONResponse(
status_code=500,
content={"error": "Internal server error", "detail": str(exc)}
)
# =============================================================================
# Run with: uvicorn api.main:app --reload --host 0.0.0.0 --port 8000
# =============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)