# NOTE(review): source scraped from a Hugging Face Space (page status: "Paused");
# table markup from the scrape has been stripped from the code below.
import logging
import os
from pathlib import Path
from typing import List, Optional, Sequence

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from ultralytics import YOLO

from models.detectors.base import DetectionResult, ObjectDetector
from utils.tiling import get_slice_bboxes, slice_image, shift_bboxes, batched_nms
# Download YOLO11m weights to a writable cache (container /app is read-only).
# The YOLO_CACHE env var overrides the default /tmp location.
_WEIGHTS_CACHE = Path(os.environ.get("YOLO_CACHE", "/tmp/yolo_weights"))
# Created eagerly at import time so the download in Yolo11Detector.__init__
# always has a destination directory.
_WEIGHTS_CACHE.mkdir(parents=True, exist_ok=True)
_YOLO11M_PATH = _WEIGHTS_CACHE / "yolo11m.pt"
class Yolo11Detector(ObjectDetector):
    """YOLO11m detector using COCO-pretrained weights from Ultralytics.

    Weights are downloaded once via huggingface_hub into a writable cache
    (the container's /app filesystem is read-only) and loaded onto the
    requested device. Frames wider than ``TILED_WIDTH_THRESHOLD`` are
    processed with tiled (sliced) inference and the per-tile detections
    merged with class-aware NMS.
    """

    supports_batch = True
    max_batch_size = 32

    # Inference heuristics, named instead of inlined so subclasses and tests
    # can tune them without editing method bodies. Values match the previous
    # hard-coded behavior exactly.
    TILE_SIZE = 1280               # square tile edge used for sliced inference
    TILE_OVERLAP = 0.2             # fractional overlap between adjacent tiles
    TILED_WIDTH_THRESHOLD = 3000   # frames wider than this are tiled
    NMS_IOU_THRESHOLD = 0.4        # IoU threshold when merging tile detections
    INFERENCE_IMGSZ = 1280         # imgsz forwarded to Ultralytics predict()

    def __init__(self, score_threshold: float = 0.3, device: Optional[str] = None) -> None:
        """Load YOLO11m, downloading the weights to the cache if needed.

        Args:
            score_threshold: Minimum confidence for a detection to be kept.
            device: Torch device spec (e.g. ``"cuda:1"``); auto-selected
                (first CUDA device, else CPU) when None.
        """
        self.name = "yolo11"
        self.score_threshold = score_threshold
        # CRITICAL: Store device as torch.device, NOT a string.
        # Ultralytics' select_device() sets CUDA_VISIBLE_DEVICES when it
        # receives a string like "cuda:0", restricting the entire process to
        # one GPU. Passing a torch.device object causes select_device() to
        # return immediately without touching the environment.
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        logging.info(
            "Loading YOLO11m COCO-pretrained weights onto %s",
            self.device,
        )
        # Download weights via huggingface_hub to a writable path, then load.
        if not _YOLO11M_PATH.exists():
            logging.info("Downloading yolo11m.pt to %s ...", _YOLO11M_PATH)
            hf_hub_download(
                repo_id="Ultralytics/YOLO11",
                filename="yolo11m.pt",
                local_dir=str(_WEIGHTS_CACHE),
            )
        self.model = YOLO(str(_YOLO11M_PATH))
        self.model.to(self.device)
        # Mapping of class id -> class name as exposed by Ultralytics.
        self.class_names = self.model.names

    @staticmethod
    def _empty_result() -> DetectionResult:
        """Build a DetectionResult holding zero detections."""
        return DetectionResult(np.empty((0, 4), dtype=np.float32), [], [], [])

    def _filter_indices(self, label_names: Sequence[str], queries: Sequence[str]) -> List[int]:
        """Return indices of detections whose class name matches *queries*.

        Matching is case-insensitive against stripped query strings. When
        *queries* is empty -- or when nothing matches -- ALL indices are
        returned, so filtering degrades to a no-op rather than silently
        discarding every detection.
        """
        if not queries:
            return list(range(len(label_names)))
        allowed = {query.lower().strip() for query in queries if query}
        keep = [idx for idx, name in enumerate(label_names) if name.lower() in allowed]
        return keep or list(range(len(label_names)))

    def _parse_single_result(self, result, queries: Sequence[str]) -> DetectionResult:
        """Convert one Ultralytics result into a query-filtered DetectionResult."""
        boxes = result.boxes
        if boxes is None or boxes.xyxy is None:
            return self._empty_result()
        xyxy = boxes.xyxy.cpu().numpy()
        scores = boxes.conf.cpu().numpy().tolist()
        label_ids = boxes.cls.cpu().numpy().astype(int).tolist()
        label_names = [self.class_names.get(idx, f"class_{idx}") for idx in label_ids]
        keep_indices = self._filter_indices(label_names, queries)
        xyxy = xyxy[keep_indices] if len(xyxy) else xyxy
        scores = [scores[i] for i in keep_indices]
        label_ids = [label_ids[i] for i in keep_indices]
        label_names = [label_names[i] for i in keep_indices]
        return DetectionResult(
            boxes=xyxy,
            scores=scores,
            labels=label_ids,
            label_names=label_names,
        )

    def _predict_tiled(self, frame: np.ndarray, queries: Sequence[str]) -> DetectionResult:
        """Run tiled (sliced) inference for high-resolution frames.

        The frame is cut into overlapping TILE_SIZE x TILE_SIZE tiles, each
        batch of tiles is run through the model, per-tile boxes are shifted
        back into frame coordinates, and overlapping detections are merged
        with class-aware NMS before query filtering.
        """
        h, w = frame.shape[:2]
        slice_boxes = get_slice_bboxes(
            h, w, self.TILE_SIZE, self.TILE_SIZE, self.TILE_OVERLAP, self.TILE_OVERLAP
        )
        tiles = slice_image(frame, slice_boxes)
        all_boxes: List[List[float]] = []
        all_scores: List[float] = []
        all_labels: List[float] = []
        batch_size = self.max_batch_size
        for i in range(0, len(tiles), batch_size):
            batch_tiles = tiles[i : i + batch_size]
            batch_slices = slice_boxes[i : i + batch_size]
            results = self.model.predict(
                source=batch_tiles,
                device=self.device,
                conf=self.score_threshold,
                imgsz=self.INFERENCE_IMGSZ,
                verbose=False,
            )
            for res, slice_coord in zip(results, batch_slices):
                if res.boxes is None:
                    continue
                boxes = res.boxes.xyxy.cpu().numpy().tolist()
                scores = res.boxes.conf.cpu().numpy().tolist()
                clss = res.boxes.cls.cpu().numpy().tolist()
                # Translate tile-local coordinates back into frame coordinates.
                all_boxes.extend(shift_bboxes(boxes, slice_coord))
                all_scores.extend(scores)
                all_labels.extend(clss)
        if not all_boxes:
            return self._empty_result()
        boxes_t = torch.tensor(all_boxes, device=self.device)
        scores_t = torch.tensor(all_scores, device=self.device)
        labels_t = torch.tensor(all_labels, device=self.device)
        # Class-aware NMS removes duplicates from overlapping tile regions.
        keep = batched_nms(boxes_t, scores_t, labels_t, iou_threshold=self.NMS_IOU_THRESHOLD)
        final_boxes = boxes_t[keep].cpu().numpy()
        final_scores = scores_t[keep].cpu().tolist()
        final_labels = labels_t[keep].cpu().int().tolist()
        label_names = [self.class_names.get(idx, f"class_{idx}") for idx in final_labels]
        keep_indices = self._filter_indices(label_names, queries)
        # Defensive: _filter_indices currently never returns an empty list
        # (it falls back to all indices), but guard anyway in case that
        # fallback behavior changes.
        if not keep_indices:
            return self._empty_result()
        final_boxes = final_boxes[keep_indices]
        final_scores = [final_scores[i] for i in keep_indices]
        final_labels = [final_labels[i] for i in keep_indices]
        final_names = [label_names[i] for i in keep_indices]
        return DetectionResult(
            boxes=final_boxes,
            scores=final_scores,
            labels=final_labels,
            label_names=final_names,
        )

    def predict(self, frame: np.ndarray, queries: Sequence[str]) -> DetectionResult:
        """Detect objects in a single frame, tiling very wide frames."""
        if frame.shape[1] > self.TILED_WIDTH_THRESHOLD:
            return self._predict_tiled(frame, queries)
        results = self.model.predict(
            source=frame,
            device=self.device,
            conf=self.score_threshold,
            imgsz=self.INFERENCE_IMGSZ,
            verbose=False,
        )
        return self._parse_single_result(results[0], queries)

    def predict_batch(self, frames: Sequence[np.ndarray], queries: Sequence[str]) -> Sequence[DetectionResult]:
        """Detect objects in a batch of frames.

        NOTE(review): tiling is decided from the FIRST frame's width only;
        batches are assumed resolution-homogeneous -- confirm with callers.
        """
        if not frames:
            return []
        if frames[0].shape[1] > self.TILED_WIDTH_THRESHOLD:
            return [self._predict_tiled(f, queries) for f in frames]
        results = self.model.predict(
            source=frames,
            device=self.device,
            conf=self.score_threshold,
            imgsz=self.INFERENCE_IMGSZ,
            verbose=False,
        )
        return [self._parse_single_result(r, queries) for r in results]