# Defect-Detection / core / defect_detection_image.py
# Navy
# last update
# 6d007bb
import os, cv2, base64, asyncio, httpx
import numpy as np
from ultralytics import YOLO
from PIL import Image
from io import BytesIO
from datetime import datetime
from dotenv import load_dotenv
from typing import Dict, List
from utils import *
# Load variables from a .env file (python-dotenv) before the os.getenv() calls below read them.
load_dotenv()
# Model version string placed in the webhook payload under "model_version"; defaults to "v1.0.0".
MODEL_VERSION = os.getenv("MODEL_VERSION","v1.0.0")
# Webhook endpoint read from the environment; None when the variable is unset.
# NOTE(review): not referenced by the functions visible in this file, which take the URL as a parameter.
WEBHOOK_URL = os.getenv("WEBHOOK_URL")
# Timeout passed to httpx.AsyncClient(timeout=...) when posting the webhook; defaults to 10.0.
WEBHOOK_TIMEOUT = float(os.getenv("WEBHOOK_TIMEOUT", "10.0"))
# ============================================================
# DEFECT DETECTION FROM BASE64 IMAGE
# ============================================================
def _error_result(station_id: str, camera_id: str, message: str) -> Dict:
    """Build the error payload returned when one camera image cannot be processed."""
    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "error",
        "status_defect": "",
        "image_base64": "",
        "detections": [],
        "message": message,
    }


