iotaminer committed on
Commit
e7f0c1a
·
verified ·
1 Parent(s): 2cbead3

scorevision: push artifact

Browse files
Files changed (1) hide show
  1. miner.py +330 -367
miner.py CHANGED
@@ -1,24 +1,5 @@
1
- """
2
- Detect-vehicle miner for ScoreVision.
3
-
4
- Loaded by the TurboVision chute_template from the root of the HF repo.
5
- Thresholds (imgsz, conf, iou, max_det) are overridable via SN44_* env vars
6
- so operators can hot-patch without redeploying.
7
-
8
- Contract expected by the chute template:
9
- * class `Miner(path_hf_repo: Path)`
10
- * method `predict_batch(batch_images, offset, n_keypoints) -> list[TVFrameResult]`
11
-
12
- Vehicle classes filtered from COCO 80-class: car(2), motorcycle(3), bus(5), truck(7).
13
- cls_id is REMAPPED to the element's class order (bus=0, car=1, truck=2,
14
- motorcycle=3) because the ScoreVision validator maps cls_id by that order.
15
- """
16
-
17
- from __future__ import annotations
18
-
19
- import math
20
- import os
21
  from pathlib import Path
 
22
 
23
  import cv2
24
  import numpy as np
@@ -42,401 +23,384 @@ class TVFrameResult(BaseModel):
42
  keypoints: list[tuple[int, int]]
43
 
44
 
