# Novelt / translator_engine.py
# Uploaded by Ruhivig65 ("Upload 6 files", commit 6cc0372, verified)
"""
translator_engine.py
====================
Core Translation Engine for Massive Text Files (English -> Hindi)
ARCHITECTURE DECISIONS:
-----------------------
1. STREAMING READ/WRITE: We never hold the full translated file in RAM.
- Read source file paragraph-by-paragraph
- Translate each paragraph
- Immediately append translated text to output file on disk
- If process crashes at paragraph 25,000 β€” the first 24,999 are SAFE on disk
2. CHUNKING STRATEGY:
- Split file by double-newline (\n\n) to get paragraphs
- Each paragraph is one translation unit
- Single newlines (\n) WITHIN a paragraph are preserved by translating
line-by-line inside each paragraph
- This guarantees ZERO text loss and exact structural preservation
3. RATE LIMITING & IP BAN PREVENTION:
- Each translation call has a mandatory sleep (configurable, default 0.3s)
- ThreadPoolExecutor with LIMITED workers (default 2) β€” NOT aggressive
- On HTTP 429 (Too Many Requests): exponential backoff via tenacity
- Base wait: 10 seconds, multiplier: 2x, max wait: 320 seconds
- Max 10 retries per chunk before marking as FAILED
- On connection drop: same retry logic catches ConnectionError, Timeout
4. CONCURRENCY MODEL:
- We use ThreadPoolExecutor but with STRICT controls:
a) Max 2 workers (Google free tier can't handle more)
b) Each worker has mandatory inter-request delay
c) A global rate limiter (threading.Semaphore) prevents burst
- This is NOT about max speed β€” it's about RELIABLE completion
of 50,000+ paragraphs without getting IP-banned
5. ERROR HANDLING SCENARIOS (explicitly documented):
- HTTP 429 Too Many Requests β†’ exponential backoff, retry up to 10x
- ConnectionError / Timeout β†’ same retry logic, waits and retries
- InvalidURL / encoding error β†’ log error, write original text as fallback
- Process crash β†’ all previously written paragraphs are safe on disk
- Google temporary block β†’ backoff will wait up to ~320s per retry
"""
import os
import re
import time
import threading
import logging
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from deep_translator import GoogleTranslator
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
RetryError,
)
# ============================================================================
# LOGGING SETUP
# ============================================================================
# Configured once at import time; %(threadName)s in the format attributes
# each message to its translator worker thread.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(levelname)s: %(message)s",
)
# Module-level logger shared by every class in this engine.
logger = logging.getLogger("translator_engine")
# ============================================================================
# CONFIGURATION β€” All tunables in one place
# ============================================================================
@dataclass
class TranslatorConfig:
    """
    Central configuration for the translation engine.

    Defaults are tuned for the free Google Translate backend on a shared-IP
    host (e.g. HuggingFace Spaces): conservative concurrency and generous
    delays, because aggressive request patterns get the IP banned.
    """

    # --- Language pair ---
    source_lang: str = "en"
    target_lang: str = "hi"

    # --- Chunking ---
    # The free Google endpoint rejects requests over ~5000 characters;
    # 4500 leaves a comfortable safety margin.
    max_chunk_chars: int = 4500

    # --- Rate limiting ---
    # Minimum delay (seconds) between requests issued by the SAME thread.
    min_request_delay: float = 0.5
    # Number of translation worker threads. Keep this small — the free
    # tier cannot absorb aggressive concurrency.
    max_workers: int = 2
    # Global cap on simultaneous in-flight requests across all threads
    # (enforced by a semaphore to prevent bursts).
    max_concurrent_requests: int = 2

    # --- Retry / backoff (HTTP 429, dropped connections) ---
    max_retries: int = 10        # attempts per chunk before giving up
    backoff_base_wait: int = 10  # seconds before the first retry
    backoff_multiplier: int = 2  # each subsequent wait doubles
    backoff_max_wait: int = 320  # hard ceiling on any single retry wait

    # --- File I/O ---
    # Paragraphs per disk flush. Each paragraph is still translated
    # individually; batching only amortizes write syscalls.
    disk_flush_interval: int = 5