def detect_defect_from_base64(station_id: str, camera_id: str, image_base64: str, model_path=None):
    """
    Detect defects in a single Base64-encoded image.

    Args:
        station_id: Identifier copied into the result.
        camera_id: Identifier copied into the result and used in log messages.
        image_base64: Base64 text of an encoded image (any format PIL can open).
        model_path: Path of the YOLO weights, resolved relative to the current
            working directory (it is prefixed with "./").

    Returns:
        A dict with the keys:
        - station_id, camera_id: echoed from the arguments
        - status: "success" or "error"
        - status_defect: "NG" (at least one detection), "OK" (none) or "" on error
        - image_base64: annotated image as Base64 JPEG ("" on error)
        - detections: list of {"class", "confidence", "bbox": [x1, y1, x2, y2]}
        - message: human-readable summary

    Decoding, model-loading and encoding failures are reported through the
    returned dict; exceptions raised by inference itself propagate.
    """
    try:
        img_data = base64.b64decode(image_base64)
        image = Image.open(BytesIO(img_data)).convert("RGB")
        # PIL yields RGB, while the OpenCV drawing/encoding calls below and
        # Ultralytics numpy inputs use BGR channel order, so convert once here.
        frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    except Exception as e:
        logger.error(f"[ERROR] Cannot decode base64 image for camera {camera_id}: {e}")
        return _error_result(station_id, camera_id, "Invalid base64 image")

    # The model is instantiated per call on purpose: this function runs in
    # worker threads (asyncio.to_thread in this file) and Ultralytics
    # recommends one model instance per thread.
    try:
        if not model_path:
            raise ValueError("model_path is not set")
        model = YOLO(f"./{model_path}")
        logger.info("[MODEL] Success load model")
    except Exception as e:
        logger.error(f"[ERROR] Cannot load model: {e}")
        # Without a model nothing can be detected; report it instead of
        # falling through to an unbound `model` variable.
        return _error_result(station_id, camera_id, "Cannot load model")

    detections = []
    results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False)
    for box in results[0].boxes:
        cls = int(box.cls[0])
        conf = float(box.conf[0])
        xyxy = [int(x) for x in box.xyxy[0].tolist()]
        defect_name = model.names.get(cls, f"class_{cls}").lower()
        x1, y1, x2, y2 = xyxy
        color = color_defect(defect_name) if defect_name else color_defect('other')

        # Draw bbox + label
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        label = f"{defect_name.upper()} {conf:.2f}"
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        # Keep the 20px label strip inside the image when the box touches the top edge.
        label_top = max(y1 - 20, 0)
        cv2.rectangle(frame, (x1, label_top), (x1 + w, label_top + 20), color, -1)
        cv2.putText(frame, label, (x1, label_top + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

        detections.append({
            "class": defect_name,
            "confidence": conf,
            "bbox": xyxy
        })

    # Convert the annotated frame to Base64 (JPEG)
    encoded_ok, buffer = cv2.imencode(".jpg", frame)
    if not encoded_ok:
        logger.error(f"[ERROR] Cannot encode annotated image for camera {camera_id}")
        return _error_result(station_id, camera_id, "Cannot encode annotated image")
    frame_base64 = base64.b64encode(buffer).decode("utf-8")

    if detections:
        logger.info(f"[DETECTED] Camera {camera_id} β†’ {len(detections)} defect(s)")
        status_defect = "NG"
        message = "Detected as defect"
    else:
        logger.info(f"[OK] Camera {camera_id} β†’ No defect detected.")
        status_defect = "OK"
        message = "Detected as normal (no defect)"

    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "success",
        "status_defect": status_defect,
        "image_base64": frame_base64,
        "detections": detections,
        "message": message
    }
# ============================================================
# ASYNC WRAPPERS
# ============================================================
async def _detect_camera_image(station_id: str, camera: Dict, model_path=None):
    """Offload detect_defect_from_base64 for one camera to a worker thread.

    `camera` must provide the keys "camera_id" and "image_base64"; the
    detection result is returned unchanged.
    """
    camera_id = camera["camera_id"]
    encoded_image = camera["image_base64"]
    detection = await asyncio.to_thread(
        detect_defect_from_base64, station_id, camera_id, encoded_image, model_path
    )
    return detection
# return await asyncio.to_thread(
# testing,
# station_id,
# camera["camera_id"],
# camera["image_base64"],
# model
# )
async def run_detection_group(
    station_id: str,
    cameras: List[Dict],
    webhook_url: str,
    model_path=None,
    parts: Dict = None
):
    """
    Run detection for every camera of a station concurrently and post the
    combined result to a webhook.

    Args:
        station_id: Identifier of the station, passed to each detection.
        cameras: One dict per camera with the keys "camera_id" and "image_base64".
        webhook_url: URL that receives the payload as a JSON POST.
        model_path: Model path forwarded to each detection.
        parts: Optional dict included in the payload as-is (defaults to {}).

    Returns:
        The payload dict that was sent: "status" ("success", "partial_error"
        or "error"), "timestamp", "model_version", "message", "parts" and
        "data" (one entry per camera, in input order).

    Webhook failures are logged and do not raise; the payload is still returned.
    """
    parts = parts or {}
    logger.info(f"[START] Station {station_id} β†’ {len(cameras)} camera(s)")
    results = await asyncio.gather(
        *[_detect_camera_image(station_id, cam, model_path) for cam in cameras],
        return_exceptions=True
    )

    # Normalise the results safely. gather() keeps input order, so each result
    # can be paired with its camera to keep failed entries identifiable.
    clean_results = []
    for cam, r in zip(cameras, results):
        if isinstance(r, dict):
            clean_results.append(r)
            continue
        # BaseException also covers asyncio.CancelledError, which gather()
        # returns as a result when return_exceptions=True.
        if isinstance(r, BaseException):
            error_message = str(r)
        else:
            error_message = f"Unexpected detection result: {r!r}"
        camera_id = cam.get("camera_id", "") if isinstance(cam, dict) else ""
        clean_results.append({
            "station_id": station_id,
            "camera_id": camera_id,
            "status": "error",
            "status_defect": "",
            "image_base64": "",
            "detections": [],
            "message": error_message
        })

    # Determine the overall status
    error_count = sum(1 for r in clean_results if r.get("status") == "error")
    if not clean_results:
        # all() over an empty list is True; report the real cause instead.
        status = "error"
        message = "No cameras provided"
    elif error_count == len(clean_results):
        status = "error"
        message = "All cameras failed during detection"
    elif error_count:
        status = "partial_error"
        message = "Some cameras failed during detection"
    else:
        status = "success"
        message = "Success detecting defects"

    payload = {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "model_version": MODEL_VERSION,
        "message": message,
        "parts": parts,
        "data": make_serializable(clean_results),
    }

    # Send the webhook (best effort: failures are logged, not raised)
    try:
        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client:
            response = await client.post(webhook_url, json=payload)
            response.raise_for_status()
            logger.info(f"[DONE] Station {station_id} β†’ webhook sent ({response.status_code})")
    except Exception as e:
        logger.exception(f"[ERROR] Webhook failed for Station {station_id}: {e}")
    return payload
# ============================================================
# JSON SERIALIZABLE HELPER
# ============================================================
def _serializable_key(key):
    """Convert a dict key to a type json.dumps accepts as an object key."""
    if isinstance(key, (str, int, float, bool)) or key is None:
        return key
    if isinstance(key, np.bool_):
        return bool(key)
    if isinstance(key, np.integer):
        return int(key)
    if isinstance(key, np.floating):
        return float(key)
    return str(key)


def make_serializable(obj):
    """Convert object to JSON-serializable format.

    Plain scalars and None are returned unchanged; lists and tuples become
    lists; dicts are rebuilt with converted keys and values; datetimes become
    ISO-8601 strings; NumPy booleans, integers and floats become the matching
    Python scalar; NumPy arrays become nested lists. Anything else is
    replaced by its str() representation.
    """
    if isinstance(obj, (int, float, str, bool)) or obj is None:
        return obj
    if isinstance(obj, (list, tuple)):
        return [make_serializable(item) for item in obj]
    if isinstance(obj, dict):
        # Keys are normalised too: NumPy scalar keys would make json.dumps raise.
        return {_serializable_key(k): make_serializable(v) for k, v in obj.items()}
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, np.bool_):
        # np.bool_ is not a Python bool; without this branch it would be
        # stringified to "True"/"False" by the fallback below.
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)