|
|
import json |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any |
|
|
import google.generativeai as genai |
|
|
from PIL import Image |
|
|
import PyPDF2 |
|
|
import pytesseract |
|
|
from doctr.io import DocumentFile |
|
|
from doctr.models import ocr_predictor |
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
# Read the Gemini API key from the environment rather than hard-coding it:
# a key committed to source control is exposed to anyone with repository
# access and must be considered compromised. Set GEMINI_API_KEY before running.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")

# Configure the client once at import time when a key is available; callers
# may still pass an explicit key to extract_with_gemini().
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
|
|
|
|
|
|
|
|
# Prompt sent verbatim to Gemini. It instructs the model to emit ONLY a JSON
# object matching the schema below; the field names here must stay in sync
# with whatever consumes the extracted JSON downstream — edit with care.
EXTRACTION_PROMPT = """You are a shipping document data extraction specialist. Extract structured data from the provided shipping/logistics documents.

Extract the following fields into a JSON format:

{
    "poNumber": "Purchase Order Number",
    "shipFrom": "Origin/Ship From Location",
    "carrierType": "Transportation type (RAIL/TRUCK/etc)",
    "originCarrier": "Carrier name (CN/CPRS/etc)",
    "railCarNumber": "Rail car identifier",
    "totalQuantity": "Total number of packages",
    "totalUnits": "Unit type (UNIT/MBF/MSFT/etc)",
    "accountName": "Customer/Account name",
    "inventories": {
        "items": [
            {
                "quantityShipped": "Quantity as number, no of packages",
                "inventoryUnits": "Unit type from document (MBF, FBM, SF, UNIT etc.)",
                "productName": "Full product description",
                "productCode": "Product code/SKU",
                "product": {
                    "category": "Product category (OSB/Lumber/etc)",
                    "unit": "Unit type from document (MBF, FBM, SF, UNIT etc.)",
                    "pcs": "Pieces per unit",
                    "mbf": "Thousand board feet (if applicable)",
                    "sf": "Square feet (if applicable)",
                    "pcsHeight": "Height in inches",
                    "pcsWidth": "Width in inches",
                    "pcsLength": "Length in the same unit as document"
                },
                "customFields": [
                    "Mill||Mill Name",
                    "Vendor||Vendor Name"
                ]
            }
        ]
    }
}

IMPORTANT INSTRUCTIONS:
1. Extract ALL products/items found in the document
2. Convert text numbers to actual numbers (e.g., "54" → 54)
3. Parse dimensions carefully, Do NOT convert units(e.g., "2x6x14" means height=6, width=14, length=2)
4. Calculate MBF/SF when possible from dimensions and piece count
5. If a field is not found, use null (not empty string)
6. For multiple products, create separate items in the inventories.items array
7. Extract custom fields like Mill, Vendor from document metadata
8. Unit types must be (PCS/PKG/MBF/MSFT/etc)

Return ONLY valid JSON, no markdown formatting or explanations."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(pdf_file) -> str:
    """Extract the text layer from a PDF file.

    Args:
        pdf_file: Path (or file-like object) accepted by PyPDF2.PdfReader.

    Returns:
        The concatenated text of all pages, one newline after each page.
        On any failure an "Error extracting PDF text: ..." string is returned
        instead of raising, preserving the original best-effort contract.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        pages_text = []
        for page in pdf_reader.pages:
            # extract_text() may return None for image-only/scanned pages;
            # the original code crashed on `None + "\n"` in that case.
            pages_text.append((page.extract_text() or "") + "\n")
        # join() avoids the quadratic cost of repeated string +=.
        return "".join(pages_text)
    except Exception as e:
        return f"Error extracting PDF text: {str(e)}"
|
|
|
|
|
|
|
|
def convert_pdf_to_images(pdf_file) -> List[Image.Image]:
    """Render every page of a PDF as a PIL image.

    Returns an empty list when the optional pdf2image dependency is missing
    or when the conversion itself fails — callers treat [] as "no page images".
    """
    try:
        from pdf2image import convert_from_path

        return convert_from_path(pdf_file)
    except ImportError:
        # pdf2image (and its poppler backend) is optional.
        return []
    except Exception as exc:
        print(f"Error converting PDF to images: {exc}")
        return []
|
|
|
|
|
|
|
|
def extract_text_from_image(img_path: str) -> str:
    """Extract text using DocTR for better structure"""
    try:
        doc = DocumentFile.from_images(img_path)
        # NOTE(review): `ocr_model` is not defined anywhere in this file —
        # presumably `ocr_predictor(pretrained=True)` was meant to be
        # instantiated at module level. As written this line raises NameError,
        # which the except below silently turns into an empty-string return.
        # Confirm and define the model.
        result = ocr_model(doc)
        export = result.export()
        lines = []

        # Walk DocTR's page -> block -> line -> word hierarchy, joining the
        # words of each line with spaces so layout order is preserved.
        for page in export['pages']:
            for block in page['blocks']:
                for line in block['lines']:
                    line_text = " ".join([w['value'] for w in line['words']])
                    lines.append(line_text)

        return "\n".join(lines)
    except Exception as e:
        # Best-effort: log and return "" so one bad image doesn't abort a batch.
        print(f"Error extracting text from image {img_path}: {e}")
        return ""
