MTerryJack's picture
Upload 10 files
5929bb5 verified
import argparse
import json
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
from PIL import Image
import onnxruntime as ort
def load_class_names(base_dir: Path) -> dict[int, str]:
labels_path = base_dir / "class_names.txt"
if not labels_path.exists():
return {}
names: dict[int, str] = {}
for idx, raw in enumerate(labels_path.read_text().splitlines()):
label = raw.strip()
if label:
names[idx] = label
return names
def load_image(frame: Any, base_dir: Path) -> Image.Image:
if isinstance(frame, (bytes, bytearray, memoryview)):
return Image.open(BytesIO(frame)).convert("RGB")
path = Path(str(frame))
if not path.is_absolute():
path = (Path.cwd() / path).resolve()
if not path.exists():
candidate = (base_dir / str(frame)).resolve()
if candidate.exists():
path = candidate
return Image.open(path).convert("RGB")
def load_model(*_args: Any, **_kwargs: Any):
base_dir = Path(__file__).resolve().parent
model_path = base_dir / "yolov5s_weights.onnx"
if not model_path.exists():
return None
session = ort.InferenceSession(str(model_path), providers=["CPUExecutionProvider"])
return {
"session": session,
"input_name": session.get_inputs()[0].name,
"names": load_class_names(base_dir),
"size": 640,
}
def _nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]:
if boxes.size == 0:
return []
x1, y1, x2, y2 = boxes.T
areas = (x2 - x1) * (y2 - y1)
order = scores.argsort()[::-1]
keep: List[int] = []
while order.size > 0:
i = int(order[0])
keep.append(i)
if order.size == 1:
break
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.clip(xx2 - xx1, 0, None)
h = np.clip(yy2 - yy1, 0, None)
inter = w * h
iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
inds = np.where(iou <= iou_thresh)[0]
order = order[inds + 1]
return keep
def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
if not isinstance(model, dict):
return []
session: ort.InferenceSession = model["session"]
input_name = model["input_name"]
names: dict[int, str] = model["names"]
size = int(model["size"])
image = Image.fromarray(frame).convert("RGB")
orig_w, orig_h = image.size
resized = image.resize((size, size))
inp = np.array(resized).astype("float32") / 255.0
inp = np.transpose(inp, (2, 0, 1))[None, ...]
outputs = session.run(None, {input_name: inp})
preds = outputs[0][0] # (25200, 17)
if preds.shape[1] < 6:
return []
boxes = preds[:, :4]
objectness = preds[:, 4]
class_scores = preds[:, 5:]
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(class_scores.shape[0]), class_ids]
scores = objectness * class_conf
conf_thresh = 0.25
keep = scores > conf_thresh
boxes = boxes[keep]
scores = scores[keep]
class_ids = class_ids[keep]
if boxes.size == 0:
return []
# xywh -> xyxy
x, y, w, h = boxes.T
x1 = x - w / 2
y1 = y - h / 2
x2 = x + w / 2
y2 = y + h / 2
boxes_xyxy = np.stack([x1, y1, x2, y2], axis=1)
keep_idx = _nms(boxes_xyxy, scores, 0.45)
detections: List[Dict[str, Any]] = []
for det_idx, i in enumerate(keep_idx):
xyxy = boxes_xyxy[i]
# map back to original size
scale_x = orig_w / size
scale_y = orig_h / size
xyxy = np.array([xyxy[0] * scale_x, xyxy[1] * scale_y, xyxy[2] * scale_x, xyxy[3] * scale_y])
class_id = int(class_ids[i])
label = names.get(class_id, str(class_id))
detections.append(
{
"frame_idx": 0,
"class": label,
"bbox": [float(v) for v in xyxy],
"score": float(scores[i]),
"track_id": f"f0-d{det_idx}",
}
)
return detections
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Run vehicle detection (YOLOv5 ONNX).")
parser.add_argument(
"--stdin-raw",
action="store_true",
default=True,
help="Read raw image bytes from stdin.",
)
return parser
if __name__ == "__main__":
build_parser().parse_args()
base_dir = Path(__file__).resolve().parent
model = load_model()
if model is None:
print("[]")
sys.exit(0)
try:
image = load_image(sys.stdin.buffer.read(), base_dir)
except Exception:
print("[]")
sys.exit(0)
frame = np.array(image)
output = run_model(model, frame)
print(json.dumps(output))