# Defect-Detection / core / defect_detection.py
# Navy
# feat: adding code response
# d5327ae
import os, cv2, time, base64, asyncio, httpx
from datetime import datetime
from dotenv import load_dotenv
from typing import Dict, List
from utils import *
# Let python-dotenv populate the process environment before the settings
# below are read from it.
load_dotenv()


def _env_float(name: str, default: str) -> float:
    """Return environment variable *name* parsed as a float (*default* if unset)."""
    return float(os.environ.get(name, default))


# Version string reported in the webhook payload.
MODEL_VERSION = os.environ.get("MODEL_VERSION", "v1.0.0")
# Webhook endpoint taken from the environment; None when the variable is unset.
WEBHOOK_URL = os.environ.get("WEBHOOK_URL")
# Time budget, in seconds, for processing a single camera stream.
MAX_RUNTIME_SEC = _env_float("MAX_RUNTIME_SEC", "20")
# Pause, in seconds, after a frame read fails before trying again.
FRAME_FAIL_SLEEP = _env_float("FRAME_FAIL_SLEEP", "0.05")
# Frame rate used to pace frame reads (sleep of 1 / fps per frame).
DEFAULT_FPS = _env_float("DEFAULT_FPS", "25")
# Timeout, in seconds, given to the webhook HTTP client.
WEBHOOK_TIMEOUT = _env_float("WEBHOOK_TIMEOUT", "10.0")
# ============================================================
# DEFECT DETECTION FROM VIDEO URL
# ============================================================
def detect_defect_from_video_url(station_id, camera_id: str, video_url: str, model=None):
    """
    Detect defects sequentially from a video URL.

    - Reads frames in order, pausing ``1 / DEFAULT_FPS`` seconds between them.
    - Returns as soon as one frame yields at least one box; every box found
      on that frame is drawn on the image and listed in ``detections``.
    - Returns OK with the last frame read (unannotated) once
      ``MAX_RUNTIME_SEC`` has elapsed without a detection. This is always
      the outcome when ``model`` is falsy, because no inference is run.
    - Images are returned base64-encoded only; nothing is written to disk.

    Args:
        station_id: Identifier echoed back in the result.
        camera_id: Identifier echoed back in the result and used in logs.
        video_url: Source handed to ``cv2.VideoCapture``.
        model: Optional detector. It must provide
            ``predict(source=..., conf=..., imgsz=..., verbose=...)`` whose
            first result has a ``boxes`` attribute, and a ``names`` mapping
            from class index to label.

    Returns:
        dict with ``station_id``, ``camera_id``, ``status`` ("success" or
        "error"), ``status_defect`` ("NG", "OK" or ""), ``image_base64``,
        ``detections`` (list of ``{"class", "confidence", "bbox"}``) and
        ``message``. The error result additionally carries an empty
        ``image_path``.
    """
    cap = cv2.VideoCapture(video_url)
    try:
        if not cap.isOpened():
            logger.error(f"[ERROR] Cannot open video URL: {video_url}")
            return {
                "station_id": station_id,
                "camera_id": camera_id,
                "status": "error",
                "status_defect": "",
                "image_base64": "",
                "image_path": "",
                "detections": [],
                "message": f"Cannot open video URL: {video_url}"
            }

        # A zero, negative or NaN DEFAULT_FPS would make the pacing sleep
        # raise; "> 0" is False for all of them, so pacing is skipped instead.
        frame_delay = 1 / DEFAULT_FPS if DEFAULT_FPS > 0 else 0.0
        start_time = time.time()
        last_frame = None

        while True:
            if time.time() - start_time > MAX_RUNTIME_SEC:
                logger.info(f"[OK] {camera_id} → Timeout reached ({MAX_RUNTIME_SEC}s), no defect detected.")
                break

            ret, frame = cap.read()
            if not ret:
                # Failed reads are retried until the time budget runs out.
                time.sleep(FRAME_FAIL_SLEEP)
                continue

            time.sleep(frame_delay)
            # Keep a clean copy: the drawing below mutates `frame` in place.
            last_frame = frame.copy()

            if not model:
                continue

            # YOLO detection
            results = model.predict(source=frame, conf=0.4, imgsz=640, verbose=False)
            boxes = results[0].boxes
            if len(boxes) == 0:
                continue

            # Draw and record every box so the payload matches the image.
            detections = [_annotate_box(frame, box, model) for box in boxes]
            frame_base64 = _frame_to_base64(frame)
            summary = ", ".join(
                f"{d['class']} ({d['confidence']:.2f})" for d in detections
            )
            logger.info(f"[DETECTED] Camera {camera_id} → {summary}")
            return {
                "station_id": station_id,
                "camera_id": camera_id,
                "status": "success",
                "status_defect": "NG",
                "image_base64": frame_base64,
                "detections": detections,
                "message": "Detected as defect"
            }
    finally:
        # Release the capture on every exit path, including exceptions
        # raised by the model or by OpenCV.
        cap.release()

    # --- no defect detected ---
    frame_base64 = _frame_to_base64(last_frame) if last_frame is not None else ""
    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "success",
        "status_defect": "OK",
        "image_base64": frame_base64,
        "detections": [],
        "message": "Detected as normal (no defect)"
    }


