| """ |
| translator_engine.py |
| ==================== |
| Core Translation Engine for Massive Text Files (English -> Hindi) |
| |
| ARCHITECTURE DECISIONS: |
| ----------------------- |
| 1. STREAMING READ/WRITE: We never hold the full translated file in RAM. |
| - Read source file paragraph-by-paragraph |
| - Translate each paragraph |
| - Immediately append translated text to output file on disk |
| - If process crashes at paragraph 25,000 β the first 24,999 are SAFE on disk |
| |
| 2. CHUNKING STRATEGY: |
| - Split file by double-newline (\n\n) to get paragraphs |
| - Each paragraph is one translation unit |
| - Single newlines (\n) WITHIN a paragraph are preserved by translating |
| line-by-line inside each paragraph |
| - This guarantees ZERO text loss and exact structural preservation |
| |
| 3. RATE LIMITING & IP BAN PREVENTION: |
| - Each translation call has a mandatory sleep (configurable, default 0.3s) |
| - ThreadPoolExecutor with LIMITED workers (default 2) β NOT aggressive |
| - On HTTP 429 (Too Many Requests): exponential backoff via tenacity |
| - Base wait: 10 seconds, multiplier: 2x, max wait: 320 seconds |
| - Max 10 retries per chunk before marking as FAILED |
| - On connection drop: same retry logic catches ConnectionError, Timeout |
| |
| 4. CONCURRENCY MODEL: |
| - We use ThreadPoolExecutor but with STRICT controls: |
| a) Max 2 workers (Google free tier can't handle more) |
| b) Each worker has mandatory inter-request delay |
| c) A global rate limiter (threading.Semaphore) prevents burst |
| - This is NOT about max speed β it's about RELIABLE completion |
| of 50,000+ paragraphs without getting IP-banned |
| |
| 5. ERROR HANDLING SCENARIOS (explicitly documented): |
| - HTTP 429 Too Many Requests β exponential backoff, retry up to 10x |
| - ConnectionError / Timeout β same retry logic, waits and retries |
| - InvalidURL / encoding error β log error, write original text as fallback |
| - Process crash β all previously written paragraphs are safe on disk |
| - Google temporary block β backoff will wait up to ~320s per retry |
| """ |
|
|
| import os |
| import re |
| import time |
| import threading |
| import logging |
| from typing import Optional, Callable |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass, field |
|
|
| from deep_translator import GoogleTranslator |
| from tenacity import ( |
| retry, |
| stop_after_attempt, |
| wait_exponential, |
| retry_if_exception_type, |
| before_sleep_log, |
| RetryError, |
| ) |
|
|
| |
| |
| |
# Module-wide logging: the thread name is included in the format so output
# interleaved by the worker threads can be attributed to a specific worker.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(levelname)s: %(message)s",
)
# NOTE: tenacity's before_sleep_log hook logs retries through this same logger,
# so the name must stay stable.
logger = logging.getLogger("translator_engine")
|
|
|
|
| |
| |
| |
@dataclass
class TranslatorConfig:
    """
    All configuration for the translation engine.
    Tuned for Google Free Translator on a 16GB RAM server.

    IMPORTANT FOR HUGGINGFACE SPACES:
    - HF Spaces have shared IPs, so we must be EXTRA conservative
    - Lower workers, higher delays to avoid bans
    """

    # Language pair (codes as understood by deep_translator's GoogleTranslator).
    source_lang: str = "en"
    target_lang: str = "hi"

    # Max characters per single translation request; oversized paragraphs are
    # split down to this size by TextSplitter (presumably to stay under the
    # free endpoint's ~5000-char limit -- TODO confirm).
    max_chunk_chars: int = 4500

    # Minimum delay (seconds) between two requests issued by the SAME thread
    # (enforced in ChunkTranslator._enforce_rate_limit).
    min_request_delay: float = 0.5

    # Size of the ThreadPoolExecutor used for concurrent paragraph translation.
    max_workers: int = 2

    # Global cap on simultaneous in-flight requests (threading.Semaphore size).
    max_concurrent_requests: int = 2

    # Retry/backoff policy for failed requests (429s, connection drops).
    # NOTE(review): ChunkTranslator's @retry decorator hard-codes these same
    # values as literals; changing them here alone has no effect -- keep the
    # two in sync.
    max_retries: int = 10
    backoff_base_wait: int = 10
    backoff_multiplier: int = 2
    backoff_max_wait: int = 320

    # NOTE(review): not referenced anywhere in this module -- every batch is
    # flushed and fsynced unconditionally in _process_batch. Verify before
    # relying on it.
    disk_flush_interval: int = 5
