#!/usr/bin/env python3
"""
AI Invoice Processing System - Complete Single File for Hugging Face Spaces

A comprehensive system with AI-powered extraction, semantic search, and analytics.

Author: AI Assistant
Date: 2024
Version: HuggingFace Single File v1.0
"""

# ===============================================================================
# IMPORTS AND COMPATIBILITY CHECKS
# ===============================================================================

import os
import json
import re
import tempfile
import shutil
import pickle
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import time
import logging
import uuid

# Streamlit and core libraries.
# BUGFIX: streamlit must be imported BEFORE st.secrets is consulted below.
# The original referenced `st` prior to importing it; the resulting NameError
# was silently swallowed by a bare `except:`, so Streamlit secrets were never
# actually read on Hugging Face Spaces.
import streamlit as st
import sqlite3
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests

# Check if running on Hugging Face Spaces
IS_HF_SPACE = os.getenv("SPACE_ID") is not None

# Get Hugging Face token from Streamlit secrets (HF Spaces) or environment
HF_TOKEN = None
try:
    # Try Streamlit secrets first (for HF Spaces)
    HF_TOKEN = st.secrets.get("HF_TOKEN", None)
except Exception:
    # Fall back to environment variable (no secrets.toml available locally)
    HF_TOKEN = os.getenv("HF_TOKEN", None)

# Vector storage and embeddings (with fallbacks)
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    st.warning("โš ๏ธ FAISS not available. Vector search will be disabled.")

try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    st.warning("โš ๏ธ Sentence Transformers not available. Using fallback methods.")

try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# Document processing (simplified for HF): prefer pdfplumber, then PyPDF2.
try:
    import pdfplumber
    PDF_PROCESSING_AVAILABLE = True
    PDF_PROCESSOR = "pdfplumber"
except ImportError:
    try:
        import PyPDF2
        PDF_PROCESSING_AVAILABLE = True
        PDF_PROCESSOR = "PyPDF2"
    except ImportError:
        PDF_PROCESSING_AVAILABLE = False
        PDF_PROCESSOR = None

# ===============================================================================
# STREAMLIT CONFIGURATION
# ===============================================================================

st.set_page_config(
    page_title="AI Invoice Processing System",
    page_icon="๐Ÿ“„",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/spaces',
        'Report a bug': 'https://huggingface.co/spaces',
        'About': """
        # AI Invoice Processing System
        Built for Hugging Face Spaces with AI-powered extraction and semantic search.
        """
    }
)

# ===============================================================================
# CONFIGURATION
# ===============================================================================

HF_CONFIG = {
    "max_file_size_mb": 10,
    "max_concurrent_files": 3,
    "timeout_seconds": 30,
    "use_cpu_only": True,
    "embedding_model": "all-MiniLM-L6-v2",
    "cache_dir": "./cache",
    "data_dir": "./data",
    "enable_ollama": False,
}

# Create necessary directories
os.makedirs(HF_CONFIG["cache_dir"], exist_ok=True)
os.makedirs(HF_CONFIG["data_dir"], exist_ok=True)

# ===============================================================================
# DATA STRUCTURES
# ===============================================================================

@dataclass
class InvoiceData:
    """Data structure for extracted invoice information."""
    supplier_name: str = ""           # company providing goods/services
    buyer_name: str = ""              # company receiving goods/services
    invoice_number: str = ""
    date: str = ""                    # normalized to YYYY-MM-DD when parseable
    amount: float = 0.0
    quantity: int = 0
    product_description: str = ""
    file_path: str = ""               # original uploaded file name
    extraction_confidence: float = 0.0
    processing_method: str = "regex"  # "regex" | "ai_ner" | "mistral_7b"
