# Provenance (scraped Hugging Face page header, converted to a comment so the
# file parses): user shivam701171 — "Update app.py" — commit c602672 (verified)
#!/usr/bin/env python3
"""
AI Invoice Processing System - Complete Single File for Hugging Face Spaces
A comprehensive system with AI-powered extraction, semantic search, and analytics.
Author: AI Assistant
Date: 2024
Version: HuggingFace Single File v1.0
"""
# ===============================================================================
# IMPORTS AND COMPATIBILITY CHECKS
# ===============================================================================
import os
import json
import re
import tempfile
import shutil
import pickle
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import time
import logging
import uuid
# Detect whether we are running inside a Hugging Face Space (the platform
# injects SPACE_ID into the environment).
IS_HF_SPACE = os.getenv("SPACE_ID") is not None

# Streamlit and core libraries -- imported *before* the token lookup below.
# FIX: the original referenced `st.secrets` prior to `import streamlit as st`,
# a NameError that was silently masked by a bare `except:` (so the secrets
# path could never work).
import streamlit as st
import sqlite3
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests

# Resolve the Hugging Face API token: prefer Streamlit secrets (the standard
# mechanism on HF Spaces), then fall back to the environment variable.
HF_TOKEN = None
try:
    # st.secrets raises (e.g. FileNotFoundError) when no secrets file exists.
    HF_TOKEN = st.secrets.get("HF_TOKEN", None)
except Exception:
    HF_TOKEN = None
# FIX: also consult the environment when secrets exist but omit the key
# (the original only fell back when the secrets lookup *raised*).
if not HF_TOKEN:
    HF_TOKEN = os.getenv("HF_TOKEN", None)
# Optional heavy dependencies, probed with graceful degradation.
# FIX: the original called st.warning() here, i.e. *before* the
# st.set_page_config() call further down -- Streamlit requires
# set_page_config to be the first Streamlit command of the script, so those
# early warnings could fail the whole app. The availability flags are kept;
# user-facing notices are issued later by the components that need them.

# Vector storage (FAISS) -- only used when installed.
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False

# Sentence embeddings for semantic search.
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False

# Torch enables fp16 / device placement on the LLM path.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# PDF text extraction: prefer pdfplumber, fall back to PyPDF2.
try:
    import pdfplumber
    PDF_PROCESSING_AVAILABLE = True
    PDF_PROCESSOR = "pdfplumber"
except ImportError:
    try:
        import PyPDF2
        PDF_PROCESSING_AVAILABLE = True
        PDF_PROCESSOR = "PyPDF2"
    except ImportError:
        PDF_PROCESSING_AVAILABLE = False
        PDF_PROCESSOR = None
# ===============================================================================
# STREAMLIT CONFIGURATION
# ===============================================================================
# Page-level Streamlit configuration. NOTE: Streamlit requires
# set_page_config() to be the first Streamlit command executed in the script.
st.set_page_config(
    page_title="AI Invoice Processing System",
    page_icon="πŸ“„",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/spaces',
        'Report a bug': 'https://huggingface.co/spaces',
        'About': """
# AI Invoice Processing System
Built for Hugging Face Spaces with AI-powered extraction and semantic search.
"""
    }
)
# ===============================================================================
# CONFIGURATION
# ===============================================================================
# Runtime limits and paths tuned for the constrained HF Spaces environment.
HF_CONFIG = {
    "max_file_size_mb": 10,                  # reject uploads larger than this
    "max_concurrent_files": 3,               # soft cap on files per batch
    "timeout_seconds": 30,
    "use_cpu_only": True,                    # Spaces CPU tier by default
    "embedding_model": "all-MiniLM-L6-v2",   # sentence-transformers model id
    "cache_dir": "./cache",                  # HF model / embedding cache
    "data_dir": "./data",                    # JSON ledger + vector store
    "enable_ollama": False,                  # local Ollama disabled on Spaces
}
# Create necessary directories
os.makedirs(HF_CONFIG["cache_dir"], exist_ok=True)
os.makedirs(HF_CONFIG["data_dir"], exist_ok=True)
# ===============================================================================
# DATA STRUCTURES
# ===============================================================================
@dataclass
class InvoiceData:
    """Data structure for extracted invoice information.

    All fields default to "empty" values so a blank instance can be returned
    on extraction failure without special-casing callers.
    """
    supplier_name: str = ""        # party issuing the invoice
    buyer_name: str = ""           # party being billed
    invoice_number: str = ""
    date: str = ""                 # normalised to YYYY-MM-DD when parseable
    amount: float = 0.0            # invoice total
    quantity: int = 0
    product_description: str = ""
    file_path: str = ""            # original uploaded file name
    extraction_confidence: float = 0.0  # heuristic 0..1 set by the extractor
    processing_method: str = "regex"    # "regex" | "ai_ner" | "mistral_7b"
@dataclass
class VectorSearchResult:
    """A single hit returned by the vector store's semantic search."""
    invoice_id: str
    invoice_number: str
    supplier_name: str
    similarity_score: float   # dot product of normalised embeddings (cosine)
    content_preview: str      # truncated indexed text, for display
    metadata: Dict            # full metadata record stored with the vector
# ===============================================================================
# DOCUMENT PROCESSING CLASSES
# ===============================================================================
class DocumentProcessor:
    """Simplified document processor for Hugging Face Spaces.

    Dispatches on file extension to one of the available extraction backends
    (pdfplumber / PyPDF2 for PDFs, plain reads for .txt) and returns the
    extracted text, or "" on any failure.
    """

    def __init__(self):
        self.setup_processors()

    def setup_processors(self):
        """Register an extraction callable per supported file extension."""
        self.processors = {}
        # PDF processing -- backend was chosen at import time.
        if PDF_PROCESSING_AVAILABLE:
            if PDF_PROCESSOR == "pdfplumber":
                self.processors['pdf'] = self.extract_with_pdfplumber
                st.success("βœ… PDF processing available (pdfplumber)")
            elif PDF_PROCESSOR == "PyPDF2":
                self.processors['pdf'] = self.extract_with_pypdf2
                st.success("βœ… PDF processing available (PyPDF2)")
        else:
            st.warning("⚠️ No PDF processor available")
        # Plain-text files are always supported.
        self.processors['txt'] = self.extract_text_file

    def extract_with_pdfplumber(self, file_path: str) -> str:
        """Extract text from a PDF via pdfplumber; returns "" on failure."""
        try:
            import pdfplumber
            text = ""
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:  # pages without a text layer return None
                        text += page_text + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_with_pypdf2(self, file_path: str) -> str:
        """Extract text from a PDF via PyPDF2; returns "" on failure."""
        try:
            import PyPDF2
            text = ""
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_text_file(self, file_path: str) -> str:
        """Read a text file as UTF-8, tolerating non-UTF-8 input.

        FIX: the original used strict UTF-8 decoding, so a Latin-1 (or other
        legacy-encoded) text file raised UnicodeDecodeError and was reported
        as a total failure. We now retry with errors="replace" so the file
        still yields usable text.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            try:
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    return f.read()
            except Exception as e:
                st.error(f"Text file extraction failed: {e}")
                return ""
        except Exception as e:
            st.error(f"Text file extraction failed: {e}")
            return ""

    def extract_text_from_document(self, file_path: str) -> str:
        """Dispatch to the registered processor for the file's extension."""
        file_ext = Path(file_path).suffix.lower()
        ext_key = {'.pdf': 'pdf', '.txt': 'txt'}.get(file_ext)
        if ext_key is None:
            st.warning(f"Unsupported file type: {file_ext}")
            return ""
        processor = self.processors.get(ext_key)
        if processor:
            return processor(file_path)
        st.error(f"No processor available for {file_ext}")
        return ""
