|
|
import os |
|
|
import requests |
|
|
import asyncio |
|
|
import aiohttp |
|
|
import subprocess |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Optional, Tuple, Any |
|
|
from dataclasses import dataclass |
|
|
import logging |
|
|
from docx import Document |
|
|
from docx.shared import Inches |
|
|
import time |
|
|
import json |
|
|
from PyPDF2 import PdfReader |
|
|
import pypdfium2 as pdfium |
|
|
import pdfplumber |
|
|
from reportlab.pdfgen import canvas |
|
|
from reportlab.lib.pagesizes import letter |
|
|
from reportlab.pdfbase import pdfmetrics |
|
|
from reportlab.pdfbase.ttfonts import TTFont |
|
|
from reportlab.lib.utils import simpleSplit |
|
|
import io |
|
|
|
|
|
# Module-level logger, named after this module so handlers can filter on it.
logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class TranslationReport:
    """Summary of one document translation run."""

    # Document that was handed in for translation.
    original_file: Path
    # Document that was produced by the run.
    translated_file: Path
    # Number of pages reported for the translated document.
    pages_count: int
    # Number of text elements that were translated.
    paragraphs_count: int
    # Free-form outcome label set by the producer of the report.
    status: str
    # Error messages collected during the run; None when nothing was recorded.
    errors: Optional[List[str]] = None
|
|
|
|
|
@dataclass
class PDFTextElement:
    """A piece of PDF text together with its placement and font information."""

    # The text content of the element.
    text: str
    # Horizontal position of the element on its page.
    x: float
    # Vertical position of the element on its page.
    y: float
    # Horizontal extent of the element.
    width: float
    # Vertical extent of the element.
    height: float
    # Font name to use when the element is drawn.
    font_name: str
    # Font size to use when the element is drawn.
    font_size: float
    # Index of the page the element belongs to.
    page_num: int
|
|
|
|
|
class DocumentTranslator: |
|
|
def __init__(self):
    """Load the OpenRouter API key from the environment and prepare request settings.

    A missing ``OPENROUTER_API_KEY`` only produces a warning here; the
    instance is still created.
    """
    key_from_env = os.getenv("OPENROUTER_API_KEY")
    self.api_key = key_from_env
    if not key_from_env:
        logger.warning("OPENROUTER_API_KEY not found in environment variables")

    self.base_url = "https://openrouter.ai/api/v1"

    # Headers sent with every chat-completion request.
    request_headers = {}
    request_headers["Authorization"] = f"Bearer {self.api_key}"
    request_headers["Content-Type"] = "application/json"
    request_headers["HTTP-Referer"] = "https://huggingface.co"
    request_headers["X-Title"] = "Document Translator"
    self.headers = request_headers
|
|
|
|
|
def is_ready(self) -> bool:
    """Tell whether an API key is configured (truthy ``self.api_key``)."""
    if self.api_key:
        return True
    return False
|
|
|
|
|
async def get_available_models(self) -> List[Dict]:
    """Return the fixed list of free models offered for translation.

    Each entry is a dict with ``id``, ``name``, ``description`` and an empty
    ``pricing`` dict. No network request is made.
    """
    catalogue = (
        (
            "google/gemini-2.0-flash-exp:free",
            "Google Gemini 2.0 Flash (Free)",
            "Fast and free Google AI model",
        ),
        (
            "tngtech/deepseek-r1t2-chimera:free",
            "DeepSeek R1T2 Chimera (Free)",
            "Free advanced reasoning model",
        ),
    )
    return [
        {"id": model_id, "name": label, "description": blurb, "pricing": {}}
        for model_id, label, blurb in catalogue
    ]
|
|
|
|
|
def _get_default_models(self) -> List[Dict]:
    """Return the built-in default model list (``id``, ``name``, ``description`` per entry)."""
    gemini = {
        "id": "google/gemini-2.0-flash-exp:free",
        "name": "Google Gemini 2.0 Flash (Free)",
        "description": "Fast and free Google AI model",
    }
    chimera = {
        "id": "tngtech/deepseek-r1t2-chimera:free",
        "name": "DeepSeek R1T2 Chimera (Free)",
        "description": "Free advanced reasoning model",
    }
    return [gemini, chimera]