|
|
|
|
| |
| |
| |
@dataclass
class TranslationProgress:
    """
    Thread-safe progress tracker.
    The frontend polls to_dict() for a real-time snapshot.
    """
    total_paragraphs: int = 0
    translated_paragraphs: int = 0
    failed_paragraphs: int = 0
    status: str = "idle"
    error_message: str = ""
    current_phase: str = ""
    start_time: float = 0.0
    file_name: str = ""
    output_file: str = ""

    # Guards every read-modify-write on the counters above; hidden from repr.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def increment_translated(self) -> None:
        """Atomically bump the translated-paragraph counter."""
        with self._lock:
            self.translated_paragraphs = self.translated_paragraphs + 1

    def increment_failed(self) -> None:
        """Atomically bump the failed-paragraph counter."""
        with self._lock:
            self.failed_paragraphs = self.failed_paragraphs + 1

    def set_status(self, status: str, phase: str = "") -> None:
        """Update the status; also update the phase when a non-empty one is given."""
        with self._lock:
            self.status = status
            if phase:
                self.current_phase = phase

    def to_dict(self) -> dict:
        """Return a JSON-ready snapshot, including derived speed/ETA figures."""
        with self._lock:
            elapsed = 0
            speed = 0
            eta_seconds = 0

            # Derived metrics are only meaningful once the clock has started
            # and at least one paragraph has finished.
            started = self.start_time > 0 and self.translated_paragraphs > 0
            if started:
                elapsed = time.time() - self.start_time
                speed = self.translated_paragraphs / elapsed
                left = self.total_paragraphs - self.translated_paragraphs
                eta_seconds = left / speed if speed > 0 else 0

            if self.total_paragraphs > 0:
                percent = self.translated_paragraphs / self.total_paragraphs * 100
            else:
                percent = 0

            snapshot = {
                "total_paragraphs": self.total_paragraphs,
                "translated_paragraphs": self.translated_paragraphs,
                "failed_paragraphs": self.failed_paragraphs,
                "status": self.status,
                "error_message": self.error_message,
                "current_phase": self.current_phase,
                "file_name": self.file_name,
                "output_file": self.output_file,
                "elapsed_seconds": round(elapsed, 1),
                "speed_per_second": round(speed, 2),
                "eta_seconds": round(eta_seconds, 1),
                "percent_complete": round(percent, 1),
            }
            return snapshot
