| """ |
| API de détection d'espaces vides sur étagères. |
| |
| Endpoints : |
| POST /detect → Envoie une image, reçoit les détections + score |
| POST /detect/annotated → Envoie une image, reçoit l'image annotée |
| POST /detect/full → Envoie une image, reçoit le JSON complet + l'image annotée en base64 |
| GET /health → Vérifier que l'API est en ligne |
| |
| Déploiement gratuit : Hugging Face Spaces (Docker) |
| """ |
|
|
| import io |
| import base64 |
| import time |
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| from fastapi import FastAPI, File, UploadFile, Query |
| from fastapi.responses import JSONResponse, StreamingResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from ultralytics import YOLO |
|
|
| |
| |
| |
# Trained YOLO weights, located in the "model" directory next to this file.
MODEL_PATH = Path(__file__).parent.joinpath("model", "best.pt")
# Confidence threshold used when the client does not pass one explicitly.
CONFIDENCE_DEFAULT = 0.25
|
|
| |
| |
| |
# FastAPI application object; title, description and version are passed as
# API metadata.
# NOTE(review): the first glyph of the title looks like a mis-encoded emoji —
# confirm the intended character before changing it.
app = FastAPI(
    title="π Shelf Empty Space Detector",
    description="API de détection d'espaces vides sur étagères de magasin",
    version="1.0.0",
)

# Let front-ends served from any origin call the API.
# Credentials stay disabled: a wildcard origin must not be combined with
# credentialed requests, and no endpoint in this file relies on cookies or
# authentication headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
| |
| |
# Load the YOLO weights once at import time; the request handlers below all
# use this single module-level model instance.
print(f"📦 Chargement du modèle : {MODEL_PATH}")
model = YOLO(str(MODEL_PATH))
print("✅ Modèle chargé avec succès")
|
|
|
|
| |
| |
| |
def read_image(file_bytes: bytes) -> np.ndarray:
    """Decode the raw bytes of an uploaded file into an OpenCV image.

    Args:
        file_bytes: Encoded image content as received from the upload.

    Returns:
        The image produced by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If the payload is empty or cannot be decoded as an image.
    """
    # Reject empty payloads up front so that callers only ever have to deal
    # with ValueError, regardless of how OpenCV reacts to an empty buffer.
    if not file_bytes:
        raise ValueError("Impossible de décoder l'image")

    buffer = np.frombuffer(file_bytes, np.uint8)
    image = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    # imdecode signals an undecodable payload by returning None.
    if image is None:
        raise ValueError("Impossible de décoder l'image")
    return image
|
|
|
|
def _union_area(rects) -> float:
    """Return the area covered by at least one ``(x1, y1, x2, y2)`` rectangle."""
    rects = [r for r in rects if r[2] > r[0] and r[3] > r[1]]
    if not rects:
        return 0.0
    coords = np.asarray(rects, dtype=np.float64)
    # Coordinate compression: the rectangle edges cut the plane into a grid
    # whose cells are each either fully covered or fully uncovered.
    xs = np.unique(coords[:, [0, 2]])
    ys = np.unique(coords[:, [1, 3]])
    covered = np.zeros((len(xs) - 1, len(ys) - 1), dtype=bool)
    for x1, y1, x2, y2 in coords:
        ix1, ix2 = np.searchsorted(xs, (x1, x2))
        iy1, iy2 = np.searchsorted(ys, (y1, y2))
        covered[ix1:ix2, iy1:iy2] = True
    cell_areas = np.outer(np.diff(xs), np.diff(ys))
    return float(cell_areas[covered].sum())


def _fill_status(pct_empty: float) -> tuple:
    """Map the empty-space percentage to a ``(status, status_label)`` pair."""
    if pct_empty < 5:
        return "excellent", "Étagère bien remplie"
    if pct_empty < 15:
        return "correct", "Quelques espaces à réapprovisionner"
    if pct_empty < 30:
        return "attention", "Réapprovisionnement nécessaire"
    return "critique", "Étagère largement vide, action urgente"


def _zone_summary(zone):
    """Return the compact summary of one detection, or None when there is none."""
    if zone is None:
        return None
    return {
        "id": zone["id"],
        "area_px": zone["area_px"],
        "area_percentage": zone["area_percentage"],
        "confidence": zone["confidence"],
    }


