"""Run YOLOv5 (ONNX) vehicle detection on a single image read from stdin.

The script loads ``yolov5s_weights.onnx`` (and optional ``class_names.txt``)
from its own directory, decodes raw image bytes from stdin, runs inference,
and prints a JSON list of detections to stdout.  On any failure it prints
``[]`` and exits 0, so callers can always parse stdout as JSON.
"""

import argparse
import json
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
from PIL import Image

import onnxruntime as ort


def load_class_names(base_dir: Path) -> dict[int, str]:
    """Map class index -> label from ``class_names.txt`` in *base_dir*.

    One label per line; the line number is the class index.  Blank lines
    produce no entry but still consume their index, so indices stay aligned
    with line numbers.  Returns an empty dict when the file is missing.
    """
    labels_path = base_dir / "class_names.txt"
    if not labels_path.exists():
        return {}
    names: dict[int, str] = {}
    for idx, raw in enumerate(labels_path.read_text().splitlines()):
        label = raw.strip()
        if label:
            names[idx] = label
    return names


def load_image(frame: Any, base_dir: Path) -> Image.Image:
    """Decode *frame* into an RGB PIL image.

    *frame* is either raw encoded image bytes (bytes/bytearray/memoryview)
    or something convertible to a path.  Relative paths are resolved against
    the current working directory first, then against *base_dir* as a
    fallback.  Raises whatever ``Image.open`` raises on bad input; the
    ``__main__`` block converts that into an empty-result exit.
    """
    if isinstance(frame, (bytes, bytearray, memoryview)):
        return Image.open(BytesIO(frame)).convert("RGB")
    path = Path(str(frame))
    if not path.is_absolute():
        path = (Path.cwd() / path).resolve()
    if not path.exists():
        # Fall back to interpreting the path relative to the script dir.
        candidate = (base_dir / str(frame)).resolve()
        if candidate.exists():
            path = candidate
    return Image.open(path).convert("RGB")


def load_model(*_args: Any, **_kwargs: Any):
    """Build the ONNX Runtime session plus metadata, or None if weights are absent.

    Extra args/kwargs are accepted and ignored so callers with differing
    loader signatures can all invoke this.  Returns a dict with keys
    ``session``, ``input_name``, ``names`` (index -> label), and ``size``
    (square network input resolution).
    """
    base_dir = Path(__file__).resolve().parent
    model_path = base_dir / "yolov5s_weights.onnx"
    if not model_path.exists():
        return None
    session = ort.InferenceSession(str(model_path), providers=["CPUExecutionProvider"])
    return {
        "session": session,
        "input_name": session.get_inputs()[0].name,
        "names": load_class_names(base_dir),
        "size": 640,
    }


def _nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]:
    """Greedy non-maximum suppression over xyxy *boxes*.

    Returns the indices of kept boxes in descending-score order.  Boxes
    whose IoU with an already-kept box exceeds *iou_thresh* are dropped.
    """
    if boxes.size == 0:
        return []
    x1, y1, x2, y2 = boxes.T
    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]  # highest score first
    keep: List[int] = []
    while order.size > 0:
        i = int(order[0])
        keep.append(i)
        if order.size == 1:
            break
        # Intersection of the top box with every remaining candidate.
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        w = np.clip(xx2 - xx1, 0, None)
        h = np.clip(yy2 - yy1, 0, None)
        inter = w * h
        # Epsilon guards against division by zero for degenerate boxes.
        iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
        inds = np.where(iou <= iou_thresh)[0]
        order = order[inds + 1]  # +1: inds index into order[1:]
    return keep


def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
    """Run detection on one RGB frame (H x W x 3 uint8 array).

    Returns a list of detection dicts with keys ``frame_idx``, ``class``,
    ``bbox`` (xyxy in original-image pixels), ``score``, and ``track_id``.
    Returns [] when *model* is not a loaded-model dict or the model output
    is unusable.
    """
    if not isinstance(model, dict):
        return []
    session: ort.InferenceSession = model["session"]
    input_name = model["input_name"]
    names: dict[int, str] = model["names"]
    size = int(model["size"])

    image = Image.fromarray(frame).convert("RGB")
    orig_w, orig_h = image.size
    # NOTE(review): plain resize (no letterboxing) — aspect ratio is
    # distorted on input, but the per-axis rescale below undoes it exactly.
    resized = image.resize((size, size))
    inp = np.array(resized).astype("float32") / 255.0
    inp = np.transpose(inp, (2, 0, 1))[None, ...]  # HWC -> NCHW

    outputs = session.run(None, {input_name: inp})
    preds = outputs[0][0]  # (num_candidates, 5 + num_classes), e.g. (25200, 17)
    if preds.shape[1] < 6:
        # Need at least xywh + objectness + one class column.
        return []

    boxes = preds[:, :4]
    objectness = preds[:, 4]
    class_scores = preds[:, 5:]
    class_ids = np.argmax(class_scores, axis=1)
    class_conf = class_scores[np.arange(class_scores.shape[0]), class_ids]
    scores = objectness * class_conf

    conf_thresh = 0.25
    keep = scores > conf_thresh
    boxes = boxes[keep]
    scores = scores[keep]
    class_ids = class_ids[keep]
    if boxes.size == 0:
        return []

    # xywh (center + size) -> xyxy corners, still in network coordinates.
    x, y, w, h = boxes.T
    x1 = x - w / 2
    y1 = y - h / 2
    x2 = x + w / 2
    y2 = y + h / 2
    boxes_xyxy = np.stack([x1, y1, x2, y2], axis=1)

    keep_idx = _nms(boxes_xyxy, scores, 0.45)

    # Loop-invariant: map network coords back to original image size.
    scale_x = orig_w / size
    scale_y = orig_h / size

    detections: List[Dict[str, Any]] = []
    for det_idx, i in enumerate(keep_idx):
        xyxy = boxes_xyxy[i]
        xyxy = np.array(
            [xyxy[0] * scale_x, xyxy[1] * scale_y, xyxy[2] * scale_x, xyxy[3] * scale_y]
        )
        class_id = int(class_ids[i])
        label = names.get(class_id, str(class_id))
        detections.append(
            {
                "frame_idx": 0,
                "class": label,
                "bbox": [float(v) for v in xyxy],
                "score": float(scores[i]),
                "track_id": f"f0-d{det_idx}",
            }
        )
    return detections


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser.

    NOTE(review): ``--stdin-raw`` is ``store_true`` with ``default=True``,
    so it is always True whether or not the flag is passed — stdin-bytes is
    effectively the only input mode.  Kept as-is for compatibility.
    """
    parser = argparse.ArgumentParser(description="Run vehicle detection (YOLOv5 ONNX).")
    parser.add_argument(
        "--stdin-raw",
        action="store_true",
        default=True,
        help="Read raw image bytes from stdin.",
    )
    return parser


if __name__ == "__main__":
    build_parser().parse_args()  # parsed only for --help/arg validation
    base_dir = Path(__file__).resolve().parent
    model = load_model()
    if model is None:
        # Missing weights: emit valid empty JSON rather than crashing.
        print("[]")
        sys.exit(0)
    try:
        image = load_image(sys.stdin.buffer.read(), base_dir)
    except Exception:
        # Deliberate best-effort boundary: any decode failure yields [].
        print("[]")
        sys.exit(0)
    frame = np.array(image)
    output = run_model(model, frame)
    print(json.dumps(output))