|
|
|
|
| |
| |
| |
class ChunkTranslator:
    """
    Translates a single text chunk (paragraph or sub-paragraph).

    ERROR HANDLING SCENARIOS:
    -------------------------
    SCENARIO 1 - HTTP 429 Too Many Requests (IP rate limit):
        tenacity waits 10s, then 20s, 40s, ... capped at 320s, and retries
        the SAME chunk up to 10 attempts. After the final failure,
        translate_chunk returns the original text (no data loss).

    SCENARIO 2 - Connection drop (ConnectionError, Timeout, DNS failure):
        Same retry/backoff path. If the failure persists, the original
        text is returned.

    SCENARIO 3 - Encoding / invalid input:
        Caught by the broad Exception handler in translate_chunk; the
        original text is returned, preserving content even if untranslated.

    SCENARIO 4 - Temporary IP block longer than a single rate-limit window:
        Max single backoff is 320 seconds (~5 minutes); across 10 attempts
        the cumulative wait is roughly 25-30 minutes, which is usually
        enough for the block to lift.
    """

    def __init__(self, config: "TranslatorConfig"):
        self.config = config
        # Global cap on simultaneous in-flight requests across ALL threads.
        self.request_semaphore = threading.Semaphore(config.max_concurrent_requests)
        # Timestamp of the previous request, tracked per thread.
        self._last_request_time = threading.local()

    def _enforce_rate_limit(self):
        """
        Ensure the minimum delay between requests on the SAME thread so no
        single worker hammers the endpoint.
        """
        now = time.time()
        previous = getattr(self._last_request_time, "value", 0)
        gap = now - previous
        if gap < self.config.min_request_delay:
            time.sleep(self.config.min_request_delay - gap)
        self._last_request_time.value = time.time()

    @retry(
        # Broad on purpose: deep_translator surfaces 429s, timeouts and
        # connection drops as assorted exception types.
        retry=retry_if_exception_type((
            Exception,
        )),
        # NOTE(review): these literals mirror TranslatorConfig's defaults
        # (backoff_multiplier / backoff_base_wait / backoff_max_wait /
        # max_retries). Decorator arguments are evaluated at class-definition
        # time and cannot read the instance config -- keep the two in sync.
        wait=wait_exponential(
            multiplier=2,
            min=10,
            max=320,
        ),
        stop=stop_after_attempt(10),
        before_sleep=before_sleep_log(logger, logging.WARNING),
        # BUGFIX: `reraise=True` was removed. With reraise=True tenacity
        # re-raises the LAST underlying exception instead of RetryError, so
        # the `except RetryError` branch in translate_chunk was dead code and
        # exhausted retries were mislogged as "Unexpected translation error".
        # With the default (reraise=False), RetryError is raised and the
        # documented fallback path runs.
    )
    def _translate_with_retry(self, text: str) -> str:
        """
        Innermost call to Google Translate, wrapped by the tenacity retry
        decorator. Raises tenacity.RetryError once all attempts are exhausted.
        """
        with self.request_semaphore:
            self._enforce_rate_limit()

            translator = GoogleTranslator(
                source=self.config.source_lang,
                target=self.config.target_lang,
            )
            result = translator.translate(text=text)

            # deep_translator can return None for some inputs; fall back to
            # the original text rather than emitting the string "None".
            if result is None:
                return text
            return result

    def translate_chunk(self, text: str) -> str:
        """
        Public API: translate one text chunk with full error handling.

        Returns translated text on success.
        Returns the ORIGINAL text on permanent failure (zero-text-loss
        guarantee).
        """
        # Empty or whitespace-only chunks pass through untouched.
        if not text or not text.strip():
            return text

        try:
            return self._translate_with_retry(text)
        except RetryError as e:
            # All retry attempts exhausted -- log and fall back to the input.
            logger.error(
                f"PERMANENT FAILURE after {self.config.max_retries} retries. "
                f"Chunk (first 100 chars): '{text[:100]}...'. "
                f"Error: {e.last_attempt.exception()}"
            )
            return text
        except Exception as e:
            # Non-retryable / unexpected failure (e.g. a library bug).
            logger.error(f"Unexpected translation error: {e}. Preserving original text.")
            return text
