# build-marker: v5-alfred-only-no-tta-rollback
"""SN44 crime detection miner — ALFRED ONLY, no TTA, no RF-DETR.

v5 (2026-05-04): drops the RF-DETR branch entirely. Component benchmarks showed
RF-DETR was ~10× slower than alfred (8.2s vs 0.8s on CPU) and contributed zero
observed scoring credit on cid 61709 (alfred alone returned the same 3 correct
boxes that the alfred-competitor used to earn 0.8).

Goal: get under the 5s validator gate with comfortable margin
(target p95 < 2000ms e2e).

Single ONNX file expected in path_hf_repo:
    weights.onnx — alfred yolo26n e2e, output [1, 300, 6], boxes in input-pixel coords (1280)

Conf threshold 0.52, NMS IoU 0.4, min_box_area 196 — unchanged from v3/v4.
All 6 classes routed through alfred (identity remap).
"""

import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from pydantic import BaseModel


class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]


class Miner:
    """Public miner — the chute calls predict_batch(...).

    v5 is alfred-only: a single forward pass, no TTA, no RF-DETR.
    """

    def __init__(self, path_hf_repo) -> None:
        self.path_hf_repo = Path(path_hf_repo)
        self.class_names = ["balaclava", "hoodie", "glove", "bat", "spray paint", "graffiti"]
        self.cls_remap = np.arange(6, dtype=np.int32)  # identity remap, all classes

        try:
            ort.preload_dlls()
        except Exception:
            pass

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            # Prefer CUDA; fall back to CPU-only if the provider is unavailable.
            self.session = ort.InferenceSession(
                str(self.path_hf_repo / "weights.onnx"),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception:
            self.session = ort.InferenceSession(
                str(self.path_hf_repo / "weights.onnx"),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

        self.input_h = 1280
        self.input_w = 1280
        self.conf_threshold = 0.52
        self.iou_thresh = 0.4
        self.cross_iou_thresh = 0.7
        self.max_det = 150
        self.min_box_area = 196
        self.min_side = 8
        self.max_aspect_ratio = 8.0

        # Warmup: two dummy passes so the first scored frame does not pay
        # graph-optimization / provider-initialization cost.
        warm = np.zeros((1280, 1280, 3), dtype=np.uint8)
        for _ in range(2):
            try:
                self._infer_single(warm)
            except Exception:
                break

    def __repr__(self):
        return (f"CrimeMiner v5 alfred-only(yolo26n@1280, NO TTA) "
                f"conf>=0.52 iou={self.iou_thresh} min_area={self.min_box_area}")

    def _letterbox(self, image):
        """Resize with preserved aspect ratio onto a 1280x1280 gray (114) canvas."""
        h, w = image.shape[:2]
        ratio = min(self.input_w / w, self.input_h / h)
        nw, nh = int(round(w * ratio)), int(round(h * ratio))
        if (nw, nh) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            resized = cv2.resize(image, (nw, nh), interpolation=interp)
        else:
            resized = image
        canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
        dy = (self.input_h - nh) // 2
        dx = (self.input_w - nw) // 2
        canvas[dy:dy + nh, dx:dx + nw] = resized
        return canvas, ratio, (float(dx), float(dy))

    def _preprocess(self, image_bgr):
        canvas, ratio, pad = self._letterbox(image_bgr)
        rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
        x = (rgb.astype(np.float32) / 255.0).transpose(2, 0, 1)[None, ...]
        # NOTE: v5 alfred ONNX is FP32. The 0.5 dtype check in deploy_agent.sh
        # enforces this match. Keep in sync with weights.onnx export format.
        return np.ascontiguousarray(x, dtype=np.float32), ratio, pad

    @staticmethod
    def _hard_nms(boxes, scores, iou_thresh):
        """Greedy NMS over one class; returns kept indices in descending-score order."""
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)
        order = np.argsort(scores)[::-1]
        keep, suppressed = [], np.zeros(n, dtype=bool)
        for i in range(n):
            idx = order[i]
            if suppressed[idx]:
                continue
            keep.append(int(idx))
            bi = boxes[idx]
            for k in range(i + 1, n):
                jdx = order[k]
                if suppressed[jdx]:
                    continue
                bj = boxes[jdx]
                xx1, yy1 = max(bi[0], bj[0]), max(bi[1], bj[1])
                xx2, yy2 = min(bi[2], bj[2]), min(bi[3], bj[3])
                inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
                ai = (bi[2] - bi[0]) * (bi[3] - bi[1])
                aj = (bj[2] - bj[0]) * (bj[3] - bj[1])
                iou = inter / (ai + aj - inter + 1e-7)
                if iou > iou_thresh:
                    suppressed[jdx] = True
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(self, boxes, scores, cls_ids, iou_thresh):
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep = []
        for c in np.unique(cls_ids):
            mask = cls_ids == c
            indices = np.where(mask)[0]
            keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
            all_keep.extend(indices[keep].tolist())
        all_keep.sort()  # kept indices in ascending original order
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cross_class_dedup(boxes, scores, cls_ids, iou_thresh):
        """Drop near-identical boxes that survived per-class NMS under different labels."""
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        order = np.lexsort((-scores, -areas))  # largest area first, then highest score
        suppressed = np.zeros(n, dtype=bool)
        keep = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False
            suppressed |= dup
        kept = np.array(keep, dtype=np.intp)
        return boxes[kept], scores[kept], cls_ids[kept]

    def _infer_single(self, image_bgr):
        inp, ratio, (dx, dy) = self._preprocess(image_bgr)
        # e2e output: [1, 300, 6] rows of (x1, y1, x2, y2, conf, cls) in input-pixel coords.
        out = self.session.run(self.output_names, {self.input_name: inp})[0]
        if out.ndim == 3:
            out = out[0]
        confs = out[:, 4].astype(np.float32)
        keep = confs >= self.conf_threshold
        if not keep.any():
            return []
        out = out[keep]
        boxes = out[:, :4].astype(np.float32).copy()
        confs = out[:, 4].astype(np.float32)
        cls_ids = self.cls_remap[out[:, 5].astype(np.int32)]

        # Undo letterbox padding/scale, then clip to the original frame.
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio
        oh, ow = image_bgr.shape[:2]
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, ow - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, oh - 1)

        if len(boxes) > 1:
            keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
            keep_idx = keep_idx[: self.max_det]
            boxes, confs, cls_ids = boxes[keep_idx], confs[keep_idx], cls_ids[keep_idx]
            boxes, confs, cls_ids = self._cross_class_dedup(boxes, confs, cls_ids, self.cross_iou_thresh)
        return self._to_boundingboxes(boxes, confs, cls_ids, ow, oh)

    def _to_boundingboxes(self, boxes, confs, cls_ids, orig_w, orig_h):
        out = []
        for i in range(len(boxes)):
            x1, y1, x2, y2 = boxes[i]
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            bw, bh = ix2 - ix1, iy2 - iy1
            if bw * bh < self.min_box_area:
                continue
            if min(bw, bh) < self.min_side:
                continue
            ar = max(bw / max(bh, 1), bh / max(bw, 1))
            if ar > self.max_aspect_ratio:
                continue
            out.append(BoundingBox(
                x1=ix1, y1=iy1, x2=ix2, y2=iy2,
                cls_id=int(cls_ids[i]),
                conf=max(0.0, min(1.0, float(confs[i]))),
            ))
        return out

    def predict_batch(self, batch_images, offset, n_keypoints):
        results = []
        for idx, image in enumerate(batch_images):
            boxes = self._infer_single(image)
            results.append(TVFrameResult(
                frame_id=offset + idx,
                boxes=boxes,
                keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
            ))
        return results
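

# Optional local smoke test, not part of the chute interface: a minimal sketch
# that assumes a weights.onnx export already sits in ./repo (the path and the
# 1280x720 dummy frame below are illustrative only). It builds the miner, runs
# one frame through predict_batch, and prints the end-to-end latency so the
# p95 < 2000ms target can be sanity-checked locally.
if __name__ == "__main__":
    import sys
    import time

    repo = sys.argv[1] if len(sys.argv) > 1 else "./repo"
    miner = Miner(repo)
    print(repr(miner))

    frame = np.zeros((720, 1280, 3), dtype=np.uint8)  # synthetic black frame
    t0 = time.perf_counter()
    results = miner.predict_batch([frame], offset=0, n_keypoints=0)
    elapsed_ms = (time.perf_counter() - t0) * 1000.0
    print(f"frames={len(results)} boxes={len(results[0].boxes)} latency={elapsed_ms:.1f}ms")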