# ============================================================================
# PROGRESS TRACKER β€” Thread-safe progress monitoring
# ============================================================================
@dataclass
class TranslationProgress:
    """
    Thread-safe, shared progress state for one translation run.

    Worker threads bump the counters; the frontend polls `to_dict()` for a
    consistent snapshot with derived speed / ETA figures.
    """

    total_paragraphs: int = 0
    translated_paragraphs: int = 0
    failed_paragraphs: int = 0
    status: str = "idle"  # one of: idle, preparing, translating, completed, failed
    error_message: str = ""
    current_phase: str = ""
    start_time: float = 0.0
    file_name: str = ""
    output_file: str = ""
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def increment_translated(self):
        """Atomically bump the count of successfully translated paragraphs."""
        with self._lock:
            self.translated_paragraphs += 1

    def increment_failed(self):
        """Atomically bump the count of permanently failed paragraphs."""
        with self._lock:
            self.failed_paragraphs += 1

    def set_status(self, status: str, phase: str = ""):
        """Update the status string and, when non-empty, the current phase."""
        with self._lock:
            self.status = status
            if phase:
                self.current_phase = phase

    def to_dict(self) -> dict:
        """Return a consistent snapshot of progress plus derived metrics."""
        with self._lock:
            elapsed = 0
            speed = 0
            eta_seconds = 0
            done = self.translated_paragraphs
            # Speed/ETA only make sense once the clock started and at least
            # one paragraph finished.
            if self.start_time > 0 and done > 0:
                elapsed = time.time() - self.start_time
                speed = done / elapsed  # paragraphs per second
                remaining = self.total_paragraphs - done
                eta_seconds = remaining / speed if speed > 0 else 0
            total = self.total_paragraphs
            percent = (done / total * 100) if total > 0 else 0
            return {
                "total_paragraphs": total,
                "translated_paragraphs": done,
                "failed_paragraphs": self.failed_paragraphs,
                "status": self.status,
                "error_message": self.error_message,
                "current_phase": self.current_phase,
                "file_name": self.file_name,
                "output_file": self.output_file,
                "elapsed_seconds": round(elapsed, 1),
                "speed_per_second": round(speed, 2),
                "eta_seconds": round(eta_seconds, 1),
                "percent_complete": round(percent, 1),
            }
