Spaces:
Running
Running
| """Speech bubble detection — find bubble boundaries for accurate text placement. | |
| Uses flood fill on the inpainted image (text already erased) to find | |
| the enclosing speech bubble for each text region. The bubble interior | |
| rectangle is used as the rendering target instead of the tight textline | |
| bounding box, giving translated text much more room to breathe. | |
| """ | |
| import cv2 | |
| import numpy as np | |
| from typing import List, Optional | |
| from ..utils import TextBlock, get_logger | |
| logger = get_logger("bubble") | |
| def detect_bubbles( | |
| inpainted_img: np.ndarray, | |
| text_regions: List[TextBlock], | |
| min_bubble_area: int = 800, | |
| max_bubble_ratio: float = 0.3, | |
| padding: int = 12, | |
| ) -> List[Optional[np.ndarray]]: | |
| """ | |
| For each text region, detect the enclosing speech bubble. | |
| Returns a list (one per region) of dst_points ``(1, 4, 2)`` int64 arrays | |
| representing the bubble interior rectangle, or ``None`` when no clear | |
| bubble is found (falls back to textline bounding box in the caller). | |
| """ | |
| if inpainted_img.ndim == 3: | |
| gray = cv2.cvtColor(inpainted_img, cv2.COLOR_RGB2GRAY) | |
| else: | |
| gray = inpainted_img.copy() | |
| # Slight blur reduces pixel-level noise that can stop flood fill prematurely | |
| gray = cv2.GaussianBlur(gray, (3, 3), 0) | |
| h, w = gray.shape | |
| img_area = h * w | |
| results: List[Optional[np.ndarray]] = [] | |
| for idx, region in enumerate(text_regions): | |
| cx = max(0, min(int(region.center[0]), w - 1)) | |
| cy = max(0, min(int(region.center[1]), h - 1)) | |
| detected = _detect_single_bubble( | |
| gray, cx, cy, h, w, img_area, region, | |
| min_bubble_area, max_bubble_ratio, padding, | |
| ) | |
| rect = None | |
| conf = 0.0 | |
| if detected is not None: | |
| rect, conf = detected | |
| region._bubble_confidence = conf | |
| if rect is not None: | |
| bw = int(rect[0, 1, 0] - rect[0, 0, 0]) | |
| bh = int(rect[0, 2, 1] - rect[0, 0, 1]) | |
| logger.debug( | |
| "Region %d: BUBBLE %dx%d at (%d,%d), conf=%.2f", | |
| idx, | |
| bw, | |
| bh, | |
| int(rect[0, 0, 0]), | |
| int(rect[0, 0, 1]), | |
| conf, | |
| ) | |
| else: | |
| logger.debug("Region %d: no bubble detected", idx) | |
| results.append(rect) | |
| # If multiple regions share the same bubble, split the space. | |
| _resolve_overlaps(text_regions, results) | |
| return results | |
| # ── internals ──────────────────────────────────────────────────────── | |
| def _detect_single_bubble( | |
| gray, cx, cy, h, w, img_area, region, | |
| min_area, max_ratio, padding, | |
| ) -> Optional[tuple[np.ndarray, float]]: | |
| """Flood-fill from the text center to find the enclosing bubble.""" | |
| tx, ty, tw, th = cv2.boundingRect(region.min_rect[0].astype(np.int32)) | |
| text_area = max(1, tw * th) | |
| text_len = len(getattr(region, "translation", "") or region.text or "") | |
| min_area_dyn = max(min_area, int(text_area * 0.9), 300) | |
| max_area_dyn = int(img_area * max_ratio) | |
| best_rect: Optional[np.ndarray] = None | |
| best_score = -1.0 | |
| for sx, sy in _candidate_seed_points(cx, cy, tw, th, w, h): | |
| # ── 1. Flood fill from candidate seed ─────────────────────── | |
| bubble_mask, flood_area = _flood_fill(gray, sx, sy) | |
| if flood_area < min_area_dyn: | |
| continue | |
| if flood_area > max_area_dyn: | |
| continue | |
| if flood_area < text_area * 1.05: | |
| continue | |
| contours, _ = cv2.findContours(bubble_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| compactness = 0.0 | |
| if contours: | |
| cnt = max(contours, key=cv2.contourArea) | |
| hull_area = cv2.contourArea(cv2.convexHull(cnt)) | |
| if hull_area > 0: | |
| compactness = cv2.contourArea(cnt) / hull_area | |
| # Relax compactness threshold for dense text to avoid over-rejection. | |
| density = text_len / max(np.sqrt(max(flood_area, 1)), 1.0) | |
| compact_thresh = max(0.28, min(0.42, 0.40 - 0.10 * density)) | |
| if compactness > 0 and compactness < compact_thresh: | |
| continue | |
| # ── 2. Erode to safe interior while keeping selected seed inside ──── | |
| adaptive_padding = _adaptive_padding(flood_area, region, tw, th, padding) | |
| eroded = _erode_mask(bubble_mask, sx, sy, adaptive_padding) | |
| if eroded is None: | |
| continue | |
| num_labels, labels = cv2.connectedComponents(eroded) | |
| label_at_seed = labels[sy, sx] | |
| if label_at_seed == 0: | |
| continue | |
| component = (labels == label_at_seed).astype(np.uint8) * 255 | |
| bx, by, bw, bh = cv2.boundingRect(component) | |
| min_dim = max(18, int(min(tw, th) * 0.55)) | |
| if bw < min_dim or bh < min_dim: | |
| continue | |
| if region.horizontal and bw < tw * 0.7: | |
| continue | |
| score = _bubble_confidence_score(flood_area, text_area, compactness, bw, bh, tw, th) | |
| if score > best_score: | |
| best_score = score | |
| best_rect = _rect_to_dst(bx, by, bw, bh) | |
| if best_rect is None: | |
| return None | |
| return best_rect, float(max(0.0, min(1.0, best_score))) | |
| def _candidate_seed_points(cx: int, cy: int, tw: int, th: int, w: int, h: int) -> list[tuple[int, int]]: | |
| delta = max(2, int(round(min(tw, th) * 0.10))) | |
| pts = [ | |
| (cx, cy), | |
| (cx + delta, cy), | |
| (cx - delta, cy), | |
| (cx, cy + delta), | |
| (cx, cy - delta), | |
| ] | |
| uniq = set() | |
| out = [] | |
| for x, y in pts: | |
| sx = max(0, min(int(x), w - 1)) | |
| sy = max(0, min(int(y), h - 1)) | |
| key = (sx, sy) | |
| if key not in uniq: | |
| uniq.add(key) | |
| out.append(key) | |
| return out | |
| def _flood_fill(gray: np.ndarray, sx: int, sy: int) -> tuple[np.ndarray, int]: | |
| flood_mask = np.zeros((gray.shape[0] + 2, gray.shape[1] + 2), np.uint8) | |
| gray_copy = gray.copy() | |
| cv2.floodFill( | |
| gray_copy, | |
| flood_mask, | |
| (int(sx), int(sy)), | |
| newVal=0, | |
| loDiff=(35,), | |
| upDiff=(35,), | |
| flags=cv2.FLOODFILL_MASK_ONLY | (255 << 8), | |
| ) | |
| bubble_mask = flood_mask[1:-1, 1:-1] | |
| flood_area = int(np.sum(bubble_mask > 0)) | |
| return bubble_mask, flood_area | |
| def _bubble_confidence_score( | |
| flood_area: int, | |
| text_area: int, | |
| compactness: float, | |
| bw: int, | |
| bh: int, | |
| tw: int, | |
| th: int, | |
| ) -> float: | |
| area_ratio = flood_area / max(text_area, 1) | |
| area_term = max(0.0, min(1.0, (area_ratio - 1.0) / 4.0)) | |
| compact_term = max(0.0, min(1.0, compactness)) | |
| width_fit = max(0.0, min(1.0, bw / max(tw, 1))) | |
| height_fit = max(0.0, min(1.0, bh / max(th, 1))) | |
| fit_term = 0.5 * width_fit + 0.5 * height_fit | |
| return 0.45 * area_term + 0.30 * compact_term + 0.25 * fit_term | |
| def _erode_mask(mask, cx, cy, padding): | |
| """Erode *mask*, reducing padding until *center* is still inside.""" | |
| h, w = mask.shape | |
| if cy < 0 or cy >= h or cx < 0 or cx >= w: | |
| return None | |
| for p in range(padding, 1, -2): | |
| kernel = np.ones((p * 2 + 1, p * 2 + 1), np.uint8) | |
| eroded = cv2.erode(mask, kernel) | |
| if eroded[cy, cx] > 0: | |
| return eroded | |
| # Minimal / no erosion | |
| if mask[cy, cx] > 0: | |
| return mask | |
| return None | |
| def _adaptive_padding( | |
| flood_area: int, | |
| region: TextBlock, | |
| text_w: int, | |
| text_h: int, | |
| base_padding: int, | |
| ) -> int: | |
| """Compute erosion padding from bubble size and text density. | |
| Uses a smooth formula rather than fixed area classes: | |
| - Larger bubbles get more padding. | |
| - Longer/denser text gets less padding to avoid cramped rendering. | |
| - Preserves caller-provided ``base_padding`` as a soft prior. | |
| """ | |
| bubble_dim = max(1.0, float(np.sqrt(max(flood_area, 1)))) | |
| text_len = len(getattr(region, "translation", "") or region.text or "") | |
| # Size-driven padding component (smooth growth with bubble dimension) | |
| size_padding = 0.06 * bubble_dim | |
| # Text density proxy: higher density => less interior erosion | |
| text_density = text_len / max(bubble_dim, 1.0) | |
| density_factor = max(0.72, min(1.12, 1.06 - 0.16 * text_density)) | |
| # Blend caller default with adaptive value for backward compatibility | |
| blended = (0.45 * float(base_padding) + 0.55 * size_padding) * density_factor | |
| # Prevent erosion from consuming tiny bubbles | |
| upper_bound = max(3, int(min(text_w, text_h) * 0.22)) | |
| return int(max(3, min(24, min(upper_bound, round(blended))))) | |
| def _resolve_overlaps(text_regions, bubble_rects): | |
| """Split shared bubbles among multiple text regions.""" | |
| n = len(bubble_rects) | |
| for i in range(n): | |
| if bubble_rects[i] is None: | |
| continue | |
| for j in range(i + 1, n): | |
| if bubble_rects[j] is None: | |
| continue | |
| r1 = cv2.boundingRect(bubble_rects[i][0].astype(np.int32)) | |
| r2 = cv2.boundingRect(bubble_rects[j][0].astype(np.int32)) | |
| if _rect_iou(r1, r2) < 0.5: | |
| continue | |
| _split_shared_bubble(text_regions, bubble_rects, i, j) | |
| def _rect_iou(r1, r2): | |
| x1 = max(r1[0], r2[0]) | |
| y1 = max(r1[1], r2[1]) | |
| x2 = min(r1[0] + r1[2], r2[0] + r2[2]) | |
| y2 = min(r1[1] + r1[3], r2[1] + r2[3]) | |
| inter = max(0, x2 - x1) * max(0, y2 - y1) | |
| union = r1[2] * r1[3] + r2[2] * r2[3] - inter | |
| return inter / union if union > 0 else 0.0 | |
| def _split_shared_bubble(text_regions, bubble_rects, i, j): | |
| """Split shared bubble along dominant region-center axis.""" | |
| ri = bubble_rects[i][0] | |
| bx = int(ri[0, 0]) | |
| by = int(ri[0, 1]) | |
| bw = int(ri[1, 0] - bx) | |
| bh = int(ri[2, 1] - by) | |
| cy_i = float(text_regions[i].center[1]) | |
| cy_j = float(text_regions[j].center[1]) | |
| cx_i = float(text_regions[i].center[0]) | |
| cx_j = float(text_regions[j].center[0]) | |
| if abs(cx_i - cx_j) > abs(cy_i - cy_j): | |
| _split_horizontally(text_regions, bubble_rects, i, j, bx, by, bw, bh, cx_i, cx_j) | |
| else: | |
| _split_vertically(text_regions, bubble_rects, i, j, bx, by, bw, bh, cy_i, cy_j) | |
| def _split_vertically(text_regions, bubble_rects, i, j, bx, by, bw, bh, cy_i, cy_j): | |
| """Split a shared bubble between two regions by Y axis.""" | |
| split_y = int((cy_i + cy_j) / 2) | |
| split_y = max(by + 10, min(split_y, by + bh - 10)) | |
| if cy_i <= cy_j: | |
| h_top = split_y - by | |
| h_bot = by + bh - split_y | |
| bubble_rects[i] = _rect_to_dst(bx, by, bw, h_top) | |
| bubble_rects[j] = _rect_to_dst(bx, split_y, bw, h_bot) | |
| else: | |
| h_top = split_y - by | |
| h_bot = by + bh - split_y | |
| bubble_rects[j] = _rect_to_dst(bx, by, bw, h_top) | |
| bubble_rects[i] = _rect_to_dst(bx, split_y, bw, h_bot) | |
| def _split_horizontally(text_regions, bubble_rects, i, j, bx, by, bw, bh, cx_i, cx_j): | |
| """Split a shared bubble between two regions by X axis.""" | |
| split_x = int((cx_i + cx_j) / 2) | |
| split_x = max(bx + 10, min(split_x, bx + bw - 10)) | |
| if cx_i <= cx_j: | |
| w_left = split_x - bx | |
| w_right = bx + bw - split_x | |
| bubble_rects[i] = _rect_to_dst(bx, by, w_left, bh) | |
| bubble_rects[j] = _rect_to_dst(split_x, by, w_right, bh) | |
| else: | |
| w_left = split_x - bx | |
| w_right = bx + bw - split_x | |
| bubble_rects[j] = _rect_to_dst(bx, by, w_left, bh) | |
| bubble_rects[i] = _rect_to_dst(split_x, by, w_right, bh) | |
| def _rect_to_dst(x, y, w, h): | |
| """Pack (x, y, w, h) into a ``(1, 4, 2)`` int64 dst_points array.""" | |
| return np.array( | |
| [[[x, y], [x + w, y], [x + w, y + h], [x, y + h]]], | |
| dtype=np.int64, | |
| ) | |