import base64 import io import numpy as np from PIL import Image from ultralytics import YOLO import torch import os import time class EndpointHandler: def __init__(self, path=""): model_path = "yolo11n-seg.pt" # Select device if torch.backends.mps.is_available(): self.device = "mps" elif torch.cuda.is_available(): self.device = "cuda" else: self.device = "cpu" # Load YOLO model self.model = YOLO(model_path) print(f"✅ Model loaded on {self.device}") def __call__(self, data): """ Expected input: { "inputs": } """ if "inputs" not in data: return {"error": "No input image provided"} # Decode image img_b64 = data["inputs"] img_bytes = base64.b64decode(img_b64) image = Image.open(io.BytesIO(img_bytes)).convert("RGB") image = np.array(image) t0 = time.perf_counter() # Run inference results = self.model(image, device=self.device, verbose=False)[0] elapsed = time.perf_counter() - t0 detections = [] if results.masks is not None: for box, mask, conf, cls, poly in zip( results.boxes.xyxy, results.masks.data, results.boxes.conf, results.boxes.cls, results.masks.xy ): x1, y1, x2, y2 = map(int, box.tolist()) poly_np = poly.tolist() # polygon points detections.append({ "box": [x1, y1, x2, y2], "confidence": float(conf), "class_id": int(cls), "polygon": poly_np # Optional: "mask": mask.cpu().numpy().astype(np.uint8).tolist() }) else: for box, conf, cls in zip(results.boxes.xyxy, results.boxes.conf, results.boxes.cls): x1, y1, x2, y2 = map(int, box.tolist()) detections.append({ "box": [x1, y1, x2, y2], "confidence": float(conf), "class_id": int(cls), "polygon": None }) return {"detections": detections,"model_latency":elapsed}