Spaces:
Sleeping
Sleeping
File size: 11,100 Bytes
faa3050 d79b7f7 4bdd01c f74e17e d79b7f7 f74e17e 2a944a5 d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e ec0b507 f74e17e ec0b507 f74e17e d79b7f7 f74e17e 343b0c3 d79b7f7 f74e17e 2a944a5 4bdd01c 8f86a3c 4bdd01c 8f86a3c 4bdd01c 8f86a3c 4bdd01c 8f86a3c 4bdd01c 8f86a3c 4bdd01c f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 f74e17e d79b7f7 566dc81 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# src/pipeline.py
"""
Main invoice processing pipeline
Orchestrates preprocessing, OCR, and extraction
"""
from typing import Dict, Any, Optional
from pathlib import Path
import json
import threading
from pydantic import ValidationError
import cv2
# --- IMPORTS ---
from src.preprocessing import load_image, convert_to_grayscale, remove_noise
from src.extraction import structure_output
from src.ml_extraction import extract_ml_based
from src.schema import InvoiceData
from src.pdf_utils import extract_text_from_pdf, convert_pdf_to_images
from src.utils import generate_semantic_hash
from src.repository import InvoiceRepository
from src.database import DB_CONNECTED
def process_invoice(image_path: str,
                    method: str = 'ml',
                    save_results: bool = False,
                    output_dir: str = 'outputs') -> Dict[str, Any]:
    """
    Process an invoice image or PDF and return structured data.

    PDFs with embedded text ("digital" PDFs) are routed straight to the
    rule-based engine; sparse/scanned PDFs have page 1 rasterized to a JPG
    and then flow through the same image pipeline as ordinary images.

    Args:
        image_path: Path to the invoice image or PDF.
        method: Extraction method, 'ml' or 'rules'. 'rules' is deprecated and
            redirects to ML. Internally overridden to 'rules (digital)' for
            digital PDFs (used in logging and the output filename suffix).
        save_results: Whether to save JSON results to a file.
        output_dir: Directory to save results.

    Returns:
        A dictionary with the extracted invoice data, plus bookkeeping keys:
        'validation_status' (ML path only), 'semantic_hash', '_db_status'.

    Raises:
        FileNotFoundError: If image_path does not exist.
        ValueError: If ML-based extraction fails.
        IOError: If saving the JSON result fails.
    """
    if not Path(image_path).exists():
        raise FileNotFoundError(f"Image/PDF not found at path: {image_path}")
    print(f"Processing: {image_path}")
    raw_result = {}
    is_digital_pdf = False
    # --- 1. SMART PDF HANDLING ---
    if image_path.lower().endswith('.pdf'):
        print("📄 PDF detected. Checking type...")
        try:
            # Attempt to extract text directly (Fast Path)
            digital_text = extract_text_from_pdf(image_path)
            # Heuristic: If we found >50 chars, it's likely a native Digital PDF
            if len(digital_text.strip()) > 50:
                print(" ✅ Digital Text found. Using Rule-Based Engine (Fast Mode).")
                # We bypass the ML model because we have perfect text
                raw_result = structure_output(digital_text)
                is_digital_pdf = True
                method = 'rules (digital)'  # Override method for logging
            else:
                print(" ⚠️ Sparse text detected. Treating as Scanned PDF.")
                # Convert first page to image for the ML pipeline
                print(" 🔄 Converting Page 1 to Image...")
                images = convert_pdf_to_images(image_path)
                # Save as temp jpg so our existing pipeline can read it
                # (In production, you might pass the array directly, but this is safer for now)
                temp_jpg = image_path.replace('.pdf', '.jpg')
                cv2.imwrite(temp_jpg, images[0])
                # SWAP THE PATH: The rest of the pipeline will now see a JPG!
                image_path = temp_jpg
                print(f" ➡️ Continuing with converted image: {image_path}")
        except Exception as e:
            # Deliberate best-effort: any PDF failure falls through to the
            # standard (image) extraction path below rather than aborting.
            print(f" ❌ PDF Error: {e}. Falling back to standard processing.")
    # --- 2. STANDARD EXTRACTION (ML / RULES) ---
    # Only run this if we didn't already extract from Digital PDF
    if not is_digital_pdf:
        print(f"⚙️ Using '{method}' method on image...")
        if method == 'ml':
            try:
                raw_result = extract_ml_based(image_path)
            except Exception as e:
                # FIX: chain the original exception so the root cause survives
                raise ValueError(f"Error during ML-based extraction: {e}") from e
        elif method == 'rules':
            try:
                print("⚠️ Rule-based mode is deprecated. Redirecting to ML-based extraction.")
                raw_result = extract_ml_based(image_path)
            except Exception as e:
                raise ValueError(f"Error during ML-based extraction: {e}") from e
    # --- VALIDATION STEP ---
    # NOTE: digital PDFs skip Pydantic validation because 'method' was
    # reassigned to 'rules (digital)' above — presumably intentional; confirm.
    final_data = raw_result  # Default to raw if validation crashes hard
    if method == 'ml':
        try:
            invoice = InvoiceData(**raw_result)
            final_data = invoice.model_dump(mode='json')
            final_data['validation_status'] = 'passed'
            print("✅ Data Validation Passed")
        except ValidationError as e:
            print(f"❌ Data Validation Failed: {len(e.errors())} errors")
            # We keep the 'raw_result' data so the user isn't left with nothing,
            # but we attach the error report so they know what to fix.
            final_data = raw_result.copy()
            final_data['validation_status'] = 'failed'
            # Format errors nicely
            error_list = []
            for err in e.errors():
                field = " -> ".join(str(loc) for loc in err['loc'])
                msg = err['msg']
                print(f" - {field}: {msg}")
                error_list.append(f"{field}: {msg}")
            final_data['validation_errors'] = error_list
        # Preserve raw_predictions and raw_text for UI visualization (not in schema)
        if 'raw_predictions' in raw_result:
            final_data['raw_predictions'] = raw_result['raw_predictions']
        if 'raw_text' in raw_result:
            final_data['raw_text'] = raw_result['raw_text']
    # --- DUPLICATE DETECTION ---
    # We calculate the hash based on the final (or raw) data.
    # This gives us a unique fingerprint for this specific business transaction.
    final_data['semantic_hash'] = generate_semantic_hash(final_data)
    # --- DATABASE SAVE (ASYNC - Fire and Forget) ---
    def background_db_operation(data_to_save):
        """Check for duplicate and save in background thread"""
        try:
            repo = InvoiceRepository()
            if repo.session:
                # Check for duplicate first
                existing = repo.get_by_hash(data_to_save.get('semantic_hash', ''))
                if existing:
                    print(f" ⚠️ [Background] Duplicate: {data_to_save.get('receipt_number')}")
                else:
                    # Not a duplicate - save it
                    saved = repo.save_invoice(data_to_save)
                    if saved:
                        print(f" ✅ [Background] Saved: {data_to_save.get('receipt_number')}")
                    else:
                        print(f" ⚠️ [Background] Save failed: {data_to_save.get('receipt_number')}")
        except Exception as e:
            # Best-effort background save: never let DB errors break extraction.
            print(f" ⚠️ [Background] DB Error: {e}")
    if DB_CONNECTED:
        # Fire and forget - don't wait for result; pass a copy so the thread
        # never races with later mutations of final_data.
        save_thread = threading.Thread(target=background_db_operation, args=(final_data.copy(),))
        save_thread.start()
        final_data['_db_status'] = 'queued'
    else:
        final_data['_db_status'] = 'disabled'
    # --- SAVING STEP ---
    if save_results:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        # Filename carries the method suffix, e.g. receipt1_ml.json
        json_path = output_path / (Path(image_path).stem + f"_{method}.json")
        try:
            with open(json_path, 'w', encoding='utf-8') as f:
                # Use default=str to handle Decimal and Date objects automatically
                json.dump(final_data, f, indent=2, ensure_ascii=False, default=str)
        except Exception as e:
            raise IOError(f"Error saving results to {json_path}: {e}") from e
    return final_data
