| """ |
| FastAPI Server for Invoice Information Extractor |
| Provides REST API for invoice processing |
| """ |
|
|
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form |
| from fastapi.responses import JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from contextlib import asynccontextmanager |
| from typing import Optional |
| import tempfile |
| import os |
| import shutil |
|
|
| from config import API_TITLE, API_DESCRIPTION, API_VERSION |
| from model_manager import model_manager |
| from inference import InferenceProcessor |
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the application lifecycle: load models on startup, log on shutdown.

    Calls ``model_manager.load_models()`` before yielding control to the
    application.  A failure while loading is printed and re-raised, so the
    error propagates to whoever is driving the lifespan.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None, once the models have been loaded.

    Raises:
        Exception: Whatever ``model_manager.load_models()`` raises.
    """
    banner = "=" * 60
    # Messages are kept plain ASCII so they print on any console encoding.
    print("Starting Invoice Information Extractor API...")
    print(banner)

    # Keep the try body minimal: only model loading is expected to fail here.
    try:
        model_manager.load_models()
    except Exception as e:
        print(f"Failed to load models: {e}")
        raise

    print(banner)
    print("API is ready to accept requests!")
    print(banner)

    yield

    # Everything after the yield runs when the application shuts down.
    print("Shutting down API...")
|
|
|
|
| |
# ASGI application object. Title, description, and version come from config;
# startup/shutdown work is delegated to the lifespan context manager.
app = FastAPI(
    lifespan=lifespan,
    title=API_TITLE,
    description=API_DESCRIPTION,
    version=API_VERSION,
)
|
|
| |
# Cross-origin access: every origin, method, and header is accepted, and
# credentialed requests are enabled.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True -- confirm whether credentials
# are actually needed, or list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
| |
# Serve the built frontend's static assets when a build is present next to
# this module.
frontend_dist = os.path.join(os.path.dirname(__file__), "frontend", "dist")
_frontend_assets = os.path.join(frontend_dist, "assets")
if os.path.isdir(frontend_dist):
    # Guard the assets directory itself: StaticFiles raises when constructed
    # with a missing directory, which would abort startup for a build that
    # has no assets/ folder.
    if os.path.isdir(_frontend_assets):
        app.mount("/assets", StaticFiles(directory=_frontend_assets), name="assets")
    print(f"Serving frontend from: {frontend_dist}")
|
|
|
|
@app.get("/")
async def root():
    """Root endpoint - serve the frontend if built, otherwise API information.

    Returns:
        A ``FileResponse`` for ``frontend/dist/index.html`` when that file
        exists next to this module; otherwise a dict with the API name,
        version, status, model-load state, and the available endpoints.
    """
    frontend_index = os.path.join(
        os.path.dirname(__file__), "frontend", "dist", "index.html"
    )
    # isfile rather than exists: only a regular file can be sent as a response.
    if os.path.isfile(frontend_index):
        return FileResponse(frontend_index)

    return {
        "name": API_TITLE,
        "version": API_VERSION,
        "status": "running",
        "models_loaded": model_manager.is_loaded(),
        "endpoints": {
            "health": "/health",
            "process": "/process-invoice (POST)",
            "extract": "/extract (POST)",
            "extract_batch": "/extract_batch (POST)",
            "docs": "/docs",
        },
    }
|
|
|
|
@app.get("/health")
async def health_check():
    """Health check endpoint: reports status and whether models are loaded."""
    models_ready = model_manager.is_loaded()
    return dict(status="healthy", models_loaded=models_ready)
|
|
|
|
@app.post("/extract")
async def extract_invoice(
    file: UploadFile = File(..., description="Invoice image file (JPG, PNG, JPEG)"),
    doc_id: Optional[str] = Form(None, description="Optional document identifier"),
    enhance_image: Optional[bool] = Form(False, description="Apply OpenCV enhancement preprocessing"),
    reasoning_mode: Optional[str] = Form("simple", description="VLM reasoning mode: 'simple' or 'reason'")
):
    """
    Extract information from invoice image

    **Parameters:**
    - **file**: Invoice image file (required)
    - **doc_id**: Optional document identifier (auto-generated from filename if not provided)
    - **enhance_image**: Apply OpenCV enhancement preprocessing (optional)
    - **reasoning_mode**: VLM reasoning mode: 'simple' or 'reason' (optional)

    **Returns:**
    - JSON with extracted fields, confidence scores, and metadata, plus
      `total_request_time_sec` and `file_io_time_sec` added by this endpoint

    **Errors:**
    - 400 if the upload is not an image (by content type or file extension)
    - 503 if the models are not loaded yet
    - 500 if processing fails

    **Example Response:**
    ```json
    {
        "doc_id": "invoice_001",
        "fields": {
            "dealer_name": "ABC Tractors Pvt Ltd",
            "model_name": "Mahindra 575 DI",
            "horse_power": 50,
            "asset_cost": 525000,
            "signature": {"present": true, "bbox": [100, 200, 300, 250]},
            "stamp": {"present": true, "bbox": [400, 500, 500, 550]}
        },
        "confidence": 0.89,
        "processing_time_sec": 3.8,
        "cost_estimate_usd": 0.000528
    }
    ```
    """
    import time

    allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}

    # Reject non-image uploads by declared content type (when one is sent).
    if file.content_type and not file.content_type.startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail="File must be an image (JPG, PNG, JPEG)"
        )

    # Split the name once; a missing filename yields an empty stem/extension
    # instead of raising TypeError further down.
    filename = file.filename or ""
    stem, ext = os.path.splitext(filename)
    if filename and ext.lower() not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail="File must be an image (JPG, PNG, JPEG, GIF, BMP, TIFF, WEBP)"
        )

    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    # perf_counter is monotonic, so the reported durations cannot go negative
    # if the wall clock is adjusted mid-request.
    request_start = time.perf_counter()
    temp_file = None
    try:
        # Persist the upload to disk: the processor is handed a file path.
        io_start = time.perf_counter()
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
            temp_file = temp.name
            shutil.copyfileobj(file.file, temp)
        io_time = round(time.perf_counter() - io_start, 3)

        # Fall back to the filename stem when no (or an empty) doc_id is sent.
        if not doc_id:
            doc_id = stem or "invoice"

        # NOTE(review): this call is synchronous inside an async handler, so
        # it occupies the event loop for its whole duration -- confirm whether
        # that serialization is intended before moving it to a thread pool.
        result = InferenceProcessor.process_invoice(temp_file, doc_id, enhance_image, reasoning_mode)

        result['total_request_time_sec'] = round(time.perf_counter() - request_start, 2)
        result['file_io_time_sec'] = io_time

        return JSONResponse(content=result, media_type="application/json; charset=utf-8")

    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing invoice: {str(e)}"
        ) from e

    finally:
        # Best-effort cleanup; a failed delete must not mask the response.
        if temp_file:
            try:
                os.unlink(temp_file)
            except OSError:
                pass

        file.file.close()
|
|
|
|
def _format_extracted_text(fields):
    """Render the core invoice fields as "Label: value" lines for display."""
    labelled_keys = (
        ("dealer_name", "Dealer Name"),
        ("model_name", "Model Name"),
        ("horse_power", "Horse Power"),
        ("asset_cost", "Asset Cost"),
    )
    # Falsy values (missing, None, empty, 0) are skipped.
    lines = [
        f"{label}: {fields[key]}"
        for key, label in labelled_keys
        if fields.get(key)
    ]
    return "\n".join(lines) if lines else "No structured data extracted"


def _bbox_coords(region):
    """Return ``[[b0, b1, b2, b3]]`` for a present region with a bbox, else ``[]``."""
    if not isinstance(region, dict) or not region.get("present"):
        return []
    bbox = region.get("bbox")
    # A bbox with fewer than four values cannot be reported; treat as absent
    # rather than failing the whole request with an IndexError.
    if not bbox or len(bbox) < 4:
        return []
    return [list(bbox[:4])]


@app.post("/process-invoice")
async def process_invoice(
    file: UploadFile = File(..., description="Invoice image file"),
    enhance_image: Optional[bool] = Form(False, description="Apply OpenCV enhancement preprocessing"),
    reasoning_mode: Optional[str] = Form("simple", description="VLM reasoning mode: 'simple' or 'reason'")
):
    """
    Process a single invoice and return extracted information
    Simplified endpoint for frontend integration

    **Parameters:**
    - **file**: Invoice image file (required)
    - **enhance_image**: Apply OpenCV enhancement preprocessing (optional)
    - **reasoning_mode**: VLM reasoning mode: 'simple' for single-step, 'reason' for Chain of Thought (optional)

    **Returns:**
    - JSON with extracted_text, signature_coords, stamp_coords, doc_id,
      processing_time, confidence, cost_estimate_usd, fields, timing_breakdown

    **Errors:**
    - 400 if the upload's content type is not an image
    - 503 if the models are not loaded yet
    - 500 if processing fails
    """
    if file.content_type and not file.content_type.startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail="File must be an image"
        )

    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    temp_file = None
    try:
        # Persist the upload to disk: the processor is handed a file path.
        suffix = os.path.splitext(file.filename)[1] if file.filename else '.jpg'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
            temp_file = temp.name
            shutil.copyfileobj(file.file, temp)

        doc_id = os.path.splitext(file.filename)[0] if file.filename else "invoice"

        result = InferenceProcessor.process_invoice(temp_file, doc_id, enhance_image, reasoning_mode)

        # `or {}` also covers a key that is present but null, which
        # .get(key, {}) would pass through as None.
        fields = result.get("fields") or {}

        return JSONResponse(content={
            "extracted_text": _format_extracted_text(fields),
            "signature_coords": _bbox_coords(fields.get("signature")),
            "stamp_coords": _bbox_coords(fields.get("stamp")),
            "doc_id": result.get("doc_id", doc_id),
            "processing_time": result.get("processing_time_sec", 0),
            "confidence": result.get("confidence", 0),
            "cost_estimate_usd": result.get("cost_estimate_usd", 0),
            "fields": fields,
            "timing_breakdown": result.get("timing_breakdown", {})
        }, media_type="application/json; charset=utf-8")

    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing invoice: {str(e)}"
        ) from e

    finally:
        # Best-effort cleanup; a failed delete must not mask the response.
        if temp_file:
            try:
                os.unlink(temp_file)
            except OSError:
                pass

        file.file.close()
|
|
|
|
@app.post("/extract_batch")
async def extract_batch(
    files: list[UploadFile] = File(..., description="Multiple invoice images")
):
    """
    Extract information from multiple invoice images

    Files are processed one after another. A file that is not an image, or
    whose processing fails, contributes an entry with `filename` and `error`
    instead of aborting the whole batch.

    **Parameters:**
    - **files**: List of invoice image files

    **Returns:**
    - JSON object whose `results` array holds one entry per uploaded file

    **Errors:**
    - 503 if the models are not loaded yet
    """
    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    results = []

    try:
        for file in files:
            # Same rule as the single-file endpoints: only reject when a
            # content type was sent and it is not an image (it may be None).
            if file.content_type and not file.content_type.startswith("image/"):
                results.append({
                    "filename": file.filename,
                    "error": "File must be an image"
                })
                continue

            # A missing filename yields an empty stem/suffix, not a TypeError.
            stem, suffix = os.path.splitext(file.filename or "")
            temp_file = None
            try:
                # Saving the upload is inside the per-file try so an I/O
                # error is reported for this file only.
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
                    temp_file = temp.name
                    shutil.copyfileobj(file.file, temp)

                result = InferenceProcessor.process_invoice(temp_file, stem or "invoice")
                results.append(result)
            except Exception as e:
                results.append({
                    "filename": file.filename,
                    "error": str(e)
                })
            finally:
                # Delete each temp file as soon as it has been processed so a
                # large batch does not accumulate files on disk.
                if temp_file:
                    try:
                        os.unlink(temp_file)
                    except OSError:
                        pass

        return JSONResponse(content={"results": results}, media_type="application/json; charset=utf-8")

    finally:
        for file in files:
            file.file.close()
|
|
|
|
if __name__ == "__main__":
    # Start the server with uvicorn when this file is executed directly.
    import uvicorn

    # "app:app" is an import string: the `app` object of a module named `app`.
    # NOTE(review): this presumes the file is saved as app.py -- confirm.
    server_options = {"host": "0.0.0.0", "port": 7860, "reload": False}
    uvicorn.run("app:app", **server_options)
|
|