|
|
|
|
| |
| |
| |
class TextSplitter:
    """
    Splits massive text files into translation-ready chunks while
    preserving line-break structure.

    STRATEGY:
    1. Stream the file and split on blank lines -> paragraphs
    2. A paragraph within the char limit -> one translation unit
    3. An oversized paragraph -> split on single newlines -> line groups
    4. A single oversized line -> split on sentence boundaries
    5. The caller rejoins translated pieces with the same separators

    NOTE on blank lines: the orchestrator writes "\\n\\n" between yielded
    paragraphs, so a SINGLE blank-line separator is reproduced exactly.
    Runs of two or more blank lines are approximated: each extra blank line
    is yielded as an empty paragraph (which the writer renders as one more
    blank line).
    """

    def __init__(self, config: "TranslatorConfig"):
        # Only config.max_chunk_chars is read by this class.
        self.config = config

    def split_into_paragraphs(self, file_path: str):
        """
        Generator: yields paragraphs WITHOUT loading the entire file into RAM.

        Streams line by line, accumulating until a blank line (paragraph
        boundary) is hit. Memory usage: one paragraph at a time.

        BUGFIX: previously every blank line was also yielded as its own
        empty paragraph. Because the writer inserts "\\n\\n" between yielded
        items, that DOUBLED every blank line in the output. The first blank
        line after a paragraph is the separator the writer re-creates, so it
        must not be yielded; only extra (or leading) blank lines become
        empty paragraphs.
        """
        pending_lines: list[str] = []

        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for raw_line in f:
                line = raw_line.rstrip("\n")

                if line == "":
                    if pending_lines:
                        # Paragraph boundary: flush. The blank line itself is
                        # implied by the writer's "\n\n" separator.
                        yield "\n".join(pending_lines)
                        pending_lines = []
                    else:
                        # Extra (or leading) blank line: keep as an empty
                        # paragraph so vertical whitespace survives.
                        yield ""
                else:
                    pending_lines.append(line)

        # Final paragraph (file did not end with a blank line).
        if pending_lines:
            yield "\n".join(pending_lines)

    def split_paragraph_into_chunks(self, paragraph: str) -> list[str]:
        """
        If a paragraph exceeds max_chunk_chars, split it into smaller chunks
        along line boundaries (falling back to sentence boundaries for a
        single oversized line).

        Returns a list of chunk strings; no text is lost.
        """
        limit = self.config.max_chunk_chars
        if len(paragraph) <= limit:
            return [paragraph]

        chunks: list[str] = []
        group: list[str] = []
        group_len = 0

        for line in paragraph.split("\n"):
            line_len = len(line) + 1  # +1 for the newline used when rejoining

            # Current group would overflow: flush it first.
            if group_len + line_len > limit:
                if group:
                    chunks.append("\n".join(group))
                    group = []
                    group_len = 0

            # A single line that is itself over the limit needs the
            # sentence-level splitter.
            if line_len > limit:
                chunks.extend(self._split_long_line(line))
                continue

            group.append(line)
            group_len += line_len

        if group:
            chunks.append("\n".join(group))

        return chunks

    def _split_long_line(self, line: str) -> list[str]:
        """
        Last resort: split one very long line on sentence boundaries, then
        hard-slice any piece that still exceeds the limit (e.g. a run with
        no punctuation at all). All text is preserved; whitespace between
        regrouped sentences is normalized to a single space.
        """
        limit = self.config.max_chunk_chars
        sentences = re.split(r"(?<=[.!?])\s+", line)

        grouped: list[str] = []
        buf = ""
        for sentence in sentences:
            if len(buf) + len(sentence) + 1 > limit:
                if buf:
                    grouped.append(buf)
                buf = sentence
            else:
                buf = f"{buf} {sentence}" if buf else sentence
        if buf:
            grouped.append(buf)

        # Hard fallback: fixed-width slices for anything still too long.
        final_chunks: list[str] = []
        for piece in grouped:
            if len(piece) > limit:
                final_chunks.extend(
                    piece[i : i + limit] for i in range(0, len(piece), limit)
                )
            else:
                final_chunks.append(piece)

        return final_chunks

    def count_paragraphs(self, file_path: str) -> int:
        """Streaming pre-scan: count paragraphs for progress tracking."""
        return sum(1 for _ in self.split_into_paragraphs(file_path))