# ===============================================================================
# AI EXTRACTION CLASS
# ===============================================================================
class AIExtractor:
    """AI extraction for Hugging Face Spaces with Mistral 7B support.

    Picks the best available backend at construction time, in order of
    preference: Mistral-7B-Instruct (requires an HF token plus enough RAM),
    a BERT NER pipeline, then pure-regex extraction as the last resort.
    """

    def __init__(self):
        # Backend selection: try Mistral first; only probe the lighter NER
        # fallback when Mistral could not be loaded.
        self.use_mistral = self.setup_mistral()
        self.use_transformers = self.setup_transformers() if not self.use_mistral else False
    def setup_mistral(self):
        """Try to set up the Mistral 7B model with proper authentication.

        Returns True only when every prerequisite holds: an HF token is
        configured, the machine reports >= 8 GB RAM, and transformers /
        huggingface_hub can load the gated model. Any failure returns False
        so the caller can fall back to a lighter backend.
        """
        try:
            # Check if we have HF token (the Mistral repo is gated).
            if not HF_TOKEN:
                st.warning("⚠️ Hugging Face token not found. Add HF_TOKEN to secrets for Mistral access.")
                return False
            # Check if we're in a high-resource environment; a 7B model in
            # 8-bit needs roughly 8 GB of RAM.
            import psutil
            memory_gb = psutil.virtual_memory().total / (1024**3)
            if memory_gb < 8:
                st.warning("⚠️ Insufficient memory for Mistral 7B. Using lighter models.")
                return False
            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
            from huggingface_hub import login
            # Login with HF token so the gated repo can be downloaded.
            login(token=HF_TOKEN)
            with st.spinner("πŸ”„ Loading Mistral 7B model (this may take a few minutes)..."):
                # Use the instruction-tuned model
                model_name = "mistralai/Mistral-7B-Instruct-v0.1"
                # Load with reduced precision for memory efficiency
                self.mistral_tokenizer = AutoTokenizer.from_pretrained(
                    model_name,
                    cache_dir=HF_CONFIG["cache_dir"],
                    token=HF_TOKEN
                )
                # NOTE(review): load_in_8bit requires bitsandbytes; if it is
                # not installed this raises and we fall through to the
                # except handlers below.
                self.mistral_model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.float16 if TORCH_AVAILABLE else None,
                    device_map="auto" if TORCH_AVAILABLE else None,
                    load_in_8bit=True,  # Use 8-bit quantization
                    cache_dir=HF_CONFIG["cache_dir"],
                    token=HF_TOKEN
                )
                # Create pipeline
                self.mistral_pipeline = pipeline(
                    "text-generation",
                    model=self.mistral_model,
                    tokenizer=self.mistral_tokenizer,
                    torch_dtype=torch.float16 if TORCH_AVAILABLE else None,
                    device_map="auto" if TORCH_AVAILABLE else None
                )
            st.success("βœ… Mistral 7B model loaded successfully!")
            return True
        except ImportError as e:
            st.warning(f"⚠️ Missing dependencies for Mistral 7B: {e}")
            return False
        except Exception as e:
            st.warning(f"⚠️ Mistral 7B not available: {e}")
            st.info("πŸ’‘ To use Mistral 7B: Add your Hugging Face token to secrets as 'HF_TOKEN'")
            return False
    def setup_transformers(self):
        """Fallback to a lighter NER model (BERT fine-tuned on CoNLL-03)."""
        try:
            from transformers import pipeline
            with st.spinner("Loading fallback AI model..."):
                # aggregation_strategy="simple" merges word-piece tokens into
                # whole-entity spans (PER/ORG/LOC/MISC).
                self.ner_pipeline = pipeline(
                    "ner",
                    model="dbmdz/bert-large-cased-finetuned-conll03-english",
                    aggregation_strategy="simple"
                )
            st.success("βœ… Fallback AI extraction model loaded")
            return True
        except Exception as e:
            st.warning(f"⚠️ AI extraction not available: {e}")
            return False
    def extract_with_mistral(self, text: str) -> InvoiceData:
        """Extract invoice data using Mistral 7B.

        Prompts the instruction-tuned model for a JSON object, isolates the
        first {...} span of the generation, and coerces the fields into an
        InvoiceData. Falls back to regex extraction whenever generation or
        JSON parsing fails.
        """
        try:
            # <s>[INST] ... [/INST] is the Mistral-Instruct chat format; the
            # invoice text is truncated to keep the prompt within context.
            prompt = f"""<s>[INST] You are an expert at extracting structured information from invoices.
Extract the following information from this invoice text and respond ONLY with valid JSON:
{{
"invoice_number": "invoice or bill number",
"supplier_name": "company providing goods/services",
"buyer_name": "company receiving goods/services",
"date": "date in YYYY-MM-DD format",
"amount": "total amount as number only",
"quantity": "total quantity as integer",
"product_description": "brief description of items/services"
}}
Invoice text:
{text[:2000]}
Respond with JSON only: [/INST]"""
            # Generate response (low temperature keeps the JSON well-formed).
            response = self.mistral_pipeline(
                prompt,
                max_new_tokens=300,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.mistral_tokenizer.eos_token_id
            )
            # Extract the generated text
            generated_text = response[0]['generated_text']
            # Locate the outermost braces to isolate the JSON object.
            json_start = generated_text.find('{')
            json_end = generated_text.rfind('}') + 1
            if json_start != -1 and json_end > json_start:
                json_str = generated_text[json_start:json_end]
                # Parse JSON (this local import is redundant -- json is
                # already imported at file scope -- but harmless).
                import json
                data = json.loads(json_str)
                # Create InvoiceData object
                invoice_data = InvoiceData()
                invoice_data.supplier_name = str(data.get('supplier_name', '')).strip()
                invoice_data.buyer_name = str(data.get('buyer_name', '')).strip()
                invoice_data.invoice_number = str(data.get('invoice_number', '')).strip()
                invoice_data.date = self.parse_date(str(data.get('date', '')))
                # Parse amount: strip currency symbols / thousands separators.
                try:
                    amount_val = data.get('amount', 0)
                    if isinstance(amount_val, str):
                        amount_clean = re.sub(r'[^\d.]', '', amount_val)
                        invoice_data.amount = float(amount_clean) if amount_clean else 0.0
                    else:
                        invoice_data.amount = float(amount_val)
                except:
                    invoice_data.amount = 0.0
                # Parse quantity (tolerates "1,000"-style strings).
                try:
                    qty_val = data.get('quantity', 0)
                    invoice_data.quantity = int(float(str(qty_val).replace(',', '')))
                except:
                    invoice_data.quantity = 0
                invoice_data.product_description = str(data.get('product_description', '')).strip()
                invoice_data.extraction_confidence = 0.95  # High confidence for Mistral
                invoice_data.processing_method = "mistral_7b"
                return invoice_data
            else:
                st.warning("⚠️ Mistral response didn't contain valid JSON, falling back to regex")
                return self.extract_with_regex(text)
        except Exception as e:
            st.error(f"Mistral extraction failed: {e}")
            return self.extract_with_regex(text)
def extract_with_ai(self, text: str) -> InvoiceData:
"""Extract invoice data using available AI method"""
if self.use_mistral:
st.info("πŸ€– Using Mistral 7B for extraction...")
return self.extract_with_mistral(text)
elif self.use_transformers:
st.info("πŸ€– Using NER model for extraction...")
return self.extract_with_ner(text)
else:
st.info("πŸ”§ Using regex extraction...")
return self.extract_with_regex(text)
    def extract_with_ner(self, text: str) -> InvoiceData:
        """Extract using the NER model (fallback method).

        ORG entities are mapped to supplier (first seen) and buyer (second);
        MISC entities containing digits become candidate invoice numbers.
        Fields the NER pass cannot provide are backfilled from regex.
        """
        try:
            # Use NER to extract entities (only the first 512 chars, to stay
            # within the BERT input limit).
            entities = self.ner_pipeline(text[:512])
            invoice_data = InvoiceData()
            invoice_data.processing_method = "ai_ner"
            # Extract specific entities
            for entity in entities:
                entity_text = entity['word'].replace('##', '')  # strip word-piece markers
                if entity['entity_group'] == 'ORG':
                    # Heuristic: first organisation = supplier, second = buyer.
                    if not invoice_data.supplier_name:
                        invoice_data.supplier_name = entity_text
                    elif not invoice_data.buyer_name:
                        invoice_data.buyer_name = entity_text
                elif entity['entity_group'] == 'MISC':
                    if not invoice_data.invoice_number and any(c.isdigit() for c in entity_text):
                        invoice_data.invoice_number = entity_text
            # Fall back to regex for missing fields
            regex_data = self.extract_with_regex(text)
            # Combine results
            if not invoice_data.invoice_number:
                invoice_data.invoice_number = regex_data.invoice_number
            if not invoice_data.amount:
                invoice_data.amount = regex_data.amount
            if not invoice_data.date:
                invoice_data.date = regex_data.date
            if not invoice_data.quantity:
                invoice_data.quantity = regex_data.quantity
            invoice_data.extraction_confidence = 0.8
            return invoice_data
        except Exception as e:
            st.error(f"NER extraction failed: {e}")
            return self.extract_with_regex(text)