|
|
|
|
|
|
|
|
def process_files(files: List[str]) -> Dict[str, Any]:
    """Process uploaded files into text and images for the LLM.

    Args:
        files: Paths to the documents to ingest (may be None/empty).

    Returns:
        Dict with:
          - "files": per-file metadata dicts (filename, type, content/error)
          - "combined_text": all extracted text, one "--- name ---" header per file
          - "images": PIL images (PDF page renders and uploaded images)

    Unknown extensions are recorded with empty content; per-file errors are
    captured in that file's "content" field rather than aborting the batch.
    """
    processed_data = {
        "files": [],
        "combined_text": "",
        "images": []
    }

    if not files:
        return processed_data

    for file_path in files:
        file_name = Path(file_path).name
        file_ext = Path(file_path).suffix.lower()

        file_data = {
            "filename": file_name,
            "type": file_ext,
            "content": ""
        }

        try:
            if file_ext == '.pdf':
                text = extract_text_from_pdf(file_path)
                file_data["content"] = text
                processed_data["combined_text"] += f"\n--- {file_name} ---\n{text}\n"

                # Also render pages as images so Gemini can read scanned PDFs.
                processed_data["images"].extend(convert_pdf_to_images(file_path))

            elif file_ext in ('.jpg', '.jpeg', '.png', '.gif', '.bmp'):
                img = Image.open(file_path)
                processed_data["images"].append(img)
                file_data["content"] = f"Image file: {file_name}"

                # OCR fallback text. Bug fix: the original appended the
                # "--- name (Image) ---" header twice (once empty, once with
                # the OCR text); emit a single header per image.
                text = pytesseract.image_to_string(img)
                processed_data["combined_text"] += f"\n--- {file_name} (Image) ---\n{text}\n"

            elif file_ext == '.txt':
                with open(file_path, 'r', encoding='utf-8') as f:
                    text = f.read()
                file_data["content"] = text
                processed_data["combined_text"] += f"\n--- {file_name} ---\n{text}\n"

            processed_data["files"].append(file_data)

        except Exception as e:
            # Record the failure against this file and keep going.
            file_data["content"] = f"Error processing file: {str(e)}"
            processed_data["files"].append(file_data)

    return processed_data