|
|
|
|
| |
| |
| |
class MassiveFileTranslator:
    """
    The main orchestrator that:
      1. Reads the input file (streaming)
      2. Splits it into paragraphs
      3. Translates each paragraph (with concurrency + rate limiting)
      4. Writes translated text to the output file (streaming/append)
      5. Tracks progress in real time

    STREAMING WRITE GUARANTEE:
    --------------------------
    Every translated batch is immediately flushed AND fsynced to the output
    file. If the process crashes mid-run, everything written so far survives
    on disk -- nothing is held only in RAM.
    """

    def __init__(
        self,
        config: Optional["TranslatorConfig"] = None,
        progress: Optional["TranslationProgress"] = None,
    ):
        self.config = config or TranslatorConfig()
        self.progress = progress or TranslationProgress()
        self.chunk_translator = ChunkTranslator(self.config)
        self.text_splitter = TextSplitter(self.config)
        # Serializes appends to the output file (each batch written whole).
        self._write_lock = threading.Lock()
        # Cooperative cancellation flag, checked between paragraphs/batches.
        self._cancel_flag = threading.Event()

    def cancel(self):
        """Signal the translation to stop gracefully."""
        self._cancel_flag.set()
        self.progress.set_status("failed", "Cancelled by user")

    def translate_file(self, input_path: str, output_path: str) -> str:
        """
        Main entry point: translate an entire file.

        Designed to run in a background thread. Streams through the input,
        translates, and streams to the output.

        Returns the output file path; re-raises any fatal error after
        recording it in the progress tracker.
        """
        try:
            self.progress.set_status("preparing", "Counting paragraphs...")
            self.progress.file_name = os.path.basename(input_path)
            self.progress.output_file = os.path.basename(output_path)

            # Pre-scan so percent/ETA can be computed during translation.
            logger.info(f"Counting paragraphs in: {input_path}")
            total = self.text_splitter.count_paragraphs(input_path)
            self.progress.total_paragraphs = total
            logger.info(f"Total paragraphs to translate: {total}")

            if total == 0:
                self.progress.set_status("completed", "File is empty")
                # Still produce an (empty) output file for the caller.
                open(output_path, "w", encoding="utf-8").close()
                return output_path

            # Truncate/create the output file before appending batches.
            with open(output_path, "w", encoding="utf-8") as f:
                f.write("")

            self.progress.set_status("translating", "Translation in progress...")
            self.progress.start_time = time.time()

            self._translate_sequential_with_threads(input_path, output_path)

            if not self._cancel_flag.is_set():
                self.progress.set_status("completed", "Translation finished!")
                logger.info(
                    f"Translation complete. Output: {output_path}. "
                    f"Translated: {self.progress.translated_paragraphs}, "
                    f"Failed: {self.progress.failed_paragraphs}"
                )

            return output_path

        except Exception as e:
            logger.exception(f"Fatal error during translation: {e}")
            self.progress.set_status("failed", f"Fatal error: {str(e)}")
            self.progress.error_message = str(e)
            raise

    def _translate_sequential_with_threads(
        self, input_path: str, output_path: str
    ):
        """
        ORDERED translation with a thread pool.

        WHY ORDER MATTERS: output paragraphs must land in the exact order of
        the input file; random ordering would scramble the book.

        STRATEGY: process in BATCHES (batch size = max_workers * 3 for
        pipeline efficiency). Within a batch, paragraphs are translated
        concurrently; once the whole batch is done, its results are written
        to disk IN ORDER, then the next batch starts. This gives
        concurrency, strict ordering, and streaming per-batch writes.
        """
        batch_size = self.config.max_workers * 3
        paragraph_generator = self.text_splitter.split_into_paragraphs(input_path)

        with ThreadPoolExecutor(
            max_workers=self.config.max_workers,
            thread_name_prefix="translator",
        ) as executor:

            batch = []
            batch_indices = []
            paragraph_index = 0

            for paragraph in paragraph_generator:
                if self._cancel_flag.is_set():
                    logger.warning("Translation cancelled.")
                    return

                batch.append(paragraph)
                batch_indices.append(paragraph_index)
                paragraph_index += 1

                if len(batch) >= batch_size:
                    self._process_batch(
                        executor, batch, batch_indices, output_path
                    )
                    batch = []
                    batch_indices = []

            # Flush the final partial batch.
            if batch:
                self._process_batch(
                    executor, batch, batch_indices, output_path
                )

    def _process_batch(
        self,
        executor: ThreadPoolExecutor,
        batch: list[str],
        indices: list[int],
        output_path: str,
    ):
        """
        Process one batch of paragraphs:
          1. Submit non-blank paragraphs to the thread pool
          2. Wait for ALL futures to complete
          3. Write results to disk IN ORDER, then flush + fsync

        `indices` (global paragraph numbers) is currently informational;
        results are written in batch order, which equals input order.
        """
        results: list = [None] * len(batch)
        future_to_pos: dict = {}

        for pos, paragraph in enumerate(batch):
            if self._cancel_flag.is_set():
                return

            if not paragraph.strip():
                # Blank/whitespace-only paragraphs pass through untranslated.
                results[pos] = paragraph
                # BUGFIX: count passthrough paragraphs too -- they are part of
                # total_paragraphs, so skipping them made percent_complete
                # unable to reach 100% on files with blank paragraphs.
                self.progress.increment_translated()
            else:
                future = executor.submit(self._translate_single_paragraph, paragraph)
                future_to_pos[future] = pos

        # Collect results as they finish; order is restored via future_to_pos.
        for future in as_completed(future_to_pos):
            pos = future_to_pos[future]
            try:
                results[pos] = future.result()
            except Exception as e:
                # _translate_single_paragraph already falls back internally;
                # this is a last-resort guard so one bad paragraph cannot
                # abort the whole batch.
                logger.error(f"Unexpected future error: {e}")
                results[pos] = batch[pos]
                self.progress.increment_failed()

        # Ordered, durable write: separator before every paragraph except the
        # very first one of the whole file (output file is empty only then).
        with self._write_lock:
            with open(output_path, "a", encoding="utf-8") as f:
                for pos, translated in enumerate(results):
                    if pos > 0 or os.path.getsize(output_path) > 0:
                        f.write("\n\n")
                    f.write(translated)
                # Crash safety: force the batch onto disk.
                f.flush()
                os.fsync(f.fileno())

    def _translate_single_paragraph(self, paragraph: str) -> str:
        """
        Translate one paragraph, splitting it first if it is too long for a
        single API call. Internal \\n line breaks are preserved because each
        line is translated separately.
        """
        chunks = self.text_splitter.split_paragraph_into_chunks(paragraph)
        translated_chunks = [self._translate_lines(chunk) for chunk in chunks]
        self.progress.increment_translated()
        return "\n".join(translated_chunks)

    def _translate_lines(self, chunk: str) -> str:
        """Translate a chunk line by line; blank lines pass through as-is."""
        translated_lines = []
        for line in chunk.split("\n"):
            if line.strip():
                translated_lines.append(self.chunk_translator.translate_chunk(line))
            else:
                translated_lines.append(line)
        return "\n".join(translated_lines)
|
|
|
|
| |
| |
| |
| if __name__ == "__main__": |
| print("=" * 60) |
| print("TRANSLATOR ENGINE β Quick Self-Test") |
| print("=" * 60) |
|
|
| config = TranslatorConfig() |
| progress = TranslationProgress() |
|
|
| |
| chunk_translator = ChunkTranslator(config) |
| test_text = "Hello, how are you? This is a test of the translation engine." |
|
|
| print(f"\nOriginal: {test_text}") |
| translated = chunk_translator.translate_chunk(test_text) |
| print(f"Translated: {translated}") |
|
|
| |
| splitter = TextSplitter(config) |
| long_text = "A" * 5000 |
| chunks = splitter.split_paragraph_into_chunks(long_text) |
| print(f"\nLong text ({len(long_text)} chars) split into {len(chunks)} chunks") |
| print(f"Chunk sizes: {[len(c) for c in chunks]}") |
|
|
| print("\nβ
Self-test complete. Engine is functional.") |