Spaces:
Running
Running
| from services.bubble_detector_kiuyha_service import Bubble_Detector_Kiuyha_Service | |
| from services.translate_qwen_service import Translate_Qwen_Service | |
| from PIL import Image, ImageDraw, ImageFont | |
| import tempfile | |
| import os | |
| import re | |
| import torch | |
| from pathlib import Path | |
| from helpers import get_project_root, setup_fonts | |
| from manga_ocr import MangaOcr | |
| import httpx | |
class ImageProcessor:
    """Detect speech bubbles on a manga page, OCR them and translate the text.

    The collaborators are injected, so this class only orchestrates them:

    * ``bubble_detector`` exposes ``predict(image_path)`` and returns an
      iterable of dicts, each with a ``'coords'`` entry ``(x1, y1, x2, y2)``.
    * ``ocr_model`` is called with a cropped PIL image and returns a string.
    * ``translate_model`` exposes ``translate_cloud(texts)`` and
      ``translate(texts)``; both receive a list of ``{"id", "text"}`` dicts
      and return a mapping that is looked up by bubble index (the ``str``
      form of the index is tried first, then the ``int`` form).
    """

    # Line breaks (including Unicode line/paragraph separators) inside a
    # bubble are collapsed to a single space before translation.
    _LINE_BREAKS = re.compile(r'[\n\r\u2028\u2029]+')

    def __init__(self, bubble_detector, ocr_model, translate_model):
        self.bubble_detector_model = bubble_detector
        self.ocr_model = ocr_model
        self.translate_model = translate_model

    async def download_and_process(self, image_url: str, language: str):
        """Download ``image_url`` to a temp file and run :meth:`process_image`.

        Args:
            image_url: URL of the image to fetch.
            language: Passed through unchanged to :meth:`process_image`.

        Returns:
            Whatever :meth:`process_image` returns for the downloaded image.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Download before touching the disk, so a failed request cannot
        # leave an orphaned temp file behind.
        async with httpx.AsyncClient() as client:
            response = await client.get(image_url)
            response.raise_for_status()

        temp_path = None
        try:
            # delete=False: the file must be closed (fully flushed) before the
            # models read it by path, so it is removed manually below.
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                temp_path = tmp.name
                tmp.write(response.content)
            return self.process_image(temp_path, language)
        finally:
            # Runs on success and on any failure after the file was created.
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)
                print(f"Cleaned up temp file: {temp_path}")

    def process_image(self, image_path, language):
        """Run bubble detection, OCR and translation on one image file.

        Args:
            image_path: Path of the image file to process.
            language: Currently unused by this method; kept so existing
                callers keep working.

        Returns:
            A list with one dict per detected bubble, holding
            ``bubble_index``, the page ``width`` and ``height``, the bubble
            corners ``x1``/``y1``/``x2``/``y2`` as floats, ``original_text``
            and ``translated_text``. The list is empty when no bubble was
            detected.
        """
        bubble_results = self.bubble_detector_model.predict(image_path)
        print(f"bubble results: {bubble_results}")

        boxes = [box_data['coords'] for box_data in bubble_results]
        if not boxes:
            # Nothing to OCR, so skip opening the image and, more
            # importantly, skip the translation request.
            print('OCR Complete, total 0 bubbles.')
            return []

        (width, height), texts = self._ocr_bubbles(image_path, boxes)
        print(f'OCR Complete, total {len(texts)} bubbles.')

        translated = self._translate(texts)
        print(translated)

        # NOTE: drawing the translated text directly onto the page is
        # disabled; a commented-out reference implementation is kept at the
        # end of this file.
        bubble_data = []
        for entry, coords in zip(texts, boxes):
            index = entry["id"]
            x1, y1, x2, y2 = coords
            original_text = entry["text"]
            # The translation mapping may be keyed by "0" or by 0.
            translated_text = translated.get(str(index), translated.get(index, ""))
            if not isinstance(translated_text, str):
                translated_text = str(translated_text)
            print(f"{index}: {original_text}")
            print(translated_text)
            print("==================================")
            bubble_data.append({
                "bubble_index": index,
                "width": width,
                "height": height,
                "x1": float(x1), "y1": float(y1), "x2": float(x2), "y2": float(y2),
                "original_text": original_text,
                "translated_text": translated_text,
            })
        return bubble_data

    def _ocr_bubbles(self, image_path, boxes):
        """OCR every box of the image; return ``(image_size, texts)``."""
        texts = []
        # The context manager releases the image file once all crops are done.
        with Image.open(image_path) as img:
            size = img.size
            for index, coords in enumerate(boxes):
                # The OCR model takes the in-memory crop directly, so no
                # per-bubble temp file is written.
                box_cropped = img.crop(coords)
                text = ""
                try:
                    text = self.ocr_model(box_cropped)
                except Exception:
                    # Best effort: a failed bubble keeps an empty text so the
                    # remaining bubbles are still processed.
                    print(f"text OCR failed for {index}")
                text = self._LINE_BREAKS.sub(' ', text)
                texts.append({"id": index, "text": text})
        return size, texts

    def _translate(self, texts):
        """Translate with the cloud model, falling back to the local one."""
        try:
            print("Translating with cloud Qwen model...")
            return self.translate_model.translate_cloud(texts)
        except Exception:
            print("API translation failed with Qwen, falling back to local model...")
            return self.translate_model.translate(texts)
| ######## Test code, kept here for reference only. Remove later. ######## | |
| # def show_boxes(image_path): | |
| # result = bubble_detector_model.predict(image_path) | |
| # img = Image.open(image_path).convert("RGB") | |
| # draw = ImageDraw.Draw(img) | |
| # for box in result.boxes: | |
| # # Get coordinates as a list of floats | |
| # coords = box.xyxy[0].tolist() # [x1, y1, x2, y2] | |
| # draw.rectangle(coords, outline="red", width=1) | |
| # # label | |
| # conf = box.conf[0].item() | |
| # box_cropped = img.crop(coords) | |
| # # box_cropped = upscale_for_ocr(box_cropped, scale=3) | |
| # with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as f: | |
| # box_cropped.save(f.name) | |
| # temp_path = f.name | |
| # draw.text( | |
| # (coords[0], coords[1] - 10), | |
| # "b", | |
| # fill="red", | |
| # font=font | |
| # ) | |
| # img.show() | |
| # def get_wrapped_text(text, font, max_width): | |
| # lines = [] | |
| # words = text.split(' ') # Split by words for English | |
| # current_line = [] | |
| # for word in words: | |
| # # Check if adding the next word exceeds the width | |
| # test_line = ' '.join(current_line + [word]) | |
| # # getlength() is more accurate than getbbox for text width | |
| # if font.getlength(test_line) <= max_width: | |
| # current_line.append(word) | |
| # else: | |
| # lines.append(' '.join(current_line)) | |
| # current_line = [word] | |
| # lines.append(' '.join(current_line)) | |
| # return lines | |
| # def fit_text_to_box(draw, text, box_coords, font_path, padding=5, initial_size=40): | |
| # x1, y1, x2, y2 = box_coords | |
| # padding = padding | |
| # target_width = (x2 - x1) - (padding * 2) | |
| # target_height = (y2 - y1) - (padding * 2) | |
| # current_size = initial_size | |
| # lines = [] | |
| # while current_size > 8: | |
| # # index=0 for Japanese, 1 for Korean in NotoSansCJK | |
| # font = ImageFont.truetype(font_path, size=current_size) | |
| # lines = get_wrapped_text(text, font, target_width) | |
| # # Use a more reliable line height measurement | |
| # # getbbox can be inconsistent; use font.size * constant for better leading | |
| # line_height = int(current_size * 1.2) | |
| # total_height = line_height * len(lines) | |
| # if total_height <= target_height: | |
| # break | |
| # current_size -= 2 # Step down by 2 for speed | |
| # return lines, font, current_size, line_height | |
| # def upscale_for_ocr(img, scale=2): | |
| # w, h = img.size | |
| # return img.resize((w*scale, h*scale), Image.BICUBIC) | |
| # def process_image(image_path, language): | |
| # bubble_results = bubble_detector_model.predict(image_path) | |
| # print(f"bubble results: {bubble_results}") | |
| # img = Image.open(image_path) | |
| # draw = ImageDraw.Draw(img) | |
| # texts = [] | |
| # coordinates={} | |
| # i=0 | |
| # for box_data in bubble_results: | |
| # coords = box_data['coords'] | |
| # draw.rectangle(coords, outline="red", width=1) | |
| # box_cropped = img.crop(coords) | |
| # # box_cropped = upscale_for_ocr(box_cropped, scale=3) | |
| # # box_cropped.show() | |
| # with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as f: | |
| # box_cropped.save(f.name) | |
| # temp_path = f.name | |
| # text = "" | |
| # # if language == "japanese": | |
| # # # text = ocr_japanese_model.runOCR(temp_path) | |
| # # text = ocr_model(temp_path) | |
| # # else: | |
| # # text = ocr_model.runOCR(temp_path) | |
| # text = ocr_model(box_cropped) | |
| # text = re.sub(r'[\n\r\u2028\u2029]+', ' ', text) #remove new lines | |
| # texts.append({"id": i, "text": text}) | |
| # coordinates[i] = coords | |
| # i+=1 | |
| # print(f'OCR Complete, total {len(texts)} bubbles.') | |
| # #add translated text to manga image | |
| # try: | |
| # print("Translating with cloud Qwen model...") | |
| # translated = translate_model.translate_cloud(texts) | |
| # except Exception as e: | |
| # print("API translation failed with Qwen, falling back to local model...") | |
| # translated = translate_model.translate(texts) | |
| # print(translated) | |
| # bubble_data = [] | |
| # for i in range(len(texts)): | |
| # coords = coordinates[i] | |
| # x1, y1, x2, y2 = coords | |
| # original_text = texts[i]["text"] | |
| # translated_text = translated.get(str(i), translated.get(i, "")) | |
| # if not isinstance(translated_text, str): | |
| # translated_text = str(translated_text) | |
| # print(f"{i}: {original_text}") | |
| # print(translated_text) | |
| # print("==================================") | |
| # bubble_data.append({ | |
| # "bubble_index": i, | |
| # "x1": float(x1), "y1": float(y1), "x2": float(x2), "y2": float(y2), | |
| # "original_text": original_text, | |
| # "translated_text": translated_text, | |
| # }) | |
| # #wipe the space | |
| # draw.rectangle(coords, fill="white", outline="white") | |
| # # 1. Calculate the best fit | |
| # lines, best_font, final_size, line_h = fit_text_to_box(draw, translated_text, coords, FONT_PATH) | |
| # # Calculate total height of the block | |
| # total_h = line_h * len(lines) | |
| # # Start_y adjusted for the block height relative to the box center | |
| # start_y = coords[1] + ((coords[3] - coords[1]) - total_h) / 2 | |
| # # 3. Draw each line centered horizontally | |
| # for line in lines: | |
| # line = line.strip() | |
| # if not line: continue | |
| # # Horizontal Centering | |
| # line_w = draw.textlength(line, font=best_font) | |
| # start_x = coords[0] + ((coords[2] - coords[0]) - line_w) / 2 | |
| # draw.text((start_x, start_y), line, font=best_font, fill="black") | |
| # start_y += line_h | |
| # return img, bubble_data | |
| # def translate_text(text, language): | |
| # # translated_text = "" | |
| # # if language == "japanese": | |
| # # translated_text = | |
| # translated_text = translate_model.translate(text) | |
| # return translated_text | |
| # def _language_to_code(language: str) -> str: | |
| # """Map language name to ISO 639-1 style code for DB.""" | |
| # m = {"japanese": "ja", "english": "en", "korean": "ko", "chinese": "zh"} | |
| # return m.get(language.lower(), language[:2] if len(language) >= 2 else "ja") | |