45
- # ---------------------------------------------------------------------------
46
- # Tuned hyperparameters (override via env for hot-patching without redeploy)
47
- # ---------------------------------------------------------------------------
48
- _DEFAULT_WEIGHTS = "weights.onnx"
49
- _DEFAULT_IMGSZ = 960
50
- _DEFAULT_CONF = 0.25
51
- _DEFAULT_IOU = 0.60
52
- _DEFAULT_MAX_DET = 300
53
-
54
-
55
def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an int.

    *default* is returned when the variable is unset or when ``int()``
    rejects its value.
    """
    try:
        parsed = int(os.environ.get(name, default))
    except (TypeError, ValueError):
        parsed = default
    return parsed
60
-
61
-
62
def _env_float(name: str, default: float) -> float:
    """Return environment variable *name* parsed as a float.

    *default* is returned when the variable is unset or when ``float()``
    rejects its value.
    """
    try:
        parsed = float(os.environ.get(name, default))
    except (TypeError, ValueError):
        parsed = default
    return parsed
67
-
68
-
69
def _letterbox(
    image: ndarray,
    new_shape: tuple[int, int],
    color: tuple[int, int, int] = (114, 114, 114),
) -> tuple[ndarray, float, tuple[float, float]]:
    """Resize *image* to fit ``new_shape`` keeping aspect ratio, then pad.

    ``new_shape`` is (width, height). Returns the padded image, the scale
    ratio that was applied, and the (dw, dh) half-padding per axis.
    """
    src_h, src_w = image.shape[:2]
    target_w, target_h = new_shape
    scale = min(target_w / src_w, target_h / src_h)
    scaled_w = int(round(src_w * scale))
    scaled_h = int(round(src_h * scale))
    if (scaled_w, scaled_h) != (src_w, src_h):
        # Linear when shrinking (or equal scale), cubic when enlarging.
        method = cv2.INTER_LINEAR if scale <= 1.0 else cv2.INTER_CUBIC
        image = cv2.resize(image, (scaled_w, scaled_h), interpolation=method)
    pad_x = (target_w - scaled_w) / 2.0
    pad_y = (target_h - scaled_h) / 2.0
    # The -/+0.1 nudge makes a half-pixel padding round down on one side
    # and up on the other, so the two borders sum to the full padding.
    border_top, border_bottom = int(round(pad_y - 0.1)), int(round(pad_y + 0.1))
    border_left, border_right = int(round(pad_x - 0.1)), int(round(pad_x + 0.1))
    canvas = cv2.copyMakeBorder(
        image, border_top, border_bottom, border_left, border_right,
        borderType=cv2.BORDER_CONSTANT, value=color,
    )
    return canvas, scale, (pad_x, pad_y)
94
-
95
-
96
def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
    """Convert (cx, cy, w, h) rows into (x1, y1, x2, y2) corner rows.

    The result is a new array with the same shape and dtype as *boxes*.
    """
    half_w = boxes[:, 2] / 2.0
    half_h = boxes[:, 3] / 2.0
    corners = np.empty_like(boxes)
    corners[:, 0] = boxes[:, 0] - half_w
    corners[:, 1] = boxes[:, 1] - half_h
    corners[:, 2] = boxes[:, 0] + half_w
    corners[:, 3] = boxes[:, 1] + half_h
    return corners
103
-
104
-
105
def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
    """Greedy non-maximum suppression implemented with numpy only.

    Repeatedly keeps the highest-scoring remaining box and discards every
    other box whose IoU with it exceeds *iou_thresh*. Returns the kept
    indices (into *boxes*) in descending-score order.
    """
    if len(boxes) == 0:
        return np.array([], dtype=np.intp)
    boxes = np.asarray(boxes, dtype=np.float32)
    scores = np.asarray(scores, dtype=np.float32)
    candidates = np.argsort(scores)[::-1]
    selected: list[int] = []
    while candidates.size:
        top = int(candidates[0])
        selected.append(top)
        others = candidates[1:]
        if others.size == 0:
            break
        left = np.maximum(boxes[top, 0], boxes[others, 0])
        upper = np.maximum(boxes[top, 1], boxes[others, 1])
        right = np.minimum(boxes[top, 2], boxes[others, 2])
        lower = np.minimum(boxes[top, 3], boxes[others, 3])
        overlap = np.maximum(0.0, right - left) * np.maximum(0.0, lower - upper)
        top_area = max(0.0, (boxes[top, 2] - boxes[top, 0]) * (boxes[top, 3] - boxes[top, 1]))
        other_area = np.maximum(0.0, boxes[others, 2] - boxes[others, 0]) * np.maximum(
            0.0, boxes[others, 3] - boxes[others, 1]
        )
        # Epsilon keeps the division defined for zero-area pairs.
        ratios = overlap / (top_area + other_area - overlap + 1e-7)
        candidates = others[ratios <= iou_thresh]
    return np.array(selected, dtype=np.intp)
131
-
132
-
133
def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
    """Clamp xyxy box coordinates in place to the image bounds.

    ``image_size`` is (width, height); x columns are limited to
    ``[0, width - 1]`` and y columns to ``[0, height - 1]``. The same
    array object is returned.
    """
    w, h = image_size
    for column, extent in ((0, w), (1, h), (2, w), (3, h)):
        boxes[:, column] = np.clip(boxes[:, column], 0, extent - 1)
    return boxes
140
 
141
 
142
  class Miner:
143
- """Detect-vehicle miner: ONNX Runtime + raw YOLO decode + numpy NMS.
144
-
145
- Same yolo11s_fp16 backbone as the Detect-Person miner; inference path is
146
- unchanged. The ONLY difference is class filtering: we keep COCO classes
147
- {car=2, motorcycle=3, bus=5, truck=7} and remap them to the element's
148
- class order: bus=0, car=1, truck=2, motorcycle=3 (see `coco_to_element`).
149
- """
150
-
151
def __init__(self, path_hf_repo: Path) -> None:
    """Load the ONNX weights from *path_hf_repo* and configure thresholds.

    Raises:
        FileNotFoundError: if the weights file is not present in the repo.
    """
    # Element `manak0/Detect-detect-vehicle` declares
    #   objects = ["bus", "car", "truck", "motorcycle"]   (in this order)
    # so the validator maps cls_id 0->"bus", 1->"car", 2->"truck",
    # 3->"motorcycle"; emitted cls_id values must follow that order.
    self.class_names = ["bus", "car", "truck", "motorcycle"]
    # COCO id -> element cls_id: bus(5)->0, car(2)->1, truck(7)->2, motorcycle(3)->3
    self.coco_to_element = {5: 0, 2: 1, 7: 2, 3: 3}
    self.vehicle_coco_ids = tuple(self.coco_to_element)

    weights_name = os.environ.get("SN44_ONNX_WEIGHTS", _DEFAULT_WEIGHTS)
    weights_path = path_hf_repo / weights_name
    if not weights_path.is_file():
        raise FileNotFoundError(
            f"ONNX weights '{weights_name}' not found in {path_hf_repo}"
        )

    print("ORT version:", ort.__version__)
    try:
        ort.preload_dlls()
    except Exception as e:
        print(f"ORT preload_dlls skipped: {e}")
    else:
        print("ORT preload_dlls ok")
    print("ORT available providers:", ort.get_available_providers())

    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

    def open_session(providers):
        # Single place that builds a session for a given provider list.
        return ort.InferenceSession(
            str(weights_path),
            sess_options=sess_options,
            providers=providers,
        )

    try:
        self.session = open_session(["CUDAExecutionProvider", "CPUExecutionProvider"])
        print("ORT session created with CUDA preferred")
    except Exception as e:
        print(f"ORT CUDA provider failed, falling back to CPU: {e}")
        self.session = open_session(["CPUExecutionProvider"])

    print("ORT session providers:", self.session.get_providers())

    for inp in self.session.get_inputs():
        print("ONNX INPUT:", inp.name, inp.shape, inp.type)
    for out in self.session.get_outputs():
        print("ONNX OUTPUT:", out.name, out.shape, out.type)

    self.input_name = self.session.get_inputs()[0].name
    self.output_names = [o.name for o in self.session.get_outputs()]
    input_shape = self.session.get_inputs()[0].shape

    def static_dim(dim):
        # Dimensions that are not positive ints fall back to the default size.
        return dim if isinstance(dim, int) and dim > 0 else _DEFAULT_IMGSZ

    self.input_height = _env_int("SN44_IMGSZ", static_dim(input_shape[2]))
    self.input_width = _env_int("SN44_IMGSZ", static_dim(input_shape[3]))

    self.conf_thres = _env_float("SN44_CONF", _DEFAULT_CONF)
    self.iou_thres = _env_float("SN44_IOU", _DEFAULT_IOU)
    self.max_det = _env_int("SN44_MAX_DET", _DEFAULT_MAX_DET)

    # Box sanity limits read by the _filter_sane* helpers.
    self.min_w = 4
    self.min_h = 4
    self.min_box_area = 16
    self.max_aspect_ratio = 8.0
    self.max_box_area_ratio = 0.9

    print(
        "Vehicle Miner ready: "
        f"imgsz={self.input_height}x{self.input_width}, "
        f"conf={self.conf_thres:.3f}, iou={self.iou_thres:.3f}, "
        f"max_det={self.max_det}, providers={self.session.get_providers()}, "
        f"coco_ids={self.vehicle_coco_ids}"
    )
230
 
231
def __repr__(self) -> str:
    """Summarise the active providers, input size, thresholds and COCO ids."""
    providers = self.session.get_providers()
    height, width = self.input_height, self.input_width
    summary = (
        "DetectVehicleMiner("
        f"providers={providers}, "
        f"imgsz={height}x{width}, "
        f"conf={self.conf_thres}, iou={self.iou_thres}, "
        f"coco_ids={self.vehicle_coco_ids})"
    )
    return summary
239
 
240
def _preprocess(
    self, image: ndarray
) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]:
    """Turn an HWC image into the NCHW float32 tensor fed to the session.

    Returns the tensor, the letterbox ratio, the letterbox padding and the
    original (width, height).
    """
    frame = image if image.dtype == np.uint8 else image.astype(np.uint8)
    src_h, src_w = frame.shape[:2]
    boxed, ratio, pad = _letterbox(frame, (self.input_width, self.input_height))
    # NOTE(review): the conversion assumes BGR input - confirm with the caller.
    rgb = cv2.cvtColor(boxed, cv2.COLOR_BGR2RGB)
    scaled = rgb.astype(np.float32) / 255.0
    # HWC -> CHW, then add the leading batch axis.
    batched = np.transpose(scaled, (2, 0, 1))[None, ...]
    tensor = np.ascontiguousarray(batched, dtype=np.float32)
    return tensor, ratio, pad, (src_w, src_h)
252
 
253
def _filter_sane(
    self,
    boxes: np.ndarray,
    scores: np.ndarray,
    orig_size: tuple[int, int],
) -> tuple[np.ndarray, np.ndarray]:
    """Drop boxes with implausible geometry.

    A box is rejected when it is degenerate, narrower/shorter than
    ``min_w``/``min_h``, smaller than ``min_box_area``, larger than
    ``max_box_area_ratio`` of the image, or more elongated than
    ``max_aspect_ratio``. ``orig_size`` is (width, height).
    """
    if len(boxes) == 0:
        return boxes, scores
    orig_w, orig_h = orig_size
    image_area = float(orig_w * orig_h)
    # float64 so the comparisons match plain Python float arithmetic.
    xyxy = np.asarray(boxes, dtype=np.float64)
    widths = xyxy[:, 2] - xyxy[:, 0]
    heights = xyxy[:, 3] - xyxy[:, 1]
    areas = widths * heights
    elongation = np.maximum(
        widths / np.maximum(heights, 1e-6),
        heights / np.maximum(widths, 1e-6),
    )
    rejected = (
        (widths <= 0)
        | (heights <= 0)
        | (widths < self.min_w)
        | (heights < self.min_h)
        | (areas < self.min_box_area)
        | (areas > self.max_box_area_ratio * image_area)
        | (elongation > self.max_aspect_ratio)
    )
    keep_idx = np.flatnonzero(~rejected)
    if keep_idx.size == 0:
        return (
            np.empty((0, 4), dtype=np.float32),
            np.empty((0,), dtype=np.float32),
        )
    return boxes[keep_idx], scores[keep_idx]
288
-
289
def _decode_yolov11(
    self,
    preds: np.ndarray,
    ratio: float,
    pad: tuple[float, float],
    orig_size: tuple[int, int],
) -> list[BoundingBox]:
    """Decode a raw detection tensor into vehicle boxes in image coordinates.

    The tensor is expected as [1, 4+nc, N] or [1, N, 4+nc]; each row holds
    (cx, cy, w, h) followed by per-class scores, and the best vehicle-class
    score is used as the detection confidence. Letterbox padding and
    scaling are undone, boxes are clipped, sanity-filtered and NMS-merged.
    """
    if preds.ndim != 3:
        return []
    grid = preds[0]
    expected_cols = 4 + len(self._coco_classes())
    n_rows, n_cols = grid.shape
    # Put detections on axis 0: transpose when channels lead, or when
    # neither axis matches and the first axis is the shorter one.
    if n_rows == expected_cols or (n_cols != expected_cols and n_rows < n_cols):
        grid = grid.T

    if grid.shape[1] < 5:
        return []

    boxes_xywh = grid[:, :4].astype(np.float32)
    class_scores = grid[:, 4:].astype(np.float32)

    # Per detection, pick the strongest of the vehicle COCO classes.
    vehicle_cols = np.array(self.vehicle_coco_ids, dtype=np.intp)
    per_vehicle = class_scores[:, vehicle_cols]
    winner = per_vehicle.argmax(axis=1)
    winner_score = per_vehicle.max(axis=1)
    winner_coco = vehicle_cols[winner]

    confident = winner_score >= self.conf_thres
    if not np.any(confident):
        return []

    scores = winner_score[confident]
    # COCO id -> element cls_id used in the emitted boxes.
    element_cls_ids = np.array(
        [self.coco_to_element[int(coco_id)] for coco_id in winner_coco[confident]],
        dtype=np.int32,
    )

    boxes = _xywh_to_xyxy(boxes_xywh[confident])

    # Undo the letterbox: remove padding, then rescale to the source image.
    pad_w, pad_h = pad
    boxes[:, [0, 2]] -= pad_w
    boxes[:, [1, 3]] -= pad_h
    boxes /= ratio
    boxes = _clip_boxes(boxes, orig_size)

    boxes, scores, element_cls_ids = self._filter_sane_with_cls(
        boxes, scores, element_cls_ids, orig_size,
    )
    if len(boxes) == 0:
        return []

    survivors = _hard_nms(boxes, scores, self.iou_thres)[: self.max_det]
    boxes = boxes[survivors]
    scores = scores[survivors]
    element_cls_ids = element_cls_ids[survivors]

    return [
        BoundingBox(
            x1=int(math.floor(box[0])),
            y1=int(math.floor(box[1])),
            x2=int(math.ceil(box[2])),
            y2=int(math.ceil(box[3])),
            cls_id=int(cls_id),
            conf=float(conf),
        )
        for box, conf, cls_id in zip(boxes, scores, element_cls_ids)
        if not (box[2] <= box[0] or box[3] <= box[1])
    ]
373
-
374
def _filter_sane_with_cls(
    self,
    boxes: np.ndarray,
    scores: np.ndarray,
    cls_ids: np.ndarray,
    orig_size: tuple[int, int],
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Drop boxes with implausible geometry, keeping class ids aligned.

    Applies the size, area and aspect-ratio limits stored on the instance
    and returns the surviving rows of *boxes*, *scores* and *cls_ids*.
    ``orig_size`` is (width, height).
    """
    if len(boxes) == 0:
        return boxes, scores, cls_ids
    orig_w, orig_h = orig_size
    image_area = float(orig_w * orig_h)
    # float64 so the comparisons match plain Python float arithmetic.
    coords = np.asarray(boxes, dtype=np.float64)
    bw = coords[:, 2] - coords[:, 0]
    bh = coords[:, 3] - coords[:, 1]
    area = bw * bh
    aspect = np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6))
    reject_checks = (
        bw <= 0,
        bh <= 0,
        bw < self.min_w,
        bh < self.min_h,
        area < self.min_box_area,
        area > self.max_box_area_ratio * image_area,
        aspect > self.max_aspect_ratio,
    )
    rejected = np.logical_or.reduce(reject_checks)
    idx = np.flatnonzero(~rejected)
    if idx.size == 0:
        return (
            np.empty((0, 4), dtype=np.float32),
            np.empty((0,), dtype=np.float32),
            np.empty((0,), dtype=np.int32),
        )
    return boxes[idx], scores[idx], cls_ids[idx]
