import os, cv2, time, base64, asyncio, httpx from datetime import datetime from dotenv import load_dotenv from typing import Dict, List from utils import * load_dotenv() MODEL_VERSION = os.getenv("MODEL_VERSION","v1.0.0") WEBHOOK_URL = os.getenv("WEBHOOK_URL") MAX_RUNTIME_SEC = float(os.getenv("MAX_RUNTIME_SEC", "20")) FRAME_FAIL_SLEEP = float(os.getenv("FRAME_FAIL_SLEEP", "0.05")) DEFAULT_FPS = float(os.getenv("DEFAULT_FPS", "25")) WEBHOOK_TIMEOUT = float(os.getenv("WEBHOOK_TIMEOUT", "10.0")) # ============================================================ # DEFECT DETECTION FROM VIDEO URL # ============================================================ def detect_defect_from_video_url(station_id, camera_id: str, video_url: str, model=None): """ Detect defects sequentially from a video URL. - Reads frames in order. - Returns immediately when a defect is found. - Returns OK if timeout or no detection. - Always saves last processed image (OK or NG) to outputs/images/ """ cap = cv2.VideoCapture(video_url) if not cap.isOpened(): logger.error(f"[ERROR] Cannot open video URL: {video_url}") return { "station_id": station_id, "camera_id": camera_id, "status": "error", "status_defect": "", "image_base64": "", "image_path": "", "detections": [], "message": f"Cannot open video URL: {video_url}" } fps = DEFAULT_FPS if fps == 0 or fps != fps: # handle NaN fps = DEFAULT_FPS start_time = time.time() frame_index = 0 last_frame = None while True: elapsed = time.time() - start_time if elapsed > MAX_RUNTIME_SEC: logger.info(f"[OK] {camera_id} → Timeout reached ({MAX_RUNTIME_SEC}s), no defect detected.") break ret, frame = cap.read() if not ret: time.sleep(FRAME_FAIL_SLEEP) continue frame_index += 1 time.sleep(1 / fps) last_frame = frame.copy() # YOLO DETECTION if model: results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False) boxes = results[0].boxes if len(boxes) > 0: for box in boxes: cls = int(box.cls[0]) conf = float(box.conf[0]) xyxy = [int(x) for x in box.xyxy[0].tolist()] defect_name = model.names.get(cls, f"class_{cls}").lower() x1, y1, x2, y2 = xyxy # Ambil warna berdasarkan defect try : color = color_defect(defect_name) except Exception as e: color = color_defect('other') # Draw bounding box cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) # Label label = f"{defect_name.upper()} {conf:.2f}" (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1) cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Convert frame to Base64 _, buffer = cv2.imencode(".jpg", frame) frame_base64 = base64.b64encode(buffer).decode("utf-8") # Save annotated image # output_dir = "outputs/images" # os.makedirs(output_dir, exist_ok=True) # filename = f"{station_id}_{camera_id}_NG_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" # filepath = os.path.join(output_dir, filename) # cv2.imwrite(filepath, frame) # logger.info(f"[SAVED] NG image saved to {filepath}") cap.release() logger.info(f"[DETECTED] Camera {camera_id} → {defect_name} ({conf:.2f})") return { "station_id": station_id, "camera_id": camera_id, "status": "success", "status_defect": "NG", "image_base64": frame_base64, # "image_path": filepath, "detections": [{ "class": defect_name, "confidence": conf, "bbox": xyxy }], "message": f"Detected as defect" } # --- no defect detected --- cap.release() if last_frame is not None: _, buffer = cv2.imencode(".jpg", last_frame) frame_base64 = base64.b64encode(buffer).decode("utf-8") # Save OK image (no bbox) # output_dir = "outputs/images" # os.makedirs(output_dir, exist_ok=True) # filename = f"{station_id}_{camera_id}_OK_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" # filepath = os.path.join(output_dir, filename) # cv2.imwrite(filepath, last_frame) # logger.info(f"[SAVED] OK image saved to {filepath}") else: frame_base64 = "" filepath = None return { "station_id": station_id, "camera_id": camera_id, "status": "success", "status_defect": "OK", "image_base64": frame_base64, # "image_path": filepath, "detections": [], "message": f"Detected as normal (no defect)" } # ============================================================ # ASYNC WRAPPERS # ============================================================ async def _detect_camera_video(station_id: str, camera: Dict, stop_flag: Dict, model=None): """Run detection in thread (for async parallel).""" return await asyncio.to_thread(detect_defect_from_video_url, station_id, camera["camera_id"], camera["rtsp_url"], model) async def run_detection_group(station_id: str, cameras: List[Dict], webhook_url: str, model=None, parts=str): """ Run detection for all cameras in parallel. Validate input before detection. Send webhook with NG/OK status. """ stop_flag = {"stop": False} logger.info(f"[START] Station {station_id} → {len(cameras)} camera(s)") results = await asyncio.gather( *[_detect_camera_video(station_id, cam, stop_flag, model) for cam in cameras], return_exceptions=True ) # misalnya results sudah berisi hasil dari tiap kamera has_error = any( isinstance(r, Exception) or (isinstance(r, dict) and r.get("status") == "error") for r in results ) all_error = all( isinstance(r, Exception) or (isinstance(r, dict) and r.get("status") == "error") for r in results ) if all_error: status = "error" message = "All cameras failed during detection" elif has_error: status = "partial_error" message = "Some cameras failed during detection" else: status = "success" message = "Success detecting defects" payload = { "status": status, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), "model_version": MODEL_VERSION, "message": message, "parts": parts, "data": results, } try: async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client: await client.post(webhook_url, json=payload) logger.info(f"[DONE] Station {station_id}") except Exception as e: logger.error(f"[ERROR] Webhook failed: {e}") return "DONE" # return payload