import os
import base64
import asyncio
from io import BytesIO
from datetime import datetime
from typing import Dict, List, Optional

import cv2
import httpx
import numpy as np
from ultralytics import YOLO
from PIL import Image
from dotenv import load_dotenv

# NOTE(review): `logger` and `color_defect` are not defined in this file and are
# presumably provided by this star import — confirm against utils. Kept last so
# name-resolution order matches the original import line.
from utils import *

load_dotenv()

MODEL_VERSION = os.getenv("MODEL_VERSION", "v1.0.0")
WEBHOOK_URL = os.getenv("WEBHOOK_URL")
WEBHOOK_TIMEOUT = float(os.getenv("WEBHOOK_TIMEOUT", "10.0"))


# ============================================================
# DEFECT DETECTION FROM BASE64 IMAGE
# ============================================================
def _error_result(station_id, camera_id, message: str) -> Dict:
    """Build the per-camera result dict shared by every failure path."""
    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "error",
        "status_defect": "",
        "image_base64": "",
        "detections": [],
        "message": message,
    }


def detect_defect_from_base64(station_id: str, camera_id: str, image_base64: str, model_path=None):
    """Detect defects in a single Base64-encoded image.

    Args:
        station_id: Identifier copied into the result.
        camera_id: Identifier copied into the result and used in log messages.
        image_base64: Base64 text of an image in any format PIL can open.
        model_path: Path of the YOLO weights, resolved relative to the
            current working directory (``./{model_path}``).

    Returns:
        A dict with the keys ``station_id``, ``camera_id``, ``status``
        (``"success"`` or ``"error"``), ``status_defect`` (``"NG"`` when at
        least one detection was made, ``"OK"`` when none, ``""`` on error),
        ``image_base64`` (annotated JPEG as Base64, ``""`` on error),
        ``detections`` (list of ``{"class", "confidence", "bbox"}`` dicts)
        and ``message``.
    """
    try:
        img_data = base64.b64decode(image_base64)
        image = Image.open(BytesIO(img_data)).convert("RGB")
        # PIL produces RGB, while OpenCV drawing/encoding and Ultralytics
        # numpy sources work on BGR; convert once so that inference and the
        # returned JPEG do not have red and blue swapped.
        frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    except Exception as e:
        logger.error(f"[ERROR] Cannot decode base64 image for camera {camera_id}: {e}")
        return _error_result(station_id, camera_id, "Invalid base64 image")

    if not model_path:
        logger.error("[ERROR] Cannot load model: model_path is not set")
        return _error_result(station_id, camera_id, "Model path is not set")

    # The model is loaded on every call on purpose: calls run concurrently in
    # worker threads (see _detect_camera_image) and Ultralytics advises against
    # sharing one model instance between threads.
    try:
        model = YOLO(f"./{model_path}")
        logger.info("[MODEL] Success load model")
    except Exception as e:
        # Previously execution continued here with `model` unbound and
        # crashed with NameError; report a regular error result instead.
        logger.error(f"[ERROR] Cannot load model: {e}")
        return _error_result(station_id, camera_id, "Cannot load model")

    try:
        results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False)
    except Exception as e:
        logger.error(f"[ERROR] Inference failed for camera {camera_id}: {e}")
        return _error_result(station_id, camera_id, "Inference failed")

    detections = []
    for box in results[0].boxes:
        cls = int(box.cls[0])
        conf = float(box.conf[0])
        xyxy = [int(x) for x in box.xyxy[0].tolist()]
        defect_name = model.names.get(cls, f"class_{cls}").lower()
        x1, y1, x2, y2 = xyxy
        color = color_defect(defect_name) if defect_name else color_defect('other')

        # Draw the bounding box, then a filled label strip just above it.
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        label = f"{defect_name.upper()} {conf:.2f}"
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1)
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        detections.append({
            "class": defect_name,
            "confidence": conf,
            "bbox": xyxy,
        })

    # Convert the annotated frame to Base64 (JPEG).
    encoded_ok, buffer = cv2.imencode(".jpg", frame)
    if not encoded_ok:
        logger.error(f"[ERROR] Cannot encode annotated image for camera {camera_id}")
        return _error_result(station_id, camera_id, "Cannot encode annotated image")
    frame_base64 = base64.b64encode(buffer).decode("utf-8")

    if detections:
        logger.info(f"[DETECTED] Camera {camera_id} → {len(detections)} defect(s)")
        return {
            "station_id": station_id,
            "camera_id": camera_id,
            "status": "success",
            "status_defect": "NG",
            "image_base64": frame_base64,
            "detections": detections,
            "message": "Detected as defect",
        }

    logger.info(f"[OK] Camera {camera_id} → No defect detected.")
    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "success",
        "status_defect": "OK",
        "image_base64": frame_base64,
        "detections": [],
        "message": "Detected as normal (no defect)",
    }


