File size: 3,047 Bytes
ec19c1a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import argparse
import json
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
from PIL import Image
from ultralytics import YOLO


def load_class_names(base_dir: Path) -> dict[int, str]:
    """Read optional class labels from ``class_names.txt`` inside *base_dir*.

    Each line of the file is one label; the zero-based line number is used
    as the class id. Lines that are empty after stripping whitespace are
    skipped, but they still consume their line number, so ids stay aligned
    with the file's line positions.

    Args:
        base_dir: Directory expected to contain ``class_names.txt``.

    Returns:
        Mapping of class id to label, or an empty dict when the file does
        not exist.
    """
    labels_path = base_dir / "class_names.txt"
    if not labels_path.exists():
        return {}
    # Decode explicitly as UTF-8 so non-ASCII labels do not depend on the
    # platform's locale encoding.
    lines = labels_path.read_text(encoding="utf-8").splitlines()
    stripped = (raw.strip() for raw in lines)
    return {idx: label for idx, label in enumerate(stripped) if label}


def load_image(frame: Any, base_dir: Path) -> Image.Image:
    """Decode *frame* into an RGB PIL image.

    Args:
        frame: Either encoded image data (``bytes``, ``bytearray`` or
            ``memoryview``) or any object whose ``str()`` is a filesystem
            path.
        base_dir: Fallback directory tried for a relative path that does
            not exist relative to the current working directory.

    Returns:
        The image converted to ``"RGB"`` mode.

    Raises:
        Whatever ``Image.open`` / ``convert`` raise for a missing or
        undecodable source; nothing is caught here.
    """
    if isinstance(frame, (bytes, bytearray, memoryview)):
        source = BytesIO(frame)
    else:
        source = Path(str(frame))
        if not source.is_absolute():
            # Prefer the path relative to the working directory; only fall
            # back to base_dir when that does not exist and the fallback does.
            source = (Path.cwd() / source).resolve()
            if not source.exists():
                candidate = (base_dir / str(frame)).resolve()
                if candidate.exists():
                    source = candidate

    # Convert inside the context manager so the pixel data is read before the
    # underlying file is closed; the opened image is then released promptly
    # instead of waiting for garbage collection.
    with Image.open(source) as opened:
        return opened.convert("RGB")


def load_model(*_args: Any, **_kwargs: Any):
    """Load the ONNX detector stored next to this file.

    All positional and keyword arguments are accepted and ignored.

    Returns:
        A ``YOLO`` instance built from ``yolov5s_weights.onnx`` with
        ``task="detect"``, or ``None`` when that file does not exist beside
        this module. When ``class_names.txt`` yields labels and the wrapped
        ``model.model`` object exposes a ``names`` attribute, that attribute
        is overwritten with the labels from the file.
    """
    here = Path(__file__).resolve().parent
    weights = here / "yolov5s_weights.onnx"
    if not weights.exists():
        return None

    detector = YOLO(str(weights), task="detect")
    class_names = load_class_names(here)
    # Only override names when there is something to apply and the inner
    # model object actually carries a `names` attribute.
    can_override = (
        bool(class_names)
        and hasattr(detector, "model")
        and hasattr(detector.model, "names")
    )
    if can_override:
        detector.model.names = class_names
    return detector


def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
    """Run *model* on one frame and return its detections as plain dicts.

    Args:
        model: Callable detector; it is invoked with a PIL image and only the
            first element of what it returns is used.
        frame: Array passed to ``Image.fromarray`` to build the input image.

    Returns:
        One dict per entry in the first result's ``boxes``, with keys
        ``frame_idx`` (always ``0``), ``class``, ``bbox`` (the values of
        ``box.xyxy[0]`` as floats), ``score`` and ``track_id``
        (``"f0-d<index>"``). The label comes from ``class_names.txt`` when it
        has the class id, then from the result/model names, and finally
        falls back to the id rendered as a string.
    """
    file_labels = load_class_names(Path(__file__).resolve().parent)
    prediction = model(Image.fromarray(frame))[0]
    model_labels = prediction.names or model.names or file_labels or {}

    def describe(position: int, box) -> Dict[str, Any]:
        # Flatten a single box into JSON-serialisable Python values.
        corners = box.xyxy[0].tolist()
        class_id = int(box.cls[0].item())
        default_label = model_labels.get(class_id, str(class_id))
        return {
            "frame_idx": 0,
            "class": file_labels.get(class_id, default_label),
            "bbox": [float(value) for value in corners],
            "score": float(box.conf[0].item()),
            "track_id": f"f0-d{position}",
        }

    return [describe(position, box) for position, box in enumerate(prediction.boxes)]


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    The only option is ``--stdin-raw``. It is a ``store_true`` flag whose
    default is already ``True``, so the parsed ``stdin_raw`` value is ``True``
    whether or not the flag is given.
    """
    cli = argparse.ArgumentParser(description="Run person detection (YOLOv5 ONNX).")
    stdin_flag = {
        "action": "store_true",
        "default": True,
        "help": "Read raw image bytes from stdin.",
    }
    cli.add_argument("--stdin-raw", **stdin_flag)
    return cli


if __name__ == "__main__":
    # Script entry point: read one encoded image from stdin and print the
    # detections as a JSON array on stdout. Failures to load the model or the
    # image still print "[]" and exit 0 so stdout stays valid JSON, but the
    # reason is now reported on stderr instead of being silently dropped.
    build_parser().parse_args()  # parsed values are unused; handles argv/--help

    base_dir = Path(__file__).resolve().parent
    model = load_model()
    if model is None:
        print("model could not be loaded; emitting empty result", file=sys.stderr)
        print("[]")
        sys.exit(0)

    try:
        image = load_image(sys.stdin.buffer.read(), base_dir)
    except Exception as exc:
        # Deliberate best-effort: undecodable input yields an empty result
        # rather than a crash, with the cause surfaced for debugging.
        print(f"could not decode image from stdin: {exc!r}", file=sys.stderr)
        print("[]")
        sys.exit(0)

    frame = np.array(image)
    output = run_model(model, frame)
    print(json.dumps(output))