def process_batch(image_folder: str, output_dir: str = 'outputs') -> list:
    """Run process_invoice over every supported image in a folder.

    Args:
        image_folder: Directory containing invoice images (.jpg/.png/.jpeg).
        output_dir: Directory where per-invoice JSON results are written.

    Returns:
        List of result dicts for invoices that processed successfully;
        failures are logged and skipped.
    """
    processed = []
    folder = Path(image_folder)
    for pattern in ('*.jpg', '*.png', '*.jpeg'):
        for image_file in folder.glob(pattern):
            print(f"🔄 Processing: {image_file}")
            try:
                processed.append(
                    process_invoice(str(image_file), save_results=True, output_dir=output_dir)
                )
            except Exception as e:
                # One bad file must not abort the whole batch.
                print(f"❌ Error processing {image_file}: {e}")
    print(f"\n🎉 Batch processing complete! {len(processed)} invoices processed.")
    return processed
def main():
    """Command-line interface for invoice processing.

    Parses a path (file or directory) plus options, dispatches to
    process_invoice or process_batch, and prints a summary.

    Returns:
        0 on success, 1 on any error (suitable for sys.exit).
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Process invoice images or folders and extract structured data.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process a single invoice
  python src/pipeline.py data/raw/receipt1.jpg
  # Process and save a single invoice
  python src/pipeline.py data/raw/receipt1.jpg --save
  # Process an entire folder of invoices
  python src/pipeline.py data/raw --save --output results/
"""
    )
    parser.add_argument('path', help='Path to an invoice image or a folder of images')
    parser.add_argument('--save', action='store_true', help='Save results to JSON files')
    parser.add_argument('--output', default='outputs', help='Output directory for JSON files')
    parser.add_argument('--method', default='ml', choices=['ml', 'rules'], help="Extraction method: 'ml' or 'rules'")
    args = parser.parse_args()
    try:
        # Check if path is a directory or a file
        if Path(args.path).is_dir():
            process_batch(args.path, output_dir=args.output)
        elif Path(args.path).is_file():
            print(f"🔄 Processing: {args.path}")
            result = process_invoice(args.path, method=args.method, save_results=args.save, output_dir=args.output)
            print("\n📊 Extracted Data:")
            print("=" * 60)
            print(f"Vendor: {result.get('vendor', 'N/A')}")
            print(f"Invoice Number: {result.get('invoice_number', 'N/A')}")
            print(f"Date: {result.get('date', 'N/A')}")
            print(f"Total: ${result.get('total_amount', 0.0)}")
            print("=" * 60)
            if args.save:
                # FIX: process_invoice writes '<stem>_<method>.json', not '<stem>.json'.
                # NOTE(review): for digital PDFs the method is overridden internally
                # to 'rules (digital)', so the suffix may still differ — confirm.
                print(f"\n💾 JSON saved to: {args.output}/{Path(args.path).stem}_{args.method}.json")
        else:
            raise FileNotFoundError(f"Path does not exist: {args.path}")
    except Exception as e:
        # CLI boundary: report and signal failure via exit code instead of a traceback.
        print(f"❌ An error occurred: {e}")
        return 1
    return 0
if __name__ == '__main__':
import sys
    sys.exit(main())