"""Invoice extraction worker.

Pipeline: Docling converts the source file into a structural dict, a
LayoutLM document-question-answering model pulls individual fields out of
the document image, and the normalized result is written back to the
``invoices`` table in Supabase.
"""

import os
import re
from io import BytesIO

import requests
import torch
from PIL import Image
from docling.document_converter import DocumentConverter
from supabase import Client, create_client
from transformers import pipeline

# --- CONFIGURATION ---
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
converter = DocumentConverter()

# The LayoutLM document-QA model handles the 'intelligence' (question
# answering on images). Device 0 (first CUDA GPU) is used when available,
# otherwise -1 (CPU).
device = 0 if torch.cuda.is_available() else -1
qa_predictor = pipeline(
    "document-question-answering",
    model="impira/layoutlm-document-qa",
    device=device,
)

# Field name -> natural-language question put to the document image.
_QUESTIONS = {
    "vendor_name": "What is the name of the company or vendor?",
    "invoice_number": "What is the invoice number or reference ID?",
    "total_amount": "What is the grand total or total amount due?",
    "date": "What is the date of the invoice?",
}

# First numeric token: digits with optional ',' / ' ' grouping and an
# optional decimal part, or a bare fractional part such as ".50".
_AMOUNT_RE = re.compile(r"\d[\d, ]*(?:\.\d+)?|\.\d+")


def download_as_image(url, timeout=10):
    """Download a file and return it as an RGB PIL image for LayoutLM.

    Args:
        url: Location of the file to fetch.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The downloaded content decoded by Pillow and converted to RGB.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
        PIL.UnidentifiedImageError: If the payload is not a format Pillow
            can decode.
    """
    response = requests.get(url, timeout=timeout)
    # Fail on HTTP errors here; otherwise the error page body is handed to
    # Pillow and surfaces as a misleading "cannot identify image" error.
    response.raise_for_status()
    # NOTE(review): Pillow cannot decode PDFs, so PDF invoices will fail at
    # this point -- confirm that only raster images reach this function.
    return Image.open(BytesIO(response.content)).convert("RGB")


def clean_amount(text):
    """Convert a currency string (e.g. '$1,200.50') to a float.

    The first numeric token in ``text`` is used, so surrounding currency
    symbols, words and trailing punctuation are ignored. Commas and spaces
    inside the token are treated as grouping separators. A leading minus
    sign is not interpreted; the result is never negative.

    Args:
        text: Raw amount text, typically a model answer.

    Returns:
        The parsed amount, or 0.0 when ``text`` is not a string or holds
        no number.
    """
    if not isinstance(text, str):
        return 0.0
    match = _AMOUNT_RE.search(text)
    if match is None:
        return 0.0
    return float(match.group().replace(",", "").replace(" ", ""))


def _first_answer(result):
    """Return the top answer text from a document-QA pipeline result.

    Accepts either a list of answer dicts or a single answer dict, and
    falls back to "Not Found" when there is no usable answer.
    """
    if isinstance(result, dict):
        result = [result]
    if not result:
        return "Not Found"
    return result[0].get("answer") or "Not Found"


def process_document(invoice_id, file_url):
    """Run the extraction pipeline for one invoice and store the result.

    On success the matching ``invoices`` row is updated with status
    "completed" and the extracted fields. On any failure the error is
    printed and the row's status is set to "error".

    Args:
        invoice_id: Value of the ``id`` column of the row to update.
        file_url: Location of the invoice file; passed both to Docling and
            to ``download_as_image``.
    """
    print(f"🔄 Processing Invoice ID: {invoice_id}")
    try:
        # STEP 1: Layout analysis (Docling).
        # Converts the file into a structural dict.
        conversion_result = converter.convert(file_url)
        doc_json = conversion_result.document.export_to_dict()

        # STEP 2: Spatial extraction (Hugging Face / LayoutLM).
        # Questions are asked directly against the document image.
        img = download_as_image(file_url)
        extracted = {
            key: _first_answer(qa_predictor(image=img, question=question))
            for key, question in _QUESTIONS.items()
        }

        # STEP 3: Update Supabase with the normalized data.
        supabase.table("invoices").update({
            "status": "completed",
            "vendor_name": extracted["vendor_name"],
            "invoice_number": extracted["invoice_number"],
            "total_amount": clean_amount(extracted["total_amount"]),
            # Full Docling structure, kept for future use.
            "raw_json": doc_json,
            "metadata": {"extracted_date": extracted["date"]},
        }).eq("id", invoice_id).execute()
        print(f"✅ Success: Updated {invoice_id}")
    except Exception as e:
        # Top-level boundary for one job: report the failure and flag the
        # row. If this status update itself fails, that error propagates.
        print(f"❌ Critical Error: {str(e)}")
        supabase.table("invoices").update(
            {"status": "error"}
        ).eq("id", invoice_id).execute()


# --- TRIGGER (example for manual testing) ---
if __name__ == "__main__":
    # In production, this would be called by your n8n webhook or an API route.
    print("🤖 Worker is online and waiting for tasks...")