Spaces:
Build error
Build error
| # Copyright (C) 2022-2025, Pyronear. | |
| # This program is licensed under the Apache License 2.0. | |
| # See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details. | |
| import logging | |
| import os | |
| import platform | |
| import tarfile | |
| from typing import Sequence, Tuple | |
| from urllib.request import urlretrieve | |
| import numpy as np | |
| from PIL import Image | |
| try: | |
| import ncnn | |
| except ImportError: | |
| ncnn = None | |
| try: | |
| import onnxruntime | |
| except ImportError: | |
| onnxruntime = None | |
| try: | |
| from .utils import DownloadProgressBar, box_iou, letterbox, nms, xywh2xyxy | |
| except ImportError: | |
| from utils import DownloadProgressBar, box_iou, letterbox, nms, xywh2xyxy | |
# Names exported by `from <module> import *`.
__all__ = ["Classifier"]

# Remote folder hosting the default model release, and the archive fetched from it
# when no explicit model path is supplied.
MODEL_URL_FOLDER = "https://huggingface.co/pyronear/yolo11s_mighty-mongoose_v5.1.0/resolve/main/"
MODEL_NAME = "ncnn_cpu_yolo11s_mighty-mongoose_v5.1.0.tar.gz"

# Configure the root logger at import time; force=True replaces handlers that
# were already installed on the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s: %(message)s",
    force=True,
)
| def _env_int(name: str, default: int) -> int: | |
| try: | |
| return int(os.getenv(name, str(default))) | |
| except Exception: | |
| return int(default) | |
class Classifier:
    """Run a YOLO model on images through an NCNN or ONNX Runtime backend.

    When no ``model_path`` is given, the default model archive is downloaded from
    ``MODEL_URL_FOLDER`` into ``model_folder`` and extracted there on first use.

    The ONNX backend can be tuned through environment variables:
    ``ORT_INTRA_OP_NUM_THREADS`` (default: CPU count), ``ORT_INTER_OP_NUM_THREADS``
    (default: 1) and ``ORT_PROVIDERS`` (comma-separated, default: ``CPUExecutionProvider``).

    Examples:
        >>> from pyroengine.vision import Classifier
        >>> model = Classifier()

    Args:
        model_folder: Directory where the default model archive is stored and extracted.
        imgsz: Target size handed to ``letterbox`` during preprocessing.
        conf: Confidence threshold; predictions whose score is not strictly above it are dropped.
        iou: Stored as ``self.iou``; not referenced by the methods of this class.
        format: Backend for the default model, ``"ncnn"`` or ``"onnx"``. Ignored when ``model_path`` is set.
        model_path: Optional path to a local ``.onnx`` file to load instead of the default model.
        max_bbox_size: Detections whose normalized width reaches this value are discarded.

    Raises:
        ValueError: If ``model_path`` does not exist or is not an ``.onnx`` file, or ``format`` is unsupported.
        ImportError: If the backend library required by ``format`` is not installed.
        RuntimeError: If the selected backend is unavailable at load time or the ONNX model fails to load.
    """

    def __init__(
        self,
        model_folder="data",
        imgsz=1024,
        conf=0.15,
        iou=0,
        format="ncnn",
        model_path=None,
        max_bbox_size=0.4,
    ) -> None:
        if model_path:
            # A user-supplied model is always loaded through ONNX Runtime.
            if not os.path.isfile(model_path):
                raise ValueError(f"Model file not found: {model_path}")
            if os.path.splitext(model_path)[-1].lower() != ".onnx":
                raise ValueError(f"Input model_path should point to an ONNX export but currently is {model_path}")
            self.format = "onnx"
        else:
            if format == "ncnn":
                if ncnn is None:
                    raise ImportError("ncnn is required for format='ncnn'. Install ncnn or use format='onnx'.")
                if not self.is_arm_architecture():
                    logging.info("NCNN format is optimized for arm architecture only, switching to onnx is recommended")
                model = MODEL_NAME
                self.format = "ncnn"
            elif format == "onnx":
                if onnxruntime is None:
                    raise ImportError("onnxruntime is required for format='onnx'. Install onnxruntime.")
                model = MODEL_NAME.replace("ncnn", "onnx")
                self.format = "onnx"
            else:
                raise ValueError("Unsupported format: should be 'ncnn' or 'onnx'")

            model_path = self._fetch_default_model(model_folder, model)

        if self.format == "ncnn":
            if ncnn is None:
                raise RuntimeError("ncnn is not available; cannot load NCNN model.")
            self.model = ncnn.Net()
            self.model.load_param(os.path.join(model_path, "best_ncnn_model", "model.ncnn.param"))
            self.model.load_model(os.path.join(model_path, "best_ncnn_model", "model.ncnn.bin"))
        else:
            self.ort_session = self._create_ort_session(model_path)
            logging.info(f"ONNX model loaded successfully from {model_path}")

        self.imgsz = imgsz
        self.conf = conf
        self.iou = iou
        self.max_bbox_size = max_bbox_size

    @staticmethod
    def _fetch_default_model(model_folder: str, model: str) -> str:
        """Download (if missing) and extract the default model, returning the path to load from."""
        model_path = os.path.join(model_folder, model)
        model_url = MODEL_URL_FOLDER + model

        if not os.path.isfile(model_path):
            logging.info(f"Downloading model from {model_url} ...")
            os.makedirs(model_folder, exist_ok=True)
            # Download to a temporary name and move it into place afterwards, so an
            # interrupted transfer never leaves a truncated file that a later run
            # would mistake for a complete archive.
            tmp_path = model_path + ".part"
            try:
                with DownloadProgressBar(unit="B", unit_scale=True, miniters=1, desc=model_path) as t:
                    urlretrieve(model_url, tmp_path, reporthook=t.update_to)
                os.replace(tmp_path, model_path)
            finally:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            logging.info("Model downloaded!")

        # Extract .tar.gz archive
        if model_path.endswith(".tar.gz"):
            base_name = os.path.basename(model_path).replace(".tar.gz", "")
            extract_path = os.path.join(model_folder, base_name)
            if not os.path.isdir(extract_path):
                with tarfile.open(model_path, "r:gz") as tar:
                    # The archive comes from the network: use the "data" extraction
                    # filter when this Python version provides it, so members cannot
                    # escape the destination folder.
                    if hasattr(tarfile, "data_filter"):
                        tar.extractall(model_folder, filter="data")
                    else:
                        tar.extractall(model_folder)
                logging.info(f"Extracted model to: {extract_path}")
            model_path = extract_path

        return model_path

    @staticmethod
    def _create_ort_session(model_path: str):
        """Build the ONNX Runtime session for ``model_path`` (a ``.onnx`` file or a folder holding ``best.onnx``)."""
        if onnxruntime is None:
            raise RuntimeError("onnxruntime is not available; cannot load ONNX model.")
        try:
            onnx_file = model_path if model_path.endswith(".onnx") else os.path.join(model_path, "best.onnx")

            sess_options = onnxruntime.SessionOptions()
            sess_options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
            sess_options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL

            # Thread counts are overridable from the environment and never drop below 1.
            default_intra_threads = max(1, int(os.cpu_count() or 1))
            intra_threads = max(1, _env_int("ORT_INTRA_OP_NUM_THREADS", default_intra_threads))
            inter_threads = max(1, _env_int("ORT_INTER_OP_NUM_THREADS", 1))
            sess_options.intra_op_num_threads = intra_threads
            sess_options.inter_op_num_threads = inter_threads

            # Keep only the requested providers this onnxruntime build offers;
            # fall back to CPU when none of them is available.
            providers_env = os.getenv("ORT_PROVIDERS", "CPUExecutionProvider")
            requested_providers = [p.strip() for p in providers_env.split(",") if p.strip()]
            available_providers = set(onnxruntime.get_available_providers())
            providers = [p for p in requested_providers if p in available_providers]
            if not providers:
                providers = ["CPUExecutionProvider"]

            session = onnxruntime.InferenceSession(
                onnx_file,
                sess_options=sess_options,
                providers=providers,
            )
            logging.info(
                "ONNX Runtime config | providers=%s intra_op_threads=%d inter_op_threads=%d",
                providers,
                intra_threads,
                inter_threads,
            )
        except Exception as e:
            raise RuntimeError(f"Failed to load the ONNX model from {model_path}: {e!s}") from e
        return session

    def is_arm_architecture(self):
        """Return True when ``platform.machine()`` starts with ``arm`` or ``aarch``."""
        return platform.machine().startswith(("arm", "aarch"))

    def prep_process(self, pil_img: Image.Image) -> Tuple[np.ndarray, Tuple[int, int]]:
        """Preprocess an image for inference.

        Args:
            pil_img: A valid PIL image.

        Returns:
            A tuple containing:
            - The model input: for the ONNX backend a float32 array in BCHW layout scaled to [0, 1];
              for the NCNN backend an ``ncnn.Mat`` scaled by 1/255.
            - The padding returned by ``letterbox``, which ``post_process`` reads as (left_pad, top_pad).
        """
        np_img, pad = letterbox(np.array(pil_img), self.imgsz)  # Applies letterbox resize with padding

        if self.format == "ncnn":
            np_img = ncnn.Mat.from_pixels(np_img, ncnn.Mat.PixelType.PIXEL_BGR, np_img.shape[1], np_img.shape[0])
            mean = [0, 0, 0]
            std = [1 / 255, 1 / 255, 1 / 255]
            # "substract" (sic) is the method name exposed by ncnn.
            np_img.substract_mean_normalize(mean=mean, norm=std)
        else:
            np_img = np.expand_dims(np_img.astype("float32"), axis=0)  # Add batch dimension
            np_img = np.ascontiguousarray(np_img.transpose((0, 3, 1, 2)))  # Convert from BHWC to BCHW format
            np_img /= 255.0  # Normalize to [0, 1]

        return np_img, pad

    def post_process(self, pred: np.ndarray, pad: Tuple[int, int]) -> np.ndarray:
        """Post-process model predictions.

        Args:
            pred: Raw predictions from the model, one column per candidate with the confidence in the last row.
            pad: Padding information as (left_pad, top_pad).

        Returns:
            An array with one row of 5 values per kept detection, highest confidence first: the four box
            coordinates produced by ``xywh2xyxy`` followed by the confidence, all clipped to [0, 1].
            Shape (0, 5) when nothing is kept.
        """
        pred = pred[:, pred[-1, :] > self.conf]  # Drop low-confidence predictions
        pred = np.transpose(pred)
        pred = xywh2xyxy(pred)
        pred = pred[pred[:, 4].argsort()]  # Sort by confidence
        pred = nms(pred)
        pred = pred[::-1]  # Reverse for highest confidence first

        if len(pred) > 0:
            left_pad, top_pad = pad
            # Remove the letterbox offset, then rescale by the unpadded extent on each axis.
            pred[:, :4:2] -= left_pad
            pred[:, 1:4:2] -= top_pad
            pred[:, :4:2] /= self.imgsz - 2 * left_pad
            pred[:, 1:4:2] /= self.imgsz - 2 * top_pad
            pred = np.clip(pred, 0, 1)
        else:
            pred = np.zeros((0, 5))  # Return empty prediction array

        return pred

    def _finalize_prediction(self, pred: np.ndarray, pad: Tuple[int, int], occlusion_bboxes: dict) -> np.ndarray:
        """Post-process raw output, then drop oversized boxes and boxes overlapping an occlusion box.

        Args:
            pred: Raw model output for a single image.
            pad: Padding returned by ``prep_process``; a list is converted to a tuple.
            occlusion_bboxes: Mapping whose values start with four box coordinates; may be empty or None.

        Returns:
            Array of shape (N, 5) holding the remaining detections.
        """
        if isinstance(pad, list):
            pad = tuple(pad)

        pred = self.post_process(pred, pad)

        # drop big detections
        pred = np.clip(pred, 0, 1)
        pred = pred[(pred[:, 2] - pred[:, 0]) < self.max_bbox_size, :]
        pred = np.reshape(pred, (-1, 5))

        logging.debug("Model original pred : %s", pred)

        # Remove prediction in bbox occlusion mask
        if occlusion_bboxes:
            all_boxes = np.array([b[:4] for b in occlusion_bboxes.values()], dtype=pred.dtype)
            pred_boxes = pred[:, :4].astype(pred.dtype)
            ious = box_iou(pred_boxes, all_boxes)
            # NOTE(review): assumes box_iou returns one row per occlusion box and one column per
            # prediction, so the axis-0 maximum is per prediction - confirm against utils.box_iou.
            max_ious = ious.max(axis=0)
            keep = max_ious <= 0.1
            pred = pred[keep]

        return pred

    def infer_batch(self, pil_imgs: Sequence[Image.Image], occlusion_bboxes: dict = None, batch_size: int = 8) -> list:
        """Run the classifier on several images, batching inference on the ONNX backend.

        Args:
            pil_imgs: Input PIL images.
            occlusion_bboxes: Optional mapping of occlusion boxes applied to every image.
            batch_size: Maximum number of images per ONNX inference call (values below 1 are treated as 1).

        Returns:
            A list with one prediction array per input image, in input order.
        """
        if not pil_imgs:
            return []
        if occlusion_bboxes is None:
            occlusion_bboxes = {}

        # NCNN path stays single-image.
        if self.format != "onnx":
            return [self(pil_img, occlusion_bboxes=occlusion_bboxes) for pil_img in pil_imgs]

        batch_size = max(1, int(batch_size))
        outputs = []
        for start in range(0, len(pil_imgs), batch_size):
            chunk = pil_imgs[start : start + batch_size]
            batch_imgs = []
            pads = []
            for pil_img in chunk:
                np_img, pad = self.prep_process(pil_img)
                batch_imgs.append(np_img)
                pads.append(pad)

            np_batch = np.concatenate(batch_imgs, axis=0)
            raw = self.ort_session.run(["output0"], {"images": np_batch})[0]

            if raw.ndim >= 3 and raw.shape[0] == len(chunk):
                # Leading axis matches the batch: one slice per image.
                raw_preds = list(raw)
            elif len(chunk) == 1 and raw.ndim >= 3:
                raw_preds = [raw[0]]
            elif len(chunk) == 1:
                raw_preds = [raw]
            else:
                # Fallback for unexpected output shapes.
                raw_preds = [self.ort_session.run(["output0"], {"images": arr})[0][0] for arr in batch_imgs]

            for raw_pred, pad in zip(raw_preds, pads):
                outputs.append(self._finalize_prediction(raw_pred, pad, occlusion_bboxes))

        return outputs

    def __call__(self, pil_img: Image.Image, occlusion_bboxes: dict = None) -> np.ndarray:
        """Run the classifier on an input image.

        Args:
            pil_img: The input PIL image.
            occlusion_bboxes: Optional mapping of occlusion boxes; detections overlapping one are removed.

        Returns:
            Processed predictions.
        """
        # A None default avoids sharing one mutable dict between calls.
        if occlusion_bboxes is None:
            occlusion_bboxes = {}

        np_img, pad = self.prep_process(pil_img)

        if self.format == "ncnn":
            extractor = self.model.create_extractor()
            extractor.set_light_mode(True)
            extractor.input("in0", np_img)
            pred = ncnn.Mat()
            extractor.extract("out0", pred)
            pred = np.asarray(pred)
        else:
            pred = self.ort_session.run(["output0"], {"images": np_img})[0][0]

        return self._finalize_prediction(pred, pad, occlusion_bboxes)