Spaces:
Sleeping
Sleeping
| import os, cv2, time, base64, asyncio, httpx | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import Dict, List | |
| from utils import * | |
| load_dotenv() | |
| MODEL_VERSION = os.getenv("MODEL_VERSION","v1.0.0") | |
| WEBHOOK_URL = os.getenv("WEBHOOK_URL") | |
| MAX_RUNTIME_SEC = float(os.getenv("MAX_RUNTIME_SEC", "20")) | |
| FRAME_FAIL_SLEEP = float(os.getenv("FRAME_FAIL_SLEEP", "0.05")) | |
| DEFAULT_FPS = float(os.getenv("DEFAULT_FPS", "25")) | |
| WEBHOOK_TIMEOUT = float(os.getenv("WEBHOOK_TIMEOUT", "10.0")) | |
| # ============================================================ | |
| # DEFECT DETECTION FROM VIDEO URL | |
| # ============================================================ | |
| def detect_defect_from_video_url(station_id, camera_id: str, video_url: str, model=None): | |
| """ | |
| Detect defects sequentially from a video URL. | |
| - Reads frames in order. | |
| - Returns immediately when a defect is found. | |
| - Returns OK if timeout or no detection. | |
| - Always saves last processed image (OK or NG) to outputs/images/ | |
| """ | |
| cap = cv2.VideoCapture(video_url) | |
| if not cap.isOpened(): | |
| logger.error(f"[ERROR] Cannot open video URL: {video_url}") | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "error", | |
| "status_defect": "", | |
| "image_base64": "", | |
| "image_path": "", | |
| "detections": [], | |
| "message": f"Cannot open video URL: {video_url}" | |
| } | |
| fps = DEFAULT_FPS | |
| if fps == 0 or fps != fps: # handle NaN | |
| fps = DEFAULT_FPS | |
| start_time = time.time() | |
| frame_index = 0 | |
| last_frame = None | |
| while True: | |
| elapsed = time.time() - start_time | |
| if elapsed > MAX_RUNTIME_SEC: | |
| logger.info(f"[OK] {camera_id} → Timeout reached ({MAX_RUNTIME_SEC}s), no defect detected.") | |
| break | |
| ret, frame = cap.read() | |
| if not ret: | |
| time.sleep(FRAME_FAIL_SLEEP) | |
| continue | |
| frame_index += 1 | |
| time.sleep(1 / fps) | |
| last_frame = frame.copy() | |
| # YOLO DETECTION | |
| if model: | |
| results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False) | |
| boxes = results[0].boxes | |
| if len(boxes) > 0: | |
| for box in boxes: | |
| cls = int(box.cls[0]) | |
| conf = float(box.conf[0]) | |
| xyxy = [int(x) for x in box.xyxy[0].tolist()] | |
| defect_name = model.names.get(cls, f"class_{cls}").lower() | |
| x1, y1, x2, y2 = xyxy | |
| # Ambil warna berdasarkan defect | |
| try : | |
| color = color_defect(defect_name) | |
| except Exception as e: | |
| color = color_defect('other') | |
| # Draw bounding box | |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) | |
| # Label | |
| label = f"{defect_name.upper()} {conf:.2f}" | |
| (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1) | |
| cv2.putText(frame, label, (x1, y1 - 5), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| # Convert frame to Base64 | |
| _, buffer = cv2.imencode(".jpg", frame) | |
| frame_base64 = base64.b64encode(buffer).decode("utf-8") | |
| # Save annotated image | |
| # output_dir = "outputs/images" | |
| # os.makedirs(output_dir, exist_ok=True) | |
| # filename = f"{station_id}_{camera_id}_NG_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
| # filepath = os.path.join(output_dir, filename) | |
| # cv2.imwrite(filepath, frame) | |
| # logger.info(f"[SAVED] NG image saved to {filepath}") | |
| cap.release() | |
| logger.info(f"[DETECTED] Camera {camera_id} → {defect_name} ({conf:.2f})") | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "success", | |
| "status_defect": "NG", | |
| "image_base64": frame_base64, | |
| # "image_path": filepath, | |
| "detections": [{ | |
| "class": defect_name, | |
| "confidence": conf, | |
| "bbox": xyxy | |
| }], | |
| "message": f"Detected as defect" | |
| } | |
| # --- no defect detected --- | |
| cap.release() | |
| if last_frame is not None: | |
| _, buffer = cv2.imencode(".jpg", last_frame) | |
| frame_base64 = base64.b64encode(buffer).decode("utf-8") | |
| # Save OK image (no bbox) | |
| # output_dir = "outputs/images" | |
| # os.makedirs(output_dir, exist_ok=True) | |
| # filename = f"{station_id}_{camera_id}_OK_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" | |
| # filepath = os.path.join(output_dir, filename) | |
| # cv2.imwrite(filepath, last_frame) | |
| # logger.info(f"[SAVED] OK image saved to {filepath}") | |
| else: | |
| frame_base64 = "" | |
| filepath = None | |
| return { | |
| "station_id": station_id, | |
| "camera_id": camera_id, | |
| "status": "success", | |
| "status_defect": "OK", | |
| "image_base64": frame_base64, | |
| # "image_path": filepath, | |
| "detections": [], | |
| "message": f"Detected as normal (no defect)" | |
| } | |
| # ============================================================ | |
| # ASYNC WRAPPERS | |
| # ============================================================ | |
| async def _detect_camera_video(station_id: str, camera: Dict, stop_flag: Dict, model=None): | |
| """Run detection in thread (for async parallel).""" | |
| return await asyncio.to_thread(detect_defect_from_video_url, station_id, camera["camera_id"], camera["rtsp_url"], model) | |
| async def run_detection_group(station_id: str, cameras: List[Dict], webhook_url: str, model=None, parts=str): | |
| """ | |
| Run detection for all cameras in parallel. | |
| Validate input before detection. | |
| Send webhook with NG/OK status. | |
| """ | |
| stop_flag = {"stop": False} | |
| logger.info(f"[START] Station {station_id} → {len(cameras)} camera(s)") | |
| results = await asyncio.gather( | |
| *[_detect_camera_video(station_id, cam, stop_flag, model) for cam in cameras], | |
| return_exceptions=True | |
| ) | |
| # misalnya results sudah berisi hasil dari tiap kamera | |
| has_error = any( | |
| isinstance(r, Exception) or (isinstance(r, dict) and r.get("status") == "error") | |
| for r in results | |
| ) | |
| all_error = all( | |
| isinstance(r, Exception) or (isinstance(r, dict) and r.get("status") == "error") | |
| for r in results | |
| ) | |
| if all_error: | |
| status = "error" | |
| message = "All cameras failed during detection" | |
| elif has_error: | |
| status = "partial_error" | |
| message = "Some cameras failed during detection" | |
| else: | |
| status = "success" | |
| message = "Success detecting defects" | |
| payload = { | |
| "status": status, | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()), | |
| "model_version": MODEL_VERSION, | |
| "message": message, | |
| "parts": parts, | |
| "data": results, | |
| } | |
| try: | |
| async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client: | |
| await client.post(webhook_url, json=payload) | |
| logger.info(f"[DONE] Station {station_id}") | |
| except Exception as e: | |
| logger.error(f"[ERROR] Webhook failed: {e}") | |
| return "DONE" | |
| # return payload |