# KarthikUppalapati57's picture
# Update worker.py
# 0657b84 verified
import os
import requests
import torch
from io import BytesIO
from PIL import Image
from supabase import create_client, Client
from docling.document_converter import DocumentConverter
from transformers import pipeline
# --- CONFIGURATION ---
# Supabase credentials come from the environment. Both values are None when the
# variables are unset; they are passed to create_client() as-is.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

# Docling converter, shared by every call to process_document().
converter = DocumentConverter()

# LayoutLM-based document question-answering model (questions are asked against
# a document image). Device index 0 is used when torch reports CUDA, else -1.
if torch.cuda.is_available():
    device = 0
else:
    device = -1

qa_predictor = pipeline(
    task="document-question-answering",
    model="impira/layoutlm-document-qa",
    device=device,
)
def download_as_image(url):
    """Download the file at *url* and return it as an RGB PIL image.

    Args:
        url: Location of the file to fetch with ``requests.get``.

    Returns:
        The downloaded payload opened with PIL and converted to ``"RGB"``.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.RequestException: The request itself failed (``timeout=10``
            is passed to ``requests.get``).
        Exception: Whatever ``PIL.Image.open`` raises when the payload is not
            an image it can decode.
    """
    response = requests.get(url, timeout=10)
    # Fail on HTTP errors here: otherwise the body of an error page is handed
    # to PIL and the real cause is hidden behind an image-decoding error.
    response.raise_for_status()
    return Image.open(BytesIO(response.content)).convert("RGB")
def clean_amount(text):
    """Convert a currency string (e.g. ``'$1,200.50'``) to a float.

    Every character except decimal digits and decimal points is discarded, so
    currency symbols, thousands separators and spaces are ignored. A dot is
    treated as a decimal point only when it does not directly follow a letter,
    and trailing dots are dropped, so abbreviation or sentence punctuation
    ("Rs. 1,200.50", "Rs.500", "45.00.") no longer corrupts the number.

    Limitations: the sign is not preserved, and a decimal comma
    ("1.200,50") is not recognised.

    Args:
        text: The raw amount text. Non-string values (including ``None``) are
            treated as "no amount".

    Returns:
        The parsed amount, or ``0.0`` when no number can be parsed.
    """
    if not isinstance(text, str):
        return 0.0

    kept = []
    previous = ""
    for char in text:
        # isdecimal() rather than isdigit(): float() rejects digit-like
        # characters such as superscripts, which isdigit() would let through.
        if char.isdecimal():
            kept.append(char)
        elif char == "." and not previous.isalpha():
            kept.append(char)
        previous = char

    try:
        return float("".join(kept).rstrip("."))
    except ValueError:
        return 0.0
# Questions posed to the document-QA model, keyed by the field each answer fills.
_INVOICE_QUESTIONS = {
    "vendor_name": "What is the name of the company or vendor?",
    "invoice_number": "What is the invoice number or reference ID?",
    "total_amount": "What is the grand total or total amount due?",
    "date": "What is the date of the invoice?",
}


def _top_answer(result):
    """Return the first answer text from a QA pipeline result, or "Not Found"."""
    # The pipeline is documented as returning either one answer dict or a list
    # of answer dicts; unwrap any list nesting down to the first dict.
    best = result
    while isinstance(best, (list, tuple)):
        if not best:
            return "Not Found"
        best = best[0]
    if not isinstance(best, dict):
        return "Not Found"
    return best.get("answer") or "Not Found"


def process_document(invoice_id, file_url):
    """Run the extraction pipeline for one invoice and store the result.

    Steps:
      1. Convert the file with Docling and export the document as a dict.
      2. Download the file as an image and ask the QA model one question per
         field (vendor name, invoice number, total amount, date).
      3. Update the matching row of the ``invoices`` table with the extracted
         fields and ``status = "completed"``.

    Any exception raised during these steps is caught and printed, and the row
    is marked ``status = "error"`` instead. An exception raised by that final
    status update is not caught here.

    Args:
        invoice_id: Value matched against the ``id`` column of ``invoices``.
        file_url: Location of the document; passed both to the Docling
            converter and to ``download_as_image``.

    Returns:
        None.
    """
    print(f"πŸ”„ Processing Invoice ID: {invoice_id}")
    try:
        # STEP 1: Layout analysis (Docling) - structural dict of the document.
        conversion_result = converter.convert(file_url)
        doc_json = conversion_result.document.export_to_dict()

        # STEP 2: Question answering directly on the document image.
        # NOTE(review): download_as_image opens the payload with PIL, so
        # non-image files presumably fail here - confirm expected inputs.
        img = download_as_image(file_url)
        extracted = {
            key: _top_answer(qa_predictor(image=img, question=question))
            for key, question in _INVOICE_QUESTIONS.items()
        }

        # STEP 3: Push the normalized data back to Supabase.
        supabase.table("invoices").update({
            "status": "completed",
            "vendor_name": extracted["vendor_name"],
            "invoice_number": extracted["invoice_number"],
            "total_amount": clean_amount(extracted["total_amount"]),
            "raw_json": doc_json,  # full Docling structure, kept for future use
            "metadata": {"extracted_date": extracted["date"]},
        }).eq("id", invoice_id).execute()
        print(f"βœ… Success: Updated {invoice_id}")
    except Exception as e:
        # Top-level boundary for one invoice: report and flag the row.
        print(f"❌ Critical Error: {str(e)}")
        supabase.table("invoices").update({"status": "error"}).eq("id", invoice_id).execute()
# --- TRIGGER (manual testing entry point) ---
# Running this file directly only prints a startup banner; nothing here polls
# for or dispatches work. In production, process_document() is meant to be
# called from an n8n webhook or an API route.
if __name__ == "__main__":
    startup_banner = "πŸ€– Worker is online and waiting for tasks..."
    print(startup_banner)