def extract_with_regex(self, text: str) -> InvoiceData:
"""Enhanced regex extraction with better amount detection"""
invoice_data = InvoiceData()
invoice_data.processing_method = "regex"
# Enhanced regex patterns with more comprehensive matching
patterns = {
'invoice_number': [
r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)',
r'bill\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)',
r'inv\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)',
r'ref\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)',
r'#\s*([A-Z0-9\-_/]{3,})',
r'(?:^|\s)([A-Z]{2,}\d{3,}|\d{3,}[A-Z]{2,})', # Common patterns like ABC123 or 123ABC
],
'amount': [
# Currency symbols with amounts
r'total\s*(?:amount)?\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
r'amount\s*(?:due|paid|total)?\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
r'grand\s*total\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
r'net\s*(?:amount|total)\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
r'sub\s*total\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
# Currency symbols at the beginning
r'[\$₹£€]\s*([0-9,]+\.?\d*)',
# Amounts at end of lines (common in invoices)
r'([0-9,]+\.?\d*)\s*[\$₹£€]?\s*
def parse_date(self, date_str: str) -> str:
"""Parse date to YYYY-MM-DD format"""
if not date_str:
return ""
formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d']
for fmt in formats:
try:
parsed_date = datetime.strptime(date_str, fmt)
return parsed_date.strftime('%Y-%m-%d')
except ValueError:
continue
return date_str
# ===============================================================================
# VECTOR STORE CLASS
# ===============================================================================
class VectorStore:
    """Simplified vector store for Hugging Face Spaces.

    Keeps embeddings in a plain Python list (pickled to disk) and performs
    brute-force cosine-similarity search -- adequate for the small document
    counts a Space handles, with no FAISS dependency.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # list of 1-D numpy embeddings
        self.document_metadata = []  # parallel list of metadata dicts
        self.embedding_dimension = None
        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence-transformer model, or disable search."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("⚠️ Sentence Transformers not available. Vector search disabled.")
            return
        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )
                # FIX: encode() on a *list* returns a (batch, dim) array, so
                # the original `shape[0]` stored the batch size (always 1),
                # not the embedding dimension. Use the last axis instead.
                test_embedding = self.embedding_model.encode(["test"])
                self.embedding_dimension = int(test_embedding.shape[-1])
            st.success(f"βœ… Embedding model loaded: {self.embedding_model_name}")
        except Exception as e:
            st.error(f"❌ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load pickled vectors + metadata from disk, or start empty.

        SECURITY NOTE: pickle.load can execute arbitrary code from the file.
        The store files are app-local here, but never point these paths at
        untrusted input.
        """
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)
                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)
                st.success(f"βœ… Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("πŸ“„ New vector store initialized")
        except Exception as e:
            st.error(f"❌ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors + metadata to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)
            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice record into one searchable "field: value" string."""
        text_parts = []
        for field, value in invoice_data.items():
            if value and field != 'id':  # skip empty fields and the row id
                text_parts.append(f"{field}: {value}")
        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")
        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice record and append it in memory (not persisted)."""
        if not self.embedding_model:
            return False
        try:
            document_text = self.create_document_text(invoice_data, raw_text)
            # Normalised embeddings make a dot product equal cosine similarity.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)
            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }
            self.vectors.append(embedding)
            self.document_metadata.append(metadata)
            return True
        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List["VectorSearchResult"]:
        """Brute-force cosine-similarity search over all stored documents.

        Returns up to top_k VectorSearchResult, best first, filtered by a
        minimal relevance threshold.
        """
        if not self.embedding_model or not self.vectors:
            return []
        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)
            # Dot product of L2-normalised vectors == cosine similarity.
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarities.append((float(np.dot(query_embedding, doc_embedding)), i))
            similarities.sort(reverse=True)
            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # drop clearly irrelevant hits
                    metadata = self.document_metadata[idx]
                    results.append(VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    ))
            return results
        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
# ===============================================================================
# MAIN PROCESSOR CLASS
# ===============================================================================
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Orchestrates the pipeline: temp-save an upload, extract its text, run
    the AI/regex extractor, then persist the result to the JSON ledger and
    (optionally) the vector store.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None
        # Per-session processing counters.
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    @staticmethod
    def _default_data() -> dict:
        """Return a fresh, empty ledger structure."""
        return {
            "metadata": {
                "created_at": datetime.now().isoformat(),
                "version": "hf_v1.0",
                "total_invoices": 0
            },
            "invoices": [],
            "summary": {
                "total_amount": 0.0,
                "unique_suppliers": [],
                "processing_stats": {"successful": 0, "failed": 0}
            }
        }

    def setup_storage(self):
        """Create the JSON ledger file in the data directory if missing."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")
        if not os.path.exists(self.json_path):
            self.save_json_data(self._default_data())

    def load_json_data(self) -> dict:
        """Load the invoice ledger from JSON.

        FIX: the original recursed into itself after calling setup_storage()
        on failure; since setup_storage() only writes when the file is
        *missing*, a corrupt-but-present JSON file caused infinite
        recursion. A corrupt or missing ledger is now replaced with a fresh
        one and that fresh ledger is returned.
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = self._default_data()
            self.save_json_data(data)  # reset the ledger on disk
            return data

    def save_json_data(self, data: dict):
        """Save invoice data to JSON (pretty-printed, UTF-8)."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> "InvoiceData":
        """Process one uploaded file end-to-end with progress reporting.

        Returns the extracted InvoiceData, or an empty InvoiceData on any
        failure (size/type rejection, empty text, extraction error).
        """
        self.processing_stats['total_processed'] += 1
        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'
            st.info(f"πŸ“„ Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")
            # Reject oversized uploads (Spaces has tight memory limits).
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()
            # Only the two extractable formats are accepted.
            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()
            # The document processors work on paths, so spool to a temp file.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                file_content = uploaded_file.getvalue()
                tmp_file.write(file_content)
                tmp_file_path = tmp_file.name
            st.info(f"πŸ’Ύ Saved temporarily to: {tmp_file_path}")
            try:
                st.info("πŸ” Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)
                if not text or not text.strip():
                    st.warning(f"❌ No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()
                text_length = len(text)
                st.info(f"πŸ“ Extracted {text_length} characters of text")
                if text_length > 0:
                    with st.expander("πŸ“„ Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)
                st.info("πŸ€– Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name
                st.info(f"πŸ“Š Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")
                st.info("πŸ’Ύ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)
                self.processing_stats['successful'] += 1
                st.success(f"βœ… Successfully processed {uploaded_file.name}")
                return invoice_data
            finally:
                # Best-effort temp-file cleanup. FIX: the original bare
                # `except:` also swallowed SystemExit/KeyboardInterrupt.
                try:
                    os.unlink(tmp_file_path)
                    st.info("🧹 Cleaned up temporary file")
                except Exception:
                    pass
        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1
            # Expose the traceback for debugging in the UI.
            with st.expander("πŸ” Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())
            return InvoiceData()

    def save_invoice_data(self, invoice_data: "InvoiceData", raw_text: str, file_size: int):
        """Append one invoice record to the JSON ledger and the vector store."""
        try:
            data = self.load_json_data()
            invoice_record = {
                # NOTE: ids are positional (len+1); safe while the ledger is
                # append-only.
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }
            data["invoices"].append(invoice_record)
            self.update_summary(data)
            self.save_json_data(data)
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()
        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute ledger-level aggregates in place."""
        invoices = data["invoices"]
        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        # FIX: sorted for deterministic output (the original list(set(...))
        # ordering varied run to run, churning the saved JSON).
        unique_suppliers = sorted({inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")})
        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }
        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
# ===============================================================================
# CHATBOT CLASS
# ===============================================================================
class ChatBot:
    """Chatbot for invoice queries.

    A lightweight intent router: keyword heuristics pick a handler (summary /
    count / amounts / suppliers), then semantic search when a vector store
    exists, then plain keyword matching as the final fallback. All answers
    are markdown strings.
    """

    def __init__(self, processor: InvoiceProcessor):
        self.processor = processor  # source of ledger data + vector store

    def query_database(self, query: str) -> str:
        """Process a user query and return a markdown response."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                return "No invoice data found. Please upload some invoices first."
            query_lower = query.lower()
            # Intent routing by keyword; order matters (first match wins).
            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)
            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)
            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)
            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)
            elif self.processor.vector_store:
                return self.handle_semantic_search(query)
            else:
                return self.handle_general_query(data, query)
        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Generate a comprehensive summary (totals + five most recent)."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})
        if not invoices:
            return "No invoices found in the system."
        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))
        response = f"""
**πŸ“Š Invoice System Summary**
β€’ **Total Invoices**: {len(invoices):,}
β€’ **Total Value**: β‚Ή{total_amount:,.2f}
β€’ **Average Invoice**: β‚Ή{avg_amount:,.2f}
β€’ **Unique Suppliers**: {unique_suppliers}
**πŸ“ˆ Processing Stats**
β€’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β€’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}
**πŸ” Recent Invoices**
"""
        # Show the five most recently created invoices.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (β‚Ή{inv.get('amount', 0):,.2f})"
        return response

    def handle_count_query(self, data: dict) -> str:
        """Handle count-related queries ("how many ...")."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))
        # NOTE(review): first/latest below rely on list (insertion) order,
        # not a sort of created_at timestamps.
        return f"""
**πŸ“Š Invoice Count Summary**
β€’ **Total Records**: {total}
β€’ **Unique Invoice Numbers**: {unique_numbers}
β€’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}
**πŸ“… Processing Timeline**
β€’ **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
β€’ **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Handle amount-related queries (totals, averages, top invoices)."""
        invoices = data.get("invoices", [])
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]
        if not amounts:
            return "No amount information found in invoices."
        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)
        # Threshold = 5th-largest amount when more than 5 invoices, else max.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]
        response = f"""
**πŸ’° Financial Analysis**
β€’ **Total Amount**: β‚Ή{total_amount:,.2f}
β€’ **Average Amount**: β‚Ή{avg_amount:,.2f}
β€’ **Highest Invoice**: β‚Ή{max_amount:,.2f}
β€’ **Lowest Invoice**: β‚Ή{min_amount:,.2f}
**🎯 High-Value Invoices (β‚Ή{high_value_threshold:,.2f}+)**
"""
        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (β‚Ή{inv.get('amount', 0):,.2f})"
        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Handle supplier-related queries (per-supplier counts and totals).

        NOTE(review): `query` is currently unused -- the handler always
        reports the full supplier ranking.
        """
        invoices = data.get("invoices", [])
        # Aggregate invoice count and total billed amount per supplier name.
        supplier_counts = {}
        supplier_amounts = {}
        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)
        if not supplier_counts:
            return "No supplier information found in invoices."
        # Top 10 suppliers, ranked by total billed amount.
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]
        response = f"""
