trabb / translator.py
fokan's picture
first push
0cb01bf
import os
import requests
import asyncio
import aiohttp
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
import logging
from docx import Document
from docx.shared import Inches
import time
import json
from PyPDF2 import PdfReader
import pypdfium2 as pdfium
import pdfplumber
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.utils import simpleSplit
import io
logger = logging.getLogger(__name__)
@dataclass
class TranslationReport:
original_file: Path
translated_file: Path
pages_count: int
paragraphs_count: int
status: str
errors: Optional[List[str]] = None
@dataclass
class PDFTextElement:
"""Represents a text element with its position and formatting"""
text: str
x: float
y: float
width: float
height: float
font_name: str
font_size: float
page_num: int
class DocumentTranslator:
def __init__(self):
self.api_key = os.getenv("OPENROUTER_API_KEY")
if not self.api_key:
logger.warning("OPENROUTER_API_KEY not found in environment variables")
self.base_url = "https://openrouter.ai/api/v1"
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://huggingface.co",
"X-Title": "Document Translator"
}
def is_ready(self) -> bool:
"""Check if translator is ready"""
return bool(self.api_key)
async def get_available_models(self) -> List[Dict]:
"""Return the specified free models"""
return [
{
"id": "google/gemini-2.0-flash-exp:free",
"name": "Google Gemini 2.0 Flash (Free)",
"description": "Fast and free Google AI model",
"pricing": {}
},
{
"id": "tngtech/deepseek-r1t2-chimera:free",
"name": "DeepSeek R1T2 Chimera (Free)",
"description": "Free advanced reasoning model",
"pricing": {}
}
]
def _get_default_models(self) -> List[Dict]:
"""Return the specified free models as default"""
return [
{"id": "google/gemini-2.0-flash-exp:free", "name": "Google Gemini 2.0 Flash (Free)", "description": "Fast and free Google AI model"},
{"id": "tngtech/deepseek-r1t2-chimera:free", "name": "DeepSeek R1T2 Chimera (Free)", "description": "Free advanced reasoning model"}
]
async def translate_text(self, text: str, model: str, source_lang: str = "auto", target_lang: str = "en") -> str:
"""Translate text using OpenRouter API with improved prompt and validation"""
if not text.strip():
return text
# Validate API key first
if not self.api_key:
raise Exception("OpenRouter API key not configured. Please set OPENROUTER_API_KEY environment variable.")
# Create a more specific translation prompt
if source_lang == "auto":
prompt = f"""You are a professional document translator. Translate the following text to {target_lang} (Arabic if 'ar', English if 'en', etc.).
IMPORTANT INSTRUCTIONS:
1. Translate ONLY the content, do not add explanations
2. Maintain the original formatting and structure
3. Preserve technical terms appropriately
4. Return ONLY the translated text
5. If the text is already in the target language, still provide a proper translation/rewrite
Text to translate:
{text}
Translated text:"""
else:
prompt = f"""You are a professional document translator. Translate the following text from {source_lang} to {target_lang}.
IMPORTANT INSTRUCTIONS:
1. Translate ONLY the content, do not add explanations
2. Maintain the original formatting and structure
3. Preserve technical terms appropriately
4. Return ONLY the translated text
5. If the text is already in the target language, still provide a proper translation/rewrite
Text to translate:
{text}
Translated text:"""
try:
async with aiohttp.ClientSession() as session:
payload = {
"model": model,
"messages": [
{"role": "system", "content": "You are a professional document translator. You MUST provide a translation. Never return the original text unchanged."},
{"role": "user", "content": prompt}
],
"temperature": 0.1,
"max_tokens": len(text) * 4 + 500 # More generous token limit
}
logger.info(f"Translating text: '{text[:50]}...' from {source_lang} to {target_lang} using model {model}")
async with session.post(
f"{self.base_url}/chat/completions",
headers=self.headers,
json=payload
) as response:
if response.status == 200:
data = await response.json()
translated = data["choices"][0]["message"]["content"].strip()
# Clean up the response to ensure we only get the translation
if "Translated text:" in translated:
translated = translated.split("Translated text:")[-1].strip()
# Remove any introductory phrases
for phrase in ["Here is the translation:", "Translation:", "The translation is:"]:
if translated.startswith(phrase):
translated = translated[len(phrase):].strip()
# Validate that we got a meaningful translation
if not translated or translated == text:
logger.warning(f"Translation returned empty or unchanged text")
# Don't fall back to original - raise error instead
raise Exception("Translation failed: received empty or unchanged text")
logger.info(f"Translation successful: '{translated[:50]}...'")
return translated
elif response.status == 429:
error_text = await response.text()
logger.error(f"Rate limit error: {response.status} - {error_text}")
raise Exception(f"Rate limit exceeded for model {model}. Please try again later or use a different model.")
else:
error_text = await response.text()
logger.error(f"Translation API error: {response.status} - {error_text}")
raise Exception(f"Translation API error: {response.status} - {error_text}")
except Exception as e:
logger.error(f"Translation error: {e}")
raise Exception(f"Translation failed: {str(e)}")
def extract_text_from_pdf(self, pdf_path: Path) -> str:
"""Extract text directly from PDF as fallback method"""
try:
logger.info(f"Attempting direct text extraction from PDF: {pdf_path}")
reader = PdfReader(pdf_path)
text_content = ""
for page_num, page in enumerate(reader.pages):
page_text = page.extract_text()
if page_text.strip():
text_content += f"\n\n--- Page {page_num + 1} ---\n\n{page_text}"
logger.info(f"Extracted {len(text_content)} characters from {len(reader.pages)} pages")
return text_content
except Exception as e:
logger.error(f"Direct PDF text extraction failed: {e}")
return ""
def extract_text_with_coordinates(self, pdf_path: Path) -> List[PDFTextElement]:
"""Extract text elements with their coordinates using pdfplumber - optimized version"""
text_elements = []
try:
logger.info(f"Extracting text with coordinates from {pdf_path}")
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages):
# Extract words instead of individual characters for better performance
words = page.extract_words()
if not words:
continue
# Process words with their bounding boxes
for word in words:
if word.get('text', '').strip():
text_elements.append(PDFTextElement(
text=word['text'],
x=word['x0'],
y=word['y0'],
width=word['x1'] - word['x0'],
height=word['y1'] - word['y0'],
font_name='Helvetica', # Simplified for performance
font_size=12, # Default size
page_num=page_num
))
logger.info(f"Extracted {len(text_elements)} text elements with coordinates (optimized)")
return text_elements
except Exception as e:
logger.error(f"Error extracting text with coordinates: {e}")
raise
async def translate_pdf_with_formatting(self, pdf_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
"""Translate PDF while preserving exact formatting using improved approach"""
try:
logger.info(f"Translating PDF with formatting preservation: {pdf_path}")
# For better performance and quality, let's use a hybrid approach:
# 1. Extract text with layout information using pdfplumber
# 2. Convert to DOCX to preserve structure
# 3. Translate the DOCX
# 4. Convert back to PDF with original layout
# First, try to extract text with better structure
structured_text = self.extract_structured_text(pdf_path)
if not structured_text:
raise Exception("No text could be extracted from PDF")
# Create a DOCX with the structured text
docx_path = output_dir / f"{pdf_path.stem}_temp.docx"
doc = Document()
# Add content with basic structure
for page_num, page_content in enumerate(structured_text):
# Add page separator
if page_num > 0:
doc.add_page_break()
# Add paragraphs
for paragraph_text in page_content:
if paragraph_text.strip():
doc.add_paragraph(paragraph_text)
doc.save(docx_path)
# Translate the DOCX
translated_docx, paragraphs_count = await self.translate_docx(
docx_path, model, source_lang, target_lang, output_dir
)
# Convert back to PDF with original filename
final_translated_file = output_dir / f"{pdf_path.stem}.pdf"
cmd = [
"libreoffice",
"--headless",
"--convert-to", "pdf",
"--outdir", str(output_dir),
str(translated_docx)
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# Rename the output file
temp_pdf = output_dir / f"{translated_docx.stem}.pdf"
if temp_pdf.exists() and temp_pdf != final_translated_file:
temp_pdf.rename(final_translated_file)
# Clean up temporary files
try:
docx_path.unlink(missing_ok=True)
translated_docx.unlink(missing_ok=True)
except:
pass
logger.info(f"Successfully created formatted PDF with {paragraphs_count} translated paragraphs")
return final_translated_file, paragraphs_count
except Exception as e:
logger.error(f"Error in improved PDF translation: {e}")
raise
def extract_structured_text(self, pdf_path: Path) -> List[List[str]]:
"""Extract structured text from PDF with page and paragraph information"""
structured_text = []
try:
logger.info(f"Extracting structured text from {pdf_path}")
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages):
page_text = []
# Extract text with better structure
text = page.extract_text()
if text:
# Split into paragraphs (double newlines)
paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
page_text.extend(paragraphs)
structured_text.append(page_text)
logger.info(f"Extracted structured text from {len(structured_text)} pages")
return structured_text
except Exception as e:
logger.error(f"Error extracting structured text: {e}")
return []
def create_pdf_with_text_elements(self, original_pdf_path: Path, text_elements: List[PDFTextElement], output_path: Path):
"""Create a new PDF with translated text elements in their original positions - with timeout"""
try:
logger.info(f"Creating PDF with translated text elements: {output_path}")
# Use a timeout context
import signal
def timeout_handler(signum, frame):
raise TimeoutError("PDF creation timed out")
# Set timeout to 5 minutes
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)
try:
# Open original PDF with pypdfium2 to get page dimensions
pdf = pdfium.PdfDocument(str(original_pdf_path))
# Create a new PDF writer
packet = io.BytesIO()
can = canvas.Canvas(packet)
# Group elements by page
pages_elements = {}
for element in text_elements:
if element.page_num not in pages_elements:
pages_elements[element.page_num] = []
pages_elements[element.page_num].append(element)
# Process each page
for page_num in sorted(pages_elements.keys()):
page = pdf.get_page(page_num)
width, height = page.get_size()
# Set page size
can.setPageSize((width, height))
# Add translated text elements
page_elements = pages_elements[page_num]
for element in page_elements:
# Set font and size
try:
can.setFont(element.font_name, element.font_size)
except:
can.setFont("Helvetica", element.font_size)
# Position text (PDF coordinates start from bottom-left)
x = element.x
y = height - element.y - element.height # Adjust for PDF coordinate system
# Draw text
can.drawString(x, y, element.text)
# Move to next page
can.showPage()
# Save the PDF
can.save()
# Write to file
with open(output_path, 'wb') as f:
f.write(packet.getvalue())
logger.info(f"Successfully created formatted PDF: {output_path}")
finally:
# Cancel the alarm
signal.alarm(0)
except TimeoutError:
logger.error("PDF creation timed out after 5 minutes")
raise Exception("PDF creation timed out. The document may be too complex for coordinate-based translation.")
except Exception as e:
logger.error(f"Error creating PDF with text elements: {e}")
raise
finally:
if 'pdf' in locals():
pdf.close()
async def translate_pdf_direct(self, pdf_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
"""Translate PDF by extracting text directly and creating new DOCX"""
try:
logger.info(f"Using direct PDF text extraction method for {pdf_path}")
# Extract text from PDF
pdf_text = self.extract_text_from_pdf(pdf_path)
if not pdf_text.strip():
raise Exception("No text could be extracted from PDF")
# Split text into paragraphs
paragraphs = [p.strip() for p in pdf_text.split('\n\n') if p.strip()]
logger.info(f"Split PDF text into {len(paragraphs)} paragraphs")
# Create new DOCX document
doc = Document()
doc.add_heading('Translated Document', 0)
paragraphs_translated = 0
# Translate each paragraph
for i, paragraph in enumerate(paragraphs):
if len(paragraph.strip()) > 10: # Only translate substantial paragraphs
logger.info(f"Translating paragraph {i+1}/{len(paragraphs)}: '{paragraph[:50]}...'")
try:
translated_text = await self.translate_text(
paragraph, model, source_lang, target_lang
)
# Add translated paragraph to document
doc.add_paragraph(translated_text)
paragraphs_translated += 1
except Exception as trans_error:
logger.error(f"Failed to translate paragraph: {trans_error}")
raise Exception(f"Translation failed for paragraph: {str(trans_error)}")
# Add delay to avoid rate limiting
await asyncio.sleep(0.3)
else:
# Add short text as-is
doc.add_paragraph(paragraph)
if paragraphs_translated == 0:
raise Exception("No paragraphs were successfully translated")
# Save translated document
translated_path = output_dir / f"translated_{pdf_path.stem}.docx"
doc.save(translated_path)
logger.info(f"Successfully created translated DOCX with {paragraphs_translated} translated paragraphs")
return translated_path, paragraphs_translated
except Exception as e:
logger.error(f"Direct PDF translation failed: {e}")
raise
def pdf_to_docx(self, pdf_path: Path, output_dir: Path) -> Path:
try:
docx_path = output_dir / f"{pdf_path.stem}.docx"
# Log the conversion attempt
logger.info(f"Starting PDF to DOCX conversion: {pdf_path} -> {docx_path}")
# Use LibreOffice to convert PDF to DOCX
cmd = [
"libreoffice",
"--headless",
"--convert-to", "docx",
"--outdir", str(output_dir),
str(pdf_path)
]
logger.info(f"Running command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
logger.info(f"LibreOffice exit code: {result.returncode}")
logger.info(f"LibreOffice stdout: {result.stdout}")
logger.info(f"LibreOffice stderr: {result.stderr}")
# Check if conversion was successful
if result.returncode == 0:
if docx_path.exists():
file_size = docx_path.stat().st_size
logger.info(f"Successfully converted {pdf_path} to {docx_path} (size: {file_size} bytes)")
# Verify the DOCX file has content
try:
from docx import Document
doc = Document(docx_path)
paragraph_count = len([p for p in doc.paragraphs if p.text.strip()])
logger.info(f"DOCX contains {paragraph_count} paragraphs with text")
if paragraph_count == 0:
logger.warning("Converted DOCX appears to have no text content")
# Try alternative conversion approach if available
except Exception as e:
logger.error(f"Error validating DOCX content: {e}")
return docx_path
else:
raise Exception(f"Conversion completed but output file {docx_path} not found")
else:
raise Exception(f"LibreOffice conversion failed with exit code {result.returncode}: {result.stderr}")
except subprocess.TimeoutExpired:
raise Exception("PDF conversion timed out after 120 seconds")
except Exception as e:
logger.error(f"Error converting PDF to DOCX: {e}")
raise
def docx_to_pdf(self, docx_path: Path, output_dir: Path) -> Path:
"""Convert DOCX to PDF using LibreOffice"""
try:
pdf_path = output_dir / f"{docx_path.stem}.pdf"
cmd = [
"libreoffice",
"--headless",
"--convert-to", "pdf",
"--outdir", str(output_dir),
str(docx_path)
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0 and pdf_path.exists():
logger.info(f"Successfully converted {docx_path} to {pdf_path}")
return pdf_path
else:
logger.error(f"LibreOffice conversion failed: {result.stderr}")
raise Exception(f"DOCX to PDF conversion failed: {result.stderr}")
except subprocess.TimeoutExpired:
raise Exception("DOCX to PDF conversion timed out")
except Exception as e:
logger.error(f"Error converting DOCX to PDF: {e}")
raise
async def translate_docx(self, docx_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
"""Translate DOCX document paragraph by paragraph with enhanced validation"""
try:
# Load the document
logger.info(f"Loading DOCX document: {docx_path}")
doc = Document(docx_path)
paragraphs_count = 0
total_paragraphs = len(doc.paragraphs)
logger.info(f"Document has {total_paragraphs} total paragraphs")
# Count paragraphs with text first
text_paragraphs = [p for p in doc.paragraphs if p.text.strip()]
logger.info(f"Found {len(text_paragraphs)} paragraphs with text content")
if len(text_paragraphs) == 0:
raise Exception("No text content found in document")
# Log first few paragraphs for debugging
for i, paragraph in enumerate(text_paragraphs[:3]):
logger.info(f"Sample paragraph {i+1}: '{paragraph.text[:100]}...'")
# Translate each paragraph
for i, paragraph in enumerate(doc.paragraphs):
if paragraph.text.strip():
original_text = paragraph.text.strip()
logger.info(f"Translating paragraph {paragraphs_count + 1}/{len(text_paragraphs)}: '{original_text[:50]}...'")
try:
translated_text = await self.translate_text(
original_text, model, source_lang, target_lang
)
# Verify translation actually happened
if translated_text == original_text:
logger.warning(f"Translation returned identical text for: '{original_text[:50]}...'")
# Continue anyway - maybe it was already in target language
else:
logger.info(f"Translation successful: '{translated_text[:50]}...'")
paragraph.text = translated_text
paragraphs_count += 1
except Exception as trans_error:
logger.error(f"Failed to translate paragraph: {trans_error}")
raise Exception(f"Translation failed for paragraph: {str(trans_error)}")
# Add small delay to avoid rate limiting
await asyncio.sleep(0.3)
# Translate tables if any
table_cells_translated = 0
for table_idx, table in enumerate(doc.tables):
logger.info(f"Processing table {table_idx + 1} of {len(doc.tables)}")
for row_idx, row in enumerate(table.rows):
for cell_idx, cell in enumerate(row.cells):
if cell.text.strip():
original_text = cell.text.strip()
try:
translated_text = await self.translate_text(
original_text, model, source_lang, target_lang
)
cell.text = translated_text
table_cells_translated += 1
except Exception as trans_error:
logger.warning(f"Failed to translate table cell: {trans_error}")
# Continue with other cells
await asyncio.sleep(0.1)
logger.info(f"Translated {table_cells_translated} table cells")
total_translated = paragraphs_count + table_cells_translated
if total_translated == 0:
raise Exception("No content was successfully translated")
# Save translated document
translated_path = output_dir / f"translated_{docx_path.name}"
doc.save(translated_path)
logger.info(f"Successfully translated {total_translated} text elements and saved to {translated_path}")
# Verify the saved document
if translated_path.exists():
file_size = translated_path.stat().st_size
logger.info(f"Translated document saved (size: {file_size} bytes)")
else:
raise Exception("Failed to save translated document")
return translated_path, total_translated
except Exception as e:
logger.error(f"Error translating DOCX: {e}")
raise
async def translate_document(
self,
input_file: Path,
model: str,
source_language: str = "auto",
target_language: str = "en",
output_dir: Optional[Path] = None
) -> TranslationReport:
"""
Main translation function that handles both PDF and DOCX files
For PDFs, uses coordinate-based approach to preserve formatting
For DOCX, uses paragraph-by-paragraph translation
"""
if output_dir is None:
output_dir = input_file.parent
original_file = input_file
file_extension = input_file.suffix.lower()
original_filename = input_file.stem # filename without extension
try:
if file_extension == ".pdf":
logger.info(f"Processing PDF file with formatting preservation: {input_file}")
try:
# Use coordinate-based PDF translation to preserve formatting
logger.info(f"Using coordinate-based translation for {input_file}")
translated_file, paragraphs_count = await self.translate_pdf_with_formatting(
pdf_path=input_file,
model=model,
source_lang=source_language,
target_lang=target_language,
output_dir=output_dir
)
# Rename to original filename to maintain same name
final_translated_file = output_dir / f"{original_filename}.pdf"
if translated_file != final_translated_file:
translated_file.rename(final_translated_file)
translated_file = final_translated_file
# Estimate pages
try:
pdf = pdfium.PdfDocument(str(translated_file))
pages_count = len(pdf)
pdf.close()
except:
pages_count = 1
except Exception as format_error:
logger.warning(f"Coordinate-based PDF translation failed: {format_error}")
logger.info("Falling back to LibreOffice conversion method")
# Fallback to original method
try:
# Try LibreOffice conversion first
logger.info(f"Attempting LibreOffice conversion for {input_file}")
docx_file = self.pdf_to_docx(input_file, output_dir)
# Translate the DOCX
logger.info(f"Translating converted DOCX {docx_file}")
translated_docx, paragraphs_count = await self.translate_docx(
docx_file, model, source_language, target_language, output_dir
)
# If no paragraphs were translated, try direct method
if paragraphs_count == 0:
logger.warning("LibreOffice conversion produced no translatable content, trying direct extraction")
raise Exception("No content found in LibreOffice conversion")
# Convert translated DOCX back to PDF with ORIGINAL filename
logger.info(f"Converting translated DOCX back to PDF with original filename")
final_translated_file = output_dir / f"{original_filename}.pdf"
# Use LibreOffice to convert with specific output name
cmd = [
"libreoffice",
"--headless",
"--convert-to", "pdf",
"--outdir", str(output_dir),
str(translated_docx)
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# LibreOffice creates file with docx stem name, rename to original
temp_pdf = output_dir / f"{translated_docx.stem}.pdf"
if temp_pdf.exists() and temp_pdf != final_translated_file:
temp_pdf.rename(final_translated_file)
translated_file = final_translated_file
# Estimate pages (rough estimate: 1 page = ~500 words)
doc = Document(translated_docx)
total_words = sum(len(p.text.split()) for p in doc.paragraphs)
pages_count = max(1, total_words // 500)
except Exception as libreoffice_error:
logger.warning(f"LibreOffice method failed: {libreoffice_error}")
logger.info("Falling back to direct PDF text extraction")
# Fallback to direct PDF text extraction
translated_docx, paragraphs_count = await self.translate_pdf_direct(
input_file, model, source_language, target_language, output_dir
)
# Convert the translated DOCX to PDF with original filename
final_translated_file = output_dir / f"{original_filename}.pdf"
cmd = [
"libreoffice",
"--headless",
"--convert-to", "pdf",
"--outdir", str(output_dir),
str(translated_docx)
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# LibreOffice creates file with docx stem name, rename to original
temp_pdf = output_dir / f"{translated_docx.stem}.pdf"
if temp_pdf.exists() and temp_pdf != final_translated_file:
temp_pdf.rename(final_translated_file)
translated_file = final_translated_file
# Estimate pages (rough estimate: 1 page = ~500 words)
doc = Document(translated_docx)
total_words = sum(len(p.text.split()) for p in doc.paragraphs)
pages_count = max(1, total_words // 500)
elif file_extension == ".docx":
# Translate DOCX directly, keeping original filename
logger.info(f"Translating DOCX {input_file}")
# Create output file with original filename
final_translated_file = output_dir / f"{original_filename}.docx"
translated_file, paragraphs_count = await self.translate_docx(
input_file, model, source_language, target_language, output_dir
)
# Rename to original filename if different
if translated_file != final_translated_file:
translated_file.rename(final_translated_file)
translated_file = final_translated_file
# Estimate pages
doc = Document(translated_file)
total_words = sum(len(p.text.split()) for p in doc.paragraphs)
pages_count = max(1, total_words // 500)
else:
raise Exception(f"Unsupported file format: {file_extension}")
# Verify translation was successful
if paragraphs_count == 0:
raise Exception("Translation failed: No paragraphs were translated")
return TranslationReport(
original_file=original_file,
translated_file=translated_file,
pages_count=pages_count,
paragraphs_count=paragraphs_count,
status="success"
)
except Exception as e:
logger.error(f"Document translation failed: {e}")
return TranslationReport(
original_file=original_file,
translated_file=original_file, # Return original as fallback
pages_count=0,
paragraphs_count=0,
status="failed",
errors=[str(e)]
)