411
-
412
@staticmethod
def _coco_classes() -> list[str]:
    """Return the 80 COCO class names as a new list, in index order."""
    names = (
        "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
        "truck", "boat", "traffic light", "fire hydrant", "stop sign",
        "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep",
        "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella",
        "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard",
        "sports ball", "kite", "baseball bat", "baseball glove", "skateboard",
        "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
        "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
        "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
        "couch", "potted plant", "bed", "dining table", "toilet", "tv",
        "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
        "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase",
        "scissors", "teddy bear", "hair drier", "toothbrush",
    )
    return list(names)
429
-
430
def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
    """Run detection on one frame and return its decoded boxes.

    Raises:
        ValueError: if *image* is None or is not a 3-channel HWC array.
    """
    if image is None:
        raise ValueError("Input image is None")
    is_hwc3 = isinstance(image, np.ndarray) and image.ndim == 3 and image.shape[2] == 3
    if not is_hwc3:
        raise ValueError(f"Expected HWC RGB/BGR image, got shape={getattr(image, 'shape', None)}")

    input_tensor, ratio, pad, orig_size = self._preprocess(image)
    feeds = {self.input_name: input_tensor}
    first_output = self.session.run(self.output_names, feeds)[0]
    return self._decode_yolov11(first_output, ratio, pad, orig_size)