|
|
|
|
|
async def translate_text(self, text: str, model: str, source_lang: str = "auto", target_lang: str = "en") -> str:
    """Translate ``text`` with a chat-completion request to the OpenRouter API.

    Args:
        text: Text to translate. Blank or whitespace-only input is returned
            unchanged without contacting the API.
        model: Model identifier placed in the request payload.
        source_lang: Source language code; ``"auto"`` leaves the source
            language out of the prompt.
        target_lang: Target language code inserted into the prompt.

    Returns:
        The model's reply, stripped of surrounding whitespace and of the
        boilerplate prefixes handled below.

    Raises:
        Exception: If no API key is configured. Also raised, with a message
            prefixed by ``"Translation failed: "``, when the request fails,
            the API answers with a non-200 status, or the reply is empty or
            identical to ``text``. The underlying error is chained as
            ``__cause__``.
    """
    if not text.strip():
        return text

    if not self.api_key:
        raise Exception("OpenRouter API key not configured. Please set OPENROUTER_API_KEY environment variable.")

    # Only the task sentence differs between auto-detect and explicit source language.
    if source_lang == "auto":
        task = f"Translate the following text to {target_lang} (Arabic if 'ar', English if 'en', etc.)."
    else:
        task = f"Translate the following text from {source_lang} to {target_lang}."

    prompt = (
        f"You are a professional document translator. {task}\n"
        "\n"
        "IMPORTANT INSTRUCTIONS:\n"
        "1. Translate ONLY the content, do not add explanations\n"
        "2. Maintain the original formatting and structure\n"
        "3. Preserve technical terms appropriately\n"
        "4. Return ONLY the translated text\n"
        "5. If the text is already in the target language, still provide a proper translation/rewrite\n"
        "\n"
        "Text to translate:\n"
        f"{text}\n"
        "\n"
        "Translated text:"
    )

    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a professional document translator. You MUST provide a translation. Never return the original text unchanged."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.1,
        # Token budget scales with the input length.
        "max_tokens": len(text) * 4 + 500,
    }

    try:
        async with aiohttp.ClientSession() as session:
            logger.info(f"Translating text: '{text[:50]}...' from {source_lang} to {target_lang} using model {model}")

            async with session.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=payload,
            ) as response:
                if response.status == 429:
                    error_text = await response.text()
                    logger.error(f"Rate limit error: {response.status} - {error_text}")
                    raise Exception(f"Rate limit exceeded for model {model}. Please try again later or use a different model.")

                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Translation API error: {response.status} - {error_text}")
                    raise Exception(f"Translation API error: {response.status} - {error_text}")

                data = await response.json()
                # The content field may be null; treat that as an empty reply
                # so it is reported as a failed translation rather than an
                # AttributeError on None.
                content = data["choices"][0]["message"]["content"]
                translated = (content or "").strip()

                # Drop an echoed prompt tail, keeping only what follows it.
                if "Translated text:" in translated:
                    translated = translated.split("Translated text:")[-1].strip()

                # Drop common lead-in phrases.
                for phrase in ("Here is the translation:", "Translation:", "The translation is:"):
                    if translated.startswith(phrase):
                        translated = translated[len(phrase):].strip()

                if not translated or translated == text:
                    logger.warning("Translation returned empty or unchanged text")
                    raise Exception("Translation failed: received empty or unchanged text")

                logger.info(f"Translation successful: '{translated[:50]}...'")
                return translated
    except Exception as e:
        logger.error(f"Translation error: {e}")
        # Keep the original error reachable through __cause__.
        raise Exception(f"Translation failed: {str(e)}") from e
