Spaces:
Running
Running
| """ | |
| Bubble-aware manga translation pipeline with polygon correction + debugging. | |
| """ | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| from utils.image_utils import load_and_split_image, encode_image_to_html | |
| from utils.ocr_utils import extract_and_translate_chunk, extract_and_translate_with_masks | |
| from utils.polygon_utils import ( | |
| draw_translated_text_convex, | |
| shrink_or_expand_polygon, | |
| FONT_PATH, | |
| match_polygon_to_bubble_by_overlap, | |
| render_translated_chunk, | |
| ) | |
| from utils.bubble_detect import detect_speech_bubbles_robust | |
| from utils.u2net_detector import detect_bubbles_u2net | |
| from utils.bubble_detect_rtdetr import detect_and_refine_bubbles, polygon_to_mask | |
def normalize_bubble_regions(bubble_boxes):
    """
    Normalize heterogeneous bubble regions into polygons.

    Accepts:
        - a list of rectangles (x1, y1, x2, y2)
        - a list of polygons [(x, y), ...]
        - a mixture of both
    Invalid entries are skipped with a warning instead of raising.

    Returns:
        list of polygons, each a list of (int, int) vertex tuples.
    """
    import numbers  # local so this block is self-contained

    def rect_to_poly(box):
        # A rectangle is exactly four scalar numbers in a tuple or list.
        # numbers.Real also matches numpy scalar types, which a plain
        # isinstance(v, (int, float)) check would wrongly reject.
        if (
            isinstance(box, (list, tuple))
            and len(box) == 4
            and all(isinstance(v, numbers.Real) for v in box)
        ):
            x1, y1, x2, y2 = map(int, box)
            return [(x1, y1), (x2, y1), (x2, y2), (x1, y2)]
        return None  # not a rectangle

    polygons = []
    for region in bubble_boxes:
        # Case 1: region is a rectangle
        poly = rect_to_poly(region)
        if poly is not None:
            polygons.append(poly)
            continue
        # Case 2: region is a polygon (>= 3 point-like elements).
        # Guard each element with __len__: a flat sequence of scalars is
        # neither a valid rect nor a polygon, and without the guard
        # len(pt) would raise TypeError instead of hitting the warning.
        if (
            isinstance(region, (list, tuple))
            and len(region) >= 3
            and all(hasattr(pt, "__len__") and len(pt) == 2 for pt in region)
        ):
            polygons.append([(int(x), int(y)) for x, y in region])
            continue
        print(f"β οΈ Skipping invalid bubble box: {region}")
    return polygons
def match_translations_to_bubbles(translations, bubble_polygons, min_overlap=0.10):
    """
    Annotate each OCR translation entry with the index of the bubble
    polygon it overlaps best (stored under "matched_bubble_idx").

    The OCR polygon itself is NOT corrected or modified; entries without
    a polygon are left untouched.
    """
    for entry in translations:
        source_poly = entry.get("polygon")
        if not source_poly:
            continue
        entry["matched_bubble_idx"] = match_polygon_to_bubble_by_overlap(
            source_poly, bubble_polygons, min_overlap
        )
    return translations
| # ======================== Debug Visualization ============================ | |
def visualize_all_debug(
    img, translations, bubble_polygons, bubble_boxes=None,
    step_name="debug", prefix="debug"
):
    """
    Robust debug visualization.

    Draws, on a copy of *img*:
      - detection bounding boxes (orange rectangles),
      - refined bubble polygons (blue outlines + cyan centroid dots),
      - original OCR polygons (red) and corrected ones (green),
      - a yellow centroid marker per OCR entry.

    Malformed polygons, empty lists, None values and degenerate shapes
    are skipped instead of raising.

    Returns:
        str: path of the saved PNG ("{prefix}_{step_name}.png").
    """
    # Local imports kept deliberately so this helper keeps working even if
    # module-level imports get reorganized.
    import numpy as np
    from PIL import ImageDraw

    debug = img.copy()
    draw = ImageDraw.Draw(debug, "RGBA")

    def valid_poly(poly):
        # A polygon is drawable when it has >= 3 *distinct* integer points.
        if not poly or len(poly) < 3:
            return False
        cleaned = []
        for pt in poly:
            # Each vertex must be a numeric (x, y) pair. Anything else
            # (scalars, strings, wrong-length or non-numeric tuples) is
            # dropped instead of raising, so one bad vertex cannot kill
            # the whole overlay. (The old check only validated x and
            # crashed on a malformed y or a non-pair vertex.)
            try:
                x, y = pt
                cleaned.append((int(x), int(y)))
            except (TypeError, ValueError):
                continue
        return len(set(cleaned)) >= 3

    # Detection bounding boxes (ORANGE)
    if bubble_boxes:
        for box in bubble_boxes:
            try:
                x1, y1, x2, y2 = map(int, box)
                draw.rectangle((x1, y1, x2, y2), outline=(255,165,0,180), width=3)
            except Exception:
                continue  # malformed box: skip, keep drawing the rest

    # Refined bubble polygons (BLUE) with cyan centroid markers
    for bp in bubble_polygons:
        if not valid_poly(bp):
            continue
        try:
            draw.polygon(bp, outline=(30,144,255,200), width=4)
            xs = [p[0] for p in bp]
            ys = [p[1] for p in bp]
            cx, cy = int(np.mean(xs)), int(np.mean(ys))
            draw.ellipse((cx-4, cy-4, cx+4, cy+4), fill=(0,255,255,220))
        except Exception:
            continue

    # OCR polygons: original (RED), corrected (GREEN), centroid (YELLOW)
    for t in translations:
        orig = t.get("original_polygon")
        corr = t.get("polygon")
        if valid_poly(orig):
            try:
                draw.polygon(orig, outline=(255,50,50,180), width=3)
            except Exception:
                pass
        if valid_poly(corr):
            try:
                draw.polygon(corr, outline=(50,255,50,220), width=3)
            except Exception:
                pass
        # Centroid: prefer the corrected polygon, fall back to the original.
        poly_for_center = None
        if valid_poly(corr):
            poly_for_center = corr
        elif valid_poly(orig):
            poly_for_center = orig
        if poly_for_center:
            try:
                xs = [p[0] for p in poly_for_center]
                ys = [p[1] for p in poly_for_center]
                cx, cy = int(np.mean(xs)), int(np.mean(ys))
                draw.ellipse((cx-3, cy-3, cx+3, cy+3), fill=(255,255,0,220))
            except Exception:
                pass

    # Save and report
    out_path = f"{prefix}_{step_name}.png"
    debug.save(out_path)
    print(f"π Saved debug visualization β {out_path}")
    return out_path
