Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, HTTPException, Form, Request | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from typing import Optional | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from document_processor import process_document, supported_formats | |
| from fastapi.templating import Jinja2Templates | |
| from transformers import pipeline | |
| app = FastAPI() | |
| # Mount static files for CSS/JS if needed | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Setup templates | |
| templates = Jinja2Templates(directory="templates") | |
| async def upload_form(request: Request): | |
| return templates.TemplateResponse("index.html", { | |
| "request": request, | |
| "supported_formats": supported_formats() | |
| }) | |
| async def translate_file( | |
| file: UploadFile = File(...), | |
| target_language: str = Form(...) | |
| ): | |
| try: | |
| # Validate file extension | |
| file_ext = Path(file.filename).suffix.lower() | |
| if file_ext not in supported_formats(): | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Unsupported file format. Supported formats: {', '.join(supported_formats())}" | |
| ) | |
| # Create temp file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as temp_file: | |
| temp_path = temp_file.name | |
| content = await file.read() | |
| temp_file.write(content) | |
| # Extract text from document | |
| extracted_text = process_document(temp_path) | |
| # Translation model map | |
| model_map = { | |
| "es": "Helsinki-NLP/opus-mt-en-es", | |
| "fr": "Helsinki-NLP/opus-mt-en-fr", | |
| "de": "Helsinki-NLP/opus-mt-en-de", | |
| "it": "Helsinki-NLP/opus-mt-en-it", | |
| "pt": "Helsinki-NLP/opus-mt-en-pt" | |
| } | |
| model_name = model_map.get(target_language) | |
| if not model_name: | |
| raise HTTPException(status_code=400, detail="Unsupported target language.") | |
| # Load appropriate translation pipeline | |
| print(f"[DEBUG] Loading translation model: {model_name}") | |
| pipe = pipeline("translation", model=model_name) | |
| # Translate text | |
| translated_text = pipe(extracted_text, max_length=1024)[0]["translation_text"] | |
| # Save translation to text file | |
| temp_dir = Path("./tmp") | |
| temp_dir.mkdir(exist_ok=True) | |
| output_filename = f"translated_{Path(file.filename).stem}.txt" | |
| output_path = temp_dir / output_filename | |
| with open(output_path, "w", encoding="utf-8") as f: | |
| f.write(translated_text) | |
| print(f"[DEBUG] Writing translation to: {output_path}") | |
| return FileResponse( | |
| output_path, | |
| filename=output_filename, | |
| media_type="text/plain" | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| if 'temp_path' in locals() and os.path.exists(temp_path): | |
| os.unlink(temp_path) | |
| def download_file(filename: str): | |
| filepath = Path("./tmp") / filename | |
| if not filepath.exists(): | |
| raise HTTPException(status_code=404, detail="File not found.") | |
| return FileResponse(filepath, filename=filename) | |
| def health_check(): | |
| return {"status": "healthy"} | |