# MTerryJack's picture
# Upload 9 files
# d66fee6 verified
import argparse
import json
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
from PIL import Image
import onnxruntime as ort
def load_class_names(base_dir: Path) -> dict[int, str]:
    """Load the class-id -> label mapping from ``class_names.txt``.

    The file is looked up directly inside *base_dir* and read one label per
    line. A label's class id is its zero-based line number. Lines that are
    empty after stripping whitespace are left out of the mapping but still
    consume an id, so the ids of later labels do not shift.

    Args:
        base_dir: Directory expected to contain ``class_names.txt``.

    Returns:
        Mapping from class id to the stripped label, or an empty dict when
        the file does not exist.
    """
    labels_path = base_dir / "class_names.txt"
    if not labels_path.exists():
        return {}
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII labels read the same on every machine.
    lines = labels_path.read_text(encoding="utf-8").splitlines()
    stripped = (raw.strip() for raw in lines)
    return {idx: label for idx, label in enumerate(stripped) if label}
def load_image(frame: Any, base_dir: Path) -> Image.Image:
    """Open *frame* as an RGB PIL image.

    Args:
        frame: Either encoded image data (``bytes``, ``bytearray`` or
            ``memoryview``) or anything whose ``str()`` is a file path.
        base_dir: Fallback directory tried when the path cannot be found
            relative to the current working directory.

    Returns:
        The decoded image converted to ``"RGB"`` mode.
    """
    # In-memory encoded data: decode straight from a byte buffer.
    if isinstance(frame, (bytes, bytearray, memoryview)):
        buffer = BytesIO(frame)
        return Image.open(buffer).convert("RGB")

    # Otherwise treat the value as a path; relative paths are anchored at the
    # current working directory first.
    raw = str(frame)
    location = Path(raw)
    if not location.is_absolute():
        location = (Path.cwd() / location).resolve()

    # Second chance: the same path interpreted relative to base_dir. If that
    # does not exist either, the first location is kept and Image.open is
    # left to report the failure.
    if not location.exists():
        fallback = (base_dir / raw).resolve()
        if fallback.exists():
            location = fallback

    return Image.open(location).convert("RGB")
def _nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> List[int]:
    """Greedy non-maximum suppression over corner-format boxes.

    Args:
        boxes: Array whose rows are ``(x1, y1, x2, y2)`` boxes.
        scores: One score per row of *boxes*.
        iou_thresh: A candidate is discarded when its IoU with an already
            selected box is strictly greater than this value.

    Returns:
        Row indices of the selected boxes, highest score first. Empty when
        *boxes* has no elements.
    """
    if boxes.size == 0:
        return []

    left, top, right, bottom = boxes.T
    areas = (right - left) * (bottom - top)

    # Candidate indices sorted by descending score.
    remaining = scores.argsort()[::-1]
    selected: List[int] = []

    while remaining.size > 0:
        best = int(remaining[0])
        selected.append(best)
        if remaining.size == 1:
            break

        rest = remaining[1:]

        # Intersection rectangle between the chosen box and every other
        # remaining candidate; negative extents mean no overlap.
        inter_left = np.maximum(left[best], left[rest])
        inter_top = np.maximum(top[best], top[rest])
        inter_right = np.minimum(right[best], right[rest])
        inter_bottom = np.minimum(bottom[best], bottom[rest])
        inter_w = np.clip(inter_right - inter_left, 0, None)
        inter_h = np.clip(inter_bottom - inter_top, 0, None)
        inter = inter_w * inter_h

        # The small epsilon guards against division by zero for
        # zero-area boxes.
        iou = inter / (areas[best] + areas[rest] - inter + 1e-6)

        # Keep only the candidates that do not overlap the chosen box too much.
        remaining = rest[iou <= iou_thresh]

    return selected
def load_model(*_args: Any, **_kwargs: Any):
    """Create the ONNX Runtime session and bundle it with its metadata.

    Any positional or keyword arguments are accepted and ignored.

    Returns:
        ``None`` when ``yolov5s_weights.onnx`` is not present next to this
        file. Otherwise a dict with the keys ``"session"`` (CPU-only
        inference session), ``"input_name"`` (name of the session's first
        input), ``"names"`` (class-id -> label mapping) and ``"size"``
        (the fixed value 640).
    """
    here = Path(__file__).resolve().parent
    weights = here / "yolov5s_weights.onnx"
    if not weights.exists():
        return None

    session = ort.InferenceSession(str(weights), providers=["CPUExecutionProvider"])
    first_input = session.get_inputs()[0]

    bundle: Dict[str, Any] = {}
    bundle["session"] = session
    bundle["input_name"] = first_input.name
    bundle["names"] = load_class_names(here)
    bundle["size"] = 640
    return bundle