439
 
 
440
  def predict_batch(
441
  self,
442
  batch_images: list[ndarray],
@@ -444,16 +408,15 @@ class Miner:
444
  n_keypoints: int,
445
  ) -> list[TVFrameResult]:
446
  results: list[TVFrameResult] = []
447
- for i, image in enumerate(batch_images):
448
- frame_id = offset + i
449
  try:
450
  boxes = self._predict_single(image)
451
  except Exception as e:
452
- print(f"Inference failed for frame {frame_id}: {e}")
453
  boxes = []
454
  results.append(
455
  TVFrameResult(
456
- frame_id=frame_id,
457
  boxes=boxes,
458
  keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
459
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from pathlib import Path
2
+ import math
3
 
4
  import cv2
5
  import numpy as np
 
23
  keypoints: list[tuple[int, int]]
24
 
25
 
26
+ SIZE = 1280
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
 
29
  class Miner:
 
 
 
 
 
 
 
 
30
def __init__(self, path_hf_repo: Path) -> None:
    """Load class names and the ONNX model, then set detection parameters.

    Reads ``weights.onnx`` from *path_hf_repo*. Class names come from a
    sibling ``class_names.txt`` (blank lines and ``#`` comments ignored)
    when that file exists, otherwise a single ``"numberplate"`` class is
    used. A CUDA-preferred session is tried first, with a CPU-only
    fallback if session creation raises.
    """
    model_path = path_hf_repo / "weights.onnx"
    cn_path = model_path.with_name("class_names.txt")
    if cn_path.is_file():
        lines = cn_path.read_text(encoding="utf-8").splitlines()
        self.class_names = [
            ln.strip()
            for ln in lines
            if ln.strip() and not ln.strip().startswith("#")
        ]
    else:
        self.class_names = ["numberplate"]

    print("ORT version:", ort.__version__)

    # Best-effort: a failure here is only logged.
    try:
        ort.preload_dlls()
        print("onnxruntime.preload_dlls() success")
    except Exception as e:
        print(f"preload_dlls failed: {e}")

    print("ORT available providers BEFORE session:", ort.get_available_providers())

    # Diagnostic only: torch is optional and any failure is just logged.
    try:
        import torch
        if torch.cuda.is_available():
            print(f"GPU: {torch.cuda.get_device_name(0)}")
            # The device-properties attribute is `total_memory`; the previous
            # `total_mem` raised AttributeError, so this line never printed.
            print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        else:
            print("GPU: CUDA not available via torch")
    except Exception as e:
        print(f"GPU detection failed: {e}")

    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

    try:
        self.session = ort.InferenceSession(
            str(model_path),
            sess_options=sess_options,
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
        print("Created ORT session with preferred CUDA provider list")
    except Exception as e:
        print(f"CUDA session creation failed, falling back to CPU: {e}")
        self.session = ort.InferenceSession(
            str(model_path),
            sess_options=sess_options,
            providers=["CPUExecutionProvider"],
        )

    print("ORT session providers:", self.session.get_providers())

    for inp in self.session.get_inputs():
        print("INPUT:", inp.name, inp.shape, inp.type)
    for out in self.session.get_outputs():
        print("OUTPUT:", out.name, out.shape, out.type)

    self.input_name = self.session.get_inputs()[0].name
    self.output_names = [o.name for o in self.session.get_outputs()]
    self.input_shape = self.session.get_inputs()[0].shape

    # Dimensions that are not positive ints fall back to SIZE.
    self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
    self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)

    # Primary pass: alfred001 tuning (optimized for hermestech weights)
    self.conf_thres = 0.23
    self.iou_thres = 0.66
    self.sigma = 0.465
    self.max_det = 300

    # Conditional tile-pass (trimmed for latency: no hflip, tighter sparse)
    self.sparse_threshold = 3  # fire tiles only if primary returns < this
    self.tile_conf = 0.57
    self.tile_overlap = 0.20
    self.novelty_iou = 0.10
    self.final_max_det = 17
    self.tile_use_hflip = False  # skip hflip tile pass to save ~4 forwards

    self.use_tta = True

    print(f"ONNX model loaded from: {model_path}")
    print(f"ONNX providers: {self.session.get_providers()}")
    print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")
113
 
114
def __repr__(self) -> str:
    """Describe the miner by its session type and active providers."""
    session_type = type(self.session).__name__
    active = self.session.get_providers()
    return (
        f"ONNXRuntime(session={session_type}, "
        f"providers={active})"
    )
119
 
120
@staticmethod
def _safe_dim(value, default: int) -> int:
    """Return *value* if it is a positive int, otherwise *default*."""
    if isinstance(value, int) and value > 0:
        return value
    return default
123
+
124
+ # ---------- image preprocessing ----------
125
def _letterbox(
    self,
    image: ndarray,
    new_shape: tuple[int, int],
    color=(114, 114, 114),
) -> tuple[ndarray, float, tuple[float, float]]:
    """Fit *image* inside ``new_shape`` (width, height) and pad the rest.

    Aspect ratio is preserved. Returns the padded image, the scale ratio
    and the (dw, dh) half-padding per axis.
    """
    src_h, src_w = image.shape[:2]
    dst_w, dst_h = new_shape
    scale = min(dst_w / src_w, dst_h / src_h)
    fit_w, fit_h = int(round(src_w * scale)), int(round(src_h * scale))
    if (fit_w, fit_h) != (src_w, src_h):
        # Cubic when enlarging, linear otherwise.
        if scale > 1.0:
            mode = cv2.INTER_CUBIC
        else:
            mode = cv2.INTER_LINEAR
        image = cv2.resize(image, (fit_w, fit_h), interpolation=mode)
    half_pad_w = (dst_w - fit_w) / 2.0
    half_pad_h = (dst_h - fit_h) / 2.0
    # The -/+0.1 nudge makes a half-pixel padding round down on the
    # first side and up on the second.
    top, bottom, left, right = (
        int(round(half + nudge))
        for half in (half_pad_h, half_pad_w)
        for nudge in (-0.1, 0.1)
    )
    bordered = cv2.copyMakeBorder(
        image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=color,
    )
    return bordered, scale, (half_pad_w, half_pad_h)
150
+
151
def _preprocess(self, image: ndarray):
    """Letterbox *image* and convert it to a contiguous NCHW float32 tensor.

    Returns ``(tensor, ratio, pad)`` where ``ratio`` and ``pad`` come from
    the letterbox step.
    """
    target = (self.input_width, self.input_height)
    boxed, ratio, pad = self._letterbox(image, target)
    # NOTE(review): the conversion assumes BGR input - confirm with the caller.
    rgb = cv2.cvtColor(boxed, cv2.COLOR_BGR2RGB)
    scaled = rgb.astype(np.float32) / 255.0
    # HWC -> CHW, then add the leading batch axis.
    batched = np.transpose(scaled, (2, 0, 1))[None, ...]
    tensor = np.ascontiguousarray(batched, dtype=np.float32)
    return tensor, ratio, pad
 
156
 
157
@staticmethod
def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
    """Clamp xyxy coordinates in place to the image bounds.

    ``image_size`` is (width, height). The x columns end up within
    ``[0, width - 1]`` and the y columns within ``[0, height - 1]``; the
    same array object is returned.
    """
    width, height = image_size
    x_cols, y_cols = [0, 2], [1, 3]
    boxes[:, x_cols] = np.clip(boxes[:, x_cols], 0, width - 1)
    boxes[:, y_cols] = np.clip(boxes[:, y_cols], 0, height - 1)
    return boxes
165
+
166
+ # ---------- NMS primitives ----------
167
@staticmethod
def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
    """Greedy NMS: keep the best box, drop others whose IoU exceeds the threshold.

    Returns the kept indices (into *boxes*) in descending-score order.
    """
    if len(boxes) == 0:
        return np.array([], dtype=np.intp)
    boxes = np.asarray(boxes, dtype=np.float32)
    scores = np.asarray(scores, dtype=np.float32)
    # Box areas never change, so compute them once up front.
    areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    remaining = np.argsort(-scores)
    kept: list[int] = []
    while remaining.size:
        best = int(remaining[0])
        kept.append(best)
        tail = remaining[1:]
        if tail.size == 0:
            break
        left = np.maximum(boxes[best, 0], boxes[tail, 0])
        upper = np.maximum(boxes[best, 1], boxes[tail, 1])
        right = np.minimum(boxes[best, 2], boxes[tail, 2])
        lower = np.minimum(boxes[best, 3], boxes[tail, 3])
        overlap = np.maximum(0.0, right - left) * np.maximum(0.0, lower - upper)
        # Epsilon keeps the division defined for zero-area pairs.
        ratios = overlap / (areas[best] + areas[tail] - overlap + 1e-7)
        remaining = tail[ratios <= iou_thresh]
    return np.array(kept, dtype=np.intp)
192
+
193
def _soft_nms(
    self,
    boxes: np.ndarray,
    scores: np.ndarray,
    sigma: float,
    score_thresh: float = 0.01,
) -> tuple[np.ndarray, np.ndarray]:
    """Gaussian soft-NMS: decay overlapping scores instead of dropping boxes.

    On each step the best remaining box is moved to the front and every
    later score is multiplied by ``exp(-iou**2 / sigma)``. Returns the
    original indices and decayed scores of the boxes whose final score
    exceeds *score_thresh*, in selection order. Inputs are not modified.
    """
    count = len(boxes)
    if count == 0:
        return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
    work_boxes = boxes.astype(np.float32, copy=True)
    work_scores = scores.astype(np.float32, copy=True)
    origin = np.arange(count)
    for head in range(count):
        best = head + int(np.argmax(work_scores[head:]))
        # Swap the best remaining entry into position `head` in all arrays.
        for arr in (work_boxes, work_scores, origin):
            arr[[head, best]] = arr[[best, head]]
        if head + 1 >= count:
            break
        current = work_boxes[head]
        later = work_boxes[head + 1:]
        left = np.maximum(current[0], later[:, 0])
        upper = np.maximum(current[1], later[:, 1])
        right = np.minimum(current[2], later[:, 2])
        lower = np.minimum(current[3], later[:, 3])
        overlap = np.maximum(0.0, right - left) * np.maximum(0.0, lower - upper)
        current_area = float(
            (current[2] - current[0]) * (current[3] - current[1])
        )
        later_areas = (
            np.maximum(0.0, later[:, 2] - later[:, 0])
            * np.maximum(0.0, later[:, 3] - later[:, 1])
        )
        iou = overlap / (current_area + later_areas - overlap + 1e-7)
        work_scores[head + 1:] *= np.exp(-(iou ** 2) / sigma)
    survivors = work_scores > score_thresh
    return origin[survivors], work_scores[survivors]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
229
 
230
@staticmethod
def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
    """Return the IoU of one xyxy *box* against each row of *boxes*.

    An empty *boxes* yields an empty float32 array.
    """
    if len(boxes) == 0:
        return np.zeros(0, dtype=np.float32)
    left = np.maximum(box[0], boxes[:, 0])
    upper = np.maximum(box[1], boxes[:, 1])
    right = np.minimum(box[2], boxes[:, 2])
    lower = np.minimum(box[3], boxes[:, 3])
    overlap_w = np.maximum(0.0, right - left)
    overlap_h = np.maximum(0.0, lower - upper)
    overlap = overlap_w * overlap_h
    ref_area = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
    other_area = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
    # Epsilon keeps the division defined for zero-area pairs.
    return overlap / (ref_area + other_area - overlap + 1e-7)
242
+
243
+ # ---------- raw-dets helper ----------
244
def _raw_dets(self, image: ndarray, conf: float) -> np.ndarray:
    """Run one forward pass and return [N, 5] detections in image coordinates.

    Each returned row is (x1, y1, x2, y2, score). Rows whose score is below
    *conf* are dropped before the letterbox padding and scaling are undone.
    NOTE(review): columns 0-3 of the model output are used directly as
    corner coordinates and column 4 as the score - confirm against the
    exported model.
    """
    tensor, scale, (pad_x, pad_y) = self._preprocess(image)
    raw = self.session.run(self.output_names, {self.input_name: tensor})[0]
    preds = raw[0] if raw.ndim == 3 else raw
    if preds.shape[1] < 5:
        return np.zeros((0, 5), dtype=np.float32)
    confidences = preds[:, 4].astype(np.float32)
    selected = confidences >= conf
    xyxy = preds[:, :4].astype(np.float32)[selected]
    confidences = confidences[selected]
    if len(xyxy) == 0:
        return np.zeros((0, 5), dtype=np.float32)
    # Undo the letterbox: remove padding, then rescale to the source image.
    xyxy[:, [0, 2]] -= pad_x
    xyxy[:, [1, 3]] -= pad_y
    xyxy /= scale
    img_h, img_w = image.shape[:2]
    xyxy = self._clip_boxes(xyxy, (img_w, img_h))
    return np.concatenate([xyxy, confidences[:, None]], axis=1)
264
+
265
+ # ---------- primary pass: soft-NMS + hflip TTA ----------
266
def _primary(self, image: ndarray) -> np.ndarray:
    """Detect on the image and its horizontal mirror, then merge the results.

    Mirrored detections are mapped back to the original orientation, the
    union is re-scored with soft-NMS, de-duplicated with hard-NMS and
    capped at ``max_det`` rows. Returns an [N, 5] array of
    (x1, y1, x2, y2, score).
    """
    dets = self._raw_dets(image, self.conf_thres)
    mirrored = self._raw_dets(cv2.flip(image, 1), self.conf_thres)
    if len(mirrored):
        width = image.shape[1]
        # Mirroring swaps the roles of x1 and x2.
        restored = np.stack(
            [
                width - mirrored[:, 2],
                mirrored[:, 1],
                width - mirrored[:, 0],
                mirrored[:, 3],
                mirrored[:, 4],
            ],
            axis=1,
        )
        dets = np.concatenate([dets, restored], axis=0)
    if len(dets) == 0:
        return np.zeros((0, 5), dtype=np.float32)
    # soft-NMS first (score decay), then hard-NMS on the decayed scores
    kept_idx, decayed = self._soft_nms(dets[:, :4].copy(), dets[:, 4].copy(), sigma=self.sigma)
    if len(kept_idx) == 0:
        return np.zeros((0, 5), dtype=np.float32)
    merged = np.concatenate([dets[kept_idx, :4], decayed[:, None]], axis=1)
    merged = merged[self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)]
    if len(merged) > self.max_det:
        top = np.argsort(-merged[:, 4])[: self.max_det]
        merged = merged[top]
    return merged
288
+
289
+ # ---------- conditional tile pass ----------
290
def _tile_augment(self, image: ndarray, primary: np.ndarray) -> np.ndarray:
    """Detect on 2x2 overlapping tiles and add novel boxes to *primary*.

    Tile detections (optionally also from mirrored tiles when
    ``tile_use_hflip`` is set) are mapped to full-image coordinates,
    de-duplicated, stripped of anything overlapping a primary box at
    IoU >= ``novelty_iou``, sanity-filtered, then merged with *primary*
    and capped at ``final_max_det``. *primary* is returned unchanged when
    the tiles contribute nothing.
    """
    img_h, img_w = image.shape[:2]
    half_w, half_h = img_w // 2, img_h // 2
    lap_x, lap_y = int(half_w * self.tile_overlap), int(half_h * self.tile_overlap)
    # Each tile reaches past the image centre by the overlap margin.
    right_edge = min(img_w, half_w + lap_x)
    bottom_edge = min(img_h, half_h + lap_y)
    left_edge = max(0, half_w - lap_x)
    top_edge = max(0, half_h - lap_y)
    tiles = [
        (0, 0, right_edge, bottom_edge),
        (left_edge, 0, img_w, bottom_edge),
        (0, top_edge, right_edge, img_h),
        (left_edge, top_edge, img_w, img_h),
    ]
    collected: list[np.ndarray] = []
    for x1, y1, x2, y2 in tiles:
        crop = image[y1:y2, x1:x2]
        if crop.size == 0:
            continue
        found = self._raw_dets(crop, self.tile_conf)
        if len(found):
            # Shift tile-local coordinates into the full image.
            found[:, [0, 2]] += x1
            found[:, [1, 3]] += y1
            collected.append(found)

    # hflip tile pass (skipped when tile_use_hflip=False - saves 4 ONNX forwards)
    if self.tile_use_hflip:
        mirrored = cv2.flip(image, 1)
        for x1, y1, x2, y2 in tiles:
            mx1 = img_w - x2
            mx2 = img_w - x1
            if mx2 <= mx1:
                continue
            crop = mirrored[y1:y2, mx1:mx2]
            if crop.size == 0:
                continue
            found = self._raw_dets(crop, self.tile_conf)
            if len(found):
                restored = found.copy()
                # Un-mirror x (x1 and x2 swap roles) and shift y into place.
                restored[:, 0] = img_w - (found[:, 2] + mx1)
                restored[:, 2] = img_w - (found[:, 0] + mx1)
                restored[:, 1] = found[:, 1] + y1
                restored[:, 3] = found[:, 3] + y1
                collected.append(restored)

    if not collected:
        return primary

    tile_dets = np.concatenate(collected, axis=0)
    tile_dets = tile_dets[self._hard_nms(tile_dets[:, :4], tile_dets[:, 4], 0.5)]

    # Novelty: drop tile boxes that overlap any primary box at IoU >= novelty_iou
    if len(primary) > 0 and len(tile_dets) > 0:
        duplicates = np.array(
            [
                bool(
                    np.max(self._box_iou_one_to_many(row[:4], primary[:, :4]))
                    >= self.novelty_iou
                )
                for row in tile_dets
            ],
            dtype=bool,
        )
        tile_dets = tile_dets[~duplicates]

    if len(tile_dets) == 0:
        return primary

    # Sanity filter: min/max size, aspect ratio
    box_w = tile_dets[:, 2] - tile_dets[:, 0]
    box_h = tile_dets[:, 3] - tile_dets[:, 1]
    box_area = box_w * box_h
    aspect = np.maximum(box_w / np.maximum(box_h, 1e-6), box_h / np.maximum(box_w, 1e-6))
    img_area = float(img_w * img_h)
    plausible = (
        (box_w >= 7)
        & (box_h >= 7)
        & (box_area >= 85)
        & (box_area <= 0.5 * img_area)
        & (aspect <= 10.0)
    )
    tile_dets = tile_dets[plausible]
    if len(tile_dets) == 0:
        return primary

    merged = np.concatenate([primary, tile_dets], axis=0)
    merged = merged[self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)]
    if len(merged) > self.final_max_det:
        top = np.argsort(-merged[:, 4])[: self.final_max_det]
        merged = merged[top]
    return merged