"regex" @dataclass class VectorSearchResult: """Data structure for vector search results""" invoice_id: str invoice_number: str supplier_name: str similarity_score: float content_preview: str metadata: Dict # =============================================================================== # DOCUMENT PROCESSING CLASSES # =============================================================================== class DocumentProcessor: """Simplified document processor for Hugging Face Spaces""" def __init__(self): self.setup_processors() def setup_processors(self): """Setup available document processors""" self.processors = {} # PDF processing if PDF_PROCESSING_AVAILABLE: if PDF_PROCESSOR == "pdfplumber": self.processors['pdf'] = self.extract_with_pdfplumber st.success("โœ… PDF processing available (pdfplumber)") elif PDF_PROCESSOR == "PyPDF2": self.processors['pdf'] = self.extract_with_pypdf2 st.success("โœ… PDF processing available (PyPDF2)") else: st.warning("โš ๏ธ No PDF processor available") # Text files self.processors['txt'] = self.extract_text_file def extract_with_pdfplumber(self, file_path: str) -> str: """Extract text using pdfplumber""" try: import pdfplumber text = "" with pdfplumber.open(file_path) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" return text except Exception as e: st.error(f"PDF extraction failed: {e}") return "" def extract_with_pypdf2(self, file_path: str) -> str: """Extract text using PyPDF2""" try: import PyPDF2 text = "" with open(file_path, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) for page in pdf_reader.pages: text += page.extract_text() + "\n" return text except Exception as e: st.error(f"PDF extraction failed: {e}") return "" def extract_text_file(self, file_path: str) -> str: """Extract text from text files""" try: with open(file_path, 'r', encoding='utf-8') as f: return f.read() except Exception as e: st.error(f"Text file extraction failed: {e}") return "" def 
extract_text_from_document(self, file_path: str) -> str: """Extract text from document based on file type""" file_ext = Path(file_path).suffix.lower() if file_ext == '.pdf': processor = self.processors.get('pdf') elif file_ext == '.txt': processor = self.processors.get('txt') else: st.warning(f"Unsupported file type: {file_ext}") return "" if processor: return processor(file_path) else: st.error(f"No processor available for {file_ext}") return "" # =============================================================================== # AI EXTRACTION CLASS # =============================================================================== class AIExtractor: """AI extraction for Hugging Face Spaces with Mistral 7B support""" def __init__(self): self.use_mistral = self.setup_mistral() self.use_transformers = self.setup_transformers() if not self.use_mistral else False def setup_mistral(self): """Try to setup Mistral 7B model with proper authentication""" try: # Check if we have HF token if not HF_TOKEN: st.warning("โš ๏ธ Hugging Face token not found. Add HF_TOKEN to secrets for Mistral access.") return False # Check if we're in a high-resource environment import psutil memory_gb = psutil.virtual_memory().total / (1024**3) if memory_gb < 8: st.warning("โš ๏ธ Insufficient memory for Mistral 7B. 
Using lighter models.") return False from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline from huggingface_hub import login # Login with HF token login(token=HF_TOKEN) with st.spinner("๐Ÿ”„ Loading Mistral 7B model (this may take a few minutes)..."): # Use the instruction-tuned model model_name = "mistralai/Mistral-7B-Instruct-v0.1" # Load with reduced precision for memory efficiency self.mistral_tokenizer = AutoTokenizer.from_pretrained( model_name, cache_dir=HF_CONFIG["cache_dir"], token=HF_TOKEN ) self.mistral_model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float16 if TORCH_AVAILABLE else None, device_map="auto" if TORCH_AVAILABLE else None, load_in_8bit=True, # Use 8-bit quantization cache_dir=HF_CONFIG["cache_dir"], token=HF_TOKEN ) # Create pipeline self.mistral_pipeline = pipeline( "text-generation", model=self.mistral_model, tokenizer=self.mistral_tokenizer, torch_dtype=torch.float16 if TORCH_AVAILABLE else None, device_map="auto" if TORCH_AVAILABLE else None ) st.success("โœ… Mistral 7B model loaded successfully!") return True except ImportError as e: st.warning(f"โš ๏ธ Missing dependencies for Mistral 7B: {e}") return False except Exception as e: st.warning(f"โš ๏ธ Mistral 7B not available: {e}") st.info("๐Ÿ’ก To use Mistral 7B: Add your Hugging Face token to secrets as 'HF_TOKEN'") return False def setup_transformers(self): """Fallback to lighter NER model""" try: from transformers import pipeline with st.spinner("Loading fallback AI model..."): self.ner_pipeline = pipeline( "ner", model="dbmdz/bert-large-cased-finetuned-conll03-english", aggregation_strategy="simple" ) st.success("โœ… Fallback AI extraction model loaded") return True except Exception as e: st.warning(f"โš ๏ธ AI extraction not available: {e}") return False def extract_with_mistral(self, text: str) -> InvoiceData: """Extract invoice data using Mistral 7B""" try: # Create a detailed prompt for Mistral prompt = f"""[INST] You are an expert at 
extracting structured information from invoices. Extract the following information from this invoice text and respond ONLY with valid JSON: {{ "invoice_number": "invoice or bill number", "supplier_name": "company providing goods/services", "buyer_name": "company receiving goods/services", "date": "date in YYYY-MM-DD format", "amount": "total amount as number only", "quantity": "total quantity as integer", "product_description": "brief description of items/services" }} Invoice text: {text[:2000]} Respond with JSON only: [/INST]""" # Generate response response = self.mistral_pipeline( prompt, max_new_tokens=300, temperature=0.1, do_sample=True, pad_token_id=self.mistral_tokenizer.eos_token_id ) # Extract the generated text generated_text = response[0]['generated_text'] # Find JSON in the response json_start = generated_text.find('{') json_end = generated_text.rfind('}') + 1 if json_start != -1 and json_end > json_start: json_str = generated_text[json_start:json_end] # Parse JSON import json data = json.loads(json_str) # Create InvoiceData object invoice_data = InvoiceData() invoice_data.supplier_name = str(data.get('supplier_name', '')).strip() invoice_data.buyer_name = str(data.get('buyer_name', '')).strip() invoice_data.invoice_number = str(data.get('invoice_number', '')).strip() invoice_data.date = self.parse_date(str(data.get('date', ''))) # Parse amount try: amount_val = data.get('amount', 0) if isinstance(amount_val, str): amount_clean = re.sub(r'[^\d.]', '', amount_val) invoice_data.amount = float(amount_clean) if amount_clean else 0.0 else: invoice_data.amount = float(amount_val) except: invoice_data.amount = 0.0 # Parse quantity try: qty_val = data.get('quantity', 0) invoice_data.quantity = int(float(str(qty_val).replace(',', ''))) except: invoice_data.quantity = 0 invoice_data.product_description = str(data.get('product_description', '')).strip() invoice_data.extraction_confidence = 0.95 # High confidence for Mistral invoice_data.processing_method = 
"mistral_7b" return invoice_data else: st.warning("โš ๏ธ Mistral response didn't contain valid JSON, falling back to regex") return self.extract_with_regex(text) except Exception as e: st.error(f"Mistral extraction failed: {e}") return self.extract_with_regex(text) def extract_with_ai(self, text: str) -> InvoiceData: """Extract invoice data using available AI method""" if self.use_mistral: st.info("๐Ÿค– Using Mistral 7B for extraction...") return self.extract_with_mistral(text) elif self.use_transformers: st.info("๐Ÿค– Using NER model for extraction...") return self.extract_with_ner(text) else: st.info("๐Ÿ”ง Using regex extraction...") return self.extract_with_regex(text) def extract_with_ner(self, text: str) -> InvoiceData: """Extract using NER model (fallback method)""" try: # Use NER to extract entities entities = self.ner_pipeline(text[:512]) # Limit text length invoice_data = InvoiceData() invoice_data.processing_method = "ai_ner" # Extract specific entities for entity in entities: entity_text = entity['word'].replace('##', '') if entity['entity_group'] == 'ORG': if not invoice_data.supplier_name: invoice_data.supplier_name = entity_text elif not invoice_data.buyer_name: invoice_data.buyer_name = entity_text elif entity['entity_group'] == 'MISC': if not invoice_data.invoice_number and any(c.isdigit() for c in entity_text): invoice_data.invoice_number = entity_text # Fall back to regex for missing fields regex_data = self.extract_with_regex(text) # Combine results if not invoice_data.invoice_number: invoice_data.invoice_number = regex_data.invoice_number if not invoice_data.amount: invoice_data.amount = regex_data.amount if not invoice_data.date: invoice_data.date = regex_data.date if not invoice_data.quantity: invoice_data.quantity = regex_data.quantity invoice_data.extraction_confidence = 0.8 return invoice_data except Exception as e: st.error(f"NER extraction failed: {e}") return self.extract_with_regex(text) def extract_with_regex(self, text: str) -> 
InvoiceData: """Enhanced regex extraction with better amount detection""" invoice_data = InvoiceData() invoice_data.processing_method = "regex" # Enhanced regex patterns with more comprehensive matching patterns = { 'invoice_number': [ r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', r'bill\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', r'inv\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', r'ref\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', r'#\s*([A-Z0-9\-_/]{3,})', r'(?:^|\s)([A-Z]{2,}\d{3,}|\d{3,}[A-Z]{2,})', # Common patterns like ABC123 or 123ABC ], 'amount': [ # Currency symbols with amounts r'total\s*(?:amount)?\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', r'amount\s*(?:due|paid|total)?\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', r'grand\s*total\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', r'net\s*(?:amount|total)\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', r'sub\s*total\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', # Currency symbols at the beginning r'[\$โ‚นยฃโ‚ฌ]\s*([0-9,]+\.?\d*)', # Amounts at end of lines (common in invoices) r'([0-9,]+\.?\d*)\s*[\$โ‚นยฃโ‚ฌ]?\s* def parse_date(self, date_str: str) -> str: """Parse date to YYYY-MM-DD format""" if not date_str: return "" formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] for fmt in formats: try: parsed_date = datetime.strptime(date_str, fmt) return parsed_date.strftime('%Y-%m-%d') except ValueError: continue return date_str # =============================================================================== # VECTOR STORE CLASS # =============================================================================== class VectorStore: """Simplified vector store for Hugging Face Spaces""" def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"): self.embedding_model_name = embedding_model self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl") self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl") self.embedding_model = None self.vectors = [] self.document_metadata 
# ===============================================================================
# VECTOR STORE CLASS
# ===============================================================================

class VectorStore:
    """Simplified vector store for Hugging Face Spaces.

    Embeddings are kept as a plain Python list of normalized vectors and
    persisted with pickle; search is brute-force cosine similarity (dot
    product of normalized vectors).
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []              # list of np arrays, one per document
        self.document_metadata = []    # parallel list of metadata dicts
        self.embedding_dimension = None
        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence transformer model."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("โš ๏ธ Sentence Transformers not available. Vector search disabled.")
            return
        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )
                # BUGFIX: encode() on a list returns a 2-D array of shape
                # (n_texts, dim); the original stored shape[0], which is the
                # number of texts (always 1 here), not the dimension.
                test_embedding = self.embedding_model.encode(["test"])
                self.embedding_dimension = test_embedding.shape[1]
            st.success(f"โœ… Embedding model loaded: {self.embedding_model_name}")
        except Exception as e:
            st.error(f"โŒ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load an existing vector store from disk, or start empty.

        NOTE(review): pickle.load is only safe because these files are
        written by this app into its own data dir -- never load untrusted
        pickle files.
        """
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)
                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)
                st.success(f"โœ… Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("๐Ÿ“„ New vector store initialized")
        except Exception as e:
            st.error(f"โŒ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors and metadata to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)
            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Build a single searchable string from the invoice fields.

        Skips empty values and the 'id' field; appends a truncated slice of
        the raw document text when provided.
        """
        text_parts = []
        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")
        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")
        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed an invoice record and append it to the store."""
        if not self.embedding_model:
            return False
        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalized embedding -> dot product equals cosine similarity.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)
            return True
        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> "List[VectorSearchResult]":
        """Brute-force cosine-similarity search over all stored documents.

        Returns up to top_k results with similarity above a 0.1 floor;
        empty list when the model/store is unavailable or nothing matches.
        """
        if not self.embedding_model or not self.vectors:
            return []
        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Both vectors are normalized, so the dot product is the cosine.
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))
            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # relevance threshold
                    metadata = self.document_metadata[idx]
                    results.append(VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    ))
            return results
        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
# ===============================================================================
# MAIN PROCESSOR CLASS
# ===============================================================================

class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Coordinates document text extraction, AI extraction, JSON persistence,
    and (when available) the semantic vector store.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # Per-session counters surfaced in the sidebar.
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    def setup_storage(self):
        """Ensure the JSON storage file exists with its initial schema."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")

        if not os.path.exists(self.json_path):
            initial_data = {
                "metadata": {
                    "created_at": datetime.now().isoformat(),
                    "version": "hf_v1.0",
                    "total_invoices": 0
                },
                "invoices": [],
                "summary": {
                    "total_amount": 0.0,
                    "unique_suppliers": [],
                    "processing_stats": {"successful": 0, "failed": 0}
                }
            }
            self.save_json_data(initial_data)

    def load_json_data(self) -> dict:
        """Load invoice data from JSON.

        BUGFIX: the original recursed into itself after setup_storage() on
        failure; since setup_storage() only writes when the file is MISSING,
        a corrupt-but-present invoices.json caused infinite recursion. Now we
        rewrite a fresh initial file and fall back to an empty structure.
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Recreate storage (overwriting any corrupt file) and retry once.
            try:
                os.remove(self.json_path)
            except OSError:
                pass
            self.setup_storage()
            try:
                with open(self.json_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                # Disk unwritable/unreadable: degrade to an in-memory default.
                return {"metadata": {}, "invoices": [], "summary": {}}

    def save_json_data(self, data: dict):
        """Save invoice data to JSON (UTF-8, human-readable)."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> "InvoiceData":
        """Process a single uploaded file with enhanced debugging.

        Returns the extracted InvoiceData, or an empty InvoiceData() on any
        validation or processing failure (counted in processing_stats).
        """
        self.processing_stats['total_processed'] += 1
        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'
            st.info(f"๐Ÿ“„ Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Reject oversized files before doing any work.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # Save to a temp file so the path-based extractors can read it.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_file_path = tmp_file.name
            st.info(f"๐Ÿ’พ Saved temporarily to: {tmp_file_path}")

            try:
                st.info("๐Ÿ” Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)

                if not text or not text.strip():
                    st.warning(f"โŒ No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"๐Ÿ“ Extracted {text_length} characters of text")
                if text_length > 0:
                    with st.expander("๐Ÿ“„ Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("๐Ÿค– Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name
                st.info(f"๐Ÿ“Š Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("๐Ÿ’พ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)
                self.processing_stats['successful'] += 1
                st.success(f"โœ… Successfully processed {uploaded_file.name}")
                return invoice_data
            finally:
                # Best-effort temp-file cleanup; never mask the real outcome.
                try:
                    os.unlink(tmp_file_path)
                    st.info("๐Ÿงน Cleaned up temporary file")
                except OSError:
                    pass

        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1
            with st.expander("๐Ÿ” Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())
            return InvoiceData()

    def save_invoice_data(self, invoice_data: "InvoiceData", raw_text: str, file_size: int):
        """Append one extracted invoice to JSON storage and the vector store."""
        try:
            data = self.load_json_data()

            invoice_record = {
                "id": len(data["invoices"]) + 1,  # sequential id
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }

            data["invoices"].append(invoice_record)
            self.update_summary(data)
            self.save_json_data(data)

            # Mirror the record into the vector store when available.
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()

        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute the summary block and metadata counters in-place."""
        invoices = data["invoices"]
        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(
            inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")
        ))
        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }
        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
# ===============================================================================
# CHATBOT CLASS
# ===============================================================================

class ChatBot:
    """Chatbot for invoice queries.

    Routes free-text questions to the most specific handler: summary,
    counts, amounts, suppliers, then semantic search (when a vector store
    exists), and finally plain keyword matching.
    """

    def __init__(self, processor: "InvoiceProcessor"):
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Classify the user query by keywords and dispatch to a handler."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            q = query.lower()

            # Most specific intent first; each guard returns immediately.
            if any(phrase in q for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)
            if "count" in q or "how many" in q:
                return self.handle_count_query(data)
            if any(phrase in q for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)
            if any(phrase in q for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)
            if self.processor.vector_store:
                return self.handle_semantic_search(query)
            return self.handle_general_query(data, query)
        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Render totals, processing stats and the five newest invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})
        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**๐Ÿ“Š Invoice System Summary**
โ€ข **Total Invoices**: {len(invoices):,}
โ€ข **Total Value**: โ‚น{total_amount:,.2f}
โ€ข **Average Invoice**: โ‚น{avg_amount:,.2f}
โ€ข **Unique Suppliers**: {unique_suppliers}

**๐Ÿ“ˆ Processing Stats**
โ€ข **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
โ€ข **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**๐Ÿ” Recent Invoices**
"""
        newest_first = sorted(
            invoices,
            key=lambda x: x.get('timestamps', {}).get('created_at', ''),
            reverse=True
        )
        for i, inv in enumerate(newest_first[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (โ‚น{inv.get('amount', 0):,.2f})"
        return response

    def handle_count_query(self, data: dict) -> str:
        """Report record counts, duplicate estimate, and processing timeline."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(
            inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')
        ))
        return f"""
**๐Ÿ“Š Invoice Count Summary**
โ€ข **Total Records**: {total}
โ€ข **Unique Invoice Numbers**: {unique_numbers}
โ€ข **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**๐Ÿ“… Processing Timeline**
โ€ข **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
โ€ข **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Summarize amounts and list the top high-value invoices."""
        invoices = data.get("invoices", [])
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]
        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # With >5 invoices, the threshold is the 5th-largest amount;
        # otherwise only the single maximum qualifies.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts) - 1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**๐Ÿ’ฐ Financial Analysis**
โ€ข **Total Amount**: โ‚น{total_amount:,.2f}
โ€ข **Average Amount**: โ‚น{avg_amount:,.2f}
โ€ข **Highest Invoice**: โ‚น{max_amount:,.2f}
โ€ข **Lowest Invoice**: โ‚น{min_amount:,.2f}

**๐ŸŽฏ High-Value Invoices (โ‚น{high_value_threshold:,.2f}+)**
"""
        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (โ‚น{inv.get('amount', 0):,.2f})"
        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Aggregate invoice counts and totals per supplier."""
        invoices = data.get("invoices", [])

        supplier_counts = {}
        supplier_amounts = {}
        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]
        busiest = max(supplier_counts, key=supplier_counts.get)

        response = f"""
**๐Ÿข Supplier Analysis**
โ€ข **Total Unique Suppliers**: {len(supplier_counts)}
โ€ข **Most Active**: {busiest} ({supplier_counts[busiest]} invoices)

**๐Ÿ’ฐ Top Suppliers by Amount**
"""
        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - โ‚น{amount:,.2f} ({count} invoices, avg: โ‚น{avg:,.2f})"
        return response

    def handle_semantic_search(self, query: str) -> str:
        """Run vector-store similarity search and format the hits."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)
            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"๐Ÿ” **Semantic Search Results for '{query}'**\n\n"
            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   โ€ข Similarity: {result.similarity_score:.3f}\n"
                response += f"   โ€ข Amount: โ‚น{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   โ€ข Preview: {result.content_preview[:100]}...\n\n"
            return response
        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Fallback keyword search over supplier/buyer/description/raw text."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        matching_invoices = []
        for inv in invoices:
            haystack = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()
            if any(word in haystack for word in query_words):
                matching_invoices.append(inv)

        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"๐Ÿ” **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"
        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   โ€ข Amount: โ‚น{inv.get('amount', 0):,.2f}\n"
            response += f"   โ€ข Date: {inv.get('date', 'N/A')}\n\n"

        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."
        return response
def create_app():
    """Top-level Streamlit UI.

    Renders the header, one-time processor initialization, the sidebar,
    the four main tabs (upload, chat, analytics, explorer), a global chat
    input, and the footer.

    FIX 1: the original embedded raw HTML/CSS via
    st.markdown(..., unsafe_allow_html=True); that markup was corrupted in
    transit, so plain markdown is used instead (cosmetic change only).
    FIX 2: the Analytics / Data Explorer sections used bare `return` on
    empty data, which exited create_app entirely and skipped the global
    chat input and footer; the sections are now private helpers so their
    early returns only end the section.
    """
    # Per-browser-session identifier used to namespace widget keys/state.
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]
    session_id = st.session_state.session_id

    # Header (plain markdown; see FIX 1 above).
    st.markdown("# ๐Ÿ“„ AI Invoice Processing System")
    st.markdown(
        "AI-Powered Document Processing โ€ข Semantic Search โ€ข "
        "Smart Analytics โ€ข Hugging Face Spaces"
    )

    # One-time heavy initialization (models, storage) cached in session state.
    if 'processor' not in st.session_state:
        with st.spinner("๐Ÿ”ง Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("โœ… System initialized successfully!")
            except Exception as e:
                st.error(f"โŒ Initialization failed: {e}")
                st.stop()

    # ---- Sidebar: component status, quick stats, limits --------------------
    with st.sidebar:
        st.header("๐ŸŽ›๏ธ System Status")
        processor = st.session_state.processor

        if processor.document_processor.processors:
            st.markdown('โœ… Document Processing')
        else:
            st.markdown('โŒ Document Processing')
        if processor.ai_extractor.use_transformers:
            st.markdown('โœ… AI Extraction')
        else:
            st.markdown('โš ๏ธ Regex Extraction')
        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('โœ… Semantic Search')
        else:
            st.markdown('โš ๏ธ Keyword Search Only')

        st.header("๐Ÿ“Š Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)
            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"โ‚น{total_amount:,.2f}")
            st.metric(
                "Success Rate",
                f"{processor.processing_stats['successful']}/"
                f"{processor.processing_stats['total_processed']}",
            )
        except Exception as e:
            st.error(f"Stats error: {e}")

        st.header("โš™๏ธ System Info")
        st.info(f"""
**Session ID:** {session_id}

**Limits:**
โ€ข Max file size: 10MB
โ€ข Max concurrent files: 3
โ€ข Timeout: 30s
""")

    # Main navigation
    selected_tab = st.radio(
        "Choose a section:",
        ["๐Ÿ“ค Upload & Process", "๐Ÿ’ฌ AI Chat", "๐Ÿ“Š Analytics", "๐Ÿ“‹ Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}",
    )

    if selected_tab == "๐Ÿ“ค Upload & Process":
        _render_upload_section(session_id)
    elif selected_tab == "๐Ÿ’ฌ AI Chat":
        _render_chat_section(session_id)
    elif selected_tab == "๐Ÿ“Š Analytics":
        _render_analytics_section(session_id)
    elif selected_tab == "๐Ÿ“‹ Data Explorer":
        _render_explorer_section(session_id)

    # ---- Global chat input (available from any tab) -------------------------
    st.markdown("---")
    st.markdown("### ๐Ÿ’ฌ Quick Chat (Works from any section)")
    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")
    if global_query:
        handle_chat_query(global_query, show_response=True)

    # Footer
    st.markdown("---")
    st.markdown(
        "๐Ÿš€ AI Invoice Processing System - Optimized for Hugging Face Spaces\n\n"
        "Built with โค๏ธ using Streamlit, Transformers, and AI"
    )


def _render_upload_section(session_id):
    """'Upload & Process' tab: file intake, dedup bookkeeping, processing trigger."""
    st.header("๐Ÿ“ค Upload Invoice Documents")

    # Feature highlights (plain markdown; original HTML cards were lost).
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("**๐Ÿค– AI Extraction**\n\nAdvanced NLP models extract structured data automatically")
    with col2:
        st.markdown("**๐Ÿ” Smart Search**\n\nSemantic search finds invoices using natural language")
    with col3:
        st.markdown("**๐Ÿ“Š Analytics**\n\nComprehensive insights and visualizations")

    st.markdown("### ๐Ÿ“ Upload Your Invoices")

    # Session-state slots, namespaced by session id.
    if f'uploaded_files_{session_id}' not in st.session_state:
        st.session_state[f'uploaded_files_{session_id}'] = None
    if f'processing_complete_{session_id}' not in st.session_state:
        st.session_state[f'processing_complete_{session_id}'] = False
    if f'currently_processing_{session_id}' not in st.session_state:
        st.session_state[f'currently_processing_{session_id}'] = False
    if f'processed_file_hashes_{session_id}' not in st.session_state:
        st.session_state[f'processed_file_hashes_{session_id}'] = set()

    uploaded_files = st.file_uploader(
        "Choose invoice files (PDF, TXT supported)",
        type=['pdf', 'txt'],
        accept_multiple_files=True,
        help="Maximum file size: 10MB per file",
        key=f"file_uploader_stable_{session_id}",
    )

    # Adopt the uploader's files only when the selection actually changed
    # (hash of name+size), so a rerun does not reset processing state.
    if uploaded_files:
        current_file_hashes = {hash((f.name, f.size)) for f in uploaded_files}
        stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
        if current_file_hashes != stored_hashes:
            st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
            st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
            st.session_state[f'processing_complete_{session_id}'] = False
            st.session_state[f'currently_processing_{session_id}'] = False
            st.info("๐Ÿ“„ New files detected - ready for processing")

    current_files = st.session_state[f'uploaded_files_{session_id}']
    is_processing = st.session_state[f'currently_processing_{session_id}']
    is_complete = st.session_state[f'processing_complete_{session_id}']

    if current_files:
        max_files = 3
        if len(current_files) > max_files:
            st.warning(f"โš ๏ธ Too many files selected. Processing first {max_files} files.")
            current_files = current_files[:max_files]

        st.info(f"๐Ÿ“Š {len(current_files)} files selected")
        st.markdown("**Selected Files:**")
        for i, file in enumerate(current_files, 1):
            file_size_mb = len(file.getvalue()) / (1024 * 1024)
            file_hash = hash((file.name, file.size))
            processed_icon = "โœ…" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "๐Ÿ“„"
            st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")

        col1, col2 = st.columns([1, 1])
        with col1:
            if not is_processing and not is_complete:
                if st.button("๐Ÿš€ Process Files", type="primary", key=f"process_btn_{session_id}"):
                    st.session_state[f'currently_processing_{session_id}'] = True
                    st.rerun()
            elif is_processing:
                st.info("๐Ÿ”„ Processing in progress...")
                process_files_once(current_files, session_id)
            elif is_complete:
                st.success("โœ… Processing completed!")
                if st.button("๐Ÿ”„ Process Again", key=f"reprocess_btn_{session_id}"):
                    st.session_state[f'processing_complete_{session_id}'] = False
                    st.session_state[f'currently_processing_{session_id}'] = False
                    st.session_state[f'processed_file_hashes_{session_id}'] = set()
                    st.rerun()
        with col2:
            if st.button("๐Ÿ—‘๏ธ Clear Files", key=f"clear_files_{session_id}"):
                for key in (
                    f'uploaded_files_{session_id}',
                    f'uploaded_file_hashes_{session_id}',
                    f'processing_complete_{session_id}',
                    f'currently_processing_{session_id}',
                    f'processed_file_hashes_{session_id}',
                ):
                    st.session_state.pop(key, None)
                st.success("๐Ÿ—‘๏ธ Files cleared successfully!")
                time.sleep(1)  # brief pause so the message is visible
                st.rerun()
    else:
        st.info("๐Ÿ‘† Please select invoice files to upload and process")

    # Most recent results, shown after a completed run.
    if is_complete:
        st.markdown("### ๐Ÿ“‹ Recent Processing Results")
        try:
            data = st.session_state.processor.load_json_data()
            recent_invoices = sorted(
                data.get("invoices", []),
                key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                reverse=True,
            )[:5]
            if recent_invoices:
                for i, inv in enumerate(recent_invoices, 1):
                    title = (f"๐Ÿ“„ {inv.get('invoice_number', f'Invoice {i}')} - "
                             f"{inv.get('supplier_name', 'Unknown')}")
                    with st.expander(title, expanded=False):
                        col1, col2 = st.columns(2)
                        with col1:
                            st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                            st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                            st.write(f"**Amount:** โ‚น{inv.get('amount', 0):.2f}")
                        with col2:
                            st.write(f"**Date:** {inv.get('date', 'N/A')}")
                            st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                            st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
            else:
                st.info("No recent processing results found.")
        except Exception as e:
            st.error(f"Error loading recent results: {e}")


def _render_chat_section(session_id):
    """'AI Chat' tab: history, free-form input, and suggested starter queries."""
    st.header("๐Ÿ’ฌ AI Chat Interface")

    if st.session_state.chat_history:
        st.markdown("### ๐Ÿ’ฌ Chat History")
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

    st.markdown("### โœ๏ธ Ask a Question")
    col1, col2 = st.columns([4, 1])
    with col1:
        user_input = st.text_input(
            "Type your question:",
            placeholder="e.g., 'show me total spending'",
            key=f"chat_input_{session_id}",
        )
    with col2:
        ask_btn = st.button("๐Ÿš€ Ask", type="primary", key=f"ask_btn_{session_id}")
    if ask_btn and user_input:
        handle_chat_query(user_input)

    # Starter suggestions only while the conversation is empty.
    if not st.session_state.chat_history:
        st.markdown("### ๐Ÿ’ก Try These Queries")
        col1, col2 = st.columns(2)
        with col1:
            st.markdown("**๐Ÿ“Š Basic Queries:**")
            basic_queries = [
                "Show me a summary of all invoices",
                "How much have we spent in total?",
                "Who are our top suppliers?",
                "Find invoices with high amounts",
            ]
            for i, query in enumerate(basic_queries):
                if st.button(query, key=f"basic_{session_id}_{i}"):
                    handle_chat_query(query)
        with col2:
            st.markdown("**๐Ÿ” Advanced Queries:**")
            advanced_queries = [
                "Find technology purchases",
                "Show office supplies",
                "Search consulting services",
                "Recent high-value invoices",
            ]
            for i, query in enumerate(advanced_queries):
                if st.button(query, key=f"advanced_{session_id}_{i}"):
                    handle_chat_query(query)

    if st.session_state.chat_history:
        if st.button("๐Ÿ—‘๏ธ Clear Chat", key=f"clear_chat_{session_id}"):
            st.session_state.chat_history = []
            st.rerun()


def _render_analytics_section(session_id):
    """'Analytics' tab: key metrics plus amount/supplier charts."""
    st.header("๐Ÿ“Š Analytics Dashboard")
    try:
        data = st.session_state.processor.load_json_data()
        invoices = data.get("invoices", [])
        if not invoices:
            st.info("๐Ÿ“Š No data available. Upload some invoices to see analytics.")
            return

        df = pd.DataFrame([
            {
                'invoice_number': inv.get('invoice_number', ''),
                'supplier_name': inv.get('supplier_name', ''),
                'amount': inv.get('amount', 0),
                'date': inv.get('date', ''),
                'confidence': inv.get('extraction_info', {}).get('confidence', 0),
            }
            for inv in invoices
        ])

        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Total Invoices", len(df))
        with col2:
            st.metric("Total Amount", f"โ‚น{df['amount'].sum():,.2f}")
        with col3:
            st.metric("Avg Amount", f"โ‚น{df['amount'].mean():,.2f}")
        with col4:
            st.metric("Unique Suppliers", df['supplier_name'].nunique())

        if len(df) > 0:
            fig_hist = px.histogram(
                df,
                x='amount',
                title="Invoice Amount Distribution",
                labels={'amount': 'Amount (โ‚น)', 'count': 'Number of Invoices'},
            )
            st.plotly_chart(fig_hist, use_container_width=True)

            if df['supplier_name'].notna().any():
                supplier_amounts = (
                    df.groupby('supplier_name')['amount']
                    .sum()
                    .sort_values(ascending=False)
                    .head(10)
                )
                if len(supplier_amounts) > 0:
                    fig_suppliers = px.bar(
                        x=supplier_amounts.values,
                        y=supplier_amounts.index,
                        orientation='h',
                        title="Top 10 Suppliers by Total Amount",
                        labels={'x': 'Total Amount (โ‚น)', 'y': 'Supplier'},
                    )
                    st.plotly_chart(fig_suppliers, use_container_width=True)
    except Exception as e:
        st.error(f"Analytics error: {e}")


def _render_explorer_section(session_id):
    """'Data Explorer' tab: filterable table with CSV/JSON export."""
    st.header("๐Ÿ“‹ Data Explorer")
    try:
        data = st.session_state.processor.load_json_data()
        invoices = data.get("invoices", [])
        if not invoices:
            st.info("๐Ÿ“Š No data available. Upload some invoices first.")
            return

        df = pd.DataFrame([
            {
                'Invoice Number': inv.get('invoice_number', ''),
                'Supplier': inv.get('supplier_name', ''),
                'Buyer': inv.get('buyer_name', ''),
                'Amount': inv.get('amount', 0),
                'Date': inv.get('date', ''),
                'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                'Method': inv.get('extraction_info', {}).get('method', ''),
                'File': inv.get('file_info', {}).get('file_name', ''),
                'Created': inv.get('timestamps', {}).get('created_at', '')[:19],
            }
            for inv in invoices
        ])

        col1, col2, col3 = st.columns(3)
        with col1:
            suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
            selected_supplier = st.selectbox("Filter by Supplier", suppliers,
                                             key=f"supplier_filter_{session_id}")
        with col2:
            methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
            selected_method = st.selectbox("Filter by Method", methods,
                                           key=f"method_filter_{session_id}")
        with col3:
            min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0,
                                         key=f"amount_filter_{session_id}")

        filtered_df = df.copy()
        if selected_supplier != 'All':
            filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
        if selected_method != 'All':
            filtered_df = filtered_df[filtered_df['Method'] == selected_method]
        if min_amount > 0:
            filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]

        st.dataframe(
            filtered_df,
            use_container_width=True,
            column_config={
                "Amount": st.column_config.NumberColumn("Amount", format="โ‚น%.2f"),
                "Confidence": st.column_config.ProgressColumn("Confidence",
                                                              min_value=0, max_value=1),
            },
        )

        col1, col2 = st.columns(2)
        with col1:
            if st.button("๐Ÿ“ฅ Export CSV", key=f"export_csv_{session_id}"):
                csv_data = filtered_df.to_csv(index=False)
                st.download_button(
                    "Download CSV",
                    csv_data,
                    f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                    "text/csv",
                    key=f"download_csv_{session_id}",
                )
        with col2:
            if st.button("๐Ÿ“„ Export JSON", key=f"export_json_{session_id}"):
                filtered_invoices = [
                    inv for inv in invoices
                    if inv.get('invoice_number') in filtered_df['Invoice Number'].values
                ]
                export_data = {
                    "exported_at": datetime.now().isoformat(),
                    "total_records": len(filtered_invoices),
                    "invoices": filtered_invoices,
                }
                st.download_button(
                    "Download JSON",
                    json.dumps(export_data, indent=2),
                    f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                    "application/json",
                    key=f"download_json_{session_id}",
                )
    except Exception as e:
        st.error(f"Data explorer error: {e}")
def process_files_once(uploaded_files, session_id):
    """Run each not-yet-processed upload through the processor exactly once.

    Files already seen in this session (tracked by a hash of name+size in
    session state) are skipped; state flags are updated so the UI can show
    progress/completion across reruns.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### ๐Ÿ”„ Processing Files...")

    seen = st.session_state[f'processed_file_hashes_{session_id}']
    pending = []
    for candidate in uploaded_files:
        digest = hash((candidate.name, candidate.size))
        if digest not in seen:
            pending.append((candidate, digest))

    if not pending:
        st.info("โœ… All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Dedicated containers so progress/status/results update independently.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    ok_count = 0
    err_count = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()
    with status_container:
        st.info(f"Starting to process {len(pending)} new files...")

    for idx, (upload, digest) in enumerate(pending):
        with progress_container:
            progress_bar.progress((idx + 1) / len(pending))
            progress_text.text(f"Processing file {idx+1}/{len(pending)}: {upload.name}")
        with status_container:
            st.info(f"๐Ÿ”„ Processing: {upload.name} ({len(upload.getvalue())/1024:.1f} KB)")

        try:
            outcome = st.session_state.processor.process_uploaded_file(upload)
            # Mark as handled regardless of extraction quality.
            seen.add(digest)

            with results_container:
                if outcome and hasattr(outcome, 'invoice_number') and outcome.invoice_number:
                    ok_count += 1
                    st.success(f"โœ… Successfully processed: {upload.name}")
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {outcome.invoice_number}")
                        st.write(f"**Supplier:** {outcome.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** โ‚น{outcome.amount:.2f}")
                        st.write(f"**Date:** {outcome.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {outcome.processing_method}")
                        st.write(f"**Confidence:** {outcome.extraction_confidence:.1%}")
                    st.markdown("---")
                else:
                    err_count += 1
                    st.warning(f"โš ๏ธ Could not extract complete data from: {upload.name}")
                    if outcome:
                        st.write(f"Partial data: {outcome.supplier_name}, โ‚น{outcome.amount}")
                    st.markdown("---")
        except Exception as e:
            err_count += 1
            seen.add(digest)  # don't retry a file that raised
            with results_container:
                st.error(f"โŒ Error processing {upload.name}: {str(e)}")
                st.markdown("---")

    st.session_state[f'processed_file_hashes_{session_id}'] = seen

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("โœ… Processing completed!")
    with status_container:
        if ok_count > 0:
            st.success(f"๐ŸŽ‰ Processing complete! {ok_count} successful, {err_count} failed")
            st.balloons()
        else:
            st.error(f"โŒ Processing failed for all {err_count} files. Please check file formats and content.")

    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True
    st.rerun()
def handle_chat_query(query, show_response=False):
    """Append *query* to chat history, answer it via the chatbot, then rerun.

    When show_response is True the answer is also rendered inline (used by
    the global chat input outside the chat tab).
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })
    try:
        with st.spinner("๐Ÿค– AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)
        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })
        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("๐Ÿ’ก Switch to the 'AI Chat' section to see full conversation history!")
    except Exception as e:
        st.error(f"Chat error: {e}")
        return
    # BUG FIX: st.rerun() signals the rerun by raising an internal exception;
    # calling it inside the `try` let `except Exception` swallow it and report
    # a bogus "Chat error". It must run outside the handler.
    st.rerun()
""") if __name__ == "__main__": main(), # Standalone amounts with currency words r'([0-9,]+\.?\d*)\s*(?:dollars?|rupees?|usd|inr|eur|gbp)', # Table-like patterns r'(?:price|cost|rate)\s*:?\s*[\$โ‚นยฃโ‚ฌ]?\s*([0-9,]+\.?\d*)', # Amount with decimal precision r'(?:^|\s)([0-9]{1,3}(?:,\d{3})*\.?\d{0,2})(?=\s|$)', ], 'date': [ r'date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', r'(?:invoice|bill)\s*date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', r'(?:^|\s)(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})(?=\s|$)', r'(\d{4}[/\-\.]\d{1,2}[/\-\.]\d{1,2})', r'(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d{2,4})', ], 'quantity': [ r'qty\s*:?\s*(\d+)', r'quantity\s*:?\s*(\d+)', r'(?:units?|pcs?|pieces?)\s*:?\s*(\d+)', r'(\d+)\s*(?:pcs?|units?|items?|pieces?)', ] } text_lower = text.lower() # Extract invoice number with multiple attempts for pattern in patterns['invoice_number']: match = re.search(pattern, text_lower, re.IGNORECASE | re.MULTILINE) if match: invoice_data.invoice_number = match.group(1).upper().strip() break # Extract amount with enhanced logic amounts_found = [] for pattern in patterns['amount']: matches = re.finditer(pattern, text_lower, re.IGNORECASE | re.MULTILINE) for match in matches: try: amount_str = match.group(1).replace(',', '').replace(' ', '') amount_val = float(amount_str) if 0.01 <= amount_val <= 1000000: # Reasonable range amounts_found.append(amount_val) except (ValueError, IndexError): continue # Choose the most likely amount (highest value or most repeated) if amounts_found: # Remove duplicates and sort unique_amounts = sorted(set(amounts_found), reverse=True) # Take the highest reasonable amount invoice_data.amount = unique_amounts[0] # Extract date for pattern in patterns['date']: match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) if match: invoice_data.date = self.parse_date(match.group(1)) break # Extract quantity for pattern in patterns['quantity']: match = re.search(pattern, text_lower, re.IGNORECASE) if match: 
try: invoice_data.quantity = int(match.group(1)) break except ValueError: continue # Enhanced company name extraction company_patterns = [ r'(?:from|supplier|vendor)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', r'(?:to|buyer|client)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', r'([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:ltd|inc|corp|llc|co\.|company|pvt|private|limited)', r'(?:^|\n)([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:\n|$)', ] companies_found = [] for pattern in company_patterns: matches = re.findall(pattern, text, re.MULTILINE) for match in matches: clean_company = match.strip().title() if len(clean_company) > 3 and not any(word in clean_company.lower() for word in ['total', 'amount', 'date', 'invoice']): companies_found.append(clean_company) # Assign companies (first as supplier, second as buyer) if companies_found: invoice_data.supplier_name = companies_found[0] if len(companies_found) > 1: invoice_data.buyer_name = companies_found[1] # Extract product description desc_patterns = [ r'(?:description|item|product|service)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', r'(?:for|regarding)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', ] for pattern in desc_patterns: match = re.search(pattern, text, re.IGNORECASE) if match: desc = match.group(1).strip() if len(desc) > 5: invoice_data.product_description = desc[:200] # Limit length break # Set confidence based on how much we extracted confidence_factors = [] if invoice_data.invoice_number: confidence_factors.append(0.3) if invoice_data.amount > 0: confidence_factors.append(0.3) if invoice_data.supplier_name: confidence_factors.append(0.2) if invoice_data.date: confidence_factors.append(0.1) if invoice_data.quantity > 0: confidence_factors.append(0.1) invoice_data.extraction_confidence = sum(confidence_factors) return invoice_data def parse_date(self, date_str: str) -> str: """Parse date to YYYY-MM-DD format""" if not date_str: return "" formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] for fmt in formats: try: parsed_date = 
class VectorStore:
    """In-memory embedding store with pickle persistence (HF Spaces friendly).

    Documents are embedded with sentence-transformers; `encode(...,
    normalize_embeddings=True)` L2-normalizes the vectors, so a plain dot
    product in `semantic_search` is the cosine similarity.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None       # SentenceTransformer instance, or None
        self.vectors = []                 # list of 1-D embeddings
        self.document_metadata = []       # parallel list of metadata dicts
        self.embedding_dimension = None
        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Load the sentence-transformer model and record its output dimension."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("โš ๏ธ Sentence Transformers not available. Vector search disabled.")
            return
        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )
                # BUG FIX: encode() on a *list* returns a (batch, dim) array,
                # so shape[0] was the batch size (always 1), not the embedding
                # dimension. The last axis is the dimension for both list and
                # single-string inputs.
                test_embedding = self.embedding_model.encode(["test"])
                self.embedding_dimension = int(test_embedding.shape[-1])
            st.success(f"โœ… Embedding model loaded: {self.embedding_model_name}")
        except Exception as e:
            st.error(f"โŒ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load persisted vectors/metadata, or start empty if none exist."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                # NOTE(review): pickle.load on local cache files; these are
                # app-written, but would be unsafe on untrusted input.
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)
                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)
                st.success(f"โœ… Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("๐Ÿ“„ New vector store initialized")
        except Exception as e:
            st.error(f"โŒ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors and metadata to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)
            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice record (plus a raw-text snippet) into one searchable string."""
        text_parts = []
        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")
        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")
        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice record and append it to the store; True on success."""
        if not self.embedding_model:
            return False
        try:
            document_text = self.create_document_text(invoice_data, raw_text)
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)
            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }
            self.vectors.append(embedding)
            self.document_metadata.append(metadata)
            return True
        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List["VectorSearchResult"]:
        """Return up to top_k results ranked by cosine similarity (> 0.1)."""
        if not self.embedding_model or not self.vectors:
            return []
        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                # Dot product == cosine similarity (both sides normalized).
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))
            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # relevance threshold
                    metadata = self.document_metadata[idx]
                    result = VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    )
                    results.append(result)
            return results
        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Owns JSON persistence (data/invoices.json), the document/AI extractors,
    and (when sentence-transformers is available) the vector store used for
    semantic search.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        # Vector search is optional: only built when the embedding stack imported.
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    def setup_storage(self):
        """Create the data directory layout and an empty invoices.json if missing."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")
        if not os.path.exists(self.json_path):
            initial_data = {
                "metadata": {
                    "created_at": datetime.now().isoformat(),
                    "version": "hf_v1.0",
                    "total_invoices": 0
                },
                "invoices": [],
                "summary": {
                    "total_amount": 0.0,
                    "unique_suppliers": [],
                    "processing_stats": {"successful": 0, "failed": 0}
                }
            }
            self.save_json_data(initial_data)

    def load_json_data(self) -> dict:
        """Load invoice data from JSON, rebuilding the store if absent or corrupt."""
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # BUG FIX: setup_storage() skips existing files, so a *corrupt*
            # invoices.json previously caused infinite recursion (file exists,
            # never rewritten, load fails again). Remove the bad file first so
            # a fresh store is written, then retry once.
            try:
                os.remove(self.json_path)
            except OSError:
                pass
            self.setup_storage()
            return self.load_json_data()

    def save_json_data(self, data: dict):
        """Write the full invoice data structure to invoices.json."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> "InvoiceData":
        """Validate, extract, and persist one uploaded file.

        Returns the extracted InvoiceData; an empty InvoiceData() signals
        failure (size/type rejected, no text, or an exception).
        """
        self.processing_stats['total_processed'] += 1
        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = (uploaded_file.name.split('.')[-1].lower()
                              if '.' in uploaded_file.name else 'unknown')
            st.info(f"๐Ÿ“„ Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Reject oversized files up front.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = (f"File too large: {file_size / 1024 / 1024:.2f}MB > "
                             f"{HF_CONFIG['max_file_size_mb']}MB")
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # Only PDF and TXT are supported.
            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # Write to a temp file so the document processor can read a path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                file_content = uploaded_file.getvalue()
                tmp_file.write(file_content)
                tmp_file_path = tmp_file.name

            st.info(f"๐Ÿ’พ Saved temporarily to: {tmp_file_path}")
            try:
                st.info("๐Ÿ” Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)
                if not text or not text.strip():
                    st.warning(f"โŒ No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"๐Ÿ“ Extracted {text_length} characters of text")
                if text_length > 0:
                    with st.expander("๐Ÿ“„ Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("๐Ÿค– Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name
                st.info(f"๐Ÿ“Š Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("๐Ÿ’พ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)

                self.processing_stats['successful'] += 1
                st.success(f"โœ… Successfully processed {uploaded_file.name}")
                return invoice_data
            finally:
                # Best-effort temp-file cleanup (was a bare `except:`).
                try:
                    os.unlink(tmp_file_path)
                    st.info("๐Ÿงน Cleaned up temporary file")
                except OSError:
                    pass

        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1
            with st.expander("๐Ÿ” Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())
            return InvoiceData()

    def save_invoice_data(self, invoice_data: "InvoiceData", raw_text: str, file_size: int):
        """Append one extracted invoice to the JSON store and the vector store."""
        try:
            data = self.load_json_data()
            invoice_record = {
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }
            data["invoices"].append(invoice_record)
            self.update_summary(data)
            self.save_json_data(data)

            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()
        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute summary totals/supplier list and refresh metadata in place."""
        invoices = data["invoices"]
        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(
            inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")
        ))
        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }
        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
query_lower = query.lower() # Handle different query types if any(phrase in query_lower for phrase in ["summary", "overview", "total"]): return self.generate_summary(data) elif "count" in query_lower or "how many" in query_lower: return self.handle_count_query(data) elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]): return self.handle_amount_query(data) elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]): return self.handle_supplier_query(data, query) elif self.processor.vector_store: return self.handle_semantic_search(query) else: return self.handle_general_query(data, query) except Exception as e: return f"Error processing query: {e}" def generate_summary(self, data: dict) -> str: """Generate comprehensive summary""" invoices = data.get("invoices", []) summary = data.get("summary", {}) if not invoices: return "No invoices found in the system." total_amount = summary.get("total_amount", 0) avg_amount = total_amount / len(invoices) if invoices else 0 unique_suppliers = len(summary.get("unique_suppliers", [])) response = f""" **๐Ÿ“Š Invoice System Summary** โ€ข **Total Invoices**: {len(invoices):,} โ€ข **Total Value**: โ‚น{total_amount:,.2f} โ€ข **Average Invoice**: โ‚น{avg_amount:,.2f} โ€ข **Unique Suppliers**: {unique_suppliers} **๐Ÿ“ˆ Processing Stats** โ€ข **Successful**: {summary.get('processing_stats', {}).get('successful', 0)} โ€ข **Failed**: {summary.get('processing_stats', {}).get('failed', 0)} **๐Ÿ” Recent Invoices** """ # Show recent invoices recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5] for i, inv in enumerate(recent, 1): response += f"\n{i}. 
**{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (โ‚น{inv.get('amount', 0):,.2f})"
        return response

    def handle_count_query(self, data: dict) -> str:
        """Report record counts, duplicate count and processing timeline."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))
        # First/Latest assume list order == insertion order (append-only store).
        return f"""
**๐Ÿ“Š Invoice Count Summary**

โ€ข **Total Records**: {total}
โ€ข **Unique Invoice Numbers**: {unique_numbers}
โ€ข **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**๐Ÿ“… Processing Timeline**

โ€ข **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
โ€ข **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Summarise amounts (total/avg/min/max) and list high-value invoices."""
        invoices = data.get("invoices", [])
        # Zero/absent amounts are excluded from the statistics.
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]
        if not amounts:
            return "No amount information found in invoices."
        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)
        # Find high-value invoices: threshold is the 5th-largest amount when
        # there are more than 5, otherwise just the maximum.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]
        response = f"""
**๐Ÿ’ฐ Financial Analysis**

โ€ข **Total Amount**: โ‚น{total_amount:,.2f}
โ€ข **Average Amount**: โ‚น{avg_amount:,.2f}
โ€ข **Highest Invoice**: โ‚น{max_amount:,.2f}
โ€ข **Lowest Invoice**: โ‚น{min_amount:,.2f}

**๐ŸŽฏ High-Value Invoices (โ‚น{high_value_threshold:,.2f}+)**
"""
        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (โ‚น{inv.get('amount', 0):,.2f})"
        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Aggregate invoice counts and totals per supplier (top 10 by value)."""
        invoices = data.get("invoices", [])
        # Count invoices by supplier
        supplier_counts = {}
        supplier_amounts = {}
        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)
        if not supplier_counts:
            return "No supplier information found in invoices."
        # Sort suppliers by amount
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]
        response = f"""
**๐Ÿข Supplier Analysis**

โ€ข **Total Unique Suppliers**: {len(supplier_counts)}
โ€ข **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)

**๐Ÿ’ฐ Top Suppliers by Amount**
"""
        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - โ‚น{amount:,.2f} ({count} invoices, avg: โ‚น{avg:,.2f})"
        return response

    def handle_semantic_search(self, query: str) -> str:
        """Run a vector-store similarity search and format the top 5 hits."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)
            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."
            response = f"๐Ÿ” **Semantic Search Results for '{query}'**\n\n"
            for i, result in enumerate(results, 1):
                response += f"{i}. 
**{result.invoice_number}** - {result.supplier_name}\n" response += f" โ€ข Similarity: {result.similarity_score:.3f}\n" response += f" โ€ข Amount: โ‚น{result.metadata.get('amount', 0):,.2f}\n" response += f" โ€ข Preview: {result.content_preview[:100]}...\n\n" return response except Exception as e: return f"Semantic search error: {e}" def handle_general_query(self, data: dict, query: str) -> str: """Handle general queries with keyword search""" invoices = data.get("invoices", []) query_words = query.lower().split() # Simple keyword matching matching_invoices = [] for inv in invoices: text_to_search = ( inv.get('supplier_name', '') + ' ' + inv.get('buyer_name', '') + ' ' + inv.get('product_description', '') + ' ' + inv.get('extraction_info', {}).get('raw_text_preview', '') ).lower() if any(word in text_to_search for word in query_words): matching_invoices.append(inv) if not matching_invoices: return f"No invoices found matching '{query}'. Try different keywords or check the summary." response = f"๐Ÿ” **Found {len(matching_invoices)} invoices matching '{query}'**\n\n" for i, inv in enumerate(matching_invoices[:5], 1): response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n" response += f" โ€ข Amount: โ‚น{inv.get('amount', 0):,.2f}\n" response += f" โ€ข Date: {inv.get('date', 'N/A')}\n\n" if len(matching_invoices) > 5: response += f"... and {len(matching_invoices) - 5} more results." return response # =============================================================================== # STREAMLIT APPLICATION # =============================================================================== def create_app(): """Main Streamlit application""" # Generate unique session ID for this run if 'session_id' not in st.session_state: st.session_state.session_id = str(uuid.uuid4())[:8] session_id = st.session_state.session_id # Custom CSS st.markdown(""" """, unsafe_allow_html=True) # Header st.markdown('

๐Ÿ“„ AI Invoice Processing System

', unsafe_allow_html=True) st.markdown("""

AI-Powered Document Processing โ€ข Semantic Search โ€ข Smart Analytics โ€ข Hugging Face Spaces

""", unsafe_allow_html=True) # Initialize processor if 'processor' not in st.session_state: with st.spinner("๐Ÿ”ง Initializing AI Invoice Processor..."): try: st.session_state.processor = InvoiceProcessor() st.session_state.chatbot = ChatBot(st.session_state.processor) st.session_state.chat_history = [] st.success("โœ… System initialized successfully!") except Exception as e: st.error(f"โŒ Initialization failed: {e}") st.stop() # Sidebar with st.sidebar: st.header("๐ŸŽ›๏ธ System Status") processor = st.session_state.processor # Component status if processor.document_processor.processors: st.markdown('โœ… Document Processing', unsafe_allow_html=True) else: st.markdown('โŒ Document Processing', unsafe_allow_html=True) if processor.ai_extractor.use_transformers: st.markdown('โœ… AI Extraction', unsafe_allow_html=True) else: st.markdown('โš ๏ธ Regex Extraction', unsafe_allow_html=True) if processor.vector_store and processor.vector_store.embedding_model: st.markdown('โœ… Semantic Search', unsafe_allow_html=True) else: st.markdown('โš ๏ธ Keyword Search Only', unsafe_allow_html=True) # Quick stats st.header("๐Ÿ“Š Quick Stats") try: data = processor.load_json_data() total_invoices = len(data.get("invoices", [])) total_amount = data.get("summary", {}).get("total_amount", 0) st.metric("Total Invoices", total_invoices) st.metric("Total Value", f"โ‚น{total_amount:,.2f}") st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") except Exception as e: st.error(f"Stats error: {e}") # System info st.header("โš™๏ธ System Info") st.info(f""" **Session ID:** {session_id} **Limits:** โ€ข Max file size: 10MB โ€ข Max concurrent files: 3 โ€ข Timeout: 30s """) # Main navigation selected_tab = st.radio( "Choose a section:", ["๐Ÿ“ค Upload & Process", "๐Ÿ’ฌ AI Chat", "๐Ÿ“Š Analytics", "๐Ÿ“‹ Data Explorer"], horizontal=True, key=f"main_navigation_{session_id}" ) # 
------------------------------------------------------------------------- # UPLOAD & PROCESS SECTION # ------------------------------------------------------------------------- if selected_tab == "๐Ÿ“ค Upload & Process": st.header("๐Ÿ“ค Upload Invoice Documents") # Feature highlights col1, col2, col3 = st.columns(3) with col1: st.markdown("""

๐Ÿค– AI Extraction

Advanced NLP models extract structured data automatically

""", unsafe_allow_html=True) with col2: st.markdown("""

๐Ÿ” Smart Search

Semantic search finds invoices using natural language

""", unsafe_allow_html=True) with col3: st.markdown("""

๐Ÿ“Š Analytics

Comprehensive insights and visualizations

""", unsafe_allow_html=True)

        # File upload
        st.markdown("### ๐Ÿ“ Upload Your Invoices")

        # Initialize session state for files if not exists. All keys are
        # suffixed with the per-run session_id so concurrent sessions on the
        # same Space do not clobber each other's state.
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()

        # File uploader with stable key (stable key prevents Streamlit from
        # resetting the widget on every rerun)
        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )

        # Store uploaded files in session state only if they're new
        if uploaded_files:
            # Create file hashes to detect if files have changed; hash of
            # (name, size) is a cheap change-detector, not content-exact.
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)
            # Check if files have changed
            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("๐Ÿ“„ New files detected - ready for processing")

        # Get files from session state
        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']

        if current_files:
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"โš ๏ธ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]
            st.info(f"๐Ÿ“Š {len(current_files)} files selected")

            # Show file names
            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                processed_icon = "โœ…" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "๐Ÿ“„"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")

            # Process button - only show if not currently processing.
            # State machine: idle -> (button) -> processing -> complete.
            col1, col2 = st.columns([1, 1])
            with col1:
                if not is_processing and not is_complete:
                    if st.button("๐Ÿš€ Process Files", type="primary", key=f"process_btn_{session_id}"):
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("๐Ÿ”„ Processing in progress...")
                    # Actually process the files here (runs during the rerun
                    # triggered by the button above)
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("โœ… Processing completed!")
                    if st.button("๐Ÿ”„ Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()
            with col2:
                if st.button("๐Ÿ—‘๏ธ Clear Files", key=f"clear_files_{session_id}"):
                    st.session_state[f'uploaded_files_{session_id}'] = None
                    st.session_state[f'uploaded_file_hashes_{session_id}'] = set()
                    st.session_state[f'processing_complete_{session_id}'] = False
                    st.session_state[f'currently_processing_{session_id}'] = False
                    st.session_state[f'processed_file_hashes_{session_id}'] = set()
                    st.rerun()
        else:
            st.info("๐Ÿ‘† Please select invoice files to upload and process")

        # Show processing results if completed
        if is_complete:
            st.markdown("### ๐Ÿ“‹ Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]
                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"๐Ÿ“„ {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** โ‚น{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")

    # -------------------------------------------------------------------------
    # AI CHAT SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "๐Ÿ’ฌ AI Chat":
        st.header("๐Ÿ’ฌ AI Chat Interface")

        # Display chat history
        if st.session_state.chat_history:
            st.markdown("### ๐Ÿ’ฌ Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])

        # Chat input
        st.markdown("### โœ๏ธ Ask a Question")
        col1, col2 = st.columns([4, 1])
        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )
        with col2:
            ask_btn = st.button("๐Ÿš€ Ask", type="primary", key=f"ask_btn_{session_id}")
        if ask_btn and user_input:
            handle_chat_query(user_input)

        # Suggested queries (only shown before the first exchange)
        if not st.session_state.chat_history:
            st.markdown("### ๐Ÿ’ก Try These Queries")
            col1, col2 = st.columns(2)
            with col1:
                st.markdown("**๐Ÿ“Š Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)
            with col2:
                st.markdown("**๐Ÿ” Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)

        # Clear chat
        if st.session_state.chat_history:
            if st.button("๐Ÿ—‘๏ธ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()

    # -------------------------------------------------------------------------
    # ANALYTICS SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "๐Ÿ“Š Analytics":
        st.header("๐Ÿ“Š Analytics Dashboard")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("๐Ÿ“Š No data available. Upload some invoices to see analytics.")
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })
            df = pd.DataFrame(df_data)
            # Key metrics
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"โ‚น{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"โ‚น{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())
            # Visualizations
            if len(df) > 0:
                # Amount distribution
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (โ‚น)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)
                # Top suppliers
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)
                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (โ‚น)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)
        except Exception as e:
            st.error(f"Analytics error: {e}")

    # -------------------------------------------------------------------------
    # DATA EXPLORER SECTION
    # -------------------------------------------------------------------------
    elif selected_tab == "๐Ÿ“‹ Data Explorer":
        st.header("๐Ÿ“‹ Data Explorer")
        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])
            if not invoices:
                st.info("๐Ÿ“Š No data available. Upload some invoices first.")
                return
            # Convert to DataFrame
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })
            df = pd.DataFrame(df_data)
            # Filters
            col1, col2, col3 = st.columns(3)
            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")
            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")
            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")
            # Apply filters
            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]
            # Display data
            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="โ‚น%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )
            # Export options
            col1, col2 = st.columns(2)
            with col1:
                if st.button("๐Ÿ“ฅ Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )
            with col2:
                if st.button("๐Ÿ“„ Export JSON", key=f"export_json_{session_id}"):
                    # Re-select full records whose number survived the filters.
                    filtered_invoices = [inv for inv in invoices if inv.get('invoice_number') in filtered_df['Invoice Number'].values]
                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }
                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )
        except Exception as e:
            st.error(f"Data explorer error: {e}")

    # -------------------------------------------------------------------------
    # GLOBAL CHAT INPUT
    # -------------------------------------------------------------------------
    st.markdown("---")
    st.markdown("### ๐Ÿ’ฌ Quick Chat (Works from any section)")
    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")
    if global_query:
        handle_chat_query(global_query, show_response=True)

    # Footer
    st.markdown("---")
    st.markdown("""

๐Ÿš€ AI Invoice Processing System - Optimized for Hugging Face Spaces

Built with โค๏ธ using Streamlit, Transformers, and AI

', unsafe_allow_html=True)

# ===============================================================================
# HELPER FUNCTIONS
# ===============================================================================

def process_files_once(uploaded_files, session_id):
    """Process uploaded files only once with proper state management.

    Skips files whose (name, size) hash is already in the session's
    processed set, streams per-file progress/results into Streamlit
    containers, then flips the session flags and triggers a rerun.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### ๐Ÿ”„ Processing Files...")

    # Get already processed file hashes
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']

    # Filter out already processed files
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))

    if not files_to_process:
        st.info("โœ… All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Create containers for dynamic updates
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()
    successful = 0
    failed = 0

    # Show progress
    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()
    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    # Process each file only once
    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)
        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")
        with status_container:
            st.info(f"๐Ÿ”„ Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")
        try:
            # Process the file
            result = st.session_state.processor.process_uploaded_file(uploaded_file)
            # Mark file as processed regardless of result
            processed_hashes.add(file_hash)
            # Show result immediately; an empty invoice_number means the
            # extractor returned a blank InvoiceData (counted as failure here).
            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"โœ… Successfully processed: {uploaded_file.name}")
                    # Show extracted data
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** โ‚น{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")
                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"โš ๏ธ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, โ‚น{result.amount}")
                    st.markdown("---")
        except Exception as e:
            failed += 1
            # Still mark as processed to avoid reprocessing
            processed_hashes.add(file_hash)
            with results_container:
                st.error(f"โŒ Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    # Update session state
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes

    # Final summary
    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("โœ… Processing completed!")
    with status_container:
        if successful > 0:
            st.success(f"๐ŸŽ‰ Processing complete! {successful} successful, {failed} failed")
            # NOTE(review): inner check is redundant — already inside the
            # `successful > 0` branch.
            if successful > 0:
                st.balloons()
        else:
            st.error(f"โŒ Processing failed for all {failed} files. Please check file formats and content.")

    # Update processing state
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True
    # Force rerun to update UI
    st.rerun()

def process_files(uploaded_files, session_id):
    """Legacy function - redirect to process_files_once"""
    return process_files_once(uploaded_files, session_id)

def handle_chat_query(query, show_response=False):
    """Append a user query + chatbot answer to the session chat history.

    When ``show_response`` is True the answer is also rendered inline
    (used by the global quick-chat input outside the chat tab).
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })
    try:
        with st.spinner("๐Ÿค– AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)
            st.session_state.chat_history.append({
                "role": "assistant",
                "content": response,
                "timestamp": datetime.now()
            })
            if show_response:
                with st.chat_message("assistant"):
                    st.markdown(response)
                st.info("๐Ÿ’ก Switch to the 'AI Chat' section to see full conversation history!")
        # Rerun so the chat-history view picks up the new messages.
        st.rerun()
    except Exception as e:
        st.error(f"Chat error: {e}")

# ===============================================================================
# MAIN ENTRY POINT
# ===============================================================================

def main():
    """Main entry point for Hugging Face Spaces"""
    try:
        if IS_HF_SPACE:
            st.sidebar.info("๐Ÿค— Running on Hugging Face Spaces")
        create_app()
    except Exception as e:
        # Last-resort error surface; any uncaught exception lands here.
        st.error(f"""
## ๐Ÿšจ Application Error

{e}

Please refresh the page or check the logs for more details.
""")

if __name__ == "__main__":
    main()