def run_model(model, frame: "np.ndarray") -> List[Dict[str, Any]]:
    """Detect objects in a single frame.

    Args:
        model: Dict with the keys ``"session"``, ``"input_name"``,
            ``"names"`` and ``"size"``. Any non-dict value yields an empty
            result.
        frame: Image array accepted by ``PIL.Image.fromarray``.

    Returns:
        One dict per detection that survives the 0.25 confidence threshold
        and 0.45-IoU non-maximum suppression, with the keys ``"frame_idx"``
        (always 0), ``"class"`` (label, or the class id as a string when no
        label is known), ``"bbox"`` (``[x1, y1, x2, y2]`` scaled back to the
        original frame size), ``"score"`` and ``"track_id"``
        (``"f0-d<n>"``, ``n`` being the detection's position in the output).
    """
    if not isinstance(model, dict):
        return []

    session: ort.InferenceSession = model["session"]
    feed_name = model["input_name"]
    labels: dict[int, str] = model["names"]
    side = int(model["size"])

    picture = Image.fromarray(frame).convert("RGB")
    src_w, src_h = picture.size

    # The frame is stretched to side x side (no letterboxing), scaled to
    # [0, 1] and rearranged from HWC to a 1 x C x H x W batch.
    tensor = np.array(picture.resize((side, side))).astype("float32") / 255.0
    tensor = np.transpose(tensor, (2, 0, 1))[None, ...]

    # First output, first batch item; rows are indexed below as
    # [cx, cy, w, h, objectness, class scores...].
    raw = session.run(None, {feed_name: tensor})[0][0]
    if raw.shape[1] < 6:
        return []

    per_class = raw[:, 5:]
    best_class = np.argmax(per_class, axis=1)
    best_class_score = per_class[np.arange(per_class.shape[0]), best_class]
    confidence = raw[:, 4] * best_class_score

    # Drop low-confidence candidates before the (quadratic) NMS step.
    confident = confidence > 0.25
    centre_boxes = raw[:, :4][confident]
    confidence = confidence[confident]
    best_class = best_class[confident]
    if centre_boxes.size == 0:
        return []

    # Centre/size boxes -> corner boxes.
    cx, cy, bw, bh = centre_boxes.T
    corners = np.stack(
        [cx - bw / 2, cy - bh / 2, cx + bw / 2, cy + bh / 2],
        axis=1,
    )
    survivors = _nms(corners, confidence, 0.45)

    # Factors that map network-input coordinates back to the source frame.
    scale_x = src_w / side
    scale_y = src_h / side

    results: List[Dict[str, Any]] = []
    for rank, row in enumerate(survivors):
        x1, y1, x2, y2 = corners[row]
        scaled = np.array([x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y])
        class_id = int(best_class[row])
        results.append(
            {
                "frame_idx": 0,
                "class": labels.get(class_id, str(class_id)),
                "bbox": [float(coord) for coord in scaled],
                "score": float(confidence[row]),
                "track_id": f"f0-d{rank}",
            }
        )
    return results
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the detection script.

    The only option is ``--stdin-raw``. It is a ``store_true`` flag whose
    default is already ``True``, so the parsed ``stdin_raw`` value is
    ``True`` whether or not the flag is given.
    """
    cli = argparse.ArgumentParser(
        description="Run rotten fruit detection (YOLOv5 ONNX).",
    )
    stdin_raw_options = {
        "action": "store_true",
        "default": True,
        "help": "Read raw image bytes from stdin.",
    }
    cli.add_argument("--stdin-raw", **stdin_raw_options)
    return cli
if __name__ == "__main__":
    # Script entry point: read an encoded image from stdin and print the
    # detections as a JSON list on stdout.
    #
    # stdout always receives valid JSON ("[]" when the model or the image is
    # unavailable) and the exit status stays 0 in those cases; the reason is
    # reported on stderr so failures are not silent.
    build_parser().parse_args()
    base_dir = Path(__file__).resolve().parent

    model = load_model()
    if model is None:
        print("model unavailable; emitting empty result", file=sys.stderr)
        print("[]")
        sys.exit(0)

    try:
        image = load_image(sys.stdin.buffer.read(), base_dir)
    except Exception as exc:
        # Top-level boundary: any read/decode failure degrades to an empty
        # result rather than a traceback, but the cause is still surfaced.
        print(f"could not load image from stdin: {exc!r}", file=sys.stderr)
        print("[]")
        sys.exit(0)

    frame = np.array(image)
    output = run_model(model, frame)
    print(json.dumps(output))