# test / app.py
# mbouhaja's picture
# Add annotated image base64 in /detect response
# 6fdd52c
"""
Shelf empty-space detection API.
Endpoints:
    POST /detect — send an image, receive the detections, the fill score and the annotated image (base64)
    POST /detect/annotated — send an image, receive the annotated image (JPEG)
    POST /detect/full — send an image, receive the full JSON plus the annotated image (base64)
    GET /health — check that the API is online
Free deployment: Hugging Face Spaces (Docker)
"""
import io
import base64
import time
from pathlib import Path
import cv2
import numpy as np
from fastapi import FastAPI, File, UploadFile, Query
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from ultralytics import YOLO
# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
# Model weights are looked up next to this file, under model/best.pt.
MODEL_PATH = Path(__file__).parent.joinpath("model", "best.pt")
# Confidence threshold used as the default value of the endpoints' `confidence` query parameter.
CONFIDENCE_DEFAULT = 0.25
# ──────────────────────────────────────────────
# Initialisation FastAPI
# ──────────────────────────────────────────────
# Application object. The title, description and version are handed to FastAPI
# as API metadata; the title's emoji is written as a real Unicode character
# (it was previously stored as mis-decoded bytes).
app = FastAPI(
    title="🔍 Shelf Empty Space Detector",
    description="API de détection d'espaces vides sur étagères de magasin",
    version="1.0.0",
)
# CORS: accept calls from any origin (mobile, web, etc.).
# - Credentials stay disabled: a wildcard origin must not be combined with
#   allow_credentials=True (see the FastAPI CORS documentation), and no endpoint
#   in this file reads cookies or authorization headers.
# - The custom headers set by /detect/annotated are exposed so that browser
#   clients are allowed to read them on cross-origin responses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["X-Empty-Percentage", "X-Status"],
)
# ──────────────────────────────────────────────
# Chargement du modèle au démarrage
# ──────────────────────────────────────────────
# Load the YOLO weights once, at import time; the endpoints below all call
# `model.predict` on this single module-level instance.
# The log messages use real Unicode characters (they were previously stored as
# mis-decoded bytes and printed as gibberish).
print(f"📦 Chargement du modèle : {MODEL_PATH}")
model = YOLO(str(MODEL_PATH))
print("✅ Modèle chargé avec succès")
# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────
def read_image(file_bytes: bytes) -> np.ndarray:
    """Decode the raw bytes of an uploaded file into an OpenCV image.

    Args:
        file_bytes: Encoded image content exactly as read from the upload.

    Returns:
        The array produced by ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``.

    Raises:
        ValueError: If ``file_bytes`` is empty or cannot be decoded as an image.
    """
    # Reject an empty upload up front: handing OpenCV an empty buffer does not
    # take the "returned None" path below, so callers would not get the
    # documented ValueError.
    if not file_bytes:
        raise ValueError("Fichier image vide")

    raw = np.frombuffer(file_bytes, np.uint8)
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Impossible de décoder l'image")
    return decoded
def _classify_empty_share(pct_empty):
    """Map the empty-space percentage to a ``(status, status_label)`` pair."""
    if pct_empty < 5:
        return "excellent", "Étagère bien remplie"
    if pct_empty < 15:
        return "correct", "Quelques espaces à réapprovisionner"
    if pct_empty < 30:
        return "attention", "Réapprovisionnement nécessaire"
    return "critique", "Étagère largement vide, action urgente"


def _zone_summary(zone):
    """Return the compact summary of one detection, or None when there is none."""
    if zone is None:
        return None
    return {key: zone[key] for key in ("id", "area_px", "area_percentage", "confidence")}