# ============================================================================
# SINGLE CHUNK TRANSLATOR β€” With full retry & backoff logic
# ============================================================================
class ChunkTranslator:
    """
    Translates a single text chunk (paragraph or sub-paragraph) against the
    free Google Translate backend, with a global concurrency cap, per-thread
    rate limiting, and exponential-backoff retries.

    ERROR HANDLING SCENARIOS:
    -------------------------
    SCENARIO 1 — HTTP 429 Too Many Requests (IP rate limit):
        tenacity retries the same chunk with exponential backoff
        (10s, 20s, 40s, 80s, 160s, then capped at 320s), up to 10 attempts.
        After the final failure the ORIGINAL text is returned (no data loss).
    SCENARIO 2 — Connection drop (ConnectionError, Timeout, DNS failure):
        Caught by the same retry logic; the backoff gives the network time
        to recover. Persistent failure again falls back to the original text.
    SCENARIO 3 — Encoding / invalid input:
        Caught by the generic Exception handler in translate_chunk();
        the original text is preserved even if untranslated.
    SCENARIO 4 — Temporary IP block longer than a rate limit:
        With 10 attempts and a 320s cap the cumulative wait can reach
        roughly 10+20+40+80+160+320*4 ≈ 26 minutes, usually enough for
        Google to unblock the IP.
    """

    def __init__(self, config: TranslatorConfig):
        self.config = config
        # Global semaphore: caps in-flight requests across ALL threads,
        # preventing request bursts.
        self.request_semaphore = threading.Semaphore(config.max_concurrent_requests)
        # Per-thread timestamp of the last request (thread-local storage).
        self._last_request_time = threading.local()

    def _enforce_rate_limit(self):
        """
        Sleep just long enough to honor min_request_delay between two
        consecutive requests issued from the SAME thread.
        """
        now = time.time()
        last = getattr(self._last_request_time, "value", 0)
        elapsed = now - last
        if elapsed < self.config.min_request_delay:
            time.sleep(self.config.min_request_delay - elapsed)
        self._last_request_time.value = time.time()

    # NOTE: decorator arguments are evaluated at class-definition time and
    # cannot read self.config — they mirror the TranslatorConfig defaults
    # (max_retries=10, backoff_base_wait=10, backoff_max_wait=320).
    # Keep them in sync if those defaults ever change.
    @retry(
        # deep-translator wraps HTTP 429 / network failures in generic
        # exceptions, so we must retry on any Exception.
        retry=retry_if_exception_type((
            Exception,
        )),
        # FIX: tenacity computes multiplier * 2**attempt_number, so a
        # multiplier of 5 yields the documented 10s -> 20s -> 40s -> 80s
        # -> 160s -> 320s (capped) schedule. The previous multiplier=2
        # actually produced 10s, 10s, 10s, 16s, 32s, ... because of the
        # min=10 clamp.
        wait=wait_exponential(
            multiplier=5,
            min=10,
            max=320,
        ),
        # Maximum 10 attempts per chunk.
        stop=stop_after_attempt(10),
        # Log a warning before every retry sleep (debugging aid).
        before_sleep=before_sleep_log(logger, logging.WARNING),
        # FIX: reraise removed (defaults to False) so exhausting all
        # attempts raises RetryError, which translate_chunk() handles.
        # With reraise=True the RetryError handler below was unreachable —
        # the raw final exception bypassed it into the generic handler.
    )
    def _translate_with_retry(self, text: str) -> str:
        """
        Innermost translation call — talks to Google Translate directly.
        tenacity transparently retries on 429s and connection drops.
        """
        # Acquire semaphore — blocks while too many requests are in flight.
        with self.request_semaphore:
            self._enforce_rate_limit()
            # Fresh translator instance per call: deep-translator is not
            # guaranteed thread-safe with shared instances.
            translator = GoogleTranslator(
                source=self.config.source_lang,
                target=self.config.target_lang,
            )
            result = translator.translate(text=text)
            # Google sometimes returns None (e.g. for empty strings).
            return text if result is None else result

    def translate_chunk(self, text: str) -> str:
        """
        Public API: translate one chunk with full error handling.

        Returns the translated text on success, or the ORIGINAL text on
        permanent failure (zero-text-loss guarantee).
        """
        # Don't burn API calls on empty / whitespace-only chunks.
        if not text or not text.strip():
            return text
        try:
            return self._translate_with_retry(text)
        except RetryError as e:
            # All retry attempts exhausted — log the root cause and keep
            # the original text rather than losing content.
            logger.error(
                f"PERMANENT FAILURE after {self.config.max_retries} retries. "
                f"Chunk (first 100 chars): '{text[:100]}...'. "
                f"Error: {e.last_attempt.exception()}"
            )
            # ZERO TEXT LOSS: return the original rather than dropping it.
            return text
        except Exception as e:
            # Unexpected error outside the retry machinery — still preserve
            # the original text.
            logger.error(f"Unexpected translation error: {e}. Preserving original text.")
            return text
