Spaces:
Sleeping
Sleeping
| import os, cv2, base64, asyncio, httpx | |
| import numpy as np | |
| from ultralytics import YOLO | |
| from PIL import Image | |
| from io import BytesIO | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import Dict, List | |
| from utils import * | |
| load_dotenv() | |
| MODEL_VERSION = os.getenv("MODEL_VERSION","v1.0.0") | |
| WEBHOOK_URL = os.getenv("WEBHOOK_URL") | |
| WEBHOOK_TIMEOUT = float(os.getenv("WEBHOOK_TIMEOUT", "10.0")) | |
| # ============================================================ | |
| # DEFECT DETECTION FROM BASE64 IMAGE | |
| # ============================================================ | |
| def detect_defect_from_base64(station_id: str, camera_id: str, image_base64: str, model_path=None): | |
| """ | |
| Detect defect from a single Base64 image. | |
| Return: | |
| - status: "OK" / "NG" / "error" | |
| - annotated image (base64) | |
| - list of detections | |
| """ | |
| try: | |
| # OPTION 1 | |
| # img_data = base64.b64decode(image_base64) | |
| # np_arr = np.frombuffer(img_data, np.uint8) | |
| # frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) | |
| # OPTION 2 | |
| img_data = base64.b64decode(image_base64) | |
| image = Image.open(BytesIO(img_data)).convert("RGB") | |
| frame = np.array(image) | |
| if frame is None: | |
| raise ValueError("Decoded image is None") | |
| except Exception as e: | |
| logger.error(f"[ERROR] Cannot decode base64 image for camera {camera_id}: {e}") | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "error", | |
| "status_defect": "", | |
| "image_base64": "", | |
| "detections": [], | |
| "message": "Invalid base64 image" | |
| } | |
| detections = [] | |
| try: | |
| model = YOLO(f"./{model_path}") | |
| logger.info(f"[MODEL] Success load model") | |
| except Exception as e: | |
| logger.error(f"[ERROR] Cannot load model: {e}") | |
| if model: | |
| results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False) | |
| boxes = results[0].boxes | |
| for box in boxes: | |
| cls = int(box.cls[0]) | |
| conf = float(box.conf[0]) | |
| xyxy = [int(x) for x in box.xyxy[0].tolist()] | |
| defect_name = model.names.get(cls, f"class_{cls}").lower() | |
| x1, y1, x2, y2 = xyxy | |
| color = color_defect(defect_name) if defect_name else color_defect('other') | |
| # Draw bbox + label | |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) | |
| label = f"{defect_name.upper()} {conf:.2f}" | |
| (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1) | |
| cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2) | |
| detections.append({ | |
| "class": defect_name, | |
| "confidence": conf, | |
| "bbox": xyxy | |
| }) | |
| # Convert annotated frame ke Base64 | |
| _, buffer = cv2.imencode(".jpg", frame) | |
| frame_base64 = base64.b64encode(buffer).decode("utf-8") | |
| # Save OK image (no bbox) | |
| # output_dir = "outputs/images" | |
| # os.makedirs(output_dir, exist_ok=True) | |
| # filename = f"{station_id}_{camera_id}_OK_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
| # filepath = os.path.join(output_dir, filename) | |
| # cv2.imwrite(filepath, frame) | |
| # logger.info(f"[SAVED] OK image saved to {filepath}") | |
| if detections: | |
| logger.info(f"[DETECTED] Camera {camera_id} β {len(detections)} defect(s)") | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "success", | |
| "status_defect": "NG", | |
| "image_base64": frame_base64, | |
| "detections": detections, | |
| "message": "Detected as defect" | |
| } | |
| else: | |
| logger.info(f"[OK] Camera {camera_id} β No defect detected.") | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "success", | |
| "status_defect": "OK", | |
| "image_base64": frame_base64, | |
| "detections": [], | |
| "message": "Detected as normal (no defect)" | |
| } | |
| # ============================================================ | |
| # ASYNC WRAPPERS | |
| # ============================================================ | |
| async def _detect_camera_image(station_id: str, camera: Dict, model_path=None): | |
| """Run detect_defect_from_base64 in thread for async parallel.""" | |
| return await asyncio.to_thread( | |
| detect_defect_from_base64, | |
| station_id, | |
| camera["camera_id"], | |
| camera["image_base64"], | |
| model_path | |
| ) | |
| # return await asyncio.to_thread( | |
| # testing, | |
| # station_id, | |
| # camera["camera_id"], | |
| # camera["image_base64"], | |
| # model | |
| # ) | |
| async def run_detection_group( | |
| station_id: str, | |
| cameras: List[Dict], | |
| webhook_url: str, | |
| model_path=None, | |
| parts: Dict = None | |
| ): | |
| parts = parts or {} | |
| logger.info(f"[START] Station {station_id} β {len(cameras)} camera(s)") | |
| results = await asyncio.gather( | |
| *[_detect_camera_image(station_id, cam, model_path) for cam in cameras], | |
| return_exceptions=True | |
| ) | |
| # Bersihkan hasil dengan aman | |
| clean_results = [] | |
| for r in results: | |
| if isinstance(r, Exception): | |
| clean_results.append({ | |
| "status": "error", | |
| "message": str(r) | |
| }) | |
| else: | |
| clean_results.append(r) | |
| # Tentukan status keseluruhan | |
| has_error = any(r.get("status") == "error" for r in clean_results) | |
| all_error = all(r.get("status") == "error" for r in clean_results) | |
| if all_error: | |
| status = "error" | |
| message = "All cameras failed during detection" | |
| elif has_error: | |
| status = "partial_error" | |
| message = "Some cameras failed during detection" | |
| else: | |
| status = "success" | |
| message = "Success detecting defects" | |
| payload = { | |
| "status": status, | |
| "timestamp": datetime.now().isoformat(), | |
| "model_version": MODEL_VERSION, | |
| "message": message, | |
| "parts": parts, | |
| "data": make_serializable(clean_results), | |
| } | |
| # Kirim webhook | |
| try: | |
| async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client: | |
| response = await client.post(webhook_url, json=payload) | |
| response.raise_for_status() | |
| logger.info(f"[DONE] Station {station_id} β webhook sent ({response.status_code})") | |
| except Exception as e: | |
| logger.exception(f"[ERROR] Webhook failed for Station {station_id}: {e}") | |
| return payload | |
| # ============================================================ | |
| # JSON SERIALIZABLE HELPER | |
| # ============================================================ | |
| def make_serializable(obj): | |
| """Convert object to JSON-serializable format.""" | |
| if isinstance(obj, (int, float, str, bool)) or obj is None: | |
| return obj | |
| elif isinstance(obj, (list, tuple)): | |
| return [make_serializable(i) for i in obj] | |
| elif isinstance(obj, dict): | |
| return {k: make_serializable(v) for k, v in obj.items()} | |
| elif isinstance(obj, datetime): | |
| return obj.isoformat() | |
| elif isinstance(obj, np.integer): | |
| return int(obj) | |
| elif isinstance(obj, np.floating): | |
| return float(obj) | |
| elif isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| else: | |
| return str(obj) |