def analyze_detections(image: np.ndarray, results) -> dict:
    """Turn raw detections into per-zone records and shelf-level statistics.

    Args:
        image: The analysed image; only its height and width (``shape[:2]``)
            are used.
        results: Sequence whose first element exposes ``.boxes``. Each box must
            provide ``xyxy[0]`` (with ``.cpu().numpy()`` yielding x1, y1, x2, y2)
            and ``conf[0]``. The endpoints in this file pass the value returned
            by ``model.predict``. Coordinates are assumed to be in pixels of
            ``image`` (TODO confirm against the model output).

    Returns:
        A dict with:
          - ``image_size`` / ``total_area_px``: image dimensions and area;
          - ``summary``: empty/merchandise percentages, detection count, mean
            confidence, and the largest and smallest zones (``None`` when there
            is no detection);
          - ``status`` / ``status_label``: one of excellent / correct /
            attention / critique with its French label;
          - ``detections``: zones sorted by decreasing area, where ``id`` and
            ``rank`` are both the 1-based position in that order.
    """
    img_h, img_w = image.shape[:2]
    img_area = img_h * img_w

    def share_of_image(area):
        # Guard against a zero-area image instead of dividing by zero.
        return (area / img_area) * 100 if img_area else 0.0

    zones = []
    empty_area_total = 0.0
    for box in results[0].boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
        w = x2 - x1
        h = y2 - y1
        area = float(w * h)
        empty_area_total += area
        zones.append({
            "bbox": {
                "x1": int(x1), "y1": int(y1),
                "x2": int(x2), "y2": int(y2),
            },
            "width": int(w),
            "height": int(h),
            "area_px": int(area),
            "area_percentage": round(share_of_image(area), 2),
            "confidence": round(float(box.conf[0]), 4),
        })

    # Box areas are summed individually, so overlapping boxes count their shared
    # pixels more than once. Cap the total at the image area so the empty share
    # never exceeds 100 % and the fill rate never goes negative.
    empty_area_total = min(empty_area_total, float(img_area))
    pct_empty = share_of_image(empty_area_total)
    pct_merchandise = 100 - pct_empty
    status, status_label = _classify_empty_share(pct_empty)

    # Largest zone first; id and rank are assigned from the sorted position.
    zones.sort(key=lambda zone: zone["area_px"], reverse=True)
    detections = [
        {"id": position, **zone, "rank": position}
        for position, zone in enumerate(zones, start=1)
    ]

    largest = detections[0] if detections else None
    smallest = detections[-1] if detections else None
    avg_conf = (
        round(sum(d["confidence"] for d in detections) / len(detections), 4)
        if detections else 0
    )

    return {
        "image_size": {"width": img_w, "height": img_h},
        "total_area_px": img_area,
        "summary": {
            "empty_area_px": int(empty_area_total),
            "empty_percentage": round(pct_empty, 2),
            "merchandise_percentage": round(pct_merchandise, 2),
            "fill_rate": round(pct_merchandise, 2),
            "num_detections": len(detections),
            "average_confidence": avg_conf,
            "largest_empty_zone": _zone_summary(largest),
            "smallest_empty_zone": _zone_summary(smallest),
        },
        "status": status,
        "status_label": status_label,
        "detections": detections,
    }
def annotate_image(image: np.ndarray, analysis: dict) -> np.ndarray:
    """Draw the detections and a summary banner on a copy of ``image``.

    Args:
        image: Source image; it is copied, never modified.
        analysis: Dict shaped like the return value of ``analyze_detections``.
            Reads ``detections`` (each with ``bbox``, ``confidence`` and
            optionally ``area_percentage``) and ``summary`` (with
            ``merchandise_percentage``, ``empty_percentage`` and
            ``num_detections``).

    Returns:
        A new image with one translucent red box and label per detection, plus
        a top banner holding the score text and a fill-rate progress bar.
    """
    annotated = image.copy()
    img_w = image.shape[1]
    font = cv2.FONT_HERSHEY_SIMPLEX
    red = (0, 0, 255)
    white = (255, 255, 255)

    for zone in analysis["detections"]:
        bbox = zone["bbox"]
        x1, y1, x2, y2 = bbox["x1"], bbox["y1"], bbox["x2"], bbox["y2"]

        # Translucent fill: paint the box solid on a copy, then blend the copy
        # back at 30 % opacity.
        overlay = annotated.copy()
        cv2.rectangle(overlay, (x1, y1), (x2, y2), red, -1)
        cv2.addWeighted(overlay, 0.3, annotated, 0.7, 0, annotated)
        # Solid outline on top of the blend.
        cv2.rectangle(annotated, (x1, y1), (x2, y2), red, 2)

        # Label tag sitting just above the box: confidence and share of the image.
        zone_pct = zone.get("area_percentage", 0)
        label = f"Vide {zone['confidence']:.0%} ({zone_pct:.1f}%)"
        (text_w, text_h), _ = cv2.getTextSize(label, font, 0.6, 2)
        cv2.rectangle(annotated, (x1, y1 - text_h - 10), (x1 + text_w + 5, y1), red, -1)
        cv2.putText(annotated, label, (x1 + 2, y1 - 5), font, 0.6, white, 2)

    # Top banner.
    # NOTE(review): it is drawn last and covers the top 80 px of the image,
    # including any boxes or labels located there.
    summary = analysis["summary"]
    pct_m = summary["merchandise_percentage"]
    pct_e = summary["empty_percentage"]
    n = summary["num_detections"]
    banner_h = 80
    cv2.rectangle(annotated, (0, 0), (img_w, banner_h), (40, 40, 40), -1)
    score_text = f"Remplissage: {pct_m:.1f}% | Vide: {pct_e:.1f}% | {n} zone(s)"
    cv2.putText(annotated, score_text, (15, 35), font, 0.9, white, 2)

    # Progress bar: grey track, green filled part, red remainder.
    bar_x, bar_y, bar_bh = 15, 50, 20
    # Never let the bar width go negative on images narrower than the margins.
    bar_w = max(img_w - 30, 0)
    cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_bh), (100, 100, 100), -1)
    # Clamp the filled width to the track so a percentage outside 0-100 cannot
    # draw to the left of the bar or past its right end.
    fill_w = min(max(int(bar_w * pct_m / 100), 0), bar_w)
    cv2.rectangle(annotated, (bar_x, bar_y), (bar_x + fill_w, bar_y + bar_bh), (0, 200, 0), -1)
    if fill_w < bar_w:
        cv2.rectangle(annotated, (bar_x + fill_w, bar_y),
                      (bar_x + bar_w, bar_y + bar_bh), (0, 0, 200), -1)
    return annotated