# ===================== Main Bubble Translation Pipeline (Chunk-Based) ===================
def bubble_pipeline_single(file_obj, num_chunks=1, polygon_strategy="hybrid", debug=True):
    """
    Manga bubble-aware translation pipeline (CHUNK-BASED).

    Steps:
      - Split the uploaded image into vertical chunks.
      - For each chunk:
          * RT-DETR bubble detection (detect_and_refine_bubbles)
          * bubble shape refinement (outer + inner polygons)
          * mask-based OCR inside bubble interiors, with full-chunk OCR fallback
          * bubble-aware polygon matching
          * render translated text inside the matched bubble regions
      - Return concatenated HTML previews + table rows + debug file paths.

    Parameters
    ----------
    file_obj : file-like / upload handle accepted by load_and_split_image
    num_chunks : int
        Number of vertical chunks to split the page into.
    polygon_strategy : str
        NOTE(review): not referenced anywhere in this body — confirm whether
        it should be forwarded to detection/refinement or removed.
    debug : bool
        When True, intermediate visualizations are saved via visualize_all_debug.

    Returns
    -------
    (filename, orig_html, trans_html, all_tables, [all_translations], debug_files)
        all_translations is wrapped in a one-element list for API compatibility.

    NOTE:
      * All polygons are kept in CHUNK-LOCAL coordinates.
      * Manual-edit pipeline isn't wired for Bubble mode, so this is fine.
    """
    debug_files = []  # paths of saved debug PNGs (stays empty when debug=False)

    # -------------------------------------------------------
    # 1. Load & split image into chunks (vertical)
    # -------------------------------------------------------
    filename, full_img, chunks = load_and_split_image(file_obj, num_chunks)
    # NOTE(review): this log hard-codes "(unknown)" instead of interpolating
    # `filename` — confirm whether that is intentional.
    print(f"π bubble_pipeline_single: (unknown), full size={full_img.size}, chunks={len(chunks)}")

    all_translations = []   # flattened list of translation dicts across chunks
    all_tables = []         # [["original", "translated"], ...]
    translated_chunks = []  # rendered PIL images, one per chunk

    # -------------------------------------------------------
    # 2. Process each chunk independently (chunk-local coords)
    # -------------------------------------------------------
    for ci, chunk in enumerate(chunks):
        print(f"\n================ CHUNK {ci} ================")
        cw, ch = chunk.size
        print(f" Chunk size: {cw}x{ch}")

        # ---- A) Detect & refine bubbles in THIS chunk (local coords) ----
        bubble_polygons, interior_polygons, bubble_boxes = detect_and_refine_bubbles(chunk)
        print(f"π Chunk {ci}: found {len(bubble_polygons)} bubble polygons")

        # Debug: bubble shapes & boxes overlaid on the chunk image
        if debug:
            dbg_path = visualize_all_debug(
                chunk, [], bubble_polygons, bubble_boxes=bubble_boxes,
                step_name=f"chunk{ci}_bubbles", prefix="bubble_dbg"
            )
            debug_files.append(dbg_path)

        # ---- B) Mask-based OCR INSIDE bubble interiors (on the chunk) ----
        print(f"π Chunk {ci}: masked OCR inside bubble interiors...")
        translations = extract_and_translate_with_masks(chunk, interior_polygons)

        # Fallback: if masked OCR found nothing, OCR the whole chunk instead
        if not translations:
            print(f"β οΈ Chunk {ci}: masked OCR found no text β fallback to full OCR")
            translations = extract_and_translate_chunk(chunk)
        print(f"β¬οΈ Chunk {ci}: OCR detections = {len(translations)}")

        # Keep a copy of the raw OCR polygon for debug visualization
        for t in translations:
            t["original_polygon"] = t.get("polygon")

        # ---- C) Bubble-aware matching (adds "matched_bubble_idx" per entry) ----
        if bubble_polygons:
            print(f"π Chunk {ci}: matching OCR polygons to bubbles...")
            translations = match_translations_to_bubbles(translations, bubble_polygons)
        else:
            print(f"β οΈ Chunk {ci}: no bubble polygons β skip bubble matching")

        # Debug: polygons after matching
        if debug:
            dbg_path2 = visualize_all_debug(
                chunk, translations, bubble_polygons, bubble_boxes=bubble_boxes,
                step_name=f"chunk{ci}_after_correction", prefix="bubble_dbg"
            )
            debug_files.append(dbg_path2)

        # ---- D) Render translated text onto THIS chunk ----
        translated_chunk_img = chunk.copy()
        for t in translations:
            translated_text = t.get("translated", "")
            if not translated_text:
                continue
            bidx = t.get("matched_bubble_idx")
            # Prefer the refined bubble polygon when we have a match
            if bidx is not None and 0 <= bidx < len(bubble_polygons):
                render_poly = bubble_polygons[bidx]
            else:
                # Fallback: use the OCR polygon directly
                render_poly = t.get("polygon")
            # Nothing usable to render into: skip this entry
            if not render_poly:
                continue
            translated_chunk_img = draw_translated_text_convex(
                translated_chunk_img,
                polygon_coords=render_poly,
                text=translated_text,
                font_path=FONT_PATH,
                font_scale=1.0,
                original_polygon=t.get("original_polygon"),
                bubble_polygon=bubble_polygons[bidx] if (bidx is not None and 0 <= bidx < len(bubble_polygons)) else None,
            )

        # ---- E) Collect outputs from this chunk ----
        translated_chunks.append(translated_chunk_img)
        for t in translations:
            all_translations.append(t)
            all_tables.append([t.get("original", ""), t.get("translated", "")])

    # -------------------------------------------------------
    # 3. Assemble HTML output (original & translated previews)
    # -------------------------------------------------------
    orig_html = "".join([encode_image_to_html(c) for c in chunks])
    trans_html = "".join([encode_image_to_html(t) for t in translated_chunks])

    # Keep API compatible: translations wrapped in a list
    return filename, orig_html, trans_html, all_tables, [all_translations], debug_files
