| |
| |
| """ |
| ChineseFileTranslator v1.0.0 |
| ================================ |
| Author : algorembrant |
| License : MIT |
| Version : 1.0.0 |
| |
| Translate Chinese text inside .txt or .md files to English. |
| Preserves Markdown structure (headings, bold, italics, code blocks, tables, links). |
| Supports batch/vectorized processing, multiple translation backends, |
| auto-detection of Chinese script, and history logging. |
| |
| USAGE COMMANDS |
| -------------- |
| Translate a single file (default: Google backend): |
| python chinese_file_translator.py input.txt |
| |
| Translate and save to a specific output file: |
| python chinese_file_translator.py input.md -o translated.md |
| |
| Translate using the offline Helsinki-NLP MarianMT model: |
| python chinese_file_translator.py input.txt --backend offline |
| |
| Translate using Microsoft Translator (requires API key in config): |
| python chinese_file_translator.py input.txt --backend microsoft |
| |
| Force Simplified Chinese OCR/detection: |
| python chinese_file_translator.py input.txt --lang simplified |
| |
| Force Traditional Chinese: |
| python chinese_file_translator.py input.txt --lang traditional |
| |
| Auto-detect Chinese script (default): |
| python chinese_file_translator.py input.txt --lang auto |
| |
| Enable GPU (CUDA) for offline model: |
| python chinese_file_translator.py input.txt --backend offline --gpu |
| |
| Set OCR confidence threshold (0.0 - 1.0, default 0.3): |
| python chinese_file_translator.py input.txt --confidence 0.4 |
| |
| Batch translate all .txt and .md files in a directory: |
| python chinese_file_translator.py --batch ./my_folder/ |
| |
| Batch translate with output directory: |
| python chinese_file_translator.py --batch ./input/ --batch-out ./output/ |
| |
| Set chunk size for large files (default 4000 chars): |
| python chinese_file_translator.py input.txt --chunk-size 2000 |
| |
| Append both Chinese source and English translation side-by-side: |
| python chinese_file_translator.py input.txt --bilingual |
| |
| Only extract and print detected Chinese text (no translation): |
| python chinese_file_translator.py input.txt --extract-only |
| |
| Print translated output to stdout instead of file: |
| python chinese_file_translator.py input.txt --stdout |
| |
| Export translation history to JSON on exit: |
| python chinese_file_translator.py input.txt --export-history out.json |
| |
| Enable verbose/debug logging: |
| python chinese_file_translator.py input.txt --verbose |
| |
| Show version and exit: |
| python chinese_file_translator.py --version |
| |
| Show full help: |
| python chinese_file_translator.py --help |
| |
| SUPPORTED FILE TYPES |
| -------------------- |
| - Plain text (.txt) : All Chinese detected and translated in-place |
| - Markdown (.md) : Chinese content translated; Markdown syntax preserved |
| Preserved: headings (#), bold (**), italic (*), inline code (`), |
| fenced code blocks (```), blockquotes (>), tables (|), |
links ([text](url)), images (![alt](url)), horizontal rules
| |
| SUPPORTED CHINESE VARIANTS |
| --------------------------- |
| - Simplified Chinese (Mandarin, simplified/simp) |
| - Traditional Chinese (Mandarin / Hong Kong / Taiwan) |
| - Cantonese / Yue (detected via Unicode CJK ranges) |
| - Classical Chinese (Literary Chinese, treated as Traditional) |
| - Mixed Chinese-English (Chinglish / code-switching) |
| |
| TRANSLATION BACKENDS |
| -------------------- |
| 1. Google Translate (online, fast, default, no API key needed) |
| 2. Microsoft Translate (online, fallback, requires Azure API key) |
| 3. Helsinki-NLP MarianMT (offline, opus-mt-zh-en, ~300 MB download on first use) |
| |
| CONFIGURATION |
| ------------- |
| Config is stored at: ~/.chinese_file_translator/config.json |
| History is stored at: ~/.chinese_file_translator/history.json |
| Logs are stored at: ~/.chinese_file_translator/app.log |
| |
| EXTERNAL SETUP REQUIRED |
| ----------------------- |
| PyTorch (required only for offline backend): |
| CPU-only: |
| pip install torch --index-url https://download.pytorch.org/whl/cpu |
| CUDA 11.8: |
| pip install torch --index-url https://download.pytorch.org/whl/cu118 |
| CUDA 12.1: |
| pip install torch --index-url https://download.pytorch.org/whl/cu121 |
| |
| Helsinki-NLP model is downloaded automatically on first offline run (~300 MB): |
| Model: Helsinki-NLP/opus-mt-zh-en |
| Cache: ~/.chinese_file_translator/models/ |
| |
| Microsoft Translator (optional): |
| Get a free API key from Azure Cognitive Services and add to config.json: |
| { "microsoft_api_key": "YOUR_KEY_HERE", "microsoft_region": "eastus" } |
| """ |
|
|
| |
| import os |
| import re |
| import sys |
| import json |
| import time |
| import logging |
| import argparse |
| import textwrap |
| import threading |
| import unicodedata |
| from copy import deepcopy |
| from pathlib import Path |
| from datetime import datetime |
| from typing import ( |
| Any, Dict, Generator, List, Optional, Sequence, Tuple |
| ) |
|
|
| |
| try: |
| from deep_translator import GoogleTranslator, MicrosoftTranslator |
| DEEP_TRANSLATOR_AVAILABLE = True |
| except ImportError: |
| DEEP_TRANSLATOR_AVAILABLE = False |
|
|
| |
| OFFLINE_AVAILABLE = False |
| try: |
| from transformers import MarianMTModel, MarianTokenizer |
| import torch |
| OFFLINE_AVAILABLE = True |
| except ImportError: |
| pass |
|
|
| |
| try: |
| from tqdm import tqdm |
| TQDM_AVAILABLE = True |
| except ImportError: |
| TQDM_AVAILABLE = False |
|
|
| |
| try: |
| import pyperclip |
| CLIPBOARD_AVAILABLE = True |
| except ImportError: |
| CLIPBOARD_AVAILABLE = False |
|
|
| |
APP_NAME = "ChineseFileTranslator"
APP_VERSION = "1.0.0"
APP_AUTHOR = "algorembrant"
# All persistent state (config, history, log, model cache) lives under a
# hidden per-user directory.
_HOME = Path.home() / ".chinese_file_translator"
CONFIG_FILE = _HOME / "config.json"
HISTORY_FILE = _HOME / "history.json"
LOG_FILE = _HOME / "app.log"
# Hugging Face model ids for the offline MarianMT backend.
OFFLINE_MODEL = "Helsinki-NLP/opus-mt-zh-en"
# Traditional-script variant; NOTE(review): not referenced anywhere in this
# file -- confirm whether it is still needed.
OFFLINE_MODEL_T = "Helsinki-NLP/opus-mt-zht-en"


