|
|
import streamlit as st
|
|
|
import os
|
|
|
import json
|
|
|
from datetime import datetime
|
|
|
from pathlib import Path
|
|
|
from PIL import Image, ImageDraw
|
|
|
import pandas as pd
|
|
|
import sys
|
|
|
from src.report_generator import generate_bulk_html_report
|
|
|
|
|
|
|
|
|
# Optional dependency: pdf2image is only needed to render PDF previews.
# PDF_SUPPORT records whether the import succeeded so the UI can degrade
# gracefully instead of failing at start-up.
try:
    from pdf2image import convert_from_bytes
except ImportError:
    PDF_SUPPORT = False
else:
    PDF_SUPPORT = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from src.pipeline import process_invoice
|
|
|
from src.database import init_db
|
|
|
|
|
|
|
|
|
@st.cache_resource
def initialize_database_once() -> None:
    """Call ``init_db()`` behind Streamlit's resource cache.

    Streamlit re-executes this script on every user interaction. Because the
    function is decorated with ``st.cache_resource``, only the first call
    actually runs ``init_db()``; later calls reuse the cached (``None``)
    result until the cache is cleared or the server restarts.
    """
    init_db()


# Executed at import time on every rerun; the cache above makes repeat calls
# cheap. NOTE(review): what init_db() sets up is defined in src.database.
initialize_database_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_invoice_format(raw_text: str) -> dict:
    """Classify an invoice's format from its OCR text.

    The only signal checked is the company suffix "SDN BHD". The comparison
    ignores letter case, periods and runs of whitespace, so OCR variants such
    as "Sdn Bhd", "SDN. BHD." or a suffix split across a line break are still
    recognised. Any text containing the literal "SDN BHD" matches as before.

    Args:
        raw_text: OCR text extracted from the invoice. Empty or ``None`` is
            treated as "nothing detected".

    Returns:
        A dict with the keys ``name`` (str), ``confidence`` (int, percent),
        ``supported`` (bool) and ``indicators`` (list of str explaining the
        decision).
    """
    if raw_text:
        # Upper-case, turn periods into spaces and collapse all whitespace so
        # that the suffix compares equal regardless of OCR formatting noise.
        normalized = " ".join(raw_text.upper().replace(".", " ").split())
        if "SDN BHD" in normalized:
            return {
                "name": "Retail Invoice (MY)",
                "confidence": 95,
                "supported": True,
                "indicators": ["Detected 'SDN BHD' suffix"],
            }

    return {
        "name": "Unknown Format",
        "confidence": 20,
        "supported": False,
        "indicators": ["No known company suffix detected"],
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Browser-tab title and icon, plus the wide layout so the two-column
# upload/results view can use the full page width.
st.set_page_config(
    page_title="Smart Invoice Processor",
    page_icon="🧾",
    layout="wide"
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Inject page-wide CSS overrides. unsafe_allow_html is required for the
# <style> tag to be emitted as HTML rather than shown as text.
# NOTE(review): `.st-emotion-cache-16idsys` looks like a generated class name
# and may differ between Streamlit releases - confirm after upgrades.
st.markdown(
    """
<style>
/* Fix Hugging Face iframe glitch */
.stApp > header {visibility: hidden;}
.main .block-container {padding-top: 2rem;}
img { max-width: 100%; height: auto; }
/* Disable spinner blur */
.st-emotion-cache-16idsys { filter: none !important; transition: none !important; }
</style>
""",
    unsafe_allow_html=True
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page heading followed by a one-line status banner. The banner text is a
# hard-coded literal; nothing here queries the pipeline or the model.
st.title("🧾 Smart Invoice Processor (Hybrid ML Pipeline)")
st.markdown(
    "**System Status:** 🟢 Online | "
    "**Model:** LayoutLMv3 + Rules | "
    "**Pipeline:** OCR → ML → Validation"
)

st.divider()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sidebar: project blurb, extraction-mode selector and a per-session counter.
with st.sidebar:
    st.header("ℹ️ About")
    about_blurb = (
        "End-to-end invoice processing system that extracts structured data "
        "from scanned images and PDFs using ML + rule-based validation."
    )
    st.info(about_blurb)

    st.header("⚙️ Extraction Mode")
    # The selected label is read later by the single-file extraction path,
    # which maps it to the pipeline's "ml" / "rules" method names.
    method_labels = ("ML-Based (LayoutLMv3)", "Rule-Based (Regex)")
    extraction_method = st.selectbox("Choose extraction method", method_labels)

    st.header("📊 Stats")
    # Create the counter on the first run of a session; keep it afterwards.
    st.session_state.setdefault("processed_count", 0)
    st.metric("Invoices Processed", st.session_state.processed_count)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Three top-level tabs; each one is populated by its own `with tabN:` block
# further down the script.
tab1, tab2, tab3 = st.tabs(
    ["🚀 Upload & Process", "📚 Sample Invoices", "ℹ️ How It Works"]
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _save_upload_to_temp(uploaded_file, temp_dir=Path("temp")):
    """Write an uploaded file into *temp_dir* and return the path written.

    Only the final component of the client-supplied file name is used, so a
    name carrying directory parts (for example ``../../x.png``) cannot place
    the file outside *temp_dir*.

    Args:
        uploaded_file: Object returned by ``st.file_uploader``; its ``name``
            and ``getbuffer()`` are used.
        temp_dir: Directory to write into; created if it does not exist.

    Returns:
        ``pathlib.Path`` of the file that was written.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    temp_dir.mkdir(exist_ok=True)
    # Normalise Windows separators first so they are also treated as
    # directory parts and stripped.
    safe_name = Path(uploaded_file.name.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    temp_path = temp_dir / safe_name
    with open(temp_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    return temp_path


# Tab 1: upload (left column) and extraction results (right column).
with tab1:
    col_left, col_right = st.columns([1, 1])

    # ---- Left column: upload, bulk processing, preview -------------------
    with col_left:
        st.subheader("1. Upload Invoice")

        uploaded_files = st.file_uploader(
            "Upload Invoices (Bulk Supported)",
            type=["jpg", "jpeg", "png", "pdf"],
            accept_multiple_files=True
        )

        # Bulk results live in session state so they survive the rerun that
        # a download-button click triggers.
        if "bulk_results" not in st.session_state:
            st.session_state.bulk_results = None

        if uploaded_files and st.button("✨ Process All Files", type="primary"):
            all_results = []
            progress_bar = st.progress(0)
            status_text = st.empty()
            total_files = len(uploaded_files)

            with st.spinner(f"Processing {total_files} documents..."):
                for i, uploaded_file in enumerate(uploaded_files):
                    status_text.text(f"Processing file {i+1}/{total_files}: {uploaded_file.name}")

                    # Saving the upload is inside the try as well, so one
                    # unwritable file is reported and the batch continues.
                    try:
                        temp_path = _save_upload_to_temp(uploaded_file)
                        # NOTE(review): bulk mode always uses the ML method
                        # and ignores the sidebar selection - confirm this
                        # is intentional.
                        result = process_invoice(str(temp_path), method='ml')
                        if isinstance(result, dict):
                            all_results.append(result)
                        else:
                            # Non-dict results would break the summary table
                            # and report built from this list.
                            st.error(
                                f"Error processing {uploaded_file.name}: "
                                "pipeline returned invalid data."
                            )
                    except Exception as e:
                        st.error(f"Error processing {uploaded_file.name}: {e}")

                    progress_bar.progress((i + 1) / total_files)

            st.success("✅ Bulk Processing Complete!")
            st.session_state.bulk_results = all_results

        if st.session_state.bulk_results:
            html_report = generate_bulk_html_report(st.session_state.bulk_results)

            st.download_button(
                label="📥 Download Bulk HTML Report",
                data=html_report,
                file_name="bulk_invoice_report.html",
                mime="text/html"
            )

            st.subheader("Summary")
            df = pd.DataFrame(st.session_state.bulk_results)
            if not df.empty:
                # Show only the summary columns the pipeline actually returned.
                cols = [c for c in ["vendor", "date", "total_amount", "validation_status"] if c in df.columns]
                st.dataframe(df[cols], width='stretch')

        if uploaded_files:
            first_file = uploaded_files[0]
            st.caption(f"Preview: {first_file.name}" + (f" (+{len(uploaded_files)-1} more)" if len(uploaded_files) > 1 else ""))

            # Forget any page image cached for an earlier upload; otherwise a
            # failed preview below would leave a stale image for the overlay.
            if "pdf_preview" in st.session_state:
                del st.session_state["pdf_preview"]

            if first_file.type == "application/pdf":
                if PDF_SUPPORT:
                    # getvalue() returns the full content regardless of the
                    # current read position of the upload buffer.
                    pdf_bytes = first_file.getvalue()
                    first_file.seek(0)
                    try:
                        pages = convert_from_bytes(pdf_bytes, first_page=1, last_page=1)
                    except Exception as e:
                        # A preview failure must not take down the page.
                        pages = []
                        st.warning(f"Could not render PDF preview: {e}")
                    if pages:
                        pdf_preview_image = pages[0]
                        st.session_state.pdf_preview = pdf_preview_image
                        st.image(pdf_preview_image, width=250, caption="PDF Preview (Page 1)")
                else:
                    st.warning("PDF preview requires pdf2image. Install with: `pip install pdf2image`")
            else:
                try:
                    image = Image.open(first_file)
                    first_file.seek(0)
                    st.image(image, width=250, caption="Uploaded Invoice")
                except Exception as e:
                    st.warning(f"Could not render image preview: {e}")

    # ---- Right column: single-file extraction and result display ---------
    with col_right:
        st.subheader("2. Extraction Results")

        if uploaded_files and len(uploaded_files) == 1:
            single_file = uploaded_files[0]
            if st.button("✨ Extract Data", type="primary"):
                with st.spinner("Running invoice extraction pipeline..."):
                    try:
                        temp_path = _save_upload_to_temp(single_file)

                        method = "ml" if "ML" in extraction_method else "rules"
                        result = process_invoice(str(temp_path), method=method)

                        # Validate the result type before any dict access so
                        # a bad return value yields the intended message.
                        if not isinstance(result, dict):
                            st.error("Pipeline returned invalid data.")
                            st.stop()

                        # '_db_status' is only used for the notifications
                        # below; pop() removes it from the stored result.
                        db_status = result.pop('_db_status', 'disabled')

                        if db_status == 'saved':
                            st.success("✅ Extraction & Storage Complete")
                            st.toast("Invoice saved to Database!", icon="💾")
                        elif db_status == 'queued':
                            st.success("✅ Extraction Complete")
                            st.toast("Saving to database...", icon="💾")
                        elif db_status == 'duplicate':
                            st.success("✅ Extraction Complete")
                            st.toast("Duplicate invoice (already in database)", icon="⚠️")
                        elif db_status == 'disabled':
                            st.success("✅ Extraction Complete")
                            # Show the demo-mode hint once per session.
                            if not st.session_state.get('_db_warning_shown', False):
                                st.toast("Database disabled (Demo Mode)", icon="ℹ️")
                                st.session_state['_db_warning_shown'] = True
                        else:
                            st.success("✅ Extraction Complete")

                        st.session_state.data = result
                        st.session_state.format_info = detect_invoice_format(
                            result.get("raw_text", "")
                        )
                        st.session_state.processed_count += 1

                        # Draw the predicted entity boxes over the document.
                        raw_predictions = result.get("raw_predictions")
                        if raw_predictions:
                            if single_file.type == "application/pdf":
                                # PDFs reuse the page image rendered for the
                                # preview in the left column, when available.
                                if "pdf_preview" in st.session_state:
                                    overlay_image = st.session_state.pdf_preview.copy().convert("RGB")
                                else:
                                    overlay_image = None
                            else:
                                single_file.seek(0)
                                overlay_image = Image.open(single_file).convert("RGB")

                            if overlay_image is not None:
                                draw = ImageDraw.Draw(overlay_image)
                                for entity_name, entity_data in raw_predictions.items():
                                    bboxes = entity_data.get("bbox", [])
                                    for box in bboxes:
                                        # NOTE(review): boxes are treated as
                                        # (x, y, width, height) in pixels -
                                        # confirm against the pipeline output.
                                        x, y, w, h = box
                                        draw.rectangle([x, y, x + w, y + h], outline="red", width=2)

                                overlay_image.thumbnail((800, 800))
                                st.image(overlay_image, caption="AI Detection Overlay", width="content")

                    except Exception as e:
                        st.error(f"Pipeline error: {e}")

        # Results are rendered from session state so they persist across the
        # reruns caused by widget interaction.
        if "data" in st.session_state:
            data = st.session_state.data

            status = data.get("validation_status", "unknown")
            if status == "passed":
                st.success("✅ Data Validation Passed")
            elif status == "failed":
                st.error("❌ Data Validation Failed")
            else:
                st.warning("⚠️ Validation Not Performed")

            m1, m2, m3 = st.columns(3)
            m1.metric("Vendor", data.get("vendor") or "N/A")
            m2.metric("Date", data.get("date") or "N/A")
            total = data.get("total_amount")
            m3.metric("Total Amount", f"${total}" if total else "N/A")

            st.divider()

            s1, s2 = st.columns(2)
            s1.metric("Receipt / Invoice #", data.get("receipt_number") or "N/A")

            # bill_to may be a plain value or a dict carrying a "name" field.
            bill_to = data.get("bill_to")
            if isinstance(bill_to, dict):
                bill_to = bill_to.get("name")
            s2.metric("Bill To", bill_to or "N/A")

            st.subheader("🛒 Line Items")
            items = data.get("items", [])
            if items:
                st.dataframe(pd.DataFrame(items), width='stretch')
            else:
                st.info("No line items extracted.")

            with st.expander("🔍 Advanced Details"):
                format_info = st.session_state.format_info
                st.write("**Detected Format:**", format_info["name"])
                st.write("**Detection Confidence:**", f"{format_info['confidence']}%")
                for ind in format_info["indicators"]:
                    st.write(f"• {ind}")

                st.markdown("---")
                st.write("**Semantic Hash:**", data.get("semantic_hash", "N/A"))

            with st.expander("📄 Full JSON Output"):
                st.json(data)

            st.download_button(
                "💾 Download JSON",
                json.dumps(data, indent=2),
                file_name=f"invoice_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json"
            )

            # The bulk report generator is reused for a single-invoice report.
            html_report = generate_bulk_html_report([data])
            st.download_button(
                "📥 Download HTML Report",
                html_report,
                file_name=f"invoice_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html",
                mime="text/html"
            )

            with st.expander("📝 Raw OCR Text"):
                st.text(data.get("raw_text", "No OCR text available"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tab 2: show one bundled sample invoice, if any is available.
with tab2:
    st.header("📚 Sample Invoices")

    sample_dir = Path("data/samples")
    if sample_dir.exists():
        # Walk regular files in sorted order (glob order is not defined) and
        # take the first one Pillow can open. Directories and non-image files
        # are skipped rather than crashing the page.
        sample_path = None
        sample_image = None
        for candidate in sorted(p for p in sample_dir.glob("*") if p.is_file()):
            try:
                sample_image = Image.open(candidate)
            except OSError:
                # Covers unreadable files and PIL's UnidentifiedImageError.
                continue
            sample_path = candidate
            break

        if sample_image is not None:
            st.image(
                sample_image,
                caption=sample_path.name,
                width=250
            )
        else:
            st.info("No sample invoices found.")
    else:
        st.warning("Sample directory not found.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tab 3: static description of the processing stages. The text is a fixed
# markdown literal; each stage name is followed by a line describing it.
with tab3:
    st.header("ℹ️ System Architecture")
    st.markdown(
        """
Input Handling

JPG / PNG / PDF detection

OCR & Layout Parsing

Tesseract + LayoutLMv3

Hybrid Extraction

ML predictions with rule-based fallback

Validation

Schema & consistency checks

Output

Structured JSON + UI visualization
"""
    )
|
|
|
|