Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import json | |
| import os | |
| from pathlib import Path | |
| import base64 | |
| from typing import List, Dict, Any | |
| import google.generativeai as genai | |
| from PIL import Image | |
| import PyPDF2 | |
| import io | |
| # Configure Gemini API | |
| # GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| GEMINI_API_KEY = "AIzaSyB2b80YwNHs3Yj6RZOTL8wjXk2YhxCluOA" | |
| if GEMINI_API_KEY: | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| EXTRACTION_PROMPT = """You are a shipping document data extraction specialist. Extract structured data from the provided shipping/logistics documents. | |
| Extract the following fields into a JSON format: | |
| { | |
| "poNumber": "Purchase Order Number", | |
| "shipFrom": "Origin/Ship From Location", | |
| "carrierType": "Transportation type (RAIL/TRUCK/etc)", | |
| "originCarrier": "Carrier name (CN/CPRS/etc)", | |
| "railCarNumber": "Rail car identifier", | |
| "totalQuantity": "Total quantity as number", | |
| "totalUnits": "Unit type (UNIT/MBF/MSFT/etc)", | |
| "accountName": "Customer/Account name", | |
| "inventories": { | |
| "items": [ | |
| { | |
| "quantityShipped": "Quantity as number", | |
| "inventoryUnits": "Unit type", | |
| "productName": "Full product description", | |
| "productCode": "Product code/SKU", | |
| "product": { | |
| "category": "Product category (OSB/Lumber/etc)", | |
| "unit": "Unit count as number", | |
| "pcs": "Pieces per unit", | |
| "mbf": "Thousand board feet (if applicable)", | |
| "sf": "Square feet (if applicable)", | |
| "pcsHeight": "Height in inches", | |
| "pcsWidth": "Width in inches", | |
| "pcsLength": "Length in feet" | |
| }, | |
| "customFields": [ | |
| "Mill||Mill Name", | |
| "Vendor||Vendor Name" | |
| ] | |
| } | |
| ] | |
| } | |
| } | |
| IMPORTANT INSTRUCTIONS: | |
| 1. Extract ALL products/items found in the document | |
| 2. Convert text numbers to actual numbers (e.g., "54" β 54) | |
| 3. Parse dimensions carefully (e.g., "2X6X14" means height=2, width=6, length=14) | |
| 4. Calculate MBF/SF when possible from dimensions and piece count | |
| 5. If a field is not found, use null (not empty string) | |
| 6. For multiple products, create separate items in the inventories.items array | |
| 7. Extract custom fields like Mill, Vendor from document metadata | |
| Return ONLY valid JSON, no markdown formatting or explanations.""" | |
| def extract_text_from_pdf(pdf_file) -> str: | |
| """Extract text from PDF file""" | |
| try: | |
| pdf_reader = PyPDF2.PdfReader(pdf_file) | |
| text = "" | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() + "\n" | |
| return text | |
| except Exception as e: | |
| return f"Error extracting PDF text: {str(e)}" | |
| def convert_pdf_to_images(pdf_file) -> List[Image.Image]: | |
| """Convert PDF pages to images""" | |
| try: | |
| from pdf2image import convert_from_path | |
| images = convert_from_path(pdf_file) | |
| return images | |
| except ImportError: | |
| # Fallback if pdf2image not available | |
| return [] | |
| except Exception as e: | |
| print(f"Error converting PDF to images: {e}") | |
| return [] | |
| def process_files(files: List[str]) -> Dict[str, Any]: | |
| """Process uploaded files and extract text/images""" | |
| processed_data = { | |
| "files": [], | |
| "combined_text": "", | |
| "images": [] | |
| } | |
| if not files: | |
| return processed_data | |
| for file_path in files: | |
| file_name = Path(file_path).name | |
| file_ext = Path(file_path).suffix.lower() | |
| file_data = { | |
| "filename": file_name, | |
| "type": file_ext, | |
| "content": "" | |
| } | |
| try: | |
| if file_ext == '.pdf': | |
| # Extract text from PDF | |
| text = extract_text_from_pdf(file_path) | |
| file_data["content"] = text | |
| processed_data["combined_text"] += f"\n--- {file_name} ---\n{text}\n" | |
| # Try to convert PDF to images for visual extraction | |
| images = convert_pdf_to_images(file_path) | |
| processed_data["images"].extend(images) | |
| elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']: | |
| # Load image | |
| img = Image.open(file_path) | |
| processed_data["images"].append(img) | |
| file_data["content"] = f"Image file: {file_name}" | |
| processed_data["combined_text"] += f"\n--- {file_name} (Image) ---\n" | |
| elif file_ext in ['.txt']: | |
| # Read text file | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| text = f.read() | |
| file_data["content"] = text | |
| processed_data["combined_text"] += f"\n--- {file_name} ---\n{text}\n" | |
| processed_data["files"].append(file_data) | |
| except Exception as e: | |
| file_data["content"] = f"Error processing file: {str(e)}" | |
| processed_data["files"].append(file_data) | |
| return processed_data | |
| def extract_with_gemini(processed_data: Dict[str, Any], api_key: str) -> Dict[str, Any]: | |
| """Extract structured data using Gemini API""" | |
| if not api_key: | |
| return {"error": "Gemini API key not provided"} | |
| try: | |
| # Configure Gemini with provided API key | |
| genai.configure(api_key=api_key) | |
| # Use Gemini Pro Vision if images available, otherwise Pro | |
| if processed_data["images"]: | |
| model = genai.GenerativeModel('gemini-1.5-flash') | |
| # Prepare content with images and text | |
| content = [EXTRACTION_PROMPT] | |
| # Add text content | |
| if processed_data["combined_text"]: | |
| content.append(f"\nDocument Text:\n{processed_data['combined_text']}") | |
| # Add images (limit to first 5 for token management) | |
| for img in processed_data["images"][:5]: | |
| content.append(img) | |
| response = model.generate_content(content) | |
| else: | |
| # Text-only extraction | |
| model = genai.GenerativeModel('gemini-1.5-flash') | |
| prompt = f"{EXTRACTION_PROMPT}\n\nDocument Text:\n{processed_data['combined_text']}" | |
| response = model.generate_content(prompt) | |
| # Parse response | |
| response_text = response.text.strip() | |
| # Remove markdown code blocks if present | |
| if response_text.startswith("```json"): | |
| response_text = response_text[7:] | |
| if response_text.startswith("```"): | |
| response_text = response_text[3:] | |
| if response_text.endswith("```"): | |
| response_text = response_text[:-3] | |
| response_text = response_text.strip() | |
| # Parse JSON | |
| extracted_data = json.loads(response_text) | |
| return { | |
| "success": True, | |
| "data": extracted_data, | |
| "raw_response": response_text | |
| } | |
| except json.JSONDecodeError as e: | |
| return { | |
| "success": False, | |
| "error": f"JSON parsing error: {str(e)}", | |
| "raw_response": response.text if 'response' in locals() else None | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Extraction error: {str(e)}" | |
| } | |
| def process_documents(files, api_key): | |
| """Main processing function""" | |
| if not files: | |
| return "β οΈ Please upload at least one document.", None, None | |
| if not api_key: | |
| return "β οΈ Please provide your Gemini API key.", None, None | |
| try: | |
| # Step 1: Process files | |
| status = "π Processing files..." | |
| processed_data = process_files(files) | |
| # Step 2: Extract with Gemini | |
| status = "π€ Extracting data with Gemini AI..." | |
| result = extract_with_gemini(processed_data, api_key) | |
| if result.get("success"): | |
| # Format JSON output | |
| json_output = json.dumps(result["data"], indent=2) | |
| # Create summary | |
| data = result["data"] | |
| summary = f"""β **Extraction Successful!** | |
| **Shipment Details:** | |
| - PO Number: {data.get('poNumber', 'N/A')} | |
| - Ship From: {data.get('shipFrom', 'N/A')} | |
| - Carrier: {data.get('originCarrier', 'N/A')} ({data.get('carrierType', 'N/A')}) | |
| - Rail Car: {data.get('railCarNumber', 'N/A')} | |
| - Total Quantity: {data.get('totalQuantity', 'N/A')} {data.get('totalUnits', '')} | |
| - Account: {data.get('accountName', 'N/A')} | |
| **Products Found:** {len(data.get('inventories', {}).get('items', []))} | |
| """ | |
| return summary, json_output, None | |
| else: | |
| error_msg = f"β **Extraction Failed**\n\nError: {result.get('error', 'Unknown error')}" | |
| raw = result.get('raw_response', 'No response') | |
| return error_msg, None, raw | |
| except Exception as e: | |
| return f"β **Processing Error**\n\n{str(e)}", None, None | |
| # Create Gradio Interface | |
| with gr.Blocks(title="Shipping Document Extractor", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # π¦ Shipping Document Data Extractor | |
| Upload shipping documents (PDFs, images, etc.) to automatically extract structured data. | |
| Powered by Google Gemini AI. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| api_key_input = gr.Textbox( | |
| label="π Gemini API Key", | |
| placeholder="Enter your Gemini API key", | |
| type="password", | |
| value=GEMINI_API_KEY or "" | |
| ) | |
| gr.Markdown("[Get your API key](https://makersuite.google.com/app/apikey)") | |
| file_input = gr.File( | |
| label="π Upload Documents", | |
| file_count="multiple", | |
| file_types=[".pdf", ".jpg", ".jpeg", ".png", ".txt"] | |
| ) | |
| process_btn = gr.Button("π Extract Data", variant="primary", size="lg") | |
| gr.Markdown(""" | |
| ### Supported Files: | |
| - PDF documents | |
| - Images (JPG, PNG) | |
| - Text files | |
| """) | |
| with gr.Column(scale=2): | |
| status_output = gr.Markdown(label="Status") | |
| with gr.Tabs(): | |
| with gr.Tab("π Extracted JSON"): | |
| json_output = gr.Code( | |
| label="Structured Data (JSON)", | |
| language="json", | |
| lines=20 | |
| ) | |
| download_btn = gr.DownloadButton( | |
| label="πΎ Download JSON", | |
| visible=False | |
| ) | |
| with gr.Tab("π Raw Response"): | |
| raw_output = gr.Code( | |
| label="Raw API Response", | |
| language="text", | |
| lines=20 | |
| ) | |
| # Event handlers | |
| def update_download(json_str): | |
| if json_str: | |
| return gr.DownloadButton(visible=True, value=json_str) | |
| return gr.DownloadButton(visible=False) | |
| process_btn.click( | |
| fn=process_documents, | |
| inputs=[file_input, api_key_input], | |
| outputs=[status_output, json_output, raw_output] | |
| ) | |
| json_output.change( | |
| fn=update_download, | |
| inputs=[json_output], | |
| outputs=[download_btn] | |
| ) | |
| gr.Markdown(""" | |
| --- | |
| ### π‘ Tips: | |
| - Upload multiple documents at once for batch processing | |
| - Better quality images/PDFs = better extraction accuracy | |
| - The AI extracts: PO numbers, carrier info, products, quantities, dimensions, and more | |
| """) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.launch() |