# Unicode code-point ranges treated as "Chinese" by _is_cjk().
_CJK_RANGES: Tuple[Tuple[int, int], ...] = (
    (0x4E00, 0x9FFF),    # CJK Unified Ideographs
    (0x3400, 0x4DBF),    # Extension A
    (0x20000, 0x2A6DF),  # Extension B (supplementary plane)
    (0x2A700, 0x2B73F),  # Extension C
    (0x2B740, 0x2B81F),  # Extension D
    (0xF900, 0xFAFF),    # Compatibility Ideographs
    (0x2F800, 0x2FA1F),  # Compatibility Ideographs Supplement
    (0x3000, 0x303F),    # CJK Symbols and Punctuation
    (0xFF00, 0xFFEF),    # Halfwidth and Fullwidth Forms
    (0xFE30, 0xFE4F),    # CJK Compatibility Forms (vertical punctuation)
)


# Markdown spans that must never be handed to a translator.
_MD_CODE_FENCE = re.compile(r"```[\s\S]*?```")                    # fenced code blocks
_MD_INLINE_CODE = re.compile(r"`[^`\n]*?`")                       # inline code spans
_MD_LINK = re.compile(r"(!?\[[^\]]*?\])\(([^)]*?)\)")             # link/image: text kept, URL protected
_MD_HTML_TAG = re.compile(r"<[a-zA-Z/][^>]*?>")                   # raw HTML tags
_MD_FRONTMATTER = re.compile(r"^---[\s\S]*?^---", re.MULTILINE)   # YAML front matter
|
|
|
|
| |
| |
| |
def setup_logging(verbose: bool = False) -> logging.Logger:
    """
    Configure root logging to both the app log file and stdout.

    Args:
        verbose: when True, log at DEBUG level instead of INFO.

    Returns:
        The application logger (named APP_NAME).

    Uses ``force=True`` so that repeated calls -- or a root logger already
    configured by an embedding application -- are reconfigured instead of
    being silently ignored (``logging.basicConfig`` is otherwise a no-op
    once the root logger has handlers).
    """
    _HOME.mkdir(parents=True, exist_ok=True)
    level = logging.DEBUG if verbose else logging.INFO
    fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    handlers: List[logging.Handler] = [
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(level=level, format=fmt, handlers=handlers, force=True)
    return logging.getLogger(APP_NAME)
|
|
|
|
# Module-level logger; handlers and level are attached later by setup_logging().
logger = logging.getLogger(APP_NAME)
|
|
|
|
| |
| |
| |
class Config:
    """
    Persistent JSON configuration stored under the app home directory.

    Lookup order is: in-memory value (possibly overridden by CLI args via
    apply_args) -> stored file value -> DEFAULTS. CLI overrides are applied
    in memory only; they reach disk only through save()/set().
    """

    DEFAULTS: Dict[str, Any] = {
        "backend": "google",
        "lang": "auto",
        "use_gpu": False,
        "confidence_threshold": 0.30,
        "chunk_size": 4000,
        "batch_size": 10,
        "bilingual": False,
        "preserve_whitespace": True,
        "microsoft_api_key": "",
        "microsoft_region": "eastus",
        "offline_model_dir": str(_HOME / "models"),
        "max_history": 1000,
        "output_suffix": "_translated",
        "retry_attempts": 3,
        "retry_delay_seconds": 1.5,
    }

    def __init__(self) -> None:
        self._data: Dict[str, Any] = dict(self.DEFAULTS)
        _HOME.mkdir(parents=True, exist_ok=True)
        self._load()

    def _load(self) -> None:
        """Overlay values from config.json onto the defaults, if present."""
        if not CONFIG_FILE.exists():
            return
        try:
            with CONFIG_FILE.open("r", encoding="utf-8") as fh:
                self._data.update(json.load(fh))
        except Exception as exc:
            logger.warning(f"Config load failed ({exc}). Using defaults.")

    def save(self) -> None:
        """Write the current settings to config.json (best effort)."""
        try:
            with CONFIG_FILE.open("w", encoding="utf-8") as fh:
                json.dump(self._data, fh, indent=2, ensure_ascii=False)
        except Exception as exc:
            logger.error(f"Config save failed: {exc}")

    def get(self, key: str, default: Any = None) -> Any:
        """Look up `key`, falling back to DEFAULTS, then to `default`."""
        return self._data.get(key, self.DEFAULTS.get(key, default))

    def set(self, key: str, value: Any) -> None:
        """Assign `key` and persist immediately."""
        self._data[key] = value
        self.save()

    def apply_args(self, args: argparse.Namespace) -> None:
        """Overlay CLI arguments on the in-memory settings (not persisted)."""
        backend = getattr(args, "backend", None)
        if backend:
            self._data["backend"] = backend
        lang = getattr(args, "lang", None)
        if lang:
            self._data["lang"] = lang
        if getattr(args, "gpu", False):
            self._data["use_gpu"] = True
        confidence = getattr(args, "confidence", None)
        if confidence is not None:
            self._data["confidence_threshold"] = confidence
        chunk_size = getattr(args, "chunk_size", None)
        if chunk_size is not None:
            self._data["chunk_size"] = chunk_size
        if getattr(args, "bilingual", False):
            self._data["bilingual"] = True
        # --offline is shorthand for --backend offline.
        if getattr(args, "offline", False):
            self._data["backend"] = "offline"
|
|
|
| |
| |
| |
def _is_cjk(char: str) -> bool:
    """True when `char`'s code point lies inside one of the known CJK blocks."""
    code = ord(char)
    for lo, hi in _CJK_RANGES:
        if lo <= code <= hi:
            return True
    return False
|
|
|
|
def contains_chinese(text: str, min_ratio: float = 0.0) -> bool:
    """
    Return True when `text` contains Chinese characters.

    With `min_ratio` > 0, at least that fraction of the non-whitespace
    characters must be CJK for the text to count as Chinese.
    """
    if not text or not text.strip():
        return False
    total = 0
    cjk = 0
    for ch in text:
        if ch.isspace():
            continue
        total += 1
        if _is_cjk(ch):
            cjk += 1
    if total == 0:
        return False
    if min_ratio <= 0:
        return cjk > 0
    return cjk / total >= min_ratio
|
|
|
|
def chinese_ratio(text: str) -> float:
    """Fraction of non-whitespace characters that are CJK (0.0 when none)."""
    flags = [1 if _is_cjk(ch) else 0 for ch in text if not ch.isspace()]
    if not flags:
        return 0.0
    return sum(flags) / len(flags)
|
|
|
|
def detect_script(text: str) -> str:
    """
    Heuristic Simplified-vs-Traditional script detection.

    Counts occurrences of high-frequency characters whose glyphs differ
    between the two scripts. Returns 'traditional' when traditional-only
    markers strictly dominate; otherwise 'simplified' (ties and texts with
    no markers default to Simplified, the more common script).
    """
    # Marker characters common in running text whose forms differ by script.
    trad_markers = set(
        "繁體國語臺灣學習問題開發電腦時間工作歷史語言文化"
        "經濟機會關係發展環境教育政府社會應該雖然認為"
    )
    simp_markers = set(
        "简体国语台湾学习问题开发电脑时间工作历史语言文化"
        "经济机会关系发展环境教育政府社会应该虽然认为"
    )
    trad = sum(1 for c in text if c in trad_markers)
    simp = sum(1 for c in text if c in simp_markers)
    return "traditional" if trad > simp else "simplified"
|
|
|
|
| |
| |
| |
class TranslationEngine:
    """
    Multi-backend Chinese-to-English translation.

    Vectorized batch mode is used for the offline (MarianMT) backend.
    Online backends (Google, Microsoft) chunk by character limit with
    sentence-boundary awareness and automatic retry on transient errors.
    """

    # Maximum characters submitted per request, per backend. The offline
    # limit is in characters as a rough proxy; the tokenizer additionally
    # truncates at 512 tokens (see _vectorized_translate).
    _GOOGLE_LIMIT = 4500
    _MS_LIMIT = 10000
    _OFFLINE_LIMIT = 512

    def __init__(self, config: Config) -> None:
        self.cfg = config
        # MarianMT model/tokenizer are loaded lazily on first offline use.
        self._offline_model: Any = None
        self._offline_tok: Any = None
        # Serializes the one-time model load across threads.
        self._lock = threading.Lock()

    def translate(
        self, text: str, source_lang: str = "auto"
    ) -> Tuple[str, str]:
        """
        Translate `text` to English.
        Returns (translated_text, backend_name).

        Tries the configured backend first, then Google, then the offline
        model (duplicates removed). Never raises: if every backend fails,
        the original text is returned with backend name "failed".
        """
        if not text or not text.strip():
            return text, "passthrough"

        backend = self.cfg.get("backend", "google")
        # Fallback chain; _dedupe_list keeps the primary backend first and
        # ensures each backend is attempted at most once.
        attempt_order: List[str] = _dedupe_list([backend, "google", "offline"])

        last_exc: Optional[Exception] = None
        for b in attempt_order:
            try:
                result = self._call_backend(b, text, source_lang)
                return result, b
            except Exception as exc:
                # NOTE(review): logging the full text may produce huge log
                # lines for large chunks.
                logger.warning(f"Backend '{b}' failed for [{text}]: {exc}")
                last_exc = exc

        # Degrade gracefully: hand back the untranslated original.
        logger.error(f"All translation backends failed for [{text}]. Returning original.")
        return text, "failed"

    def translate_batch(
        self,
        texts: List[str],
        source_lang: str = "auto",
    ) -> List[Tuple[str, str]]:
        """
        Translate a list of strings.
        Uses vectorized batching for the offline backend; serial calls for
        online backends (rate-limit friendly).
        """
        backend = self.cfg.get("backend", "google")
        # Offline requested but unavailable falls through to the serial
        # path, where translate() applies its own fallback chain.
        if backend == "offline" and OFFLINE_AVAILABLE:
            return self._translate_batch_offline(texts)

        results: List[Tuple[str, str]] = []
        iterable = (
            tqdm(texts, desc="Translating", unit="chunk")
            if TQDM_AVAILABLE else texts
        )
        for text in iterable:
            results.append(self.translate(text, source_lang))
            # Brief pause between online requests to stay under rate limits.
            if backend in ("google", "microsoft"):
                time.sleep(0.3)
        return results

    def _call_backend(
        self, backend: str, text: str, source_lang: str
    ) -> str:
        """
        Dispatch one request to `backend`, retrying with linear backoff
        (delay * attempt number) on any exception.

        Raises RuntimeError once all attempts are exhausted.
        NOTE(review): an unknown backend name raises ValueError, which is
        also retried `retries` times before surfacing -- confirm intent.
        """
        retries = int(self.cfg.get("retry_attempts", 3))
        delay = float(self.cfg.get("retry_delay_seconds", 1.5))
        last_exc2: Optional[Exception] = None
        for attempt in range(retries):
            try:
                if backend == "google":
                    return self._google(text, source_lang)
                elif backend == "microsoft":
                    return self._microsoft(text, source_lang)
                elif backend == "offline":
                    translated, _ = self._offline_single(text)
                    return translated
                else:
                    raise ValueError(f"Unknown backend: {backend}")
            except Exception as exc:
                last_exc2 = exc
                # No sleep after the final failed attempt.
                if attempt < retries - 1:
                    time.sleep(delay * (attempt + 1))
        raise RuntimeError(
            f"Backend '{backend}' failed after {retries} attempts: {last_exc2}"
        )

    def _google(self, text: str, source_lang: str) -> str:
        """Translate via the free Google endpoint (deep-translator)."""
        if not DEEP_TRANSLATOR_AVAILABLE:
            raise RuntimeError("deep-translator not installed.")

        lang_map = {"simplified": "zh-CN", "traditional": "zh-TW", "auto": "auto"}
        src = lang_map.get(source_lang, "auto")
        chunks = list(_split_text(text, self._GOOGLE_LIMIT))
        parts: List[str] = []

        for chunk in chunks:
            try:
                translated = GoogleTranslator(source=src, target="en").translate(chunk)
                # Treat None or an unchanged Chinese chunk as a failure so
                # the caller's retry/fallback machinery kicks in.
                if not translated or (translated.strip() == chunk.strip() and contains_chinese(chunk)):
                    raise RuntimeError("Google returned original or None")
                parts.append(translated)
            except Exception as e:
                raise RuntimeError(f"Google translate error: {e}")

        return " ".join(parts)

    def _microsoft(self, text: str, source_lang: str) -> str:
        """
        Translate via Microsoft/Azure (deep-translator).

        Requires 'microsoft_api_key' (and optionally 'microsoft_region')
        in the persisted config; raises ValueError when the key is missing.
        """
        if not DEEP_TRANSLATOR_AVAILABLE:
            raise RuntimeError(
                "deep-translator not installed. Run: pip install deep-translator"
            )
        api_key = str(self.cfg.get("microsoft_api_key", ""))
        region = str(self.cfg.get("microsoft_region", "eastus"))
        if not api_key:
            raise ValueError(
                "Microsoft API key not configured. "
                "Add 'microsoft_api_key' to ~/.chinese_file_translator/config.json"
            )
        lang_map = {"simplified": "zh-Hans", "traditional": "zh-Hant", "auto": "auto"}
        src = lang_map.get(source_lang, "auto")
        chunks = list(_split_text(text, self._MS_LIMIT))
        parts = []
        for chunk in chunks:
            tr = MicrosoftTranslator(
                api_key=api_key, region=region, source=src, target="en"
            ).translate(chunk)
            # Keep the original chunk if the service returns nothing.
            parts.append(tr or chunk)
        return " ".join(parts)

    def _load_offline(self) -> None:
        """
        Load the MarianMT model/tokenizer into memory (downloads on first
        use into the configured cache directory). Caller must hold _lock.
        """
        if not OFFLINE_AVAILABLE:
            raise RuntimeError("Offline model dependencies not installed.")
        model_dir = str(self.cfg.get("offline_model_dir", str(_HOME / "models")))
        Path(model_dir).mkdir(parents=True, exist_ok=True)

        self._offline_tok = MarianTokenizer.from_pretrained(
            OFFLINE_MODEL, cache_dir=model_dir
        )
        model = MarianMTModel.from_pretrained(
            OFFLINE_MODEL, cache_dir=model_dir
        )
        # GPU is used only when both requested and actually available.
        use_gpu = bool(self.cfg.get("use_gpu", False))
        device = "cuda" if (use_gpu and torch.cuda.is_available()) else "cpu"
        self._offline_model = model.to(device)
        logger.info(f"Offline model loaded on '{device}'.")

    def _offline_single(self, text: str) -> Tuple[str, str]:
        """Translate one string locally; returns (translated, "offline")."""
        with self._lock:
            if self._offline_model is None:
                self._load_offline()
        chunks = list(_split_text(text, self._OFFLINE_LIMIT))
        results = self._vectorized_translate(chunks)
        return " ".join(results), "offline"

    def _translate_batch_offline(
        self, texts: List[str]
    ) -> List[Tuple[str, str]]:
        """Vectorized: flatten all chunks, translate in one pass, reassemble."""
        with self._lock:
            if self._offline_model is None:
                self._load_offline()

        all_chunks: List[str] = []
        # chunk_map records, per input text, how many chunks it produced
        # (0 marks a blank text that passes through untranslated).
        chunk_map: List[Tuple[int, int]] = []

        for t_idx, text in enumerate(texts):
            if not text or not text.strip():
                chunk_map.append((t_idx, 0))
                continue
            chunks = list(_split_text(text, self._OFFLINE_LIMIT))
            # NOTE(review): 'start' is never read afterwards -- dead local.
            start = len(all_chunks)
            all_chunks.extend(chunks)
            chunk_map.append((t_idx, len(chunks)))

        if not all_chunks:
            return [(t, "passthrough") for t in texts]

        translated_chunks = self._vectorized_translate(all_chunks)

        # Reassemble: consume translated chunks in order, n per input text.
        results: List[Tuple[str, str]] = []
        flat_idx = 0
        for t_idx, n in chunk_map:
            if n == 0:
                results.append((texts[t_idx], "passthrough"))
            else:
                assembled = " ".join(translated_chunks[flat_idx : flat_idx + n])
                results.append((assembled, "offline"))
                flat_idx += n
        return results

    def _vectorized_translate(self, chunks: List[str]) -> List[str]:
        """Run MarianMT on a list of strings in one batched forward pass."""
        if not chunks:
            return []

        tok = self._offline_tok
        model = self._offline_model
        if tok is None or model is None:
            raise RuntimeError("Offline model not loaded.")

        # Run on whichever device the model was moved to in _load_offline.
        device = next(model.parameters()).device
        batch_size = int(self.cfg.get("batch_size", 10))
        results: List[str] = []

        # Mini-batches bound peak memory; padding/truncation keeps each
        # batch rectangular within the model's 512-token window.
        for i in range(0, len(chunks), batch_size):
            mini = chunks[i : i + batch_size]
            enc = tok(
                mini,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            ).to(device)
            with torch.no_grad():
                out = model.generate(**enc)
            decoded = tok.batch_decode(out, skip_special_tokens=True)
            results.extend(decoded)

        return results
|
|
|
|
| |
| |
| |
| def _split_text(text: str, max_len: int) -> Generator[str, None, None]: |
| """Split text at sentence boundaries for chunking.""" |
| if len(text) <= max_len: |
| yield text |
| return |
|
|
| sentence_ends = re.compile(r"[。!?\n!?\.]") |
| current: List[str] = [] |
| current_len = 0 |
|
|
| for segment in sentence_ends.split(text): |
| seg = segment.strip() |
| if not seg: |
| continue |
| if current_len + len(seg) + 1 > max_len and current: |
| yield " ".join(current) |
| current = [seg] |
| current_len = len(seg) |
| else: |
| current.append(seg) |
| current_len += len(seg) + 1 |
|
|
| if current: |
| yield " ".join(current) |
|
|
|
|
| def _dedupe_list(lst: List[str]) -> List[str]: |
| seen: set = set() |
| out: List[str] = [] |
| for item in lst: |
| if item not in seen: |
| seen.add(item) |
| out.append(item) |
| return out |
|
|
|
|
| |
| |
| |
class MarkdownProcessor:
    """Protect Markdown structure from the translator via placeholder tokens."""

    # Placeholder format; ASCII-only so translators tend to pass it through.
    _TOKEN = "___MY_PROTECT_PH_{idx}___"

    def __init__(self) -> None:
        self._protected: Dict[int, str] = {}  # placeholder index -> original span
        self._ph_counter = 0

    def _next_placeholder(self, original: str) -> str:
        """Store `original` and return a fresh unique token for it."""
        idx = self._ph_counter
        token = self._TOKEN.format(idx=idx)
        self._protected[idx] = original
        self._ph_counter += 1
        return token

    def protect(self, text: str) -> str:
        """Replace code/links/tags with unique tokens. Resets prior state."""
        self._protected.clear()
        self._ph_counter = 0

        # Front matter first so its content is never mistaken for Markdown.
        text = _MD_FRONTMATTER.sub(lambda m: self._next_placeholder(m.group(0)), text)

        def _fence_sub(m: re.Match) -> str:
            full = m.group(0)
            if contains_chinese(full):
                # Chinese inside a fence: protect only the fence markers so
                # the body is still eligible for translation.
                lines = full.splitlines()
                if len(lines) >= 2:
                    p1 = self._next_placeholder(lines[0])
                    p2 = self._next_placeholder(lines[-1])
                    content = "\n".join(lines[1:-1])
                    return f"{p1}\n{content}\n{p2}"
            return self._next_placeholder(full)

        text = _MD_CODE_FENCE.sub(_fence_sub, text)

        # Raw HTML tags, link/image URLs (link text stays), inline code.
        text = _MD_HTML_TAG.sub(lambda m: self._next_placeholder(m.group(0)), text)
        text = _MD_LINK.sub(lambda m: f"{m.group(1)}({self._next_placeholder(m.group(2))})", text)
        text = _MD_INLINE_CODE.sub(lambda m: self._next_placeholder(m.group(0)), text)

        return text

    def restore(self, text: str) -> str:
        """
        Replace every placeholder token in `text` with its original span.

        Translators sometimes mangle tokens (case changes, underscore runs
        collapsed or expanded), so each underscore run in the token matches
        one-or-more underscores and matching is case-insensitive. (The old
        fuzzy pattern relied on re.escape escaping '_', which it no longer
        does since Python 3.7, making it an exact match only.) Tokens are
        restored highest index first so e.g. token 1 can never partially
        match inside token 10.
        """
        for idx in sorted(self._protected.keys(), reverse=True):
            token = self._TOKEN.format(idx=idx)
            original = self._protected[idx]
            # re.escape leaves '_' and alphanumerics untouched, so the run
            # substitution below operates on plain underscores.
            fuzzy = re.sub(r"_+", "_+", re.escape(token))
            pattern = re.compile(fuzzy, re.IGNORECASE)
            # Callable replacement inserts `original` literally, avoiding
            # backslash / group-reference pitfalls of string templates.
            text = pattern.sub(lambda m, s=original: s, text)
        return text
|
|
|
|
class FileTranslator:
    """
    Orchestrate translation of a single file or a directory.

    Strategy ("never miss"): every run of CJK characters is located with a
    broad regex, translated (batched first, with a per-block retry), and
    substituted back while Markdown structure is kept intact through
    MarkdownProcessor placeholders.
    """

    # CJK runs inside Markdown. Includes the middle dot and the general
    # punctuation dash/ellipsis range so multi-part names and punctuated
    # phrases stay one translation unit. Compiled once at class creation
    # instead of on every call.
    _MD_CJK_RE = re.compile(
        r"["
        r"\u4e00-\u9fff"
        r"\u3400-\u4dbf"
        r"\U00020000-\U0002ceaf"
        r"\uf900-\ufaff"
        r"\u3000-\u303f"
        r"\uff00-\uffef"
        r"\u00b7"
        r"\u2014-\u2027"
        r"]+"
    )
    # Narrower run matcher used for plain-text granular replacement.
    _TXT_CJK_RE = re.compile(
        r"[\u4e00-\u9fff\u3400-\u4dbf\U00020000-\U0002ceaf\u3000-\u303f\uff00-\uffef]+"
    )

    def __init__(self, config: Config) -> None:
        self.cfg = config
        self.engine = TranslationEngine(config)
        self._md_proc = MarkdownProcessor()

    def translate_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        extract_only: bool = False,
        to_stdout: bool = False,
    ) -> Path:
        """
        Translate one .txt or .md file.

        Returns the written output path, or `input_path` unchanged when
        `to_stdout` is set (nothing is written in that case).

        Raises:
            FileNotFoundError: input file does not exist.
            ValueError: extension is neither .txt nor .md.
        """
        input_path = Path(input_path).resolve()
        if not input_path.exists():
            raise FileNotFoundError(f"Missing: {input_path}")

        suffix = input_path.suffix.lower()
        if suffix not in (".txt", ".md"):
            raise ValueError("Unsupported type")

        raw = input_path.read_text(encoding="utf-8", errors="replace")
        if extract_only:
            extracted = "\n".join(
                line for line in raw.splitlines() if contains_chinese(line)
            )
            if to_stdout:
                print(extracted)
                return input_path
            out = output_path or _default_output(input_path, self.cfg)
            out.write_text(extracted, encoding="utf-8")
            return out

        res = self._translate_md(raw) if suffix == ".md" else self._translate_txt(raw)
        if to_stdout:
            print(res)
            return input_path
        out = output_path or _default_output(input_path, self.cfg)
        out.write_text(res, encoding="utf-8")
        return out

    def _translate_txt(self, text: str) -> str:
        """Translate a plain-text document line by line, preserving layout."""
        lines = text.splitlines(keepends=True)
        bilingual = bool(self.cfg.get("bilingual", False))

        out_lines: List[str] = []
        for line in lines:
            stripped = line.rstrip("\n\r")
            if contains_chinese(stripped):
                tr = self._translate_granular(stripped)
                eol = "\n" if line.endswith("\n") else ""
                # Bilingual mode keeps the Chinese source above the English.
                out_lines.append(f"{stripped}\n{tr}{eol}" if bilingual else f"{tr}{eol}")
            else:
                out_lines.append(line)
        return "".join(out_lines)

    def _translate_md(self, text: str) -> str:
        """
        Translate a Markdown document in one batched pass.

        All unique CJK blocks are collected from the placeholder-protected
        text, batch-translated, and substituted back longest-first so a
        short block never clobbers part of a longer one.
        """
        protected = self._md_proc.protect(text)

        def _has_real_cjk(s: str) -> bool:
            # Require at least one true ideograph (BMP Han, Extension A, or
            # any supplementary-plane char); punctuation-only runs are skipped.
            return any(
                "\u4e00" <= c <= "\u9fff"
                or "\u3400" <= c <= "\u4dbf"
                or ord(c) > 0xFFFF
                for c in s
            )

        candidates = self._MD_CJK_RE.findall(protected)
        all_blocks = _dedupe_list([b for b in candidates if _has_real_cjk(b)])

        if not all_blocks:
            return self._md_proc.restore(protected)

        logger.info(f"Found {len(all_blocks)} unique Chinese blocks. Batch translating...")
        translated = self.engine.translate_batch(all_blocks, source_lang="simplified")

        mapping: Dict[str, str] = {}
        for orig, (tr, _) in zip(all_blocks, translated):
            if tr.strip() and tr.strip() != orig.strip():
                mapping[orig] = tr
            else:
                # Batch result was empty or unchanged: retry this block
                # alone; on failure, deliberately keep the original text.
                try:
                    t, _ = self.engine.translate(orig, source_lang="simplified")
                    mapping[orig] = t
                except Exception:
                    mapping[orig] = orig

        # Longest blocks first so substrings never shadow their superstring.
        final_text = protected
        for orig in sorted(mapping.keys(), key=len, reverse=True):
            final_text = final_text.replace(orig, mapping[orig])

        return self._md_proc.restore(final_text)

    def _translate_granular(self, text: str) -> str:
        """Translate each CJK run inside `text` in place (.txt fallback path)."""
        def _sub(m: re.Match) -> str:
            chunk = m.group(0)
            # Runs with no BMP ideograph (pure CJK punctuation) pass through.
            if not any("\u4e00" <= c <= "\u9fff" for c in chunk):
                return chunk
            try:
                t, _ = self.engine.translate(chunk, source_lang="simplified")
                return t
            except Exception:
                # Best effort: an untranslated run is better than a crash.
                return chunk
        return self._TXT_CJK_RE.sub(_sub, text)

    @staticmethod
    def _extract_chinese_lines(text: str) -> List[str]:
        """Return only lines that contain Chinese text."""
        return [
            line for line in text.splitlines()
            if contains_chinese(line)
        ]

    def _detect_script_bulk(self, texts: List[str]) -> str:
        """Detect the dominant script, honoring an explicit --lang override."""
        lang_mode = str(self.cfg.get("lang", "auto"))
        if lang_mode in ("simplified", "traditional"):
            return lang_mode
        # Sample at most 50 strings; enough signal without scanning everything.
        combined = " ".join(texts[:50])
        return detect_script(combined)

    def translate_directory(
        self,
        input_dir: Path,
        output_dir: Optional[Path] = None,
    ) -> List[Path]:
        """
        Translate all .txt and .md files in `input_dir`.

        Per-file failures are logged and skipped so one bad file cannot
        abort the batch. Returns the list of written output paths.

        Raises:
            NotADirectoryError: `input_dir` is not a directory.
        """
        input_dir = Path(input_dir).resolve()
        if not input_dir.is_dir():
            raise NotADirectoryError(f"Not a directory: {input_dir}")

        files = sorted(
            list(input_dir.glob("*.txt")) + list(input_dir.glob("*.md"))
        )
        if not files:
            logger.warning(f"No .txt or .md files found in {input_dir}")
            return []

        logger.info(f"Batch translating {len(files)} file(s) from {input_dir}")
        out_paths: List[Path] = []

        iterable = (
            tqdm(files, desc="Files", unit="file")
            if TQDM_AVAILABLE else files
        )
        for fpath in iterable:
            try:
                if output_dir:
                    out_file = Path(output_dir) / fpath.name
                    Path(output_dir).mkdir(parents=True, exist_ok=True)
                else:
                    out_file = _default_output(fpath, self.cfg)
                result = self.translate_file(fpath, output_path=out_file)
                out_paths.append(result)
                logger.info(f" Done: {fpath.name} -> {result.name}")
            except Exception as exc:
                logger.error(f" Failed: {fpath.name}: {exc}")

        return out_paths