# ──────────────────────────────────────────────
# Endpoints
# ──────────────────────────────────────────────
@app.get("/health")
async def health():
    """Liveness check: report that the API is up and name the configured model file."""
    model_file = str(MODEL_PATH.name)
    payload = {"status": "ok", "model": model_file}
    return payload
@app.post("/detect")
async def detect(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """Detect empty spaces in a shelf image.

    Returns a JSON document with:
      - the percentage of empty space / merchandise;
      - the number and coordinates of the empty zones;
      - the status (excellent / correct / attention / critique);
      - ``inference_time_ms``: time elapsed from the start of the request
        handler (upload read and decoding included) to the end of the analysis;
      - ``annotated_image_base64``: the annotated image as a base64 JPEG.

    Responds with HTTP 400 when the upload cannot be read or decoded, and with
    HTTP 500 when the annotated image cannot be encoded.
    """
    # perf_counter is a monotonic clock, so the measured duration cannot be
    # skewed by wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    try:
        file_bytes = await file.read()
        image = read_image(file_bytes)
    except Exception as e:
        # Request boundary: any failure to obtain a usable image is reported
        # to the client as a bad request.
        return JSONResponse(status_code=400, content={"error": f"Image invalide: {e}"})

    # Inference.
    # NOTE(review): model.predict is a synchronous call inside an async handler,
    # so the event loop is busy for its whole duration.
    results = model.predict(
        source=image,
        conf=confidence,
        imgsz=640,
        device="cpu",
        verbose=False,
    )
    analysis = analyze_detections(image, results)
    analysis["inference_time_ms"] = round((time.perf_counter() - start) * 1000, 1)

    # Attach the annotated image as base64-encoded JPEG.
    annotated = annotate_image(image, analysis)
    encoded_ok, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, 85])
    if not encoded_ok:
        # Do not ship an empty or truncated image when the encoder reports failure.
        return JSONResponse(
            status_code=500,
            content={"error": "Échec de l'encodage JPEG de l'image annotée"},
        )
    analysis["annotated_image_base64"] = base64.b64encode(buffer.tobytes()).decode("utf-8")
    return analysis
@app.post("/detect/annotated")
async def detect_annotated(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """Detect empty spaces and return the annotated image as a JPEG.

    Useful for displaying the visual result directly in the mobile app. The
    empty-space percentage and the status are also sent in the
    ``X-Empty-Percentage`` and ``X-Status`` response headers.

    Responds with HTTP 400 when the upload cannot be read or decoded, and with
    HTTP 500 when the annotated image cannot be encoded.
    """
    try:
        file_bytes = await file.read()
        image = read_image(file_bytes)
    except Exception as e:
        # Request boundary: any failure to obtain a usable image is reported
        # to the client as a bad request.
        return JSONResponse(status_code=400, content={"error": f"Image invalide: {e}"})

    # Inference.
    results = model.predict(
        source=image,
        conf=confidence,
        imgsz=640,
        device="cpu",
        verbose=False,
    )
    analysis = analyze_detections(image, results)
    annotated = annotate_image(image, analysis)

    # Encode as JPEG and refuse to stream a body the encoder reported as failed.
    encoded_ok, buffer = cv2.imencode(".jpg", annotated, [cv2.IMWRITE_JPEG_QUALITY, 90])
    if not encoded_ok:
        return JSONResponse(
            status_code=500,
            content={"error": "Échec de l'encodage JPEG de l'image annotée"},
        )
    return StreamingResponse(
        io.BytesIO(buffer.tobytes()),
        media_type="image/jpeg",
        headers={"X-Empty-Percentage": str(analysis["summary"]["empty_percentage"]),
                 "X-Status": analysis["status"]},
    )
@app.post("/detect/full")
async def detect_full(
    file: UploadFile = File(..., description="Image d'étagère (JPG, PNG)"),
    confidence: float = Query(CONFIDENCE_DEFAULT, ge=0.01, le=1.0,
                              description="Seuil de confiance (0.01 - 1.0)"),
):
    """Return the full JSON analysis plus the annotated image encoded in base64.

    Intended for mobile apps that want the data AND the image in a single call.
    The response is the same document that ``/detect`` produces.
    """
    # /detect already returns the analysis together with the base64 annotated
    # image (same keys, same JPEG quality), so delegate to it rather than
    # keeping a second copy of the whole pipeline that could drift apart.
    return await detect(file=file, confidence=confidence)
# ──────────────────────────────────────────────
# Lancement local
# ──────────────────────────────────────────────
if __name__ == "__main__":
    # Local launch: serve the app with uvicorn, listening on every interface at port 7860.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)