# (Removed: "Spaces: Sleeping" HuggingFace Spaces status banner — scrape
#  artifact, not part of the source file.)
import asyncio
import os
import shutil
import uuid
from typing import List

from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pdf2image import convert_from_path
from PIL import Image

from model_utils import extract_invoice_data_from_image
| os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" | |
| os.environ["YOLO_CONFIG_DIR"] = "/tmp/ultralytics" | |
| os.environ["XDG_CACHE_HOME"] = "/tmp" | |
| os.environ["FONTCONFIG_PATH"] = "/tmp" | |
| os.makedirs("/tmp/matplotlib", exist_ok=True) | |
| os.makedirs("/tmp/ultralytics", exist_ok=True) | |
| os.makedirs("/tmp/fontconfig", exist_ok=True) | |
| app = FastAPI() | |
| UPLOAD_DIR = "/tmp/uploads" | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| ALLOWED_EXTENSIONS = {".png", ".jpg", ".jpeg",".pdf"} | |
| MAX_FILES_PER_REQUEST = 10 | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
def resize_to_640(img: Image.Image, target_width: int = 640) -> Image.Image:
    """Resize *img* to *target_width* pixels wide, preserving aspect ratio.

    The default width of 640 matches the detector's expected input size;
    the parameter is exposed so other model resolutions can reuse this.

    Args:
        img: Source image.
        target_width: Output width in pixels (height scales proportionally).

    Returns:
        A new resized ``Image.Image``; the input is not modified.
    """
    scale = target_width / float(img.size[0])
    new_height = int(float(img.size[1]) * scale)
    return img.resize((target_width, new_height), Image.LANCZOS)
def _save_upload(upload: UploadFile, dest_path: str) -> None:
    """Stream an uploaded file's contents to *dest_path* on disk."""
    with open(dest_path, "wb") as out:
        shutil.copyfileobj(upload.file, out)


def _render_first_pdf_page(page: Image.Image, png_path: str) -> None:
    """Resize a rasterized PDF page to model width and write it as a PNG."""
    resize_to_640(page).save(png_path)


async def process_single_file(file: UploadFile) -> dict:
    """Validate, stage, and run invoice extraction for one uploaded file.

    Returns ``{"filename", "data"}`` on success or ``{"error": ...}`` when
    processing fails for this file. Raises ``HTTPException(400)`` for an
    unsupported extension (which aborts the whole gathered batch in the
    endpoint — preserved from the original contract).

    All temporary files (the staged upload and, for PDFs, the derived PNG)
    are removed in the ``finally`` block.
    """
    # filename may be None for some clients; splitext on "" yields "" which
    # then fails the extension check with the standard 400.
    file_ext = os.path.splitext(file.filename or "")[-1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail=f"Unsupported format: {file.filename}. Supported: .png, .jpg, .jpeg, .pdf")
    unique_filename = f"{uuid.uuid4().hex}{file_ext}"
    file_path = os.path.join(UPLOAD_DIR, unique_filename)
    image_path = None
    try:
        # Disk I/O, PDF rasterization, and model inference are all blocking;
        # run them in worker threads so asyncio.gather in the endpoint can
        # actually overlap multiple files instead of serializing on the
        # event loop.
        await asyncio.to_thread(_save_upload, file, file_path)
        if file_ext == ".pdf":
            # Convert PDF's first page to image
            images = await asyncio.to_thread(convert_from_path, file_path, dpi=300)
            if not images:
                return {"error": f"No pages found in PDF: {file.filename}"}
            image_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}.png")
            await asyncio.to_thread(_render_first_pdf_page, images[0], image_path)
        else:
            image_path = file_path
        # Run inference off the event loop as well.
        # NOTE(review): assumes extract_invoice_data_from_image is safe to
        # call from a non-main thread — confirm for the model backend.
        extracted_data = await asyncio.to_thread(extract_invoice_data_from_image, image_path)
        return {"filename": file.filename, "data": extracted_data}
    except Exception as ex:
        # Per-file failures are reported, not raised, so one bad file does
        # not fail the remaining files in the batch.
        return {"error": f"Processing failed for {file.filename}: {str(ex)}"}
    finally:
        # Clean up temp files
        if os.path.exists(file_path):
            os.remove(file_path)
        if image_path and os.path.exists(image_path) and image_path != file_path:
            os.remove(image_path)
# NOTE(review): the original function carried no route decorator, so it was
# never registered with the app and unreachable over HTTP. The path below is
# a best guess — confirm the intended route before deploying.
@app.post("/extract-invoice")
async def extract_invoice(files: List[UploadFile] = File(..., max_files=MAX_FILES_PER_REQUEST)):
    """Extract structured invoice data from up to MAX_FILES_PER_REQUEST files.

    Files are processed concurrently; per-file failures appear in the
    response "data" list as ``{"error": ...}`` entries instead of failing
    the whole request. An unsupported file extension, however, raises
    HTTPException(400) from process_single_file and aborts the batch.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")
    # File(max_files=...) support varies across FastAPI versions, so the
    # cap is also enforced explicitly here.
    if len(files) > MAX_FILES_PER_REQUEST:
        raise HTTPException(
            status_code=413,
            detail=f"Too many files: at most {MAX_FILES_PER_REQUEST} per request",
        )
    # Process files concurrently
    tasks = [process_single_file(file) for file in files]
    results = await asyncio.gather(*tasks)
    # Aggregate results
    success_count = sum(1 for r in results if "error" not in r)
    error_count = len(results) - success_count
    return JSONResponse(content={
        "success": True,
        "message": f"Processed {len(files)} invoices. {success_count} succeeded, {error_count} failed.",
        "data": results
    })