def split_image_into_chunks(img, num_chunks):
    """
    Cut *img* into `num_chunks` vertical slices of (nearly) equal height.

    UI-only helper; OCR itself runs on the full page elsewhere. The last
    slice absorbs any integer-division remainder so the whole height is
    covered. For num_chunks <= 1 the image is returned unchanged.
    """
    if num_chunks <= 1:
        return [img]
    width, height = img.size
    slice_h = height // num_chunks
    tops = [i * slice_h for i in range(num_chunks)]
    bottoms = [t + slice_h for t in tops[:-1]] + [height]
    return [img.crop((0, top, width, bottom)) for top, bottom in zip(tops, bottoms)]
| # =========================== Fallback Pipelines =========================== | |
def fallback_ocr_pipeline(file_obj, num_chunks):
    """
    Standard OCR-based translation pipeline (no bubble awareness).

    Each chunk is OCR'd + translated, rendered with the translated text,
    and encoded to HTML. Returns (filename, original_html, translated_html,
    table_rows, per_chunk_translations).
    """
    filename, image, chunks = load_and_split_image(file_obj, num_chunks)
    per_chunk_translations = []
    table_rows = []
    rendered = []
    for piece in chunks:
        detections = extract_and_translate_chunk(piece)
        per_chunk_translations.append(detections)
        table_rows.extend([d["original"], d["translated"]] for d in detections)
        rendered.append(
            render_translated_chunk(piece, detections, font_path=FONT_PATH, font_scale=1.0)
        )
    original_html = "".join(encode_image_to_html(c) for c in chunks)
    translated_html = "".join(encode_image_to_html(r) for r in rendered)
    return filename, original_html, translated_html, table_rows, per_chunk_translations
def fallback_empty(file_obj, num_chunks, full_img):
    """
    Fallback used when no text was detected at all.

    Shows the original image in both the "original" and "translated"
    columns; table rows are empty and translations are an empty wrapper.
    """
    filename, _, chunks = load_and_split_image(file_obj, num_chunks)
    page_html = "".join(encode_image_to_html(piece) for piece in chunks)
    return filename, page_html, page_html, [], [[]]