|
|
|
|
|
|
|
|
def extract_with_gemini(processed_data: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Extract structured shipping data from processed documents via Gemini.

    Args:
        processed_data: Output of process_files() — "combined_text" and "images".
        api_key: Gemini API key; an empty value short-circuits with an error dict.

    Returns:
        {"success": True, "data": <parsed JSON>, "raw_response": <text>} on
        success, otherwise {"success": False, "error": ...} (plus
        "raw_response" when the model replied but its JSON failed to parse).
        {"error": ...} is returned when no API key is supplied.
    """

    if not api_key:
        return {"error": "Gemini API key not provided"}

    # Pre-declare so the JSONDecodeError handler can report what the model
    # actually returned (replaces the fragile `'response' in locals()` hack).
    response_text = None

    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('models/gemini-2.5-flash')
        # Bug fix: removed leftover debug `print(genai.list_models())` — it
        # made an extra network call on every extraction.

        content = [EXTRACTION_PROMPT]
        if processed_data["combined_text"]:
            content.append(f"\nDocument Text:\n{processed_data['combined_text']}")

        # Cap attached images to keep the request within payload limits.
        for img in processed_data["images"][:5]:
            content.append(img)

        response = model.generate_content(content)
        response_text = response.text.strip()

        # The model sometimes wraps its answer in markdown code fences despite
        # the prompt; strip them before parsing.
        if response_text.startswith("```json"):
            response_text = response_text[7:]
        if response_text.startswith("```"):
            response_text = response_text[3:]
        if response_text.endswith("```"):
            response_text = response_text[:-3]
        response_text = response_text.strip()

        extracted_data = json.loads(response_text)

        return {
            "success": True,
            "data": extracted_data,
            "raw_response": response_text
        }

    except json.JSONDecodeError as e:
        return {
            "success": False,
            "error": f"JSON parsing error: {str(e)}",
            "raw_response": response_text
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Extraction error: {str(e)}"
        }
|
|
|
|
|
|
|
|
def process_documents(files, api_key):
    """End-to-end pipeline: ingest files, extract with Gemini, save output.json.

    Returns the pretty-printed JSON string on success, None otherwise.
    Progress and errors are reported on stdout.
    """

    # Guard clauses: nothing to do without inputs or credentials.
    if not files:
        print("⚠️ Please provide at least one document.")
        return

    if not api_key:
        print("⚠️ Please provide your Gemini API key.")
        return

    print("📄 Processing files...")
    processed = process_files(files)

    print("🤖 Extracting data with Gemini AI...")
    outcome = extract_with_gemini(processed, api_key)

    if not outcome.get("success"):
        print(f" Extraction Failed: {outcome.get('error', 'Unknown error')}")
        print("Raw Response:", outcome.get('raw_response', 'No response'))
        return None

    serialized = json.dumps(outcome["data"], indent=2)
    print(" Extraction Successful!")
    print(serialized)

    # Persist alongside the script for later inspection.
    output_file = "output.json"
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(serialized)
    print(f"JSON saved to {output_file}")
    return serialized
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _gradio_wrapper(uploaded_files): |
|
|
""" |
|
|
uploaded_files: list of temporary file dicts that Gradio provides. |
|
|
Returns: status_message, json_text, preview_text |
|
|
""" |
|
|
if not uploaded_files: |
|
|
return ("No files uploaded.", "{}", "") |
|
|
|
|
|
|
|
|
file_paths = [] |
|
|
for f in uploaded_files: |
|
|
|
|
|
|
|
|
if isinstance(f, str) and os.path.exists(f): |
|
|
file_paths.append(f) |
|
|
else: |
|
|
|
|
|
try: |
|
|
temp_path = f.name |
|
|
if os.path.exists(temp_path): |
|
|
file_paths.append(temp_path) |
|
|
else: |
|
|
|
|
|
content = None |
|
|
if hasattr(f, "read"): |
|
|
content = f.read() |
|
|
elif isinstance(f, dict) and "name" in f: |
|
|
file_paths.append(f["name"]) |
|
|
continue |
|
|
|
|
|
if content: |
|
|
|
|
|
tmp_dir = Path("gradio_tmp") |
|
|
tmp_dir.mkdir(exist_ok=True) |
|
|
dest = tmp_dir / Path(f.name).name |
|
|
with open(dest, "wb") as out: |
|
|
out.write(content) |
|
|
file_paths.append(str(dest)) |
|
|
except Exception: |
|
|
|
|
|
try: |
|
|
if isinstance(f, dict) and "name" in f and os.path.exists(f["name"]): |
|
|
file_paths.append(f["name"]) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
if not file_paths: |
|
|
return ("Uploaded files could not be located.", "{}", "") |
|
|
|
|
|
status_msg = "Processing..." |
|
|
|
|
|
json_result = process_documents(file_paths, GEMINI_API_KEY) |
|
|
|
|
|
if json_result: |
|
|
|
|
|
pretty = json_result |
|
|
try: |
|
|
parsed = json.loads(pretty) |
|
|
preview = "" |
|
|
|
|
|
po = parsed.get("poNumber") |
|
|
inv = parsed.get("inventories", {}).get("items", []) |
|
|
first_prod = inv[0].get("productName") if inv else None |
|
|
preview = f"PO: {po}\nFirst product: {first_prod}" |
|
|
except Exception: |
|
|
preview = pretty[:100] + "..." |
|
|
return ("Extraction completed.", pretty, preview) |
|
|
else: |
|
|
return ("Extraction failed. Check console for details.", "{}", "") |
|
|
|
|
|
|
|
|
def build_ui():
    """Assemble the Gradio Blocks interface wired to _gradio_wrapper()."""
    with gr.Blocks() as app:
        # Header and usage notes.
        gr.Markdown("## Document Extractor — Upload files to extract structured shipping data")
        gr.Markdown("""
### 💡 Tips:
- Upload multiple files for batch processing
- For images: ensure text is clear and well-lit
- For PDFs: both text-based and scanned PDFs work
- The AI will analyze visual content even if text extraction fails
""")

        with gr.Row():
            # Left column: inputs.
            with gr.Column(scale=2):
                uploads = gr.File(
                    label="Select documents (PDF, image, text)",
                    file_count="multiple",
                    file_types=[".pdf", ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".txt", ".csv", ".doc", ".docx"]
                )
                extract_btn = gr.Button("Extract", variant="primary")

            # Right column: results.
            with gr.Column(scale=3):
                status_box = gr.Textbox(label="Status", lines=2)
                json_view = gr.Code(label="Extracted JSON", language="json", lines=20)
                preview_box = gr.Textbox(label="Quick preview", lines=4)

        extract_btn.click(
            fn=_gradio_wrapper,
            inputs=[uploads],
            outputs=[status_box, json_view, preview_box],
        )

    return app
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # One-off CLI run against a sample document, then launch the web UI.
    sample_files = ["sample1.pdf"]
    process_documents(sample_files, GEMINI_API_KEY)

    app = build_ui()
    app.launch(server_name="0.0.0.0", server_port=7860, share=False)
|
|
|