|
|
|
|
|
def extract_text_from_pdf(self, pdf_path: Path) -> str:
    """Read the text of every page of ``pdf_path`` with ``PdfReader``.

    Pages that yield only whitespace are skipped. Each remaining page is
    preceded by a ``--- Page N ---`` marker (1-based). Any exception is
    logged and results in an empty string.
    """
    try:
        logger.info(f"Attempting direct text extraction from PDF: {pdf_path}")
        reader = PdfReader(pdf_path)

        sections = []
        for index, page in enumerate(reader.pages):
            extracted = page.extract_text()
            if not extracted.strip():
                continue
            sections.append(f"\n\n--- Page {index + 1} ---\n\n{extracted}")
        text_content = "".join(sections)

        logger.info(f"Extracted {len(text_content)} characters from {len(reader.pages)} pages")
        return text_content

    except Exception as e:
        logger.error(f"Direct PDF text extraction failed: {e}")
        return ""
|
|
|
|
|
def extract_text_with_coordinates(self, pdf_path: Path) -> List[PDFTextElement]:
    """Extract every word of ``pdf_path`` together with its bounding box.

    Args:
        pdf_path: PDF file to read with pdfplumber.

    Returns:
        One ``PDFTextElement`` per non-blank word, in page order. ``y`` is
        the word's ``top`` value and ``height`` is ``bottom - top``. Font
        name and size are fixed placeholders (``Helvetica``, 12) because
        they are not read from the PDF here.

    Raises:
        Exception: Any error raised while opening or reading the PDF is
            logged and re-raised.
    """
    text_elements = []

    try:
        logger.info(f"Extracting text with coordinates from {pdf_path}")

        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                for word in page.extract_words():
                    if not word.get('text', '').strip():
                        continue

                    # pdfplumber word dicts carry x0/x1/top/bottom; they have
                    # no y0/y1 keys, so reading those raised KeyError.
                    top = word['top']
                    text_elements.append(PDFTextElement(
                        text=word['text'],
                        x=word['x0'],
                        y=top,
                        width=word['x1'] - word['x0'],
                        height=word['bottom'] - top,
                        font_name='Helvetica',
                        font_size=12,
                        page_num=page_num,
                    ))

        logger.info(f"Extracted {len(text_elements)} text elements with coordinates (optimized)")
        return text_elements

    except Exception as e:
        logger.error(f"Error extracting text with coordinates: {e}")
        raise
|
|
|
|
|
async def translate_pdf_with_formatting(self, pdf_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
    """Translate a PDF by way of an intermediate DOCX and convert it back to PDF.

    The PDF text is extracted page by page, written to a temporary DOCX (one
    page break between source pages), translated with ``translate_docx`` and
    converted to PDF with LibreOffice.

    Args:
        pdf_path: Source PDF.
        model: Model identifier forwarded to ``translate_docx``.
        source_lang: Source language code.
        target_lang: Target language code.
        output_dir: Directory for the temporary files and the final PDF.

    Returns:
        ``(final_pdf_path, translated_element_count)`` where the final path
        is ``output_dir / "<pdf stem>.pdf"``.

    Raises:
        Exception: If no text can be extracted, translation fails, or
            LibreOffice does not produce the expected PDF.
    """
    docx_path = output_dir / f"{pdf_path.stem}_temp.docx"
    translated_docx = None

    try:
        logger.info(f"Translating PDF with formatting preservation: {pdf_path}")

        structured_text = self.extract_structured_text(pdf_path)
        if not structured_text:
            raise Exception("No text could be extracted from PDF")

        # Rebuild the document as DOCX, keeping one DOCX page per PDF page.
        doc = Document()
        for page_num, page_content in enumerate(structured_text):
            if page_num > 0:
                doc.add_page_break()
            for paragraph_text in page_content:
                if paragraph_text.strip():
                    doc.add_paragraph(paragraph_text)
        doc.save(docx_path)

        translated_docx, paragraphs_count = await self.translate_docx(
            docx_path, model, source_lang, target_lang, output_dir
        )

        # NOTE(review): when output_dir is the source PDF's own directory this
        # path equals pdf_path, so the source file gets replaced - confirm
        # that callers always pass a separate output directory.
        final_translated_file = output_dir / f"{pdf_path.stem}.pdf"

        cmd = [
            "libreoffice",
            "--headless",
            "--convert-to", "pdf",
            "--outdir", str(output_dir),
            str(translated_docx),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)

        # The converted file is expected under the DOCX's stem. Fail loudly
        # when it is missing instead of returning a path to a file that was
        # never produced.
        temp_pdf = output_dir / f"{translated_docx.stem}.pdf"
        if not temp_pdf.exists():
            raise Exception(
                f"DOCX to PDF conversion failed (exit code {result.returncode}): {result.stderr}"
            )
        if temp_pdf != final_translated_file:
            # replace() overwrites an existing target on every platform.
            temp_pdf.replace(final_translated_file)

        logger.info(f"Successfully created formatted PDF with {paragraphs_count} translated paragraphs")
        return final_translated_file, paragraphs_count

    except Exception as e:
        logger.error(f"Error in improved PDF translation: {e}")
        raise

    finally:
        # Best-effort removal of the intermediate DOCX files, on success and failure alike.
        for leftover in (docx_path, translated_docx):
            if leftover is None:
                continue
            try:
                leftover.unlink(missing_ok=True)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {leftover}: {cleanup_error}")
|
|
|
|
|
def extract_structured_text(self, pdf_path: Path) -> List[List[str]]:
    """Return the text of ``pdf_path`` as one list of paragraphs per page.

    Page text is split on blank lines; empty pieces are dropped. A page
    without text contributes an empty list. On any exception the error is
    logged and an empty list is returned.
    """
    pages = []

    try:
        logger.info(f"Extracting structured text from {pdf_path}")

        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                raw_text = page.extract_text()
                if raw_text:
                    stripped_chunks = (chunk.strip() for chunk in raw_text.split('\n\n'))
                    page_paragraphs = [chunk for chunk in stripped_chunks if chunk]
                else:
                    page_paragraphs = []
                pages.append(page_paragraphs)

        logger.info(f"Extracted structured text from {len(pages)} pages")
        return pages

    except Exception as e:
        logger.error(f"Error extracting structured text: {e}")
        return []
|
|
|
|
|
def create_pdf_with_text_elements(self, original_pdf_path: Path, text_elements: List[PDFTextElement], output_path: Path):
    """Draw ``text_elements`` onto a new PDF sized like the original's pages.

    The original PDF is opened only to read page sizes; the output contains
    just the drawn text. Only pages that have at least one element are
    emitted, in ascending page order.

    Args:
        original_pdf_path: PDF whose page sizes are reused.
        text_elements: Elements to draw; ``y`` is flipped with
            ``page_height - y - height`` before drawing.
        output_path: Destination of the generated PDF.

    Raises:
        Exception: If generation exceeds the 5-minute limit (enforced only
            where ``SIGALRM`` can be used) or any other step fails.
    """
    import signal
    import threading

    def timeout_handler(signum, frame):
        raise TimeoutError("PDF creation timed out")

    # SIGALRM is Unix-only and handlers can only be installed from the main
    # thread; elsewhere the work runs without a time limit instead of failing.
    use_alarm = (
        hasattr(signal, "SIGALRM")
        and threading.current_thread() is threading.main_thread()
    )

    pdf = None
    try:
        logger.info(f"Creating PDF with translated text elements: {output_path}")

        previous_handler = None
        if use_alarm:
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(300)

        try:
            pdf = pdfium.PdfDocument(str(original_pdf_path))

            packet = io.BytesIO()
            can = canvas.Canvas(packet)

            # Group the elements by page index.
            pages_elements = {}
            for element in text_elements:
                pages_elements.setdefault(element.page_num, []).append(element)

            for page_num in sorted(pages_elements):
                page = pdf.get_page(page_num)
                width, height = page.get_size()
                can.setPageSize((width, height))

                for element in pages_elements[page_num]:
                    try:
                        can.setFont(element.font_name, element.font_size)
                    except TimeoutError:
                        # Never let the font fallback swallow the alarm.
                        raise
                    except Exception:
                        can.setFont("Helvetica", element.font_size)

                    # Flip the vertical coordinate: element.y is treated as
                    # measured from the top, the canvas draws from the bottom.
                    x = element.x
                    y = height - element.y - element.height
                    can.drawString(x, y, element.text)

                can.showPage()

            can.save()

            with open(output_path, 'wb') as f:
                f.write(packet.getvalue())

            logger.info(f"Successfully created formatted PDF: {output_path}")

        finally:
            if use_alarm:
                signal.alarm(0)
                # Put back whatever handler was active before this call.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

    except TimeoutError:
        logger.error("PDF creation timed out after 5 minutes")
        raise Exception("PDF creation timed out. The document may be too complex for coordinate-based translation.")
    except Exception as e:
        logger.error(f"Error creating PDF with text elements: {e}")
        raise
    finally:
        if pdf is not None:
            pdf.close()
|
|
|
|
|
async def translate_pdf_direct(self, pdf_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
    """Translate the raw text of a PDF and write it to a new DOCX.

    The text comes from ``extract_text_from_pdf`` and is split on blank
    lines. Pieces longer than 10 characters are translated one at a time;
    shorter pieces are copied as they are. The result is saved as
    ``output_dir / "translated_<pdf stem>.docx"``.

    Returns:
        ``(docx_path, translated_paragraph_count)``.

    Raises:
        Exception: If no text is extracted, any paragraph fails to
            translate, or nothing was translated at all.
    """
    try:
        logger.info(f"Using direct PDF text extraction method for {pdf_path}")

        pdf_text = self.extract_text_from_pdf(pdf_path)
        if not pdf_text.strip():
            raise Exception("No text could be extracted from PDF")

        chunks = list(filter(None, (piece.strip() for piece in pdf_text.split('\n\n'))))
        total = len(chunks)
        logger.info(f"Split PDF text into {total} paragraphs")

        doc = Document()
        doc.add_heading('Translated Document', 0)

        paragraphs_translated = 0
        for position, chunk in enumerate(chunks, start=1):
            # Very short pieces are kept verbatim.
            if len(chunk.strip()) <= 10:
                doc.add_paragraph(chunk)
                continue

            logger.info(f"Translating paragraph {position}/{total}: '{chunk[:50]}...'")

            try:
                translated_text = await self.translate_text(
                    chunk, model, source_lang, target_lang
                )
                doc.add_paragraph(translated_text)
                paragraphs_translated += 1
            except Exception as trans_error:
                logger.error(f"Failed to translate paragraph: {trans_error}")
                raise Exception(f"Translation failed for paragraph: {str(trans_error)}")

            # Short pause between consecutive API calls.
            await asyncio.sleep(0.3)

        if paragraphs_translated == 0:
            raise Exception("No paragraphs were successfully translated")

        translated_path = output_dir / f"translated_{pdf_path.stem}.docx"
        doc.save(translated_path)

        logger.info(f"Successfully created translated DOCX with {paragraphs_translated} translated paragraphs")
        return translated_path, paragraphs_translated

    except Exception as e:
        logger.error(f"Direct PDF translation failed: {e}")
        raise
|
|
|
|
|
def pdf_to_docx(self, pdf_path: Path, output_dir: Path) -> Path:
    """Convert ``pdf_path`` to DOCX by running LibreOffice in headless mode.

    Returns:
        ``output_dir / "<pdf stem>.docx"``.

    Raises:
        Exception: If LibreOffice exits with a non-zero code, the expected
            output file is missing, or the conversion exceeds 120 seconds.
    """
    try:
        docx_path = output_dir / f"{pdf_path.stem}.docx"

        logger.info(f"Starting PDF to DOCX conversion: {pdf_path} -> {docx_path}")

        # NOTE(review): LibreOffice may need an explicit PDF import filter for
        # this conversion on some installations - confirm on the target system.
        cmd = ["libreoffice", "--headless"]
        cmd += ["--convert-to", "docx"]
        cmd += ["--outdir", str(output_dir)]
        cmd.append(str(pdf_path))

        logger.info(f"Running command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)

        logger.info(f"LibreOffice exit code: {result.returncode}")
        logger.info(f"LibreOffice stdout: {result.stdout}")
        logger.info(f"LibreOffice stderr: {result.stderr}")

        if result.returncode != 0:
            raise Exception(f"LibreOffice conversion failed with exit code {result.returncode}: {result.stderr}")

        if not docx_path.exists():
            raise Exception(f"Conversion completed but output file {docx_path} not found")

        file_size = docx_path.stat().st_size
        logger.info(f"Successfully converted {pdf_path} to {docx_path} (size: {file_size} bytes)")

        # Sanity check only: problems here are logged, never raised.
        try:
            from docx import Document
            converted = Document(docx_path)
            paragraph_count = sum(1 for para in converted.paragraphs if para.text.strip())
            logger.info(f"DOCX contains {paragraph_count} paragraphs with text")

            if paragraph_count == 0:
                logger.warning("Converted DOCX appears to have no text content")

        except Exception as e:
            logger.error(f"Error validating DOCX content: {e}")

        return docx_path

    except subprocess.TimeoutExpired:
        raise Exception("PDF conversion timed out after 120 seconds")
    except Exception as e:
        logger.error(f"Error converting PDF to DOCX: {e}")
        raise
|
|
|
|
|
def docx_to_pdf(self, docx_path: Path, output_dir: Path) -> Path:
    """Render ``docx_path`` as PDF by running LibreOffice in headless mode.

    Returns:
        ``output_dir / "<docx stem>.pdf"``.

    Raises:
        Exception: If LibreOffice exits with a non-zero code, the expected
            PDF is missing, or the conversion exceeds 60 seconds.
    """
    try:
        pdf_path = output_dir / f"{docx_path.stem}.pdf"

        cmd = ["libreoffice", "--headless"]
        cmd += ["--convert-to", "pdf"]
        cmd += ["--outdir", str(output_dir)]
        cmd.append(str(docx_path))

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)

        if result.returncode != 0 or not pdf_path.exists():
            logger.error(f"LibreOffice conversion failed: {result.stderr}")
            raise Exception(f"DOCX to PDF conversion failed: {result.stderr}")

        logger.info(f"Successfully converted {docx_path} to {pdf_path}")
        return pdf_path

    except subprocess.TimeoutExpired:
        raise Exception("DOCX to PDF conversion timed out")
    except Exception as e:
        logger.error(f"Error converting DOCX to PDF: {e}")
        raise
|
|
|
|
|
async def translate_docx(self, docx_path: Path, model: str, source_lang: str, target_lang: str, output_dir: Path) -> Tuple[Path, int]:
    """Translate a DOCX file in place, paragraph by paragraph, then its table cells.

    Args:
        docx_path: Document to translate.
        model: Model identifier forwarded to ``translate_text``.
        source_lang: Source language code.
        target_lang: Target language code.
        output_dir: Directory that receives ``translated_<original name>``.

    Returns:
        ``(translated_path, translated_element_count)`` where the count
        covers body paragraphs plus table cells.

    Raises:
        Exception: If the document has no text paragraphs, a body paragraph
            fails to translate, nothing was translated, or the saved file is
            missing. A failing table cell is only logged and left untouched.
    """
    try:
        logger.info(f"Loading DOCX document: {docx_path}")
        doc = Document(docx_path)
        paragraphs_count = 0
        total_paragraphs = len(doc.paragraphs)

        logger.info(f"Document has {total_paragraphs} total paragraphs")

        text_paragraphs = [p for p in doc.paragraphs if p.text.strip()]
        logger.info(f"Found {len(text_paragraphs)} paragraphs with text content")

        if not text_paragraphs:
            raise Exception("No text content found in document")

        # Log a few samples to make extraction problems visible.
        for i, paragraph in enumerate(text_paragraphs[:3]):
            logger.info(f"Sample paragraph {i+1}: '{paragraph.text[:100]}...'")

        for paragraph in text_paragraphs:
            original_text = paragraph.text.strip()
            logger.info(f"Translating paragraph {paragraphs_count + 1}/{len(text_paragraphs)}: '{original_text[:50]}...'")

            try:
                # translate_text raises when the reply equals its input, so no
                # separate "unchanged" check is needed here.
                translated_text = await self.translate_text(
                    original_text, model, source_lang, target_lang
                )
                logger.info(f"Translation successful: '{translated_text[:50]}...'")

                # Assigning .text replaces the paragraph's content as a whole.
                paragraph.text = translated_text
                paragraphs_count += 1

            except Exception as trans_error:
                logger.error(f"Failed to translate paragraph: {trans_error}")
                raise Exception(f"Translation failed for paragraph: {str(trans_error)}")

            # Short pause between consecutive API calls.
            await asyncio.sleep(0.3)

        table_cells_translated = 0
        # A merged cell shows up once per grid position it spans; remember the
        # underlying XML elements so each one is translated only once. Holding
        # the elements in the set keeps their identity stable.
        seen_cell_elements = set()
        for table_idx, table in enumerate(doc.tables):
            logger.info(f"Processing table {table_idx + 1} of {len(doc.tables)}")
            for row in table.rows:
                for cell in row.cells:
                    cell_element = getattr(cell, "_tc", cell)
                    if cell_element in seen_cell_elements:
                        continue
                    seen_cell_elements.add(cell_element)

                    if not cell.text.strip():
                        continue

                    original_text = cell.text.strip()
                    try:
                        translated_text = await self.translate_text(
                            original_text, model, source_lang, target_lang
                        )
                        cell.text = translated_text
                        table_cells_translated += 1
                    except Exception as trans_error:
                        logger.warning(f"Failed to translate table cell: {trans_error}")

                    await asyncio.sleep(0.1)

        logger.info(f"Translated {table_cells_translated} table cells")
        total_translated = paragraphs_count + table_cells_translated

        if total_translated == 0:
            raise Exception("No content was successfully translated")

        translated_path = output_dir / f"translated_{docx_path.name}"
        doc.save(translated_path)

        logger.info(f"Successfully translated {total_translated} text elements and saved to {translated_path}")

        if not translated_path.exists():
            raise Exception("Failed to save translated document")

        file_size = translated_path.stat().st_size
        logger.info(f"Translated document saved (size: {file_size} bytes)")

        return translated_path, total_translated

    except Exception as e:
        logger.error(f"Error translating DOCX: {e}")
        raise
|
|
|
|
|
def _export_docx_as_pdf(self, translated_docx: Path, final_pdf: Path, output_dir: Path) -> Path:
    """Convert ``translated_docx`` to PDF with LibreOffice and move the result to ``final_pdf``."""
    cmd = [
        "libreoffice",
        "--headless",
        "--convert-to", "pdf",
        "--outdir", str(output_dir),
        str(translated_docx),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)

    # The converted file is expected under the DOCX's stem; a missing file
    # means the conversion did not work, whatever the exit code says.
    temp_pdf = output_dir / f"{translated_docx.stem}.pdf"
    if not temp_pdf.exists():
        raise Exception(
            f"DOCX to PDF conversion failed (exit code {result.returncode}): {result.stderr}"
        )
    if temp_pdf != final_pdf:
        # replace() overwrites an existing target on every platform.
        temp_pdf.replace(final_pdf)
    return final_pdf


def _estimate_page_count(self, docx_file: Path) -> int:
    """Estimate the page count of a DOCX as one page per 500 words, at least 1."""
    doc = Document(docx_file)
    total_words = sum(len(p.text.split()) for p in doc.paragraphs)
    return max(1, total_words // 500)


async def translate_document(
    self,
    input_file: Path,
    model: str,
    source_language: str = "auto",
    target_language: str = "en",
    output_dir: Optional[Path] = None
) -> TranslationReport:
    """Translate a PDF or DOCX file and report the outcome.

    PDFs are tried in three stages, each used only if the previous one
    raises: ``translate_pdf_with_formatting``, then LibreOffice PDF-to-DOCX
    conversion followed by ``translate_docx``, then ``translate_pdf_direct``.
    DOCX files go straight to ``translate_docx``. The result is stored as
    ``output_dir / "<input stem><input suffix>"``.

    Args:
        input_file: Document to translate (``.pdf`` or ``.docx``).
        model: Model identifier forwarded to the translation calls.
        source_language: Source language code, ``"auto"`` by default.
        target_language: Target language code, ``"en"`` by default.
        output_dir: Output directory; defaults to the input file's directory.

    Returns:
        A ``TranslationReport``. Failures never raise: they yield a report
        with ``status="failed"``, zero counts, ``translated_file`` set to
        the input file and the error message in ``errors``.
    """
    if output_dir is None:
        # NOTE(review): with this default the output path equals the input
        # path, so the source document is replaced by its translation -
        # confirm callers rely on that or always pass output_dir.
        output_dir = input_file.parent

    original_file = input_file
    file_extension = input_file.suffix.lower()
    original_filename = input_file.stem

    try:
        if file_extension == ".pdf":
            logger.info(f"Processing PDF file with formatting preservation: {input_file}")
            final_translated_file = output_dir / f"{original_filename}.pdf"

            try:
                logger.info(f"Using coordinate-based translation for {input_file}")
                translated_file, paragraphs_count = await self.translate_pdf_with_formatting(
                    pdf_path=input_file,
                    model=model,
                    source_lang=source_language,
                    target_lang=target_language,
                    output_dir=output_dir
                )

                if translated_file != final_translated_file:
                    translated_file.replace(final_translated_file)
                    translated_file = final_translated_file

                # Page counting is informational; fall back to 1 on any problem.
                try:
                    pdf = pdfium.PdfDocument(str(translated_file))
                    try:
                        pages_count = len(pdf)
                    finally:
                        pdf.close()
                except Exception as count_error:
                    logger.warning(f"Could not count pages of {translated_file}: {count_error}")
                    pages_count = 1

            except Exception as format_error:
                logger.warning(f"Coordinate-based PDF translation failed: {format_error}")
                logger.info("Falling back to LibreOffice conversion method")

                try:
                    logger.info(f"Attempting LibreOffice conversion for {input_file}")
                    docx_file = self.pdf_to_docx(input_file, output_dir)

                    logger.info(f"Translating converted DOCX {docx_file}")
                    translated_docx, paragraphs_count = await self.translate_docx(
                        docx_file, model, source_language, target_language, output_dir
                    )

                    if paragraphs_count == 0:
                        logger.warning("LibreOffice conversion produced no translatable content, trying direct extraction")
                        raise Exception("No content found in LibreOffice conversion")

                    logger.info("Converting translated DOCX back to PDF with original filename")
                    translated_file = self._export_docx_as_pdf(
                        translated_docx, final_translated_file, output_dir
                    )
                    pages_count = self._estimate_page_count(translated_docx)

                except Exception as libreoffice_error:
                    logger.warning(f"LibreOffice method failed: {libreoffice_error}")
                    logger.info("Falling back to direct PDF text extraction")

                    translated_docx, paragraphs_count = await self.translate_pdf_direct(
                        input_file, model, source_language, target_language, output_dir
                    )
                    translated_file = self._export_docx_as_pdf(
                        translated_docx, final_translated_file, output_dir
                    )
                    pages_count = self._estimate_page_count(translated_docx)

        elif file_extension == ".docx":
            logger.info(f"Translating DOCX {input_file}")

            final_translated_file = output_dir / f"{original_filename}.docx"

            translated_file, paragraphs_count = await self.translate_docx(
                input_file, model, source_language, target_language, output_dir
            )

            if translated_file != final_translated_file:
                translated_file.replace(final_translated_file)
                translated_file = final_translated_file

            pages_count = self._estimate_page_count(translated_file)

        else:
            raise Exception(f"Unsupported file format: {file_extension}")

        if paragraphs_count == 0:
            raise Exception("Translation failed: No paragraphs were translated")

        return TranslationReport(
            original_file=original_file,
            translated_file=translated_file,
            pages_count=pages_count,
            paragraphs_count=paragraphs_count,
            status="success"
        )

    except Exception as e:
        logger.error(f"Document translation failed: {e}")
        return TranslationReport(
            original_file=original_file,
            translated_file=original_file,
            pages_count=0,
            paragraphs_count=0,
            status="failed",
            errors=[str(e)]
        )