|
|
|
|
| |
| |
| |
class HistoryManager:
    """Persist a rolling log of translation sessions as JSON."""

    def __init__(self, config: Config) -> None:
        self.cfg = config
        self._items: List[Dict[str, Any]] = []
        _HOME.mkdir(parents=True, exist_ok=True)
        self._load()

    def _load(self) -> None:
        """Load prior history; a missing or corrupt file resets to empty."""
        if not HISTORY_FILE.exists():
            return
        try:
            with open(HISTORY_FILE, "r", encoding="utf-8") as fh:
                self._items = json.load(fh)
        except Exception:
            self._items = []

    def save(self) -> None:
        """Write the history to disk (best effort; errors are logged)."""
        try:
            with open(HISTORY_FILE, "w", encoding="utf-8") as fh:
                json.dump(self._items, fh, ensure_ascii=False, indent=2)
        except Exception as exc:
            logger.error(f"History save error: {exc}")

    def add(
        self,
        input_file: str,
        output_file: str,
        backend: str,
        script: str,
        segments_count: int,
        elapsed_seconds: float,
    ) -> None:
        """Prepend one session record, trim to max_history, and persist."""
        record: Dict[str, Any] = {
            "timestamp": datetime.now().isoformat(),
            "input_file": input_file,
            "output_file": output_file,
            "backend": backend,
            "script": script,
            "segments_count": segments_count,
            "elapsed_seconds": round(elapsed_seconds, 2),
        }
        self._items.insert(0, record)
        # Newest entries sit at the head; drop everything past the cap.
        limit = int(self.cfg.get("max_history", 1000))
        del self._items[limit:]
        self.save()

    def export(self, path: str) -> None:
        """Dump the full history to an arbitrary JSON file."""
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(self._items, fh, ensure_ascii=False, indent=2)
        logger.info(f"History exported to {path}")

    def get_all(self) -> List[Dict[str, Any]]:
        """Return a defensive copy of all history entries."""
        return list(self._items)