**🏒 Supplier Analysis**
β€’ **Total Unique Suppliers**: {len(supplier_counts)}
β€’ **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)
**πŸ’° Top Suppliers by Amount**
"""
        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - β‚Ή{amount:,.2f} ({count} invoices, avg: β‚Ή{avg:,.2f})"
        return response

    def handle_semantic_search(self, query: str) -> str:
        """Answer via the vector store's semantic search (top 5 hits)."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)
            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."
            response = f"πŸ” **Semantic Search Results for '{query}'**\n\n"
            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   β€’ Similarity: {result.similarity_score:.3f}\n"
                response += f"   β€’ Amount: β‚Ή{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   β€’ Preview: {result.content_preview[:100]}...\n\n"
            return response
        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Fallback: naive keyword search over names, descriptions and text."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()
        # An invoice matches when *any* query word appears in its text blob.
        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()
            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)
        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."
        response = f"πŸ” **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"
        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   β€’ Amount: β‚Ή{inv.get('amount', 0):,.2f}\n"
            response += f"   β€’ Date: {inv.get('date', 'N/A')}\n\n"
        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."
        return response
# ===============================================================================
# STREAMLIT APPLICATION
# ===============================================================================
def create_app():
    """Main Streamlit application.

    Renders the whole single-page UI: global CSS + header, one-time
    initialization of the InvoiceProcessor/ChatBot pair into
    st.session_state, a status sidebar, four radio-selected sections
    (Upload & Process, AI Chat, Analytics, Data Explorer), a global
    chat input, and a footer. Every widget key is suffixed with a short
    per-session id so parallel sessions / reruns do not collide.
    """
    # Generate unique session ID for this run
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]
    session_id = st.session_state.session_id
    # Custom CSS
    st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        font-weight: bold;
        text-align: center;
        color: #FF6B35;
        margin-bottom: 1rem;
    }
    .feature-box {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 1rem;
        border-radius: 10px;
        color: white;
        margin: 0.5rem 0;
        text-align: center;
    }
    .status-ok { color: #28a745; font-weight: bold; }
    .status-warning { color: #ffc107; font-weight: bold; }
    .status-error { color: #dc3545; font-weight: bold; }
    </style>
    """, unsafe_allow_html=True)
    # Header
    st.markdown('<h1 class="main-header">πŸ“„ AI Invoice Processing System</h1>', unsafe_allow_html=True)
    st.markdown("""
    <div style="text-align: center; margin-bottom: 2rem;">
        <p style="font-size: 1.1rem; color: #666;">
            AI-Powered Document Processing β€’ Semantic Search β€’ Smart Analytics β€’ Hugging Face Spaces
        </p>
    </div>
    """, unsafe_allow_html=True)
    # Initialize processor (heavy: loads models), once per session
    if 'processor' not in st.session_state:
        with st.spinner("πŸ”§ Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("βœ… System initialized successfully!")
            except Exception as e:
                st.error(f"❌ Initialization failed: {e}")
                st.stop()
    # Sidebar
    with st.sidebar:
        st.header("πŸŽ›οΈ System Status")
        processor = st.session_state.processor
        # Component status (green/yellow/red spans styled by the CSS above)
        if processor.document_processor.processors:
            st.markdown('<span class="status-ok">βœ… Document Processing</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-error">❌ Document Processing</span>', unsafe_allow_html=True)
        if processor.ai_extractor.use_transformers:
            st.markdown('<span class="status-ok">βœ… AI Extraction</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Regex Extraction</span>', unsafe_allow_html=True)
        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('<span class="status-ok">βœ… Semantic Search</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Keyword Search Only</span>', unsafe_allow_html=True)
        # Quick stats
        st.header("πŸ“Š Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)
            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"β‚Ή{total_amount:,.2f}")
            st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}")
        except Exception as e:
            st.error(f"Stats error: {e}")
        # System info
        st.header("βš™οΈ System Info")
        st.info(f"""
        **Session ID:** {session_id}
        **Limits:**
        β€’ Max file size: 10MB
        β€’ Max concurrent files: 3
        β€’ Timeout: 30s
        """)
    # Main navigation
    selected_tab = st.radio(
        "Choose a section:",
        ["πŸ“€ Upload & Process", "πŸ’¬ AI Chat", "πŸ“Š Analytics", "πŸ“‹ Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}"
    )
    # -------------------------------------------------------------------------
    # UPLOAD & PROCESS SECTION
    # -------------------------------------------------------------------------
    if selected_tab == "πŸ“€ Upload & Process":
        st.header("πŸ“€ Upload Invoice Documents")
        # Feature highlights
        col1, col2, col3 = st.columns(3)
        with col1:
            st.markdown("""
            <div class="feature-box">
                <h4>πŸ€– AI Extraction</h4>
                <p>Advanced NLP models extract structured data automatically</p>
            </div>
            """, unsafe_allow_html=True)
        with col2:
            st.markdown("""
            <div class="feature-box">
                <h4>πŸ” Smart Search</h4>
                <p>Semantic search finds invoices using natural language</p>
            </div>
            """, unsafe_allow_html=True)
        with col3:
            st.markdown("""
            <div class="feature-box">
                <h4>πŸ“Š Analytics</h4>
                <p>Comprehensive insights and visualizations</p>
            </div>
            """, unsafe_allow_html=True)
        # File upload
        st.markdown("### πŸ“ Upload Your Invoices")
        # Initialize session state for files if not exists.
        # Three flags drive the upload workflow: the stored file list, a
        # "done" flag, an "in flight" flag, plus the set of processed hashes.
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()
        # File uploader with stable key
        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )
        # Store uploaded files in session state only if they're new
        if uploaded_files:
            # Create file hashes to detect if files have changed
            # (hash of name+size is cheap; content is not hashed)
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)
            # Check if files have changed
            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("πŸ“„ New files detected - ready for processing")
        # Get files from session state
        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']
        if current_files:
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"⚠️ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]
            st.info(f"πŸ“Š {len(current_files)} files selected")
            # Show file names
            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                processed_icon = "βœ…" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "πŸ“„"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")
            # Process button - only show if not currently processing.
            # The button only flips the flag and reruns; the actual work
            # happens on the next run in the `is_processing` branch.
            col1, col2 = st.columns([1, 1])
            with col1:
                if not is_processing and not is_complete:
                    if st.button("πŸš€ Process Files", type="primary", key=f"process_btn_{session_id}"):
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("πŸ”„ Processing in progress...")
                    # Actually process the files here
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("βœ… Processing completed!")
                    if st.button("πŸ”„ Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()
            with col2:
                if st.button("πŸ—‘οΈ Clear Files", key=f"clear_files_{session_id}"):
                    # Clear all session state related to files
                    keys_to_clear = [
                        f'uploaded_files_{session_id}',
                        f'uploaded_file_hashes_{session_id}',
                        f'processing_complete_{session_id}',
                        f'currently_processing_{session_id}',
                        f'processed_file_hashes_{session_id}'
                    ]
                    for key in keys_to_clear:
                        if key in st.session_state:
                            del st.session_state[key]
                    st.success("πŸ—‘οΈ Files cleared successfully!")
                    time.sleep(1)  # Brief pause to show message
                    st.rerun()
        else:
            st.info("πŸ‘† Please select invoice files to upload and process")
        # Show processing results if completed
        if is_complete:
            st.markdown("### πŸ“‹ Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                # Newest first by created_at timestamp; take the last five
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]
                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"πŸ“„ {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** β‚Ή{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")
    # -------------------------------------------------------------------------
    # AI CHAT SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "πŸ’¬ AI Chat":
        st.header("πŸ’¬ AI Chat Interface")
        # Display chat history
        if st.session_state.chat_history:
            st.markdown("### πŸ’¬ Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])
        # Chat input
        st.markdown("### ✍️ Ask a Question")
        col1, col2 = st.columns([4, 1])
        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )
        with col2:
            ask_btn = st.button("πŸš€ Ask", type="primary", key=f"ask_btn_{session_id}")
        if ask_btn and user_input:
            handle_chat_query(user_input)
        # Suggested queries (only shown before the first exchange)
        if not st.session_state.chat_history:
            st.markdown("### πŸ’‘ Try These Queries")
            col1, col2 = st.columns(2)
            with col1:
                st.markdown("**πŸ“Š Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)
            with col2:
                st.markdown("**πŸ” Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)
        # Clear chat
        if st.session_state.chat_history:
            if st.button("πŸ—‘οΈ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()
    # -------------------------------------------------------------------------
    # ANALYTICS SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "πŸ“Š Analytics":
        st.header("πŸ“Š Analytics Dashboard")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("πŸ“Š No data available. Upload some invoices to see analytics.")
                # NOTE(review): early return skips the global chat input and
                # footer below for this tab when there is no data.
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })
            df = pd.DataFrame(df_data)
            # Key metrics
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"β‚Ή{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"β‚Ή{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())
            # Visualizations
            if len(df) > 0:
                # Amount distribution
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (β‚Ή)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)
                # Top suppliers
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)
                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (β‚Ή)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)
        except Exception as e:
            st.error(f"Analytics error: {e}")
    # -------------------------------------------------------------------------
    # DATA EXPLORER SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "πŸ“‹ Data Explorer":
        st.header("πŸ“‹ Data Explorer")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("πŸ“Š No data available. Upload some invoices first.")
                # NOTE(review): same early-return behavior as Analytics.
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })
            df = pd.DataFrame(df_data)
            # Filters
            col1, col2, col3 = st.columns(3)
            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")
            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")
            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")
            # Apply filters
            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]
            # Display data
            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="β‚Ή%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )
            # Export options
            col1, col2 = st.columns(2)
            with col1:
                if st.button("πŸ“₯ Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )
            with col2:
                if st.button("πŸ“„ Export JSON", key=f"export_json_{session_id}"):
                    # Re-select the raw invoice dicts that survived the filters
                    filtered_invoices = [inv for inv in invoices
                                         if inv.get('invoice_number') in filtered_df['Invoice Number'].values]
                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }
                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )
        except Exception as e:
            st.error(f"Data explorer error: {e}")
    # -------------------------------------------------------------------------
    # GLOBAL CHAT INPUT
    # -------------------------------------------------------------------------
    st.markdown("---")
    st.markdown("### πŸ’¬ Quick Chat (Works from any section)")
    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")
    if global_query:
        handle_chat_query(global_query, show_response=True)
    # Footer
    st.markdown("---")
    st.markdown("""
    <div style="text-align: center; color: #666;">
        <p>πŸš€ <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p>
        <p>Built with ❀️ using Streamlit, Transformers, and AI</p>
    </div>
    """, unsafe_allow_html=True)
# ===============================================================================
# HELPER FUNCTIONS
# ===============================================================================
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once per session.

    Args:
        uploaded_files: Streamlit UploadedFile objects selected by the user.
        session_id: short per-session id used to namespace st.session_state keys.

    Files are de-duplicated by hash((name, size)): anything already in the
    session's processed set is skipped, and every attempted file is added to
    that set even on failure so reruns never reprocess it. Ends by flipping
    the processing/complete flags and forcing a rerun to refresh the UI.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return
    st.markdown("### πŸ”„ Processing Files...")
    # Hashes of files already handled in this session
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']
    # Keep only files we have not seen yet
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))
    if not files_to_process:
        st.info("βœ… All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return
    # Separate containers so progress, status, and per-file results can be
    # updated independently while the loop runs
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()
    successful = 0
    failed = 0
    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()
    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")
    # Process each file only once
    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)
        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")
        with status_container:
            st.info(f"πŸ”„ Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")
        try:
            # Process the file
            result = st.session_state.processor.process_uploaded_file(uploaded_file)
            # Mark file as processed regardless of result
            processed_hashes.add(file_hash)
            # Show result immediately
            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"βœ… Successfully processed: {uploaded_file.name}")
                    # Show extracted data
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** β‚Ή{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")
                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"⚠️ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, β‚Ή{result.amount}")
                    st.markdown("---")
        except Exception as e:
            failed += 1
            # Still mark as processed to avoid reprocessing a failing file
            processed_hashes.add(file_hash)
            with results_container:
                st.error(f"❌ Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")
    # Update session state
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes
    # Final summary
    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("βœ… Processing completed!")
    with status_container:
        if successful > 0:
            st.success(f"πŸŽ‰ Processing complete! {successful} successful, {failed} failed")
            # (the old code re-tested `successful > 0` here; redundant)
            st.balloons()
        else:
            st.error(f"❌ Processing failed for all {failed} files. Please check file formats and content.")
    # Update processing state
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True
    # Force rerun to update UI
    st.rerun()
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for backward compatibility.

    Delegates directly to process_files_once().
    """
    return process_files_once(uploaded_files, session_id)
def handle_chat_query(query, show_response=False):
    """Send a user query to the chatbot and record both sides in history.

    Args:
        query: free-text question from the user.
        show_response: when True, also render the answer inline (used by the
            global chat input) with a pointer to the AI Chat tab.

    Fix: st.rerun() is implemented as a control-flow exception (a subclass
    of Exception), so calling it inside the broad ``except Exception`` block
    made every successful query surface as "Chat error". The rerun is now
    issued outside the try block, only on success.
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })
    succeeded = False
    try:
        with st.spinner("πŸ€– AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)
        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })
        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("πŸ’‘ Switch to the 'AI Chat' section to see full conversation history!")
        succeeded = True
    except Exception as e:
        st.error(f"Chat error: {e}")
    if succeeded:
        # Outside the try so the RerunException is not swallowed above
        st.rerun()
# ===============================================================================
# MAIN ENTRY POINT
# ===============================================================================
def main():
    """Entry point: show the Spaces badge when applicable, then the app."""
    try:
        if IS_HF_SPACE:
            st.sidebar.info("πŸ€— Running on Hugging Face Spaces")
        create_app()
    except Exception as exc:
        st.error(
            f"""
    ## 🚨 Application Error
    {exc}
    Please refresh the page or check the logs for more details.
    """
        )
if __name__ == "__main__":
    # Fix: the call previously read `main(),` — the stray trailing comma
    # wrapped the call in a throwaway 1-tuple and, combined with the
    # indented lines that follow it in this file, made the module invalid.
    main()
# Standalone amounts with currency words
r'([0-9,]+\.?\d*)\s*(?:dollars?|rupees?|usd|inr|eur|gbp)',
# Table-like patterns
r'(?:price|cost|rate)\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)',
# Amount with decimal precision
r'(?:^|\s)([0-9]{1,3}(?:,\d{3})*\.?\d{0,2})(?=\s|$)',
],
'date': [
r'date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})',
r'(?:invoice|bill)\s*date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})',
r'(?:^|\s)(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})(?=\s|$)',
r'(\d{4}[/\-\.]\d{1,2}[/\-\.]\d{1,2})',
r'(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d{2,4})',
],
'quantity': [
r'qty\s*:?\s*(\d+)',
r'quantity\s*:?\s*(\d+)',
r'(?:units?|pcs?|pieces?)\s*:?\s*(\d+)',
r'(\d+)\s*(?:pcs?|units?|items?|pieces?)',
]
}
text_lower = text.lower()
# Extract invoice number with multiple attempts
for pattern in patterns['invoice_number']:
match = re.search(pattern, text_lower, re.IGNORECASE | re.MULTILINE)
if match:
invoice_data.invoice_number = match.group(1).upper().strip()
break
# Extract amount with enhanced logic
amounts_found = []
for pattern in patterns['amount']:
matches = re.finditer(pattern, text_lower, re.IGNORECASE | re.MULTILINE)
for match in matches:
try:
amount_str = match.group(1).replace(',', '').replace(' ', '')
amount_val = float(amount_str)
if 0.01 <= amount_val <= 1000000: # Reasonable range
amounts_found.append(amount_val)
except (ValueError, IndexError):
continue
# Choose the most likely amount (highest value or most repeated)
if amounts_found:
# Remove duplicates and sort
unique_amounts = sorted(set(amounts_found), reverse=True)
# Take the highest reasonable amount
invoice_data.amount = unique_amounts[0]
# Extract date
for pattern in patterns['date']:
match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
if match:
invoice_data.date = self.parse_date(match.group(1))
break
# Extract quantity
for pattern in patterns['quantity']:
match = re.search(pattern, text_lower, re.IGNORECASE)
if match:
try:
invoice_data.quantity = int(match.group(1))
break
except ValueError:
continue
# Enhanced company name extraction
company_patterns = [
r'(?:from|supplier|vendor)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})',
r'(?:to|buyer|client)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})',
r'([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:ltd|inc|corp|llc|co\.|company|pvt|private|limited)',
r'(?:^|\n)([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:\n|$)',
]
companies_found = []
for pattern in company_patterns:
matches = re.findall(pattern, text, re.MULTILINE)
for match in matches:
clean_company = match.strip().title()
if len(clean_company) > 3 and not any(word in clean_company.lower() for word in ['total', 'amount', 'date', 'invoice']):
companies_found.append(clean_company)
# Assign companies (first as supplier, second as buyer)
if companies_found:
invoice_data.supplier_name = companies_found[0]
if len(companies_found) > 1:
invoice_data.buyer_name = companies_found[1]
# Extract product description
desc_patterns = [
r'(?:description|item|product|service)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})',
r'(?:for|regarding)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})',
]
for pattern in desc_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
desc = match.group(1).strip()
if len(desc) > 5:
invoice_data.product_description = desc[:200] # Limit length
break
# Set confidence based on how much we extracted
confidence_factors = []
if invoice_data.invoice_number:
confidence_factors.append(0.3)
if invoice_data.amount > 0:
confidence_factors.append(0.3)
if invoice_data.supplier_name:
confidence_factors.append(0.2)
if invoice_data.date:
confidence_factors.append(0.1)
if invoice_data.quantity > 0:
confidence_factors.append(0.1)
invoice_data.extraction_confidence = sum(confidence_factors)
return invoice_data
def parse_date(self, date_str: str) -> str:
    """Normalize a date string to YYYY-MM-DD.

    Tries a fixed list of common formats in order; if none match, the
    original string is returned unchanged rather than dropped.
    """
    if not date_str:
        return ""
    known_formats = ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d')
    for fmt in known_formats:
        try:
            return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
        except ValueError:
            continue
    # Unparseable: pass the raw value through
    return date_str
# ===============================================================================
# VECTOR STORE CLASS
# ===============================================================================
class VectorStore:
    """Simplified pickle-backed vector store for Hugging Face Spaces.

    One embedding plus one metadata dict is kept per invoice document.
    Embeddings are L2-normalized at encode time, so the plain dot product
    used in semantic_search() equals cosine similarity.
    """
    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # list of np.ndarray embeddings
        self.document_metadata = []  # parallel list of metadata dicts
        self.embedding_dimension = None
        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence transformer model (no-op if unavailable)."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("⚠️ Sentence Transformers not available. Vector search disabled.")
            return
        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )
                # Probe the output dimensionality. encode() on a *list*
                # returns shape (batch, dim); the old code read shape[0],
                # which is the batch size (always 1), not the dimension.
                test_embedding = self.embedding_model.encode(["test"])
                self.embedding_dimension = test_embedding.shape[1]
            st.success(f"βœ… Embedding model loaded: {self.embedding_model_name}")
        except Exception as e:
            st.error(f"❌ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load existing vectors + metadata from disk, or start empty."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)
                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)
                st.success(f"βœ… Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("πŸ“„ New vector store initialized")
        except Exception as e:
            st.error(f"❌ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors + metadata via pickle. Returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)
            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten invoice fields (plus a raw-text snippet) into one string.

        Falsy values and the 'id' field are skipped; parts are joined with
        " | " so the result stays readable in previews.
        """
        text_parts = []
        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")
        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")
        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it to the store.

        Returns False when no embedding model is available or on error;
        the caller is responsible for calling save_vector_store().
        """
        if not self.embedding_model:
            return False
        try:
            document_text = self.create_document_text(invoice_data, raw_text)
            # Normalized embedding -> dot product equals cosine similarity
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)
            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }
            self.vectors.append(embedding)
            self.document_metadata.append(metadata)
            return True
        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> "List[VectorSearchResult]":
        """Return up to top_k documents ranked by cosine similarity.

        Results below a 0.1 similarity threshold are dropped. Returns an
        empty list when no model is loaded or the store is empty.
        """
        if not self.embedding_model or not self.vectors:
            return []
        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)
            # Brute-force scan; fine for the small per-session stores here
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))
            similarities.sort(reverse=True)
            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # Relevance threshold
                    metadata = self.document_metadata[idx]
                    results.append(VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    ))
            return results
        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
# ===============================================================================
# MAIN PROCESSOR CLASS
# ===============================================================================
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces"""
    def __init__(self):
        """Wire up storage, document/AI processors, optional vector store, and stats."""
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        # Semantic search is optional: only when sentence-transformers loaded
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None
        # Per-session processing counters
        self.processing_stats = dict(
            total_processed=0,
            successful=0,
            failed=0,
            start_time=datetime.now(),
        )
def setup_storage(self):
    """Resolve storage paths and seed an empty JSON store on first run."""
    self.data_dir = HF_CONFIG["data_dir"]
    self.json_path = os.path.join(self.data_dir, "invoices.json")
    # Only write the seed document when no store exists yet
    if os.path.exists(self.json_path):
        return
    self.save_json_data({
        "metadata": {
            "created_at": datetime.now().isoformat(),
            "version": "hf_v1.0",
            "total_invoices": 0
        },
        "invoices": [],
        "summary": {
            "total_amount": 0.0,
            "unique_suppliers": [],
            "processing_stats": {"successful": 0, "failed": 0}
        }
    })
def load_json_data(self) -> dict:
    """Load invoice data from the JSON store.

    Returns the parsed data dict. A missing file is recreated via
    setup_storage(). A present-but-corrupt file is deleted first: the old
    code recursed into setup_storage(), which only writes when the file is
    *absent*, so a corrupt file caused infinite recursion.
    """
    try:
        with open(self.json_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        self.setup_storage()
        return self.load_json_data()
    except json.JSONDecodeError:
        # Remove the corrupt file so setup_storage() actually reseeds it
        try:
            os.remove(self.json_path)
        except OSError:
            pass
        self.setup_storage()
        return self.load_json_data()
def save_json_data(self, data: dict):
    """Write the invoice data dict to the JSON store (UTF-8, pretty-printed).

    Failures are reported in the UI instead of raised.
    """
    try:
        with open(self.json_path, 'w', encoding='utf-8') as handle:
            json.dump(data, handle, indent=2, ensure_ascii=False)
    except Exception as exc:
        st.error(f"Error saving data: {exc}")
def process_uploaded_file(self, uploaded_file) -> InvoiceData:
"""Process a single uploaded file with enhanced debugging"""
self.processing_stats['total_processed'] += 1
try:
# Debug file info
file_size = len(uploaded_file.getvalue())
file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'
st.info(f"πŸ“„ Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")
# Check file size
if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
st.error(error_msg)
self.processing_stats['failed'] += 1
return InvoiceData()
# Check file type
if file_extension not in ['pdf', 'txt']:
error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
st.warning(error_msg)
self.processing_stats['failed'] += 1
return InvoiceData()
# Save temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
file_content = uploaded_file.getvalue()
tmp_file.write(file_content)
tmp_file_path = tmp_file.name
st.info(f"πŸ’Ύ Saved temporarily to: {tmp_file_path}")
try:
# Extract text
st.info("πŸ” Extracting text from document...")
text = self.document_processor.extract_text_from_document(tmp_file_path)
if not text or not text.strip():
st.warning(f"❌ No text extracted from {uploaded_file.name}")
self.processing_stats['failed'] += 1
return InvoiceData()
text_length = len(text)
st.info(f"πŸ“ Extracted {text_length} characters of text")
# Show text preview
if text_length > 0:
with st.expander("πŸ“„ Text Preview (First 500 characters)", expanded=False):
st.text(text[:500] + "..." if len(text) > 500 else text)
# Extract invoice data
st.info("πŸ€– Extracting invoice data using AI/Regex...")
invoice_data = self.ai_extractor.extract_with_ai(text)
invoice_data.file_path = uploaded_file.name
# Show extraction results
st.info(f"πŸ“Š Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")
# Save to storage
st.info("πŸ’Ύ Saving extracted data...")
self.save_invoice_data(invoice_data, text, file_size)
self.processing_stats['successful'] += 1
st.success(f"βœ… Successfully processed {uploaded_file.name}")
return invoice_data
finally:
# Cleanup
try:
os.unlink(tmp_file_path)
st.info("🧹 Cleaned up temporary file")
except:
pass
except Exception as e:
error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
st.error(error_msg)
self.processing_stats['failed'] += 1
# Show detailed error for debugging
with st.expander("πŸ” Error Details", expanded=False):
st.code(str(e))
import traceback
st.code(traceback.format_exc())
return InvoiceData()
def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int):
"""Save invoice data to JSON and vector store"""
try:
# Load existing data
data = self.load_json_data()
# Create invoice record
invoice_record = {
"id": len(data["invoices"]) + 1,
"invoice_number": invoice_data.invoice_number,
"supplier_name": invoice_data.supplier_name,
"buyer_name": invoice_data.buyer_name,
"date": invoice_data.date,
"amount": invoice_data.amount,
"quantity": invoice_data.quantity,
"product_description": invoice_data.product_description,
"file_info": {
"file_name": invoice_data.file_path,
"file_size": file_size
},
"extraction_info": {
"confidence": invoice_data.extraction_confidence,
"method": invoice_data.processing_method,
"raw_text_preview": raw_text[:300]
},
"timestamps": {
"created_at": datetime.now().isoformat()
}
}
# Add to invoices
data["invoices"].append(invoice_record)
# Update summary
self.update_summary(data)
# Save JSON
self.save_json_data(data)
# Add to vector store
if self.vector_store:
self.vector_store.add_document(invoice_record, raw_text)
self.vector_store.save_vector_store()
except Exception as e:
st.error(f"Error saving invoice data: {e}")
def update_summary(self, data: dict):
"""Update summary statistics"""
invoices = data["invoices"]
total_amount = sum(inv.get("amount", 0) for inv in invoices)
unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")))
data["summary"] = {
"total_amount": total_amount,
"unique_suppliers": unique_suppliers,
"processing_stats": {
"successful": self.processing_stats['successful'],
"failed": self.processing_stats['failed'],
"total_processed": self.processing_stats['total_processed']
}
}
data["metadata"]["last_updated"] = datetime.now().isoformat()
data["metadata"]["total_invoices"] = len(invoices)
# ===============================================================================
# CHATBOT CLASS
# ===============================================================================
class ChatBot:
    """Chatbot for invoice queries.

    Routes free-text questions to keyword-triggered handlers (summary,
    counts, amounts, suppliers), falls back to semantic search when the
    processor has a vector store, and otherwise does plain keyword
    matching over stored records. All responses are markdown strings.
    """

    def __init__(self, processor: InvoiceProcessor):
        # Shared processor provides JSON storage access and the optional
        # vector store.
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Process a user query and return a markdown response string."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                return "No invoice data found. Please upload some invoices first."
            query_lower = query.lower()
            # Handle different query types
            # First matching keyword family wins, in this fixed priority order.
            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)
            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)
            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)
            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)
            elif self.processor.vector_store:
                return self.handle_semantic_search(query)
            else:
                return self.handle_general_query(data, query)
        except Exception as e:
            # Never raise into the UI; report as chat text instead.
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Generate a comprehensive markdown summary of all stored invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})
        if not invoices:
            return "No invoices found in the system."
        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))
        response = f"""
**📊 Invoice System Summary**
• **Total Invoices**: {len(invoices):,}
• **Total Value**: ₹{total_amount:,.2f}
• **Average Invoice**: ₹{avg_amount:,.2f}
• **Unique Suppliers**: {unique_suppliers}
**📈 Processing Stats**
• **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
• **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}
**🔍 Recent Invoices**
"""
        # Show recent invoices
        # ISO-8601 created_at strings sort correctly lexicographically.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})"
        return response

    def handle_count_query(self, data: dict) -> str:
        """Handle count-related queries (totals, uniques, duplicates)."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))
        # NOTE(review): "First"/"Latest" rely on list position; this matches
        # creation order because records are only ever appended — confirm no
        # other writer reorders the list.
        return f"""
