|
|
import argparse |
|
|
import json |
|
|
import sys |
|
|
from io import BytesIO |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import onnxruntime as ort |
|
|
|
|
|
|
|
|
def load_class_names(base_dir: Path) -> dict[int, str]:
    """Load the class-index -> label mapping from ``class_names.txt``.

    The file is expected to hold one label per line. The zero-based line
    number is used as the class index, so a blank line leaves a gap in the
    mapping rather than shifting the labels that follow it.

    Args:
        base_dir: Directory that is searched for ``class_names.txt``.

    Returns:
        Mapping from line index to the stripped label. Empty when the file
        is missing (or is not a regular file).
    """
    labels_path = base_dir / "class_names.txt"
    if not labels_path.is_file():
        return {}

    # Decode explicitly as UTF-8: the platform default encoding differs
    # between machines and would garble non-ASCII labels on some of them.
    lines = labels_path.read_text(encoding="utf-8").splitlines()
    stripped = (raw.strip() for raw in lines)
    return {idx: label for idx, label in enumerate(stripped) if label}
|
|
|
|
|
|
|
|
def load_image(frame: Any, base_dir: Path) -> Image.Image:
    """Load ``frame`` as an RGB image.

    Args:
        frame: Either encoded image bytes (``bytes``, ``bytearray`` or
            ``memoryview``) or anything whose ``str()`` is a file path.
        base_dir: Fallback directory that the path is tried against when it
            cannot be found as given / relative to the current directory.

    Returns:
        A new RGB image that does not depend on the underlying file handle.

    Raises:
        Whatever ``PIL.Image.open`` raises when the data cannot be opened or
        decoded (for example a missing file or unrecognised image data).
    """
    if isinstance(frame, (bytes, bytearray, memoryview)):
        # The context manager releases the decoder as soon as the converted
        # copy exists instead of waiting for garbage collection.
        with Image.open(BytesIO(frame)) as decoded:
            return decoded.convert("RGB")

    path = Path(str(frame))
    if not path.is_absolute():
        path = (Path.cwd() / path).resolve()
    if not path.exists():
        # Second chance: interpret the original value relative to base_dir.
        candidate = (base_dir / str(frame)).resolve()
        if candidate.exists():
            path = candidate

    # Close the file handle deterministically; convert() returns an
    # independent in-memory copy, so it stays valid after the close.
    with Image.open(path) as opened:
        return opened.convert("RGB")
|
|
|
|
|
|
|
|
def load_model(*_args: Any, **_kwargs: Any):
    """Create the ONNX inference session for the weights next to this file.

    All positional and keyword arguments are accepted and ignored.

    Returns:
        ``None`` when ``yolov5s_weights.onnx`` does not exist beside this
        module. Otherwise a dict with the keys ``"session"`` (the CPU-only
        ``onnxruntime`` session), ``"input_name"`` (name of the model's first
        input), ``"names"`` (class-index -> label mapping loaded from the
        same directory) and ``"size"`` (fixed at 640).
    """
    here = Path(__file__).resolve().parent
    weights = here / "yolov5s_weights.onnx"

    if weights.exists():
        runtime = ort.InferenceSession(
            str(weights),
            providers=["CPUExecutionProvider"],
        )
        first_input = runtime.get_inputs()[0]
        return {
            "session": runtime,
            "input_name": first_input.name,
            "names": load_class_names(here),
            "size": 640,
        }

    # No weights shipped alongside the script: signal "no model" to callers.
    return None
