Spaces:
Sleeping
Sleeping
| from paddleocr import PaddleOCR | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| from scipy.spatial import ConvexHull | |
| from utils.azure_translate import translate_text_azure | |
| from math import dist | |
| import numpy as np | |
| from shapely.geometry import box as shapely_box | |
| from shapely.geometry import Polygon | |
| from shapely.ops import unary_union | |
| import networkx as nx | |
| from shapely.ops import unary_union | |
| from utils.bubble_detect_rtdetr import polygon_to_mask | |
# Module-level OCR engine, constructed once at import time (model load is slow).
# lang='ch' targets Chinese text; use_textline_orientation presumably enables
# PaddleOCR's text-line orientation classifier for rotated lines — confirm
# against the installed PaddleOCR version's API.
ocr_model = PaddleOCR(use_textline_orientation=True, lang='ch')
def inflate_polygon(polygon_points, percent=0.05):
    """
    Inflate a polygon by a given percentage of its bounding-box diagonal.

    Args:
        polygon_points: List of (x, y) coordinates.
        percent: Inflation percentage (0.05 = 5%).

    Returns:
        Shapely Polygon inflated by the specified amount.
    """
    shape = Polygon(polygon_points)
    # Invalid (e.g. self-intersecting) rings are replaced by their convex
    # hull so the buffer below operates on a well-defined geometry.
    if not shape.is_valid:
        shape = shape.convex_hull
    min_x, min_y, max_x, max_y = shape.bounds
    diagonal = dist((min_x, min_y), (max_x, max_y))
    return shape.buffer(diagonal * percent)
def group_nearby_boxes(lines, inflation_percent=0.05):
    """
    Group nearby text boxes by checking if their inflated polygons intersect.

    Args:
        lines: List of (polygon_points, text) tuples.
        inflation_percent: How much to inflate polygons for grouping detection.

    Returns:
        List of groups, each {"polygons": [...], "texts": [...]}.
        Entries within a group keep the original OCR order, so downstream
        text merging is deterministic and follows detection order.
    """
    from collections import defaultdict

    n = len(lines)
    inflated_polys = []
    original_polys = []
    texts = []
    for poly_pts, text in lines:
        inflated_polys.append(inflate_polygon(poly_pts, percent=inflation_percent))
        original_polys.append(Polygon(poly_pts))
        texts.append(text)

    # Build connectivity graph: two boxes are connected when their inflated
    # footprints overlap.
    adjacency = defaultdict(set)
    for i in range(n):
        for j in range(i + 1, n):
            if inflated_polys[i].intersects(inflated_polys[j]):
                adjacency[i].add(j)
                adjacency[j].add(i)

    # Find connected components with an explicit stack. The previous
    # recursive DFS could exceed the interpreter recursion limit on very
    # large clusters of boxes.
    visited = [False] * n
    groups = []
    for start in range(n):
        if visited[start]:
            continue
        component = []
        stack = [start]
        visited[start] = True
        while stack:
            node = stack.pop()
            component.append(node)
            for neighbor in adjacency[node]:
                if not visited[neighbor]:
                    visited[neighbor] = True
                    stack.append(neighbor)
        # Sort so texts are later merged in OCR detection order rather than
        # arbitrary graph-traversal order (set iteration is unordered).
        component.sort()
        groups.append(component)

    return [
        {
            "polygons": [list(original_polys[i].exterior.coords) for i in group],
            "texts": [texts[i] for i in group],
        }
        for group in groups
    ]
def extract_and_translate_chunk(image: Image.Image):
    """
    Extract text from the entire image and translate it.

    Nearby OCR boxes are grouped before translation so multi-line sentences
    are translated as one unit.

    Args:
        image: Source PIL image.

    Returns:
        List of dicts with keys:
            - "original": merged source text of the group
            - "translated": Azure translation (source text on failure)
            - "polygon": convex hull of the group as (x, y) int tuples
    """
    from scipy.spatial import QhullError

    np_img = np.array(image)
    results = ocr_model.ocr(np_img)
    if not results or not isinstance(results[0], dict):
        return []

    result_dict = results[0]
    rec_polys = result_dict.get("rec_polys", [])
    rec_texts = result_dict.get("rec_texts", [])
    if not rec_polys or not rec_texts or len(rec_polys) != len(rec_texts):
        return []

    lines = list(
        zip([[(int(x), int(y)) for x, y in poly] for poly in rec_polys], rec_texts)
    )
    print("OCR raw output:", lines)

    grouped = group_nearby_boxes(lines)

    translations = []
    for group in grouped:
        group_polys = group["polygons"]
        group_texts = group["texts"]
        merged_text = "".join(group_texts).strip()
        if not merged_text:
            continue
        try:
            translated = translate_text_azure(merged_text)
        except Exception as e:
            print("Translation failed:", e)
            # Fall back to the untranslated source text (consistent with
            # extract_and_translate_with_masks) instead of an empty string.
            translated = merged_text

        all_points = np.array([pt for polygon in group_polys for pt in polygon])
        if len(all_points) < 3:
            continue
        try:
            hull_indices = ConvexHull(all_points).vertices
        except QhullError:
            # Degenerate (e.g. collinear) point sets have no 2-D convex hull.
            continue
        hull = [tuple(map(int, all_points[i])) for i in hull_indices]

        translations.append({
            "original": merged_text,
            "translated": translated,
            "polygon": hull,
        })
    return translations