**📊 Invoice Count Summary**
• **Total Records**: {total}
• **Unique Invoice Numbers**: {unique_numbers}
• **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}
**📅 Processing Timeline**
• **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
• **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Handle amount-related queries (totals, extremes, high-value list)."""
        invoices = data.get("invoices", [])
        # Zero/absent amounts are excluded from all statistics.
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]
        if not amounts:
            return "No amount information found in invoices."
        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)
        # Find high-value invoices
        # Threshold = 5th-largest amount when there are >5 invoices,
        # otherwise only the maximum qualifies.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]
        response = f"""
**💰 Financial Analysis**
• **Total Amount**: ₹{total_amount:,.2f}
• **Average Amount**: ₹{avg_amount:,.2f}
• **Highest Invoice**: ₹{max_amount:,.2f}
• **Lowest Invoice**: ₹{min_amount:,.2f}
**🎯 High-Value Invoices (₹{high_value_threshold:,.2f}+)**
"""
        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})"
        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Handle supplier-related queries (counts and totals per supplier)."""
        invoices = data.get("invoices", [])
        # Count invoices by supplier
        supplier_counts = {}
        supplier_amounts = {}
        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)
        if not supplier_counts:
            return "No supplier information found in invoices."
        # Sort suppliers by amount
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]
        response = f"""
**🏢 Supplier Analysis**
• **Total Unique Suppliers**: {len(supplier_counts)}
• **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)
**💰 Top Suppliers by Amount**
"""
        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - ₹{amount:,.2f} ({count} invoices, avg: ₹{avg:,.2f})"
        return response

    def handle_semantic_search(self, query: str) -> str:
        """Handle semantic search queries via the processor's vector store."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)
            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."
            response = f"🔍 **Semantic Search Results for '{query}'**\n\n"
            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   • Similarity: {result.similarity_score:.3f}\n"
                response += f"   • Amount: ₹{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   • Preview: {result.content_preview[:100]}...\n\n"
            return response
        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Handle general queries with case-insensitive keyword search."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()
        # Simple keyword matching
        # A record matches when ANY query word appears in its searchable text.
        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()
            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)
        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."
        response = f"🔍 **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"
        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   • Amount: ₹{inv.get('amount', 0):,.2f}\n"
            response += f"   • Date: {inv.get('date', 'N/A')}\n\n"
        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."
        return response
# ===============================================================================
# STREAMLIT APPLICATION
# ===============================================================================
def create_app():
    """Main Streamlit application.

    Renders the full single-page UI: injected CSS, header, sidebar status
    panel, four radio-selected sections (Upload & Process, AI Chat,
    Analytics, Data Explorer), a global chat input, and a footer. All
    per-session widget keys are namespaced with a short random session id
    stored in ``st.session_state``.
    """
    # Generate unique session ID for this run
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]
    session_id = st.session_state.session_id
    # Custom CSS
    st.markdown("""