# ============================================================================
# PARAGRAPH SPLITTER β€” Preserves exact structure
# ============================================================================
class TextSplitter:
    """
    Splits massive text files into translation-ready chunks while
    preserving the exact line-break structure.

    STRATEGY:
    1. Split the file on blank lines -> paragraphs
    2. A paragraph within the char limit is translated as-is
    3. A longer paragraph is split on single newlines -> lines
    4. A single over-long line is split on sentence boundaries
    5. After translation, pieces are rejoined with the same separators

    This is what prevents lines from being skipped or merged.
    """

    def __init__(self, config: TranslatorConfig):
        # All size limits come from the shared engine configuration.
        self.config = config

    def split_into_paragraphs(self, file_path: str):
        """
        Generator: yield paragraphs from the file WITHOUT loading it into RAM.

        Streams line by line, accumulating lines until a blank line marks a
        paragraph boundary; blank lines are yielded as "" entries so the
        writer can reproduce them. Memory usage: one paragraph at a time —
        crucial for multi-million-word files.

        NOTE(review): every blank line in a run of consecutive blanks yields
        its own "" entry; verify the writer's paragraph-separator logic
        against this when checking blank-line fidelity of the output.
        """
        current_paragraph_lines = []
        consecutive_empty_lines = 0
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                # Strip only the trailing newline; other whitespace is content.
                stripped = line.rstrip("\n")
                if stripped == "":
                    consecutive_empty_lines += 1
                    if consecutive_empty_lines >= 1 and current_paragraph_lines:
                        # End of a paragraph — yield it.
                        paragraph_text = "\n".join(current_paragraph_lines)
                        yield paragraph_text
                        current_paragraph_lines = []
                    # Yield the blank line as a separator (preserves blank lines).
                    yield ""
                else:
                    consecutive_empty_lines = 0
                    current_paragraph_lines.append(stripped)
        # The file may not end with a blank line — flush the final paragraph.
        if current_paragraph_lines:
            paragraph_text = "\n".join(current_paragraph_lines)
            yield paragraph_text

    def split_paragraph_into_chunks(self, paragraph: str) -> list[str]:
        """
        Split a paragraph that exceeds the per-request char limit into
        smaller chunks, preferring line boundaries.

        Returns a list of chunk strings; for line-based splits, rejoining
        with a newline reconstructs the original paragraph exactly.

        NOTE(review): when a single line exceeds the limit it is split on
        sentence boundaries; callers that rejoin chunks with newlines will
        then introduce line breaks that were not in the original line —
        confirm this is acceptable for structural fidelity.
        """
        if len(paragraph) <= self.config.max_chunk_chars:
            return [paragraph]
        # Split by lines first.
        lines = paragraph.split("\n")
        chunks = []
        current_chunk_lines = []
        current_chunk_len = 0
        for line in lines:
            line_len = len(line) + 1  # +1 accounts for the newline separator
            if current_chunk_len + line_len > self.config.max_chunk_chars:
                # Flush the chunk accumulated so far.
                if current_chunk_lines:
                    chunks.append("\n".join(current_chunk_lines))
                    current_chunk_lines = []
                    current_chunk_len = 0
                # If a single line is STILL too long, split by sentences.
                if line_len > self.config.max_chunk_chars:
                    sentence_chunks = self._split_long_line(line)
                    chunks.extend(sentence_chunks)
                    continue
            current_chunk_lines.append(line)
            current_chunk_len += line_len
        # Flush the trailing chunk, if any.
        if current_chunk_lines:
            chunks.append("\n".join(current_chunk_lines))
        return chunks

    def _split_long_line(self, line: str) -> list[str]:
        """
        Last resort: split a very long single line on sentence boundaries,
        with a hard character split as the final fallback.

        All text is preserved — it is only broken into translatable pieces.
        """
        # Split after sentence-ending punctuation, keeping the punctuation.
        sentences = re.split(r"(?<=[.!?])\s+", line)
        chunks = []
        current = ""
        for sentence in sentences:
            if len(current) + len(sentence) + 1 > self.config.max_chunk_chars:
                if current:
                    chunks.append(current)
                current = sentence
            else:
                # Re-join with a single space (original inter-sentence
                # whitespace was consumed by the split above).
                current = f"{current} {sentence}" if current else sentence
        if current:
            chunks.append(current)
        # Text with no sentence boundaries can still exceed the limit:
        # hard-split at max_chunk_chars (very rare).
        final_chunks = []
        for chunk in chunks:
            if len(chunk) > self.config.max_chunk_chars:
                for i in range(0, len(chunk), self.config.max_chunk_chars):
                    final_chunks.append(chunk[i : i + self.config.max_chunk_chars])
            else:
                final_chunks.append(chunk)
        return final_chunks

    def count_paragraphs(self, file_path: str) -> int:
        """
        Count the items split_into_paragraphs() will yield (paragraphs plus
        blank-line separators) for progress tracking. Streams the file —
        nothing is loaded into RAM.
        """
        count = 0
        for _ in self.split_into_paragraphs(file_path):
            count += 1
        return count
