trabb / app /main.py
fokan's picture
first push
ab208dc
import sys
import os
# Add the parent directory to the Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import tempfile
import shutil
from pathlib import Path
import asyncio
from typing import Optional
import logging
from translator import DocumentTranslator, TranslationReport
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('translation.log') if os.path.exists('.') else logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = FastAPI(title="Document Translator", description="Translate PDF and DOCX documents using OpenRouter")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files
app.mount("/static", StaticFiles(directory="web"), name="static")
# Initialize translator
translator = DocumentTranslator()
# Create uploads directory
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@app.get("/")
async def read_root():
"""Serve the main HTML page"""
return FileResponse("web/index.html")
@app.get("/models")
async def get_available_models():
"""Get list of available translation models from OpenRouter"""
try:
models = await translator.get_available_models()
return {"models": models}
except Exception as e:
logger.error(f"Error fetching models: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch available models")
@app.post("/translate")
async def translate_document(
file: UploadFile = File(...),
model: str = Form(...),
source_language: str = Form(default="auto"),
target_language: str = Form(default="en")
):
"""
Translate a document (PDF or DOCX) using the specified model
Returns translated file with same name and format as original
"""
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Validate file type
allowed_extensions = {".pdf", ".docx"}
file_extension = Path(file.filename).suffix.lower()
if file_extension not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Allowed: {', '.join(allowed_extensions)}"
)
# Validate API key
if not translator.is_ready():
raise HTTPException(
status_code=500,
detail="Translation service not configured. Please check OPENROUTER_API_KEY."
)
# Create temporary directory for this translation
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Save uploaded file
input_file = temp_path / file.filename
with open(input_file, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
try:
# Perform translation
logger.info(f"Starting translation of {input_file} using model {model}")
logger.info(f"Translation: {source_language} -> {target_language}")
# Log file information
if input_file.suffix.lower() == ".pdf":
logger.info("Using coordinate-based PDF translation to preserve formatting")
result = await translator.translate_document(
input_file=input_file,
model=model,
source_language=source_language,
target_language=target_language,
output_dir=temp_path
)
# Check if translation was successful
if result.status == "failed":
error_details = f"Translation failed: {result.errors[0] if result.errors else 'Unknown error'}"
logger.error(error_details)
raise HTTPException(status_code=500, detail=error_details)
if result.paragraphs_count == 0:
logger.error("Translation completed but no paragraphs were translated")
raise HTTPException(
status_code=500,
detail="Translation failed: No content was translated. Please check if the file contains readable text."
)
# Move files to uploads directory for serving
timestamp = int(asyncio.get_event_loop().time())
result_dir = UPLOAD_DIR / f"translation_{timestamp}"
result_dir.mkdir(exist_ok=True)
# Copy result files with original names (no prefix)
final_files = {}
if result.original_file.exists():
# Keep original filename
original_dest = result_dir / file.filename
shutil.copy2(result.original_file, original_dest)
final_files["original"] = str(original_dest.relative_to(UPLOAD_DIR))
if result.translated_file.exists():
# Use original filename for translated file too
translated_dest = result_dir / file.filename
shutil.copy2(result.translated_file, translated_dest)
final_files["translated"] = str(translated_dest.relative_to(UPLOAD_DIR))
# Generate report
report = {
"status": "success",
"original_filename": file.filename,
"translated_filename": file.filename, # Same filename
"pages_translated": result.pages_count,
"paragraphs_translated": result.paragraphs_count,
"model_used": model,
"source_language": source_language,
"target_language": target_language,
"files": final_files,
"message": f"Successfully translated {result.paragraphs_count} paragraphs from {source_language} to {target_language}"
}
logger.info(f"Translation completed successfully: {result.paragraphs_count} paragraphs translated")
return JSONResponse(content=report)
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
logger.error(f"Translation error: {e}")
# Handle rate limit errors specifically
if "rate limit" in str(e).lower() or "429" in str(e):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded. Please try again later or switch to a different model."
)
else:
raise HTTPException(status_code=500, detail=f"Translation failed: {str(e)}")
@app.get("/download/{file_path:path}")
async def download_file(file_path: str):
"""Download a translated file"""
file_location = UPLOAD_DIR / file_path
if not file_location.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(
path=file_location,
filename=file_location.name,
media_type='application/octet-stream'
)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "translator_ready": translator.is_ready()}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)