def analyze_detections(image: np.ndarray, results) -> dict:
    """Turn raw model output into per-zone details and shelf-level statistics.

    Args:
        image: The analysed image; only ``image.shape[:2]`` is read.
        results: Sequence whose first item exposes ``boxes``; each box must
            provide ``xyxy[0].cpu().numpy()`` (x1, y1, x2, y2) and ``conf[0]``.

    Returns:
        A dict with ``image_size``, ``total_area_px``, ``summary``, ``status``,
        ``status_label`` and ``detections``. Detections are sorted by
        decreasing pixel area, and ``id`` / ``rank`` follow that order
        starting at 1.
    """
    boxes = results[0].boxes
    img_h, img_w = image.shape[:2]
    img_area = img_h * img_w

    detections = []
    clipped_rects = []
    for box in boxes:
        x1, y1, x2, y2 = (float(v) for v in box.xyxy[0].cpu().numpy())
        width = x2 - x1
        height = y2 - y1
        area = width * height
        detections.append({
            "id": None,  # assigned below, once the zones are ranked by area
            "bbox": {
                "x1": int(x1), "y1": int(y1),
                "x2": int(x2), "y2": int(y2),
            },
            "width": int(width),
            "height": int(height),
            "area_px": int(area),
            "area_percentage": round(area / img_area * 100, 2) if img_area else 0.0,
            "confidence": round(float(box.conf[0]), 4),
        })
        # Clip to the image so the covered area can never exceed the image area.
        clipped_rects.append(
            (max(x1, 0.0), max(y1, 0.0), min(x2, float(img_w)), min(y2, float(img_h)))
        )

    # Overlapping detections must be counted once: summing individual box
    # areas can push the empty share above 100 % and the fill rate below 0 %.
    empty_area_total = _union_area(clipped_rects)
    pct_empty = (empty_area_total / img_area) * 100 if img_area else 0.0
    pct_merchandise = 100 - pct_empty

    status, status_label = _fill_status(pct_empty)

    # Largest zone first; ids and ranks mirror that ordering.
    detections_sorted = sorted(detections, key=lambda d: d["area_px"], reverse=True)
    for position, zone in enumerate(detections_sorted, start=1):
        zone["id"] = position
        zone["rank"] = position

    largest = detections_sorted[0] if detections_sorted else None
    smallest = detections_sorted[-1] if detections_sorted else None

    avg_conf = (
        round(sum(d["confidence"] for d in detections_sorted) / len(detections_sorted), 4)
        if detections_sorted else 0
    )

    return {
        "image_size": {"width": img_w, "height": img_h},
        "total_area_px": img_area,
        "summary": {
            "empty_area_px": int(empty_area_total),
            "empty_percentage": round(pct_empty, 2),
            "merchandise_percentage": round(pct_merchandise, 2),
            "fill_rate": round(pct_merchandise, 2),
            "num_detections": len(detections_sorted),
            "average_confidence": avg_conf,
            "largest_empty_zone": _zone_summary(largest),
            "smallest_empty_zone": _zone_summary(smallest),
        },
        "status": status,
        "status_label": status_label,
        "detections": detections_sorted,
    }
|
|
|
|
def annotate_image(image: np.ndarray, analysis: dict) -> np.ndarray:
    """Draw the detected empty zones and a summary banner on a copy of the image.

    Args:
        image: Source image; it is copied, never modified.
        analysis: Dict providing ``detections`` (each with ``bbox``,
            ``confidence`` and optionally ``area_percentage``) and
            ``summary`` (``merchandise_percentage``, ``empty_percentage``,
            ``num_detections``).

    Returns:
        The annotated copy of ``image``.
    """
    banner_h = 80
    zone_color = (0, 0, 255)
    text_color = (255, 255, 255)
    font = cv2.FONT_HERSHEY_SIMPLEX

    annotated = image.copy()
    img_w = image.shape[1]

    for zone in analysis["detections"]:
        bbox = zone["bbox"]
        x1, y1, x2, y2 = bbox["x1"], bbox["y1"], bbox["x2"], bbox["y2"]
        conf = zone["confidence"]
        zone_pct = zone.get("area_percentage", 0)

        # Semi-transparent fill: paint the zone on a copy, then blend it back.
        overlay = annotated.copy()
        cv2.rectangle(overlay, (x1, y1), (x2, y2), zone_color, -1)
        cv2.addWeighted(overlay, 0.3, annotated, 0.7, 0, annotated)

        # Solid outline on top of the blended fill.
        cv2.rectangle(annotated, (x1, y1), (x2, y2), zone_color, 2)

        label = f"Vide {conf:.0%} ({zone_pct:.1f}%)"
        (tw, th), _ = cv2.getTextSize(label, font, 0.6, 2)
        # Normally the label sits just above the box. For boxes near the top
        # edge that position is off-canvas or hidden by the banner drawn
        # below, so push the label down until it clears the banner.
        label_bottom = max(y1, banner_h + th + 10)
        cv2.rectangle(annotated, (x1, label_bottom - th - 10),
                      (x1 + tw + 5, label_bottom), zone_color, -1)
        cv2.putText(annotated, label, (x1 + 2, label_bottom - 5),
                    font, 0.6, text_color, 2)

    summary = analysis["summary"]
    pct_m = summary["merchandise_percentage"]
    pct_e = summary["empty_percentage"]
    n = summary["num_detections"]

    # Dark banner across the top carrying the headline figures.
    cv2.rectangle(annotated, (0, 0), (img_w, banner_h), (40, 40, 40), -1)
    score_text = f"Remplissage: {pct_m:.1f}% | Vide: {pct_e:.1f}% | {n} zone(s)"
    cv2.putText(annotated, score_text, (15, 35), font, 0.9, text_color, 2)

    # Progress bar: green for the filled share, red for the remainder.
    # The width and the fill ratio are clamped so a very narrow image or an
    # out-of-range percentage cannot make the bar draw outside its track.
    bar_x, bar_y, bar_bh = 15, 50, 20
    bar_w = max(img_w - 30, 0)
    cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_bh),
                  (100, 100, 100), -1)
    fill_ratio = min(max(pct_m, 0), 100) / 100
    fill_w = int(bar_w * fill_ratio)
    cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + fill_w, bar_y + bar_bh),
                  (0, 200, 0), -1)
    if fill_w < bar_w:
        cv2.rectangle(annotated, (bar_x + fill_w, bar_y),
                      (bar_x + bar_w, bar_y + bar_bh), (0, 0, 200), -1)

    return annotated
