|
|
""" |
|
|
FastAPI Server for Invoice Information Extractor |
|
|
Provides REST API for invoice processing |
|
|
""" |
|
|
|
|
|
from fastapi import FastAPI, File, UploadFile, HTTPException, Form |
|
|
from fastapi.responses import JSONResponse, FileResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from contextlib import asynccontextmanager |
|
|
from typing import Optional |
|
|
import tempfile |
|
|
import os |
|
|
import shutil |
|
|
|
|
|
from config import API_TITLE, API_DESCRIPTION, API_VERSION |
|
|
from model_manager import model_manager |
|
|
from inference import InferenceProcessor |
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    On startup this calls ``model_manager.load_models()`` and prints progress
    banners. If loading raises, the failure is printed and the exception is
    re-raised, so the context manager never yields. Once the application is
    done with the context, a shutdown message is printed.

    Args:
        app: The FastAPI application this lifespan is attached to (not used
            inside the function).

    Raises:
        Exception: Whatever ``model_manager.load_models()`` raises.
    """
    banner = "=" * 60

    print("Starting Invoice Information Extractor API...")
    print(banner)

    # Keep the try body limited to the call that is expected to fail.
    try:
        model_manager.load_models()
    except Exception as e:
        print(f"Failed to load models: {e}")
        raise

    print(banner)
    print("API is ready to accept requests!")
    print(banner)

    yield

    print("Shutting down API...")
|
|
|
|
|
|
|
|
|
|
|
# Application object; title, description and version come from config, and
# model loading is wired in through the lifespan handler.
app = FastAPI(
    lifespan=lifespan,
    version=API_VERSION,
    description=API_DESCRIPTION,
    title=API_TITLE,
)
|
|
|
|
|
|
|
|
# CORS: every origin, method and header is accepted, with credentials enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination - confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
|
|
|
|
|
|
# Serve the built frontend's static assets when a build sits next to this module.
frontend_dist = os.path.join(os.path.dirname(__file__), "frontend", "dist")

if os.path.exists(frontend_dist):
    _frontend_assets = os.path.join(frontend_dist, "assets")
    # StaticFiles validates its directory when constructed, so a build without
    # an "assets" folder must not be mounted or the module fails to import.
    if os.path.isdir(_frontend_assets):
        app.mount("/assets", StaticFiles(directory=_frontend_assets), name="assets")
    print(f"Serving frontend from: {frontend_dist}")
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Serve the frontend entry page, or describe the API when no build exists.

    Returns:
        A ``FileResponse`` for ``frontend/dist/index.html`` (resolved relative
        to this module) when that path exists; otherwise a dict with the API
        name, version, status, model-loaded flag and an endpoint map.
    """
    module_dir = os.path.dirname(__file__)
    index_page = os.path.join(module_dir, "frontend", "dist", "index.html")

    if not os.path.exists(index_page):
        # No frontend build available: fall back to a small API summary.
        endpoint_map = {
            "health": "/health",
            "process": "/process-invoice (POST)",
            "extract": "/extract (POST)",
            "docs": "/docs"
        }
        return {
            "name": API_TITLE,
            "version": API_VERSION,
            "status": "running",
            "models_loaded": model_manager.is_loaded(),
            "endpoints": endpoint_map
        }

    return FileResponse(index_page)
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Report service health together with the model-loaded flag.

    Returns:
        A dict with a constant ``"status": "healthy"`` entry and
        ``"models_loaded"`` taken from ``model_manager.is_loaded()``.
    """
    models_ready = model_manager.is_loaded()
    return {"status": "healthy", "models_loaded": models_ready}
|
|
|
|
|
|
|
|
@app.post("/extract")
async def extract_invoice(
    file: UploadFile = File(..., description="Invoice image file (JPG, PNG, JPEG)"),
    doc_id: Optional[str] = Form(None, description="Optional document identifier"),
    enhance_image: Optional[bool] = Form(False, description="Apply OpenCV enhancement preprocessing"),
    reasoning_mode: Optional[str] = Form("simple", description="VLM reasoning mode: 'simple' or 'reason'")
):
    """
    Extract information from invoice image

    **Parameters:**
    - **file**: Invoice image file (required)
    - **doc_id**: Optional document identifier (auto-generated from filename if not provided)
    - **enhance_image**: Apply OpenCV enhancement preprocessing (optional)
    - **reasoning_mode**: VLM reasoning mode: 'simple' or 'reason' (optional)

    **Returns:**
    - JSON with extracted fields, confidence scores, and metadata, plus
      `total_request_time_sec` and `file_io_time_sec`

    **Errors:**
    - 400 when the upload is not an image (by content type or file extension)
    - 503 when the models are not loaded yet
    - 500 when saving or processing the upload fails

    **Example Response:**
    ```json
    {
        "doc_id": "invoice_001",
        "fields": {
            "dealer_name": "ABC Tractors Pvt Ltd",
            "model_name": "Mahindra 575 DI",
            "horse_power": 50,
            "asset_cost": 525000,
            "signature": {"present": true, "bbox": [100, 200, 300, 250]},
            "stamp": {"present": true, "bbox": [400, 500, 500, 550]}
        },
        "confidence": 0.89,
        "processing_time_sec": 3.8,
        "cost_estimate_usd": 0.000528
    }
    ```
    """
    # Imported locally, as in the original block, so the block does not rely
    # on a module-level import of ``time``.
    import time

    if file.content_type and not file.content_type.startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail="File must be an image (JPG, PNG, JPEG)"
        )

    # The upload may arrive without a filename; resolve the stem and suffix
    # once so the later steps never call splitext() on None.
    if file.filename:
        stem, suffix = os.path.splitext(file.filename)
        if suffix.lower() not in ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'):
            raise HTTPException(
                status_code=400,
                detail="File must be an image (JPG, PNG, JPEG, GIF, BMP, TIFF, WEBP)"
            )
    else:
        # Same fallbacks the /process-invoice endpoint uses.
        stem, suffix = "invoice", ".jpg"

    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    request_start = time.time()
    temp_file = None
    try:
        # Persist the upload to disk; the processor is given a file path.
        io_start = time.time()
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
            temp_file = temp.name
            shutil.copyfileobj(file.file, temp)
        io_time = round(time.time() - io_start, 3)

        if doc_id is None:
            doc_id = stem

        result = InferenceProcessor.process_invoice(temp_file, doc_id, enhance_image, reasoning_mode)

        # Attach request-level timings next to the processor's own output.
        result['total_request_time_sec'] = round(time.time() - request_start, 2)
        result['file_io_time_sec'] = io_time

        return JSONResponse(content=result, media_type="application/json; charset=utf-8")

    except HTTPException:
        # Never rewrap a deliberate HTTP error as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing invoice: {str(e)}"
        ) from e

    finally:
        # Best-effort cleanup: a temp file that is already gone or cannot be
        # removed must not mask the real response or error.
        if temp_file is not None:
            try:
                os.unlink(temp_file)
            except OSError:
                pass

        file.file.close()
|
|
|
|
|
|
|
|
def _bbox_as_coords(region):
    """Return ``[[b0, b1, b2, b3]]`` from ``region["bbox"]`` when the region is present, else ``[]``."""
    if region.get("present") and region.get("bbox"):
        bbox = region["bbox"]
        return [[bbox[0], bbox[1], bbox[2], bbox[3]]]
    return []


@app.post("/process-invoice")
async def process_invoice(
    file: UploadFile = File(..., description="Invoice image file"),
    enhance_image: Optional[bool] = Form(False, description="Apply OpenCV enhancement preprocessing"),
    reasoning_mode: Optional[str] = Form("simple", description="VLM reasoning mode: 'simple' or 'reason'")
):
    """
    Process a single invoice and return extracted information
    Simplified endpoint for frontend integration

    **Parameters:**
    - **file**: Invoice image file (required)
    - **enhance_image**: Apply OpenCV enhancement preprocessing (optional)
    - **reasoning_mode**: VLM reasoning mode: 'simple' for single-step, 'reason' for Chain of Thought (optional)

    **Returns:**
    - JSON with extracted_text, signature_coords, stamp_coords, doc_id,
      processing_time, confidence, cost_estimate_usd, fields and timing_breakdown

    **Errors:**
    - 400 when the upload's content type is not an image
    - 503 when the models are not loaded yet
    - 500 when saving or processing the upload fails
    """
    if file.content_type and not file.content_type.startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail="File must be an image"
        )

    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    # The upload may arrive without a filename; fall back to fixed defaults.
    if file.filename:
        doc_id, suffix = os.path.splitext(file.filename)
    else:
        doc_id, suffix = "invoice", ".jpg"

    temp_file = None
    try:
        # Persist the upload to disk; the processor is given a file path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
            temp_file = temp.name
            shutil.copyfileobj(file.file, temp)

        result = InferenceProcessor.process_invoice(temp_file, doc_id, enhance_image, reasoning_mode)

        # ``or {}`` also covers keys that are present but set to None.
        fields = result.get("fields") or {}
        signature_info = fields.get("signature") or {}
        stamp_info = fields.get("stamp") or {}

        # One "Label: value" line per populated field, in display order.
        labelled_fields = (
            ("Dealer Name", "dealer_name"),
            ("Model Name", "model_name"),
            ("Horse Power", "horse_power"),
            ("Asset Cost", "asset_cost"),
        )
        extracted_text_parts = [
            f"{label}: {fields[key]}"
            for label, key in labelled_fields
            if fields.get(key)
        ]
        extracted_text = "\n".join(extracted_text_parts) if extracted_text_parts else "No structured data extracted"

        return JSONResponse(content={
            "extracted_text": extracted_text,
            "signature_coords": _bbox_as_coords(signature_info),
            "stamp_coords": _bbox_as_coords(stamp_info),
            "doc_id": result.get("doc_id", doc_id),
            "processing_time": result.get("processing_time_sec", 0),
            "confidence": result.get("confidence", 0),
            "cost_estimate_usd": result.get("cost_estimate_usd", 0),
            "fields": fields,
            "timing_breakdown": result.get("timing_breakdown", {})
        }, media_type="application/json; charset=utf-8")

    except HTTPException:
        # Never rewrap a deliberate HTTP error as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing invoice: {str(e)}"
        ) from e

    finally:
        # Best-effort cleanup: a temp file that is already gone or cannot be
        # removed must not mask the real response or error.
        if temp_file is not None:
            try:
                os.unlink(temp_file)
            except OSError:
                pass

        file.file.close()
|
|
|
|
|
|
|
|
@app.post("/extract_batch")
async def extract_batch(
    files: list[UploadFile] = File(..., description="Multiple invoice images")
):
    """
    Extract information from multiple invoice images

    **Parameters:**
    - **files**: List of invoice image files

    **Returns:**
    - JSON object whose `results` array holds one entry per uploaded file:
      either the processor's result or `{"filename", "error"}` when that
      file was rejected or failed

    **Errors:**
    - 503 when the models are not loaded yet
    """
    if not model_manager.is_loaded():
        raise HTTPException(
            status_code=503,
            detail="Models not loaded. Please wait for server initialization."
        )

    results = []

    try:
        for file in files:
            # A missing content type is let through, matching the
            # single-file endpoints; only a non-image type is rejected.
            if file.content_type and not file.content_type.startswith("image/"):
                results.append({
                    "filename": file.filename,
                    "error": "File must be an image"
                })
                continue

            # The upload may arrive without a filename; fall back to the
            # defaults used by /process-invoice.
            if file.filename:
                doc_id, suffix = os.path.splitext(file.filename)
            else:
                doc_id, suffix = "invoice", ".jpg"

            temp_file = None
            try:
                # Saving happens inside the per-file try so one unreadable
                # upload is reported for that file instead of failing the batch.
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp:
                    temp_file = temp.name
                    shutil.copyfileobj(file.file, temp)

                result = InferenceProcessor.process_invoice(temp_file, doc_id)
                results.append(result)
            except Exception as e:
                results.append({
                    "filename": file.filename,
                    "error": str(e)
                })
            finally:
                # Remove each temp file as soon as its invoice is done so a
                # large batch does not pile up files on disk. Best-effort.
                if temp_file is not None:
                    try:
                        os.unlink(temp_file)
                    except OSError:
                        pass

        return JSONResponse(content={"results": results}, media_type="application/json; charset=utf-8")

    finally:
        for file in files:
            file.file.close()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: start uvicorn for the "app:app" import string on all
    # interfaces, port 7860, with auto-reload turned off.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860, "reload": False}
    uvicorn.run("app:app", **server_options)
|
|
|