import os
import json
import re

import cv2
import numpy as np
import pyphen
import torch
from PIL import Image, ImageDraw, ImageFont
import transformers.modeling_utils
import transformers.utils.import_utils
from ultralytics import YOLO
from manga_ocr import MangaOcr
from transformers import AutoModelForCausalLM, AutoTokenizer
from simple_lama_inpainting import SimpleLama
import PIL.Image


class MangaTranslator:
    """End-to-end manga translation pipeline.

    Stages: YOLO bubble detection -> MangaOCR -> LLM translation
    (LiquidAI LFM2 family via transformers) -> inpainting cleanup
    (OpenCV for bubbles, LaMa for free text) -> PIL typesetting.
    """

    def __init__(self, yolo_model_path='comic_yolov8m.pt',
                 translation_model="LiquidAI/LFM2.5-1.2B-Instruct",
                 font_path="font.ttf", custom_translations=None,
                 keep_honorifics=True, debug=True):
        """Load all models up front.

        Args:
            yolo_model_path: weights file for the bubble detector.
            translation_model: HF model id for the translator LLM.
            font_path: TrueType font used for typesetting.
            custom_translations: optional {japanese: english} glossary
                applied to OCR output before translation (names, terms).
            keep_honorifics: preserve -san/-chan/... in translations.
            debug: when True, process_chapter dumps a JSON of all data.
        """
        print("Loading YOLO model...")
        self.yolo_model = YOLO(yolo_model_path)
        self.font_path = font_path

        print("Loading LaMa Inpainting model...")
        self.lama = SimpleLama()

        print("Loading MangaOCR model...")
        self.mocr = MangaOcr()

        # --- LIQUID AI SETUP (Updated) ---
        print(f"Loading Translation Model ({translation_model})...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # 1. Load Tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            translation_model,
            trust_remote_code=True  # Required for Liquid architectures
        )

        # 2. Load Model with Trust Remote Code
        self.trans_model = AutoModelForCausalLM.from_pretrained(
            translation_model,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map=self.device,
            trust_remote_code=True  # Required for Liquid architectures
        )
        self.trans_model.eval()
        # -----------------------

        self.dic = pyphen.Pyphen(lang='en')  # hyphenation for text wrapping
        self.font_cache = {}
        self.custom_translations = custom_translations or {}
        self.keep_honorifics = keep_honorifics
        # FIX: `debug` was accepted but never stored, so process_chapter's
        # `self.debug` read raised AttributeError.
        self.debug = debug
        self.honorifics = ['san', 'chan', 'kun', 'sama', 'senpai',
                           'sensei', 'dono', 'tan']

        # Optional romanization fallback (used by _romanize_japanese).
        try:
            import pykakasi
            self.kakasi = pykakasi.kakasi()
        except ImportError:
            print("Warning: pykakasi not installed. "
                  "Install with 'pip install pykakasi' for romanization support.")
            self.kakasi = None

    def _get_font(self, size):
        """Cache fonts to avoid repeated loading."""
        if size not in self.font_cache:
            try:
                self.font_cache[size] = ImageFont.truetype(self.font_path, size)
            except IOError:
                # Fall back to PIL's built-in bitmap font if the TTF is missing.
                self.font_cache[size] = ImageFont.load_default()
        return self.font_cache[size]

    def _sort_bubbles(self, bubbles, row_threshold=50):
        """Sort bubble tuples into manga reading order.

        Groups bubbles into rows (y within row_threshold), then orders each
        row right-to-left (Japanese reading direction). Expects tuples where
        index 1 is y1 and index 2 is x2.
        """
        bubbles.sort(key=lambda b: b[1])
        sorted_bubbles = []
        if not bubbles:
            return sorted_bubbles
        current_row = [bubbles[0]]
        for i in range(1, len(bubbles)):
            if abs(bubbles[i][1] - current_row[-1][1]) < row_threshold:
                current_row.append(bubbles[i])
            else:
                current_row.sort(key=lambda b: b[2], reverse=True)
                sorted_bubbles.extend(current_row)
                current_row = [bubbles[i]]
        current_row.sort(key=lambda b: b[2], reverse=True)
        sorted_bubbles.extend(current_row)
        return sorted_bubbles

    def _wrap_text_dynamic(self, text, font, max_width):
        """Greedy word-wrap with pyphen hyphenation when a word won't fit.

        Returns the wrapped text joined with newlines.
        """
        words = text.split()
        lines = []
        current_line = []
        current_width = 0
        space_width = font.getlength(" ")
        for word in words:
            word_width = font.getlength(word)
            potential_width = current_width + word_width + (space_width if current_line else 0)
            if potential_width <= max_width:
                current_line.append(word)
                current_width = potential_width
            else:
                # Try to hyphenate: take the largest prefix chunk that fits.
                splits = list(self.dic.iterate(word))
                found_split = False
                for start, end in reversed(splits):
                    chunk = start + "-"
                    chunk_width = font.getlength(chunk)
                    if current_width + chunk_width + (space_width if current_line else 0) <= max_width:
                        current_line.append(chunk)
                        lines.append(" ".join(current_line))
                        current_line = [end]
                        current_width = font.getlength(end)
                        found_split = True
                        break
                if not found_split:
                    # No split fits: push the word onto its own line
                    # (may still overflow for very long unbreakable words).
                    if current_line:
                        lines.append(" ".join(current_line))
                    current_line = [word]
                    current_width = word_width
        if current_line:
            lines.append(" ".join(current_line))
        return "\n".join(lines)

    def _smart_clean_bubble(self, img, bbox):
        """Gaussian blur-based cleaning for transparent effect."""
        x1, y1, x2, y2 = bbox
        # Ensure coordinates are within image bounds
        h, w = img.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            return img
        # Extract bubble region
        bubble_region = img[y1:y2, x1:x2].copy()
        if bubble_region.size == 0:
            return img
        # Apply Gaussian blur for softer look
        blurred = cv2.GaussianBlur(bubble_region, (21, 21), 0)
        # Brighten the blurred region slightly (70% blur + 30% white)
        brightened = cv2.addWeighted(blurred, 0.7, np.ones_like(blurred) * 255, 0.3, 0)
        # Place back into image
        img[y1:y2, x1:x2] = brightened
        return img

    def _preserve_honorifics(self, original_text, translated_text):
        """
        Detect and preserve Japanese honorifics in romaji form.
        Examples: さん→-san, ちゃん→-chan, 君→-kun, 様→-sama
        """
        if not self.keep_honorifics or not self.kakasi:
            return translated_text
        # Common honorific patterns in Japanese
        honorific_map = {
            'さん': '-san', 'ちゃん': '-chan', 'くん': '-kun', '君': '-kun',
            '様': '-sama', 'さま': '-sama',
            '先輩': '-senpai', 'せんぱい': '-senpai',
            '先生': '-sensei', 'せんせい': '-sensei',
            '殿': '-dono', 'どの': '-dono',
            'たん': '-tan',
        }
        # Find honorifics in original text
        found_honorifics = []
        for jp_hon, rom_hon in honorific_map.items():
            if jp_hon in original_text:
                found_honorifics.append(rom_hon)
        # If we found honorifics, try to add them back to names in translation
        if found_honorifics:
            words = translated_text.split()
            if len(words) >= 1:
                # Check if translation already has an honorific on the last word
                last_word = words[-1].lower()
                has_honorific = any(hon.strip('-') in last_word for hon in self.honorifics)
                if not has_honorific and found_honorifics:
                    # Attach the first found honorific to the last capitalized
                    # word (heuristic: capitalized words are likely names).
                    for i in range(len(words) - 1, -1, -1):
                        if words[i] and words[i][0].isupper():
                            words[i] = words[i] + found_honorifics[0]
                            translated_text = ' '.join(words)
                            break
        return translated_text

    def _draw_text_with_outline(self, draw, position, text, font,
                                text_color="black", outline_color="white",
                                outline_width=2, **kwargs):
        """Draw text with outline for better readability.

        Stamps the text at every offset within ±outline_width to form the
        outline, then draws the main text on top.
        """
        x, y = position
        # Draw outline
        for adj_x in range(-outline_width, outline_width + 1):
            for adj_y in range(-outline_width, outline_width + 1):
                if adj_x != 0 or adj_y != 0:
                    draw.multiline_text((x + adj_x, y + adj_y), text,
                                        fill=outline_color, font=font, **kwargs)
        # Draw main text
        draw.multiline_text(position, text, fill=text_color, font=font, **kwargs)

    def _calculate_optimal_font_size(self, text, bbox, min_size=12, max_size=36):
        """Find the largest font size whose wrapped text fits inside bbox.

        Returns (size, wrapped_text). Falls back to min_size if nothing fits.
        """
        x1, y1, x2, y2 = bbox
        box_width = x2 - x1
        box_height = y2 - y1

        # --- DETECT VERTICAL BUBBLES ---
        # If height is 1.5x bigger than width, it's a vertical speech bubble.
        is_vertical = box_height > (box_width * 1.5)
        # If vertical, force text to use only 60% of width (makes a column);
        # if horizontal, use 90% of width (standard).
        target_width_ratio = 0.6 if is_vertical else 0.9

        # Start with max size and reduce until text fits
        for size in range(max_size, min_size - 1, -1):
            font = self._get_font(size)
            max_line_width = int(box_width * target_width_ratio)
            wrapped = self._wrap_text_dynamic(text, font, max_line_width)
            # Measure resulting text block
            temp_draw = ImageDraw.Draw(Image.new('RGB', (1, 1)))
            left, top, right, bottom = temp_draw.multiline_textbbox(
                (0, 0), wrapped, font=font, align="center"
            )
            text_width = right - left
            text_height = bottom - top
            # Check fit (height is the main constraint)
            if text_height < (box_height - 10):
                # Secondary check: ensure we didn't overflow the sides
                if text_width < (box_width - 4):
                    return size, wrapped

        # Fallback: minimum size
        font = self._get_font(min_size)
        max_line_width = int(box_width * target_width_ratio)
        wrapped = self._wrap_text_dynamic(text, font, max_line_width)
        return min_size, wrapped

    def _has_japanese_characters(self, text):
        """Check if text contains Japanese characters."""
        japanese_ranges = [
            (0x3040, 0x309F),  # Hiragana
            (0x30A0, 0x30FF),  # Katakana
            (0x4E00, 0x9FFF),  # Kanji
        ]
        for char in text:
            code = ord(char)
            for start, end in japanese_ranges:
                if start <= code <= end:
                    return True
        return False

    def _romanize_japanese(self, text):
        """Convert Japanese text to romaji (returns input if pykakasi missing)."""
        if not self.kakasi:
            return text
        try:
            result = self.kakasi.convert(text)
            return ''.join([item['hepburn'] for item in result])
        except Exception as e:
            print(f" Romanization error: {e}")
            return text

    def _apply_custom_translations(self, text):
        """Apply custom character name translations."""
        for jp_term, en_term in self.custom_translations.items():
            text = text.replace(jp_term, en_term)
        return text

    def detect_and_process(self, image_path, output_dir="crops", page_id="",
                           conf_threshold=0.15):
        """Detect text bubbles on a page and save one crop per detection.

        Returns (image, manga_data) where manga_data is a list of dicts with
        bbox, label, crop_path and empty original/translated text slots.
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Not found: {image_path}")

        # 1. Run Prediction
        results = self.yolo_model.predict(source=image, conf=conf_threshold,
                                          save=False, verbose=False)
        # Get the class names dictionary (e.g., {0: 'text', 1: 'bubble'})
        class_names = results[0].names

        # 2. Extract Boxes AND Classes
        detections = []
        for box in results[0].boxes:
            xyxy = list(map(int, box.xyxy[0].tolist()))
            cls_id = int(box.cls[0])
            label = class_names[cls_id]  # e.g., "text" or "bubble" or "face"
            # Filter: we only care about text/bubbles, not faces/bodies
            # if the model happens to detect them.
            if label in ['face', 'body']:
                continue
            detections.append({"bbox": xyxy, "label": label})

        # Sort (top to bottom, right to left for manga).
        # Custom key since detections are dicts, not plain boxes.
        detections = sorted(detections, key=lambda x: (x['bbox'][1], -x['bbox'][0]))

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        manga_data = []
        for i, det in enumerate(detections):
            x_min, y_min, x_max, y_max = det['bbox']
            # Crop the detection and save it for OCR.
            crop = image[y_min:y_max, x_min:x_max]
            crop_filename = f"bubble_{page_id}_{i+1}.png"
            crop_path = os.path.join(output_dir, crop_filename)
            cv2.imwrite(crop_path, crop)
            manga_data.append({
                "id": f"{page_id}_{i+1}",
                "page_id": page_id,
                "bbox": [x_min, y_min, x_max, y_max],
                "label": det['label'],
                "crop_path": crop_path,
                "original_text": "",
                "translated_text": ""
            })
        return image, manga_data

    def run_ocr(self, manga_data):
        """Run MangaOCR on each saved crop and fill in original_text."""
        for entry in manga_data:
            crop_path = entry['crop_path']
            japanese_text = self.mocr(crop_path)
            # Apply custom translations (glossary) to the raw OCR text.
            japanese_text = self._apply_custom_translations(japanese_text)
            entry['original_text'] = japanese_text.replace('\n', '')
        return manga_data

    def _translate_single_bubble(self, text, series_info=None):
        """Translate a single bubble (fallback method)."""
        context_str = ""
        if series_info:
            context_str = (f"Context: {series_info.get('title', '')} - "
                           f"{series_info.get('tags', '')}\n")
        prompt = (f"{context_str}Translate this Japanese manga text to natural "
                  f"English. Return ONLY the English translation, nothing else:\n\n{text}")
        try:
            # FIX: original called self.llm, which was never defined anywhere
            # in the class; route through the loaded transformers model instead.
            messages = [{"role": "user", "content": prompt}]
            inputs = self.tokenizer.apply_chat_template(
                messages, add_generation_prompt=True, return_tensors="pt"
            ).to(self.device)
            with torch.no_grad():
                output_ids = self.trans_model.generate(
                    input_ids=inputs, max_new_tokens=128, do_sample=False
                )
            translation = self.tokenizer.decode(
                output_ids[0][inputs.shape[1]:], skip_special_tokens=True
            ).strip()
            # Remove common wrapper phrases
            translation = re.sub(r'^(Here\'s the translation:|Translation:|English:)\s*',
                                 '', translation, flags=re.IGNORECASE)
            translation = translation.strip('"\'')
            return translation
        except Exception as e:
            print(f" Translation error: {e}")
            return "[Translation Error]"

    def translate_batch(self, manga_data, series_info=None):
        """
        Minimalist translation loop for LiquidAI LFM2-350M.
        REMOVED: Context injection (to prevent hallucinations).
        INCLUDED: Fix for Dictionary vs Tensor inputs.
        """
        print(f"Translating {len(manga_data)} bubbles with LiquidAI...")
        # Strict system prompt (required by the model card).
        # FIX: was "Translate to Thai." — inconsistent with the EN-JP MT model
        # and the English-targeting prompts/logs everywhere else in this file.
        system_prompt = "Translate to English."

        for entry in manga_data:
            text = entry.get('original_text', '').strip()
            if not text:
                continue
            # Skip punctuation-only bubbles
            if len(text) < 2 and text in "!?.…":
                entry['translated_text'] = text
                continue

            # --- NO CONTEXT, JUST TEXT ---
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text}  # Raw text only
            ]

            # 1. Apply Template
            inputs = self.tokenizer.apply_chat_template(
                messages, add_generation_prompt=True, return_tensors="pt"
            )

            # 2. Handle Dict vs Tensor (LiquidAI quirks: apply_chat_template
            # may return a BatchEncoding or a bare tensor).
            if isinstance(inputs, dict) or hasattr(inputs, "keys"):
                inputs = inputs.to(self.device)
                generate_kwargs = inputs
                input_length = inputs["input_ids"].shape[1]
            else:
                inputs = inputs.to(self.device)
                generate_kwargs = {"input_ids": inputs}
                input_length = inputs.shape[1]

            # 3. Generate
            with torch.no_grad():
                output_ids = self.trans_model.generate(
                    **generate_kwargs,
                    max_new_tokens=128,
                    temperature=0.5,
                    top_p=1.0,
                    repetition_penalty=1.05,
                    do_sample=True
                )

            # 4. Decode only the newly generated tokens
            translated_text = self.tokenizer.decode(
                output_ids[0][input_length:], skip_special_tokens=True
            ).strip()
            entry['translated_text'] = translated_text
            print(f" JP: {text[:15]}... -> EN: {translated_text}")
        return manga_data

    def clean_page(self, original_image, page_data, ellipse_padding=8,
                   inpaint_radius=5):
        """
        Strict Hybrid Cleaning:
        - text_bubble -> OpenCV Inpainting inside a shrunk Ellipse mask
          (preserves tails)
        - text_free   -> LaMa Inpainting on full Rectangle mask
          (redraws background)
        """
        final_image = original_image.copy()
        h, w = original_image.shape[:2]
        # Mask for LaMa (accumulates all 'text_free' areas)
        lama_mask = np.zeros((h, w), dtype=np.uint8)
        has_lama_work = False

        for entry in page_data:
            # Skip if no translation (optional, but good for speed)
            if not entry.get('translated_text'):
                continue
            bbox = entry['bbox']
            label = entry.get('label', 'text_free')
            x1, y1, x2, y2 = bbox
            # Clamp coordinates
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)
            # Extract crop for analysis
            crop = final_image[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            gray_crop = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)

            # --- STRATEGY 1: SPEECH BUBBLES (OpenCV + Shrunk Ellipse) ---
            if label == 'text_bubble':
                ch, cw = crop.shape[:2]
                # A. Find the text pixels (dark ink)
                binary_text = cv2.adaptiveThreshold(
                    gray_crop, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
                    cv2.THRESH_BINARY_INV, 21, 10
                )
                # B. Create SHRUNK ellipse mask
                ellipse_mask = np.zeros((ch, cw), dtype=np.uint8)
                center = (cw // 2, ch // 2)
                # Shrink axes by padding to avoid touching bubble borders
                axes = (max(1, cw // 2 - ellipse_padding),
                        max(1, ch // 2 - ellipse_padding))
                cv2.ellipse(ellipse_mask, center, axes, 0, 0, 360, 255, -1)
                # C. Combine: mask ONLY text that is INSIDE the ellipse
                final_mask = cv2.bitwise_and(binary_text, ellipse_mask)
                # D. Dilate to catch anti-aliasing
                kernel = np.ones((5, 5), np.uint8)
                final_mask = cv2.dilate(final_mask, kernel, iterations=1)
                # E. Run OpenCV inpainting and paste back
                cleaned_crop = cv2.inpaint(crop, final_mask, inpaint_radius,
                                           cv2.INPAINT_TELEA)
                final_image[y1:y2, x1:x2] = cleaned_crop

            # --- STRATEGY 2: FREE TEXT (LaMa + Rectangle) ---
            elif label == 'text_free':
                cv2.rectangle(lama_mask, (x1, y1), (x2, y2), 255, -1)
                has_lama_work = True

        # Run LaMa once for all free text found
        if has_lama_work:
            # Dilate LaMa mask slightly
            lama_kernel = np.ones((5, 5), np.uint8)
            lama_mask = cv2.dilate(lama_mask, lama_kernel, iterations=1)
            img_pil = Image.fromarray(cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB))
            mask_pil = Image.fromarray(lama_mask)
            try:
                # 1. Run Model
                cleaned_pil = self.lama(img_pil, mask_pil)
                cleaned_lama = cv2.cvtColor(np.array(cleaned_pil), cv2.COLOR_RGB2BGR)
                # 2. Resize fix (LaMa padding issue)
                if cleaned_lama.shape[:2] != (h, w):
                    cleaned_lama = cv2.resize(cleaned_lama, (w, h))
                # 3. Merge LaMa result only where the mask is set
                final_image = np.where(lama_mask[:, :, None] == 255,
                                       cleaned_lama, final_image)
            except Exception as e:
                print(f" ⚠ LaMa failed: {e}")
        return final_image

    def typeset(self, original_image, manga_data, output_path):
        """Clean the page, then draw translated text centered in each bbox."""
        working_img = self.clean_page(original_image, manga_data)

        # 2. Text drawing with adaptive sizing and outlines
        img_pil = Image.fromarray(cv2.cvtColor(working_img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img_pil)
        for entry in manga_data:
            x1, y1, x2, y2 = entry['bbox']
            text = entry.get('translated_text', '')
            if not text:
                continue
            # Calculate optimal font size for this bubble
            font_size, wrapped_text = self._calculate_optimal_font_size(
                text, entry['bbox']
            )
            font = self._get_font(font_size)
            # Get text dimensions
            left, top, right, bottom = draw.multiline_textbbox(
                (0, 0), wrapped_text, font=font, align="center"
            )
            text_w, text_h = right - left, bottom - top
            # Center text within the bbox
            text_x = x1 + ((x2 - x1) - text_w) / 2
            text_y = y1 + ((y2 - y1) - text_h) / 2
            # Draw with outline for readability
            self._draw_text_with_outline(
                draw, (text_x, text_y), wrapped_text, font,
                text_color="black", outline_color="white",
                outline_width=2, align="center", spacing=2
            )
        final_img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
        cv2.imwrite(output_path, final_img)
        print(f" Saved: {output_path}")

    def process_chapter(self, input_folder, output_folder, series_info=None,
                        batch_size=4, selected_batches=None):
        """
        Process manga chapter in batches for better context and efficiency.

        Args:
            input_folder: folder with page images.
            output_folder: where typeset pages (and debug JSON) are written.
            series_info: optional context dict forwarded to translation.
            batch_size: pages per translation batch.
            selected_batches: optional list of 1-based batch numbers to run.
        """
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)

        valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
        files = [f for f in os.listdir(input_folder) if f.lower().endswith(valid_ext)]

        # Sort numerically (p1, p2, p10 instead of p1, p10, p2).
        # FIX: the original key returned int OR str, which raises TypeError
        # when digit-bearing and digit-free names coexist; use a tuple key.
        def _page_key(name):
            m = re.search(r'\d+', name)
            return (0, int(m.group()), name) if m else (1, 0, name)
        files.sort(key=_page_key)

        total_files = len(files)
        total_batches = (total_files + batch_size - 1) // batch_size
        # Master list to hold data for the entire chapter
        full_chapter_data = []

        print(f"Found {total_files} images in {input_folder}")
        print(f"Total batches: {total_batches} (batch size: {batch_size})")
        if selected_batches:
            print(f"Processing selected batches: {selected_batches}")
        else:
            print("Processing all batches\n")

        # Process in batches
        for batch_start in range(0, total_files, batch_size):
            batch_num = batch_start // batch_size + 1
            # Skip if not in selected batches
            if selected_batches and batch_num not in selected_batches:
                continue

            batch_files = files[batch_start:batch_start + batch_size]
            print(f"=== Batch {batch_num}/{total_batches} ({len(batch_files)} pages) ===")

            # Collect all data for this batch
            batch_data = []
            batch_images = []
            temp_crop_dir = os.path.join(output_folder, "temp_crops")

            for idx, filename in enumerate(batch_files):
                page_num = batch_start + idx + 1
                print(f" [{page_num}/{total_files}] Detecting bubbles in {filename}...")
                input_path = os.path.join(input_folder, filename)
                page_id = f"p{page_num:03d}"
                try:
                    img, data = self.detect_and_process(
                        input_path, output_dir=temp_crop_dir, page_id=page_id
                    )
                    if data:
                        print(f" Running OCR on {len(data)} bubbles...")
                        data = self.run_ocr(data)
                        batch_data.extend(data)
                    else:
                        print(" No bubbles detected")
                    batch_images.append((filename, img, page_id))
                except Exception as e:
                    print(f" Error processing {filename}: {e}")
                    continue

            # Translate entire batch at once for context
            if batch_data:
                print(f" Translating {len(batch_data)} bubbles from batch...")
                batch_data = self.translate_batch(batch_data, series_info=series_info)
                # Add this batch's completed data to the master list
                full_chapter_data.extend(batch_data)

            # Typeset each page
            print(" Typesetting pages...")
            for filename, img, page_id in batch_images:
                output_path = os.path.join(output_folder, filename)
                # Filter data for this specific page
                page_data = [d for d in batch_data if d.get('page_id') == page_id]
                try:
                    self.typeset(img, page_data, output_path)
                except Exception as e:
                    print(f" Error typesetting {filename}: {e}")
            print()  # Empty line between batches

        # Save a full-chapter JSON dump when debug is ON
        if self.debug and full_chapter_data:
            json_filename = "chapter_data.json"
            json_path = os.path.join(output_folder, json_filename)
            try:
                with open(json_path, 'w', encoding='utf-8') as f:
                    json.dump(full_chapter_data, f, ensure_ascii=False, indent=2)
                print(f" [DEBUG] Saved full chapter data to: {json_filename}")
            except Exception as e:
                print(f" [DEBUG] Failed to save JSON: {e}")

        print(f"\n✓ Chapter processing complete! Output saved to: {output_folder}")

    def process_single_image(self, image_path, output_path, series_info=None):
        """
        Runs the full pipeline on a SINGLE image file.
        Perfect for demos or testing one page.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        print(f"=== Processing Single Page: {os.path.basename(image_path)} ===")

        # 1. Setup a temp folder for the bubble crops (required for OCR).
        # We use a fixed folder name for the demo to keep it clean.
        temp_crop_dir = "temp_demo_crops"
        if not os.path.exists(temp_crop_dir):
            os.makedirs(temp_crop_dir)

        # 2. DETECT — generic ID 'demo' since we don't have page numbers
        print("1. Detecting Bubbles...")
        original_img, data = self.detect_and_process(
            image_path, output_dir=temp_crop_dir, page_id="demo"
        )
        if not data:
            print(" ⚠ No bubbles found! Saving original image...")
            cv2.imwrite(output_path, original_img)
            return

        # 3. OCR
        print(f"2. Running OCR on {len(data)} bubbles...")
        data = self.run_ocr(data)

        # 4. TRANSLATE — translate_batch handles single pages fine too.
        print("3. Translating text...")
        data = self.translate_batch(data, series_info=series_info)

        # 5. TYPESET (Clean + Draw)
        print("4. Typesetting (Cleaning & Drawing)...")
        out_dir = os.path.dirname(output_path)
        if out_dir and not os.path.exists(out_dir):
            os.makedirs(out_dir)
        self.typeset(original_img, data, output_path)

        print(f"✅ Success! Saved to: {output_path}")
        # Optional: return the data to inspect JSON in the demo
        return data


if __name__ == "__main__":
    # 1. Define translation dictionary (optional but good for names)
    custom_translations = {
        "ルーグ": "Lugh",
        "トウアハーデ": "Tuatha Dé",
        "ディア": "Dia",
        "タルト": "Tarte",
    }

    # 2. Initialize the class
    # Note: we removed 'ollama_model' and added 'translation_model'
    translator = MangaTranslator(
        yolo_model_path='comic-speech-bubble-detector.pt',
        translation_model="LiquidAI/LFM2-350M-ENJP-MT",
        font_path="font.ttf",
        custom_translations=custom_translations,
        debug=True  # Keeps the JSON file for debugging
    )

    # 3. Run the single page demo
    # Ensure you have the input image inside your project folder
    input_file = "chapter_401/001.jpg"
    output_file = "output/001_translated.jpg"

    if os.path.exists(input_file):
        print(f"🚀 Starting Demo on {input_file}...")
        translator.process_single_image(
            image_path=input_file,
            output_path=output_file,
            series_info=None
        )
        print(f"✨ Demo Complete! Check {output_file}")
    else:
        print(f"❌ Error: Could not find {input_file}. Please check your folder structure.")