def _annotate_box(frame, box, model) -> dict:
    """Draw one predicted box and its label on *frame*; return its detection record."""
    cls = int(box.cls[0])
    conf = float(box.conf[0])
    xyxy = [int(x) for x in box.xyxy[0].tolist()]
    defect_name = model.names.get(cls, f"class_{cls}").lower()
    x1, y1, x2, y2 = xyxy

    # Pick the color for this defect; fall back to the 'other' color when
    # the lookup fails for this name.
    try:
        color = color_defect(defect_name)
    except Exception:
        color = color_defect('other')

    # Bounding box
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

    # Filled label background sized to the text, then the label itself
    label = f"{defect_name.upper()} {conf:.2f}"
    (w, _h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    cv2.rectangle(frame, (x1, y1 - 20), (x1 + w, y1), color, -1)
    cv2.putText(frame, label, (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    return {"class": defect_name, "confidence": conf, "bbox": xyxy}


def _frame_to_base64(frame) -> str:
    """Encode *frame* as JPEG and return it base64-encoded ("" if encoding fails)."""
    encoded_ok, buffer = cv2.imencode(".jpg", frame)
    if not encoded_ok:
        logger.error("[ERROR] Could not encode frame as JPEG")
        return ""
    return base64.b64encode(buffer).decode("utf-8")
# ============================================================
# ASYNC WRAPPERS
# ============================================================
async def _detect_camera_video(station_id: str, camera: Dict, stop_flag: Dict, model=None):
    """
    Run the blocking per-camera detection in a worker thread.

    ``camera`` must contain ``"camera_id"`` and ``"rtsp_url"``; a missing key
    raises ``KeyError``. ``stop_flag`` is accepted but not used here.
    Returns whatever ``detect_defect_from_video_url`` returns.
    """
    cam_id = camera["camera_id"]
    stream_url = camera["rtsp_url"]
    outcome = await asyncio.to_thread(
        detect_defect_from_video_url,
        station_id,
        cam_id,
        stream_url,
        model,
    )
    return outcome
async def run_detection_group(station_id: str, cameras: List[Dict], webhook_url: str, model=None, parts=None):
    """
    Run detection for all cameras in parallel and report the outcome to a webhook.

    Each camera is processed concurrently through ``_detect_camera_video``.
    A camera whose task raises is reported as an error entry shaped like the
    other per-camera results, so the payload stays JSON-serialisable.

    Args:
        station_id: Identifier used in logs and in per-camera results.
        cameras: One dict per camera; passed to ``_detect_camera_video``.
        webhook_url: URL that receives the JSON payload via HTTP POST.
        model: Optional detector forwarded to every camera task.
        parts: Value placed under ``"parts"`` in the payload. It must be
            JSON-serialisable; defaults to ``None`` (JSON ``null``).

    Returns:
        The string ``"DONE"``, whether or not the webhook call succeeded.
        Webhook failures, including HTTP error responses, are logged and
        not raised.
    """
    stop_flag = {"stop": False}
    logger.info(f"[START] Station {station_id} → {len(cameras)} camera(s)")

    outcomes = await asyncio.gather(
        *[_detect_camera_video(station_id, cam, stop_flag, model) for cam in cameras],
        return_exceptions=True
    )
    # gather() keeps input order, so each outcome lines up with its camera.
    results = [
        _outcome_to_result(station_id, cam, outcome)
        for cam, outcome in zip(cameras, outcomes)
    ]

    failed = [
        isinstance(r, dict) and r.get("status") == "error"
        for r in results
    ]
    if not results:
        # all([]) is True; give the empty case its own accurate message.
        status = "error"
        message = "No cameras provided"
    elif all(failed):
        status = "error"
        message = "All cameras failed during detection"
    elif any(failed):
        status = "partial_error"
        message = "Some cameras failed during detection"
    else:
        status = "success"
        message = "Success detecting defects"

    payload = {
        "status": status,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime()),
        "model_version": MODEL_VERSION,
        "message": message,
        "parts": parts,
        "data": results,
    }

    try:
        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client:
            response = await client.post(webhook_url, json=payload)
            # Treat 4xx/5xx responses as failures instead of logging DONE.
            response.raise_for_status()
        logger.info(f"[DONE] Station {station_id}")
    except Exception as e:
        logger.error(f"[ERROR] Webhook failed: {e}")
    return "DONE"


def _outcome_to_result(station_id, camera, outcome):
    """Return *outcome* unchanged, or an error result dict if it is an exception."""
    if not isinstance(outcome, BaseException):
        return outcome
    camera_id = camera.get("camera_id", "") if isinstance(camera, dict) else ""
    return {
        "station_id": station_id,
        "camera_id": camera_id,
        "status": "error",
        "status_defect": "",
        "image_base64": "",
        "image_path": "",
        "detections": [],
        "message": f"{type(outcome).__name__}: {outcome}",
    }