<style>
.main-header {
    font-size: 2.5rem;
    font-weight: bold;
    text-align: center;
    color: #FF6B35;
    margin-bottom: 1rem;
}
.feature-box {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    padding: 1rem;
    border-radius: 10px;
    color: white;
    margin: 0.5rem 0;
    text-align: center;
}
.status-ok { color: #28a745; font-weight: bold; }
.status-warning { color: #ffc107; font-weight: bold; }
.status-error { color: #dc3545; font-weight: bold; }
</style>
""", unsafe_allow_html=True)
    # Header
    st.markdown('<h1 class="main-header">📄 AI Invoice Processing System</h1>', unsafe_allow_html=True)
    st.markdown("""
<div style="text-align: center; margin-bottom: 2rem;">
<p style="font-size: 1.1rem; color: #666;">
AI-Powered Document Processing • Semantic Search • Smart Analytics • Hugging Face Spaces
</p>
</div>
""", unsafe_allow_html=True)
    # Initialize processor
    # Heavy objects (models, vector store) are built once per session and
    # cached in session_state.
    if 'processor' not in st.session_state:
        with st.spinner("🔧 Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("✅ System initialized successfully!")
            except Exception as e:
                st.error(f"❌ Initialization failed: {e}")
                # Abort the script run; nothing below can work without the
                # processor.
                st.stop()
    # Sidebar
    with st.sidebar:
        st.header("🎛️ System Status")
        processor = st.session_state.processor
        # Component status
        if processor.document_processor.processors:
            st.markdown('<span class="status-ok">✅ Document Processing</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-error">❌ Document Processing</span>', unsafe_allow_html=True)
        if processor.ai_extractor.use_transformers:
            st.markdown('<span class="status-ok">✅ AI Extraction</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Regex Extraction</span>', unsafe_allow_html=True)
        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('<span class="status-ok">✅ Semantic Search</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Keyword Search Only</span>', unsafe_allow_html=True)
        # Quick stats
        st.header("📊 Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)
            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"₹{total_amount:,.2f}")
            st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}")
        except Exception as e:
            st.error(f"Stats error: {e}")
        # System info
        st.header("⚙️ System Info")
        st.info(f"""
**Session ID:** {session_id}
**Limits:**
• Max file size: 10MB
• Max concurrent files: 3
• Timeout: 30s
""")
    # Main navigation
    selected_tab = st.radio(
        "Choose a section:",
        ["📤 Upload & Process", "💬 AI Chat", "📊 Analytics", "📋 Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}"
    )
    # -------------------------------------------------------------------------
    # UPLOAD & PROCESS SECTION
    # -------------------------------------------------------------------------
    if selected_tab == "📤 Upload & Process":
        st.header("📤 Upload Invoice Documents")
        # Feature highlights
        col1, col2, col3 = st.columns(3)
        with col1:
            st.markdown("""
<div class="feature-box">
<h4>🤖 AI Extraction</h4>
<p>Advanced NLP models extract structured data automatically</p>
</div>
""", unsafe_allow_html=True)
        with col2:
            st.markdown("""
<div class="feature-box">
<h4>🔍 Smart Search</h4>
<p>Semantic search finds invoices using natural language</p>
</div>
""", unsafe_allow_html=True)
        with col3:
            st.markdown("""
<div class="feature-box">
<h4>📊 Analytics</h4>
<p>Comprehensive insights and visualizations</p>
</div>
""", unsafe_allow_html=True)
        # File upload
        st.markdown("### 📁 Upload Your Invoices")
        # Initialize session state for files if not exists
        # These flags implement a small state machine:
        # idle -> currently_processing -> processing_complete.
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()
        # File uploader with stable key
        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )
        # Store uploaded files in session state only if they're new
        if uploaded_files:
            # Create file hashes to detect if files have changed
            # (name, size) pairs stand in for content identity here.
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)
            # Check if files have changed
            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                # New selection: reset the processing state machine.
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("📄 New files detected - ready for processing")
        # Get files from session state
        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']
        if current_files:
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"⚠️ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]
            st.info(f"📊 {len(current_files)} files selected")
            # Show file names
            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                processed_icon = "✅" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "📄"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")
            # Process button - only show if not currently processing
            col1, col2 = st.columns([1, 1])
            with col1:
                if not is_processing and not is_complete:
                    if st.button("🚀 Process Files", type="primary", key=f"process_btn_{session_id}"):
                        # Flip the flag and rerun; the actual work happens on
                        # the next script run in the is_processing branch.
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("🔄 Processing in progress...")
                    # Actually process the files here
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("✅ Processing completed!")
                    if st.button("🔄 Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()
            with col2:
                if st.button("🗑️ Clear Files", key=f"clear_files_{session_id}"):
                    st.session_state[f'uploaded_files_{session_id}'] = None
                    st.session_state[f'uploaded_file_hashes_{session_id}'] = set()
                    st.session_state[f'processing_complete_{session_id}'] = False
                    st.session_state[f'currently_processing_{session_id}'] = False
                    st.session_state[f'processed_file_hashes_{session_id}'] = set()
                    st.rerun()
        else:
            st.info("👆 Please select invoice files to upload and process")
        # Show processing results if completed
        if is_complete:
            st.markdown("### 📋 Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]
                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"📄 {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** ₹{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")
    # -------------------------------------------------------------------------
    # AI CHAT SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "💬 AI Chat":
        st.header("💬 AI Chat Interface")
        # Display chat history
        if st.session_state.chat_history:
            st.markdown("### 💬 Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])
        # Chat input
        st.markdown("### ✍️ Ask a Question")
        col1, col2 = st.columns([4, 1])
        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )
        with col2:
            ask_btn = st.button("🚀 Ask", type="primary", key=f"ask_btn_{session_id}")
        if ask_btn and user_input:
            handle_chat_query(user_input)
        # Suggested queries
        # Only shown on a fresh chat to seed the conversation.
        if not st.session_state.chat_history:
            st.markdown("### 💡 Try These Queries")
            col1, col2 = st.columns(2)
            with col1:
                st.markdown("**📊 Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)
            with col2:
                st.markdown("**🔍 Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)
        # Clear chat
        if st.session_state.chat_history:
            if st.button("🗑️ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()
    # -------------------------------------------------------------------------
    # ANALYTICS SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "📊 Analytics":
        st.header("📊 Analytics Dashboard")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("📊 No data available. Upload some invoices to see analytics.")
                # NOTE(review): this early return also skips the global chat
                # input and footer rendered at the bottom of create_app().
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })
            df = pd.DataFrame(df_data)
            # Key metrics
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"₹{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"₹{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())
            # Visualizations
            if len(df) > 0:
                # Amount distribution
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (₹)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)
                # Top suppliers
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)
                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (₹)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)
        except Exception as e:
            st.error(f"Analytics error: {e}")
    # -------------------------------------------------------------------------
    # DATA EXPLORER SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "📋 Data Explorer":
        st.header("📋 Data Explorer")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("📊 No data available. Upload some invoices first.")
                # NOTE(review): early return skips the global chat input and
                # footer below, same as in the Analytics section.
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })
            df = pd.DataFrame(df_data)
            # Filters
            col1, col2, col3 = st.columns(3)
            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")
            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")
            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")
            # Apply filters
            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]
            # Display data
            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="₹%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )
            # Export options
            col1, col2 = st.columns(2)
            with col1:
                if st.button("📥 Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )
            with col2:
                if st.button("📄 Export JSON", key=f"export_json_{session_id}"):
                    # Re-select the full records whose numbers survived the
                    # DataFrame filtering.
                    filtered_invoices = [inv for inv in invoices
                                         if inv.get('invoice_number') in filtered_df['Invoice Number'].values]
                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }
                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )
        except Exception as e:
            st.error(f"Data explorer error: {e}")
    # -------------------------------------------------------------------------
    # GLOBAL CHAT INPUT
    # -------------------------------------------------------------------------
    st.markdown("---")
    st.markdown("### 💬 Quick Chat (Works from any section)")
    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")
    if global_query:
        handle_chat_query(global_query, show_response=True)
    # Footer
    st.markdown("---")
    st.markdown("""
<div style="text-align: center; color: #666;">
<p>🚀 <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p>
<p>Built with ❤️ using Streamlit, Transformers, and AI</p>
</div>
""", unsafe_allow_html=True)
# ===============================================================================
# HELPER FUNCTIONS
# ===============================================================================
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once per session, with live progress.

    Files whose (name, size) hash is already recorded in session state are
    skipped, making this safe to call again across Streamlit reruns. On
    completion the per-session state machine flags are flipped
    (``currently_processing`` -> False, ``processing_complete`` -> True)
    and the script is rerun to refresh the UI.

    Args:
        uploaded_files: List of Streamlit ``UploadedFile`` objects.
        session_id: Short per-session identifier used to namespace
            session-state keys.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return
    st.markdown("### 🔄 Processing Files...")
    # Hashes of files already handled this session (idempotency guard).
    # NOTE: hash() of a tuple is salted per interpreter run, so these values
    # are only stable within a single app process — sufficient for
    # session-scoped dedupe.
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']
    # Filter out already processed files
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))
    if not files_to_process:
        st.info("✅ All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return
    # Containers let us update progress/status/results independently while
    # keeping their on-page order fixed.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()
    successful = 0
    failed = 0
    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()
    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")
    # Process each file only once
    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)
        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")
        with status_container:
            st.info(f"🔄 Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")
        try:
            # Process the file
            result = st.session_state.processor.process_uploaded_file(uploaded_file)
            # Mark file as processed regardless of result, so a partial
            # failure is not retried on the next rerun.
            processed_hashes.add(file_hash)
            # Show result immediately
            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"✅ Successfully processed: {uploaded_file.name}")
                    # Show extracted data
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** ₹{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")
                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"⚠️ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, ₹{result.amount}")
                    st.markdown("---")
        except Exception as e:
            failed += 1
            # Still mark as processed to avoid reprocessing
            processed_hashes.add(file_hash)
            with results_container:
                st.error(f"❌ Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")
    # Update session state
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes
    # Final summary
    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("✅ Processing completed!")
    with status_container:
        # Fix: the original re-checked `successful > 0` inside the branch
        # already guarded by the same condition; a single check suffices.
        if successful > 0:
            st.success(f"🎉 Processing complete! {successful} successful, {failed} failed")
            st.balloons()
        else:
            st.error(f"❌ Processing failed for all {failed} files. Please check file formats and content.")
    # Update processing state
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True
    # Force rerun to update UI
    st.rerun()
def process_files(uploaded_files, session_id):
    """Deprecated compatibility shim; forwards to ``process_files_once``."""
    return process_files_once(uploaded_files, session_id)
def handle_chat_query(query, show_response=False):
    """Handle a chat query: record it, ask the chatbot, store the reply.

    Args:
        query: Free-text user question.
        show_response: When True, also render the assistant reply inline
            (used by the global chat input outside the chat section).
    """
    # Record the user turn first so it is visible even if answering fails.
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })
    try:
        with st.spinner("🤖 AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)
        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })
        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("💡 Switch to the 'AI Chat' section to see full conversation history!")
        # NOTE(review): st.rerun() raises a control-flow exception from inside
        # this try block; on recent Streamlit versions it derives from
        # BaseException, so the except below does not swallow it — confirm
        # against the pinned Streamlit version.
        st.rerun()
    except Exception as e:
        st.error(f"Chat error: {e}")
# ===============================================================================
# MAIN ENTRY POINT
# ===============================================================================
def main():
    """Main entry point for Hugging Face Spaces.

    Shows a deployment badge when running on a Space, then delegates all
    rendering to ``create_app``. Acts as a last-resort error boundary for
    anything the section-level handlers inside create_app miss.
    """
    try:
        if IS_HF_SPACE:
            # Let users know which deployment they are on.
            st.sidebar.info("🤗 Running on Hugging Face Spaces")
        create_app()
    except Exception as e:
        st.error(f"""
## 🚨 Application Error
{e}
Please refresh the page or check the logs for more details.
""")
if __name__ == "__main__":
main()