"""FastAPI server exposing text, HTML, and document translation endpoints."""

import logging
import os
from typing import Tuple

import torch
import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from app.models.document_processor import DocumentProcessor
from app.models.html_processor import HTMLProcessor
from app.models.text_chunker import TextChunker
from app.models.translation_model_ct2 import TranslationModelCT2

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Target codes that are routed to the shared "dra" model code and selected
# with a ">>xxx<<" token prepended to the input text.
_DRAVIDIAN_TARGETS = frozenset({"tam", "tel", "kan", "mal"})
_DRAVIDIAN_MODEL_CODE = "dra"

# Inputs longer than this many characters are split into chunks and
# translated as a batch instead of in a single model call.
_CHUNKING_THRESHOLD_CHARS = 1000

app = FastAPI(
    title="Universal Translator API",
    description="API for text, HTML, and document translation services",
    version="1.0.0"
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy -- confirm this is intended before exposing the
# service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bind every component name up front so that a failed initialization leaves
# them defined (as None); otherwise /health would raise NameError on `model`
# instead of reporting the initialization error.
model = None
html_processor = None
text_chunker = None
document_processor = None
initialization_error = None

try:
    model = TranslationModelCT2(
        model_cache_dir=os.getenv("CT2_MODEL_CACHE", ".cache/ct2_models")
    )
    html_processor = HTMLProcessor()
    text_chunker = TextChunker(max_tokens=250, overlap_tokens=30)
    document_processor = DocumentProcessor()
except Exception as e:
    # Keep the app importable so the health endpoints can report the failure.
    logger.exception("Error initializing components: %s", e)
    initialization_error = str(e)


# Request body for POST /translate.
class TranslationRequest(BaseModel):
    text: str
    source_lang_code: str
    target_lang_code: str


# Response body for POST /translate.
class TranslationResponse(BaseModel):
    translated_text: str


# Request body for POST /translate-html.
class HTMLTranslationRequest(BaseModel):
    html: str
    source_lang_code: str
    target_lang_code: str


# Response body for POST /translate-html.
class HTMLTranslationResponse(BaseModel):
    translated_html: str


def _ensure_initialized() -> None:
    """Raise HTTP 500 if the service components failed to initialize."""
    if initialization_error:
        raise HTTPException(
            status_code=500,
            detail=f"Service not properly initialized: {initialization_error}"
        )


def _resolve_target(target_lang_code: str) -> Tuple[str, str]:
    """Return ``(language_token, model_target_code)`` for a target language.

    Codes in ``_DRAVIDIAN_TARGETS`` map to the ``"dra"`` model code plus a
    ``>>code<<`` token; every other code is passed through with an empty
    token.
    """
    if target_lang_code in _DRAVIDIAN_TARGETS:
        return f">>{target_lang_code}<<", _DRAVIDIAN_MODEL_CODE
    return "", target_lang_code


def _translate_plain_text(text: str, source_lang_code: str, target_lang_code: str) -> str:
    """Translate plain text, chunking it when it exceeds the size threshold.

    Short inputs go through a single ``model.translate`` call. Long inputs
    are split by ``text_chunker``, translated with ``model.translate_batch``
    and recombined with ``text_chunker.combine_translations``.
    """
    language_token, model_target_code = _resolve_target(target_lang_code)
    prefixed_text = f"{language_token}{text}"

    if len(prefixed_text) <= _CHUNKING_THRESHOLD_CHARS:
        return model.translate(prefixed_text, source_lang_code, model_target_code)

    # Chunk the raw text and prefix the language token onto every chunk.
    # Prefixing the whole text before chunking would tag only the first
    # chunk; the HTML endpoint likewise tags each fragment individually.
    chunks = text_chunker.create_chunks(text)
    chunk_texts = [f"{language_token}{chunk.text}" for chunk in chunks]
    translated_chunks = model.translate_batch(
        chunk_texts,
        source_lang_code,
        model_target_code
    )
    return text_chunker.combine_translations(text, chunks, translated_chunks)


@app.get("/")
async def root():
    """Health check endpoint"""
    if initialization_error:
        return {
            "status": "error",
            "message": "Service initialization failed",
            "error": initialization_error
        }
    return {"status": "ok", "model": "OPUS-MT/NLLB-CPU-Optimized", "version": "1.0"}


@app.get("/health")
async def health_check():
    """Extended health check with environment information"""
    # `model` is None when initialization failed, so the hasattr() probes
    # below fall back to their defaults instead of raising.
    return {
        "status": "ok" if not initialization_error else "error",
        "error": initialization_error,
        "environment": {
            "python_version": os.environ.get('PYTHON_VERSION'),
            "cuda_available": torch.cuda.is_available(),
            "device": str(model.device) if hasattr(model, 'device') else "Unknown",
            "model_info": model.get_model_info() if hasattr(model, 'get_model_info') else {}
        }
    }


@app.post("/translate", response_model=TranslationResponse)
async def translate_text(request: TranslationRequest):
    """Translate text from source to target language

    Raises HTTP 500 if the service failed to initialize or if translation
    raises any exception.
    """
    _ensure_initialized()

    try:
        logger.info(
            "Translating from %s to %s",
            request.source_lang_code,
            request.target_lang_code,
        )
        final_translation = _translate_plain_text(
            request.text,
            request.source_lang_code,
            request.target_lang_code,
        )
        return {"translated_text": final_translation}
    except Exception as e:
        logger.exception("Translation error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/translate-html", response_model=HTMLTranslationResponse)
async def translate_html(request: HTMLTranslationRequest):
    """Translate HTML content while preserving structure

    Text fragments are extracted with ``html_processor``, the non-blank ones
    are translated as one batch, and the results are written back into the
    document. Raises HTTP 500 if the service failed to initialize or if any
    step raises.
    """
    _ensure_initialized()

    try:
        text_fragments, dom_data = html_processor.extract_text(request.html)

        if not text_fragments:
            return {"translated_html": request.html}  # No text to translate

        language_token, model_target_code = _resolve_target(request.target_lang_code)
        if language_token:
            logger.info("Using special language token for HTML: %s", language_token)

        # Only fragments with visible content are sent to the model; remember
        # their positions so the results can be put back in place.
        translatable = [
            (index, fragment)
            for index, fragment in enumerate(text_fragments)
            if fragment.strip()
        ]
        batch = [f"{language_token}{fragment}" for _, fragment in translatable]

        # Skip the model call entirely when every fragment is blank.
        translated_fragments = (
            model.translate_batch(
                batch,
                request.source_lang_code,
                model_target_code
            )
            if batch
            else []
        )
        if len(translated_fragments) < len(batch):
            raise RuntimeError(
                "Translation model returned fewer results than input fragments"
            )

        # Blank fragments are replaced with "" (unchanged from the previous
        # behavior). NOTE(review): confirm that dropping whitespace-only
        # fragments is intended.
        full_translated_fragments = [""] * len(text_fragments)
        for (index, _), translation in zip(translatable, translated_fragments):
            full_translated_fragments[index] = translation

        translated_html = html_processor.replace_text(dom_data, full_translated_fragments)
        return {"translated_html": translated_html}
    except Exception as e:
        logger.exception("HTML translation error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/process-document")
async def process_document(
    file: UploadFile = File(...),
    source_lang_code: str = Form(...),
    target_lang_code: str = Form(...),
    use_ocr: bool = Form(False)
):
    """Process and translate document (PDF or image)

    Returns both the extracted text and its translation. Raises HTTP 400 when
    no text could be extracted, and HTTP 500 if the service failed to
    initialize or if processing raises any other exception.
    """
    _ensure_initialized()

    try:
        file_content = await file.read()
        extracted_text = document_processor.process_document(
            file_data=file_content,
            filename=file.filename,
            use_ocr=use_ocr
        )

        if not extracted_text:
            raise HTTPException(
                status_code=400,
                detail="No text could be extracted from the document"
            )

        translated_text = _translate_plain_text(
            extracted_text,
            source_lang_code,
            target_lang_code,
        )
        return {
            "extracted_text": extracted_text,
            "translated_text": translated_text
        }
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 400 above) through unchanged
        # instead of rewrapping them as 500s.
        raise
    except Exception as e:
        logger.exception("Document processing error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    uvicorn.run("api_server:app", host="0.0.0.0", port=7860, reload=True)