|
|
from fastapi import FastAPI, UploadFile, File, HTTPException, Security, Depends |
|
|
from fastapi.security.api_key import APIKeyHeader |
|
|
from fastapi.responses import JSONResponse, StreamingResponse |
|
|
import uvicorn |
|
|
import logging |
|
|
import io |
|
|
import os |
|
|
from typing import Tuple, Optional |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import cv2 |
|
|
|
|
|
|
|
|
from ultralytics import YOLO |
|
|
import mediapipe as mp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
API_KEY = "1234" |
|
|
api_key_header = APIKeyHeader(name="X-API-Key") |
|
|
|
|
|
def verify_api_key(api_key: str = Security(api_key_header)): |
|
|
if api_key != API_KEY: |
|
|
raise HTTPException(status_code=403, detail="Forbidden") |
|
|
return api_key |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format="%(asctime)s - %(levelname)s - %(message)s" |
|
|
) |
|
|
logger = logging.getLogger("stroke-api") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI( |
|
|
title="Stroke Detection API", |
|
|
version="1.2.0", |
|
|
description=""" |
|
|
🚑 Stroke Detection API using YOLOv8 + Face Detection (MediaPipe) |
|
|
|
|
|
⚠️ **Disclaimer**: Research/demo only — not a medical device. |
|
|
""" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
model = YOLO("best.pt") |
|
|
logger.info("✅ YOLO model loaded.") |
|
|
except Exception as e: |
|
|
logger.exception("❌ Failed to load YOLO model") |
|
|
raise RuntimeError(f"Model loading failed: {e}") |
|
|
|
|
|
mp_face_detection = mp.solutions.face_detection |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ALLOWED_EXT = (".png", ".jpg", ".jpeg") |
|
|
ALLOWED_MIME = {"image/png", "image/jpeg"} |
|
|
MAX_BYTES = 8 * 1024 * 1024 |
|
|
CROP_ON_FACE = True |
|
|
|
|
|
def _validate_file(file: UploadFile, raw: bytes): |
|
|
|
|
|
if not file.filename.lower().endswith(ALLOWED_EXT): |
|
|
raise HTTPException(status_code=400, detail="Invalid file extension. Use .png/.jpg/.jpeg") |
|
|
|
|
|
if (file.content_type or "").lower() not in ALLOWED_MIME: |
|
|
|
|
|
if file.content_type: |
|
|
raise HTTPException(status_code=400, detail="Invalid content-type. Use image/png or image/jpeg") |
|
|
|
|
|
if len(raw) > MAX_BYTES: |
|
|
raise HTTPException(status_code=413, detail=f"Image too large. Max {MAX_BYTES//(1024*1024)} MB") |
|
|
|
|
|
def _read_image_to_numpy(raw: bytes) -> np.ndarray: |
|
|
try: |
|
|
img = Image.open(io.BytesIO(raw)).convert("RGB") |
|
|
return np.array(img) |
|
|
except Exception: |
|
|
raise HTTPException(status_code=400, detail="Unreadable image file") |
|
|
|
|
|
def _largest_face_bbox(np_img: np.ndarray, min_conf: float = 0.6) -> Optional[Tuple[int,int,int,int]]: |
|
|
""" |
|
|
Retourne (x1,y1,x2,y2) du plus grand visage détecté, ou None. |
|
|
""" |
|
|
h, w = np_img.shape[:2] |
|
|
with mp_face_detection.FaceDetection(min_detection_confidence=min_conf) as fd: |
|
|
results = fd.process(cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR)) |
|
|
if not results.detections: |
|
|
return None |
|
|
boxes = [] |
|
|
for det in results.detections: |
|
|
rel = det.location_data.relative_bounding_box |
|
|
x1 = int(max(0, rel.xmin) * w) |
|
|
y1 = int(max(0, rel.ymin) * h) |
|
|
x2 = int(min(1.0, rel.xmin + rel.width) * w) |
|
|
y2 = int(min(1.0, rel.ymin + rel.height) * h) |
|
|
boxes.append((x1, y1, x2, y2)) |
|
|
|
|
|
boxes.sort(key=lambda b: (b[2]-b[0])*(b[3]-b[1]), reverse=True) |
|
|
return boxes[0] if boxes else None |
|
|
|
|
|
def _crop_to_bbox(np_img: np.ndarray, bbox: Tuple[int,int,int,int], margin: float = 0.15) -> np.ndarray: |
|
|
h, w = np_img.shape[:2] |
|
|
x1, y1, x2, y2 = bbox |
|
|
bw, bh = x2 - x1, y2 - y1 |
|
|
|
|
|
dx, dy = int(bw * margin), int(bh * margin) |
|
|
X1 = max(0, x1 - dx) |
|
|
Y1 = max(0, y1 - dy) |
|
|
X2 = min(w, x2 + dx) |
|
|
Y2 = min(h, y2 + dy) |
|
|
return np_img[Y1:Y2, X1:X2].copy() |
|
|
|
|
|
def _annotate_face_box(np_img: np.ndarray, bbox: Tuple[int,int,int,int]) -> np.ndarray: |
|
|
annotated = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR).copy() |
|
|
x1, y1, x2, y2 = bbox |
|
|
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2) |
|
|
return annotated |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/health") |
|
|
async def health(): |
|
|
return {"status": "ok", "model_loaded": True} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/v1/predict/") |
|
|
async def predict( |
|
|
file: UploadFile = File(...), |
|
|
api_key: str = Depends(verify_api_key) |
|
|
): |
|
|
raw = await file.read() |
|
|
_validate_file(file, raw) |
|
|
|
|
|
try: |
|
|
np_img = _read_image_to_numpy(raw) |
|
|
|
|
|
|
|
|
face_bbox = _largest_face_bbox(np_img) |
|
|
if face_bbox is None: |
|
|
return JSONResponse( |
|
|
status_code=422, |
|
|
content={"status": "error", "message": "Aucun visage humain détecté. Veuillez centrer le visage."} |
|
|
) |
|
|
|
|
|
|
|
|
input_img = _crop_to_bbox(np_img, face_bbox) if CROP_ON_FACE else np_img |
|
|
|
|
|
|
|
|
results = model.predict(source=input_img, verbose=False) |
|
|
|
|
|
output = [] |
|
|
for r in results: |
|
|
for box in r.boxes: |
|
|
output.append({ |
|
|
"class": r.names[int(box.cls[0].item())], |
|
|
"confidence": float(box.conf[0].item()), |
|
|
"bbox": box.xyxy[0].tolist() |
|
|
}) |
|
|
|
|
|
logger.info(f"/predict {file.filename} -> {len(output)} detections (face ok)") |
|
|
return JSONResponse(content={ |
|
|
"status": "ok", |
|
|
"face_detected": True, |
|
|
"face_bbox": list(map(int, face_bbox)), |
|
|
"predictions": output |
|
|
}) |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.exception("Error in /v1/predict") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/v1/predict_image/") |
|
|
async def predict_image( |
|
|
file: UploadFile = File(...), |
|
|
api_key: str = Depends(verify_api_key) |
|
|
): |
|
|
raw = await file.read() |
|
|
_validate_file(file, raw) |
|
|
|
|
|
try: |
|
|
np_img = _read_image_to_numpy(raw) |
|
|
|
|
|
|
|
|
face_bbox = _largest_face_bbox(np_img) |
|
|
if face_bbox is None: |
|
|
return JSONResponse( |
|
|
status_code=422, |
|
|
content={"status": "error", "message": "Aucun visage humain détecté. Veuillez centrer le visage."} |
|
|
) |
|
|
|
|
|
|
|
|
input_img = _crop_to_bbox(np_img, face_bbox) if CROP_ON_FACE else np_img |
|
|
|
|
|
|
|
|
results = model.predict(source=input_img, verbose=False) |
|
|
|
|
|
|
|
|
yolo_annot = results[0].plot() |
|
|
yolo_annot = cv2.cvtColor(yolo_annot, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
if not CROP_ON_FACE: |
|
|
annotated = _annotate_face_box(np_img, face_bbox) |
|
|
|
|
|
out_rgb = annotated |
|
|
else: |
|
|
|
|
|
out_rgb = yolo_annot |
|
|
|
|
|
|
|
|
pil_img = Image.fromarray(out_rgb) |
|
|
buf = io.BytesIO() |
|
|
pil_img.save(buf, format="PNG") |
|
|
buf.seek(0) |
|
|
|
|
|
logger.info(f"/predict_image {file.filename} -> face ok + image annotée") |
|
|
return StreamingResponse(buf, media_type="image/png") |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.exception("Error in /v1/predict_image") |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|
|