# ============================================================================
# MAIN TRANSLATION ORCHESTRATOR β€” Ties everything together
# ============================================================================
class MassiveFileTranslator:
    """
    The main orchestrator that:
    1. Streams the input file paragraph-by-paragraph
    2. Translates paragraphs concurrently (rate-limited thread pool)
    3. Appends translated text to the output file in input order
    4. Tracks progress in real time via TranslationProgress

    STREAMING WRITE GUARANTEE:
    --------------------------
    Every completed batch is immediately flushed (and fsync'd) to the
    output file. If the process crashes mid-run, all previously written
    paragraphs survive on disk — nothing lives only in RAM, so a partial
    translation can always be salvaged.
    """

    def __init__(
        self,
        config: Optional[TranslatorConfig] = None,
        progress: Optional[TranslationProgress] = None,
    ):
        self.config = config or TranslatorConfig()
        self.progress = progress or TranslationProgress()
        self.chunk_translator = ChunkTranslator(self.config)
        self.text_splitter = TextSplitter(self.config)
        # Serializes appends to the single output file across threads.
        self._write_lock = threading.Lock()
        # Set via cancel() to request a graceful stop between paragraphs.
        self._cancel_flag = threading.Event()

    def cancel(self):
        """Signal the translation to stop gracefully."""
        self._cancel_flag.set()
        self.progress.set_status("failed", "Cancelled by user")

    def translate_file(self, input_path: str, output_path: str) -> str:
        """
        Main entry point: translate an entire file.

        Designed to run in a background thread; streams input -> output.
        Returns the output file path on completion. On fatal error, records
        it in the progress tracker and re-raises.
        """
        try:
            self.progress.set_status("preparing", "Counting paragraphs...")
            # NOTE(review): these two writes bypass the progress lock —
            # harmless for GIL-protected str assignment, but inconsistent
            # with the locked setters; confirm intent.
            self.progress.file_name = os.path.basename(input_path)
            self.progress.output_file = os.path.basename(output_path)
            # Phase 1: quick streaming scan to size the progress bar.
            logger.info(f"Counting paragraphs in: {input_path}")
            total = self.text_splitter.count_paragraphs(input_path)
            self.progress.total_paragraphs = total
            logger.info(f"Total paragraphs to translate: {total}")
            if total == 0:
                self.progress.set_status("completed", "File is empty")
                # Create empty output file
                open(output_path, "w", encoding="utf-8").close()
                return output_path
            # Phase 2: truncate/create the output file.
            with open(output_path, "w", encoding="utf-8") as f:
                f.write("")  # Create empty file
            # Phase 3: stream-translate with ordered batch writes.
            self.progress.set_status("translating", "Translation in progress...")
            self.progress.start_time = time.time()
            self._translate_sequential_with_threads(input_path, output_path)
            # Phase 4: done (unless cancelled mid-run, which keeps the
            # "failed"/"Cancelled by user" status set by cancel()).
            if not self._cancel_flag.is_set():
                self.progress.set_status("completed", "Translation finished!")
                logger.info(
                    f"Translation complete. Output: {output_path}. "
                    f"Translated: {self.progress.translated_paragraphs}, "
                    f"Failed: {self.progress.failed_paragraphs}"
                )
            return output_path
        except Exception as e:
            logger.exception(f"Fatal error during translation: {e}")
            self.progress.set_status("failed", f"Fatal error: {str(e)}")
            self.progress.error_message = str(e)
            raise

    def _translate_sequential_with_threads(
        self, input_path: str, output_path: str
    ):
        """
        ORDERED translation with a thread pool.

        WHY ORDERED MATTERS: output paragraphs must appear in the exact
        input order, or the book would be scrambled.

        STRATEGY: process in BATCHES (batch = max_workers * 3 for pipeline
        efficiency). Within a batch, paragraphs are translated
        concurrently; once the whole batch is done, results are written to
        disk IN ORDER before the next batch begins. This yields
        concurrency, strict ordering, and per-batch streaming writes.
        """
        batch_size = self.config.max_workers * 3  # Pipeline efficiency
        paragraph_generator = self.text_splitter.split_into_paragraphs(input_path)
        with ThreadPoolExecutor(
            max_workers=self.config.max_workers,
            thread_name_prefix="translator",
        ) as executor:
            batch = []
            batch_indices = []
            paragraph_index = 0
            for paragraph in paragraph_generator:
                # Cancellation is honored between paragraphs.
                if self._cancel_flag.is_set():
                    logger.warning("Translation cancelled.")
                    return
                batch.append(paragraph)
                batch_indices.append(paragraph_index)
                paragraph_index += 1
                # Process the batch as soon as it is full.
                if len(batch) >= batch_size:
                    self._process_batch(
                        executor, batch, batch_indices, output_path
                    )
                    batch = []
                    batch_indices = []
            # Process the final (possibly partial) batch.
            if batch:
                self._process_batch(
                    executor, batch, batch_indices, output_path
                )

    def _process_batch(
        self,
        executor: ThreadPoolExecutor,
        batch: list[str],
        indices: list[int],
        output_path: str,
    ):
        """
        Translate one batch concurrently, then write it to disk in order.

        `future_to_index` deliberately holds two kinds of entries:
        - int -> str   : blank paragraphs stored directly (no API call)
        - Future -> int: submitted translations mapped to batch position
        """
        future_to_index = {}
        for i, paragraph in enumerate(batch):
            if self._cancel_flag.is_set():
                return
            # Blank separator "paragraphs" need no translation.
            if not paragraph.strip():
                future_to_index[i] = paragraph  # Store directly
            else:
                future = executor.submit(self._translate_single_paragraph, paragraph)
                future_to_index[future] = i
        # Results are placed by batch position to preserve input order.
        results = [""] * len(batch)
        # First, fill in the blank paragraphs (these have no futures).
        for key, value in list(future_to_index.items()):
            if isinstance(key, int):
                results[key] = value
                del future_to_index[key]
        # Collect translation futures as they finish (order-independent;
        # placement by idx restores input order).
        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            try:
                translated_text = future.result()
                results[idx] = translated_text
            except Exception as e:
                # translate_chunk() already swallows errors, so this is a
                # last-resort guard — keep the original text.
                logger.error(f"Unexpected future error: {e}")
                results[idx] = batch[idx]  # Original text as fallback
                self.progress.increment_failed()
        # Append the whole batch IN ORDER, then force it onto disk.
        # NOTE(review): a blank-line separator is also written around ""
        # entries yielded by the splitter, which may widen blank-line runs
        # in the output — verify against the splitter's blank-line yields.
        with self._write_lock:
            with open(output_path, "a", encoding="utf-8") as f:
                for i, translated in enumerate(results):
                    # Separator before every paragraph except the very
                    # first ever written (file still empty on disk; size is
                    # read before this handle's buffered writes flush).
                    if i > 0 or os.path.getsize(output_path) > 0:
                        f.write("\n\n")  # Paragraph separator
                    f.write(translated)
                # Flush + fsync: crash protection for this batch.
                f.flush()
                os.fsync(f.fileno())

    def _translate_single_paragraph(self, paragraph: str) -> str:
        """
        Translate one paragraph, splitting it first if it exceeds the
        per-request char limit. Internal line breaks are preserved by
        translating line-by-line.
        """
        # Split paragraph into API-friendly chunks if needed.
        chunks = self.text_splitter.split_paragraph_into_chunks(paragraph)
        if len(chunks) == 1:
            # Fits in one API call, but still translate line-by-line so
            # internal line breaks survive exactly.
            lines = paragraph.split("\n")
            translated_lines = []
            for line in lines:
                if line.strip():
                    translated_line = self.chunk_translator.translate_chunk(line)
                    translated_lines.append(translated_line)
                else:
                    # Preserve empty lines within the paragraph.
                    translated_lines.append(line)
            self.progress.increment_translated()
            return "\n".join(translated_lines)
        else:
            # Oversized paragraph — translate each chunk line-by-line,
            # then rejoin the chunks.
            # NOTE(review): chunks produced by sentence-splitting a single
            # long line are rejoined here with line breaks that were not in
            # the original — confirm this is acceptable.
            translated_chunks = []
            for chunk in chunks:
                lines = chunk.split("\n")
                translated_lines = []
                for line in lines:
                    if line.strip():
                        translated_line = self.chunk_translator.translate_chunk(line)
                        translated_lines.append(translated_line)
                    else:
                        translated_lines.append(line)
                translated_chunks.append("\n".join(translated_lines))
            # Counted once per paragraph (not per chunk) so the tally
            # matches total_paragraphs.
            self.progress.increment_translated()
            return "\n".join(translated_chunks)
# ============================================================================
# QUICK TEST β€” Run this file directly to test translation
# ============================================================================
if __name__ == "__main__":
    # Minimal smoke test: one live translation call plus an offline check
    # of the chunk-splitting logic.
    divider = "=" * 60
    print(divider)
    print("TRANSLATOR ENGINE — Quick Self-Test")
    print(divider)

    cfg = TranslatorConfig()
    tracker = TranslationProgress()

    # 1) Live round-trip through the chunk translator.
    engine = ChunkTranslator(cfg)
    sample = "Hello, how are you? This is a test of the translation engine."
    print(f"\nOriginal: {sample}")
    result = engine.translate_chunk(sample)
    print(f"Translated: {result}")

    # 2) Offline check: an over-long paragraph must be split into
    #    chunks under the configured size limit.
    text_splitter = TextSplitter(cfg)
    oversized = "A" * 5000
    pieces = text_splitter.split_paragraph_into_chunks(oversized)
    print(f"\nLong text ({len(oversized)} chars) split into {len(pieces)} chunks")
    print(f"Chunk sizes: {[len(p) for p in pieces]}")

    print("\n✅ Self-test complete. Engine is functional.")