Spaces:
Running
Running
| import os | |
| import base64 | |
| import json | |
| import re | |
| from io import BytesIO | |
| from typing import Any, Dict, List | |
| import httpx | |
| try: | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| PDF_SUPPORT = True | |
| except ImportError as e: | |
| PDF_SUPPORT = False | |
| print(f"[WARNING] PDF support libraries not available: {e}. PDF conversion will not work.") | |
| # Get your OpenRouter API key from env (you'll set this in Hugging Face later) | |
| OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") | |
| OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1/chat/completions" | |
| MODEL_NAME = "qwen/qwen3-vl-235b-a22b-instruct" | |
| def _pdf_to_images(pdf_bytes: bytes) -> List[bytes]: | |
| """ | |
| Convert PDF pages to PNG images. | |
| Returns a list of PNG image bytes, one per page. | |
| """ | |
| if not PDF_SUPPORT: | |
| raise RuntimeError("PyMuPDF not installed. Cannot convert PDF to images.") | |
| pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf") | |
| images = [] | |
| print(f"[INFO] PDF has {len(pdf_doc)} page(s)") | |
| for page_num in range(len(pdf_doc)): | |
| page = pdf_doc[page_num] | |
| # Render page to image (zoom factor 2 for better quality) | |
| mat = fitz.Matrix(2.0, 2.0) # 2x zoom for better quality | |
| pix = page.get_pixmap(matrix=mat) | |
| # Convert to PIL Image then to PNG bytes | |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
| img_bytes = BytesIO() | |
| img.save(img_bytes, format="PNG") | |
| images.append(img_bytes.getvalue()) | |
| print(f"[INFO] Converted page {page_num + 1} to image ({pix.width}x{pix.height})") | |
| pdf_doc.close() | |
| return images | |
| def _image_bytes_to_base64(image_bytes: bytes) -> str: | |
| """Convert image bytes to base64 data URL.""" | |
| b64 = base64.b64encode(image_bytes).decode("utf-8") | |
| return f"data:image/png;base64,{b64}" | |
| def _file_to_image_blocks(file_bytes: bytes, content_type: str) -> List[Dict[str, Any]]: | |
| """ | |
| Convert file to image blocks for the vision model. | |
| - For images: Returns single image block | |
| - For PDFs: Converts each page to an image and returns multiple blocks | |
| """ | |
| # Handle PDF files | |
| if content_type == "application/pdf" or content_type.endswith("/pdf"): | |
| if not PDF_SUPPORT: | |
| raise RuntimeError("PDF support requires PyMuPDF. Please install it.") | |
| print(f"[INFO] Converting PDF to images...") | |
| pdf_images = _pdf_to_images(file_bytes) | |
| # Create image blocks for each page | |
| image_blocks = [] | |
| for i, img_bytes in enumerate(pdf_images): | |
| image_url = _image_bytes_to_base64(img_bytes) | |
| image_blocks.append({ | |
| "type": "input_image", | |
| "image_url": image_url, | |
| }) | |
| print(f"[INFO] Created image block for page {i + 1} ({len(img_bytes)} bytes)") | |
| return image_blocks | |
| # Handle regular image files | |
| else: | |
| b64 = base64.b64encode(file_bytes).decode("utf-8") | |
| print(f"[DEBUG] Encoding image file. Content type: {content_type}, Size: {len(file_bytes)} bytes") | |
| return [{ | |
| "type": "input_image", | |
| "image_url": f"data:{content_type};base64,{b64}", | |
| }] | |
| async def extract_fields_from_document( | |
| file_bytes: bytes, | |
| content_type: str, | |
| filename: str, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Call OpenRouter with Qwen3-VL and return parsed JSON with fields. | |
| We instruct the model to return JSON only. | |
| """ | |
| if not OPENROUTER_API_KEY: | |
| raise RuntimeError("OPENROUTER_API_KEY environment variable is not set") | |
| # Convert file to image blocks (handles PDF conversion) | |
| image_blocks = _file_to_image_blocks(file_bytes, content_type) | |
| if not image_blocks: | |
| raise ValueError("No images generated from file") | |
| print(f"[INFO] Generated {len(image_blocks)} image block(s) for processing") | |
| system_prompt = ( | |
| "You are a document extraction engine. " | |
| "You analyze invoices, receipts, contracts, reports and similar documents, " | |
| "and output structured JSON only (no explanations or comments)." | |
| ) | |
| # Update prompt for multi-page documents | |
| if len(image_blocks) > 1: | |
| user_prompt = ( | |
| f"Extract important key-value pairs from this {len(image_blocks)}-page document. " | |
| "Analyze all pages and combine the information into a single JSON response.\n" | |
| "Use this shape:\n" | |
| "{\n" | |
| ' \"doc_type\": \"invoice | receipt | contract | report | other\",\n' | |
| ' \"confidence\": number between 0 and 100,\n' | |
| ' \"fields\": {\n' | |
| ' \"invoice_number\": \"...\",\n' | |
| ' \"date\": \"...\",\n' | |
| ' \"due_date\": \"...\",\n' | |
| ' \"total_amount\": \"...\",\n' | |
| ' \"currency\": \"...\",\n' | |
| ' \"vendor_name\": \"...\",\n' | |
| ' \"line_items\": [\n' | |
| ' {\"description\": \"...\", \"quantity\": \"...\", \"unit_price\": \"...\", \"line_total\": \"...\"}\n' | |
| ' ],\n' | |
| ' \"other_field\": \"...\"\n' | |
| " }\n" | |
| "}\n" | |
| "If fields are missing or not applicable, simply omit them. " | |
| "Combine information from all pages into a single response." | |
| ) | |
| else: | |
| user_prompt = ( | |
| "Extract important key-value pairs from the document and respond with JSON only.\n" | |
| "Use this shape:\n" | |
| "{\n" | |
| ' \"doc_type\": \"invoice | receipt | contract | report | other\",\n' | |
| ' \"confidence\": number between 0 and 100,\n' | |
| ' \"fields\": {\n' | |
| ' \"invoice_number\": \"...\",\n' | |
| ' \"date\": \"...\",\n' | |
| ' \"due_date\": \"...\",\n' | |
| ' \"total_amount\": \"...\",\n' | |
| ' \"currency\": \"...\",\n' | |
| ' \"vendor_name\": \"...\",\n' | |
| ' \"line_items\": [\n' | |
| ' {\"description\": \"...\", \"quantity\": \"...\", \"unit_price\": \"...\", \"line_total\": \"...\"}\n' | |
| ' ],\n' | |
| ' \"other_field\": \"...\"\n' | |
| " }\n" | |
| "}\n" | |
| "If fields are missing or not applicable, simply omit them." | |
| ) | |
| # Build content array with text prompt and all image blocks | |
| user_content = [{"type": "text", "text": user_prompt}] | |
| user_content.extend(image_blocks) | |
| payload: Dict[str, Any] = { | |
| "model": MODEL_NAME, | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": [{"type": "text", "text": system_prompt}], | |
| }, | |
| { | |
| "role": "user", | |
| "content": user_content, | |
| }, | |
| ], | |
| "max_tokens": 4096, # Increased for multi-page documents | |
| } | |
| headers = { | |
| "Authorization": f"Bearer {OPENROUTER_API_KEY}", | |
| "Content-Type": "application/json", | |
| # Optional attribution headers | |
| "HTTP-Referer": os.environ.get( | |
| "APP_URL", | |
| "https://huggingface.co/spaces/your-space", | |
| ), | |
| "X-Title": "Document Capture Demo", | |
| } | |
| async with httpx.AsyncClient(timeout=120) as client: | |
| resp = await client.post(OPENROUTER_BASE_URL, headers=headers, json=payload) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| # OpenRouter returns choices[0].message.content | |
| if "choices" not in data or len(data["choices"]) == 0: | |
| raise ValueError("No choices in OpenRouter response") | |
| content = data["choices"][0]["message"]["content"] | |
| # Log the raw response for debugging (first 500 chars) | |
| print(f"[DEBUG] OpenRouter response preview: {str(content)[:500]}") | |
| # content may be a string or a list of content blocks | |
| if isinstance(content, list): | |
| text = "".join(part.get("text", "") for part in content if part.get("type") == "text") | |
| else: | |
| text = content | |
| if not text or not text.strip(): | |
| raise ValueError("Empty response from OpenRouter API") | |
| # Try to parse JSON from the model output | |
| # The model might return JSON wrapped in markdown code blocks or with extra text | |
| try: | |
| # First, try direct JSON parsing | |
| parsed = json.loads(text) | |
| print(f"[DEBUG] Successfully parsed JSON directly") | |
| return parsed | |
| except json.JSONDecodeError as e: | |
| print(f"[DEBUG] Direct JSON parse failed: {e}") | |
| # Try to extract JSON from markdown code blocks | |
| json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) | |
| if json_match: | |
| try: | |
| parsed = json.loads(json_match.group(1)) | |
| print(f"[DEBUG] Successfully parsed JSON from markdown code block") | |
| return parsed | |
| except json.JSONDecodeError as e2: | |
| print(f"[DEBUG] Markdown code block parse failed: {e2}") | |
| # Try to find JSON object in the text (look for {...}) | |
| json_match = re.search(r'\{.*\}', text, re.DOTALL) | |
| if json_match: | |
| try: | |
| parsed = json.loads(json_match.group(0)) | |
| print(f"[DEBUG] Successfully parsed JSON from regex match") | |
| return parsed | |
| except json.JSONDecodeError as e3: | |
| print(f"[DEBUG] Regex match parse failed: {e3}") | |
| # If all parsing fails, return a default structure with the raw text | |
| print(f"[WARNING] All JSON parsing attempts failed. Returning fallback structure.") | |
| return { | |
| "doc_type": "other", | |
| "confidence": 50.0, | |
| "fields": { | |
| "raw_response": text[:1000], # First 1000 chars for debugging | |
| "error": "Could not parse JSON from model response", | |
| "note": "Check server logs for full response" | |
| } | |
| } | |