| | |
| | """ |
| | AI Invoice Processing System - Complete Single File for Hugging Face Spaces |
| | A comprehensive system with AI-powered extraction, semantic search, and analytics. |
| | |
| | Author: AI Assistant |
| | Date: 2024 |
| | Version: HuggingFace Single File v1.0 |
| | """ |
| |
|
| | |
| | |
| | |
| |
|
| | import os |
| | import json |
| | import re |
| | import tempfile |
| | import shutil |
| | import pickle |
| | import numpy as np |
| | from datetime import datetime |
| | from typing import Dict, List, Optional, Tuple |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | import time |
| | import logging |
| | import uuid |
| |
|
| | |
# Hugging Face Spaces injects SPACE_ID into the environment; its mere
# presence tells us we are running inside a Space.
IS_HF_SPACE = "SPACE_ID" in os.environ
| |
|
| | |
# Resolve the Hugging Face API token.
#
# Bug fix: the original read `st.secrets` *before* `import streamlit as st`
# runs below, so a NameError was always raised and silently swallowed by a
# bare `except:` — secrets were never actually consulted.  We now check the
# environment first (works everywhere) and fall back to Streamlit secrets
# only if streamlit is importable, catching narrow failures.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None:
    try:
        import streamlit as st  # re-imported below at module level; harmless
        HF_TOKEN = st.secrets.get("HF_TOKEN", None)
    except Exception:
        # streamlit missing or secrets not configured — run without a token
        HF_TOKEN = None
| |
|
| | |
| | import streamlit as st |
| | import sqlite3 |
| | import pandas as pd |
| | import plotly.express as px |
| | import plotly.graph_objects as go |
| | import requests |
| |
|
| | |
# Optional FAISS backend; record availability in a feature flag so the
# rest of the app can degrade gracefully instead of crashing on import.
try:
    import faiss
except ImportError:
    FAISS_AVAILABLE = False
    st.warning("β οΈ FAISS not available. Vector search will be disabled.")
else:
    FAISS_AVAILABLE = True
| |
|
# Optional sentence-transformers backend for semantic embeddings.
try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    st.warning("β οΈ Sentence Transformers not available. Using fallback methods.")
else:
    SENTENCE_TRANSFORMERS_AVAILABLE = True
| |
|
# Optional torch backend (used for fp16 / device placement with Mistral).
try:
    import torch
except ImportError:
    TORCH_AVAILABLE = False
else:
    TORCH_AVAILABLE = True
| |
|
| | |
# Probe for a PDF text-extraction backend, preferring pdfplumber over
# PyPDF2.  PDF_PROCESSOR records which one (if any) is usable.
PDF_PROCESSING_AVAILABLE = False
PDF_PROCESSOR = None
try:
    import pdfplumber
    PDF_PROCESSING_AVAILABLE = True
    PDF_PROCESSOR = "pdfplumber"
except ImportError:
    try:
        import PyPDF2
        PDF_PROCESSING_AVAILABLE = True
        PDF_PROCESSOR = "PyPDF2"
    except ImportError:
        pass
| |
|
| | |
| | |
| | |
| |
|
# Must be the first Streamlit call executed in the script: configures the
# browser tab title/icon, the wide layout, and the hamburger-menu links.
st.set_page_config(
    page_title="AI Invoice Processing System",
    page_icon="π",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/spaces',
        'Report a bug': 'https://huggingface.co/spaces',
        'About': """
        # AI Invoice Processing System
        Built for Hugging Face Spaces with AI-powered extraction and semantic search.
        """
    }
)
| |
|
| | |
| | |
| | |
| |
|
# Runtime limits and paths tuned for the (CPU-only) Hugging Face Spaces tier.
HF_CONFIG = {
    "max_file_size_mb": 10,        # reject uploads larger than this
    "max_concurrent_files": 3,     # cap on files handled per batch
    "timeout_seconds": 30,         # per-operation time budget
    "use_cpu_only": True,          # Spaces free tier has no GPU
    "embedding_model": "all-MiniLM-L6-v2",  # sentence-transformers model id
    "cache_dir": "./cache",        # model/tokenizer download cache
    "data_dir": "./data",          # JSON store + pickled vector files
    "enable_ollama": False,        # local Ollama is not available on Spaces
}
| |
|
| | |
# Make sure both working directories exist before anything touches them.
for _required_dir in (HF_CONFIG["cache_dir"], HF_CONFIG["data_dir"]):
    os.makedirs(_required_dir, exist_ok=True)
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class InvoiceData:
    """Data structure for extracted invoice information.

    All fields default to "empty" values so a blank instance can signal a
    failed extraction without raising.
    """
    supplier_name: str = ""              # company issuing the invoice
    buyer_name: str = ""                 # company being billed
    invoice_number: str = ""             # invoice / bill / reference number
    date: str = ""                       # normalised to YYYY-MM-DD when parseable
    amount: float = 0.0                  # total invoice amount (currency-agnostic)
    quantity: int = 0                    # total item quantity
    product_description: str = ""        # short description of goods/services
    file_path: str = ""                  # original uploaded file name
    extraction_confidence: float = 0.0   # heuristic confidence in [0, 1]
    processing_method: str = "regex"     # backend used: regex / ai_ner / mistral_7b
| |
|
@dataclass
class VectorSearchResult:
    """Data structure for vector search results (one hit per instance)."""
    invoice_id: str           # id of the matching invoice record
    invoice_number: str       # its invoice number, for display
    supplier_name: str        # its supplier, for display
    similarity_score: float   # cosine similarity to the query (higher = closer)
    content_preview: str      # truncated document text for the results list
    metadata: Dict            # full metadata dict stored alongside the vector
