# BookTester / MangaTranslator.py
# Author avatar: eoeooe's picture
# Last commit: "Update MangaTranslator.py" (cdfe812, verified)
import os
import json
import cv2
import numpy as np
import pyphen
import re
import torch
from PIL import Image, ImageDraw, ImageFont
import transformers.modeling_utils
import transformers.utils.import_utils
from ultralytics import YOLO
from manga_ocr import MangaOcr
from transformers import AutoModelForCausalLM, AutoTokenizer
from simple_lama_inpainting import SimpleLama
import PIL.Image
class MangaTranslator:
    """Detect, OCR, translate, clean and typeset manga pages.

    The pipeline implemented by the public methods is:
    YOLO box detection (``detect_and_process``) -> MangaOCR (``run_ocr``) ->
    causal-LM translation (``translate_batch``) -> text removal with OpenCV
    inpainting / LaMa (``clean_page``) -> PIL text drawing (``typeset``).
    ``process_single_image`` and ``process_chapter`` chain these steps.
    """

    def __init__(self, yolo_model_path='comic_yolov8m.pt',
                 translation_model="LiquidAI/LFM2.5-1.2B-Instruct",
                 font_path="font.ttf", custom_translations=None, keep_honorifics=True, debug=True):
        """Load every model used by the pipeline.

        Args:
            yolo_model_path: Weights file handed to ``YOLO``.
            translation_model: Hugging Face model id for the tokenizer and
                the causal LM used for translation.
            font_path: TrueType font used for typesetting.
            custom_translations: Optional ``{japanese: replacement}`` mapping
                applied to the OCR output before translation.
            keep_honorifics: Enables ``_preserve_honorifics``.
            debug: When true, ``process_chapter`` dumps the collected bubble
                data to ``chapter_data.json``.
        """
        print("Loading YOLO model...")
        self.yolo_model = YOLO(yolo_model_path)
        self.font_path = font_path
        print("Loading LaMa Inpainting model...")
        self.lama = SimpleLama()
        print("Loading MangaOCR model...")
        self.mocr = MangaOcr()

        # --- Translation model setup ---
        print(f"Loading Translation Model ({translation_model})...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # NOTE(security): trust_remote_code=True executes Python shipped with
        # the model repository; only use model ids you trust.
        self.tokenizer = AutoTokenizer.from_pretrained(
            translation_model,
            trust_remote_code=True
        )
        self.trans_model = AutoModelForCausalLM.from_pretrained(
            translation_model,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map=self.device,
            trust_remote_code=True
        )
        self.trans_model.eval()

        self.dic = pyphen.Pyphen(lang='en')
        self.font_cache = {}
        self.custom_translations = custom_translations or {}
        self.keep_honorifics = keep_honorifics
        # Stored so process_chapter can read it (it was previously dropped,
        # which made process_chapter fail with AttributeError).
        self.debug = debug
        self.honorifics = ['san', 'chan', 'kun', 'sama', 'senpai', 'sensei', 'dono', 'tan']

        # pykakasi is optional; without it romanization is a no-op.
        try:
            import pykakasi
            self.kakasi = pykakasi.kakasi()
        except ImportError:
            print("Warning: pykakasi not installed. Install with 'pip install pykakasi' for romanization support.")
            self.kakasi = None

    def _get_font(self, size):
        """Return the font for ``size``, loading and caching it on first use."""
        if size not in self.font_cache:
            try:
                self.font_cache[size] = ImageFont.truetype(self.font_path, size)
            except IOError:
                # Fallback is requested without a size, so every cached size
                # maps to the same default font when font_path is unusable.
                self.font_cache[size] = ImageFont.load_default()
        return self.font_cache[size]

    def _sort_bubbles(self, bubbles, row_threshold=50):
        """Order boxes row by row, top to bottom, right to left within a row.

        Args:
            bubbles: Iterable of indexable boxes; index 1 is the vertical sort
                key and index 2 the horizontal one (presumably
                ``(x1, y1, x2, y2)`` — TODO confirm against callers).
            row_threshold: Boxes whose index-1 value differs from the previous
                box in the row by less than this are kept in the same row.

        Returns:
            A new list; the input is left untouched.
        """
        by_top = sorted(bubbles, key=lambda b: b[1])
        if not by_top:
            return []

        ordered = []
        row = [by_top[0]]
        for bubble in by_top[1:]:
            if abs(bubble[1] - row[-1][1]) < row_threshold:
                row.append(bubble)
            else:
                row.sort(key=lambda b: b[2], reverse=True)
                ordered.extend(row)
                row = [bubble]
        row.sort(key=lambda b: b[2], reverse=True)
        ordered.extend(row)
        return ordered

    def _wrap_text_dynamic(self, text, font, max_width):
        """Greedy word wrap to ``max_width`` pixels with hyphenation.

        Words that do not fit on the current line are split at the longest
        hyphenation prefix that still fits; if none fits the word starts a new
        line unbroken (it may then exceed ``max_width``).

        Returns:
            The wrapped text with lines joined by ``"\\n"``.
        """
        lines = []
        current_line = []
        current_width = 0
        space_width = font.getlength(" ")

        for word in text.split():
            word_width = font.getlength(word)
            gap = space_width if current_line else 0
            potential_width = current_width + word_width + gap
            if potential_width <= max_width:
                current_line.append(word)
                current_width = potential_width
                continue

            found_split = False
            # pyphen yields the longest first part first, so the first chunk
            # that fits is the one that fills the line best.
            for start, end in self.dic.iterate(word):
                chunk = start + "-"
                chunk_width = font.getlength(chunk)
                if current_width + chunk_width + gap <= max_width:
                    current_line.append(chunk)
                    lines.append(" ".join(current_line))
                    current_line = [end]
                    current_width = font.getlength(end)
                    found_split = True
                    break

            if not found_split:
                if current_line:
                    lines.append(" ".join(current_line))
                current_line = [word]
                current_width = word_width

        if current_line:
            lines.append(" ".join(current_line))
        return "\n".join(lines)

    def _smart_clean_bubble(self, img, bbox):
        """Blur and brighten the ``bbox`` region of ``img`` in place.

        The box is clamped to the image; an empty box leaves ``img`` unchanged.
        Returns ``img`` (the same array that was passed in).
        """
        x1, y1, x2, y2 = bbox
        h, w = img.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            return img

        bubble_region = img[y1:y2, x1:x2].copy()
        if bubble_region.size == 0:
            return img

        blurred = cv2.GaussianBlur(bubble_region, (21, 21), 0)
        # Blend 70% blurred content with 30% white.
        brightened = cv2.addWeighted(blurred, 0.7,
                                     np.ones_like(blurred) * 255, 0.3, 0)
        img[y1:y2, x1:x2] = brightened
        return img

    def _preserve_honorifics(self, original_text, translated_text):
        """Re-attach a romaji honorific (e.g. ``-san``) to a translated name.

        If ``original_text`` contains a known Japanese honorific and the last
        word of ``translated_text`` does not already contain one, the first
        honorific found is appended to the last capitalised word.

        Returns ``translated_text`` unchanged when the feature is disabled,
        when ``self.kakasi`` is unavailable, or when nothing applies.
        """
        # NOTE(review): kakasi is not used below; gating on it looks
        # accidental but is kept so behaviour is unchanged.
        if not self.keep_honorifics or not self.kakasi:
            return translated_text

        honorific_map = {
            'さん': '-san',
            'ちゃん': '-chan',
            'くん': '-kun',
            '君': '-kun',
            '様': '-sama',
            'さま': '-sama',
            '先輩': '-senpai',
            'せんぱい': '-senpai',
            '先生': '-sensei',
            'せんせい': '-sensei',
            '殿': '-dono',
            'どの': '-dono',
            'たん': '-tan',
        }
        found_honorifics = [rom for jp, rom in honorific_map.items() if jp in original_text]
        if not found_honorifics:
            return translated_text

        words = translated_text.split()
        if not words:
            return translated_text

        last_word = words[-1].lower()
        if any(hon.strip('-') in last_word for hon in self.honorifics):
            return translated_text

        # Walk backwards: the last capitalised word is treated as the name.
        for i in range(len(words) - 1, -1, -1):
            if words[i] and words[i][0].isupper():
                words[i] = words[i] + found_honorifics[0]
                return ' '.join(words)
        return translated_text

    def _draw_text_with_outline(self, draw, position, text, font,
                                text_color="black", outline_color="white",
                                outline_width=2, **kwargs):
        """Draw multiline ``text`` with a solid outline.

        The outline is produced by drawing the text in ``outline_color`` at
        every integer offset within ``outline_width`` pixels, then drawing the
        text itself on top. Extra ``kwargs`` go to ``draw.multiline_text``.
        """
        x, y = position
        for adj_x in range(-outline_width, outline_width + 1):
            for adj_y in range(-outline_width, outline_width + 1):
                if adj_x != 0 or adj_y != 0:
                    draw.multiline_text((x + adj_x, y + adj_y), text,
                                        fill=outline_color, font=font, **kwargs)
        draw.multiline_text(position, text, fill=text_color, font=font, **kwargs)

    def _calculate_optimal_font_size(self, text, bbox, min_size=12, max_size=36):
        """Find the largest font size whose wrapped text fits inside ``bbox``.

        Returns:
            ``(size, wrapped_text)``. When no size fits, ``min_size`` and the
            text wrapped at that size are returned.
        """
        x1, y1, x2, y2 = bbox
        box_width = x2 - x1
        box_height = y2 - y1

        # Tall boxes (height > 1.5x width) wrap into a narrower column.
        is_vertical = box_height > (box_width * 1.5)
        target_width_ratio = 0.6 if is_vertical else 0.9
        max_line_width = int(box_width * target_width_ratio)

        # One scratch surface is enough for all measurements.
        temp_draw = ImageDraw.Draw(Image.new('RGB', (1, 1)))

        for size in range(max_size, min_size - 1, -1):
            font = self._get_font(size)
            wrapped = self._wrap_text_dynamic(text, font, max_line_width)
            left, top, right, bottom = temp_draw.multiline_textbbox(
                (0, 0), wrapped, font=font, align="center"
            )
            text_width = right - left
            text_height = bottom - top
            # Height is the main constraint; width guards against overflow
            # from words that could not be wrapped.
            if text_height < (box_height - 10) and text_width < (box_width - 4):
                return size, wrapped

        font = self._get_font(min_size)
        wrapped = self._wrap_text_dynamic(text, font, max_line_width)
        return min_size, wrapped

    def _has_japanese_characters(self, text):
        """Return True if ``text`` has any Hiragana, Katakana or Kanji char."""
        japanese_ranges = (
            (0x3040, 0x309F),  # Hiragana
            (0x30A0, 0x30FF),  # Katakana
            (0x4E00, 0x9FFF),  # Kanji
        )
        return any(
            start <= ord(char) <= end
            for char in text
            for start, end in japanese_ranges
        )

    def _romanize_japanese(self, text):
        """Return the Hepburn romanization of ``text``.

        Falls back to returning ``text`` unchanged when pykakasi is not
        available or conversion fails.
        """
        if not self.kakasi:
            return text
        try:
            result = self.kakasi.convert(text)
            return ''.join([item['hepburn'] for item in result])
        except Exception as e:
            print(f" Romanization error: {e}")
            return text

    def _apply_custom_translations(self, text):
        """Replace every configured custom term found in ``text``."""
        for jp_term, en_term in self.custom_translations.items():
            text = text.replace(jp_term, en_term)
        return text

    def detect_and_process(self, image_path, output_dir="crops", page_id="", conf_threshold=0.15):
        """Detect text regions on a page and save one crop per region.

        Args:
            image_path: Page image to read.
            output_dir: Directory for the crop PNGs (created if missing).
            page_id: Prefix used in crop file names and entry ids.
            conf_threshold: Confidence threshold passed to YOLO.

        Returns:
            ``(image, manga_data)`` where ``image`` is the BGR page and
            ``manga_data`` is a list of dicts with ``id``, ``page_id``,
            ``bbox``, ``label``, ``crop_path`` and empty ``original_text`` /
            ``translated_text``.

        Raises:
            ValueError: If the image cannot be read.
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Not found: {image_path}")

        results = self.yolo_model.predict(source=image, conf=conf_threshold, save=False, verbose=False)
        class_names = results[0].names

        detections = []
        for box in results[0].boxes:
            label = class_names[int(box.cls[0])]
            # Only text regions matter; skip character detections.
            if label in ['face', 'body']:
                continue
            detections.append({
                "bbox": list(map(int, box.xyxy[0].tolist())),
                "label": label
            })

        # Top to bottom, then right to left (manga reading order).
        detections.sort(key=lambda d: (d['bbox'][1], -d['bbox'][0]))

        os.makedirs(output_dir, exist_ok=True)

        img_h, img_w = image.shape[:2]
        manga_data = []
        for i, det in enumerate(detections):
            x_min, y_min, x_max, y_max = det['bbox']
            # Clamp to the page so slicing cannot wrap around on negative
            # coordinates, and skip boxes with no area (nothing to OCR).
            x_min, y_min = max(0, x_min), max(0, y_min)
            x_max, y_max = min(img_w, x_max), min(img_h, y_max)
            if x_max <= x_min or y_max <= y_min:
                continue

            crop = image[y_min:y_max, x_min:x_max]
            crop_filename = f"bubble_{page_id}_{i+1}.png"
            crop_path = os.path.join(output_dir, crop_filename)
            cv2.imwrite(crop_path, crop)

            manga_data.append({
                "id": f"{page_id}_{i+1}",
                "page_id": page_id,
                "bbox": [x_min, y_min, x_max, y_max],
                "label": det['label'],
                "crop_path": crop_path,
                "original_text": "",
                "translated_text": ""
            })
        return image, manga_data

    def run_ocr(self, manga_data):
        """Fill ``original_text`` of every entry by running OCR on its crop.

        Custom translations are applied to the OCR output and newlines are
        removed. The list is modified in place and also returned.
        """
        for entry in manga_data:
            japanese_text = self.mocr(entry['crop_path'])
            japanese_text = self._apply_custom_translations(japanese_text)
            entry['original_text'] = japanese_text.replace('\n', '')
        return manga_data

    def _generate_from_messages(self, messages, max_new_tokens=128):
        """Run the translation model on chat ``messages`` and return its reply.

        Only the newly generated tokens are decoded; the result is stripped.
        """
        inputs = self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt"
        )

        # The template call can hand back either a mapping-like encoding or a
        # bare tensor of input ids; support both.
        if isinstance(inputs, dict) or hasattr(inputs, "keys"):
            inputs = inputs.to(self.device)
            generate_kwargs = inputs
            input_length = inputs["input_ids"].shape[1]
        else:
            inputs = inputs.to(self.device)
            generate_kwargs = {"input_ids": inputs}
            input_length = inputs.shape[1]

        with torch.no_grad():
            output_ids = self.trans_model.generate(
                **generate_kwargs,
                max_new_tokens=max_new_tokens,
                temperature=0.5,
                top_p=1.0,
                repetition_penalty=1.05,
                do_sample=True
            )

        return self.tokenizer.decode(
            output_ids[0][input_length:],
            skip_special_tokens=True
        ).strip()

    @staticmethod
    def _strip_wrapper_phrases(translation):
        """Remove a leading "Translation:"-style prefix and wrapping quotes."""
        translation = translation.strip()
        translation = re.sub(r'^(Here\'s the translation:|Translation:|English:)\s*', '', translation, flags=re.IGNORECASE)
        return translation.strip('"\'')

    def _translate_single_bubble(self, text, series_info=None):
        """Translate one bubble with an explicit English prompt (fallback).

        Args:
            text: Japanese source text.
            series_info: Optional dict; ``title`` and ``tags`` are added to
                the prompt as context.

        Returns:
            The cleaned translation, or ``"[Translation Error]"`` if
            generation fails.
        """
        context_str = ""
        if series_info:
            context_str = f"\nContext: {series_info.get('title', '')} - {series_info.get('tags', '')}\n"
        prompt = (
            f"{context_str}Translate this Japanese manga text to natural English. "
            "Return ONLY the English translation, nothing else:\n"
            f"{text}"
        )
        try:
            # Uses the model loaded in __init__; the previous `self.llm`
            # client was never created, so this path always failed.
            reply = self._generate_from_messages([{"role": "user", "content": prompt}])
            return self._strip_wrapper_phrases(reply)
        except Exception as e:
            print(f" Translation error: {e}")
            return "[Translation Error]"

    def translate_batch(self, manga_data, series_info=None):
        """Translate every entry's ``original_text`` into ``translated_text``.

        Each bubble is sent on its own with a fixed system prompt and no
        series context; ``series_info`` is accepted for interface
        compatibility but not used. Empty texts are skipped and single
        punctuation marks are copied through untranslated.

        The list is modified in place and also returned.
        """
        print(f"Translating {len(manga_data)} bubbles with LiquidAI...")
        # NOTE(review): the target language is fixed here, while the log line
        # below labels the output "EN" — confirm the intended target.
        system_prompt = "Translate to Thai."

        for entry in manga_data:
            text = entry.get('original_text', '').strip()
            if not text:
                continue

            # Punctuation-only bubbles need no model call.
            if len(text) < 2 and text in "!?.…":
                entry['translated_text'] = text
                continue

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": text}
            ]
            translated_text = self._generate_from_messages(messages)
            entry['translated_text'] = translated_text
            print(f" JP: {text[:15]}... -> EN: {translated_text}")
        return manga_data

    def clean_page(self, original_image, page_data, ellipse_padding=8, inpaint_radius=5):
        """Remove the original text from a page before typesetting.

        - ``text_bubble`` entries: dark pixels inside a shrunk ellipse are
          inpainted with OpenCV, leaving the bubble border untouched.
        - ``text_free`` entries: the whole rectangle is added to one mask that
          is inpainted with LaMa in a single pass.

        Entries without ``translated_text`` are skipped; entries with any
        other label are left as they are.

        Returns:
            A cleaned copy of ``original_image``.
        """
        final_image = original_image.copy()
        h, w = original_image.shape[:2]

        lama_mask = np.zeros((h, w), dtype=np.uint8)
        has_lama_work = False

        for entry in page_data:
            if not entry.get('translated_text'):
                continue

            label = entry.get('label', 'text_free')
            x1, y1, x2, y2 = entry['bbox']
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)

            crop = final_image[y1:y2, x1:x2]
            if crop.size == 0:
                continue

            if label == 'text_bubble':
                ch, cw = crop.shape[:2]
                gray_crop = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)

                # Dark ink becomes white in the inverted threshold.
                binary_text = cv2.adaptiveThreshold(
                    gray_crop, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
                    cv2.THRESH_BINARY_INV, 21, 10
                )

                # Ellipse shrunk by the padding so the border is not masked.
                ellipse_mask = np.zeros((ch, cw), dtype=np.uint8)
                center = (cw // 2, ch // 2)
                axes = (max(1, cw // 2 - ellipse_padding), max(1, ch // 2 - ellipse_padding))
                cv2.ellipse(ellipse_mask, center, axes, 0, 0, 360, 255, -1)

                final_mask = cv2.bitwise_and(binary_text, ellipse_mask)
                # Grow the mask a little to cover anti-aliased glyph edges.
                kernel = np.ones((5, 5), np.uint8)
                final_mask = cv2.dilate(final_mask, kernel, iterations=1)

                cleaned_crop = cv2.inpaint(crop, final_mask, inpaint_radius, cv2.INPAINT_TELEA)
                final_image[y1:y2, x1:x2] = cleaned_crop

            elif label == 'text_free':
                cv2.rectangle(lama_mask, (x1, y1), (x2, y2), 255, -1)
                has_lama_work = True

        if has_lama_work:
            lama_kernel = np.ones((5, 5), np.uint8)
            lama_mask = cv2.dilate(lama_mask, lama_kernel, iterations=1)

            img_pil = Image.fromarray(cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB))
            mask_pil = Image.fromarray(lama_mask)
            try:
                cleaned_pil = self.lama(img_pil, mask_pil)
                cleaned_lama = cv2.cvtColor(np.array(cleaned_pil), cv2.COLOR_RGB2BGR)
                # The model output may differ in size from the input.
                if cleaned_lama.shape[:2] != (h, w):
                    cleaned_lama = cv2.resize(cleaned_lama, (w, h))
                # Take LaMa pixels only where the mask is set.
                final_image = np.where(lama_mask[:, :, None] == 255, cleaned_lama, final_image)
            except Exception as e:
                # Best effort: keep the page without LaMa cleaning.
                print(f" ⚠ LaMa failed: {e}")

        return final_image

    def typeset(self, original_image, manga_data, output_path):
        """Clean the page, draw the translations and write the result.

        Args:
            original_image: BGR page image.
            manga_data: Entries for this page (``bbox`` and
                ``translated_text`` are used).
            output_path: Destination image path.

        Raises:
            IOError: If the image could not be written.
        """
        line_spacing = 2
        working_img = self.clean_page(original_image, manga_data)

        img_pil = Image.fromarray(cv2.cvtColor(working_img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img_pil)

        for entry in manga_data:
            text = entry.get('translated_text', '')
            if not text:
                continue
            x1, y1, x2, y2 = entry['bbox']

            font_size, wrapped_text = self._calculate_optimal_font_size(
                text, entry['bbox']
            )
            font = self._get_font(font_size)

            # Measure with the same spacing that is used for drawing.
            left, top, right, bottom = draw.multiline_textbbox(
                (0, 0), wrapped_text, font=font, align="center", spacing=line_spacing
            )
            text_w, text_h = right - left, bottom - top

            # The measured box starts at (left, top) relative to the anchor,
            # so subtract that offset to centre the visible text.
            text_x = x1 + ((x2 - x1) - text_w) / 2 - left
            text_y = y1 + ((y2 - y1) - text_h) / 2 - top

            self._draw_text_with_outline(
                draw, (text_x, text_y), wrapped_text, font,
                text_color="black", outline_color="white",
                outline_width=2, align="center", spacing=line_spacing
            )

        final_img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
        # imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(output_path, final_img):
            raise IOError(f"Could not write image: {output_path}")
        print(f" Saved: {output_path}")

    @staticmethod
    def _page_sort_key(filename):
        """Sort key: names with a number first (by number), the rest by name.

        Tuples keep ints and strings from ever being compared directly.
        """
        match = re.search(r'\d+', filename)
        if match:
            return (0, int(match.group()), "")
        return (1, 0, filename)

    def process_chapter(self, input_folder, output_folder, series_info=None,
                        batch_size=4, selected_batches=None):
        """Translate every image of ``input_folder`` into ``output_folder``.

        Pages are handled in batches of ``batch_size``: detection and OCR for
        each page, one translation pass over the batch, then typesetting.
        A page that fails detection/OCR or typesetting is reported and
        skipped.

        Args:
            input_folder: Folder with the page images.
            output_folder: Folder for translated pages (created if missing);
                crops go to its ``temp_crops`` sub-folder.
            series_info: Forwarded to ``translate_batch``.
            batch_size: Pages per batch; must be at least 1.
            selected_batches: Optional collection of 1-based batch numbers;
                when given, only those batches are processed.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        os.makedirs(output_folder, exist_ok=True)

        valid_ext = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')
        files = [f for f in os.listdir(input_folder) if f.lower().endswith(valid_ext)]
        # Numeric order (p1, p2, p10), un-numbered names last.
        files.sort(key=self._page_sort_key)

        total_files = len(files)
        total_batches = (total_files + batch_size - 1) // batch_size
        full_chapter_data = []
        temp_crop_dir = os.path.join(output_folder, "temp_crops")

        print(f"Found {total_files} images in {input_folder}")
        print(f"Total batches: {total_batches} (batch size: {batch_size})")
        if selected_batches:
            print(f"Processing selected batches: {selected_batches}")
        else:
            print(f"Processing all batches\n")

        for batch_start in range(0, total_files, batch_size):
            batch_num = batch_start // batch_size + 1
            if selected_batches and batch_num not in selected_batches:
                continue

            batch_files = files[batch_start:batch_start + batch_size]
            print(f"=== Batch {batch_num}/{total_batches} ({len(batch_files)} pages) ===")

            batch_data = []
            batch_images = []

            for idx, filename in enumerate(batch_files):
                page_num = batch_start + idx + 1
                print(f" [{page_num}/{total_files}] Detecting bubbles in {filename}...")
                input_path = os.path.join(input_folder, filename)
                page_id = f"p{page_num:03d}"
                try:
                    img, data = self.detect_and_process(input_path, output_dir=temp_crop_dir, page_id=page_id)
                    if data:
                        print(f" Running OCR on {len(data)} bubbles...")
                        data = self.run_ocr(data)
                        batch_data.extend(data)
                    else:
                        print(f" No bubbles detected")
                    batch_images.append((filename, img, page_id))
                except Exception as e:
                    print(f" Error processing {filename}: {e}")
                    continue

            if batch_data:
                print(f" Translating {len(batch_data)} bubbles from batch...")
                batch_data = self.translate_batch(batch_data, series_info=series_info)
                full_chapter_data.extend(batch_data)

            print(f" Typesetting pages...")
            for filename, img, page_id in batch_images:
                output_path = os.path.join(output_folder, filename)
                page_data = [d for d in batch_data if d.get('page_id') == page_id]
                try:
                    self.typeset(img, page_data, output_path)
                except Exception as e:
                    print(f" Error typesetting {filename}: {e}")
            print()  # Empty line between batches

        # Debug dump of everything collected for the chapter.
        if self.debug and full_chapter_data:
            json_filename = "chapter_data.json"
            json_path = os.path.join(output_folder, json_filename)
            try:
                with open(json_path, 'w', encoding='utf-8') as f:
                    json.dump(full_chapter_data, f, ensure_ascii=False, indent=2)
                print(f" [DEBUG] Saved full chapter data to: {json_filename}")
            except Exception as e:
                print(f" [DEBUG] Failed to save JSON: {e}")

        print(f"\n✓ Chapter processing complete! Output saved to: {output_folder}")

    def process_single_image(self, image_path, output_path, series_info=None):
        """Run the full pipeline on one image.

        Crops are written to the ``temp_demo_crops`` folder and entries use
        the page id ``"demo"``.

        Returns:
            The list of bubble entries, or ``None`` when no bubble was
            detected (the original image is then saved to ``output_path``).

        Raises:
            FileNotFoundError: If ``image_path`` does not exist.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        print(f"=== Processing Single Page: {os.path.basename(image_path)} ===")

        temp_crop_dir = "temp_demo_crops"
        os.makedirs(temp_crop_dir, exist_ok=True)

        # Create the destination folder up front so that the "no bubbles"
        # path below can also write its output.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        print("1. Detecting Bubbles...")
        original_img, data = self.detect_and_process(
            image_path,
            output_dir=temp_crop_dir,
            page_id="demo"
        )
        if not data:
            print(" ⚠ No bubbles found! Saving original image...")
            cv2.imwrite(output_path, original_img)
            return None

        print(f"2. Running OCR on {len(data)} bubbles...")
        data = self.run_ocr(data)

        print("3. Translating text...")
        data = self.translate_batch(data, series_info=series_info)

        print("4. Typesetting (Cleaning & Drawing)...")
        self.typeset(original_img, data, output_path)
        print(f"✅ Success! Saved to: {output_path}")
        return data
if __name__ == "__main__":
    # Demo: translate a single page with a small name glossary.

    # Character/place names that should be replaced verbatim in the OCR text.
    name_glossary = {
        "ルーグ": "Lugh",
        "トウアハーデ": "Tuatha Dé",
        "ディア": "Dia",
        "タルト": "Tarte",
    }

    # Models are loaded here, before the input file is checked.
    demo_translator = MangaTranslator(
        yolo_model_path='comic-speech-bubble-detector.pt',
        translation_model="LiquidAI/LFM2-350M-ENJP-MT",
        font_path="font.ttf",
        custom_translations=name_glossary,
        debug=True
    )

    # Input page and destination of the translated page.
    source_page = "chapter_401/001.jpg"
    result_page = "output/001_translated.jpg"

    if not os.path.exists(source_page):
        print(f"❌ Error: Could not find {source_page}. Please check your folder structure.")
    else:
        print(f"🚀 Starting Demo on {source_page}...")
        # No series context is passed to the translator.
        demo_translator.process_single_image(
            image_path=source_page,
            output_path=result_page,
            series_info=None
        )
        print(f"✨ Demo Complete! Check {result_page}")