# Spaces: Runtime error
| import os | |
| import requests | |
| import torch | |
| from io import BytesIO | |
| from PIL import Image | |
| from supabase import create_client, Client | |
| from docling.document_converter import DocumentConverter | |
| from transformers import pipeline | |
# --- CONFIGURATION ---
# Supabase connection settings are read from the environment.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

# Docling converter used for the layout-analysis step.
converter = DocumentConverter()

# Document question-answering model (checkpoint: impira/layoutlm-document-qa)
# used to ask questions directly against the document image.
# Device index 0 is chosen when CUDA is available, otherwise -1.
if torch.cuda.is_available():
    device = 0
else:
    device = -1

qa_predictor = pipeline(
    "document-question-answering",
    model="impira/layoutlm-document-qa",
    device=device,
)
def download_as_image(url):
    """Download a file and return it as an RGB PIL image.

    Args:
        url: Address fetched with an HTTP GET (10-second timeout).

    Returns:
        The response body opened with PIL and converted to "RGB" mode.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        Other exceptions from ``requests.get`` or ``Image.open`` propagate
        unchanged (e.g. when the body is not an image PIL can read).
    """
    response = requests.get(url, timeout=10)
    # Stop on HTTP errors here; otherwise the error page body would be
    # handed to PIL and fail later as an unrelated image-decoding error.
    response.raise_for_status()
    return Image.open(BytesIO(response.content)).convert("RGB")
def clean_amount(text):
    """Convert a currency string (e.g. '$1,200.50') to a float.

    Only the first number found in ``text`` is used. Commas are treated as
    thousands separators and removed; trailing '.' or ',' punctuation is
    ignored. A leading minus sign is not interpreted, so the result is
    never negative.

    Args:
        text: Raw text to parse. Non-string values (including None) are
            treated as "no amount".

    Returns:
        The parsed amount, or 0.0 when no number can be parsed.
    """
    import re  # local import: the file header does not import re

    if not isinstance(text, str):
        return 0.0

    # Use just the first numeric run so digits elsewhere in the text
    # (dates, reference numbers) are not glued onto the amount.
    found = re.search(r"\.?\d[\d.,]*", text)
    if found is None:
        return 0.0

    token = found.group().rstrip(".,").replace(",", "")
    try:
        return float(token)
    except ValueError:
        # e.g. more than one decimal point in the token
        return 0.0
def process_document(invoice_id, file_url):
    """Run the extraction pipeline for one invoice and store the result.

    Steps: convert the file with Docling, ask the document-QA model a fixed
    set of questions about the document image, then update the matching row
    in the ``invoices`` table.

    Args:
        invoice_id: Value matched against the ``id`` column of ``invoices``.
        file_url: Location of the document; passed to the Docling converter
            and to ``download_as_image``.

    Returns:
        None. Failures are printed and the row's status is set to "error";
        exceptions are not re-raised.
    """
    print(f"π Processing Invoice ID: {invoice_id}")
    try:
        # STEP 1: Layout analysis (Docling) -> structural dict of the file.
        conversion_result = converter.convert(file_url)
        doc_json = conversion_result.document.export_to_dict()

        # STEP 2: Ask the QA model each question against the document image.
        img = download_as_image(file_url)
        questions = {
            "vendor_name": "What is the name of the company or vendor?",
            "invoice_number": "What is the invoice number or reference ID?",
            "total_amount": "What is the grand total or total amount due?",
            "date": "What is the date of the invoice?",
        }
        extracted = {}
        for key, question in questions.items():
            result = qa_predictor(image=img, question=question)
            # NOTE(review): some transformers releases appear to return a
            # single answer dict instead of a list; accept both shapes so
            # result[0] cannot fail with KeyError. Confirm against the
            # installed version.
            if isinstance(result, dict):
                result = [result]
            extracted[key] = result[0]["answer"] if result else "Not Found"

        # STEP 3: Write the normalized fields back to Supabase.
        supabase.table("invoices").update({
            "status": "completed",
            "vendor_name": extracted["vendor_name"],
            "invoice_number": extracted["invoice_number"],
            "total_amount": clean_amount(extracted["total_amount"]),
            "raw_json": doc_json,  # full Docling structure, kept for later use
            "metadata": {"extracted_date": extracted["date"]},
        }).eq("id", invoice_id).execute()
        print(f"β Success: Updated {invoice_id}")
    except Exception as e:
        print(f"β Critical Error: {str(e)}")
        # Marking the row as errored is itself a remote call; guard it so a
        # second failure is reported instead of escaping this handler.
        try:
            supabase.table("invoices").update({"status": "error"}).eq("id", invoice_id).execute()
        except Exception as status_error:
            print(f"Could not mark invoice {invoice_id} as errored: {status_error}")
# --- 3. TRIGGER (Example for manual testing) ---
# Running this file directly only prints a readiness message; per the
# original note, production calls are expected to come from an n8n webhook
# or an API route.
if __name__ == "__main__":
    print("π€ Worker is online and waiting for tasks...")