| |
|
| | |
| | |
| | |
| |
|
class DocumentProcessor:
    """Simplified document processor for Hugging Face Spaces.

    Dispatches text extraction by file extension.  Supported types: PDF
    (via whichever of pdfplumber / PyPDF2 imported at module load) and
    plain text.  All failures are reported via Streamlit and mapped to
    an empty string rather than raised.
    """

    def __init__(self):
        # Build the extension -> extractor dispatch table once.
        self.setup_processors()

    def setup_processors(self):
        """Setup available document processors.

        Populates ``self.processors`` keyed by file type ('pdf', 'txt')
        and emits status messages so the user can see active backends.
        """
        self.processors = {}

        # Register whichever PDF backend survived the module-level probe
        # (pdfplumber is preferred over PyPDF2).
        if PDF_PROCESSING_AVAILABLE:
            if PDF_PROCESSOR == "pdfplumber":
                self.processors['pdf'] = self.extract_with_pdfplumber
                st.success("β PDF processing available (pdfplumber)")
            elif PDF_PROCESSOR == "PyPDF2":
                self.processors['pdf'] = self.extract_with_pypdf2
                st.success("β PDF processing available (PyPDF2)")
        else:
            st.warning("β οΈ No PDF processor available")

        # Plain text needs no third-party backend.
        self.processors['txt'] = self.extract_text_file

    def extract_with_pdfplumber(self, file_path: str) -> str:
        """Extract text using pdfplumber.

        Returns the concatenated text of all pages, or "" on failure.
        """
        try:
            import pdfplumber
            text = ""
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    # extract_text() returns None for image-only pages.
                    if page_text:
                        text += page_text + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_with_pypdf2(self, file_path: str) -> str:
        """Extract text using PyPDF2 (fallback PDF backend).

        Returns the concatenated text of all pages, or "" on failure.
        """
        try:
            import PyPDF2
            text = ""
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_text_file(self, file_path: str) -> str:
        """Extract text from text files (assumes UTF-8; "" on failure)."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            st.error(f"Text file extraction failed: {e}")
            return ""

    def extract_text_from_document(self, file_path: str) -> str:
        """Extract text from document based on file type.

        Dispatches on the lowercase extension; returns "" for unsupported
        types or when no backend is registered for the type.
        """
        file_ext = Path(file_path).suffix.lower()

        if file_ext == '.pdf':
            processor = self.processors.get('pdf')
        elif file_ext == '.txt':
            processor = self.processors.get('txt')
        else:
            st.warning(f"Unsupported file type: {file_ext}")
            return ""

        if processor:
            return processor(file_path)
        else:
            st.error(f"No processor available for {file_ext}")
            return ""
| |
|
| | |
| | |
| | |
| |
|
| | class AIExtractor: |
| | """AI extraction for Hugging Face Spaces with Mistral 7B support""" |
| | |
| | def __init__(self): |
| | self.use_mistral = self.setup_mistral() |
| | self.use_transformers = self.setup_transformers() if not self.use_mistral else False |
| | |
    def setup_mistral(self):
        """Try to setup Mistral 7B model with proper authentication.

        Returns True only when the HF token, the memory budget, and all
        heavy dependencies are present and the model loads; every failure
        path degrades gracefully and returns False.
        """
        try:
            # Gated model: without a token there is no point trying.
            if not HF_TOKEN:
                st.warning("β οΈ Hugging Face token not found. Add HF_TOKEN to secrets for Mistral access.")
                return False

            # Rough capacity check — Mistral 7B in 8-bit needs ~8 GB RAM.
            import psutil
            memory_gb = psutil.virtual_memory().total / (1024**3)

            if memory_gb < 8:
                st.warning("β οΈ Insufficient memory for Mistral 7B. Using lighter models.")
                return False

            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
            from huggingface_hub import login

            # Authenticate so the gated weights can be downloaded.
            login(token=HF_TOKEN)

            with st.spinner("π Loading Mistral 7B model (this may take a few minutes)..."):
                model_name = "mistralai/Mistral-7B-Instruct-v0.1"

                self.mistral_tokenizer = AutoTokenizer.from_pretrained(
                    model_name,
                    cache_dir=HF_CONFIG["cache_dir"],
                    token=HF_TOKEN
                )

                # NOTE(review): load_in_8bit requires bitsandbytes; if it is
                # missing this raises and we fall into the except below.
                self.mistral_model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.float16 if TORCH_AVAILABLE else None,
                    device_map="auto" if TORCH_AVAILABLE else None,
                    load_in_8bit=True,
                    cache_dir=HF_CONFIG["cache_dir"],
                    token=HF_TOKEN
                )

                self.mistral_pipeline = pipeline(
                    "text-generation",
                    model=self.mistral_model,
                    tokenizer=self.mistral_tokenizer,
                    torch_dtype=torch.float16 if TORCH_AVAILABLE else None,
                    device_map="auto" if TORCH_AVAILABLE else None
                )

            st.success("β Mistral 7B model loaded successfully!")
            return True

        except ImportError as e:
            st.warning(f"β οΈ Missing dependencies for Mistral 7B: {e}")
            return False
        except Exception as e:
            st.warning(f"β οΈ Mistral 7B not available: {e}")
            st.info("π‘ To use Mistral 7B: Add your Hugging Face token to secrets as 'HF_TOKEN'")
            return False
| | |
    def setup_transformers(self):
        """Fallback to lighter NER model.

        Loads a CoNLL-03 BERT NER pipeline used to pull ORG/MISC entities
        out of invoice text when Mistral is unavailable.  Returns True on
        success, False otherwise.
        """
        try:
            from transformers import pipeline

            with st.spinner("Loading fallback AI model..."):
                self.ner_pipeline = pipeline(
                    "ner",
                    model="dbmdz/bert-large-cased-finetuned-conll03-english",
                    aggregation_strategy="simple"
                )

            st.success("β Fallback AI extraction model loaded")
            return True

        except Exception as e:
            st.warning(f"β οΈ AI extraction not available: {e}")
            return False
| | |
| | def extract_with_mistral(self, text: str) -> InvoiceData: |
| | """Extract invoice data using Mistral 7B""" |
| | try: |
| | |
| | prompt = f"""<s>[INST] You are an expert at extracting structured information from invoices. |
| | |
| | Extract the following information from this invoice text and respond ONLY with valid JSON: |
| | |
| | {{ |
| | "invoice_number": "invoice or bill number", |
| | "supplier_name": "company providing goods/services", |
| | "buyer_name": "company receiving goods/services", |
| | "date": "date in YYYY-MM-DD format", |
| | "amount": "total amount as number only", |
| | "quantity": "total quantity as integer", |
| | "product_description": "brief description of items/services" |
| | }} |
| | |
| | Invoice text: |
| | {text[:2000]} |
| | |
| | Respond with JSON only: [/INST]""" |
| |
|
| | |
| | response = self.mistral_pipeline( |
| | prompt, |
| | max_new_tokens=300, |
| | temperature=0.1, |
| | do_sample=True, |
| | pad_token_id=self.mistral_tokenizer.eos_token_id |
| | ) |
| | |
| | |
| | generated_text = response[0]['generated_text'] |
| | |
| | |
| | json_start = generated_text.find('{') |
| | json_end = generated_text.rfind('}') + 1 |
| | |
| | if json_start != -1 and json_end > json_start: |
| | json_str = generated_text[json_start:json_end] |
| | |
| | |
| | import json |
| | data = json.loads(json_str) |
| | |
| | |
| | invoice_data = InvoiceData() |
| | invoice_data.supplier_name = str(data.get('supplier_name', '')).strip() |
| | invoice_data.buyer_name = str(data.get('buyer_name', '')).strip() |
| | invoice_data.invoice_number = str(data.get('invoice_number', '')).strip() |
| | invoice_data.date = self.parse_date(str(data.get('date', ''))) |
| | |
| | |
| | try: |
| | amount_val = data.get('amount', 0) |
| | if isinstance(amount_val, str): |
| | amount_clean = re.sub(r'[^\d.]', '', amount_val) |
| | invoice_data.amount = float(amount_clean) if amount_clean else 0.0 |
| | else: |
| | invoice_data.amount = float(amount_val) |
| | except: |
| | invoice_data.amount = 0.0 |
| | |
| | |
| | try: |
| | qty_val = data.get('quantity', 0) |
| | invoice_data.quantity = int(float(str(qty_val).replace(',', ''))) |
| | except: |
| | invoice_data.quantity = 0 |
| | |
| | invoice_data.product_description = str(data.get('product_description', '')).strip() |
| | invoice_data.extraction_confidence = 0.95 |
| | invoice_data.processing_method = "mistral_7b" |
| | |
| | return invoice_data |
| | else: |
| | st.warning("β οΈ Mistral response didn't contain valid JSON, falling back to regex") |
| | return self.extract_with_regex(text) |
| | |
| | except Exception as e: |
| | st.error(f"Mistral extraction failed: {e}") |
| | return self.extract_with_regex(text) |
| | |
| | def extract_with_ai(self, text: str) -> InvoiceData: |
| | """Extract invoice data using available AI method""" |
| | if self.use_mistral: |
| | st.info("π€ Using Mistral 7B for extraction...") |
| | return self.extract_with_mistral(text) |
| | elif self.use_transformers: |
| | st.info("π€ Using NER model for extraction...") |
| | return self.extract_with_ner(text) |
| | else: |
| | st.info("π§ Using regex extraction...") |
| | return self.extract_with_regex(text) |
| | |
    def extract_with_ner(self, text: str) -> InvoiceData:
        """Extract using NER model (fallback method).

        Pulls ORG entities (first seen -> supplier, second -> buyer) and
        digit-bearing MISC entities (-> invoice number) from the first
        512 characters, then backfills missing numeric/date fields from
        the regex extractor.  Falls back to pure regex on any error.
        """
        try:
            # BERT-based pipeline: keep input within its context window.
            entities = self.ner_pipeline(text[:512])

            invoice_data = InvoiceData()
            invoice_data.processing_method = "ai_ner"

            for entity in entities:
                # Strip WordPiece continuation markers from sub-tokens.
                entity_text = entity['word'].replace('##', '')

                if entity['entity_group'] == 'ORG':
                    # Heuristic: first organisation is the supplier, the
                    # next distinct one is the buyer.
                    if not invoice_data.supplier_name:
                        invoice_data.supplier_name = entity_text
                    elif not invoice_data.buyer_name:
                        invoice_data.buyer_name = entity_text

                elif entity['entity_group'] == 'MISC':
                    # A MISC entity containing digits is likely the number.
                    if not invoice_data.invoice_number and any(c.isdigit() for c in entity_text):
                        invoice_data.invoice_number = entity_text

            # Regex pass fills whatever NER could not find.
            regex_data = self.extract_with_regex(text)

            if not invoice_data.invoice_number:
                invoice_data.invoice_number = regex_data.invoice_number
            if not invoice_data.amount:
                invoice_data.amount = regex_data.amount
            if not invoice_data.date:
                invoice_data.date = regex_data.date
            if not invoice_data.quantity:
                invoice_data.quantity = regex_data.quantity

            invoice_data.extraction_confidence = 0.8

            return invoice_data

        except Exception as e:
            st.error(f"NER extraction failed: {e}")
            return self.extract_with_regex(text)
| | |
| | def extract_with_regex(self, text: str) -> InvoiceData: |
| | """Enhanced regex extraction with better amount detection""" |
| | invoice_data = InvoiceData() |
| | invoice_data.processing_method = "regex" |
| | |
| | |
| | patterns = { |
| | 'invoice_number': [ |
| | r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| | r'bill\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| | r'inv\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| | r'ref\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| | r'#\s*([A-Z0-9\-_/]{3,})', |
| | r'(?:^|\s)([A-Z]{2,}\d{3,}|\d{3,}[A-Z]{2,})', |
| | ], |
| | 'amount': [ |
| | |
| | r'total\s*(?:amount)?\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | r'amount\s*(?:due|paid|total)?\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | r'grand\s*total\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | r'net\s*(?:amount|total)\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | r'sub\s*total\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | |
| | |
| | r'[\$βΉΒ£β¬]\s*([0-9,]+\.?\d*)', |
| | |
| | |
| | r'([0-9,]+\.?\d*)\s*[\$βΉΒ£β¬]?\s* |
| | |
| | def parse_date(self, date_str: str) -> str: |
| | """Parse date to YYYY-MM-DD format""" |
| | if not date_str: |
| | return "" |
| | |
| | formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] |
| | |
| | for fmt in formats: |
| | try: |
| | parsed_date = datetime.strptime(date_str, fmt) |
| | return parsed_date.strftime('%Y-%m-%d') |
| | except ValueError: |
| | continue |
| | |
| | return date_str |
| | |
| | # =============================================================================== |
| | # VECTOR STORE CLASS |
| | # =============================================================================== |
| | |
class VectorStore:
    """Simplified vector store for Hugging Face Spaces.

    Embeddings are kept in a plain Python list (parallel to a metadata
    list) and persisted with pickle; search is a brute-force scan.
    Because embeddings are normalised at encode time, a dot product
    equals cosine similarity.

    Bug fix vs. original: the embedding-dimension probe read
    ``encode(["test"]).shape[0]``, which is the *batch size* (always 1)
    for list input — ``shape[-1]`` is the actual embedding dimension.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []              # one embedding per document
        self.document_metadata = []    # dicts, parallel to self.vectors
        self.embedding_dimension = None

        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence transformer model (no-op when the
        sentence-transformers package is unavailable)."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("β οΈ Sentence Transformers not available. Vector search disabled.")
            return

        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )

                # Probe the model for its output dimensionality.  List
                # input yields a (batch, dim) array, so shape[-1] — not
                # shape[0] — is the embedding dimension.
                test_embedding = self.embedding_model.encode(["test"])
                self.embedding_dimension = int(test_embedding.shape[-1])

            st.success(f"β Embedding model loaded: {self.embedding_model_name}")

        except Exception as e:
            st.error(f"β Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load existing vector store from disk, or start empty.

        NOTE(review): pickle.load is only safe because these files live
        in our own data dir; never point these paths at untrusted input.
        """
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)

                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)

                st.success(f"β Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("π New vector store initialized")

        except Exception as e:
            st.error(f"β Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Save vector store to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)

            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)

            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice record into one searchable string.

        Every truthy field except 'id' is rendered as "field: value";
        up to 300 chars of the raw document text are appended.
        """
        text_parts = []

        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")

        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")

        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it to the in-memory store.

        Returns False when no embedding model is loaded or on error.
        Callers must invoke save_vector_store() to persist the addition.
        """
        if not self.embedding_model:
            return False

        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalised embeddings let semantic_search use a plain dot
            # product as the cosine score.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)

            return True

        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List[VectorSearchResult]:
        """Perform semantic search using cosine similarity.

        Returns up to ``top_k`` results scoring above a 0.1 relevance
        threshold, ordered best-first.  Empty list when the model or the
        store is unavailable.
        """
        if not self.embedding_model or not self.vectors:
            return []

        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Brute-force scan — fine at this scale (hundreds of docs).
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))

            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # relevance threshold
                    metadata = self.document_metadata[idx]
                    result = VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    )
                    results.append(result)

            return results

        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
| | |
| | # =============================================================================== |
| | # MAIN PROCESSOR CLASS |
| | # =============================================================================== |
| | |
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Pipeline: spill upload to a temp file -> extract raw text ->
    AI/regex field extraction -> persist to the JSON store -> index in
    the optional vector store.  Progress and failures are surfaced via
    Streamlit widgets.

    Bug fixes vs. original: load_json_data() no longer recurses forever
    when the JSON file exists but is corrupt, and the temp-file cleanup
    catches OSError instead of a bare ``except:``.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        # Vector search is optional — only when embeddings are importable.
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # Per-session counters (not persisted across restarts).
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    def setup_storage(self):
        """Resolve storage paths and seed the JSON store if missing."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")

        if not os.path.exists(self.json_path):
            initial_data = {
                "metadata": {
                    "created_at": datetime.now().isoformat(),
                    "version": "hf_v1.0",
                    "total_invoices": 0
                },
                "invoices": [],
                "summary": {
                    "total_amount": 0.0,
                    "unique_suppliers": [],
                    "processing_stats": {"successful": 0, "failed": 0}
                }
            }
            self.save_json_data(initial_data)

    def load_json_data(self) -> dict:
        """Load invoice data from JSON, re-initialising on failure.

        Bug fix: on a corrupt-but-existing file the original recursed
        forever, because setup_storage() only re-seeds when the file is
        *absent*.  The unreadable file is now removed first, so the
        single recursive retry is guaranteed to succeed.
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            try:
                os.remove(self.json_path)  # drop a corrupt store, if any
            except OSError:
                pass
            self.setup_storage()
            return self.load_json_data()

    def save_json_data(self, data: dict):
        """Save invoice data to JSON (UTF-8, human-readable indent)."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> InvoiceData:
        """Process a single uploaded file with enhanced debugging.

        Returns the populated InvoiceData, or an empty InvoiceData on any
        validation/processing failure (errors are shown in the UI rather
        than raised).
        """
        self.processing_stats['total_processed'] += 1

        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'

            st.info(f"π Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Reject oversize uploads up front.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # Only PDF and plain text are supported.
            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # The extractors work on file paths, so spill the upload to disk.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                file_content = uploaded_file.getvalue()
                tmp_file.write(file_content)
                tmp_file_path = tmp_file.name

            st.info(f"πΎ Saved temporarily to: {tmp_file_path}")

            try:
                st.info("π Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)

                if not text or not text.strip():
                    st.warning(f"β No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"π Extracted {text_length} characters of text")

                if text_length > 0:
                    with st.expander("π Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("π€ Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name

                st.info(f"π Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("πΎ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)

                self.processing_stats['successful'] += 1
                st.success(f"β Successfully processed {uploaded_file.name}")

                return invoice_data

            finally:
                # Best-effort temp-file cleanup; OSError only (the original
                # bare except could hide unrelated bugs).
                try:
                    os.unlink(tmp_file_path)
                    st.info("π§Ή Cleaned up temporary file")
                except OSError:
                    pass

        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1

            # Full traceback tucked away for debugging.
            with st.expander("π Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())

            return InvoiceData()

    def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int):
        """Append one invoice to the JSON store and (optionally) index it."""
        try:
            data = self.load_json_data()

            # Sequential ids: 1 + current record count.
            invoice_record = {
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }

            data["invoices"].append(invoice_record)
            self.update_summary(data)
            self.save_json_data(data)

            # Keep the vector index in sync with the JSON store.
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()

        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute the aggregate summary block of the store in-place."""
        invoices = data["invoices"]

        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")))

        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }

        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
| | |
| | # =============================================================================== |
| | # CHATBOT CLASS |
| | # =============================================================================== |
| | |
| | class ChatBot: |
| | """Chatbot for invoice queries""" |
| | |
    def __init__(self, processor: InvoiceProcessor):
        # Keep a handle on the processor so queries can reach the JSON
        # store and the optional vector store.
        self.processor = processor
| | |
| | def query_database(self, query: str) -> str: |
| | """Process user query and return response""" |
| | try: |
| | data = self.processor.load_json_data() |
| | invoices = data.get("invoices", []) |
| | |
| | if not invoices: |
| | return "No invoice data found. Please upload some invoices first." |
| | |
| | query_lower = query.lower() |
| | |
| | # Handle different query types |
| | if any(phrase in query_lower for phrase in ["summary", "overview", "total"]): |
| | return self.generate_summary(data) |
| | |
| | elif "count" in query_lower or "how many" in query_lower: |
| | return self.handle_count_query(data) |
| | |
| | elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]): |
| | return self.handle_amount_query(data) |
| | |
| | elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]): |
| | return self.handle_supplier_query(data, query) |
| | |
| | elif self.processor.vector_store: |
| | return self.handle_semantic_search(query) |
| | |
| | else: |
| | return self.handle_general_query(data, query) |
| | |
| | except Exception as e: |
| | return f"Error processing query: {e}" |
| | |
    def generate_summary(self, data: dict) -> str:
        """Generate comprehensive summary.

        Builds a markdown report of totals, averages, supplier count,
        processing stats, and the five most recently ingested invoices.
        """
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**π Invoice System Summary**

β’ **Total Invoices**: {len(invoices):,}
β’ **Total Value**: βΉ{total_amount:,.2f}
β’ **Average Invoice**: βΉ{avg_amount:,.2f}
β’ **Unique Suppliers**: {unique_suppliers}

**π Processing Stats**
β’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**π Recent Invoices**
"""

        # Newest first by ingestion timestamp; show the latest five.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response
| | |
    def handle_count_query(self, data: dict) -> str:
        """Handle count-related queries.

        Reports total records, distinct invoice numbers, the implied
        duplicate count, and first/last ingestion dates.
        """
        invoices = data.get("invoices", [])
        total = len(invoices)
        # Blank invoice numbers are excluded from the uniqueness count.
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        return f"""
**π Invoice Count Summary**

β’ **Total Records**: {total}
β’ **Unique Invoice Numbers**: {unique_numbers}
β’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**π Processing Timeline**
β’ **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
β’ **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""
| | |
    def handle_amount_query(self, data: dict) -> str:
        """Handle amount-related queries.

        Summarises total/average/min/max across invoices with a positive
        amount, then lists the top high-value invoices.
        """
        invoices = data.get("invoices", [])
        # Zero/absent amounts are treated as "no amount information".
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # "High value" = the 5th-largest amount when there are more than
        # five invoices; otherwise just the maximum.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**π° Financial Analysis**

β’ **Total Amount**: βΉ{total_amount:,.2f}
β’ **Average Amount**: βΉ{avg_amount:,.2f}
β’ **Highest Invoice**: βΉ{max_amount:,.2f}
β’ **Lowest Invoice**: βΉ{min_amount:,.2f}

**π― High-Value Invoices (βΉ{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response
| | |
| | def handle_supplier_query(self, data: dict, query: str) -> str: |
| | """Handle supplier-related queries""" |
| | invoices = data.get("invoices", []) |
| | |
| | # Count invoices by supplier |
| | supplier_counts = {} |
| | supplier_amounts = {} |
| | |
| | for inv in invoices: |
| | supplier = inv.get('supplier_name', '').strip() |
| | if supplier: |
| | supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1 |
| | supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0) |
| | |
| | if not supplier_counts: |
| | return "No supplier information found in invoices." |
| | |
| | # Sort suppliers by amount |
| | top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10] |
| | |
| | response = f""" |
| | **π’ Supplier Analysis** |
| | |
| | β’ **Total Unique Suppliers**: {len(supplier_counts)} |
| | β’ **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices) |
| | |
| | **π° Top Suppliers by Amount** |
| | """ |
| | |
| | for i, (supplier, amount) in enumerate(top_suppliers, 1): |
| | count = supplier_counts[supplier] |
| | avg = amount / count if count > 0 else 0 |
| | response += f"\n{i}. **{supplier}** - βΉ{amount:,.2f} ({count} invoices, avg: βΉ{avg:,.2f})" |
| | |
| | return response |
| | |
| | def handle_semantic_search(self, query: str) -> str: |
| | """Handle semantic search queries""" |
| | try: |
| | results = self.processor.vector_store.semantic_search(query, top_k=5) |
| | |
| | if not results: |
| | return f"No relevant results found for '{query}'. Try different keywords." |
| | |
| | response = f"π **Semantic Search Results for '{query}'**\n\n" |
| | |
| | for i, result in enumerate(results, 1): |
| | response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n" |
| | response += f" β’ Similarity: {result.similarity_score:.3f}\n" |
| | response += f" β’ Amount: βΉ{result.metadata.get('amount', 0):,.2f}\n" |
| | response += f" β’ Preview: {result.content_preview[:100]}...\n\n" |
| | |
| | return response |
| | |
| | except Exception as e: |
| | return f"Semantic search error: {e}" |
| | |
| | def handle_general_query(self, data: dict, query: str) -> str: |
| | """Handle general queries with keyword search""" |
| | invoices = data.get("invoices", []) |
| | query_words = query.lower().split() |
| | |
| | # Simple keyword matching |
| | matching_invoices = [] |
| | for inv in invoices: |
| | text_to_search = ( |
| | inv.get('supplier_name', '') + ' ' + |
| | inv.get('buyer_name', '') + ' ' + |
| | inv.get('product_description', '') + ' ' + |
| | inv.get('extraction_info', {}).get('raw_text_preview', '') |
| | ).lower() |
| | |
| | if any(word in text_to_search for word in query_words): |
| | matching_invoices.append(inv) |
| | |
| | if not matching_invoices: |
| | return f"No invoices found matching '{query}'. Try different keywords or check the summary." |
| | |
| | response = f"π **Found {len(matching_invoices)} invoices matching '{query}'**\n\n" |
| | |
| | for i, inv in enumerate(matching_invoices[:5], 1): |
| | response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n" |
| | response += f" β’ Amount: βΉ{inv.get('amount', 0):,.2f}\n" |
| | response += f" β’ Date: {inv.get('date', 'N/A')}\n\n" |
| | |
| | if len(matching_invoices) > 5: |
| | response += f"... and {len(matching_invoices) - 5} more results." |
| | |
| | return response |
| | |
| | # =============================================================================== |
| | # STREAMLIT APPLICATION |
| | # =============================================================================== |
| | |
| | def create_app(): |
| | """Main Streamlit application""" |
| | |
| | # Generate unique session ID for this run |
| | if 'session_id' not in st.session_state: |
| | st.session_state.session_id = str(uuid.uuid4())[:8] |
| | |
| | session_id = st.session_state.session_id |
| | |
| | # Custom CSS |
| | st.markdown(""" |
| | <style> |
| | .main-header { |
| | font-size: 2.5rem; |
| | font-weight: bold; |
| | text-align: center; |
| | color: #FF6B35; |
| | margin-bottom: 1rem; |
| | } |
| | .feature-box { |
| | background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
| | padding: 1rem; |
| | border-radius: 10px; |
| | color: white; |
| | margin: 0.5rem 0; |
| | text-align: center; |
| | } |
| | .status-ok { color: #28a745; font-weight: bold; } |
| | .status-warning { color: #ffc107; font-weight: bold; } |
| | .status-error { color: #dc3545; font-weight: bold; } |
| | </style> |
| | """, unsafe_allow_html=True) |
| | |
| | # Header |
| | st.markdown('<h1 class="main-header">π AI Invoice Processing System</h1>', unsafe_allow_html=True) |
| | st.markdown(""" |
| | <div style="text-align: center; margin-bottom: 2rem;"> |
| | <p style="font-size: 1.1rem; color: #666;"> |
| | AI-Powered Document Processing β’ Semantic Search β’ Smart Analytics β’ Hugging Face Spaces |
| | </p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | # Initialize processor |
| | if 'processor' not in st.session_state: |
| | with st.spinner("π§ Initializing AI Invoice Processor..."): |
| | try: |
| | st.session_state.processor = InvoiceProcessor() |
| | st.session_state.chatbot = ChatBot(st.session_state.processor) |
| | st.session_state.chat_history = [] |
| | st.success("β
System initialized successfully!") |
| | except Exception as e: |
| | st.error(f"β Initialization failed: {e}") |
| | st.stop() |
| | |
| | # Sidebar |
| | with st.sidebar: |
| | st.header("ποΈ System Status") |
| | |
| | processor = st.session_state.processor |
| | |
| | # Component status |
| | if processor.document_processor.processors: |
| | st.markdown('<span class="status-ok">β
Document Processing</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-error">β Document Processing</span>', unsafe_allow_html=True) |
| | |
| | if processor.ai_extractor.use_transformers: |
| | st.markdown('<span class="status-ok">β
AI Extraction</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-warning">β οΈ Regex Extraction</span>', unsafe_allow_html=True) |
| | |
| | if processor.vector_store and processor.vector_store.embedding_model: |
| | st.markdown('<span class="status-ok">β
Semantic Search</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-warning">β οΈ Keyword Search Only</span>', unsafe_allow_html=True) |
| | |
| | # Quick stats |
| | st.header("π Quick Stats") |
| | try: |
| | data = processor.load_json_data() |
| | total_invoices = len(data.get("invoices", [])) |
| | total_amount = data.get("summary", {}).get("total_amount", 0) |
| | |
| | st.metric("Total Invoices", total_invoices) |
| | st.metric("Total Value", f"βΉ{total_amount:,.2f}") |
| | st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") |
| | |
| | except Exception as e: |
| | st.error(f"Stats error: {e}") |
| | |
| | # System info |
| | st.header("βοΈ System Info") |
| | st.info(f""" |
| | **Session ID:** {session_id} |
| | |
| | **Limits:** |
| | β’ Max file size: 10MB |
| | β’ Max concurrent files: 3 |
| | β’ Timeout: 30s |
| | """) |
| | |
| | # Main navigation |
| | selected_tab = st.radio( |
| | "Choose a section:", |
| | ["π€ Upload & Process", "π¬ AI Chat", "π Analytics", "π Data Explorer"], |
| | horizontal=True, |
| | key=f"main_navigation_{session_id}" |
| | ) |
| | |
| | # ------------------------------------------------------------------------- |
| | # UPLOAD & PROCESS SECTION |
| | # ------------------------------------------------------------------------- |
| | |
| | if selected_tab == "π€ Upload & Process": |
| | st.header("π€ Upload Invoice Documents") |
| | |
| | # Feature highlights |
| | col1, col2, col3 = st.columns(3) |
| | |
| | with col1: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π€ AI Extraction</h4> |
| | <p>Advanced NLP models extract structured data automatically</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | with col2: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π Smart Search</h4> |
| | <p>Semantic search finds invoices using natural language</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | with col3: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π Analytics</h4> |
| | <p>Comprehensive insights and visualizations</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | # File upload |
| | st.markdown("### π Upload Your Invoices") |
| | |
| | # Initialize session state for files if not exists |
| | if f'uploaded_files_{session_id}' not in st.session_state: |
| | st.session_state[f'uploaded_files_{session_id}'] = None |
| | if f'processing_complete_{session_id}' not in st.session_state: |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | if f'currently_processing_{session_id}' not in st.session_state: |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | if f'processed_file_hashes_{session_id}' not in st.session_state: |
| | st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| | |
| | # File uploader with stable key |
| | uploaded_files = st.file_uploader( |
| | "Choose invoice files (PDF, TXT supported)", |
| | type=['pdf', 'txt'], |
| | accept_multiple_files=True, |
| | help="Maximum file size: 10MB per file", |
| | key=f"file_uploader_stable_{session_id}" |
| | ) |
| | |
| | # Store uploaded files in session state only if they're new |
| | if uploaded_files: |
| | |
| | current_file_hashes = set() |
| | for file in uploaded_files: |
| | file_hash = hash((file.name, file.size)) |
| | current_file_hashes.add(file_hash) |
| | |
| | |
| | stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set()) |
| | if current_file_hashes != stored_hashes: |
| | st.session_state[f'uploaded_files_{session_id}'] = uploaded_files |
| | st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.info("π New files detected - ready for processing") |
| | |
| | |
| | current_files = st.session_state[f'uploaded_files_{session_id}'] |
| | is_processing = st.session_state[f'currently_processing_{session_id}'] |
| | is_complete = st.session_state[f'processing_complete_{session_id}'] |
| | |
| | if current_files: |
| | max_files = 3 |
| | if len(current_files) > max_files: |
| | st.warning(f"β οΈ Too many files selected. Processing first {max_files} files.") |
| | current_files = current_files[:max_files] |
| | |
| | st.info(f"π {len(current_files)} files selected") |
| | |
| | |
| | st.markdown("**Selected Files:**") |
| | for i, file in enumerate(current_files, 1): |
| | file_size_mb = len(file.getvalue()) / (1024 * 1024) |
| | file_hash = hash((file.name, file.size)) |
| | processed_icon = "β
" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "π" |
| | st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)") |
| | |
| | |
| | col1, col2 = st.columns([1, 1]) |
| | |
| | with col1: |
| | if not is_processing and not is_complete: |
| | if st.button("π Process Files", type="primary", key=f"process_btn_{session_id}"): |
| | st.session_state[f'currently_processing_{session_id}'] = True |
| | st.rerun() |
| | elif is_processing: |
| | st.info("π Processing in progress...") |
| | |
| | process_files_once(current_files, session_id) |
| | elif is_complete: |
| | st.success("β
Processing completed!") |
| | if st.button("π Process Again", key=f"reprocess_btn_{session_id}"): |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| | st.rerun() |
| | |
| | with col2: |
| | if st.button("ποΈ Clear Files", key=f"clear_files_{session_id}"): |
| | |
| | keys_to_clear = [ |
| | f'uploaded_files_{session_id}', |
| | f'uploaded_file_hashes_{session_id}', |
| | f'processing_complete_{session_id}', |
| | f'currently_processing_{session_id}', |
| | f'processed_file_hashes_{session_id}' |
| | ] |
| | |
| | for key in keys_to_clear: |
| | if key in st.session_state: |
| | del st.session_state[key] |
| | |
| | st.success("ποΈ Files cleared successfully!") |
| | time.sleep(1) |
| | st.rerun() |
| | |
| | else: |
| | st.info("π Please select invoice files to upload and process") |
| | |
| | |
| | if is_complete: |
| | st.markdown("### π Recent Processing Results") |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | recent_invoices = sorted( |
| | data.get("invoices", []), |
| | key=lambda x: x.get('timestamps', {}).get('created_at', ''), |
| | reverse=True |
| | )[:5] |
| | |
| | if recent_invoices: |
| | for i, inv in enumerate(recent_invoices, 1): |
| | with st.expander(f"π {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False): |
| | col1, col2 = st.columns(2) |
| | with col1: |
| | st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}") |
| | st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}") |
| | st.write(f"**Amount:** βΉ{inv.get('amount', 0):.2f}") |
| | with col2: |
| | st.write(f"**Date:** {inv.get('date', 'N/A')}") |
| | st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}") |
| | st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}") |
| | else: |
| | st.info("No recent processing results found.") |
| | except Exception as e: |
| | st.error(f"Error loading recent results: {e}") |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π¬ AI Chat": |
| | st.header("π¬ AI Chat Interface") |
| | |
| | |
| | if st.session_state.chat_history: |
| | st.markdown("### π¬ Chat History") |
| | for i, message in enumerate(st.session_state.chat_history): |
| | with st.chat_message(message["role"]): |
| | st.markdown(message["content"]) |
| | |
| | |
| | st.markdown("### βοΈ Ask a Question") |
| | |
| | col1, col2 = st.columns([4, 1]) |
| | |
| | with col1: |
| | user_input = st.text_input( |
| | "Type your question:", |
| | placeholder="e.g., 'show me total spending'", |
| | key=f"chat_input_{session_id}" |
| | ) |
| | |
| | with col2: |
| | ask_btn = st.button("π Ask", type="primary", key=f"ask_btn_{session_id}") |
| | |
| | if ask_btn and user_input: |
| | handle_chat_query(user_input) |
| | |
| | |
| | if not st.session_state.chat_history: |
| | st.markdown("### π‘ Try These Queries") |
| | |
| | col1, col2 = st.columns(2) |
| | |
| | with col1: |
| | st.markdown("**π Basic Queries:**") |
| | basic_queries = [ |
| | "Show me a summary of all invoices", |
| | "How much have we spent in total?", |
| | "Who are our top suppliers?", |
| | "Find invoices with high amounts" |
| | ] |
| | for i, query in enumerate(basic_queries): |
| | if st.button(query, key=f"basic_{session_id}_{i}"): |
| | handle_chat_query(query) |
| | |
| | with col2: |
| | st.markdown("**π Advanced Queries:**") |
| | advanced_queries = [ |
| | "Find technology purchases", |
| | "Show office supplies", |
| | "Search consulting services", |
| | "Recent high-value invoices" |
| | ] |
| | for i, query in enumerate(advanced_queries): |
| | if st.button(query, key=f"advanced_{session_id}_{i}"): |
| | handle_chat_query(query) |
| | |
| | |
| | if st.session_state.chat_history: |
| | if st.button("ποΈ Clear Chat", key=f"clear_chat_{session_id}"): |
| | st.session_state.chat_history = [] |
| | st.rerun() |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π Analytics": |
| | st.header("π Analytics Dashboard") |
| | |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | invoices = data.get("invoices", []) |
| | |
| | if not invoices: |
| | st.info("π No data available. Upload some invoices to see analytics.") |
| | return |
| | |
| | |
| | df_data = [] |
| | for inv in invoices: |
| | df_data.append({ |
| | 'invoice_number': inv.get('invoice_number', ''), |
| | 'supplier_name': inv.get('supplier_name', ''), |
| | 'amount': inv.get('amount', 0), |
| | 'date': inv.get('date', ''), |
| | 'confidence': inv.get('extraction_info', {}).get('confidence', 0) |
| | }) |
| | |
| | df = pd.DataFrame(df_data) |
| | |
| | |
| | col1, col2, col3, col4 = st.columns(4) |
| | |
| | with col1: |
| | st.metric("Total Invoices", len(df)) |
| | with col2: |
| | st.metric("Total Amount", f"βΉ{df['amount'].sum():,.2f}") |
| | with col3: |
| | st.metric("Avg Amount", f"βΉ{df['amount'].mean():,.2f}") |
| | with col4: |
| | st.metric("Unique Suppliers", df['supplier_name'].nunique()) |
| | |
| | |
| | if len(df) > 0: |
| | |
| | fig_hist = px.histogram( |
| | df, |
| | x='amount', |
| | title="Invoice Amount Distribution", |
| | labels={'amount': 'Amount (βΉ)', 'count': 'Number of Invoices'} |
| | ) |
| | st.plotly_chart(fig_hist, use_container_width=True) |
| | |
| | |
| | if df['supplier_name'].notna().any(): |
| | supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10) |
| | |
| | if len(supplier_amounts) > 0: |
| | fig_suppliers = px.bar( |
| | x=supplier_amounts.values, |
| | y=supplier_amounts.index, |
| | orientation='h', |
| | title="Top 10 Suppliers by Total Amount", |
| | labels={'x': 'Total Amount (βΉ)', 'y': 'Supplier'} |
| | ) |
| | st.plotly_chart(fig_suppliers, use_container_width=True) |
| | |
| | except Exception as e: |
| | st.error(f"Analytics error: {e}") |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π Data Explorer": |
| | st.header("π Data Explorer") |
| | |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | invoices = data.get("invoices", []) |
| | |
| | if not invoices: |
| | st.info("π No data available. Upload some invoices first.") |
| | return |
| | |
| | |
| | df_data = [] |
| | for inv in invoices: |
| | df_data.append({ |
| | 'Invoice Number': inv.get('invoice_number', ''), |
| | 'Supplier': inv.get('supplier_name', ''), |
| | 'Buyer': inv.get('buyer_name', ''), |
| | 'Amount': inv.get('amount', 0), |
| | 'Date': inv.get('date', ''), |
| | 'Confidence': inv.get('extraction_info', {}).get('confidence', 0), |
| | 'Method': inv.get('extraction_info', {}).get('method', ''), |
| | 'File': inv.get('file_info', {}).get('file_name', ''), |
| | 'Created': inv.get('timestamps', {}).get('created_at', '')[:19] |
| | }) |
| | |
| | df = pd.DataFrame(df_data) |
| | |
| | |
| | col1, col2, col3 = st.columns(3) |
| | |
| | with col1: |
| | suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist()) |
| | selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}") |
| | |
| | with col2: |
| | methods = ['All'] + sorted(df['Method'].dropna().unique().tolist()) |
| | selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}") |
| | |
| | with col3: |
| | min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}") |
| | |
| | |
| | filtered_df = df.copy() |
| | if selected_supplier != 'All': |
| | filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier] |
| | if selected_method != 'All': |
| | filtered_df = filtered_df[filtered_df['Method'] == selected_method] |
| | if min_amount > 0: |
| | filtered_df = filtered_df[filtered_df['Amount'] >= min_amount] |
| | |
| | |
| | st.dataframe( |
| | filtered_df, |
| | use_container_width=True, |
| | column_config={ |
| | "Amount": st.column_config.NumberColumn("Amount", format="βΉ%.2f"), |
| | "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1) |
| | } |
| | ) |
| | |
| | |
| | col1, col2 = st.columns(2) |
| | |
| | with col1: |
| | if st.button("π₯ Export CSV", key=f"export_csv_{session_id}"): |
| | csv_data = filtered_df.to_csv(index=False) |
| | st.download_button( |
| | "Download CSV", |
| | csv_data, |
| | f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", |
| | "text/csv", |
| | key=f"download_csv_{session_id}" |
| | ) |
| | |
| | with col2: |
| | if st.button("π Export JSON", key=f"export_json_{session_id}"): |
| | filtered_invoices = [inv for inv in invoices |
| | if inv.get('invoice_number') in filtered_df['Invoice Number'].values] |
| | |
| | export_data = { |
| | "exported_at": datetime.now().isoformat(), |
| | "total_records": len(filtered_invoices), |
| | "invoices": filtered_invoices |
| | } |
| | |
| | st.download_button( |
| | "Download JSON", |
| | json.dumps(export_data, indent=2), |
| | f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json", |
| | "application/json", |
| | key=f"download_json_{session_id}" |
| | ) |
| | |
| | except Exception as e: |
| | st.error(f"Data explorer error: {e}") |
| | |
| | |
| | |
| | |
| | |
| | st.markdown("---") |
| | st.markdown("### π¬ Quick Chat (Works from any section)") |
| | |
| | global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}") |
| | |
| | if global_query: |
| | handle_chat_query(global_query, show_response=True) |
| | |
| | |
| | st.markdown("---") |
| | st.markdown(""" |
| | <div style="text-align: center; color: #666;"> |
| | <p>π <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p> |
| | <p>Built with β€οΈ using Streamlit, Transformers, and AI</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| |
|
| | |
| | |
| | |
| |
|
| | def process_files_once(uploaded_files, session_id): |
| | """Process uploaded files only once with proper state management""" |
| | if not uploaded_files: |
| | st.error("No files to process!") |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | return |
| | |
| | st.markdown("### π Processing Files...") |
| | |
| | |
| | processed_hashes = st.session_state[f'processed_file_hashes_{session_id}'] |
| | |
| | |
| | files_to_process = [] |
| | for file in uploaded_files: |
| | file_hash = hash((file.name, file.size)) |
| | if file_hash not in processed_hashes: |
| | files_to_process.append((file, file_hash)) |
| | |
| | if not files_to_process: |
| | st.info("β
All files have already been processed!") |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.session_state[f'processing_complete_{session_id}'] = True |
| | return |
| | |
| | |
| | progress_container = st.container() |
| | status_container = st.container() |
| | results_container = st.container() |
| | |
| | successful = 0 |
| | failed = 0 |
| | |
| | |
| | with progress_container: |
| | progress_bar = st.progress(0) |
| | progress_text = st.empty() |
| | |
| | with status_container: |
| | st.info(f"Starting to process {len(files_to_process)} new files...") |
| | |
| | |
| | for i, (uploaded_file, file_hash) in enumerate(files_to_process): |
| | current_progress = (i + 1) / len(files_to_process) |
| | |
| | with progress_container: |
| | progress_bar.progress(current_progress) |
| | progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}") |
| | |
| | with status_container: |
| | st.info(f"π Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)") |
| | |
| | try: |
| | |
| | result = st.session_state.processor.process_uploaded_file(uploaded_file) |
| | |
| | |
| | processed_hashes.add(file_hash) |
| | |
| | |
| | with results_container: |
| | if result and hasattr(result, 'invoice_number') and result.invoice_number: |
| | successful += 1 |
| | st.success(f"β
Successfully processed: {uploaded_file.name}") |
| | |
| | |
| | col1, col2, col3 = st.columns(3) |
| | with col1: |
| | st.write(f"**Invoice #:** {result.invoice_number}") |
| | st.write(f"**Supplier:** {result.supplier_name or 'Not found'}") |
| | with col2: |
| | st.write(f"**Amount:** βΉ{result.amount:.2f}") |
| | st.write(f"**Date:** {result.date or 'Not found'}") |
| | with col3: |
| | st.write(f"**Method:** {result.processing_method}") |
| | st.write(f"**Confidence:** {result.extraction_confidence:.1%}") |
| | |
| | st.markdown("---") |
| | else: |
| | failed += 1 |
| | st.warning(f"β οΈ Could not extract complete data from: {uploaded_file.name}") |
| | if result: |
| | st.write(f"Partial data: {result.supplier_name}, βΉ{result.amount}") |
| | st.markdown("---") |
| | |
| | except Exception as e: |
| | failed += 1 |
| | |
| | processed_hashes.add(file_hash) |
| | |
| | with results_container: |
| | st.error(f"β Error processing {uploaded_file.name}: {str(e)}") |
| | st.markdown("---") |
| | |
| | |
| | st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes |
| | |
| | |
| | with progress_container: |
| | progress_bar.progress(1.0) |
| | progress_text.text("β
Processing completed!") |
| | |
| | with status_container: |
| | if successful > 0: |
| | st.success(f"π Processing complete! {successful} successful, {failed} failed") |
| | if successful > 0: |
| | st.balloons() |
| | else: |
| | st.error(f"β Processing failed for all {failed} files. Please check file formats and content.") |
| | |
| | |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.session_state[f'processing_complete_{session_id}'] = True |
| | |
| | |
| | st.rerun() |
| |
|
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for older call sites; delegates unchanged."""
    # Thin pass-through to the deduplicating implementation.
    return process_files_once(uploaded_files, session_id)
| |
|
def handle_chat_query(query, show_response=False):
    """Append *query* to the chat history, ask the chatbot, record the reply.

    Args:
        query: Natural-language question from the user.
        show_response: When True (used by the global quick-chat), render the
            answer inline in addition to storing it in the history.
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })

    try:
        with st.spinner("π€ AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)

        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("π‘ Switch to the 'AI Chat' section to see full conversation history!")

    except Exception as e:
        st.error(f"Chat error: {e}")
        return

    # BUGFIX: st.rerun() works by raising Streamlit's RerunException, which is
    # an Exception subclass. Calling it inside the try block above let the
    # `except Exception` handler swallow the rerun and report a bogus
    # "Chat error", so the rerun must happen outside the handler.
    st.rerun()
| |
|
| | |
| | |
| | |
| |
|
def main():
    """Entry point for Hugging Face Spaces deployment."""
    try:
        # Surface a small banner when the SPACE_ID env var marks an HF Space.
        if IS_HF_SPACE:
            st.sidebar.info("π€ Running on Hugging Face Spaces")

        create_app()

    except Exception as exc:
        # Last-resort handler: render the failure instead of a blank page.
        st.error(f"""
        ## π¨ Application Error
        
        {exc}
        
        Please refresh the page or check the logs for more details.
        """)
| |
|
| | if __name__ == "__main__": |
| | main(), |
| | |
| | |
| | r'([0-9,]+\.?\d*)\s*(?:dollars?|rupees?|usd|inr|eur|gbp)', |
| | |
| | |
| | r'(?:price|cost|rate)\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| | |
| | |
| | r'(?:^|\s)([0-9]{1,3}(?:,\d{3})*\.?\d{0,2})(?=\s|$)', |
| | ], |
| | 'date': [ |
| | r'date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| | r'(?:invoice|bill)\s*date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| | r'(?:^|\s)(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})(?=\s|$)', |
| | r'(\d{4}[/\-\.]\d{1,2}[/\-\.]\d{1,2})', |
| | r'(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d{2,4})', |
| | ], |
| | 'quantity': [ |
| | r'qty\s*:?\s*(\d+)', |
| | r'quantity\s*:?\s*(\d+)', |
| | r'(?:units?|pcs?|pieces?)\s*:?\s*(\d+)', |
| | r'(\d+)\s*(?:pcs?|units?|items?|pieces?)', |
| | ] |
| | } |
| | |
| | text_lower = text.lower() |
| | |
| | |
| | for pattern in patterns['invoice_number']: |
| | match = re.search(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| | if match: |
| | invoice_data.invoice_number = match.group(1).upper().strip() |
| | break |
| | |
| | |
| | amounts_found = [] |
| | for pattern in patterns['amount']: |
| | matches = re.finditer(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| | for match in matches: |
| | try: |
| | amount_str = match.group(1).replace(',', '').replace(' ', '') |
| | amount_val = float(amount_str) |
| | if 0.01 <= amount_val <= 1000000: |
| | amounts_found.append(amount_val) |
| | except (ValueError, IndexError): |
| | continue |
| | |
| | |
| | if amounts_found: |
| | |
| | unique_amounts = sorted(set(amounts_found), reverse=True) |
| | |
| | invoice_data.amount = unique_amounts[0] |
| | |
| | |
| | for pattern in patterns['date']: |
| | match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) |
| | if match: |
| | invoice_data.date = self.parse_date(match.group(1)) |
| | break |
| | |
| | |
| | for pattern in patterns['quantity']: |
| | match = re.search(pattern, text_lower, re.IGNORECASE) |
| | if match: |
| | try: |
| | invoice_data.quantity = int(match.group(1)) |
| | break |
| | except ValueError: |
| | continue |
| | |
| | |
| | company_patterns = [ |
| | r'(?:from|supplier|vendor)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| | r'(?:to|buyer|client)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| | r'([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:ltd|inc|corp|llc|co\.|company|pvt|private|limited)', |
| | r'(?:^|\n)([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:\n|$)', |
| | ] |
| | |
| | companies_found = [] |
| | for pattern in company_patterns: |
| | matches = re.findall(pattern, text, re.MULTILINE) |
| | for match in matches: |
| | clean_company = match.strip().title() |
| | if len(clean_company) > 3 and not any(word in clean_company.lower() for word in ['total', 'amount', 'date', 'invoice']): |
| | companies_found.append(clean_company) |
| | |
| | |
| | if companies_found: |
| | invoice_data.supplier_name = companies_found[0] |
| | if len(companies_found) > 1: |
| | invoice_data.buyer_name = companies_found[1] |
| | |
| | |
| | desc_patterns = [ |
| | r'(?:description|item|product|service)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| | r'(?:for|regarding)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| | ] |
| | |
| | for pattern in desc_patterns: |
| | match = re.search(pattern, text, re.IGNORECASE) |
| | if match: |
| | desc = match.group(1).strip() |
| | if len(desc) > 5: |
| | invoice_data.product_description = desc[:200] |
| | break |
| | |
| | |
| | confidence_factors = [] |
| | if invoice_data.invoice_number: |
| | confidence_factors.append(0.3) |
| | if invoice_data.amount > 0: |
| | confidence_factors.append(0.3) |
| | if invoice_data.supplier_name: |
| | confidence_factors.append(0.2) |
| | if invoice_data.date: |
| | confidence_factors.append(0.1) |
| | if invoice_data.quantity > 0: |
| | confidence_factors.append(0.1) |
| | |
| | invoice_data.extraction_confidence = sum(confidence_factors) |
| | |
| | return invoice_data |
| | |
| | def parse_date(self, date_str: str) -> str: |
| | """Parse date to YYYY-MM-DD format""" |
| | if not date_str: |
| | return "" |
| | |
| | formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] |
| | |
| | for fmt in formats: |
| | try: |
| | parsed_date = datetime.strptime(date_str, fmt) |
| | return parsed_date.strftime('%Y-%m-%d') |
| | except ValueError: |
| | continue |
| | |
| | return date_str |
| |
|
| | |
| | |
| | |
| |
|
class VectorStore:
    """Simplified in-memory vector store for Hugging Face Spaces.

    Embeds invoice records with a SentenceTransformer model, keeps the
    embeddings and per-document metadata in two parallel in-memory lists,
    and persists both to disk with pickle.  Search is a brute-force scan:
    embeddings are stored L2-normalized, so a plain dot product against a
    normalized query equals cosine similarity.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        """Set up storage paths, load the embedding model, and restore saved state.

        Args:
            embedding_model: sentence-transformers model name to load.
        """
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # 1-D numpy embeddings, parallel to document_metadata
        self.document_metadata = []  # one metadata dict per stored document
        self.embedding_dimension = None

        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence transformer model (no-op if the package is missing)."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("⚠️ Sentence Transformers not available. Vector search disabled.")
            return

        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )

            # Probe the model once to learn the embedding width.
            # encode() on a *list* returns a (batch, dim) array, so the width
            # is the LAST axis; the previous code read shape[0], which is the
            # batch size (always 1 here), not the dimension.
            test_embedding = self.embedding_model.encode(["test"])
            self.embedding_dimension = test_embedding.shape[-1]

            st.success(f"✅ Embedding model loaded: {self.embedding_model_name}")

        except Exception as e:
            st.error(f"❌ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load persisted vectors/metadata from disk, or start with empty state."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                # NOTE(review): pickle is only safe for files this app wrote
                # itself; never point these paths at untrusted data.
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)

                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)

                st.success(f"✅ Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("🆕 New vector store initialized")

        except Exception as e:
            st.error(f"❌ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors and metadata to disk.

        Returns:
            bool: True on success, False if writing failed.
        """
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)

            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)

            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Build the searchable text for one invoice.

        Concatenates every truthy field (except 'id') as "field: value",
        plus up to 300 characters of the raw document text.
        """
        text_parts = []

        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")

        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")

        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it to the in-memory store.

        The caller is responsible for calling save_vector_store() afterwards;
        this method does not persist.

        Returns:
            bool: True if the document was embedded and added.
        """
        if not self.embedding_model:
            return False

        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalized embedding so search can use a plain dot product.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)

            return True

        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List[VectorSearchResult]:
        """Return up to *top_k* stored documents most similar to *query*.

        Results with cosine similarity <= 0.1 are dropped as noise.
        """
        if not self.embedding_model or not self.vectors:
            return []

        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Brute-force scan; both sides are normalized, so the dot
            # product equals cosine similarity.
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))

            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:
                    metadata = self.document_metadata[idx]
                    result = VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    )
                    results.append(result)

            return results

        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
| |
|
| | |
| | |
| | |
| |
|
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Coordinates document text extraction, AI/regex field extraction,
    JSON persistence, and (optionally) the semantic vector store.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # Per-session counters, surfaced in the sidebar UI.
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    @staticmethod
    def _default_data() -> dict:
        """Return a fresh, empty JSON store structure."""
        return {
            "metadata": {
                "created_at": datetime.now().isoformat(),
                "version": "hf_v1.0",
                "total_invoices": 0
            },
            "invoices": [],
            "summary": {
                "total_amount": 0.0,
                "unique_suppliers": [],
                "processing_stats": {"successful": 0, "failed": 0}
            }
        }

    def setup_storage(self):
        """Resolve storage paths and create the JSON store file if missing."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")

        if not os.path.exists(self.json_path):
            self.save_json_data(self._default_data())

    def load_json_data(self) -> dict:
        """Load invoice data from JSON.

        If the file is missing or contains invalid JSON, it is rewritten
        with an empty store and that empty store is returned.  (The
        previous implementation recursed into itself after setup_storage(),
        which loops forever when the file exists but holds corrupt JSON,
        because setup_storage() only rewrites a *missing* file.)
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = self._default_data()
            self.save_json_data(data)
            return data

    def save_json_data(self, data: dict):
        """Save invoice data to JSON (errors are reported in the UI, not raised)."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> InvoiceData:
        """Process a single uploaded file end to end.

        Validates size/type, extracts text, runs AI/regex field extraction,
        persists the result, and reports progress in the Streamlit UI.
        Returns an empty InvoiceData on any failure.
        """
        self.processing_stats['total_processed'] += 1

        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'

            st.info(f"📄 Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Enforce the per-file size limit from the Space config.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # The document processor works on paths, so spill the upload
            # to a named temporary file first.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                file_content = uploaded_file.getvalue()
                tmp_file.write(file_content)
                tmp_file_path = tmp_file.name

            st.info(f"💾 Saved temporarily to: {tmp_file_path}")

            try:
                st.info("🔍 Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)

                if not text or not text.strip():
                    st.warning(f"❌ No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"📄 Extracted {text_length} characters of text")

                if text_length > 0:
                    with st.expander("📄 Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("🤖 Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name

                st.info(f"📊 Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("💾 Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)

                self.processing_stats['successful'] += 1
                st.success(f"✅ Successfully processed {uploaded_file.name}")

                return invoice_data

            finally:
                # Best-effort cleanup: only swallow filesystem errors, not
                # everything (the previous bare `except:` could mask bugs).
                try:
                    os.unlink(tmp_file_path)
                    st.info("🧹 Cleaned up temporary file")
                except OSError:
                    pass

        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1

            with st.expander("🐛 Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())

            return InvoiceData()

    def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int):
        """Append one extracted invoice to the JSON store and the vector store."""
        try:
            data = self.load_json_data()

            invoice_record = {
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }

            data["invoices"].append(invoice_record)

            self.update_summary(data)

            self.save_json_data(data)

            # Mirror the record into the semantic index when available.
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()

        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute the store's summary block and metadata in place."""
        invoices = data["invoices"]

        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")))

        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }

        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
| |
|
| | |
| | |
| | |
| |
|
class ChatBot:
    """Rule-based chatbot that answers questions about stored invoices.

    Routes a free-text query to a specialized handler by keyword, falling
    back to semantic search (when available) and then plain keyword search.
    All replies are markdown strings for the Streamlit chat UI.
    """

    def __init__(self, processor: InvoiceProcessor):
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Dispatch *query* to the appropriate handler and return a reply."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            query_lower = query.lower()

            # Keyword routing, most specific intents first.
            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)

            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)

            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)

            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)

            elif self.processor.vector_store:
                return self.handle_semantic_search(query)

            else:
                return self.handle_general_query(data, query)

        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Generate a comprehensive system summary with the 5 most recent invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**📊 Invoice System Summary**