|
|
|
|
| |
| |
| |
@app.get("/health")
async def health():
    """Report that the API is up, together with the model file name."""
    payload = {"status": "ok"}
    payload["model"] = str(MODEL_PATH.name)
    return payload
|
|
|
|
class _InvalidUpload(Exception):
    """The uploaded file could not be read or decoded as an image."""


async def _analyze_upload(file: UploadFile, confidence: float) -> tuple:
    """Decode the upload, run the model on it and analyse the detections.

    Returns:
        ``(image, analysis)``. ``analysis["inference_time_ms"]`` is the time
        elapsed from the start of this call to the end of the analysis, so it
        includes reading and decoding the upload.

    Raises:
        _InvalidUpload: If the file cannot be read or decoded.
    """
    started = time.time()
    try:
        image = read_image(await file.read())
    except Exception as exc:  # request boundary: any failure here is a bad upload
        raise _InvalidUpload(str(exc)) from exc

    results = model.predict(
        source=image,
        conf=confidence,
        imgsz=640,
        device="cpu",
        verbose=False,
    )
    analysis = analyze_detections(image, results)
    analysis["inference_time_ms"] = round((time.time() - started) * 1000, 1)
    return image, analysis


def _annotated_jpeg(image: np.ndarray, analysis: dict, quality: int) -> bytes:
    """Render the annotated image and return it as JPEG bytes."""
    annotated = annotate_image(image, analysis)
    ok, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, quality])
    # Fail loudly instead of sending a payload whose encoding did not succeed.
    if not ok:
        raise RuntimeError("Échec de l'encodage JPEG de l'image annotée")
    return buffer.tobytes()


def _invalid_image_response(exc: Exception) -> JSONResponse:
    """Build the 400 response returned for an unreadable upload."""
    return JSONResponse(status_code=400, content={"error": f"Image invalide: {exc}"})


async def _json_with_annotated_image(file: UploadFile, confidence: float):
    """Return the analysis dict with the annotated JPEG embedded as base64."""
    try:
        image, analysis = await _analyze_upload(file, confidence)
    except _InvalidUpload as exc:
        return _invalid_image_response(exc)

    jpeg = _annotated_jpeg(image, analysis, quality=85)
    analysis["annotated_image_base64"] = base64.b64encode(jpeg).decode("utf-8")
    return analysis


@app.post("/detect")
async def detect(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """
    Detect empty spaces in a shelf image.

    Returns a JSON document with:
    - the percentage of empty space / merchandise
    - the number and coordinates of the empty zones
    - the status (excellent / correct / attention / critique)
    - the processing time and the annotated image encoded in base64

    Responds with HTTP 400 when the upload cannot be decoded as an image.
    """
    return await _json_with_annotated_image(file, confidence)


@app.post("/detect/annotated")
async def detect_annotated(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """
    Detect empty spaces and return the annotated image (JPEG).

    The empty percentage and the status are exposed through the
    ``X-Empty-Percentage`` and ``X-Status`` response headers.
    Responds with HTTP 400 when the upload cannot be decoded as an image.
    """
    try:
        image, analysis = await _analyze_upload(file, confidence)
    except _InvalidUpload as exc:
        return _invalid_image_response(exc)

    jpeg = _annotated_jpeg(image, analysis, quality=90)
    return StreamingResponse(
        io.BytesIO(jpeg),
        media_type="image/jpeg",
        headers={"X-Empty-Percentage": str(analysis["summary"]["empty_percentage"]),
                 "X-Status": analysis["status"]},
    )


@app.post("/detect/full")
async def detect_full(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """
    Return the complete JSON analysis plus the annotated image in base64.

    Intended for clients that want the data and the image in a single call.
    Responds with HTTP 400 when the upload cannot be decoded as an image.
    """
    return await _json_with_annotated_image(file, confidence)
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Running the file directly starts a uvicorn server for the app.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)
|
|