from pathlib import Path import math from typing import Any import cv2 import numpy as np import onnxruntime as ort from numpy import ndarray from pydantic import BaseModel # Profile 002-001: same implementation as miner_script/002/miner.py (crime # classes, defaults, TTA merge, and ONNX sweep-cache helpers). Fork here only # when you need a frozen hyperparameter variant. class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float class TVFrameResult(BaseModel): frame_id: int boxes: list[BoundingBox] keypoints: list[tuple[int, int]] class Miner: def __init__(self, path_hf_repo: Path) -> None: model_path = path_hf_repo / "weights.onnx" # Canonical class indices (validator / PGT / manifest); match dataset.yaml names order. # Crime element (evaluations_7/dataset.yaml): # balaclava=0, hoodie=1, glove=2, bat=3, spray paint=4, graffiti=5 self.class_names = ['balaclava', 'hoodie', 'glove', 'bat', 'spray paint', 'graffiti'] # ONNX class index order from training export (Ultralytics names 0..5 in dataset.yaml). model_class_order = ['balaclava', 'bat', 'glove', 'graffiti', 'hoodie', 'spray paint'] self._train_cls_to_canonical = np.array( [self.class_names.index(n) for n in model_class_order], dtype=np.int32 ) print("ORT version:", ort.__version__) try: ort.preload_dlls() print("✅ onnxruntime.preload_dlls() success") except Exception as e: print(f"⚠️ preload_dlls failed: {e}") print("ORT available providers BEFORE session:", ort.get_available_providers()) sess_options = ort.SessionOptions() sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL try: self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CUDAExecutionProvider", "CPUExecutionProvider"], ) print("✅ Created ORT session with preferred CUDA provider list") except Exception as e: print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}") self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CPUExecutionProvider"], ) print("ORT session providers:", self.session.get_providers()) for inp in self.session.get_inputs(): print("INPUT:", inp.name, inp.shape, inp.type) for out in self.session.get_outputs(): print("OUTPUT:", out.name, out.shape, out.type) self.input_name = self.session.get_inputs()[0].name self.output_names = [output.name for output in self.session.get_outputs()] self.input_shape = self.session.get_inputs()[0].shape self.input_height = self._safe_dim(self.input_shape[2], default=1280) self.input_width = self._safe_dim(self.input_shape[3], default=1280) # ---------- Scoring-oriented thresholds (crime: balaclava / hoodie / glove / # bat / spray paint / graffiti) ---------- # Crime classes mix wearables (balaclava, hoodie), small handhelds # (glove, spray paint), long handhelds (bat) and large markings # (graffiti). Confidence peaks vary widely so we keep a moderate # floor and lean on TTA consensus for the soft tail. self.conf_thres = 0.25 # Above this on orig view, accept directly. Below it, require TTA agreement. self.conf_high = 0.45 # Standard NMS IoU; balaclavas / hoodies on the same person can # overlap heavily but they're different classes so the per-class # NMS handles that — 0.50 stays a safe default. self.iou_thres = 0.5 # Balaclavas / hoodies barely shift between orig and h-flipped views, # but graffiti and bats can have asymmetric extents so a moderate # IoU gate keeps the consensus rule from getting too strict. self.tta_match_iou = 0.25 self.max_det = 50 self.use_tta = True # Box sanity filters tuned for crime: # - smallest classes (glove, spray paint) can shrink to ~10 px on # the short side in wide shots — keep min thresholds modest # - aspect ratio: bats are long/thin (h/w ratio up to ~6) and # graffiti can be very wide; allow up to 8.0 # - a single graffiti tag can fill most of a wall, so allow up to # ~95% image-area for one box self.min_box_area = 16 * 16 self.min_w = 12 self.min_h = 12 self.max_aspect_ratio = 8.0 self.max_box_area_ratio = 0.9 print(f"✅ ONNX model loaded from: {model_path}") print(f"✅ ONNX providers: {self.session.get_providers()}") print(f"✅ ONNX input: name={self.input_name}, shape={self.input_shape}") def __repr__(self) -> str: return ( f"ONNXRuntime(session={type(self.session).__name__}, " f"providers={self.session.get_providers()})" ) @staticmethod def _safe_dim(value, default: int) -> int: return value if isinstance(value, int) and value > 0 else default def _remap_train_cls_ids(self, cls_ids: np.ndarray) -> np.ndarray: idx = np.clip(cls_ids.astype(np.int64, copy=False), 0, len(self._train_cls_to_canonical) - 1) return self._train_cls_to_canonical[idx] def _letterbox( self, image: ndarray, new_shape: tuple[int, int], color=(114, 114, 114), ) -> tuple[ndarray, float, tuple[float, float]]: h, w = image.shape[:2] new_w, new_h = new_shape ratio = min(new_w / w, new_h / h) resized_w = int(round(w * ratio)) resized_h = int(round(h * ratio)) if (resized_w, resized_h) != (w, h): interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR image = cv2.resize(image, (resized_w, resized_h), interpolation=interp) dw = new_w - resized_w dh = new_h - resized_h dw /= 2.0 dh /= 2.0 left = int(round(dw - 0.1)) right = int(round(dw + 0.1)) top = int(round(dh - 0.1)) bottom = int(round(dh + 0.1)) padded = cv2.copyMakeBorder( image, top, bottom, left, right, borderType=cv2.BORDER_CONSTANT, value=color, ) return padded, ratio, (dw, dh) def _preprocess( self, image: ndarray ) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]: orig_h, orig_w = image.shape[:2] img, ratio, pad = self._letterbox( image, (self.input_width, self.input_height) ) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype(np.float32) / 255.0 img = np.transpose(img, (2, 0, 1))[None, ...] img = np.ascontiguousarray(img, dtype=np.float32) return img, ratio, pad, (orig_w, orig_h) @staticmethod def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray: w, h = image_size boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1) boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1) boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1) boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1) return boxes @staticmethod def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray: out = np.empty_like(boxes) out[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.0 out[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.0 out[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.0 out[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.0 return out @staticmethod def _hard_nms( boxes: np.ndarray, scores: np.ndarray, iou_thresh: float, ) -> np.ndarray: if len(boxes) == 0: return np.array([], dtype=np.intp) boxes = np.asarray(boxes, dtype=np.float32) scores = np.asarray(scores, dtype=np.float32) order = np.argsort(scores)[::-1] keep = [] while len(order) > 0: i = order[0] keep.append(i) if len(order) == 1: break rest = order[1:] xx1 = np.maximum(boxes[i, 0], boxes[rest, 0]) yy1 = np.maximum(boxes[i, 1], boxes[rest, 1]) xx2 = np.minimum(boxes[i, 2], boxes[rest, 2]) yy2 = np.minimum(boxes[i, 3], boxes[rest, 3]) inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) area_i = np.maximum(0.0, (boxes[i, 2] - boxes[i, 0])) * np.maximum(0.0, (boxes[i, 3] - boxes[i, 1])) area_r = np.maximum(0.0, (boxes[rest, 2] - boxes[rest, 0])) * np.maximum(0.0, (boxes[rest, 3] - boxes[rest, 1])) iou = inter / (area_i + area_r - inter + 1e-7) order = rest[iou <= iou_thresh] return np.array(keep, dtype=np.intp) @classmethod def _nms_per_class( cls, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, iou_thresh: float, max_det: int, ) -> np.ndarray: """NMS within each class so overlapping predictions of different classes are not merged away.""" if len(boxes) == 0: return np.array([], dtype=np.intp) keep_all: list[int] = [] for c in np.unique(cls_ids): idxs = np.nonzero(cls_ids == c)[0] if len(idxs) == 0: continue local_keep = cls._hard_nms(boxes[idxs], scores[idxs], iou_thresh) keep_all.extend(idxs[local_keep].tolist()) keep_all = np.array(keep_all, dtype=np.intp) order = np.argsort(scores[keep_all])[::-1] return keep_all[order[:max_det]] @staticmethod def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray: xx1 = np.maximum(box[0], boxes[:, 0]) yy1 = np.maximum(box[1], boxes[:, 1]) xx2 = np.minimum(box[2], boxes[:, 2]) yy2 = np.minimum(box[3], boxes[:, 3]) inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1])) area_b = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1]) return inter / (area_a + area_b - inter + 1e-7) def _filter_sane_boxes( self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, orig_size: tuple[int, int], ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if len(boxes) == 0: return boxes, scores, cls_ids orig_w, orig_h = orig_size image_area = float(orig_w * orig_h) keep = [] for i, box in enumerate(boxes): x1, y1, x2, y2 = box.tolist() bw = x2 - x1 bh = y2 - y1 if bw <= 0 or bh <= 0: continue if bw < self.min_w or bh < self.min_h: continue area = bw * bh if area < self.min_box_area: continue if area > self.max_box_area_ratio * image_area: continue ar = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6)) if ar > self.max_aspect_ratio: continue keep.append(i) if not keep: return ( np.empty((0, 4), dtype=np.float32), np.empty((0,), dtype=np.float32), np.empty((0,), dtype=np.int32), ) keep = np.array(keep, dtype=np.intp) return boxes[keep], scores[keep], cls_ids[keep] def _decode_final_dets( self, preds: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], ) -> list[BoundingBox]: if preds.ndim == 3 and preds.shape[0] == 1: preds = preds[0] if preds.ndim != 2 or preds.shape[1] < 6: raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}") boxes = preds[:, :4].astype(np.float32) scores = preds[:, 4].astype(np.float32) cls_ids = self._remap_train_cls_ids(preds[:, 5].astype(np.int32)) # Multi-class crime: balaclava / hoodie / glove / bat / spray paint / graffiti # (see self.class_names). # candidate threshold keep = scores >= self.conf_thres boxes = boxes[keep] scores = scores[keep] cls_ids = cls_ids[keep] if len(boxes) == 0: return [] pad_w, pad_h = pad orig_w, orig_h = orig_size boxes[:, [0, 2]] -= pad_w boxes[:, [1, 3]] -= pad_h boxes /= ratio boxes = self._clip_boxes(boxes, (orig_w, orig_h)) boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size) if len(boxes) == 0: return [] keep_idx = self._nms_per_class( boxes, scores, cls_ids, self.iou_thres, self.max_det ) boxes = boxes[keep_idx] scores = scores[keep_idx] cls_ids = cls_ids[keep_idx] return [ BoundingBox( x1=int(math.floor(box[0])), y1=int(math.floor(box[1])), x2=int(math.ceil(box[2])), y2=int(math.ceil(box[3])), cls_id=int(cls_id), conf=float(conf), ) for box, conf, cls_id in zip(boxes, scores, cls_ids) if box[2] > box[0] and box[3] > box[1] ] def _decode_raw_yolo( self, preds: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], ) -> list[BoundingBox]: if preds.ndim != 3: raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}") if preds.shape[0] != 1: raise ValueError(f"Unexpected batch dimension in raw output: {preds.shape}") preds = preds[0] # Normalize to [N, C] if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]: preds = preds.T if preds.ndim != 2 or preds.shape[1] < 5: raise ValueError(f"Unexpected normalized raw output shape: {preds.shape}") boxes_xywh = preds[:, :4].astype(np.float32) tail = preds[:, 4:].astype(np.float32) # Supports: # [x,y,w,h,score] single-class # [x,y,w,h,obj,cls] YOLO standard single-class # [x,y,w,h,obj,cls1,cls2,...] multi-class if tail.shape[1] == 1: scores = tail[:, 0] cls_ids = np.zeros(len(scores), dtype=np.int32) elif tail.shape[1] == 2: obj = tail[:, 0] cls_prob = tail[:, 1] scores = obj * cls_prob cls_ids = np.zeros(len(scores), dtype=np.int32) else: obj = tail[:, 0] class_probs = tail[:, 1:] cls_ids = np.argmax(class_probs, axis=1).astype(np.int32) cls_scores = class_probs[np.arange(len(class_probs)), cls_ids] scores = obj * cls_scores cls_ids = self._remap_train_cls_ids(cls_ids) keep = scores >= self.conf_thres boxes_xywh = boxes_xywh[keep] scores = scores[keep] cls_ids = cls_ids[keep] if len(boxes_xywh) == 0: return [] boxes = self._xywh_to_xyxy(boxes_xywh) pad_w, pad_h = pad orig_w, orig_h = orig_size boxes[:, [0, 2]] -= pad_w boxes[:, [1, 3]] -= pad_h boxes /= ratio boxes = self._clip_boxes(boxes, (orig_w, orig_h)) boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size) if len(boxes) == 0: return [] keep_idx = self._nms_per_class( boxes, scores, cls_ids, self.iou_thres, self.max_det ) boxes = boxes[keep_idx] scores = scores[keep_idx] cls_ids = cls_ids[keep_idx] return [ BoundingBox( x1=int(math.floor(box[0])), y1=int(math.floor(box[1])), x2=int(math.ceil(box[2])), y2=int(math.ceil(box[3])), cls_id=int(cls_id), conf=float(conf), ) for box, conf, cls_id in zip(boxes, scores, cls_ids) if box[2] > box[0] and box[3] > box[1] ] def _postprocess( self, output: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], ) -> list[BoundingBox]: if output.ndim == 2 and output.shape[1] >= 6: return self._decode_final_dets(output, ratio, pad, orig_size) if output.ndim == 3 and output.shape[0] == 1 and output.shape[2] >= 6: return self._decode_final_dets(output, ratio, pad, orig_size) return self._decode_raw_yolo(output, ratio, pad, orig_size) def _predict_single(self, image: np.ndarray) -> list[BoundingBox]: if image is None: raise ValueError("Input image is None") if not isinstance(image, np.ndarray): raise TypeError(f"Input is not numpy array: {type(image)}") if image.ndim != 3: raise ValueError(f"Expected HWC image, got shape={image.shape}") if image.shape[0] <= 0 or image.shape[1] <= 0: raise ValueError(f"Invalid image shape={image.shape}") if image.shape[2] != 3: raise ValueError(f"Expected 3 channels, got shape={image.shape}") if image.dtype != np.uint8: image = image.astype(np.uint8) input_tensor, ratio, pad, orig_size = self._preprocess(image) expected_shape = (1, 3, self.input_height, self.input_width) if input_tensor.shape != expected_shape: raise ValueError( f"Bad input tensor shape={input_tensor.shape}, expected={expected_shape}" ) outputs = self.session.run(self.output_names, {self.input_name: input_tensor}) det_output = outputs[0] return self._postprocess(det_output, ratio, pad, orig_size) def _merge_tta_consensus( self, boxes_orig: list[BoundingBox], boxes_flip: list[BoundingBox], *, conf_high: float | None = None, tta_match_iou: float | None = None, iou_thres: float | None = None, ) -> list[BoundingBox]: """ Keep: - any box with conf >= conf_high - low/medium-conf boxes only if confirmed across TTA views Then run final hard NMS. All thresholds default to the instance attributes when not supplied, so the non-sweep path can call this without args. The sweep path passes explicit values to avoid mutating shared state across parameter combinations (and to be safe under any future concurrency). """ ch = float(conf_high) if conf_high is not None else float(self.conf_high) tm = float(tta_match_iou) if tta_match_iou is not None else float(self.tta_match_iou) ih = float(iou_thres) if iou_thres is not None else float(self.iou_thres) if not boxes_orig and not boxes_flip: return [] coords_o = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0, 4), dtype=np.float32) scores_o = np.array([b.conf for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0,), dtype=np.float32) cls_o = np.array([b.cls_id for b in boxes_orig], dtype=np.int32) if boxes_orig else np.empty((0,), dtype=np.int32) coords_f = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0, 4), dtype=np.float32) scores_f = np.array([b.conf for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0,), dtype=np.float32) cls_f = np.array([b.cls_id for b in boxes_flip], dtype=np.int32) if boxes_flip else np.empty((0,), dtype=np.int32) accepted_boxes = [] accepted_scores = [] accepted_cls = [] # Original view candidates for i in range(len(coords_o)): score = scores_o[i] if score >= ch: accepted_boxes.append(coords_o[i]) accepted_scores.append(score) accepted_cls.append(int(cls_o[i])) elif len(coords_f) > 0: ious = self._box_iou_one_to_many(coords_o[i], coords_f) j = int(np.argmax(ious)) if ious[j] >= tm: fused_score = max(score, scores_f[j]) accepted_boxes.append(coords_o[i]) accepted_scores.append(fused_score) accepted_cls.append(int(cls_o[i])) # Flipped-view high-confidence boxes that original missed for i in range(len(coords_f)): score = scores_f[i] if score < ch: continue if len(coords_o) == 0: accepted_boxes.append(coords_f[i]) accepted_scores.append(score) accepted_cls.append(int(cls_f[i])) continue ious = self._box_iou_one_to_many(coords_f[i], coords_o) if np.max(ious) < tm: accepted_boxes.append(coords_f[i]) accepted_scores.append(score) accepted_cls.append(int(cls_f[i])) if not accepted_boxes: return [] boxes = np.array(accepted_boxes, dtype=np.float32) scores = np.array(accepted_scores, dtype=np.float32) cls_ids = np.array(accepted_cls, dtype=np.int32) keep = self._nms_per_class(boxes, scores, cls_ids, ih, self.max_det) out = [] for idx in keep: x1, y1, x2, y2 = boxes[idx].tolist() out.append( BoundingBox( x1=int(math.floor(x1)), y1=int(math.floor(y1)), x2=int(math.ceil(x2)), y2=int(math.ceil(y2)), cls_id=int(cls_ids[idx]), conf=float(scores[idx]), ) ) return out def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]: boxes_orig = self._predict_single(image) flipped = cv2.flip(image, 1) boxes_flip_raw = self._predict_single(flipped) w = image.shape[1] boxes_flip = [ BoundingBox( x1=w - b.x2, y1=b.y1, x2=w - b.x1, y2=b.y2, cls_id=b.cls_id, conf=b.conf, ) for b in boxes_flip_raw ] return self._merge_tta_consensus(boxes_orig, boxes_flip) # --- Fast sweep: two ONNX runs per image, then CPU-only threshold / NMS / TTA merge --- # Must be <= the smallest conf_thres any sweep will try, otherwise the sweep silently # caps the effective threshold and reported "best" params won't reproduce in non-sweep. SWEEP_CONF_FLOOR = 0.0 def _final_dets_to_arrays_no_nms( self, preds: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], score_floor: float, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if preds.ndim == 3 and preds.shape[0] == 1: preds = preds[0] if preds.ndim != 2 or preds.shape[1] < 6: raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}") boxes = preds[:, :4].astype(np.float32) scores = preds[:, 4].astype(np.float32) cls_ids = self._remap_train_cls_ids(preds[:, 5].astype(np.int32)) keep = scores >= float(score_floor) boxes = boxes[keep] scores = scores[keep] cls_ids = cls_ids[keep] if len(boxes) == 0: return ( np.empty((0, 4), dtype=np.float32), np.empty((0,), dtype=np.float32), np.empty((0,), dtype=np.int32), ) pad_w, pad_h = pad orig_w, orig_h = orig_size boxes[:, [0, 2]] -= pad_w boxes[:, [1, 3]] -= pad_h boxes /= ratio boxes = self._clip_boxes(boxes, (orig_w, orig_h)) return self._filter_sane_boxes(boxes, scores, cls_ids, orig_size) def _raw_yolo_to_arrays_no_nms( self, preds: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], score_floor: float, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if preds.ndim != 3: raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}") if preds.shape[0] != 1: raise ValueError(f"Unexpected batch dimension in raw output: {preds.shape}") preds = preds[0] if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]: preds = preds.T if preds.ndim != 2 or preds.shape[1] < 5: raise ValueError(f"Unexpected normalized raw output shape: {preds.shape}") boxes_xywh = preds[:, :4].astype(np.float32) tail = preds[:, 4:].astype(np.float32) if tail.shape[1] == 1: scores = tail[:, 0] cls_ids = np.zeros(len(scores), dtype=np.int32) elif tail.shape[1] == 2: obj = tail[:, 0] cls_prob = tail[:, 1] scores = obj * cls_prob cls_ids = np.zeros(len(scores), dtype=np.int32) else: obj = tail[:, 0] class_probs = tail[:, 1:] cls_ids = np.argmax(class_probs, axis=1).astype(np.int32) cls_scores = class_probs[np.arange(len(class_probs)), cls_ids] scores = obj * cls_scores cls_ids = self._remap_train_cls_ids(cls_ids) keep = scores >= float(score_floor) boxes_xywh = boxes_xywh[keep] scores = scores[keep] cls_ids = cls_ids[keep] if len(boxes_xywh) == 0: return ( np.empty((0, 4), dtype=np.float32), np.empty((0,), dtype=np.float32), np.empty((0,), dtype=np.int32), ) boxes = self._xywh_to_xyxy(boxes_xywh) pad_w, pad_h = pad orig_w, orig_h = orig_size boxes[:, [0, 2]] -= pad_w boxes[:, [1, 3]] -= pad_h boxes /= ratio boxes = self._clip_boxes(boxes, (orig_w, orig_h)) return self._filter_sane_boxes(boxes, scores, cls_ids, orig_size) def _postprocess_to_arrays_no_nms( self, output: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], score_floor: float, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: if output.ndim == 2 and output.shape[1] >= 6: return self._final_dets_to_arrays_no_nms(output, ratio, pad, orig_size, score_floor) if output.ndim == 3 and output.shape[0] == 1 and output.shape[2] >= 6: return self._final_dets_to_arrays_no_nms(output, ratio, pad, orig_size, score_floor) return self._raw_yolo_to_arrays_no_nms(output, ratio, pad, orig_size, score_floor) @staticmethod def _horizontal_flip_boxes_xyxy(boxes: np.ndarray, w: int) -> np.ndarray: if len(boxes) == 0: return boxes out = boxes.astype(np.float32, copy=True) x1 = out[:, 0].copy() x2 = out[:, 2].copy() out[:, 0] = w - x2 out[:, 2] = w - x1 return out def build_vehicle_sweep_cache(self, image_bgr: np.ndarray) -> dict[str, Any]: """Two ONNX forwards (TTA views); candidates kept at score >= SWEEP_CONF_FLOOR.""" if image_bgr.dtype != np.uint8: image_bgr = image_bgr.astype(np.uint8) h, w = int(image_bgr.shape[0]), int(image_bgr.shape[1]) input_tensor, ratio, pad, orig_size = self._preprocess(image_bgr) outputs = self.session.run(self.output_names, {self.input_name: input_tensor}) bo, so, co = self._postprocess_to_arrays_no_nms( outputs[0], ratio, pad, orig_size, self.SWEEP_CONF_FLOOR ) flipped = cv2.flip(image_bgr, 1) input_tensor_f, ratio_f, pad_f, orig_size_f = self._preprocess(flipped) outputs_f = self.session.run(self.output_names, {self.input_name: input_tensor_f}) bf, sf, cf = self._postprocess_to_arrays_no_nms( outputs_f[0], ratio_f, pad_f, orig_size_f, self.SWEEP_CONF_FLOOR ) bf = self._horizontal_flip_boxes_xyxy(bf, w) return {"orig": (bo, so, co), "flip": (bf, sf, cf), "image_shape": (h, w)} def _arrays_to_boxes_after_conf_nms( self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, conf_thres: float, iou_thres: float, ) -> list[BoundingBox]: if len(boxes) == 0: return [] m = scores >= float(conf_thres) if not np.any(m): return [] boxes = boxes[m] scores = scores[m] cls_ids = cls_ids[m] keep_idx = self._nms_per_class(boxes, scores, cls_ids, float(iou_thres), self.max_det) out: list[BoundingBox] = [] for i in keep_idx: box = boxes[i] x1, y1, x2, y2 = float(box[0]), float(box[1]), float(box[2]), float(box[3]) if x2 <= x1 or y2 <= y1: continue out.append( BoundingBox( x1=int(math.floor(x1)), y1=int(math.floor(y1)), x2=int(math.ceil(x2)), y2=int(math.ceil(y2)), cls_id=int(cls_ids[i]), conf=float(scores[i]), ) ) return out def predict_vehicle_from_sweep_cache( self, cache: dict[str, Any], *, conf_thres: float, iou_thres: float, conf_high: float | None = None, tta_match_iou: float | None = None, ) -> list[BoundingBox]: bo, so, co = cache["orig"] bf, sf, cf = cache["flip"] boxes_orig = self._arrays_to_boxes_after_conf_nms(bo, so, co, conf_thres, iou_thres) # Match predict_batch: single-view path when TTA is off (sweep cache must not force merge). if not getattr(self, "use_tta", True): return boxes_orig boxes_flip = self._arrays_to_boxes_after_conf_nms(bf, sf, cf, conf_thres, iou_thres) # Pass swept thresholds explicitly to avoid mutating self.* (race-free # under any future concurrent sweep harness; also robust to early returns). return self._merge_tta_consensus( boxes_orig, boxes_flip, conf_high=conf_high, tta_match_iou=tta_match_iou, iou_thres=iou_thres, ) def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[TVFrameResult]: results: list[TVFrameResult] = [] for frame_number_in_batch, image in enumerate(batch_images): try: if self.use_tta: boxes = self._predict_tta(image) else: boxes = self._predict_single(image) except Exception as e: print(f"⚠️ Inference failed for frame {offset + frame_number_in_batch}: {e}") boxes = [] results.append( TVFrameResult( frame_id=offset + frame_number_in_batch, boxes=boxes, keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))], ) ) return results