• **Total Invoices**: {len(invoices):,}
• **Total Value**: ₹{total_amount:,.2f}
• **Average Invoice**: ₹{avg_amount:,.2f}
• **Unique Suppliers**: {unique_suppliers}

**📈 Processing Stats**
• **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
• **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**📋 Recent Invoices**
"""

        # Newest first by creation timestamp.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})"

        return response

    def handle_count_query(self, data: dict) -> str:
        """Answer count-style queries (totals, uniques, duplicates, timeline)."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        return f"""
**📊 Invoice Count Summary**

• **Total Records**: {total}
• **Unique Invoice Numbers**: {unique_numbers}
• **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**📅 Processing Timeline**
• **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
• **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Answer financial queries with totals, averages, and top-value invoices."""
        invoices = data.get("invoices", [])
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # Threshold = 5th-largest amount when there are more than 5
        # invoices, otherwise the single maximum.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**💰 Financial Analysis**

• **Total Amount**: ₹{total_amount:,.2f}
• **Average Amount**: ₹{avg_amount:,.2f}
• **Highest Invoice**: ₹{max_amount:,.2f}
• **Lowest Invoice**: ₹{min_amount:,.2f}

**🎯 High-Value Invoices (₹{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})"

        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Answer supplier-related queries with counts and top-by-amount ranking."""
        invoices = data.get("invoices", [])

        # Aggregate invoice count and total amount per supplier.
        supplier_counts = {}
        supplier_amounts = {}

        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]

        # Compute the most-active supplier once (the previous code ran the
        # same max() scan twice inside the f-string).
        most_active = max(supplier_counts, key=supplier_counts.get)

        response = f"""
**🏢 Supplier Analysis**

• **Total Unique Suppliers**: {len(supplier_counts)}
• **Most Active**: {most_active} ({supplier_counts[most_active]} invoices)

**💰 Top Suppliers by Amount**
"""

        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - ₹{amount:,.2f} ({count} invoices, avg: ₹{avg:,.2f})"

        return response

    def handle_semantic_search(self, query: str) -> str:
        """Answer free-form queries via the vector store's semantic search."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)

            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"🔍 **Semantic Search Results for '{query}'**\n\n"

            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   • Similarity: {result.similarity_score:.3f}\n"
                response += f"   • Amount: ₹{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   • Preview: {result.content_preview[:100]}...\n\n"

            return response

        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Fallback: naive keyword search over supplier/buyer/description/raw text."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        # An invoice matches when ANY query word appears in its text fields.
        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()

            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)

        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"🔍 **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"

        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   • Amount: ₹{inv.get('amount', 0):,.2f}\n"
            response += f"   • Date: {inv.get('date', 'N/A')}\n\n"

        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."

        return response
| |
|
| | |
| | |
| | |
| |
|
| | def create_app(): |
| | """Main Streamlit application""" |
| | |
| | |
| | if 'session_id' not in st.session_state: |
| | st.session_state.session_id = str(uuid.uuid4())[:8] |
| | |
| | session_id = st.session_state.session_id |
| | |
| | |
| | st.markdown(""" |
| | <style> |
| | .main-header { |
| | font-size: 2.5rem; |
| | font-weight: bold; |
| | text-align: center; |
| | color: #FF6B35; |
| | margin-bottom: 1rem; |
| | } |
| | .feature-box { |
| | background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
| | padding: 1rem; |
| | border-radius: 10px; |
| | color: white; |
| | margin: 0.5rem 0; |
| | text-align: center; |
| | } |
| | .status-ok { color: #28a745; font-weight: bold; } |
| | .status-warning { color: #ffc107; font-weight: bold; } |
| | .status-error { color: #dc3545; font-weight: bold; } |
| | </style> |
| | """, unsafe_allow_html=True) |
| | |
| | |
| | st.markdown('<h1 class="main-header">π AI Invoice Processing System</h1>', unsafe_allow_html=True) |
| | st.markdown(""" |
| | <div style="text-align: center; margin-bottom: 2rem;"> |
| | <p style="font-size: 1.1rem; color: #666;"> |
| | AI-Powered Document Processing β’ Semantic Search β’ Smart Analytics β’ Hugging Face Spaces |
| | </p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | |
| | if 'processor' not in st.session_state: |
| | with st.spinner("π§ Initializing AI Invoice Processor..."): |
| | try: |
| | st.session_state.processor = InvoiceProcessor() |
| | st.session_state.chatbot = ChatBot(st.session_state.processor) |
| | st.session_state.chat_history = [] |
| | st.success("β
System initialized successfully!") |
| | except Exception as e: |
| | st.error(f"β Initialization failed: {e}") |
| | st.stop() |
| | |
| | |
| | with st.sidebar: |
| | st.header("ποΈ System Status") |
| | |
| | processor = st.session_state.processor |
| | |
| | |
| | if processor.document_processor.processors: |
| | st.markdown('<span class="status-ok">β
Document Processing</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-error">β Document Processing</span>', unsafe_allow_html=True) |
| | |
| | if processor.ai_extractor.use_transformers: |
| | st.markdown('<span class="status-ok">β
AI Extraction</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-warning">β οΈ Regex Extraction</span>', unsafe_allow_html=True) |
| | |
| | if processor.vector_store and processor.vector_store.embedding_model: |
| | st.markdown('<span class="status-ok">β
Semantic Search</span>', unsafe_allow_html=True) |
| | else: |
| | st.markdown('<span class="status-warning">β οΈ Keyword Search Only</span>', unsafe_allow_html=True) |
| | |
| | |
| | st.header("π Quick Stats") |
| | try: |
| | data = processor.load_json_data() |
| | total_invoices = len(data.get("invoices", [])) |
| | total_amount = data.get("summary", {}).get("total_amount", 0) |
| | |
| | st.metric("Total Invoices", total_invoices) |
| | st.metric("Total Value", f"βΉ{total_amount:,.2f}") |
| | st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") |
| | |
| | except Exception as e: |
| | st.error(f"Stats error: {e}") |
| | |
| | |
| | st.header("βοΈ System Info") |
| | st.info(f""" |
| | **Session ID:** {session_id} |
| | |
| | **Limits:** |
| | β’ Max file size: 10MB |
| | β’ Max concurrent files: 3 |
| | β’ Timeout: 30s |
| | """) |
| | |
| | |
| | selected_tab = st.radio( |
| | "Choose a section:", |
| | ["π€ Upload & Process", "π¬ AI Chat", "π Analytics", "π Data Explorer"], |
| | horizontal=True, |
| | key=f"main_navigation_{session_id}" |
| | ) |
| | |
| | |
| | |
| | |
| | |
| | if selected_tab == "π€ Upload & Process": |
| | st.header("π€ Upload Invoice Documents") |
| | |
| | |
| | col1, col2, col3 = st.columns(3) |
| | |
| | with col1: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π€ AI Extraction</h4> |
| | <p>Advanced NLP models extract structured data automatically</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | with col2: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π Smart Search</h4> |
| | <p>Semantic search finds invoices using natural language</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | with col3: |
| | st.markdown(""" |
| | <div class="feature-box"> |
| | <h4>π Analytics</h4> |
| | <p>Comprehensive insights and visualizations</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| | |
| | |
| | st.markdown("### π Upload Your Invoices") |
| | |
| | |
| | if f'uploaded_files_{session_id}' not in st.session_state: |
| | st.session_state[f'uploaded_files_{session_id}'] = None |
| | if f'processing_complete_{session_id}' not in st.session_state: |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | if f'currently_processing_{session_id}' not in st.session_state: |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | if f'processed_file_hashes_{session_id}' not in st.session_state: |
| | st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| | |
| | |
| | uploaded_files = st.file_uploader( |
| | "Choose invoice files (PDF, TXT supported)", |
| | type=['pdf', 'txt'], |
| | accept_multiple_files=True, |
| | help="Maximum file size: 10MB per file", |
| | key=f"file_uploader_stable_{session_id}" |
| | ) |
| | |
| | |
| | if uploaded_files: |
| | |
| | current_file_hashes = set() |
| | for file in uploaded_files: |
| | file_hash = hash((file.name, file.size)) |
| | current_file_hashes.add(file_hash) |
| | |
| | |
| | stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set()) |
| | if current_file_hashes != stored_hashes: |
| | st.session_state[f'uploaded_files_{session_id}'] = uploaded_files |
| | st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.info("π New files detected - ready for processing") |
| | |
| | |
| | current_files = st.session_state[f'uploaded_files_{session_id}'] |
| | is_processing = st.session_state[f'currently_processing_{session_id}'] |
| | is_complete = st.session_state[f'processing_complete_{session_id}'] |
| | |
| | if current_files: |
| | max_files = 3 |
| | if len(current_files) > max_files: |
| | st.warning(f"β οΈ Too many files selected. Processing first {max_files} files.") |
| | current_files = current_files[:max_files] |
| | |
| | st.info(f"π {len(current_files)} files selected") |
| | |
| | |
| | st.markdown("**Selected Files:**") |
| | for i, file in enumerate(current_files, 1): |
| | file_size_mb = len(file.getvalue()) / (1024 * 1024) |
| | file_hash = hash((file.name, file.size)) |
| | processed_icon = "β
" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "π" |
| | st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)") |
| | |
| | |
| | col1, col2 = st.columns([1, 1]) |
| | |
| | with col1: |
| | if not is_processing and not is_complete: |
| | if st.button("π Process Files", type="primary", key=f"process_btn_{session_id}"): |
| | st.session_state[f'currently_processing_{session_id}'] = True |
| | st.rerun() |
| | elif is_processing: |
| | st.info("π Processing in progress...") |
| | |
| | process_files_once(current_files, session_id) |
| | elif is_complete: |
| | st.success("β
Processing completed!") |
| | if st.button("π Process Again", key=f"reprocess_btn_{session_id}"): |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| | st.rerun() |
| | |
| | with col2: |
| | if st.button("ποΈ Clear Files", key=f"clear_files_{session_id}"): |
| | st.session_state[f'uploaded_files_{session_id}'] = None |
| | st.session_state[f'uploaded_file_hashes_{session_id}'] = set() |
| | st.session_state[f'processing_complete_{session_id}'] = False |
| | st.session_state[f'currently_processing_{session_id}'] = False |
| | st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| | st.rerun() |
| | |
| | else: |
| | st.info("π Please select invoice files to upload and process") |
| | |
| | |
| | if is_complete: |
| | st.markdown("### π Recent Processing Results") |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | recent_invoices = sorted( |
| | data.get("invoices", []), |
| | key=lambda x: x.get('timestamps', {}).get('created_at', ''), |
| | reverse=True |
| | )[:5] |
| | |
| | if recent_invoices: |
| | for i, inv in enumerate(recent_invoices, 1): |
| | with st.expander(f"π {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False): |
| | col1, col2 = st.columns(2) |
| | with col1: |
| | st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}") |
| | st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}") |
| | st.write(f"**Amount:** βΉ{inv.get('amount', 0):.2f}") |
| | with col2: |
| | st.write(f"**Date:** {inv.get('date', 'N/A')}") |
| | st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}") |
| | st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}") |
| | else: |
| | st.info("No recent processing results found.") |
| | except Exception as e: |
| | st.error(f"Error loading recent results: {e}") |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π¬ AI Chat": |
| | st.header("π¬ AI Chat Interface") |
| | |
| | |
| | if st.session_state.chat_history: |
| | st.markdown("### π¬ Chat History") |
| | for i, message in enumerate(st.session_state.chat_history): |
| | with st.chat_message(message["role"]): |
| | st.markdown(message["content"]) |
| | |
| | |
| | st.markdown("### βοΈ Ask a Question") |
| | |
| | col1, col2 = st.columns([4, 1]) |
| | |
| | with col1: |
| | user_input = st.text_input( |
| | "Type your question:", |
| | placeholder="e.g., 'show me total spending'", |
| | key=f"chat_input_{session_id}" |
| | ) |
| | |
| | with col2: |
| | ask_btn = st.button("π Ask", type="primary", key=f"ask_btn_{session_id}") |
| | |
| | if ask_btn and user_input: |
| | handle_chat_query(user_input) |
| | |
| | |
| | if not st.session_state.chat_history: |
| | st.markdown("### π‘ Try These Queries") |
| | |
| | col1, col2 = st.columns(2) |
| | |
| | with col1: |
| | st.markdown("**π Basic Queries:**") |
| | basic_queries = [ |
| | "Show me a summary of all invoices", |
| | "How much have we spent in total?", |
| | "Who are our top suppliers?", |
| | "Find invoices with high amounts" |
| | ] |
| | for i, query in enumerate(basic_queries): |
| | if st.button(query, key=f"basic_{session_id}_{i}"): |
| | handle_chat_query(query) |
| | |
| | with col2: |
| | st.markdown("**π Advanced Queries:**") |
| | advanced_queries = [ |
| | "Find technology purchases", |
| | "Show office supplies", |
| | "Search consulting services", |
| | "Recent high-value invoices" |
| | ] |
| | for i, query in enumerate(advanced_queries): |
| | if st.button(query, key=f"advanced_{session_id}_{i}"): |
| | handle_chat_query(query) |
| | |
| | |
| | if st.session_state.chat_history: |
| | if st.button("ποΈ Clear Chat", key=f"clear_chat_{session_id}"): |
| | st.session_state.chat_history = [] |
| | st.rerun() |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π Analytics": |
| | st.header("π Analytics Dashboard") |
| | |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | invoices = data.get("invoices", []) |
| | |
| | if not invoices: |
| | st.info("π No data available. Upload some invoices to see analytics.") |
| | return |
| | |
| | |
| | df_data = [] |
| | for inv in invoices: |
| | df_data.append({ |
| | 'invoice_number': inv.get('invoice_number', ''), |
| | 'supplier_name': inv.get('supplier_name', ''), |
| | 'amount': inv.get('amount', 0), |
| | 'date': inv.get('date', ''), |
| | 'confidence': inv.get('extraction_info', {}).get('confidence', 0) |
| | }) |
| | |
| | df = pd.DataFrame(df_data) |
| | |
| | |
| | col1, col2, col3, col4 = st.columns(4) |
| | |
| | with col1: |
| | st.metric("Total Invoices", len(df)) |
| | with col2: |
| | st.metric("Total Amount", f"βΉ{df['amount'].sum():,.2f}") |
| | with col3: |
| | st.metric("Avg Amount", f"βΉ{df['amount'].mean():,.2f}") |
| | with col4: |
| | st.metric("Unique Suppliers", df['supplier_name'].nunique()) |
| | |
| | |
| | if len(df) > 0: |
| | |
| | fig_hist = px.histogram( |
| | df, |
| | x='amount', |
| | title="Invoice Amount Distribution", |
| | labels={'amount': 'Amount (βΉ)', 'count': 'Number of Invoices'} |
| | ) |
| | st.plotly_chart(fig_hist, use_container_width=True) |
| | |
| | |
| | if df['supplier_name'].notna().any(): |
| | supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10) |
| | |
| | if len(supplier_amounts) > 0: |
| | fig_suppliers = px.bar( |
| | x=supplier_amounts.values, |
| | y=supplier_amounts.index, |
| | orientation='h', |
| | title="Top 10 Suppliers by Total Amount", |
| | labels={'x': 'Total Amount (βΉ)', 'y': 'Supplier'} |
| | ) |
| | st.plotly_chart(fig_suppliers, use_container_width=True) |
| | |
| | except Exception as e: |
| | st.error(f"Analytics error: {e}") |
| | |
| | |
| | |
| | |
| | |
| | elif selected_tab == "π Data Explorer": |
| | st.header("π Data Explorer") |
| | |
| | try: |
| | data = st.session_state.processor.load_json_data() |
| | invoices = data.get("invoices", []) |
| | |
| | if not invoices: |
| | st.info("π No data available. Upload some invoices first.") |
| | return |
| | |
| | |
| | df_data = [] |
| | for inv in invoices: |
| | df_data.append({ |
| | 'Invoice Number': inv.get('invoice_number', ''), |
| | 'Supplier': inv.get('supplier_name', ''), |
| | 'Buyer': inv.get('buyer_name', ''), |
| | 'Amount': inv.get('amount', 0), |
| | 'Date': inv.get('date', ''), |
| | 'Confidence': inv.get('extraction_info', {}).get('confidence', 0), |
| | 'Method': inv.get('extraction_info', {}).get('method', ''), |
| | 'File': inv.get('file_info', {}).get('file_name', ''), |
| | 'Created': inv.get('timestamps', {}).get('created_at', '')[:19] |
| | }) |
| | |
| | df = pd.DataFrame(df_data) |
| | |
| | |
| | col1, col2, col3 = st.columns(3) |
| | |
| | with col1: |
| | suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist()) |
| | selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}") |
| | |
| | with col2: |
| | methods = ['All'] + sorted(df['Method'].dropna().unique().tolist()) |
| | selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}") |
| | |
| | with col3: |
| | min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}") |
| | |
| | |
| | filtered_df = df.copy() |
| | if selected_supplier != 'All': |
| | filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier] |
| | if selected_method != 'All': |
| | filtered_df = filtered_df[filtered_df['Method'] == selected_method] |
| | if min_amount > 0: |
| | filtered_df = filtered_df[filtered_df['Amount'] >= min_amount] |
| | |
| | |
| | st.dataframe( |
| | filtered_df, |
| | use_container_width=True, |
| | column_config={ |
| | "Amount": st.column_config.NumberColumn("Amount", format="βΉ%.2f"), |
| | "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1) |
| | } |
| | ) |
| | |
| | |
| | col1, col2 = st.columns(2) |
| | |
| | with col1: |
| | if st.button("π₯ Export CSV", key=f"export_csv_{session_id}"): |
| | csv_data = filtered_df.to_csv(index=False) |
| | st.download_button( |
| | "Download CSV", |
| | csv_data, |
| | f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", |
| | "text/csv", |
| | key=f"download_csv_{session_id}" |
| | ) |
| | |
| | with col2: |
| | if st.button("π Export JSON", key=f"export_json_{session_id}"): |
| | filtered_invoices = [inv for inv in invoices |
| | if inv.get('invoice_number') in filtered_df['Invoice Number'].values] |
| | |
| | export_data = { |
| | "exported_at": datetime.now().isoformat(), |
| | "total_records": len(filtered_invoices), |
| | "invoices": filtered_invoices |
| | } |
| | |
| | st.download_button( |
| | "Download JSON", |
| | json.dumps(export_data, indent=2), |
| | f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json", |
| | "application/json", |
| | key=f"download_json_{session_id}" |
| | ) |
| | |
| | except Exception as e: |
| | st.error(f"Data explorer error: {e}") |
| | |
| | |
| | |
| | |
| | |
| | st.markdown("---") |
| | st.markdown("### π¬ Quick Chat (Works from any section)") |
| | |
| | global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}") |
| | |
| | if global_query: |
| | handle_chat_query(global_query, show_response=True) |
| | |
| | |
| | st.markdown("---") |
| | st.markdown(""" |
| | <div style="text-align: center; color: #666;"> |
| | <p>π <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p> |
| | <p>Built with β€οΈ using Streamlit, Transformers, and AI</p> |
| | </div> |
| | """, unsafe_allow_html=True) |
| |
|
| | |
| | |
| | |
| |
|
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once per session.

    Streamlit re-runs the whole script on every interaction, so this function
    deduplicates files against a per-session set kept in ``st.session_state``
    to avoid re-processing files that were already handled.

    Args:
        uploaded_files: List of Streamlit ``UploadedFile`` objects from
            ``st.file_uploader``.
        session_id: Per-session identifier used to namespace session-state keys.

    Side effects:
        Renders progress/status/result widgets, updates the per-session
        processed-file set and the ``currently_processing_*`` /
        ``processing_complete_*`` flags, and finally triggers ``st.rerun()``.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### π Processing Files...")

    # Set of keys for files already handled this session.
    # FIX: store the (name, size) tuple itself instead of hash((name, size)).
    # Python salts str hashes per process and hash() collisions would silently
    # skip a file; the tuple is hashable, stable, and collision-free.
    # NOTE: same name+size is still treated as the same file — content is not
    # compared. TODO(review): hash file content if that ever matters.
    processed_keys = st.session_state[f'processed_file_hashes_{session_id}']

    # Keep only files we have not seen before.
    files_to_process = [
        (file, (file.name, file.size))
        for file in uploaded_files
        if (file.name, file.size) not in processed_keys
    ]

    if not files_to_process:
        # FIX: this literal (and the two below) was corrupted by a garbled
        # multi-byte emoji that split the string across lines.
        st.info("✅ All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Separate containers so progress, status and per-file results render in
    # stable page regions while the loop runs.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    successful = 0
    failed = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()

    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    for i, (uploaded_file, file_key) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)

        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")

        with status_container:
            # FIX: use the size Streamlit already reports instead of loading
            # the whole payload into memory via getvalue() just to measure it.
            st.info(f"π Processing: {uploaded_file.name} ({uploaded_file.size/1024:.1f} KB)")

        try:
            result = st.session_state.processor.process_uploaded_file(uploaded_file)

            # Mark as processed regardless of extraction quality so a rerun
            # does not process the same file again.
            processed_keys.add(file_key)

            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"✅ Successfully processed: {uploaded_file.name}")

                    # Compact three-column summary of the extracted fields.
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** βΉ{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")

                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"β οΈ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, βΉ{result.amount}")
                    st.markdown("---")

        except Exception as e:
            failed += 1
            # Still mark it processed: a file that raised once will raise
            # again on rerun, so retrying automatically would loop forever.
            processed_keys.add(file_key)

            with results_container:
                st.error(f"β Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    # Re-assign for clarity; the set was mutated in place above.
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_keys

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("✅ Processing completed!")

    with status_container:
        if successful > 0:
            st.success(f"π Processing complete! {successful} successful, {failed} failed")
            # FIX: removed a redundant nested `if successful > 0` check.
            st.balloons()
        else:
            st.error(f"β Processing failed for all {failed} files. Please check file formats and content.")

    # Clear the in-flight flag and mark completion for the caller's state machine.
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True

    # Force a rerun so the rest of the UI picks up the new data.
    st.rerun()
| |
|
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for backward compatibility.

    Delegates directly to :func:`process_files_once`, which owns the
    dedupe/state-management logic.
    """
    return process_files_once(uploaded_files, session_id)
| |
|
def handle_chat_query(query, show_response=False):
    """Record a user question, ask the chatbot, and store the reply.

    Args:
        query: Natural-language question typed by the user.
        show_response: When True, also render the assistant reply inline
            (used by the global quick-chat box outside the chat tab).

    Side effects:
        Appends to ``st.session_state.chat_history`` and, on success, reruns
        the script so the chat tab displays the new messages.
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })

    try:
        with st.spinner("π€ AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)

        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("π‘ Switch to the 'AI Chat' section to see full conversation history!")

    except Exception as e:
        st.error(f"Chat error: {e}")
        return

    # BUG FIX: st.rerun() works by raising a control-flow exception. It used
    # to be called inside the try block, so the broad `except Exception`
    # swallowed it and reported every successful query as a "Chat error"
    # while never actually rerunning. It must run outside the try/except.
    st.rerun()
| |
|
| | |
| | |
| | |
| |
|
def main():
    """Application entry point for Hugging Face Spaces.

    Shows a sidebar banner when running on Spaces, launches the app, and
    converts any uncaught exception into an on-page error message instead
    of a blank crash screen.
    """
    try:
        # IS_HF_SPACE is derived from the SPACE_ID env var at import time.
        if IS_HF_SPACE:
            st.sidebar.info("π€ Running on Hugging Face Spaces")

        create_app()

    except Exception as exc:
        # Top-level boundary: surface the failure to the user rather than
        # letting Streamlit show a raw traceback.
        st.error(f"""
        ## π¨ Application Error
        
        {exc}
        
        Please refresh the page or check the logs for more details.
        """)
| |
|
| | if __name__ == "__main__": |
| | main() |