Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import cv2 | |
| import numpy as np | |
| import pyphen | |
| import re | |
| import torch | |
| from PIL import Image, ImageDraw, ImageFont | |
| import transformers.modeling_utils | |
| import transformers.utils.import_utils | |
| from ultralytics import YOLO | |
| from manga_ocr import MangaOcr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from simple_lama_inpainting import SimpleLama | |
| import PIL.Image | |
class MangaTranslator:
    """
    Manga page translation pipeline: detect -> OCR -> translate -> clean -> typeset.

    Detection uses an Ultralytics YOLO model, OCR uses MangaOcr, translation uses a
    Hugging Face causal LM driven through its chat template, and text removal uses
    OpenCV inpainting (label 'text_bubble') or LaMa (label 'text_free').
    """

    # Unicode ranges treated as Japanese script by _has_japanese_characters.
    _JAPANESE_RANGES = (
        (0x3040, 0x309F),  # Hiragana
        (0x30A0, 0x30FF),  # Katakana
        (0x4E00, 0x9FFF),  # Kanji
    )

    # Japanese honorific -> romaji suffix appended to a name in the translation.
    _HONORIFIC_MAP = {
        'さん': '-san',
        'ちゃん': '-chan',
        'くん': '-kun',
        '君': '-kun',
        '様': '-sama',
        'さま': '-sama',
        '先輩': '-senpai',
        'せんぱい': '-senpai',
        '先生': '-sensei',
        'せんせい': '-sensei',
        '殿': '-dono',
        'どの': '-dono',
        'たん': '-tan',
    }

    # Lead-in phrases that are stripped from a model reply by _strip_wrappers.
    _WRAPPER_RE = re.compile(
        r'^(Here\'s the translation:|Translation:|English:)\s*', re.IGNORECASE
    )

    def __init__(self, yolo_model_path='comic_yolov8m.pt',
                 translation_model="LiquidAI/LFM2.5-1.2B-Instruct",
                 font_path="font.ttf", custom_translations=None, keep_honorifics=True, debug=True,
                 system_prompt="Translate to Thai."):
        """
        Load every model used by the pipeline.

        Args:
            yolo_model_path: Weights file passed to YOLO().
            translation_model: Hugging Face model id for tokenizer and causal LM.
            font_path: TrueType font used for typesetting.
            custom_translations: Optional {source term: replacement} mapping applied
                to the OCR text before translation.
            keep_honorifics: Enables _preserve_honorifics.
            debug: When true, process_chapter writes chapter_data.json.
            system_prompt: Default system message sent with every bubble in
                translate_batch.
                NOTE(review): the default targets Thai although the rest of the
                class (English hyphenation, English fallback prompt) assumes
                English output - confirm the intended target language.
        """
        print("Loading YOLO model...")
        self.yolo_model = YOLO(yolo_model_path)
        self.font_path = font_path
        print("Loading LaMa Inpainting model...")
        self.lama = SimpleLama()
        print("Loading MangaOCR model...")
        self.mocr = MangaOcr()

        # --- Translation model setup ---
        print(f"Loading Translation Model ({translation_model})...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = AutoTokenizer.from_pretrained(
            translation_model,
            trust_remote_code=True  # Required for Liquid architectures
        )
        self.trans_model = AutoModelForCausalLM.from_pretrained(
            translation_model,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map=self.device,
            trust_remote_code=True  # Required for Liquid architectures
        )
        self.trans_model.eval()
        self.system_prompt = system_prompt

        self.dic = pyphen.Pyphen(lang='en')
        self.font_cache = {}
        self.custom_translations = custom_translations or {}
        self.keep_honorifics = keep_honorifics
        # Stored because process_chapter reads self.debug; previously the argument
        # was dropped and process_chapter failed with AttributeError.
        self.debug = debug
        self.honorifics = ['san', 'chan', 'kun', 'sama', 'senpai', 'sensei', 'dono', 'tan']

        # Optional dependency, only needed by the romanization fallback.
        try:
            import pykakasi
            self.kakasi = pykakasi.kakasi()
        except ImportError:
            print("Warning: pykakasi not installed. Install with 'pip install pykakasi' for romanization support.")
            self.kakasi = None

    def _get_font(self, size):
        """Return the font for `size`, loading it once and caching the result."""
        if size not in self.font_cache:
            try:
                self.font_cache[size] = ImageFont.truetype(self.font_path, size)
            except OSError:
                # Font file missing or unreadable: fall back to Pillow's default font.
                self.font_cache[size] = ImageFont.load_default()
        return self.font_cache[size]

    def _sort_bubbles(self, bubbles, row_threshold=50):
        """
        Order boxes row by row.

        Boxes are sorted by index 1, grouped into a row while each box is within
        `row_threshold` of the previous one in that row, and every row is ordered
        by index 2 descending. Returns a new list; the input is not modified.
        (Presumably boxes are [x1, y1, x2, y2] - verify against callers.)
        """
        if not bubbles:
            return []
        by_top = sorted(bubbles, key=lambda b: b[1])
        ordered = []
        row = [by_top[0]]
        for bubble in by_top[1:]:
            if abs(bubble[1] - row[-1][1]) < row_threshold:
                row.append(bubble)
                continue
            row.sort(key=lambda b: b[2], reverse=True)
            ordered.extend(row)
            row = [bubble]
        row.sort(key=lambda b: b[2], reverse=True)
        ordered.extend(row)
        return ordered

    def _wrap_text_dynamic(self, text, font, max_width):
        """
        Greedy word wrap with hyphenation.

        Words are added to the current line while they fit `max_width` (measured
        with font.getlength). A word that does not fit is hyphenated at the longest
        prefix that still fits on the current line; if no prefix fits, the word
        starts a new line. Returns the lines joined with newlines.
        """
        lines = []
        current_line = []
        current_width = 0
        space_width = font.getlength(" ")

        for word in text.split():
            word_width = font.getlength(word)
            gap = space_width if current_line else 0
            if current_width + word_width + gap <= max_width:
                current_line.append(word)
                current_width += word_width + gap
                continue

            found_split = False
            # Pyphen yields candidate splits with the longest first part first, so
            # the first candidate that fits is the one that fills the line best.
            for start, end in self.dic.iterate(word):
                chunk = start + "-"
                if current_width + font.getlength(chunk) + gap <= max_width:
                    current_line.append(chunk)
                    lines.append(" ".join(current_line))
                    current_line = [end]
                    current_width = font.getlength(end)
                    found_split = True
                    break

            if not found_split:
                if current_line:
                    lines.append(" ".join(current_line))
                current_line = [word]
                current_width = word_width

        if current_line:
            lines.append(" ".join(current_line))
        return "\n".join(lines)

    def _smart_clean_bubble(self, img, bbox):
        """
        Blur and brighten the region `bbox` of `img` in place and return `img`.

        The box is clamped to the image; an empty box leaves the image untouched.
        """
        x1, y1, x2, y2 = bbox
        h, w = img.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            return img

        bubble_region = img[y1:y2, x1:x2].copy()
        if bubble_region.size == 0:
            return img

        blurred = cv2.GaussianBlur(bubble_region, (21, 21), 0)
        # Blend 70% blurred content with 30% white.
        white = np.full_like(blurred, 255)
        img[y1:y2, x1:x2] = cv2.addWeighted(blurred, 0.7, white, 0.3, 0)
        return img

    def _preserve_honorifics(self, original_text, translated_text):
        """
        Re-attach a Japanese honorific (romaji form) to a name in the translation.

        Examples: さん→-san, ちゃん→-chan, 君→-kun, 様→-sama

        Does nothing when keep_honorifics is off, pykakasi is unavailable, the
        source has no known honorific, or the last translated word already
        contains one. Otherwise the first honorific found is appended to the last
        capitalized word.
        """
        if not self.keep_honorifics or not self.kakasi:
            return translated_text

        found_honorifics = [
            romaji for japanese, romaji in self._HONORIFIC_MAP.items()
            if japanese in original_text
        ]
        words = translated_text.split()
        if not found_honorifics or not words:
            return translated_text

        last_word = words[-1].lower()
        if any(hon in last_word for hon in self.honorifics):
            return translated_text

        # Capitalized words are treated as likely names; search from the end.
        for i in range(len(words) - 1, -1, -1):
            if words[i][0].isupper():
                words[i] += found_honorifics[0]
                return ' '.join(words)
        return translated_text

    def _draw_text_with_outline(self, draw, position, text, font,
                                text_color="black", outline_color="white",
                                outline_width=2, **kwargs):
        """
        Draw multiline text with an outline.

        The outline is made by drawing the text at every offset within
        `outline_width` pixels of `position`; extra kwargs are forwarded to
        draw.multiline_text.
        """
        x, y = position
        offsets = range(-outline_width, outline_width + 1)
        for adj_x in offsets:
            for adj_y in offsets:
                if adj_x == 0 and adj_y == 0:
                    continue
                draw.multiline_text((x + adj_x, y + adj_y), text,
                                    fill=outline_color, font=font, **kwargs)
        draw.multiline_text(position, text, fill=text_color, font=font, **kwargs)

    def _calculate_optimal_font_size(self, text, bbox, min_size=12, max_size=36):
        """
        Find the largest font size in [min_size, max_size] whose wrapped text fits `bbox`.

        Boxes taller than 1.5x their width wrap at 60% of the box width, all others
        at 90%. Returns (size, wrapped_text); if no size fits, `min_size` is used.
        """
        x1, y1, x2, y2 = bbox
        box_width = x2 - x1
        box_height = y2 - y1

        is_vertical = box_height > (box_width * 1.5)
        target_width_ratio = 0.6 if is_vertical else 0.9
        max_line_width = int(box_width * target_width_ratio)

        # One scratch canvas is enough for all measurements.
        temp_draw = ImageDraw.Draw(Image.new('RGB', (1, 1)))

        for size in range(max_size, min_size - 1, -1):
            font = self._get_font(size)
            wrapped = self._wrap_text_dynamic(text, font, max_line_width)
            left, top, right, bottom = temp_draw.multiline_textbbox(
                (0, 0), wrapped, font=font, align="center"
            )
            text_width = right - left
            text_height = bottom - top
            # Height is the main constraint; the width check catches single words
            # that are wider than the wrap width.
            if text_height < (box_height - 10) and text_width < (box_width - 4):
                return size, wrapped

        font = self._get_font(min_size)
        wrapped = self._wrap_text_dynamic(text, font, max_line_width)
        return min_size, wrapped

    def _has_japanese_characters(self, text):
        """Return True if `text` contains a hiragana, katakana or kanji character."""
        return any(
            start <= ord(char) <= end
            for char in text
            for start, end in self._JAPANESE_RANGES
        )

    def _romanize_japanese(self, text):
        """Convert Japanese text to Hepburn romaji; returns `text` unchanged on failure."""
        if not self.kakasi:
            return text
        try:
            result = self.kakasi.convert(text)
            return ''.join([item['hepburn'] for item in result])
        except Exception as e:
            print(f" Romanization error: {e}")
            return text

    def _apply_custom_translations(self, text):
        """Replace every custom_translations key found in `text` with its value."""
        for jp_term, en_term in self.custom_translations.items():
            text = text.replace(jp_term, en_term)
        return text

    def detect_and_process(self, image_path, output_dir="crops", page_id="", conf_threshold=0.15):
        """
        Detect text regions on one page and save a crop per region.

        Args:
            image_path: Page image to read.
            output_dir: Directory for the crop PNGs (created if missing).
            page_id: Prefix used in entry ids and crop file names.
            conf_threshold: Confidence threshold passed to YOLO predict.

        Returns:
            (image, manga_data): the BGR page and one dict per detection with keys
            id, page_id, bbox, label, crop_path, original_text, translated_text.

        Raises:
            ValueError: if the image cannot be read.
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Not found: {image_path}")
        img_h, img_w = image.shape[:2]

        results = self.yolo_model.predict(source=image, conf=conf_threshold, save=False, verbose=False)
        class_names = results[0].names

        detections = []
        for box in results[0].boxes:
            label = class_names[int(box.cls[0])]
            # Faces/bodies carry no text.
            if label in ['face', 'body']:
                continue
            x_min, y_min, x_max, y_max = map(int, box.xyxy[0].tolist())
            # Clamp to the page so slicing never wraps around on negative values,
            # and drop boxes that end up empty (cv2.imwrite rejects empty crops).
            x_min, y_min = max(0, x_min), max(0, y_min)
            x_max, y_max = min(img_w, x_max), min(img_h, y_max)
            if x_max <= x_min or y_max <= y_min:
                continue
            detections.append({"bbox": [x_min, y_min, x_max, y_max], "label": label})

        # Top to bottom, then right to left.
        detections.sort(key=lambda d: (d['bbox'][1], -d['bbox'][0]))

        os.makedirs(output_dir, exist_ok=True)

        manga_data = []
        for i, det in enumerate(detections):
            x_min, y_min, x_max, y_max = det['bbox']
            crop_path = os.path.join(output_dir, f"bubble_{page_id}_{i+1}.png")
            cv2.imwrite(crop_path, image[y_min:y_max, x_min:x_max])
            manga_data.append({
                "id": f"{page_id}_{i+1}",
                "page_id": page_id,
                "bbox": [x_min, y_min, x_max, y_max],
                "label": det['label'],
                "crop_path": crop_path,
                "original_text": "",
                "translated_text": ""
            })
        return image, manga_data

    def run_ocr(self, manga_data):
        """Run MangaOcr on each entry's crop and store the result in 'original_text'."""
        for entry in manga_data:
            japanese_text = self.mocr(entry['crop_path'])
            japanese_text = self._apply_custom_translations(japanese_text)
            entry['original_text'] = japanese_text.replace('\n', '')
        return manga_data

    def _generate_reply(self, messages):
        """Send chat `messages` through the translation model and return the decoded reply."""
        inputs = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        )

        # apply_chat_template may return a mapping or a bare tensor of input ids.
        if isinstance(inputs, dict) or hasattr(inputs, "keys"):
            if hasattr(inputs, "to"):
                inputs = inputs.to(self.device)
            else:
                # A plain dict has no .to(); move each tensor individually.
                inputs = {
                    key: value.to(self.device) if hasattr(value, "to") else value
                    for key, value in inputs.items()
                }
            generate_kwargs = inputs
            input_length = inputs["input_ids"].shape[1]
        else:
            inputs = inputs.to(self.device)
            generate_kwargs = {"input_ids": inputs}
            input_length = inputs.shape[1]

        with torch.no_grad():
            output_ids = self.trans_model.generate(
                **generate_kwargs,
                max_new_tokens=128,
                temperature=0.5,
                top_p=1.0,
                repetition_penalty=1.05,
                do_sample=True
            )

        # Decode only the tokens generated after the prompt.
        return self.tokenizer.decode(
            output_ids[0][input_length:],
            skip_special_tokens=True
        ).strip()

    @classmethod
    def _strip_wrappers(cls, translation):
        """Remove a leading 'Translation:'-style phrase and surrounding quotes."""
        translation = cls._WRAPPER_RE.sub('', translation.strip())
        return translation.strip('"\'')

    def _translate_single_bubble(self, text, series_info=None):
        """
        Translate a single bubble to English (fallback method).

        Optional `series_info` ('title', 'tags') is prepended as context. Returns
        "[Translation Error]" if generation fails.
        """
        context_str = ""
        if series_info:
            context_str = (
                f"\nContext: {series_info.get('title', '')} - {series_info.get('tags', '')}\n"
            )
        prompt = (
            f"{context_str}Translate this Japanese manga text to natural English. "
            f"Return ONLY the English translation, nothing else:\n{text}"
        )
        try:
            # Uses the loaded translation model; the class defines no `llm` client.
            reply = self._generate_reply([{"role": "user", "content": prompt}])
            return self._strip_wrappers(reply)
        except Exception as e:
            print(f" Translation error: {e}")
            return "[Translation Error]"

    def translate_batch(self, manga_data, series_info=None, system_prompt=None):
        """
        Translate every entry's 'original_text' into 'translated_text', one bubble per call.

        Args:
            manga_data: Entries produced by detect_and_process / run_ocr.
            series_info: Accepted for interface compatibility; not used.
            system_prompt: System message for this call; defaults to the prompt
                given to the constructor.

        Returns:
            The same list, updated in place. Empty texts are skipped and single
            punctuation marks are copied unchanged.
        """
        print(f"Translating {len(manga_data)} bubbles with LiquidAI...")
        if system_prompt is None:
            system_prompt = self.system_prompt

        for entry in manga_data:
            text = entry.get('original_text', '').strip()
            if not text:
                continue

            # Punctuation-only bubbles need no model call.
            if len(text) < 2 and text in "!?.…":
                entry['translated_text'] = text
                continue

            translated_text = self._generate_reply([
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text}
            ])
            entry['translated_text'] = translated_text
            print(f" JP: {text[:15]}... -> {translated_text}")
        return manga_data

    def _clean_bubble_crop(self, crop, ellipse_padding, inpaint_radius):
        """Inpaint dark pixels inside a shrunk ellipse of `crop` and return the result."""
        ch, cw = crop.shape[:2]
        gray_crop = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)

        # Dark ink becomes white in the inverted adaptive threshold.
        binary_text = cv2.adaptiveThreshold(
            gray_crop, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
            cv2.THRESH_BINARY_INV, 21, 10
        )

        # The ellipse is shrunk by the padding to stay away from the box edges.
        ellipse_mask = np.zeros((ch, cw), dtype=np.uint8)
        center = (cw // 2, ch // 2)
        axes = (max(1, cw // 2 - ellipse_padding), max(1, ch // 2 - ellipse_padding))
        cv2.ellipse(ellipse_mask, center, axes, 0, 0, 360, 255, -1)

        final_mask = cv2.bitwise_and(binary_text, ellipse_mask)
        # Dilate so the mask also covers the edges of the strokes.
        final_mask = cv2.dilate(final_mask, np.ones((5, 5), np.uint8), iterations=1)
        return cv2.inpaint(crop, final_mask, inpaint_radius, cv2.INPAINT_TELEA)

    def clean_page(self, original_image, page_data, ellipse_padding=8, inpaint_radius=5):
        """
        Remove the original text from a page and return the cleaned copy.

        - label 'text_bubble' -> OpenCV inpainting of dark pixels inside a shrunk ellipse
        - label 'text_free'   -> LaMa inpainting over the full rectangle (one pass per page)
        Entries without 'translated_text' and entries with any other label are left as is.
        """
        final_image = original_image.copy()
        h, w = original_image.shape[:2]

        # Accumulates every 'text_free' rectangle for a single LaMa call.
        lama_mask = np.zeros((h, w), dtype=np.uint8)
        has_lama_work = False

        for entry in page_data:
            if not entry.get('translated_text'):
                continue

            x1, y1, x2, y2 = entry['bbox']
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)
            crop = final_image[y1:y2, x1:x2]
            if crop.size == 0:
                continue

            label = entry.get('label', 'text_free')
            if label == 'text_bubble':
                final_image[y1:y2, x1:x2] = self._clean_bubble_crop(
                    crop, ellipse_padding, inpaint_radius
                )
            elif label == 'text_free':
                cv2.rectangle(lama_mask, (x1, y1), (x2, y2), 255, -1)
                has_lama_work = True

        if has_lama_work:
            lama_mask = cv2.dilate(lama_mask, np.ones((5, 5), np.uint8), iterations=1)
            img_pil = Image.fromarray(cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB))
            mask_pil = Image.fromarray(lama_mask)
            try:
                cleaned_pil = self.lama(img_pil, mask_pil)
                cleaned_lama = cv2.cvtColor(np.array(cleaned_pil), cv2.COLOR_RGB2BGR)
                # The LaMa output size can differ from the input; bring it back.
                if cleaned_lama.shape[:2] != (h, w):
                    cleaned_lama = cv2.resize(cleaned_lama, (w, h))
                # Take LaMa pixels only where the mask is set.
                final_image = np.where(lama_mask[:, :, None] == 255, cleaned_lama, final_image)
            except Exception as e:
                # Best effort: keep the page without LaMa cleaning.
                print(f" ⚠ LaMa failed: {e}")
        return final_image

    def typeset(self, original_image, manga_data, output_path):
        """
        Clean the page, draw each translated text centered in its box, and save it.

        Returns:
            True if the image was written, False if cv2.imwrite reported failure.
        """
        working_img = self.clean_page(original_image, manga_data)

        img_pil = Image.fromarray(cv2.cvtColor(working_img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img_pil)

        for entry in manga_data:
            text = entry.get('translated_text', '')
            if not text:
                continue
            x1, y1, x2, y2 = entry['bbox']

            font_size, wrapped_text = self._calculate_optimal_font_size(text, entry['bbox'])
            font = self._get_font(font_size)

            left, top, right, bottom = draw.multiline_textbbox(
                (0, 0), wrapped_text, font=font, align="center"
            )
            text_w, text_h = right - left, bottom - top

            # Center the text block inside the box.
            text_x = x1 + ((x2 - x1) - text_w) / 2
            text_y = y1 + ((y2 - y1) - text_h) / 2

            self._draw_text_with_outline(
                draw, (text_x, text_y), wrapped_text, font,
                text_color="black", outline_color="white",
                outline_width=2, align="center", spacing=2
            )

        final_img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
        # cv2.imwrite signals failure through its return value, not an exception.
        saved = cv2.imwrite(output_path, final_img)
        if saved:
            print(f" Saved: {output_path}")
        else:
            print(f" ⚠ Failed to save: {output_path}")
        return saved

    @staticmethod
    def _page_sort_key(filename):
        """
        Sort key for page files: by the first number in the name, names without digits last.

        Always returns a tuple of the same shape so mixed file names stay comparable.
        """
        match = re.search(r'\d+', filename)
        if match:
            return (0, int(match.group()), filename)
        return (1, 0, filename)

    def process_chapter(self, input_folder, output_folder, series_info=None,
                        batch_size=4, selected_batches=None):
        """
        Translate every image of `input_folder` into `output_folder`, batch by batch.

        Args:
            input_folder: Directory with the page images.
            output_folder: Directory for translated pages (created if missing).
            series_info: Forwarded to translate_batch.
            batch_size: Number of pages per batch (must be >= 1).
            selected_batches: Optional collection of 1-based batch numbers to
                process; all batches when empty/None.

        Raises:
            ValueError: if batch_size is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        os.makedirs(output_folder, exist_ok=True)

        valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
        files = [f for f in os.listdir(input_folder) if f.lower().endswith(valid_ext)]
        # Numeric order (p1, p2, p10 instead of p1, p10, p2).
        files.sort(key=self._page_sort_key)

        total_files = len(files)
        total_batches = (total_files + batch_size - 1) // batch_size
        full_chapter_data = []

        print(f"Found {total_files} images in {input_folder}")
        print(f"Total batches: {total_batches} (batch size: {batch_size})")
        if selected_batches:
            print(f"Processing selected batches: {selected_batches}")
        else:
            print(f"Processing all batches\n")

        temp_crop_dir = os.path.join(output_folder, "temp_crops")

        for batch_start in range(0, total_files, batch_size):
            batch_num = batch_start // batch_size + 1
            if selected_batches and batch_num not in selected_batches:
                continue

            batch_files = files[batch_start:batch_start + batch_size]
            print(f"=== Batch {batch_num}/{total_batches} ({len(batch_files)} pages) ===")

            batch_data = []
            batch_images = []

            for idx, filename in enumerate(batch_files):
                page_num = batch_start + idx + 1
                print(f" [{page_num}/{total_files}] Detecting bubbles in {filename}...")
                input_path = os.path.join(input_folder, filename)
                page_id = f"p{page_num:03d}"
                try:
                    img, data = self.detect_and_process(input_path, output_dir=temp_crop_dir, page_id=page_id)
                    if data:
                        print(f" Running OCR on {len(data)} bubbles...")
                        data = self.run_ocr(data)
                        batch_data.extend(data)
                    else:
                        print(f" No bubbles detected")
                    batch_images.append((filename, img, page_id))
                except Exception as e:
                    # One bad page must not abort the chapter.
                    print(f" Error processing {filename}: {e}")
                    continue

            if batch_data:
                print(f" Translating {len(batch_data)} bubbles from batch...")
                batch_data = self.translate_batch(batch_data, series_info=series_info)
                full_chapter_data.extend(batch_data)

            print(f" Typesetting pages...")
            for filename, img, page_id in batch_images:
                output_path = os.path.join(output_folder, filename)
                page_data = [d for d in batch_data if d.get('page_id') == page_id]
                try:
                    self.typeset(img, page_data, output_path)
                except Exception as e:
                    print(f" Error typesetting {filename}: {e}")
            print()  # Empty line between batches

        # Debug dump of everything that was detected and translated.
        if self.debug and full_chapter_data:
            json_filename = "chapter_data.json"
            json_path = os.path.join(output_folder, json_filename)
            try:
                with open(json_path, 'w', encoding='utf-8') as f:
                    json.dump(full_chapter_data, f, ensure_ascii=False, indent=2)
                print(f" [DEBUG] Saved full chapter data to: {json_filename}")
            except Exception as e:
                print(f" [DEBUG] Failed to save JSON: {e}")

        print(f"\n✓ Chapter processing complete! Output saved to: {output_folder}")

    def process_single_image(self, image_path, output_path, series_info=None):
        """
        Run the full pipeline on a single image file.

        Returns:
            The list of bubble entries, or None when no bubbles were detected (the
            original image is written to `output_path` in that case).

        Raises:
            FileNotFoundError: if `image_path` does not exist.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")
        print(f"=== Processing Single Page: {os.path.basename(image_path)} ===")

        # Crops are written to disk because OCR reads them by path.
        temp_crop_dir = "temp_demo_crops"
        os.makedirs(temp_crop_dir, exist_ok=True)

        # Create the output directory up front so the "no bubbles" branch can
        # write its file as well.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        print("1. Detecting Bubbles...")
        original_img, data = self.detect_and_process(
            image_path,
            output_dir=temp_crop_dir,
            page_id="demo"
        )
        if not data:
            print(" ⚠ No bubbles found! Saving original image...")
            cv2.imwrite(output_path, original_img)
            return

        print(f"2. Running OCR on {len(data)} bubbles...")
        data = self.run_ocr(data)

        print("3. Translating text...")
        data = self.translate_batch(data, series_info=series_info)

        print("4. Typesetting (Cleaning & Drawing)...")
        if self.typeset(original_img, data, output_path):
            print(f"✅ Success! Saved to: {output_path}")
        else:
            print(f"❌ Could not write: {output_path}")
        return data
if __name__ == "__main__":
    # Single-page demo: translate one image and write the result to output/.
    input_file = "chapter_401/001.jpg"
    output_file = "output/001_translated.jpg"

    # Fixed renderings for character/place names, applied to the OCR text
    # before it is sent to the translation model.
    custom_translations = {
        "ルーグ": "Lugh",
        "トウアハーデ": "Tuatha Dé",
        "ディア": "Dia",
        "タルト": "Tarte",
    }

    # Models are loaded here, before the input file is checked.
    translator = MangaTranslator(
        yolo_model_path='comic-speech-bubble-detector.pt',
        translation_model="LiquidAI/LFM2-350M-ENJP-MT",
        font_path="font.ttf",
        custom_translations=custom_translations,
        debug=True,  # debug flag of the translator
    )

    if not os.path.exists(input_file):
        print(f"❌ Error: Could not find {input_file}. Please check your folder structure.")
    else:
        print(f"🚀 Starting Demo on {input_file}...")
        # No series context is passed for the demo.
        translator.process_single_image(input_file, output_file, series_info=None)
        print(f"✨ Demo Complete! Check {output_file}")