| from ultralytics import YOLO
|
| import numpy as np
|
| import cv2
|
|
|
|
|
# Loaded YOLO models keyed by model path, so repeated detect_bubbles() calls
# with the same path reuse the already-loaded model.
_yolo_model_cache = {}

# --- Long-image slicing -----------------------------------------------------
# Images whose height/width ratio exceeds this are processed in chunks.
MAX_ASPECT_RATIO = 3.0
# Minimum distance (px) between two consecutive cut points.
MIN_CHUNK_HEIGHT = 800
# Default target chunk height (px) for gutter cuts, and the chunk height
# used by the overlap-based fallback.
MAX_CHUNK_HEIGHT = 1500
# A run of gutter rows must be at least this tall (px) to be a cut candidate.
GUTTER_MIN_HEIGHT = 10
# Rows (px) shared by consecutive chunks in the overlap-based fallback.
OVERLAP_SIZE = 200
# A row is a gutter when its mean gray level is above WHITE_THRESHOLD
# or below BLACK_THRESHOLD.
WHITE_THRESHOLD = 245
BLACK_THRESHOLD = 15
# Default IoU at or above which two detections count as duplicates.
IOU_THRESHOLD = 0.5

# --- OpenCV black-bubble detection -------------------------------------------
# Gray level at or below which a pixel counts as "dark" for the binary mask.
BLACK_BUBBLE_THRESHOLD = 50
# Contour area bounds: absolute minimum (px) and maximum as a fraction of
# the whole image area.
BLACK_BUBBLE_MIN_AREA = 1000
BLACK_BUBBLE_MAX_AREA_RATIO = 0.4
# Allowed range for the bounding-box width/height ratio.
BLACK_BUBBLE_MIN_ASPECT = 0.2
BLACK_BUBBLE_MAX_ASPECT = 5.0
|
|
|
|
|
def detect_black_bubbles(image, min_area=None, max_area_ratio=None):
    """
    Find dark, blob-like regions that may be black speech bubbles.

    The image is converted to grayscale, thresholded so that dark pixels
    become foreground, cleaned up with a morphological close then open, and
    the external contours of the result are filtered by area, bounding-box
    aspect ratio, fill ratio and mean brightness.

    Args:
        image: Input image (numpy array, BGR).
        min_area: Minimum contour area in pixels
            (default: BLACK_BUBBLE_MIN_AREA).
        max_area_ratio: Maximum contour area as a fraction of the image area
            (default: BLACK_BUBBLE_MAX_AREA_RATIO).

    Returns:
        list: One ``[x1, y1, x2, y2, confidence, class_id]`` entry per
        accepted contour. ``class_id`` is always 0 and ``confidence`` is a
        heuristic score capped at 0.8.
    """
    area_floor = BLACK_BUBBLE_MIN_AREA if min_area is None else min_area
    ratio_cap = (
        BLACK_BUBBLE_MAX_AREA_RATIO if max_area_ratio is None else max_area_ratio
    )

    img_h, img_w = image.shape[:2]
    area_ceiling = int(img_w * img_h * ratio_cap)

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Inverse threshold: dark pixels become 255 in the mask.
    _, dark_mask = cv2.threshold(
        gray, BLACK_BUBBLE_THRESHOLD, 255, cv2.THRESH_BINARY_INV
    )

    # Close first (fill small holes such as text inside the blob), then open
    # (drop small specks).
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    for morph_op in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        dark_mask = cv2.morphologyEx(dark_mask, morph_op, kernel)

    outlines, _ = cv2.findContours(
        dark_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    # The bounding box may be slightly brighter than the mask threshold
    # (it also covers pixels outside the contour).
    brightness_limit = BLACK_BUBBLE_THRESHOLD + 30

    found = []
    for outline in outlines:
        blob_area = cv2.contourArea(outline)
        if blob_area < area_floor or blob_area > area_ceiling:
            continue

        left, top, box_w, box_h = cv2.boundingRect(outline)

        # Reject extremely thin or extremely flat boxes.
        shape_ratio = box_w / box_h if box_h > 0 else 0
        if shape_ratio < BLACK_BUBBLE_MIN_ASPECT or shape_ratio > BLACK_BUBBLE_MAX_ASPECT:
            continue

        # Reject contours that cover too little of their bounding box.
        box_area = box_w * box_h
        coverage = blob_area / box_area if box_area > 0 else 0
        if coverage < 0.3:
            continue

        # Reject boxes that are not dark on average.
        brightness = np.mean(gray[top:top + box_h, left:left + box_w])
        if brightness > brightness_limit:
            continue

        # Heuristic score: fuller and darker boxes score higher, capped at 0.8.
        score = min(0.8, coverage * (1 - brightness / 255))
        found.append([left, top, left + box_w, top + box_h, score, 0])

    return found
|
|
|
|
|
def find_safe_cut_points(image, target_height=MAX_CHUNK_HEIGHT):
    """
    Choose y-coordinates where a tall image can be cut through a gutter.

    A row is a gutter row when its mean gray level is above WHITE_THRESHOLD
    or below BLACK_THRESHOLD. Runs of at least GUTTER_MIN_HEIGHT consecutive
    gutter rows are candidates; the middle row of a run is accepted as a cut
    when it lies at least MIN_CHUNK_HEIGHT and at least 70% of
    ``target_height`` below the previous accepted cut (or the image top).

    Args:
        image: Input image (numpy array, BGR).
        target_height: Approximate target height for each chunk.

    Returns:
        list: Ascending y-coordinates of the accepted cuts; empty when no
        suitable gutter exists.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    row_brightness = np.mean(gray, axis=1)
    gutter_mask = (row_brightness > WHITE_THRESHOLD) | (row_brightness < BLACK_THRESHOLD)

    # Pad with a non-gutter row on each side so every run, including one that
    # touches the top or bottom edge, has both a rising and a falling edge.
    padded = np.concatenate(([False], gutter_mask, [False])).astype(np.int8)
    transitions = np.diff(padded)
    run_starts = np.flatnonzero(transitions == 1).tolist()
    run_stops = np.flatnonzero(transitions == -1).tolist()  # exclusive ends

    # Middle row of every run that is tall enough to be a real gutter.
    centers = [
        (begin + stop) // 2
        for begin, stop in zip(run_starts, run_stops)
        if stop - begin >= GUTTER_MIN_HEIGHT
    ]

    # Greedy top-to-bottom pass: accept the first centre far enough below the
    # previous cut, then measure the next gap from there.
    cuts = []
    previous_cut = 0
    for center in centers:
        gap = center - previous_cut
        if gap >= MIN_CHUNK_HEIGHT and gap >= target_height * 0.7:
            cuts.append(center)
            previous_cut = center

    return cuts
|
|
|
|
|
def calculate_iou(box1, box2):
    """
    Return the Intersection over Union of two axis-aligned boxes.

    Only the first four items of each box are read, interpreted as
    ``x1, y1, x2, y2``; any extra items (score, class, ...) are ignored.

    Returns:
        float: IoU, or 0.0 when the boxes do not overlap (touching edges
        count as no overlap) or the union area is not positive.
    """
    a_left, a_top, a_right, a_bottom = box1[:4]
    b_left, b_top, b_right, b_bottom = box2[:4]

    # Corners of the overlap rectangle, if there is one.
    left = max(a_left, b_left)
    top = max(a_top, b_top)
    right = min(a_right, b_right)
    bottom = min(a_bottom, b_bottom)

    if right <= left or bottom <= top:
        return 0.0

    overlap = (right - left) * (bottom - top)
    area_a = (a_right - a_left) * (a_bottom - a_top)
    area_b = (b_right - b_left) * (b_bottom - b_top)
    combined = area_a + area_b - overlap

    if combined > 0:
        return overlap / combined
    return 0.0
|
|
|
|
|
def remove_duplicate_detections(detections, iou_threshold=IOU_THRESHOLD):
    """
    Greedy non-maximum suppression over a list of detections.

    Detections are visited from highest to lowest confidence (item 4 of each
    entry). A detection is kept only if its IoU with every detection already
    kept is below ``iou_threshold``.

    Args:
        detections: Entries whose first four items are ``x1, y1, x2, y2``
            and whose item 4 is the confidence.
        iou_threshold: IoU at or above which a lower-confidence detection is
            dropped as a duplicate.

    Returns:
        list: Surviving detections in descending confidence order. With zero
        or one input detection the input object itself is returned.
    """
    if len(detections) <= 1:
        return detections

    ranked = sorted(detections, key=lambda det: det[4], reverse=True)

    survivors = []
    for candidate in ranked:
        overlaps_kept = any(
            not calculate_iou(winner, candidate) < iou_threshold
            for winner in survivors
        )
        if not overlaps_kept:
            survivors.append(candidate)

    return survivors
|
|
|
|
|
def detect_bubbles_on_chunks(model, image, cut_points):
    """
    Run the model on horizontal strips of the image and merge the results.

    The image is split at the given y-coordinates. Strips shorter than 50
    rows are skipped. Box y-coordinates from each strip are shifted back into
    full-image coordinates before duplicates are removed.

    Args:
        model: Loaded YOLO model.
        image: Full image (numpy array).
        cut_points: List of y-coordinates to cut at.

    Returns:
        list: Merged bubble detections in full-image coordinates.
    """
    edges = [0] + cut_points + [image.shape[0]]
    print(f"Processing image in {len(edges) - 1} chunks...")

    collected = []
    for number, (top, bottom) in enumerate(zip(edges[:-1], edges[1:]), start=1):
        strip = image[top:bottom]

        # Too short to be worth running inference on.
        if strip.shape[0] < 50:
            continue

        found = model(strip, verbose=False)[0].boxes.data.tolist()

        # Only y needs shifting: strips span the full image width.
        for box in found:
            box[1] += top
            box[3] += top
        collected.extend(found)

        print(f" Chunk {number}: y={top}-{bottom}, found {len(found)} bubbles")

    merged = remove_duplicate_detections(collected)
    print(f"Total: {len(collected)} detections → {len(merged)} after merge")

    return merged
|
|
|
|
|
def detect_bubbles_with_fallback(model, image):
    """
    Run the model on overlapping fixed-height windows and merge the results.

    Windows are MAX_CHUNK_HEIGHT rows tall and each one starts OVERLAP_SIZE
    rows above the end of the previous one. Box y-coordinates are shifted back
    into full-image coordinates before duplicates are removed.

    Args:
        model: Loaded YOLO model.
        image: Full image (numpy array).

    Returns:
        list: Merged bubble detections in full-image coordinates.
    """
    total_rows = image.shape[0]
    collected = []
    windows_done = 0
    top = 0

    print("No gutters found. Using overlap-based slicing...")

    while top < total_rows:
        bottom = min(top + MAX_CHUNK_HEIGHT, total_rows)
        window = image[top:bottom]

        # Too short to be worth running inference on.
        if window.shape[0] < 50:
            break

        found = model(window, verbose=False)[0].boxes.data.tolist()

        # Only y needs shifting: windows span the full image width.
        for box in found:
            box[1] += top
            box[3] += top
        collected.extend(found)

        windows_done += 1
        print(f" Chunk {windows_done}: y={top}-{bottom}, found {len(found)} bubbles")

        # The window that reaches the bottom edge is the last one.
        if bottom >= total_rows:
            break
        top = bottom - OVERLAP_SIZE

    merged = remove_duplicate_detections(collected)
    print(f"Total: {len(collected)} detections → {len(merged)} after merge")

    return merged
|
|
|
|
|
def detect_bubbles(model_path, image_input, enable_black_bubble=True):
    """
    Detect speech bubbles in an image with a YOLO model.

    The model is loaded once per ``model_path`` and cached at module level.
    Images whose height/width ratio exceeds MAX_ASPECT_RATIO are processed in
    chunks: cut at gutters when find_safe_cut_points() finds any, otherwise
    in overlapping windows. Optionally, dark bubbles found with OpenCV are
    merged into the YOLO results.

    Args:
        model_path (str): The file path to the YOLO model.
        image_input: Path to an image (``str`` or ``os.PathLike``) OR a
            numpy array (BGR).
        enable_black_bubble (bool): Whether to also detect black bubbles
            using OpenCV.

    Returns:
        list: One entry per detected bubble. YOLO rows of six values
        (coordinates, score, class_id) get a seventh ``is_dark_bubble`` item
        set to 0; OpenCV detections carry 1. An empty list is returned when
        the image cannot be read or has zero width or height.
    """
    import os  # local import: only needed to accept path-like image inputs

    # Item assignment mutates the module-level dict in place, so no `global`
    # declaration is required.
    if model_path not in _yolo_model_cache:
        print(f"Loading YOLO model from {model_path}...")
        _yolo_model_cache[model_path] = YOLO(model_path)
        print("YOLO model loaded and cached!")

    model = _yolo_model_cache[model_path]

    # Accept pathlib.Path and other path-like objects as well as plain str.
    if isinstance(image_input, (str, os.PathLike)):
        image = cv2.imread(os.fspath(image_input))
    else:
        image = image_input

    if image is None:
        return []

    height, width = image.shape[:2]
    # An empty image has nothing to detect, and a zero width would make the
    # aspect-ratio division below raise ZeroDivisionError.
    if height == 0 or width == 0:
        return []

    aspect_ratio = height / width

    if aspect_ratio > MAX_ASPECT_RATIO:
        print(f"Long image detected: {width}x{height} (ratio: {aspect_ratio:.1f})")

        cut_points = find_safe_cut_points(image)

        if cut_points:
            print(f"Found {len(cut_points)} safe cut points (gutters)")
            yolo_detections = detect_bubbles_on_chunks(model, image, cut_points)
        else:
            yolo_detections = detect_bubbles_with_fallback(model, image)
    else:
        yolo_detections = model(image, verbose=False)[0].boxes.data.tolist()

    # Tag YOLO rows as "not a dark bubble". Rows that do not have exactly six
    # values are left untouched.
    for det in yolo_detections:
        if len(det) == 6:
            det.append(0)

    black_bubble_detections = detect_black_bubbles(image) if enable_black_bubble else []

    if not black_bubble_detections:
        return yolo_detections

    print(f"OpenCV found {len(black_bubble_detections)} potential black bubbles")

    # Tag OpenCV rows as dark bubbles so callers can tell the sources apart.
    for det in black_bubble_detections:
        det.append(1)

    # Where both detectors found the same region, the higher-confidence
    # detection wins.
    merged = remove_duplicate_detections(yolo_detections + black_bubble_detections)

    print(f"Total: {len(yolo_detections)} YOLO + {len(black_bubble_detections)} black = {len(merged)} after merge")
    return merged
|
|
|
|
|
def clear_model_cache():
    """
    Empty the module-level YOLO model cache.

    After this call the next detect_bubbles() invocation loads its model
    again, whichever model path is used.
    """
    # In-place clear of the shared dict; the name is never rebound, so no
    # `global` declaration is needed.
    _yolo_model_cache.clear()
|
|
|
|
|