def extract_and_translate_with_masks(
    full_img,
    interior_polygons,
    grouping_inflation=0.05,
    final_inflation=0,
):
    """
    OCR ONLY inside bubble interior polygons, with grouping and inflation.

    Args:
        full_img: PIL Image of the full page.
        interior_polygons: List of bubble interior polygons [(x, y), ...].
        grouping_inflation: % inflation used for grouping OCR boxes.
        final_inflation: % inflation used to enlarge the final rendering polygon.

    Returns:
        List of dicts:
            - original: merged source text of the group
            - translated: Azure translation (source text on failure)
            - polygon: inflated convex hull as (x, y) int tuples
            - matched_bubble_idx: index into interior_polygons
            - num_text_boxes: number of OCR boxes merged into the group
    """
    from scipy.spatial import QhullError

    np_img = np.array(full_img)
    H, W = np_img.shape[:2]
    translations = []

    for idx, poly in enumerate(interior_polygons):
        if not poly:
            continue

        # 1) Mask the bubble region (white outside the bubble).
        mask = polygon_to_mask((W, H), poly)
        bubble_img = np.where(mask[..., None] == 255, np_img, 255).astype(np.uint8)

        # 2) OCR inside the bubble.
        results = ocr_model.ocr(bubble_img)
        if not results or not isinstance(results[0], dict):
            continue
        res = results[0]
        polys = res.get("rec_polys", [])
        texts = res.get("rec_texts", [])
        if not polys or not texts:
            continue

        # Keep non-empty texts. Coordinates are already in full-image space
        # because the masked OCR input has the same dimensions as full_img.
        lines = []
        for poly_coords, text in zip(polys, texts):
            text_stripped = text.strip()
            if not text_stripped:
                continue
            poly_global = [(int(x), int(y)) for x, y in poly_coords]
            lines.append((poly_global, text_stripped))
        if not lines:
            continue
        print(f"Bubble {idx}: found {len(lines)} text boxes")

        # 3) Group nearby OCR text boxes.
        grouped = group_nearby_boxes(lines, inflation_percent=grouping_inflation)
        print(f"  -> grouped into {len(grouped)} groups")

        # 4) Per group: merge text, translate, hull, inflate.
        for group in grouped:
            group_polys = group["polygons"]
            group_texts = group["texts"]
            merged_text = "".join(group_texts).strip()
            if not merged_text:
                continue
            try:
                translated = translate_text_azure(merged_text)
            except Exception as e:
                print(f"Translation failed: {e}")
                translated = merged_text  # best effort: keep source text

            all_points = np.array([pt for polygon in group_polys for pt in polygon])
            if len(all_points) < 3:
                continue
            try:
                hull_idx = ConvexHull(all_points).vertices
            except QhullError:
                # Degenerate (e.g. collinear) point sets have no 2-D hull;
                # the len >= 3 check alone does not rule this out.
                continue
            hull_coords = [tuple(map(int, all_points[i])) for i in hull_idx]

            # 5) Inflate using the shared inflate_polygon() helper.
            inflated_poly = inflate_polygon(hull_coords, percent=final_inflation)
            if inflated_poly.geom_type == "Polygon":
                ring = inflated_poly.exterior.coords[:-1]
            else:
                # Fallback: convex hull of a MultiPolygon union.
                ring = inflated_poly.convex_hull.exterior.coords[:-1]
            final_coords = [(int(x), int(y)) for x, y in ring]

            translations.append({
                "original": merged_text,
                "translated": translated,
                "polygon": final_coords,
                "matched_bubble_idx": idx,
                "num_text_boxes": len(group_texts),
            })

    print(f"Total translations extracted: {len(translations)}")
    return translations