# chinese-translator-paste / chinese_file_translator.py
# NOTE(review): the three lines below were Hugging Face file-page residue
# ("algorembrant's picture / Upload 8 files / b03b79f verified") pasted in
# above the shebang; kept here as a comment so the module parses.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChineseFileTranslator v1.0.0
================================
Author : algorembrant
License : MIT
Version : 1.0.0
Translate Chinese text inside .txt or .md files to English.
Preserves Markdown structure (headings, bold, italics, code blocks, tables, links).
Supports batch/vectorized processing, multiple translation backends,
auto-detection of Chinese script, and history logging.
USAGE COMMANDS
--------------
Translate a single file (default: Google backend):
python chinese_file_translator.py input.txt
Translate and save to a specific output file:
python chinese_file_translator.py input.md -o translated.md
Translate using the offline Helsinki-NLP MarianMT model:
python chinese_file_translator.py input.txt --backend offline
Translate using Microsoft Translator (requires API key in config):
python chinese_file_translator.py input.txt --backend microsoft
Force Simplified Chinese OCR/detection:
python chinese_file_translator.py input.txt --lang simplified
Force Traditional Chinese:
python chinese_file_translator.py input.txt --lang traditional
Auto-detect Chinese script (default):
python chinese_file_translator.py input.txt --lang auto
Enable GPU (CUDA) for offline model:
python chinese_file_translator.py input.txt --backend offline --gpu
Set OCR confidence threshold (0.0 - 1.0, default 0.3):
python chinese_file_translator.py input.txt --confidence 0.4
Batch translate all .txt and .md files in a directory:
python chinese_file_translator.py --batch ./my_folder/
Batch translate with output directory:
python chinese_file_translator.py --batch ./input/ --batch-out ./output/
Set chunk size for large files (default 4000 chars):
python chinese_file_translator.py input.txt --chunk-size 2000
Append both Chinese source and English translation side-by-side:
python chinese_file_translator.py input.txt --bilingual
Only extract and print detected Chinese text (no translation):
python chinese_file_translator.py input.txt --extract-only
Print translated output to stdout instead of file:
python chinese_file_translator.py input.txt --stdout
Export translation history to JSON on exit:
python chinese_file_translator.py input.txt --export-history out.json
Enable verbose/debug logging:
python chinese_file_translator.py input.txt --verbose
Show version and exit:
python chinese_file_translator.py --version
Show full help:
python chinese_file_translator.py --help
SUPPORTED FILE TYPES
--------------------
- Plain text (.txt) : All Chinese detected and translated in-place
- Markdown (.md) : Chinese content translated; Markdown syntax preserved
Preserved: headings (#), bold (**), italic (*), inline code (`),
fenced code blocks (```), blockquotes (>), tables (|),
links ([text](url)), images (![alt](url)), horizontal rules
SUPPORTED CHINESE VARIANTS
---------------------------
- Simplified Chinese (Mandarin, simplified/simp)
- Traditional Chinese (Mandarin / Hong Kong / Taiwan)
- Cantonese / Yue (detected via Unicode CJK ranges)
- Classical Chinese (Literary Chinese, treated as Traditional)
- Mixed Chinese-English (Chinglish / code-switching)
TRANSLATION BACKENDS
--------------------
1. Google Translate (online, fast, default, no API key needed)
2. Microsoft Translate (online, fallback, requires Azure API key)
3. Helsinki-NLP MarianMT (offline, opus-mt-zh-en, ~300 MB download on first use)
CONFIGURATION
-------------
Config is stored at: ~/.chinese_file_translator/config.json
History is stored at: ~/.chinese_file_translator/history.json
Logs are stored at: ~/.chinese_file_translator/app.log
EXTERNAL SETUP REQUIRED
-----------------------
PyTorch (required only for offline backend):
CPU-only:
pip install torch --index-url https://download.pytorch.org/whl/cpu
CUDA 11.8:
pip install torch --index-url https://download.pytorch.org/whl/cu118
CUDA 12.1:
pip install torch --index-url https://download.pytorch.org/whl/cu121
Helsinki-NLP model is downloaded automatically on first offline run (~300 MB):
Model: Helsinki-NLP/opus-mt-zh-en
Cache: ~/.chinese_file_translator/models/
Microsoft Translator (optional):
Get a free API key from Azure Cognitive Services and add to config.json:
{ "microsoft_api_key": "YOUR_KEY_HERE", "microsoft_region": "eastus" }
"""
# ── Standard Library ──────────────────────────────────────────────────────────
import os
import re
import sys
import json
import time
import logging
import argparse
import textwrap
import threading
import unicodedata
from copy import deepcopy
from pathlib import Path
from datetime import datetime
from typing import (
Any, Dict, Generator, List, Optional, Sequence, Tuple
)
# ── Online Translation ────────────────────────────────────────────────────────
try:
from deep_translator import GoogleTranslator, MicrosoftTranslator
DEEP_TRANSLATOR_AVAILABLE = True
except ImportError:
DEEP_TRANSLATOR_AVAILABLE = False
# ── Offline Translation ───────────────────────────────────────────────────────
OFFLINE_AVAILABLE = False
try:
from transformers import MarianMTModel, MarianTokenizer
import torch
OFFLINE_AVAILABLE = True
except ImportError:
pass
# ── Progress bar (optional) ───────────────────────────────────────────────────
try:
from tqdm import tqdm
TQDM_AVAILABLE = True
except ImportError:
TQDM_AVAILABLE = False
# ── Clipboard (optional) ─────────────────────────────────────────────────────
try:
import pyperclip
CLIPBOARD_AVAILABLE = True
except ImportError:
CLIPBOARD_AVAILABLE = False
# ── Constants ─────────────────────────────────────────────────────────────────
APP_NAME = "ChineseFileTranslator"
APP_VERSION = "1.0.0"
APP_AUTHOR = "algorembrant"
_HOME = Path.home() / ".chinese_file_translator"
CONFIG_FILE = _HOME / "config.json"
HISTORY_FILE = _HOME / "history.json"
LOG_FILE = _HOME / "app.log"
OFFLINE_MODEL = "Helsinki-NLP/opus-mt-zh-en"
OFFLINE_MODEL_T = "Helsinki-NLP/opus-mt-zht-en"
# CJK Unicode blocks used for Chinese detection
_CJK_RANGES: Tuple[Tuple[int, int], ...] = (
(0x4E00, 0x9FFF), # CJK Unified Ideographs
(0x3400, 0x4DBF), # CJK Extension A
(0x20000, 0x2A6DF), # CJK Extension B
(0x2A700, 0x2B73F), # CJK Extension C
(0x2B740, 0x2B81F), # CJK Extension D
(0xF900, 0xFAFF), # CJK Compatibility Ideographs
(0x2F800, 0x2FA1F), # CJK Compatibility Supplement
(0x3000, 0x303F), # CJK Symbols and Punctuation
(0xFF00, 0xFFEF), # Fullwidth / Halfwidth Forms
(0xFE30, 0xFE4F), # CJK Compatibility Forms
)
# Markdown patterns that must NOT be translated
_MD_CODE_FENCE = re.compile(r"```[\s\S]*?```")
_MD_INLINE_CODE = re.compile(r"`[^`\n]*?`")
_MD_LINK = re.compile(r"(!?\[[^\]]*?\])\(([^)]*?)\)")
_MD_HTML_TAG = re.compile(r"<[a-zA-Z/][^>]*?>")
_MD_FRONTMATTER = re.compile(r"^---[\s\S]*?^---", re.MULTILINE)
# ════════════════════════════════════════════════════════════════════════════
# LOGGING
# ════════════════════════════════════════════════════════════════════════════
def setup_logging(verbose: bool = False) -> logging.Logger:
_HOME.mkdir(parents=True, exist_ok=True)
level = logging.DEBUG if verbose else logging.INFO
fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
handlers: List[logging.Handler] = [
logging.FileHandler(LOG_FILE, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
]
logging.basicConfig(level=level, format=fmt, handlers=handlers)
return logging.getLogger(APP_NAME)
logger = logging.getLogger(APP_NAME)
# ════════════════════════════════════════════════════════════════════════════
# CONFIG
# ════════════════════════════════════════════════════════════════════════════
class Config:
"""Persistent JSON configuration. CLI args override stored values."""
DEFAULTS: Dict[str, Any] = {
"backend" : "google",
"lang" : "auto",
"use_gpu" : False,
"confidence_threshold" : 0.30,
"chunk_size" : 4000,
"batch_size" : 10,
"bilingual" : False,
"preserve_whitespace" : True,
"microsoft_api_key" : "",
"microsoft_region" : "eastus",
"offline_model_dir" : str(_HOME / "models"),
"max_history" : 1000,
"output_suffix" : "_translated",
"retry_attempts" : 3,
"retry_delay_seconds" : 1.5,
}
def __init__(self) -> None:
self._data: Dict[str, Any] = dict(self.DEFAULTS)
_HOME.mkdir(parents=True, exist_ok=True)
self._load()
def _load(self) -> None:
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
self._data.update(json.load(f))
except Exception as exc:
logger.warning(f"Config load failed ({exc}). Using defaults.")
def save(self) -> None:
try:
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, ensure_ascii=False)
except Exception as exc:
logger.error(f"Config save failed: {exc}")
def get(self, key: str, default: Any = None) -> Any:
return self._data.get(key, self.DEFAULTS.get(key, default))
def set(self, key: str, value: Any) -> None:
self._data[key] = value
self.save()
def apply_args(self, args: argparse.Namespace) -> None:
if getattr(args, "backend", None):
self._data["backend"] = args.backend
if getattr(args, "lang", None):
self._data["lang"] = args.lang
if getattr(args, "gpu", False):
self._data["use_gpu"] = True
if getattr(args, "confidence", None) is not None:
self._data["confidence_threshold"] = args.confidence
if getattr(args, "chunk_size", None) is not None:
self._data["chunk_size"] = args.chunk_size
if getattr(args, "bilingual", False):
self._data["bilingual"] = True
if getattr(args, "offline", False):
self._data["backend"] = "offline"
# ════════════════════════════════════════════════════════════════════════════
# CHINESE DETECTION UTILITIES
# ════════════════════════════════════════════════════════════════════════════
def _is_cjk(char: str) -> bool:
"""Return True if the character falls within any CJK Unicode range."""
cp = ord(char)
return any(lo <= cp <= hi for lo, hi in _CJK_RANGES)
def contains_chinese(text: str, min_ratio: float = 0.0) -> bool:
"""
Return True when Chinese characters are present in `text`.
If `min_ratio` is > 0, requires that fraction of non-whitespace characters.
"""
if not text or not text.strip():
return False
non_ws = [c for c in text if not c.isspace()]
if not non_ws:
return False
cjk_count = sum(1 for c in non_ws if _is_cjk(c))
if min_ratio <= 0:
return cjk_count > 0
return (cjk_count / len(non_ws)) >= min_ratio
def chinese_ratio(text: str) -> float:
"""Return the fraction of non-whitespace chars that are CJK."""
non_ws = [c for c in text if not c.isspace()]
if not non_ws:
return 0.0
return sum(1 for c in non_ws if _is_cjk(c)) / len(non_ws)
def detect_script(text: str) -> str:
"""
Heuristic: Traditional Chinese uses specific code points absent from
Simplified. Returns 'traditional', 'simplified', or 'mixed'.
"""
# Characters common in Traditional but rarely in Simplified
_TRAD_MARKERS = set(
"繁體國語臺灣學習問題開發電腦時間工作歷史語言文化"
"經濟機會關係發展環境教育政府社會應該雖然雖然認為"
)
_SIMP_MARKERS = set(
"简体国语台湾学习问题开发电脑时间工作历史语言文化"
"经济机会关系发展环境教育政府社会应该虽然认为"
)
trad = sum(1 for c in text if c in _TRAD_MARKERS)
simp = sum(1 for c in text if c in _SIMP_MARKERS)
if trad > simp:
return "traditional"
if simp > trad:
return "simplified"
return "simplified" # default fallback
# ════════════════════════════════════════════════════════════════════════════
# TRANSLATION ENGINE
# ════════════════════════════════════════════════════════════════════════════
class TranslationEngine:
"""
Multi-backend Chinese-to-English translation.
Vectorized batch mode is used for the offline (MarianMT) backend.
Online backends (Google, Microsoft) chunk by character limit with
sentence-boundary awareness and automatic retry on transient errors.
"""
_GOOGLE_LIMIT = 4500 # chars per Google request
_MS_LIMIT = 10000 # chars per Microsoft request
_OFFLINE_LIMIT = 512 # tokens; use 400-char char proxy
def __init__(self, config: Config) -> None:
self.cfg = config
self._offline_model: Any = None
self._offline_tok: Any = None
self._lock = threading.Lock()
# ── Public API ────────────────────────────────────────────────────────
def translate(
self, text: str, source_lang: str = "auto"
) -> Tuple[str, str]:
"""
Translate `text` to English.
Returns (translated_text, backend_name).
"""
if not text or not text.strip():
return text, "passthrough"
backend = self.cfg.get("backend", "google")
attempt_order: List[str] = _dedupe_list([backend, "google", "offline"])
last_exc: Optional[Exception] = None
for b in attempt_order:
try:
result = self._call_backend(b, text, source_lang)
return result, b
except Exception as exc:
logger.warning(f"Backend '{b}' failed for [{text}]: {exc}")
last_exc = exc
# NEVER CRASH: return original if all failed
logger.error(f"All translation backends failed for [{text}]. Returning original.")
return text, "failed"
def translate_batch(
self,
texts: List[str],
source_lang: str = "auto",
) -> List[Tuple[str, str]]:
"""
Translate a list of strings.
Uses vectorized batching for the offline backend; serial calls for
online backends (rate-limit friendly).
"""
backend = self.cfg.get("backend", "google")
if backend == "offline" and OFFLINE_AVAILABLE:
return self._translate_batch_offline(texts)
# Serial with progress
results: List[Tuple[str, str]] = []
iterable = (
tqdm(texts, desc="Translating", unit="chunk")
if TQDM_AVAILABLE else texts
)
for text in iterable:
results.append(self.translate(text, source_lang))
# Small delay for online backends to avoid rate limits
if backend in ("google", "microsoft"):
time.sleep(0.3)
return results
# ── Backend dispatch ──────────────────────────────────────────────────
def _call_backend(
self, backend: str, text: str, source_lang: str
) -> str:
retries = int(self.cfg.get("retry_attempts", 3))
delay = float(self.cfg.get("retry_delay_seconds", 1.5))
last_exc2: Optional[Exception] = None
for attempt in range(retries):
try:
if backend == "google":
return self._google(text, source_lang)
elif backend == "microsoft":
return self._microsoft(text, source_lang)
elif backend == "offline":
translated, _ = self._offline_single(text)
return translated
else:
raise ValueError(f"Unknown backend: {backend}")
except Exception as exc:
last_exc2 = exc
if attempt < retries - 1:
time.sleep(delay * (attempt + 1))
raise RuntimeError(
f"Backend '{backend}' failed after {retries} attempts: {last_exc2}"
)
# ── Google ────────────────────────────────────────────────────────────
def _google(self, text: str, source_lang: str) -> str:
if not DEEP_TRANSLATOR_AVAILABLE:
raise RuntimeError("deep-translator not installed.")
lang_map = {"simplified": "zh-CN", "traditional": "zh-TW", "auto": "auto"}
src = lang_map.get(source_lang, "auto")
chunks = list(_split_text(text, self._GOOGLE_LIMIT))
parts: List[str] = []
for chunk in chunks:
try:
translated = GoogleTranslator(source=src, target="en").translate(chunk)
# If it's None or returned original Chinese, it failed
if not translated or (translated.strip() == chunk.strip() and contains_chinese(chunk)):
raise RuntimeError("Google returned original or None")
parts.append(translated)
except Exception as e:
raise RuntimeError(f"Google translate error: {e}")
return " ".join(parts)
# ── Microsoft ─────────────────────────────────────────────────────────
def _microsoft(self, text: str, source_lang: str) -> str:
if not DEEP_TRANSLATOR_AVAILABLE:
raise RuntimeError(
"deep-translator not installed. Run: pip install deep-translator"
)
api_key = str(self.cfg.get("microsoft_api_key", ""))
region = str(self.cfg.get("microsoft_region", "eastus"))
if not api_key:
raise ValueError(
"Microsoft API key not configured. "
"Add 'microsoft_api_key' to ~/.chinese_file_translator/config.json"
)
lang_map = {"simplified": "zh-Hans", "traditional": "zh-Hant", "auto": "auto"}
src = lang_map.get(source_lang, "auto")
chunks = list(_split_text(text, self._MS_LIMIT))
parts = []
for chunk in chunks:
tr = MicrosoftTranslator(
api_key=api_key, region=region, source=src, target="en"
).translate(chunk)
parts.append(tr or chunk)
return " ".join(parts)
# ── Offline (MarianMT) ────────────────────────────────────────────────
def _load_offline(self) -> None:
if not OFFLINE_AVAILABLE:
raise RuntimeError("Offline model dependencies not installed.")
model_dir = str(self.cfg.get("offline_model_dir", str(_HOME / "models")))
Path(model_dir).mkdir(parents=True, exist_ok=True)
# ...
self._offline_tok = MarianTokenizer.from_pretrained(
OFFLINE_MODEL, cache_dir=model_dir
)
model = MarianMTModel.from_pretrained(
OFFLINE_MODEL, cache_dir=model_dir
)
use_gpu = bool(self.cfg.get("use_gpu", False))
device = "cuda" if (use_gpu and torch.cuda.is_available()) else "cpu"
self._offline_model = model.to(device)
logger.info(f"Offline model loaded on '{device}'.")
def _offline_single(self, text: str) -> Tuple[str, str]:
with self._lock:
if self._offline_model is None:
self._load_offline()
chunks = list(_split_text(text, self._OFFLINE_LIMIT))
results = self._vectorized_translate(chunks)
return " ".join(results), "offline"
def _translate_batch_offline(
self, texts: List[str]
) -> List[Tuple[str, str]]:
"""Vectorized: flatten all chunks, translate in one pass, reassemble."""
with self._lock:
if self._offline_model is None:
self._load_offline()
# Build chunk index: (text_idx, chunk_idx) -> flat_idx
all_chunks: List[str] = []
chunk_map: List[Tuple[int, int]] = [] # (text_idx, n_chunks)
for t_idx, text in enumerate(texts):
if not text or not text.strip():
chunk_map.append((t_idx, 0))
continue
chunks = list(_split_text(text, self._OFFLINE_LIMIT))
start = len(all_chunks)
all_chunks.extend(chunks)
chunk_map.append((t_idx, len(chunks)))
if not all_chunks:
return [(t, "passthrough") for t in texts]
# One vectorized forward pass
translated_chunks = self._vectorized_translate(all_chunks)
# Reassemble
results: List[Tuple[str, str]] = []
flat_idx = 0
for t_idx, n in chunk_map:
if n == 0:
results.append((texts[t_idx], "passthrough"))
else:
assembled = " ".join(translated_chunks[flat_idx : flat_idx + n])
results.append((assembled, "offline"))
flat_idx += n
return results
def _vectorized_translate(self, chunks: List[str]) -> List[str]:
"""Run MarianMT on a list of strings in one batched forward pass."""
if not chunks:
return []
tok = self._offline_tok
model = self._offline_model
if tok is None or model is None:
raise RuntimeError("Offline model not loaded.")
device = next(model.parameters()).device
batch_size = int(self.cfg.get("batch_size", 10))
results: List[str] = []
# Split into mini-batches to avoid OOM on large inputs
for i in range(0, len(chunks), batch_size):
mini = chunks[i : i + batch_size]
enc = tok(
mini,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512,
).to(device)
with torch.no_grad():
out = model.generate(**enc)
decoded = tok.batch_decode(out, skip_special_tokens=True)
results.extend(decoded)
return results
# ════════════════════════════════════════════════════════════════════════════
# TEXT SPLITTING UTILITIES
# ════════════════════════════════════════════════════════════════════════════
def _split_text(text: str, max_len: int) -> Generator[str, None, None]:
"""Split text at sentence boundaries for chunking."""
if len(text) <= max_len:
yield text
return
sentence_ends = re.compile(r"[。!?\n!?\.]")
current: List[str] = []
current_len = 0
for segment in sentence_ends.split(text):
seg = segment.strip()
if not seg:
continue
if current_len + len(seg) + 1 > max_len and current:
yield " ".join(current)
current = [seg]
current_len = len(seg)
else:
current.append(seg)
current_len += len(seg) + 1
if current:
yield " ".join(current)
def _dedupe_list(lst: List[str]) -> List[str]:
seen: set = set()
out: List[str] = []
for item in lst:
if item not in seen:
seen.add(item)
out.append(item)
return out
# ════════════════════════════════════════════════════════════════════════════
# MARKDOWN PARSER / SEGMENT EXTRACTOR
# ════════════════════════════════════════════════════════════════════════════
class MarkdownProcessor:
"""Ultra-robust Markdown protection."""
_TOKEN = "___MY_PROTECT_PH_{idx}___"
def __init__(self) -> None:
self._protected: Dict[int, str] = {}
self._ph_counter = 0
def _next_placeholder(self, original: str) -> str:
idx = self._ph_counter
token = self._TOKEN.format(idx=idx)
self._protected[idx] = original
self._ph_counter += 1
return token
def protect(self, text: str) -> str:
"""Replace code/links/tags with unique tokens."""
self._protected.clear()
self._ph_counter = 0
# Protect YAML
text = _MD_FRONTMATTER.sub(lambda m: self._next_placeholder(m.group(0)), text)
# Protect Code Fences but leave content if it has Chinese
def _fence_sub(m: re.Match) -> str:
full = m.group(0)
if contains_chinese(full):
# Only protect the ``` lines
lines = full.splitlines()
if len(lines) >= 2:
p1 = self._next_placeholder(lines[0])
p2 = self._next_placeholder(lines[-1])
content = "\n".join(lines[1:-1])
return f"{p1}\n{content}\n{p2}"
return self._next_placeholder(full)
text = _MD_CODE_FENCE.sub(_fence_sub, text)
# Protect HTML and Inline Code and Links
text = _MD_HTML_TAG.sub(lambda m: self._next_placeholder(m.group(0)), text)
text = _MD_LINK.sub(lambda m: f"{m.group(1)}({self._next_placeholder(m.group(2))})", text)
text = _MD_INLINE_CODE.sub(lambda m: self._next_placeholder(m.group(0)), text)
return text
def restore(self, text: str) -> str:
"""Sequential replacement of all tokens."""
# We replace them in reverse to avoid partial matches if idx 10 and 1 exist
for idx in sorted(self._protected.keys(), reverse=True):
token = self._TOKEN.format(idx=idx)
original = self._protected[idx]
# Use regex to handle potential space mangling by Google
pattern = re.compile(re.escape(token).replace(r"\_", r"\s*\_*"), re.IGNORECASE)
text = pattern.sub(original.replace("\\", "\\\\"), text)
return text
class FileTranslator:
"""Orchestrates translation with 'Never Miss' strategy."""
def __init__(self, config: Config) -> None:
self.cfg = config
self.engine = TranslationEngine(config)
self._md_proc = MarkdownProcessor()
def translate_file(
self,
input_path: Path,
output_path: Optional[Path] = None,
extract_only: bool = False,
to_stdout: bool = False,
) -> Path:
input_path = Path(input_path).resolve()
if not input_path.exists(): raise FileNotFoundError(f"Missing: {input_path}")
suffix = input_path.suffix.lower()
if suffix not in (".txt", ".md"): raise ValueError("Unsupported type")
raw = input_path.read_text(encoding="utf-8", errors="replace")
if extract_only:
extracted = "\n".join([l for l in raw.splitlines() if contains_chinese(l)])
if to_stdout: print(extracted); return input_path
out = output_path or _default_output(input_path, self.cfg)
out.write_text(extracted, encoding="utf-8")
return out
res = self._translate_md(raw) if suffix == ".md" else self._translate_txt(raw)
if to_stdout: print(res); return input_path
out = output_path or _default_output(input_path, self.cfg)
out.write_text(res, encoding="utf-8")
return out
def _translate_txt(self, text: str) -> str:
lines = text.splitlines(keepends=True)
bilingual = bool(self.cfg.get("bilingual", False))
out_lines = []
for line in lines:
stripped = line.rstrip("\n\r")
if contains_chinese(stripped):
tr = self._translate_granular(stripped)
eol = "\n" if line.endswith("\n") else ""
out_lines.append(f"{stripped}\n{tr}{eol}" if bilingual else f"{tr}{eol}")
else:
out_lines.append(line)
return "".join(out_lines)
def _translate_md(self, text: str) -> str:
"""Global Surgical Batch Translation with fixed CJK regex."""
# 1. Protect structure
protected = self._md_proc.protect(text)
# 2. Extract all CJK blocks (Inclusive range for stability)
CJK_BLOCK_RE = re.compile(
r"["
r"\u4e00-\u9fff" # Basic
r"\u3400-\u4dbf" # Ext A
r"\U00020000-\U0002ceaf" # Ext B-E
r"\uf900-\ufaff" # Compatibility
r"\u3000-\u303f" # Symbols/Punctuation
r"\uff00-\uffef" # Fullwidth
r"\u00b7" # Middle dot
r"\u2014-\u2027" # Punctuation ranges
r"]+"
)
# Filter out blocks that are ONLY numbers or symbols if they don't have AT LEAST ONE CJK
def _has_real_cjk(s):
return any('\u4e00' <= c <= '\u9fff' or '\u3400' <= c <= '\u4dbf' or ord(c) > 0xffff for c in s)
all_candidate_blocks = CJK_BLOCK_RE.findall(protected)
all_blocks = _dedupe_list([b for b in all_candidate_blocks if _has_real_cjk(b)])
if not all_blocks:
return self._md_proc.restore(protected)
# 3. Batch translate unique blocks
logger.info(f"Found {len(all_blocks)} unique Chinese blocks. Batch translating...")
translated = self.engine.translate_batch(all_blocks, source_lang="simplified")
# 4. Global replacement
mapping = {}
for orig, (tr, _) in zip(all_blocks, translated):
if tr.strip() and tr.strip() != orig.strip():
mapping[orig] = tr
else:
try:
t, _ = self.engine.translate(orig, source_lang="simplified")
mapping[orig] = t
except:
mapping[orig] = orig
sorted_orig = sorted(mapping.keys(), key=len, reverse=True)
final_text = protected
for orig in sorted_orig:
final_text = final_text.replace(orig, mapping[orig])
# 5. Restore
return self._md_proc.restore(final_text)
def _translate_granular(self, text: str) -> str:
"""Fallback for TXT or other sparse areas."""
CJK_BLOCK_RE = re.compile(
r"[\u4e00-\u9fff\u3400-\u4dbf\U00020000-\U0002ceaf\u3000-\u303f\uff00-\uffef]+"
)
def _sub(m: re.Match) -> str:
chunk = m.group(0)
if not any('\u4e00' <= c <= '\u9fff' for c in chunk): return chunk
try:
t, _ = self.engine.translate(chunk, source_lang="simplified")
return t
except:
return chunk
return CJK_BLOCK_RE.sub(_sub, text)
@staticmethod
def _extract_chinese_lines(text: str) -> List[str]:
"""Return only lines that contain Chinese text."""
return [
line for line in text.splitlines()
if contains_chinese(line)
]
def _detect_script_bulk(self, texts: List[str]) -> str:
"""Detect dominant script from a list of strings."""
lang_mode = str(self.cfg.get("lang", "auto"))
if lang_mode in ("simplified", "traditional"):
return lang_mode
combined = " ".join(texts[:50]) # sample first 50 segments
return detect_script(combined)
# ── Batch directory translation ───────────────────────────────────────
def translate_directory(
self,
input_dir: Path,
output_dir: Optional[Path] = None,
) -> List[Path]:
"""Translate all .txt and .md files in `input_dir`."""
input_dir = Path(input_dir).resolve()
if not input_dir.is_dir():
raise NotADirectoryError(f"Not a directory: {input_dir}")
files = sorted(
list(input_dir.glob("*.txt")) + list(input_dir.glob("*.md"))
)
if not files:
logger.warning(f"No .txt or .md files found in {input_dir}")
return []
logger.info(f"Batch translating {len(files)} file(s) from {input_dir}")
out_paths: List[Path] = []
iterable = (
tqdm(files, desc="Files", unit="file")
if TQDM_AVAILABLE else files
)
for fpath in iterable:
try:
if output_dir:
out_file = Path(output_dir) / fpath.name
Path(output_dir).mkdir(parents=True, exist_ok=True)
else:
out_file = _default_output(fpath, self.cfg)
result = self.translate_file(fpath, output_path=out_file)
out_paths.append(result)
logger.info(f" Done: {fpath.name} -> {result.name}")
except Exception as exc:
logger.error(f" Failed: {fpath.name}: {exc}")
return out_paths
# ════════════════════════════════════════════════════════════════════════════
# HISTORY MANAGER
# ════════════════════════════════════════════════════════════════════════════
class HistoryManager:
"""Log translation sessions to a persistent JSON file."""
def __init__(self, config: Config) -> None:
self.cfg = config
self._items: List[Dict[str, Any]] = []
_HOME.mkdir(parents=True, exist_ok=True)
self._load()
def _load(self) -> None:
if HISTORY_FILE.exists():
try:
with open(HISTORY_FILE, "r", encoding="utf-8") as f:
self._items = json.load(f)
except Exception:
self._items = []
def save(self) -> None:
try:
with open(HISTORY_FILE, "w", encoding="utf-8") as f:
json.dump(self._items, f, ensure_ascii=False, indent=2)
except Exception as exc:
logger.error(f"History save error: {exc}")
def add(
self,
input_file: str,
output_file: str,
backend: str,
script: str,
segments_count: int,
elapsed_seconds: float,
) -> None:
entry: Dict[str, Any] = {
"timestamp" : datetime.now().isoformat(),
"input_file" : input_file,
"output_file" : output_file,
"backend" : backend,
"script" : script,
"segments_count" : segments_count,
"elapsed_seconds": round(elapsed_seconds, 2),
}
self._items.insert(0, entry)
max_h = int(self.cfg.get("max_history", 1000))
while len(self._items) > max_h:
self._items.pop()
self.save()
def export(self, path: str) -> None:
with open(path, "w", encoding="utf-8") as f:
json.dump(self._items, f, ensure_ascii=False, indent=2)
logger.info(f"History exported to {path}")
def get_all(self) -> List[Dict[str, Any]]:
return list(self._items)
# ════════════════════════════════════════════════════════════════════════════
# PATH HELPERS
# ════════════════════════════════════════════════════════════════════════════
def _default_output(input_path: Path, config: Config) -> Path:
"""Derive default output path: input_translated.ext"""
suffix = str(config.get("output_suffix", "_translated"))
return input_path.with_stem(input_path.stem + suffix)
# ════════════════════════════════════════════════════════════════════════════
# CLI ARG PARSER
# ════════════════════════════════════════════════════════════════════════════
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="chinese_file_translator",
description=(
f"{APP_NAME} v{APP_VERSION} by {APP_AUTHOR}\n"
"Translate Chinese text inside .txt or .md files to English."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""
Examples:
python chinese_file_translator.py input.txt
python chinese_file_translator.py input.md -o translated.md
python chinese_file_translator.py input.txt --backend offline --gpu
python chinese_file_translator.py input.txt --bilingual
python chinese_file_translator.py input.txt --extract-only
python chinese_file_translator.py --batch ./docs/ --batch-out ./out/
python chinese_file_translator.py input.txt --stdout
"""),
)
parser.add_argument(
"input",
nargs="?",
help="Input .txt or .md file path",
)
parser.add_argument(
"-o", "--output",
dest="output",
metavar="FILE",
help="Output file path (default: <input>_translated.<ext>)",
)
parser.add_argument(
"--batch",
metavar="DIR",
help="Translate all .txt and .md files in a directory",
)
parser.add_argument(
"--batch-out",
dest="batch_out",
metavar="DIR",
help="Output directory for batch translation",
)
parser.add_argument(
"--backend",
choices=["google", "microsoft", "offline"],
help="Translation backend (default: google)",
)
parser.add_argument(
"--offline",
action="store_true",
help="Shorthand for --backend offline",
)
parser.add_argument(
"--lang",
choices=["auto", "simplified", "traditional"],
default="auto",
help="Chinese script mode (default: auto)",
)
parser.add_argument(
"--gpu",
action="store_true",
help="Use GPU (CUDA) for offline translation",
)
parser.add_argument(
"--confidence",
type=float,
metavar="0.0-1.0",
help="Chinese detection confidence threshold (default: 0.05 ratio)",
)
parser.add_argument(
"--chunk-size",
dest="chunk_size",
type=int,
metavar="N",
help="Max characters per translation request (default: 4000)",
)
parser.add_argument(
"--bilingual",
action="store_true",
help="Keep original Chinese alongside English translation",
)
parser.add_argument(
"--extract-only",
dest="extract_only",
action="store_true",
help="Only extract and save detected Chinese lines, no translation",
)
parser.add_argument(
"--stdout",
action="store_true",
help="Print translated output to stdout instead of writing a file",
)
parser.add_argument(
"--export-history",
dest="export_history",
metavar="FILE",
help="Export translation history to a JSON file",
)
parser.add_argument(
"--version",
action="version",
version=f"{APP_NAME} {APP_VERSION}",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable DEBUG-level logging",
)
return parser
# ════════════════════════════════════════════════════════════════════════════
# DEPENDENCY CHECK
# ════════════════════════════════════════════════════════════════════════════
def check_dependencies(args: argparse.Namespace) -> None:
issues: List[str] = []
want_offline = getattr(args, "offline", False) or getattr(args, "backend", "") == "offline"
if not DEEP_TRANSLATOR_AVAILABLE:
issues.append(
"deep-translator -> pip install deep-translator"
)
if want_offline and not OFFLINE_AVAILABLE:
issues.append(
"transformers / torch -> pip install transformers torch\n"
" (CPU) pip install torch --index-url https://download.pytorch.org/whl/cpu\n"
" (CUDA) pip install torch --index-url https://download.pytorch.org/whl/cu121"
)
if issues:
print("\n" + "=" * 55)
print(f"[{APP_NAME}] Missing dependencies:")
for i in issues:
print(f" {i}")
print("=" * 55 + "\n")
# ════════════════════════════════════════════════════════════════════════════
# MAIN
# ════════════════════════════════════════════════════════════════════════════
def main() -> None:
parser = _build_parser()
args = parser.parse_args()
setup_logging(verbose=getattr(args, "verbose", False))
check_dependencies(args)
cfg = Config()
cfg.apply_args(args)
history = HistoryManager(cfg)
translator = FileTranslator(cfg)
# ── Export history shortcut ───────────────────────────────────────────
if getattr(args, "export_history", None):
history.export(args.export_history)
if not args.input and not args.batch:
return
# ── Batch mode ────────────────────────────────────────────────────────
if getattr(args, "batch", None):
batch_dir = Path(args.batch)
out_dir = Path(args.batch_out) if getattr(args, "batch_out", None) else None
t0 = time.time()
out_paths = translator.translate_directory(batch_dir, output_dir=out_dir)
elapsed = time.time() - t0
print(
f"\nBatch complete: {len(out_paths)} file(s) translated "
f"in {elapsed:.1f}s"
)
for p in out_paths:
print(f" -> {p}")
history.add(
input_file=str(batch_dir),
output_file=str(out_dir or batch_dir),
backend=str(cfg.get("backend")),
script=str(cfg.get("lang")),
segments_count=len(out_paths),
elapsed_seconds=elapsed,
)
return
# ── Single file mode ──────────────────────────────────────────────────
if not args.input:
parser.print_help()
sys.exit(0)
input_path = Path(args.input)
output_path = Path(args.output) if getattr(args, "output", None) else None
t0 = time.time()
try:
out = translator.translate_file(
input_path = input_path,
output_path = output_path,
extract_only = getattr(args, "extract_only", False),
to_stdout = getattr(args, "stdout", False),
)
except (FileNotFoundError, ValueError, RuntimeError) as exc:
logger.error(str(exc))
sys.exit(1)
elapsed = time.time() - t0
if not getattr(args, "stdout", False):
print(f"\n{APP_NAME} v{APP_VERSION}")
print(f"Input : {input_path}")
print(f"Output : {out}")
print(f"Backend : {cfg.get('backend')}")
print(f"Script : {cfg.get('lang')}")
print(f"Elapsed : {elapsed:.2f}s")
print(f"Config : {CONFIG_FILE}")
print(f"Log : {LOG_FILE}")
history.add(
input_file = str(input_path),
output_file = str(out),
backend = str(cfg.get("backend")),
script = str(cfg.get("lang")),
segments_count = 0,
elapsed_seconds = elapsed,
)
if __name__ == "__main__":
main()