370
+
371
+ # ---------- single-image predict ----------
372
def _predict_single(self, image: ndarray) -> list[BoundingBox]:
    """Detect objects in one frame and return them as BoundingBox items.

    Anything that is not a non-empty 3-channel HWC numpy array yields an
    empty list. The tile pass runs only when the primary pass returns
    fewer than ``sparse_threshold`` detections. Every box is emitted with
    ``cls_id=0``.
    """
    usable = (
        isinstance(image, np.ndarray)
        and image.ndim == 3
        and image.shape[0] > 0
        and image.shape[1] > 0
        and image.shape[2] == 3
    )
    if not usable:
        return []
    frame = image if image.dtype == np.uint8 else image.astype(np.uint8)

    primary = self._primary(frame)
    sparse = len(primary) < self.sparse_threshold
    dets = self._tile_augment(frame, primary) if sparse else primary

    results: list[BoundingBox] = []
    for x1, y1, x2, y2, conf in (row.tolist() for row in dets):
        if x2 <= x1 or y2 <= y1:
            continue
        # Round outwards so the integer box still covers the float box.
        results.append(
            BoundingBox(
                x1=int(math.floor(x1)),
                y1=int(math.floor(y1)),
                x2=int(math.ceil(x2)),
                y2=int(math.ceil(y2)),
                cls_id=0,
                conf=float(conf),
            )
        )
    return results
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
402
 
403
+ # ---------- chute entrypoint ----------
404
  def predict_batch(
405
  self,
406
  batch_images: list[ndarray],
 
408
  n_keypoints: int,
409
  ) -> list[TVFrameResult]:
410
  results: list[TVFrameResult] = []
411
+ for frame_number_in_batch, image in enumerate(batch_images):
 
412
  try:
413
  boxes = self._predict_single(image)
414
  except Exception as e:
415
+ print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
416
  boxes = []
417
  results.append(
418
  TVFrameResult(
419
+ frame_id=offset + frame_number_in_batch,
420
  boxes=boxes,
421
  keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
422
  )