File size: 2,277 Bytes
1514d57
0bd4bcf
 
 
 
c2e9672
0bd4bcf
 
 
c2e9672
1514d57
0bd4bcf
1514d57
 
 
0bd4bcf
1514d57
0bd4bcf
 
 
1514d57
0bd4bcf
 
 
 
 
1514d57
 
 
 
 
 
 
 
 
 
0bd4bcf
 
 
 
 
 
 
 
 
 
1514d57
0bd4bcf
 
 
1514d57
0bd4bcf
 
 
 
 
 
1514d57
 
 
 
 
 
 
 
 
 
 
c2e9672
1514d57
0bd4bcf
 
1514d57
 
0bd4bcf
 
1514d57
 
 
 
0bd4bcf
1514d57
 
c2e9672
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import argparse
import json
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List

from PIL import Image
from ultralytics import YOLO
import numpy as np


def load_image(frame: Any, base_dir: Path) -> Image.Image:
    if isinstance(frame, (bytes, bytearray, memoryview)):
        return Image.open(BytesIO(frame)).convert("RGB")

    path = Path(str(frame))
    if not path.is_absolute():
        path = (Path.cwd() / path).resolve()
        if not path.exists():
            candidate = (base_dir / str(frame)).resolve()
            if candidate.exists():
                path = candidate
    return Image.open(path).convert("RGB")


def load_model(*_args: Any, **_kwargs: Any):
    base_dir = Path(__file__).resolve().parent
    model_path = base_dir / "best.pt"
    if not model_path.exists():
        return None
    return YOLO(str(model_path))


def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
    image = Image.fromarray(frame)
    results = model(image)
    detections: List[Dict[str, Any]] = []
    result = results[0]
    names = result.names or model.names

    for det_idx, box in enumerate(result.boxes):
        xyxy = box.xyxy[0].tolist()
        class_id = int(box.cls[0].item())
        detections.append(
            {
                "frame_idx": 0,
                "class": names.get(class_id, str(class_id)),
                "bbox": [float(x) for x in xyxy],
                "score": float(box.conf[0].item()),
                "track_id": f"f0-d{det_idx}",
            }
        )

    return detections


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run vehicle detect element.")
    parser.add_argument(
        "--stdin-raw",
        action="store_true",
        default=True,
        help="Read raw image bytes from stdin.",
    )
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()

    base_dir = Path(__file__).resolve().parent
    model = load_model()
    if model is None:
        print("[]")

    try:
        image = load_image(sys.stdin.buffer.read(), base_dir)
    except Exception:
        print("[]")

    frame = np.array(image)
    output = run_model(model, frame)
    print(json.dumps(output))