# ============================================================
# ASYNC WRAPPERS
# ============================================================
async def _detect_camera_image(station_id: str, camera: Dict, model_path=None):
    """Run detect_defect_from_base64 in a worker thread.

    ``camera`` must provide the keys ``"camera_id"`` and ``"image_base64"``;
    a missing key raises ``KeyError`` to the awaiting caller.
    """
    return await asyncio.to_thread(
        detect_defect_from_base64,
        station_id,
        camera["camera_id"],
        camera["image_base64"],
        model_path,
    )


async def run_detection_group(
    station_id: str,
    cameras: List[Dict],
    webhook_url: str,
    model_path=None,
    parts: Optional[Dict] = None,
):
    """Run detection for every camera of a station and post the result.

    All cameras are processed concurrently. The aggregated payload is posted
    as JSON to ``webhook_url``; a webhook failure is logged and does not
    raise.

    Args:
        station_id: Identifier copied into each per-camera result.
        cameras: One dict per camera with ``"camera_id"`` and
            ``"image_base64"``.
        webhook_url: URL that receives the payload via HTTP POST.
        model_path: Forwarded to detect_defect_from_base64.
        parts: Optional dict copied into the payload (``{}`` when omitted).

    Returns:
        The payload dict with ``status`` (``"success"``, ``"partial_error"``
        or ``"error"``), ``timestamp``, ``model_version``, ``message``,
        ``parts`` and ``data`` (the list of per-camera results).
    """
    parts = parts or {}

    logger.info(f"[START] Station {station_id} → {len(cameras)} camera(s)")

    results = await asyncio.gather(
        *[_detect_camera_image(station_id, cam, model_path) for cam in cameras],
        return_exceptions=True
    )

    # Normalise the results: an exception raised by a camera task becomes a
    # regular error entry that still identifies its station and camera.
    clean_results = []
    for cam, r in zip(cameras, results):
        # BaseException, not Exception: gather() can also hand back
        # asyncio.CancelledError, which is not an Exception subclass.
        if isinstance(r, BaseException):
            camera_id = cam.get("camera_id") if isinstance(cam, dict) else None
            clean_results.append(_error_result(station_id, camera_id, str(r)))
        else:
            clean_results.append(r)

    # Determine the overall status.
    has_error = any(r.get("status") == "error" for r in clean_results)
    all_error = all(r.get("status") == "error" for r in clean_results)

    if not clean_results:
        # all() is True for an empty list; keep the "error" status but do not
        # claim that cameras failed when none were supplied.
        status = "error"
        message = "No camera images provided"
    elif all_error:
        status = "error"
        message = "All cameras failed during detection"
    elif has_error:
        status = "partial_error"
        message = "Some cameras failed during detection"
    else:
        status = "success"
        message = "Success detecting defects"

    payload = {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "model_version": MODEL_VERSION,
        "message": message,
        "parts": parts,
        "data": make_serializable(clean_results),
    }

    # Send the webhook; failures are logged, the payload is returned anyway.
    try:
        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client:
            response = await client.post(webhook_url, json=payload)
            response.raise_for_status()
            logger.info(f"[DONE] Station {station_id} → webhook sent ({response.status_code})")
    except Exception as e:
        logger.exception(f"[ERROR] Webhook failed for Station {station_id}: {e}")

    return payload


# ============================================================
# JSON SERIALIZABLE HELPER
# ============================================================
def make_serializable(obj):
    """Recursively convert ``obj`` into JSON-serializable builtins.

    Builtin scalars and ``None`` pass through unchanged; lists and tuples
    become lists; dict values are converted (keys are left as they are);
    ``datetime`` becomes an ISO-8601 string; NumPy booleans, integers and
    floats become builtin ``bool``/``int``/``float``; arrays become nested
    lists. Any other object is replaced by ``str(obj)``.
    """
    if isinstance(obj, (int, float, str, bool)) or obj is None:
        return obj
    elif isinstance(obj, (list, tuple)):
        return [make_serializable(i) for i in obj]
    elif isinstance(obj, dict):
        return {k: make_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, datetime):
        return obj.isoformat()
    elif isinstance(obj, np.bool_):
        # Without this branch a NumPy boolean was emitted as the string
        # "True"/"False" rather than a JSON boolean.
        return bool(obj)
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return str(obj)