""" Bubble-aware manga translation pipeline with polygon correction + debugging. """ import numpy as np from PIL import Image, ImageDraw from utils.image_utils import load_and_split_image, encode_image_to_html from utils.ocr_utils import extract_and_translate_chunk, extract_and_translate_with_masks from utils.polygon_utils import ( draw_translated_text_convex, shrink_or_expand_polygon, FONT_PATH, match_polygon_to_bubble_by_overlap, render_translated_chunk, ) from utils.bubble_detect import detect_speech_bubbles_robust from utils.u2net_detector import detect_bubbles_u2net from utils.bubble_detect_rtdetr import detect_and_refine_bubbles, polygon_to_mask def normalize_bubble_regions(bubble_boxes): """ Accepts: - list of rectangles - list of polygons - mixture of both Returns a list of valid polygons. """ def rect_to_poly(box): # Accept tuple OR list length = 4 if ( isinstance(box, (list, tuple)) and len(box) == 4 and all(isinstance(v, (int, float)) for v in box) ): x1, y1, x2, y2 = map(int, box) return [(x1, y1), (x2, y1), (x2, y2), (x1, y2)] return None # not a rectangle polygons = [] for region in bubble_boxes: # Case 1 — region is a rectangle poly = rect_to_poly(region) if poly is not None: polygons.append(poly) continue # Case 2 — polygon if ( isinstance(region, (list, tuple)) and len(region) >= 3 and all(len(pt) == 2 for pt in region) ): polygons.append([(int(x), int(y)) for x, y in region]) continue print(f"⚠️ Skipping invalid bubble box: {region}") return polygons def match_translations_to_bubbles(translations, bubble_polygons, min_overlap=0.10): """ Add matched_bubble_idx to each OCR translation. OCR polygon is NOT corrected. """ for t in translations: ocr_poly = t.get("polygon") if not ocr_poly: continue idx = match_polygon_to_bubble_by_overlap(ocr_poly, bubble_polygons, min_overlap) t["matched_bubble_idx"] = idx return translations # ======================== Debug Visualization ============================ def visualize_all_debug( img, translations, bubble_polygons, bubble_boxes=None, step_name="debug", prefix="debug" ): """ Robust debug visualization. Handles malformed polygons, empty lists, None values, and degenerate shapes. """ import numpy as np from PIL import ImageDraw debug = img.copy() draw = ImageDraw.Draw(debug, "RGBA") # ========================== # Helper: validate polygon # ========================== def valid_poly(poly): if not poly or len(poly) < 3: return False # Filter invalid coords cleaned = [(int(x), int(y)) for x, y in poly if isinstance(x, (int, float))] # Must have >= 3 *distinct* points return len(set(cleaned)) >= 3 # ========================== # Draw bounding boxes (ORANGE) # ========================== if bubble_boxes: for box in bubble_boxes: try: x1, y1, x2, y2 = map(int, box) draw.rectangle((x1, y1, x2, y2), outline=(255,165,0,180), width=3) except Exception: continue # ========================== # Draw bubble polygons (BLUE) # ========================== for bp in bubble_polygons: if not valid_poly(bp): continue try: draw.polygon(bp, outline=(30,144,255,200), width=4) xs = [p[0] for p in bp] ys = [p[1] for p in bp] cx, cy = int(np.mean(xs)), int(np.mean(ys)) draw.ellipse((cx-4, cy-4, cx+4, cy+4), fill=(0,255,255,220)) except Exception: continue # ========================== # Draw OCR polygons # ========================== for t in translations: orig = t.get("original_polygon") corr = t.get("polygon") # Draw original polygon (RED) if valid_poly(orig): try: draw.polygon(orig, outline=(255,50,50,180), width=3) except Exception: pass # Draw corrected polygon (GREEN) if valid_poly(corr): try: draw.polygon(corr, outline=(50,255,50,220), width=3) except Exception: pass # Center point (YELLOW) poly_for_center = None if valid_poly(corr): poly_for_center = corr elif valid_poly(orig): poly_for_center = orig if poly_for_center: try: xs = [p[0] for p in poly_for_center] ys = [p[1] for p in poly_for_center] cx, cy = int(np.mean(xs)), int(np.mean(ys)) draw.ellipse((cx-3, cy-3, cx+3, cy+3), fill=(255,255,0,220)) except Exception: pass # ========================== # Save output # ========================== out_path = f"{prefix}_{step_name}.png" debug.save(out_path) print(f"📌 Saved debug visualization → {out_path}") return out_path # ===================== Main Bubble Translation Pipeline (Chunk-Based) =================== # ===================== Main Bubble Translation Pipeline (Chunk-Based) =================== def bubble_pipeline_single(file_obj, num_chunks=1, polygon_strategy="hybrid", debug=True): """ Manga bubble-aware translation pipeline (CHUNK-BASED): - Split image into vertical chunks - For each chunk: * RT-DETR bubble detection * Bubble shape refinement (outer + inner polygons) * Mask-based OCR inside bubble interiors * Bubble-aware polygon matching * Render translated text inside bubble regions - Return concatenated HTML previews + table + debug files NOTE: * All polygons are kept in CHUNK-LOCAL coordinates. * Manual-edit pipeline isn't wired for Bubble mode, so this is fine. """ debug_files = [] # ------------------------------------------------------- # 1. Load & split image into chunks (vertical) # ------------------------------------------------------- filename, full_img, chunks = load_and_split_image(file_obj, num_chunks) print(f"📄 bubble_pipeline_single: {filename}, full size={full_img.size}, chunks={len(chunks)}") all_translations = [] # flattened list across chunks all_tables = [] # [["original", "translated"], ...] translated_chunks = [] # list of PIL images (per chunk) # ------------------------------------------------------- # 2. Process each chunk independently # ------------------------------------------------------- for ci, chunk in enumerate(chunks): print(f"\n================ CHUNK {ci} ================") cw, ch = chunk.size print(f" Chunk size: {cw}x{ch}") # ---- A) Detect & refine bubbles in THIS chunk (local coords) ---- bubble_polygons, interior_polygons, bubble_boxes = detect_and_refine_bubbles(chunk) print(f"🔍 Chunk {ci}: found {len(bubble_polygons)} bubble polygons") # Debug: bubble shapes & boxes on the chunk image if debug: dbg_path = visualize_all_debug( chunk, [], bubble_polygons, bubble_boxes=bubble_boxes, step_name=f"chunk{ci}_bubbles", prefix="bubble_dbg" ) debug_files.append(dbg_path) # ---- B) Mask-based OCR INSIDE bubble interiors (on the chunk) ---- print(f"📝 Chunk {ci}: masked OCR inside bubble interiors...") translations = extract_and_translate_with_masks(chunk, interior_polygons) # Fallback: if masked OCR failed, run full OCR on the chunk if not translations: print(f"⚠️ Chunk {ci}: masked OCR found no text → fallback to full OCR") translations = extract_and_translate_chunk(chunk) print(f"⬆️ Chunk {ci}: OCR detections = {len(translations)}") # Save original polygons for debug visualization for t in translations: t["original_polygon"] = t.get("polygon") # ---- C) Bubble-aware matching (still in chunk-local coords) ---- if bubble_polygons: print(f"🔄 Chunk {ci}: matching OCR polygons to bubbles...") translations = match_translations_to_bubbles(translations, bubble_polygons) else: print(f"⚠️ Chunk {ci}: no bubble polygons → skip bubble matching") # Debug: polygons after matching if debug: dbg_path2 = visualize_all_debug( chunk, translations, bubble_polygons, bubble_boxes=bubble_boxes, step_name=f"chunk{ci}_after_correction", prefix="bubble_dbg" ) debug_files.append(dbg_path2) # ---- D) Render translated text onto THIS chunk ---- translated_chunk_img = chunk.copy() for t in translations: translated_text = t.get("translated", "") if not translated_text: continue bidx = t.get("matched_bubble_idx") # Prefer the refined bubble polygon when we have a match if bidx is not None and 0 <= bidx < len(bubble_polygons): render_poly = bubble_polygons[bidx] else: # Fallback: use OCR polygon directly render_poly = t.get("polygon") if not render_poly: continue translated_chunk_img = draw_translated_text_convex( translated_chunk_img, polygon_coords=render_poly, text=translated_text, font_path=FONT_PATH, font_scale=1.0, original_polygon=t.get("original_polygon"), bubble_polygon=bubble_polygons[bidx] if (bidx is not None and 0 <= bidx < len(bubble_polygons)) else None, ) # ---- E) Collect outputs from this chunk ---- translated_chunks.append(translated_chunk_img) for t in translations: all_translations.append(t) all_tables.append([t.get("original", ""), t.get("translated", "")]) # ------------------------------------------------------- # 3. Assemble HTML output (original & translated) # ------------------------------------------------------- orig_html = "".join([encode_image_to_html(c) for c in chunks]) trans_html = "".join([encode_image_to_html(t) for t in translated_chunks]) # Keep API compatible: translations wrapped in a list return filename, orig_html, trans_html, all_tables, [all_translations], debug_files def split_image_into_chunks(img, num_chunks): """ Simple vertical splitting for the translated image. (UI-only; does not affect OCR logic, which is full-page.) """ if num_chunks <= 1: return [img] width, height = img.size chunk_height = height // num_chunks chunks = [] for i in range(num_chunks): top = i * chunk_height bottom = height if i == num_chunks - 1 else (i + 1) * chunk_height chunk = img.crop((0, top, width, bottom)) chunks.append(chunk) return chunks # =========================== Fallback Pipelines =========================== def fallback_ocr_pipeline(file_obj, num_chunks): """ Standard OCR-based translation pipeline (no bubble awareness). """ filename, image, chunks = load_and_split_image(file_obj, num_chunks) all_translations = [] all_tables = [] translated_images = [] for chunk in chunks: trans = extract_and_translate_chunk(chunk) tbl = [[t["original"], t["translated"]] for t in trans] all_translations.append(trans) all_tables.extend(tbl) img_t = render_translated_chunk(chunk, trans, font_path=FONT_PATH, font_scale=1.0) translated_images.append(img_t) orig = "".join([encode_image_to_html(c) for c in chunks]) trans = "".join([encode_image_to_html(t) for t in translated_images]) return filename, orig, trans, all_tables, all_translations def fallback_empty(file_obj, num_chunks, full_img): """ Fallback when no text is detected. Just shows the original image in both columns. """ filename, _, chunks = load_and_split_image(file_obj, num_chunks) orig = "".join([encode_image_to_html(c) for c in chunks]) trans = orig return filename, orig, trans, [], [[]]