# Author: Zhen Ye
# fix: download yolo11m.pt via huggingface_hub to writable /tmp path
# commit: a4e3c2b
import logging
import os
from pathlib import Path
from typing import List, Sequence
import numpy as np
import torch
from huggingface_hub import hf_hub_download
from ultralytics import YOLO
from models.detectors.base import DetectionResult, ObjectDetector
from utils.tiling import get_slice_bboxes, slice_image, shift_bboxes, batched_nms
# Download YOLO11m weights to a writable cache (container /app is read-only).
_WEIGHTS_CACHE = Path(os.environ.get("YOLO_CACHE", "/tmp/yolo_weights"))
_WEIGHTS_CACHE.mkdir(parents=True, exist_ok=True)
_YOLO11M_PATH = _WEIGHTS_CACHE / "yolo11m.pt"
class Yolo11Detector(ObjectDetector):
    """YOLO11m detector with COCO-pretrained weights from Ultralytics.

    Weights are downloaded once from the Hugging Face Hub into the writable
    module-level cache path and loaded through the Ultralytics ``YOLO``
    wrapper.  Offers single-frame, batched, and tiled inference; the tiled
    path is used for frames wider than 3000 px.
    """

    # Batched inference is implemented natively (see predict_batch).
    supports_batch = True
    # Upper bound on frames/tiles passed to model.predict in one call.
    max_batch_size = 32

    def __init__(self, score_threshold: float = 0.3, device: str = None) -> None:
        """Download (if not cached) and load YOLO11m onto the target device.

        Args:
            score_threshold: Minimum confidence for detections to be kept.
            device: Torch device spec such as "cuda:0"; when falsy, the
                first CUDA device is used if available, else CPU.
        """
        self.name = "yolo11"
        self.score_threshold = score_threshold
        # CRITICAL: Store device as torch.device, NOT a string.
        # Ultralytics' select_device() sets CUDA_VISIBLE_DEVICES when it
        # receives a string like "cuda:0", restricting the entire process to
        # one GPU. Passing a torch.device object causes select_device() to
        # return immediately without touching the environment.
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        logging.info(
            "Loading YOLO11m COCO-pretrained weights onto %s",
            self.device,
        )
        # Download weights via huggingface_hub to a writable path, then load.
        # With local_dir set, hf_hub_download places the file at
        # _WEIGHTS_CACHE/yolo11m.pt, i.e. exactly _YOLO11M_PATH below.
        if not _YOLO11M_PATH.exists():
            logging.info("Downloading yolo11m.pt to %s ...", _YOLO11M_PATH)
            hf_hub_download(
                repo_id="Ultralytics/YOLO11",
                filename="yolo11m.pt",
                local_dir=str(_WEIGHTS_CACHE),
            )
        self.model = YOLO(str(_YOLO11M_PATH))
        self.model.to(self.device)
        # Class-id -> class-name mapping supplied by the loaded model.
        self.class_names = self.model.names

    def _filter_indices(self, label_names: Sequence[str], queries: Sequence[str]) -> List[int]:
        """Return indices of detections whose label matches any query.

        Matching is case-insensitive against whitespace-stripped queries;
        with no queries, every index is kept.  NOTE(review): when queries
        are given but none of them match, the `keep or ...` fallback fails
        open and returns ALL indices — confirm this is intended rather
        than returning an empty list.
        """
        if not queries:
            return list(range(len(label_names)))
        allowed = {query.lower().strip() for query in queries if query}
        keep = [idx for idx, name in enumerate(label_names) if name.lower() in allowed]
        return keep or list(range(len(label_names)))

    def _parse_single_result(self, result, queries: Sequence[str]) -> DetectionResult:
        """Convert one Ultralytics result into a query-filtered DetectionResult."""
        boxes = result.boxes
        # No detections at all for this frame -> empty result.
        if boxes is None or boxes.xyxy is None:
            empty = np.empty((0, 4), dtype=np.float32)
            return DetectionResult(empty, [], [], [])
        xyxy = boxes.xyxy.cpu().numpy()
        scores = boxes.conf.cpu().numpy().tolist()
        label_ids = boxes.cls.cpu().numpy().astype(int).tolist()
        # Unknown class ids get a synthetic "class_<id>" name.
        label_names = [self.class_names.get(idx, f"class_{idx}") for idx in label_ids]
        keep_indices = self._filter_indices(label_names, queries)
        # Only fancy-index when there is something to index.
        xyxy = xyxy[keep_indices] if len(xyxy) else xyxy
        scores = [scores[i] for i in keep_indices]
        label_ids = [label_ids[i] for i in keep_indices]
        label_names = [label_names[i] for i in keep_indices]
        return DetectionResult(
            boxes=xyxy,
            scores=scores,
            labels=label_ids,
            label_names=label_names,
        )

    def _predict_tiled(self, frame: np.ndarray, queries: Sequence[str]) -> DetectionResult:
        """Run tiled inference for high-resolution frames.

        The frame is cut into overlapping tiles, each batch of tiles is run
        through the model, per-tile boxes are shifted back to full-frame
        coordinates, then class-aware NMS removes duplicates from the
        overlap regions before the query filter is applied.
        """
        h, w = frame.shape[:2]
        # Heuristic: 1280x1280 tiles with 20% overlap
        slice_boxes = get_slice_bboxes(h, w, 1280, 1280, 0.2, 0.2)
        tiles = slice_image(frame, slice_boxes)
        all_boxes = []
        all_scores = []
        all_labels = []
        batch_size = self.max_batch_size
        for i in range(0, len(tiles), batch_size):
            batch_tiles = tiles[i : i + batch_size]
            batch_slices = slice_boxes[i : i + batch_size]
            # Using 1280px tiles
            results = self.model.predict(
                source=batch_tiles,
                device=self.device,
                conf=self.score_threshold,
                imgsz=1280,
                verbose=False,
            )
            for res, slice_coord in zip(results, batch_slices):
                if res.boxes is None: continue
                boxes = res.boxes.xyxy.cpu().numpy().tolist()
                scores = res.boxes.conf.cpu().numpy().tolist()
                clss = res.boxes.cls.cpu().numpy().tolist()
                # Translate tile-local boxes into full-frame coordinates.
                shifted = shift_bboxes(boxes, slice_coord)
                all_boxes.extend(shifted)
                all_scores.extend(scores)
                all_labels.extend(clss)
        if not all_boxes:
            empty = np.empty((0, 4), dtype=np.float32)
            return DetectionResult(empty, [], [], [])
        boxes_t = torch.tensor(all_boxes, device=self.device)
        scores_t = torch.tensor(all_scores, device=self.device)
        labels_t = torch.tensor(all_labels, device=self.device)
        # Per-class NMS merges duplicate detections from overlapping tiles.
        keep = batched_nms(boxes_t, scores_t, labels_t, iou_threshold=0.4)
        final_boxes = boxes_t[keep].cpu().numpy()
        final_scores = scores_t[keep].cpu().tolist()
        final_labels = labels_t[keep].cpu().int().tolist()
        label_names = [self.class_names.get(idx, f"class_{idx}") for idx in final_labels]
        keep_indices = self._filter_indices(label_names, queries)
        if not keep_indices:
            empty = np.empty((0, 4), dtype=np.float32)
            return DetectionResult(empty, [], [], [])
        final_boxes = final_boxes[keep_indices]
        final_scores = [final_scores[i] for i in keep_indices]
        final_labels = [final_labels[i] for i in keep_indices]
        final_names = [label_names[i] for i in keep_indices]
        return DetectionResult(
            boxes=final_boxes,
            scores=final_scores,
            labels=final_labels,
            label_names=final_names
        )

    def predict(self, frame: np.ndarray, queries: Sequence[str]) -> DetectionResult:
        """Detect objects in one frame, switching to tiling for wide frames."""
        h, w = frame.shape[:2]
        # NOTE(review): only width triggers tiling; a tall narrow frame
        # (h > 3000, w <= 3000) takes the single-pass path — confirm intended.
        if w > 3000:
            return self._predict_tiled(frame, queries)
        results = self.model.predict(
            source=frame,
            device=self.device,
            conf=self.score_threshold,
            imgsz=1280,
            verbose=False,
        )
        return self._parse_single_result(results[0], queries)

    def predict_batch(self, frames: Sequence[np.ndarray], queries: Sequence[str]) -> Sequence[DetectionResult]:
        """Detect objects in several frames with one batched model call.

        The tiling decision is made from frames[0] only — assumes all
        frames in a batch share the same resolution (TODO confirm with
        callers).
        """
        if not frames: return []
        h, w = frames[0].shape[:2]
        if w > 3000:
            return [self._predict_tiled(f, queries) for f in frames]
        results = self.model.predict(
            source=frames,
            device=self.device,
            conf=self.score_threshold,
            imgsz=1280,
            verbose=False,
        )
        return [self._parse_single_result(r, queries) for r in results]