|
|
|
|
|
|
|
|
def _nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]: |
|
|
if boxes.size == 0: |
|
|
return [] |
|
|
x1, y1, x2, y2 = boxes.T |
|
|
areas = (x2 - x1) * (y2 - y1) |
|
|
order = scores.argsort()[::-1] |
|
|
keep: List[int] = [] |
|
|
while order.size > 0: |
|
|
i = int(order[0]) |
|
|
keep.append(i) |
|
|
if order.size == 1: |
|
|
break |
|
|
xx1 = np.maximum(x1[i], x1[order[1:]]) |
|
|
yy1 = np.maximum(y1[i], y1[order[1:]]) |
|
|
xx2 = np.minimum(x2[i], x2[order[1:]]) |
|
|
yy2 = np.minimum(y2[i], y2[order[1:]]) |
|
|
w = np.clip(xx2 - xx1, 0, None) |
|
|
h = np.clip(yy2 - yy1, 0, None) |
|
|
inter = w * h |
|
|
iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6) |
|
|
inds = np.where(iou <= iou_thresh)[0] |
|
|
order = order[inds + 1] |
|
|
return keep |
|
|
|
|
|
|
|
|
def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
    """Run the detector on a single frame and return its detections.

    Args:
        model: The dict produced by ``load_model`` (keys ``"session"``,
            ``"input_name"``, ``"names"`` and ``"size"``). Any non-dict value,
            such as ``None``, yields an empty result.
        frame: Image array accepted by ``PIL.Image.fromarray``.

    Returns:
        One dict per detection kept after thresholding and NMS, with keys
        ``"frame_idx"`` (always 0), ``"class"`` (label, or the class index as
        a string when no label is known), ``"bbox"`` (``[x1, y1, x2, y2]``
        scaled back to the original frame size), ``"score"`` and
        ``"track_id"`` (``"f0-d<n>"``). Empty when nothing passes the
        confidence threshold or the model output has an unexpected layout.
    """
    if not isinstance(model, dict):
        return []
    session: ort.InferenceSession = model["session"]
    input_name = model["input_name"]
    names: dict[int, str] = model["names"]
    size = int(model["size"])

    # Preprocess: RGB, plain (non-letterboxed) resize to size x size, scale to
    # [0, 1], then HWC -> NCHW with a leading batch dimension of one.
    image = Image.fromarray(frame).convert("RGB")
    orig_w, orig_h = image.size
    resized = image.resize((size, size))
    inp = np.array(resized).astype("float32") / 255.0
    inp = np.transpose(inp, (2, 0, 1))[None, ...]

    outputs = session.run(None, {input_name: inp})
    preds = outputs[0][0]
    # Each row is consumed as [cx, cy, w, h, objectness, class scores...];
    # anything that is not a 2-D table with at least one class column is
    # rejected instead of raising on the shape lookup.
    if preds.ndim != 2 or preds.shape[1] < 6:
        return []

    boxes = preds[:, :4]
    objectness = preds[:, 4]
    class_scores = preds[:, 5:]
    class_ids = np.argmax(class_scores, axis=1)
    class_conf = class_scores[np.arange(class_scores.shape[0]), class_ids]
    scores = objectness * class_conf

    conf_thresh = 0.25
    keep = scores > conf_thresh
    boxes = boxes[keep]
    scores = scores[keep]
    class_ids = class_ids[keep]

    if boxes.size == 0:
        return []

    # Centre/size boxes -> corner format expected by _nms.
    x, y, w, h = boxes.T
    x1 = x - w / 2
    y1 = y - h / 2
    x2 = x + w / 2
    y2 = y + h / 2
    boxes_xyxy = np.stack([x1, y1, x2, y2], axis=1)

    keep_idx = _nms(boxes_xyxy, scores, 0.45)

    # Map coordinates from the resized input back to the original frame.
    # NOTE(review): assumes the model emits boxes in pixels of the resized
    # input - confirm against the exported model.
    scale_x = orig_w / size
    scale_y = orig_h / size

    detections: List[Dict[str, Any]] = []
    for det_idx, i in enumerate(keep_idx):
        bx1, by1, bx2, by2 = boxes_xyxy[i]
        class_id = int(class_ids[i])
        detections.append(
            {
                "frame_idx": 0,
                "class": names.get(class_id, str(class_id)),
                "bbox": [
                    float(bx1 * scale_x),
                    float(by1 * scale_y),
                    float(bx2 * scale_x),
                    float(by2 * scale_y),
                ],
                "score": float(scores[i]),
                "track_id": f"f0-d{det_idx}",
            }
        )

    return detections
|
|
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the detection script.

    Returns:
        A parser with a single ``--stdin-raw`` switch. Because the switch is
        a ``store_true`` flag whose default is already ``True``, the parsed
        ``stdin_raw`` value is ``True`` whether or not the flag is given.
    """
    cli = argparse.ArgumentParser(
        description="Run vehicle detection (YOLOv5 ONNX).",
    )
    stdin_raw_options = {
        "action": "store_true",
        "default": True,
        "help": "Read raw image bytes from stdin.",
    }
    cli.add_argument("--stdin-raw", **stdin_raw_options)
    return cli
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # The parsed namespace is not used; parsing still validates the command
    # line and provides --help.
    build_parser().parse_args()

    base_dir = Path(__file__).resolve().parent
    model = load_model()
    if model is None:
        # Contract with the caller: stdout always carries a JSON list and the
        # exit code stays 0. The reason goes to stderr so it is not lost.
        print("vehicle detection: model weights not found", file=sys.stderr)
        print("[]")
        sys.exit(0)

    try:
        image = load_image(sys.stdin.buffer.read(), base_dir)
    except Exception as exc:
        # Deliberately best-effort: an unreadable image yields an empty
        # result rather than a crash, but the cause is reported on stderr.
        print(f"vehicle detection: could not load image from stdin: {exc}", file=sys.stderr)
        print("[]")
        sys.exit(0)

    frame = np.array(image)
    output = run_model(model, frame)
    print(json.dumps(output))
|
|
|