Upload handler.py with huggingface_hub
Browse files- handler.py +50 -0
handler.py
ADDED
|
@@ -0,0 +1,50 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Custom inference handler for RF-DETR Threat Detection on HuggingFace Inference Endpoints."""
|
| 2 |
+
|
| 3 |
+
from typing import Any, Dict, List
|
| 4 |
+
import io
|
| 5 |
+
import numpy as np
|
| 6 |
+
from PIL import Image
|
| 7 |
+
|
| 8 |
+
|
| 9 |
+
class EndpointHandler:
|
| 10 |
+
def __init__(self, path: str = ""):
|
| 11 |
+
from rfdetr import RFDETRNano
|
| 12 |
+
import os
|
| 13 |
+
|
| 14 |
+
weights = os.path.join(path, "checkpoint_best_total.pth")
|
| 15 |
+
self.model = RFDETRNano(resolution=960, pretrain_weights=weights)
|
| 16 |
+
self.model.optimize_for_inference()
|
| 17 |
+
|
| 18 |
+
self.class_map = {1: "Gun", 2: "Explosive", 3: "Grenade", 4: "Knife"}
|
| 19 |
+
|
| 20 |
+
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
|
| 21 |
+
inputs = data.get("inputs")
|
| 22 |
+
if isinstance(inputs, bytes):
|
| 23 |
+
image = Image.open(io.BytesIO(inputs)).convert("RGB")
|
| 24 |
+
elif isinstance(inputs, str):
|
| 25 |
+
import base64
|
| 26 |
+
image = Image.open(io.BytesIO(base64.b64decode(inputs))).convert("RGB")
|
| 27 |
+
else:
|
| 28 |
+
image = inputs
|
| 29 |
+
|
| 30 |
+
threshold = data.get("parameters", {}).get("threshold", 0.25)
|
| 31 |
+
detections = self.model.predict(image, threshold=threshold)
|
| 32 |
+
|
| 33 |
+
results = []
|
| 34 |
+
if hasattr(detections, "class_id") and len(detections.class_id) > 0:
|
| 35 |
+
for idx in range(len(detections.class_id)):
|
| 36 |
+
cid = int(detections.class_id[idx])
|
| 37 |
+
conf = float(detections.confidence[idx])
|
| 38 |
+
bbox = detections.xyxy[idx].tolist()
|
| 39 |
+
results.append({
|
| 40 |
+
"label": self.class_map.get(cid, f"threat_{cid}"),
|
| 41 |
+
"score": round(conf, 4),
|
| 42 |
+
"box": {
|
| 43 |
+
"xmin": int(bbox[0]),
|
| 44 |
+
"ymin": int(bbox[1]),
|
| 45 |
+
"xmax": int(bbox[2]),
|
| 46 |
+
"ymax": int(bbox[3]),
|
| 47 |
+
},
|
| 48 |
+
})
|
| 49 |
+
|
| 50 |
+
return results
|