|
|
|
|
| |
| |
| |
def _default_output(input_path: Path, config: Config) -> Path:
    """Derive default output path: input_translated.ext"""
    stem_suffix = str(config.get("output_suffix", "_translated"))
    new_name = f"{input_path.stem}{stem_suffix}{input_path.suffix}"
    return input_path.with_name(new_name)
|
|
|
|
| |
| |
| |
def _build_parser() -> argparse.ArgumentParser:
    """
    Build the CLI argument parser.

    Options default to None/False here so Config.apply_args can distinguish
    "flag not given" (keep the stored config value) from an explicit
    override. In particular --lang must NOT default to "auto": a hard
    default would silently overwrite the persisted lang setting on every
    run (Config.DEFAULTS already supplies "auto").
    """
    parser = argparse.ArgumentParser(
        prog="chinese_file_translator",
        description=(
            f"{APP_NAME} v{APP_VERSION} by {APP_AUTHOR}\n"
            "Translate Chinese text inside .txt or .md files to English."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""
            Examples:
              python chinese_file_translator.py input.txt
              python chinese_file_translator.py input.md -o translated.md
              python chinese_file_translator.py input.txt --backend offline --gpu
              python chinese_file_translator.py input.txt --bilingual
              python chinese_file_translator.py input.txt --extract-only
              python chinese_file_translator.py --batch ./docs/ --batch-out ./out/
              python chinese_file_translator.py input.txt --stdout
        """),
    )
    parser.add_argument(
        "input",
        nargs="?",
        help="Input .txt or .md file path",
    )
    parser.add_argument(
        "-o", "--output",
        dest="output",
        metavar="FILE",
        help="Output file path (default: <input>_translated.<ext>)",
    )
    parser.add_argument(
        "--batch",
        metavar="DIR",
        help="Translate all .txt and .md files in a directory",
    )
    parser.add_argument(
        "--batch-out",
        dest="batch_out",
        metavar="DIR",
        help="Output directory for batch translation",
    )
    parser.add_argument(
        "--backend",
        choices=["google", "microsoft", "offline"],
        help="Translation backend (default: google)",
    )
    parser.add_argument(
        "--offline",
        action="store_true",
        help="Shorthand for --backend offline",
    )
    parser.add_argument(
        "--lang",
        choices=["auto", "simplified", "traditional"],
        help="Chinese script mode (default: auto)",
    )
    parser.add_argument(
        "--gpu",
        action="store_true",
        help="Use GPU (CUDA) for offline translation",
    )
    parser.add_argument(
        "--confidence",
        type=float,
        metavar="0.0-1.0",
        help="Chinese detection confidence threshold (default: 0.3)",
    )
    parser.add_argument(
        "--chunk-size",
        dest="chunk_size",
        type=int,
        metavar="N",
        help="Max characters per translation request (default: 4000)",
    )
    parser.add_argument(
        "--bilingual",
        action="store_true",
        help="Keep original Chinese alongside English translation",
    )
    parser.add_argument(
        "--extract-only",
        dest="extract_only",
        action="store_true",
        help="Only extract and save detected Chinese lines, no translation",
    )
    parser.add_argument(
        "--stdout",
        action="store_true",
        help="Print translated output to stdout instead of writing a file",
    )
    parser.add_argument(
        "--export-history",
        dest="export_history",
        metavar="FILE",
        help="Export translation history to a JSON file",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"{APP_NAME} {APP_VERSION}",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable DEBUG-level logging",
    )
    return parser
|
|
|
|
| |
| |
| |
def check_dependencies(args: argparse.Namespace) -> None:
    """Print a banner listing any missing optional dependencies."""
    missing: List[str] = []
    wants_offline = (
        getattr(args, "offline", False)
        or getattr(args, "backend", "") == "offline"
    )
    if not DEEP_TRANSLATOR_AVAILABLE:
        missing.append(
            "deep-translator -> pip install deep-translator"
        )
    if wants_offline and not OFFLINE_AVAILABLE:
        missing.append(
            "transformers / torch -> pip install transformers torch\n"
            " (CPU) pip install torch --index-url https://download.pytorch.org/whl/cpu\n"
            " (CUDA) pip install torch --index-url https://download.pytorch.org/whl/cu121"
        )
    if not missing:
        return
    banner = "=" * 55
    print("\n" + banner)
    print(f"[{APP_NAME}] Missing dependencies:")
    for item in missing:
        print(f" {item}")
    print(banner + "\n")
|
|
|
|
| |
| |
| |
def main() -> None:
    """CLI entry point: parse args, configure, then run batch or single-file mode."""
    parser = _build_parser()
    args = parser.parse_args()

    setup_logging(verbose=getattr(args, "verbose", False))
    check_dependencies(args)

    # Stored config first, then CLI overrides on top (in memory only).
    cfg = Config()
    cfg.apply_args(args)

    history = HistoryManager(cfg)
    translator = FileTranslator(cfg)

    # --export-history may be combined with a translation run; when it is
    # the only request (no input and no batch dir), exit after exporting.
    if getattr(args, "export_history", None):
        history.export(args.export_history)
        if not args.input and not args.batch:
            return

    # Directory (batch) mode takes precedence over single-file mode.
    if getattr(args, "batch", None):
        batch_dir = Path(args.batch)
        out_dir = Path(args.batch_out) if getattr(args, "batch_out", None) else None
        t0 = time.time()
        out_paths = translator.translate_directory(batch_dir, output_dir=out_dir)
        elapsed = time.time() - t0
        print(
            f"\nBatch complete: {len(out_paths)} file(s) translated "
            f"in {elapsed:.1f}s"
        )
        for p in out_paths:
            print(f" -> {p}")
        # One history entry per batch; segments_count carries the file count.
        history.add(
            input_file=str(batch_dir),
            output_file=str(out_dir or batch_dir),
            backend=str(cfg.get("backend")),
            script=str(cfg.get("lang")),
            segments_count=len(out_paths),
            elapsed_seconds=elapsed,
        )
        return

    # Single-file mode requires an input path; otherwise show help and exit.
    if not args.input:
        parser.print_help()
        sys.exit(0)

    input_path = Path(args.input)
    output_path = Path(args.output) if getattr(args, "output", None) else None

    t0 = time.time()
    try:
        out = translator.translate_file(
            input_path = input_path,
            output_path = output_path,
            extract_only = getattr(args, "extract_only", False),
            to_stdout = getattr(args, "stdout", False),
        )
    except (FileNotFoundError, ValueError, RuntimeError) as exc:
        # Expected operational errors exit with status 1 after logging.
        logger.error(str(exc))
        sys.exit(1)

    elapsed = time.time() - t0

    # Summary banner is suppressed in --stdout mode to keep output clean.
    if not getattr(args, "stdout", False):
        print(f"\n{APP_NAME} v{APP_VERSION}")
        print(f"Input : {input_path}")
        print(f"Output : {out}")
        print(f"Backend : {cfg.get('backend')}")
        print(f"Script : {cfg.get('lang')}")
        print(f"Elapsed : {elapsed:.2f}s")
        print(f"Config : {CONFIG_FILE}")
        print(f"Log : {LOG_FILE}")

    history.add(
        input_file = str(input_path),
        output_file = str(out),
        backend = str(cfg.get("backend")),
        script = str(cfg.get("lang")),
        segments_count = 0,
        elapsed_seconds = elapsed,
    )
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|