""" API de détection d'espaces vides sur étagères. Endpoints : POST /detect — Envoie une image, reçoit les détections + score POST /detect/annotated — Envoie une image, reçoit l'image annotée GET /health — Vérifier que l'API est en ligne Déploiement gratuit : Hugging Face Spaces (Docker) """ import io import base64 import time from pathlib import Path import cv2 import numpy as np from fastapi import FastAPI, File, UploadFile, Query from fastapi.responses import JSONResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from ultralytics import YOLO # ────────────────────────────────────────────── # Configuration # ────────────────────────────────────────────── MODEL_PATH = Path(__file__).parent / "model" / "best.pt" CONFIDENCE_DEFAULT = 0.25 # ────────────────────────────────────────────── # Initialisation FastAPI # ────────────────────────────────────────────── app = FastAPI( title="🔍 Shelf Empty Space Detector", description="API de détection d'espaces vides sur étagères de magasin", version="1.0.0", ) # CORS — autoriser les appels depuis n'importe quelle origine (mobile, web, etc.) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ────────────────────────────────────────────── # Chargement du modèle au démarrage # ────────────────────────────────────────────── print(f"📦 Chargement du modèle : {MODEL_PATH}") model = YOLO(str(MODEL_PATH)) print("✅ Modèle chargé avec succès") # ────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────── def read_image(file_bytes: bytes) -> np.ndarray: """Convertir les bytes d'un fichier uploadé en image OpenCV.""" nparr = np.frombuffer(file_bytes, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise ValueError("Impossible de décoder l'image") return img def analyze_detections(image: np.ndarray, results) -> dict: """Analyser les détections et calculer les statistiques.""" result = results[0] boxes = result.boxes img_h, img_w = image.shape[:2] img_area = img_h * img_w detections = [] empty_area_total = 0 for i, box in enumerate(boxes): x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() conf = float(box.conf[0]) w = x2 - x1 h = y2 - y1 area = float(w * h) empty_area_total += area zone_pct = (area / img_area) * 100 detections.append({ "id": i + 1, "bbox": { "x1": int(x1), "y1": int(y1), "x2": int(x2), "y2": int(y2), }, "width": int(w), "height": int(h), "area_px": int(area), "area_percentage": round(zone_pct, 2), "confidence": round(conf, 4), }) pct_empty = (empty_area_total / img_area) * 100 pct_merchandise = 100 - pct_empty # Déterminer le statut if pct_empty < 5: status = "excellent" status_label = "Étagère bien remplie" elif pct_empty < 15: status = "correct" status_label = "Quelques espaces à réapprovisionner" elif pct_empty < 30: status = "attention" status_label = "Réapprovisionnement nécessaire" else: status = "critique" status_label = "Étagère largement vide, action urgente" # Trier les détections par surface décroissante detections_sorted = sorted(detections, key=lambda d: d["area_px"], reverse=True) for idx, d in enumerate(detections_sorted): d["id"] = idx + 1 d["rank"] = idx + 1 # Zone la plus grande et la plus petite largest = detections_sorted[0] if detections_sorted else None smallest = detections_sorted[-1] if detections_sorted else None # Confiance moyenne avg_conf = ( round(sum(d["confidence"] for d in detections_sorted) / len(detections_sorted), 4) if detections_sorted else 0 ) return { "image_size": {"width": img_w, "height": img_h}, "total_area_px": img_area, "summary": { "empty_area_px": int(empty_area_total), "empty_percentage": round(pct_empty, 2), "merchandise_percentage": round(pct_merchandise, 2), "fill_rate": round(pct_merchandise, 2), "num_detections": len(detections_sorted), "average_confidence": avg_conf, "largest_empty_zone": { "id": largest["id"], "area_px": largest["area_px"], "area_percentage": largest["area_percentage"], "confidence": largest["confidence"], } if largest else None, "smallest_empty_zone": { "id": smallest["id"], "area_px": smallest["area_px"], "area_percentage": smallest["area_percentage"], "confidence": smallest["confidence"], } if smallest else None, }, "status": status, "status_label": status_label, "detections": detections_sorted, } def annotate_image(image: np.ndarray, analysis: dict) -> np.ndarray: """Générer une image annotée avec les détections.""" annotated = image.copy() img_h, img_w = image.shape[:2] for d in analysis["detections"]: bbox = d["bbox"] x1, y1, x2, y2 = bbox["x1"], bbox["y1"], bbox["x2"], bbox["y2"] conf = d["confidence"] # Rectangle semi-transparent rouge overlay = annotated.copy() cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 0, 255), -1) cv2.addWeighted(overlay, 0.3, annotated, 0.7, 0, annotated) # Contour cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 0, 255), 2) # Label zone_pct = d.get("area_percentage", 0) label = f"Vide {conf:.0%} ({zone_pct:.1f}%)" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) cv2.rectangle(annotated, (x1, y1 - th - 10), (x1 + tw + 5, y1), (0, 0, 255), -1) cv2.putText(annotated, label, (x1 + 2, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Bannière en haut pct_m = analysis["summary"]["merchandise_percentage"] pct_e = analysis["summary"]["empty_percentage"] n = analysis["summary"]["num_detections"] banner_h = 80 cv2.rectangle(annotated, (0, 0), (img_w, banner_h), (40, 40, 40), -1) score_text = f"Remplissage: {pct_m:.1f}% | Vide: {pct_e:.1f}% | {n} zone(s)" cv2.putText(annotated, score_text, (15, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2) # Barre de progression bar_x, bar_y, bar_w, bar_bh = 15, 50, img_w - 30, 20 cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_bh), (100, 100, 100), -1) fill_w = int(bar_w * pct_m / 100) cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + fill_w, bar_y + bar_bh), (0, 200, 0), -1) if fill_w < bar_w: cv2.rectangle(annotated, (bar_x + fill_w, bar_y), (bar_x + bar_w, bar_y + bar_bh), (0, 0, 200), -1) return annotated # ────────────────────────────────────────────── # Endpoints # ────────────────────────────────────────────── @app.get("/health") async def health(): """Vérifier que l'API est en ligne.""" return {"status": "ok", "model": str(MODEL_PATH.name)} @app.post("/detect") async def detect( file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"), confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0, description="Seuil de confiance (0.01 - 1.0)"), ): """ Détecter les espaces vides dans une image d'étagère. Retourne un JSON avec : - Le pourcentage d'espace vide / marchandises - Le nombre et les coordonnées des zones vides - Le statut (excellent / correct / attention / critique) """ start = time.time() try: file_bytes = await file.read() image = read_image(file_bytes) except Exception as e: return JSONResponse(status_code=400, content={"error": f"Image invalide: {e}"}) # Inférence results = model.predict( source=image, conf=confidence, imgsz=640, device="cpu", verbose=False, ) analysis = analyze_detections(image, results) analysis["inference_time_ms"] = round((time.time() - start) * 1000, 1) # Générer l'image annotée en base64 annotated = annotate_image(image, analysis) _, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, 85]) analysis["annotated_image_base64"] = base64.b64encode(buffer.tobytes()).decode("utf-8") return analysis @app.post("/detect/annotated") async def detect_annotated( file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"), confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0, description="Seuil de confiance (0.01 - 1.0)"), ): """ Détecter les espaces vides et retourner l'image annotée (JPEG). Utile pour afficher directement le résultat visuel dans l'app mobile. """ try: file_bytes = await file.read() image = read_image(file_bytes) except Exception as e: return JSONResponse(status_code=400, content={"error": f"Image invalide: {e}"}) # Inférence results = model.predict( source=image, conf=confidence, imgsz=640, device="cpu", verbose=False, ) analysis = analyze_detections(image, results) annotated = annotate_image(image, analysis) # Encoder en JPEG _, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, 90]) return StreamingResponse( io.BytesIO(buffer.tobytes()), media_type="image/jpeg", headers={"X-Empty-Percentage": str(analysis["summary"]["empty_percentage"]), "X-Status": analysis["status"]}, ) @app.post("/detect/full") async def detect_full( file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"), confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0, description="Seuil de confiance (0.01 - 1.0)"), ): """ Retourne le JSON complet + l'image annotée encodée en base64. Idéal pour les apps mobiles qui veulent les données ET l'image en un seul appel. """ start = time.time() try: file_bytes = await file.read() image = read_image(file_bytes) except Exception as e: return JSONResponse(status_code=400, content={"error": f"Image invalide: {e}"}) # Inférence results = model.predict( source=image, conf=confidence, imgsz=640, device="cpu", verbose=False, ) analysis = analyze_detections(image, results) analysis["inference_time_ms"] = round((time.time() - start) * 1000, 1) # Générer l'image annotée en base64 annotated = annotate_image(image, analysis) _, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, 85]) img_base64 = base64.b64encode(buffer.tobytes()).decode("utf-8") analysis["annotated_image_base64"] = img_base64 return analysis # ────────────────────────────────────────────── # Lancement local # ────────────────────────────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)