|
|
import logging |
|
|
import os |
|
|
|
|
|
import torch |
|
|
import uvicorn |
|
|
from fastapi import FastAPI, File, Form, HTTPException, UploadFile |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from app.models.document_processor import DocumentProcessor |
|
|
from app.models.html_processor import HTMLProcessor |
|
|
from app.models.text_chunker import TextChunker |
|
|
from app.models.translation_model_ct2 import TranslationModelCT2 |
|
|
|
|
|
# Configure root logging once at import time: INFO level, with a
# timestamp / logger name / level / message layout.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every endpoint in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# The ASGI application object; the metadata below is passed straight to
# the FastAPI constructor.
app = FastAPI(
    version="1.0.0",
    title="Universal Translator API",
    description="API for text, HTML, and document translation services",
)
|
|
|
|
|
# Register CORS handling with wildcard origins, methods and headers.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# confirm this is intended, since FastAPI's CORS documentation advises
# listing origins explicitly when credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# Bind every component name up front so that a failure part-way through
# initialization leaves the names defined (as None) instead of unbound;
# code that inspects them later then sees None rather than raising NameError.
model = None
html_processor = None
text_chunker = None
document_processor = None

# None means "initialized successfully"; otherwise a description of the
# failure, which the endpoints below check before doing any work.
initialization_error = None

try:
    # The CT2 model cache directory is read from CT2_MODEL_CACHE, with a
    # relative default when the variable is not set.
    model = TranslationModelCT2(
        model_cache_dir=os.getenv("CT2_MODEL_CACHE", ".cache/ct2_models")
    )
    html_processor = HTMLProcessor()
    text_chunker = TextChunker(max_tokens=250, overlap_tokens=30)
    document_processor = DocumentProcessor()
except Exception as e:
    # Deliberately broad: the service still starts and reports the failure
    # through its endpoints instead of crashing at import time.
    # logger.exception records the traceback along with the message.
    logger.exception("Error initializing components: %s", e)
    # Some exceptions have an empty message; fall back to repr() so the
    # stored error is never falsy and the failure cannot be mistaken for
    # a successful start.
    initialization_error = str(e) or repr(e)
|
|
|
|
|
class TranslationRequest(BaseModel):
    """Request body accepted by the ``/translate`` endpoint."""

    # Text to be translated.
    text: str
    # Language code of the input text.
    source_lang_code: str
    # Language code to translate into.
    target_lang_code: str
|
|
|
|
|
class TranslationResponse(BaseModel):
    """Response body returned by the ``/translate`` endpoint."""

    # The translated text.
    translated_text: str
|
|
|
|
|
class HTMLTranslationRequest(BaseModel):
    """Request body accepted by the ``/translate-html`` endpoint."""

    # HTML markup whose text content should be translated.
    html: str
    # Language code of the input text.
    source_lang_code: str
    # Language code to translate into.
    target_lang_code: str
|
|
|
|
|
class HTMLTranslationResponse(BaseModel):
    """Response body returned by the ``/translate-html`` endpoint."""

    # The HTML with its text content translated.
    translated_html: str
