Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import cv2 | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
| from ultralytics import YOLO | |
| import insightface | |
| from PIL import Image, ImageOps | |
| import pillow_heif | |
| import io | |
# Register the HEIF opener with Pillow so HEIC/HEIF data can be opened
pillow_heif.register_heif_opener()

app = FastAPI()

# ----------------------------
# Load Models (CPU mode)
# ----------------------------
# Both models are created once, at import time, and reused by the functions
# below instead of being rebuilt per call.
_YOLO_WEIGHTS = "yolov8n.pt"
_FACE_MODEL_PACK = "buffalo_l"
_CPU_CTX_ID = -1  # ctx_id handed to prepare() for CPU mode

yolo = YOLO(_YOLO_WEIGHTS)
face_model = insightface.app.FaceAnalysis(name=_FACE_MODEL_PACK)
face_model.prepare(ctx_id=_CPU_CTX_ID)
# ----------------------------
# Utility: Normalize embedding
# ----------------------------
def normalize(vec):
    """Scale *vec* to unit Euclidean length and return it as a plain list.

    The input is first converted to a float32 array. A vector whose norm is
    exactly zero is returned unscaled, which avoids a division by zero.
    """
    arr = np.array(vec, dtype=np.float32)
    length = np.linalg.norm(arr)
    scaled = arr if length == 0 else arr / length
    return scaled.tolist()
# ----------------------------
# Decode Image (JPEG/PNG/WEBP/HEIC/HEIF)
# ----------------------------
def decode_image(body: bytes):
    """Decode encoded image bytes into a BGR NumPy array.

    OpenCV is tried first. When it cannot decode the data, Pillow is used as
    a fallback; that is the path intended for HEIC/HEIF input.

    Args:
        body: The encoded image bytes.

    Returns:
        The decoded image as a BGR array, or ``None`` when ``body`` is empty
        or neither decoder can read it.
    """
    # Empty input can never decode; return early rather than handing OpenCV
    # an empty buffer, which it rejects with an error instead of returning None.
    if not body:
        return None

    # Fast path: OpenCV
    decoded = cv2.imdecode(np.frombuffer(body, np.uint8), cv2.IMREAD_COLOR)
    if decoded is not None:
        return decoded

    # Fallback: Pillow
    try:
        # The context manager releases the opened image once the RGB copy
        # has been made.
        with Image.open(io.BytesIO(body)) as pil_image:
            # Auto-rotate based on EXIF, then force three-channel RGB
            rgb = ImageOps.exif_transpose(pil_image).convert("RGB")
        return cv2.cvtColor(np.array(rgb), cv2.COLOR_RGB2BGR)
    except Exception:
        # Deliberate best-effort: any failure here is reported to the caller
        # as "could not decode" rather than raised.
        return None
# ----------------------------
# Face Processing
# ----------------------------
def process_image_np(image_np, min_confidence=0.4):
    """Detect people with YOLO, then find faces inside each person crop.

    Args:
        image_np: Image array indexed as ``[row, column, ...]``. Presumably
            BGR, as produced by ``decode_image`` -- confirm against callers.
        min_confidence: Person detections scoring below this are skipped.
            Defaults to 0.4, the threshold that used to be hard-coded.

    Returns:
        A list with one dict per detected face:
        ``cx``/``cy`` (box center) and ``box`` (xmin/ymin/xmax/ymax) are in
        full-image coordinates, ``confidence`` is the score of the *person*
        detection the face was found in, and ``embedding`` is the face
        embedding after ``normalize``.
    """
    # The image size does not change between detections, so read it once.
    # Slicing the shape avoids assuming a specific number of dimensions.
    img_h, img_w = image_np.shape[:2]
    faces_output = []

    for result in yolo(image_np):
        boxes = result.boxes
        for box, cls, conf in zip(boxes.xyxy, boxes.cls, boxes.conf):
            # YOLO class 0 = person; also drop low-confidence detections
            if int(cls) != 0 or float(conf) < min_confidence:
                continue

            # Clamp the person box to the image bounds before cropping
            xmin, ymin, xmax, ymax = map(int, box.cpu().numpy())
            xmin = max(0, xmin)
            ymin = max(0, ymin)
            xmax = min(img_w, xmax)
            ymax = min(img_h, ymax)

            person_crop = image_np[ymin:ymax, xmin:xmax]
            if person_crop.size == 0:
                continue

            for face in face_model.get(person_crop):
                fxmin, fymin, fxmax, fymax = face.bbox.astype(int)
                # Face coordinates are relative to the crop; adding the
                # crop origin maps them back onto the full image.
                faces_output.append({
                    "cx": float((fxmin + fxmax) / 2 + xmin),
                    "cy": float((fymin + fymax) / 2 + ymin),
                    "confidence": float(conf),
                    "box": {
                        "xmin": int(fxmin + xmin),
                        "ymin": int(fymin + ymin),
                        "xmax": int(fxmax + xmin),
                        "ymax": int(fymax + ymin),
                    },
                    "embedding": normalize(face.embedding),
                })

    return faces_output
# ----------------------------
# API Endpoint
# ----------------------------
# NOTE(review): no route decorator is visible above this handler -- confirm
# that it is registered with `app`.
async def detect(request: Request):
    """Run face detection on the image sent as the raw request body.

    Returns the list produced by ``process_image_np``, or a 400 JSON error
    when the body is empty or cannot be decoded as an image.
    """
    payload = await request.body()
    if not payload:
        return JSONResponse({"error": "Empty request body"}, status_code=400)

    decoded = decode_image(payload)
    if decoded is None:
        return JSONResponse(
            {"error": "Unsupported or invalid image format"},
            status_code=400,
        )

    return process_image_np(decoded)
| from fastapi import Response | |
# ----------------------------
# Convert Image To JPEG (For Browser Display)
# ----------------------------
# NOTE(review): no route decorator is visible above this handler -- confirm
# that it is registered with `app`.
async def convert(request: Request):
    """Re-encode the image sent as the raw request body as a JPEG.

    Returns an ``image/jpeg`` response encoded at quality 90, or a 400 JSON
    error when the body is empty or cannot be decoded as an image.
    """
    payload = await request.body()
    if not payload:
        return JSONResponse({"error": "Empty request body"}, status_code=400)

    bgr = decode_image(payload)
    if bgr is None:
        return JSONResponse(
            {"error": "Unsupported or invalid image format"},
            status_code=400,
        )

    # Convert BGR -> RGB before building the Pillow image
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    jpeg_bytes = io.BytesIO()
    Image.fromarray(rgb).save(jpeg_bytes, format="JPEG", quality=90)

    return Response(content=jpeg_bytes.getvalue(), media_type="image/jpeg")