|
|
|
|
|
@app.get("/")
async def root():
    """Basic health check.

    Returns a small status payload: an "ok" document with static model and
    version labels when startup succeeded, otherwise an "error" document
    that carries the stored initialization error.
    """
    if not initialization_error:
        return {"status": "ok", "model": "OPUS-MT/NLLB-CPU-Optimized", "version": "1.0"}

    failure_report = {
        "status": "error",
        "message": "Service initialization failed",
        "error": initialization_error,
    }
    return failure_report
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Extended health check with environment information.

    Returns:
        A dict with the overall ``status`` ("ok" or "error"), the stored
        initialization ``error`` (None on success) and an ``environment``
        section describing the runtime and the translation model.
    """
    # Look the model up defensively: when component initialization failed
    # the module-level name may never have been bound, and a health check
    # must still be able to report that failure instead of raising
    # NameError.
    active_model = globals().get("model")

    if hasattr(active_model, 'device'):
        device_name = str(active_model.device)
    else:
        device_name = "Unknown"

    if hasattr(active_model, 'get_model_info'):
        model_info = active_model.get_model_info()
    else:
        model_info = {}

    return {
        "status": "ok" if not initialization_error else "error",
        "error": initialization_error,
        "environment": {
            # Read from the PYTHON_VERSION environment variable, so this is
            # None when the variable is not set.
            "python_version": os.environ.get('PYTHON_VERSION'),
            "cuda_available": torch.cuda.is_available(),
            "device": device_name,
            "model_info": model_info,
        },
    }
|
|
|
|
|
@app.post("/translate", response_model=TranslationResponse)
async def translate_text(request: TranslationRequest):
    """Translate text from the source language to the target language.

    Args:
        request: The text plus source and target language codes.

    Returns:
        A dict with the single key ``translated_text``.

    Raises:
        HTTPException: 500 when the service failed to initialize or when
            translation raises any error.
    """
    if initialization_error:
        raise HTTPException(
            status_code=500,
            detail=f"Service not properly initialized: {initialization_error}",
        )

    try:
        logger.info(f"Translating from {request.source_lang_code} to {request.target_lang_code}")

        # "tam", "tel", "kan" and "mal" are sent to the model under the
        # shared target code "dra", with the requested language selected
        # by a ">>xxx<<" token placed in front of the text.
        target_code = request.target_lang_code
        lang_token = ""
        if target_code in ("tam", "tel", "kan", "mal"):
            lang_token = f">>{target_code}<<"
            target_code = "dra"

        tagged_text = f"{lang_token}{request.text}"

        # The length check includes the token so the same inputs as before
        # take the chunked path.
        if len(tagged_text) > 1000:
            # Chunk the untagged text and tag every chunk. Tagging the text
            # before chunking left the token on the first chunk only, unlike
            # the HTML endpoint, which tags every fragment it translates.
            # NOTE(review): assumes each batch item is translated
            # independently and needs its own token - confirm against
            # TranslationModelCT2.
            chunks = text_chunker.create_chunks(request.text)
            chunk_texts = [f"{lang_token}{chunk.text}" for chunk in chunks]

            translated_chunks = model.translate_batch(
                chunk_texts,
                request.source_lang_code,
                target_code,
            )

            final_translation = text_chunker.combine_translations(
                request.text, chunks, translated_chunks
            )
        else:
            final_translation = model.translate(
                tagged_text,
                request.source_lang_code,
                target_code,
            )

        return {"translated_text": final_translation}
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Translation error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
@app.post("/translate-html", response_model=HTMLTranslationResponse)
async def translate_html(request: HTMLTranslationRequest):
    """Translate HTML content while preserving structure.

    Text fragments are extracted from the markup, the non-blank ones are
    translated as a batch, and the results are written back into the
    document.

    Args:
        request: The HTML plus source and target language codes.

    Returns:
        A dict with the single key ``translated_html``. The input HTML is
        returned unchanged when no text fragments are extracted.

    Raises:
        HTTPException: 500 when the service failed to initialize or when
            processing raises any error.
    """
    if initialization_error:
        raise HTTPException(
            status_code=500,
            detail=f"Service not properly initialized: {initialization_error}",
        )

    try:
        text_fragments, dom_data = html_processor.extract_text(request.html)

        if not text_fragments:
            return {"translated_html": request.html}

        # "tam", "tel", "kan" and "mal" are sent to the model under the
        # shared target code "dra", with the requested language selected
        # by a ">>xxx<<" token placed in front of each fragment.
        target_code = request.target_lang_code
        special_token = ""
        if target_code in ("tam", "tel", "kan", "mal"):
            special_token = f">>{target_code}<<"
            target_code = "dra"

        if special_token:
            logger.info(f"Using special language token for HTML: {special_token}")

        # Only fragments with visible text are sent to the model, each one
        # carrying the language token (an empty string when none applies).
        non_empty_fragments = [
            f"{special_token}{fragment}"
            for fragment in text_fragments
            if fragment.strip()
        ]

        translated_fragments = model.translate_batch(
            non_empty_fragments,
            request.source_lang_code,
            target_code,
        )

        # Rebuild a list aligned with the extracted fragments: translated
        # text where the fragment had content, an empty string where it was
        # blank. Walking the fragments directly avoids a list-membership
        # test per index.
        full_translated_fragments = []
        next_translated = 0
        for fragment in text_fragments:
            if fragment.strip():
                full_translated_fragments.append(translated_fragments[next_translated])
                next_translated += 1
            else:
                full_translated_fragments.append("")

        translated_html = html_processor.replace_text(dom_data, full_translated_fragments)

        return {"translated_html": translated_html}
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("HTML translation error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
@app.post("/process-document")
async def process_document(
    file: UploadFile = File(...),
    source_lang_code: str = Form(...),
    target_lang_code: str = Form(...),
    use_ocr: bool = Form(False)
):
    """Process and translate a document (PDF or image).

    Args:
        file: The uploaded document.
        source_lang_code: Language code of the document text.
        target_lang_code: Language code to translate into.
        use_ocr: Passed through to the document processor.

    Returns:
        A dict with ``extracted_text`` and ``translated_text``.

    Raises:
        HTTPException: 400 when no text could be extracted; 500 when the
            service failed to initialize or when processing raises any
            other error.
    """
    if initialization_error:
        raise HTTPException(
            status_code=500,
            detail=f"Service not properly initialized: {initialization_error}",
        )

    try:
        file_content = await file.read()

        extracted_text = document_processor.process_document(
            file_data=file_content,
            filename=file.filename,
            use_ocr=use_ocr,
        )

        if not extracted_text:
            raise HTTPException(
                status_code=400,
                detail="No text could be extracted from the document",
            )

        # "tam", "tel", "kan" and "mal" are sent to the model under the
        # shared target code "dra", with the requested language selected
        # by a ">>xxx<<" token placed in front of the text.
        target_code = target_lang_code
        lang_token = ""
        if target_code in ("tam", "tel", "kan", "mal"):
            lang_token = f">>{target_code}<<"
            target_code = "dra"

        tagged_text = f"{lang_token}{extracted_text}"

        # The length check includes the token so the same inputs as before
        # take the chunked path.
        if len(tagged_text) > 1000:
            # Chunk the untagged text and tag every chunk. Tagging the text
            # before chunking left the token on the first chunk only.
            # NOTE(review): assumes each batch item is translated
            # independently and needs its own token - confirm against
            # TranslationModelCT2.
            chunks = text_chunker.create_chunks(extracted_text)
            chunk_texts = [f"{lang_token}{chunk.text}" for chunk in chunks]

            translated_chunks = model.translate_batch(
                chunk_texts,
                source_lang_code,
                target_code,
            )

            translated_text = text_chunker.combine_translations(
                extracted_text, chunks, translated_chunks
            )
        else:
            translated_text = model.translate(
                tagged_text,
                source_lang_code,
                target_code,
            )

        return {
            "extracted_text": extracted_text,
            "translated_text": translated_text,
        }
    except HTTPException:
        # Let deliberate HTTP errors (the 400 above) through unchanged;
        # the generic handler below would otherwise turn them into 500s.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Document processing error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 7860, with auto-reload enabled.
    uvicorn.run(
        "api_server:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )