| |
| """ |
| AI Invoice Processing System - Complete Single File for Hugging Face Spaces |
| A comprehensive system with AI-powered extraction, semantic search, and analytics. |
| |
| Author: AI Assistant |
| Date: 2024 |
| Version: HuggingFace Single File v1.0 |
| """ |
|
|
| |
| |
| |
|
|
| import os |
| import json |
| import re |
| import tempfile |
| import shutil |
| import pickle |
| import numpy as np |
| from datetime import datetime |
| from typing import Dict, List, Optional, Tuple |
| from dataclasses import dataclass |
| from pathlib import Path |
| import time |
| import logging |
| import uuid |
|
|
| |
# Detect whether we are running inside a Hugging Face Space
# (Spaces always export the SPACE_ID environment variable).
IS_HF_SPACE = os.getenv("SPACE_ID") is not None

# Core third-party imports. Streamlit must be imported *before* the
# HF_TOKEN lookup below, because that lookup reads st.secrets.
import streamlit as st
import sqlite3
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests

# Resolve the Hugging Face API token.
# Preference order: Streamlit secrets (Spaces) -> environment variable.
# BUG FIX: the original accessed st.secrets before importing streamlit, so
# the bare except always swallowed a NameError and the secrets path could
# never succeed; imports were hoisted above this lookup to fix that.
HF_TOKEN = None
try:
    HF_TOKEN = st.secrets.get("HF_TOKEN", None)
except Exception:  # no secrets file / not running under Streamlit
    HF_TOKEN = os.getenv("HF_TOKEN", None)
|
|
| |
# Optional dependency probes: each feature degrades gracefully when the
# corresponding package is missing, and a module-level flag records the
# availability for the rest of the app to check.

# FAISS: vector index backend (not strictly required by VectorStore below).
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    st.warning("β οΈ FAISS not available. Vector search will be disabled.")

# sentence-transformers: embedding model used for semantic search.
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    st.warning("β οΈ Sentence Transformers not available. Using fallback methods.")

# torch: only needed for the float16/device-map paths of the LLM extractor.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# PDF text extraction: prefer pdfplumber, fall back to PyPDF2, else disable.
try:
    import pdfplumber
    PDF_PROCESSING_AVAILABLE = True
    PDF_PROCESSOR = "pdfplumber"
except ImportError:
    try:
        import PyPDF2
        PDF_PROCESSING_AVAILABLE = True
        PDF_PROCESSOR = "PyPDF2"
    except ImportError:
        PDF_PROCESSING_AVAILABLE = False
        PDF_PROCESSOR = None
|
|
| |
| |
| |
|
|
# Streamlit page configuration — must run before any other st.* call
# renders output.
st.set_page_config(
    page_title="AI Invoice Processing System",
    page_icon="π",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/spaces',
        'Report a bug': 'https://huggingface.co/spaces',
        'About': """
        # AI Invoice Processing System
        Built for Hugging Face Spaces with AI-powered extraction and semantic search.
        """
    }
)
|
|
| |
| |
| |
|
|
# Runtime configuration tuned for the free Hugging Face Spaces tier
# (CPU-only, limited RAM/disk).
HF_CONFIG = {
    "max_file_size_mb": 10,        # reject uploads larger than this
    "max_concurrent_files": 3,     # cap on files handled per batch
    "timeout_seconds": 30,         # generic operation timeout
    "use_cpu_only": True,          # Spaces free tier has no GPU
    "embedding_model": "all-MiniLM-L6-v2",  # small, fast sentence encoder
    "cache_dir": "./cache",        # model / tokenizer download cache
    "data_dir": "./data",          # persisted invoices + vector store
    "enable_ollama": False,        # local Ollama backend is not usable on Spaces
}

# Ensure working directories exist before anything tries to write to them.
os.makedirs(HF_CONFIG["cache_dir"], exist_ok=True)
os.makedirs(HF_CONFIG["data_dir"], exist_ok=True)
|
|
| |
| |
| |
|
|
@dataclass
class InvoiceData:
    """Structured fields extracted from a single invoice document."""
    supplier_name: str = ""             # party issuing the invoice
    buyer_name: str = ""                # party being billed
    invoice_number: str = ""            # invoice / bill reference number
    date: str = ""                      # invoice date, normalised to YYYY-MM-DD
    amount: float = 0.0                 # total invoice amount
    quantity: int = 0                   # total item quantity
    product_description: str = ""       # short description of goods/services
    file_path: str = ""                 # original uploaded file name
    extraction_confidence: float = 0.0  # heuristic confidence in [0, 1]
    processing_method: str = "regex"    # "regex" | "ai_ner" | "mistral_7b"
|
|
@dataclass
class VectorSearchResult:
    """One hit returned by VectorStore.semantic_search."""
    invoice_id: str          # id of the matching invoice record
    invoice_number: str      # its invoice number
    supplier_name: str       # its supplier
    similarity_score: float  # cosine similarity to the query (embeddings are normalised)
    content_preview: str     # first part of the indexed document text
    metadata: Dict           # full stored metadata for the document
|
|
| |
| |
| |
|
|
class DocumentProcessor:
    """Extracts raw text from uploaded documents (PDF / plain text).

    Keeps a small dispatch table of extension -> extractor so new file
    types can be registered in one place.

    BUG FIX: the two st.success(...) literals in setup_processors were
    split across lines mid-emoji in this file (unterminated strings);
    they are restored to single-line messages here.
    """

    def __init__(self):
        self.setup_processors()

    def setup_processors(self):
        """Register the extractors that are usable in this environment."""
        self.processors = {}

        # PDF support depends on which optional library imported successfully.
        if PDF_PROCESSING_AVAILABLE:
            if PDF_PROCESSOR == "pdfplumber":
                self.processors['pdf'] = self.extract_with_pdfplumber
                st.success("✅ PDF processing available (pdfplumber)")
            elif PDF_PROCESSOR == "PyPDF2":
                self.processors['pdf'] = self.extract_with_pypdf2
                st.success("✅ PDF processing available (PyPDF2)")
        else:
            st.warning("β οΈ No PDF processor available")

        # Plain-text files are always supported.
        self.processors['txt'] = self.extract_text_file

    def extract_with_pdfplumber(self, file_path: str) -> str:
        """Return all page text from a PDF via pdfplumber ("" on failure)."""
        try:
            import pdfplumber
            text = ""
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:  # pages without a text layer return None
                        text += page_text + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_with_pypdf2(self, file_path: str) -> str:
        """Return all page text from a PDF via PyPDF2 ("" on failure)."""
        try:
            import PyPDF2
            text = ""
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text
        except Exception as e:
            st.error(f"PDF extraction failed: {e}")
            return ""

    def extract_text_file(self, file_path: str) -> str:
        """Read a UTF-8 text file ("" on failure)."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            st.error(f"Text file extraction failed: {e}")
            return ""

    def extract_text_from_document(self, file_path: str) -> str:
        """Dispatch to the right extractor based on the file extension."""
        file_ext = Path(file_path).suffix.lower()

        if file_ext == '.pdf':
            processor = self.processors.get('pdf')
        elif file_ext == '.txt':
            processor = self.processors.get('txt')
        else:
            st.warning(f"Unsupported file type: {file_ext}")
            return ""

        if processor:
            return processor(file_path)
        else:
            st.error(f"No processor available for {file_ext}")
            return ""
|
|
| |
| |
| |
|
|
| class AIExtractor: |
| """AI extraction for Hugging Face Spaces with Mistral 7B support""" |
| |
    def __init__(self):
        # Backend preference: Mistral 7B when token + RAM allow; otherwise a
        # light NER pipeline; plain regex remains the always-available fallback.
        self.use_mistral = self.setup_mistral()
        self.use_transformers = self.setup_transformers() if not self.use_mistral else False
| |
| def setup_mistral(self): |
| """Try to setup Mistral 7B model with proper authentication""" |
| try: |
| |
| if not HF_TOKEN: |
| st.warning("β οΈ Hugging Face token not found. Add HF_TOKEN to secrets for Mistral access.") |
| return False |
| |
| |
| import psutil |
| memory_gb = psutil.virtual_memory().total / (1024**3) |
| |
| if memory_gb < 8: |
| st.warning("β οΈ Insufficient memory for Mistral 7B. Using lighter models.") |
| return False |
| |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline |
| from huggingface_hub import login |
| |
| |
| login(token=HF_TOKEN) |
| |
| with st.spinner("π Loading Mistral 7B model (this may take a few minutes)..."): |
| |
| model_name = "mistralai/Mistral-7B-Instruct-v0.1" |
| |
| |
| self.mistral_tokenizer = AutoTokenizer.from_pretrained( |
| model_name, |
| cache_dir=HF_CONFIG["cache_dir"], |
| token=HF_TOKEN |
| ) |
| |
| self.mistral_model = AutoModelForCausalLM.from_pretrained( |
| model_name, |
| torch_dtype=torch.float16 if TORCH_AVAILABLE else None, |
| device_map="auto" if TORCH_AVAILABLE else None, |
| load_in_8bit=True, |
| cache_dir=HF_CONFIG["cache_dir"], |
| token=HF_TOKEN |
| ) |
| |
| |
| self.mistral_pipeline = pipeline( |
| "text-generation", |
| model=self.mistral_model, |
| tokenizer=self.mistral_tokenizer, |
| torch_dtype=torch.float16 if TORCH_AVAILABLE else None, |
| device_map="auto" if TORCH_AVAILABLE else None |
| ) |
| |
| st.success("β
Mistral 7B model loaded successfully!") |
| return True |
| |
| except ImportError as e: |
| st.warning(f"β οΈ Missing dependencies for Mistral 7B: {e}") |
| return False |
| except Exception as e: |
| st.warning(f"β οΈ Mistral 7B not available: {e}") |
| st.info("π‘ To use Mistral 7B: Add your Hugging Face token to secrets as 'HF_TOKEN'") |
| return False |
| |
| def setup_transformers(self): |
| """Fallback to lighter NER model""" |
| try: |
| from transformers import pipeline |
| |
| with st.spinner("Loading fallback AI model..."): |
| self.ner_pipeline = pipeline( |
| "ner", |
| model="dbmdz/bert-large-cased-finetuned-conll03-english", |
| aggregation_strategy="simple" |
| ) |
| |
| st.success("β
Fallback AI extraction model loaded") |
| return True |
| |
| except Exception as e: |
| st.warning(f"β οΈ AI extraction not available: {e}") |
| return False |
| |
| def extract_with_mistral(self, text: str) -> InvoiceData: |
| """Extract invoice data using Mistral 7B""" |
| try: |
| |
| prompt = f"""<s>[INST] You are an expert at extracting structured information from invoices. |
| |
| Extract the following information from this invoice text and respond ONLY with valid JSON: |
| |
| {{ |
| "invoice_number": "invoice or bill number", |
| "supplier_name": "company providing goods/services", |
| "buyer_name": "company receiving goods/services", |
| "date": "date in YYYY-MM-DD format", |
| "amount": "total amount as number only", |
| "quantity": "total quantity as integer", |
| "product_description": "brief description of items/services" |
| }} |
| |
| Invoice text: |
| {text[:2000]} |
| |
| Respond with JSON only: [/INST]""" |
|
|
| |
| response = self.mistral_pipeline( |
| prompt, |
| max_new_tokens=300, |
| temperature=0.1, |
| do_sample=True, |
| pad_token_id=self.mistral_tokenizer.eos_token_id |
| ) |
| |
| |
| generated_text = response[0]['generated_text'] |
| |
| |
| json_start = generated_text.find('{') |
| json_end = generated_text.rfind('}') + 1 |
| |
| if json_start != -1 and json_end > json_start: |
| json_str = generated_text[json_start:json_end] |
| |
| |
| import json |
| data = json.loads(json_str) |
| |
| |
| invoice_data = InvoiceData() |
| invoice_data.supplier_name = str(data.get('supplier_name', '')).strip() |
| invoice_data.buyer_name = str(data.get('buyer_name', '')).strip() |
| invoice_data.invoice_number = str(data.get('invoice_number', '')).strip() |
| invoice_data.date = self.parse_date(str(data.get('date', ''))) |
| |
| |
| try: |
| amount_val = data.get('amount', 0) |
| if isinstance(amount_val, str): |
| amount_clean = re.sub(r'[^\d.]', '', amount_val) |
| invoice_data.amount = float(amount_clean) if amount_clean else 0.0 |
| else: |
| invoice_data.amount = float(amount_val) |
| except: |
| invoice_data.amount = 0.0 |
| |
| |
| try: |
| qty_val = data.get('quantity', 0) |
| invoice_data.quantity = int(float(str(qty_val).replace(',', ''))) |
| except: |
| invoice_data.quantity = 0 |
| |
| invoice_data.product_description = str(data.get('product_description', '')).strip() |
| invoice_data.extraction_confidence = 0.95 |
| invoice_data.processing_method = "mistral_7b" |
| |
| return invoice_data |
| else: |
| st.warning("β οΈ Mistral response didn't contain valid JSON, falling back to regex") |
| return self.extract_with_regex(text) |
| |
| except Exception as e: |
| st.error(f"Mistral extraction failed: {e}") |
| return self.extract_with_regex(text) |
| |
| def extract_with_ai(self, text: str) -> InvoiceData: |
| """Extract invoice data using available AI method""" |
| if self.use_mistral: |
| st.info("π€ Using Mistral 7B for extraction...") |
| return self.extract_with_mistral(text) |
| elif self.use_transformers: |
| st.info("π€ Using NER model for extraction...") |
| return self.extract_with_ner(text) |
| else: |
| st.info("π§ Using regex extraction...") |
| return self.extract_with_regex(text) |
| |
| def extract_with_ner(self, text: str) -> InvoiceData: |
| """Extract using NER model (fallback method)""" |
| try: |
| |
| entities = self.ner_pipeline(text[:512]) |
| |
| invoice_data = InvoiceData() |
| invoice_data.processing_method = "ai_ner" |
| |
| |
| for entity in entities: |
| entity_text = entity['word'].replace('##', '') |
| |
| if entity['entity_group'] == 'ORG': |
| if not invoice_data.supplier_name: |
| invoice_data.supplier_name = entity_text |
| elif not invoice_data.buyer_name: |
| invoice_data.buyer_name = entity_text |
| |
| elif entity['entity_group'] == 'MISC': |
| if not invoice_data.invoice_number and any(c.isdigit() for c in entity_text): |
| invoice_data.invoice_number = entity_text |
| |
| |
| regex_data = self.extract_with_regex(text) |
| |
| |
| if not invoice_data.invoice_number: |
| invoice_data.invoice_number = regex_data.invoice_number |
| if not invoice_data.amount: |
| invoice_data.amount = regex_data.amount |
| if not invoice_data.date: |
| invoice_data.date = regex_data.date |
| if not invoice_data.quantity: |
| invoice_data.quantity = regex_data.quantity |
| |
| invoice_data.extraction_confidence = 0.8 |
| |
| return invoice_data |
| |
| except Exception as e: |
| st.error(f"NER extraction failed: {e}") |
| return self.extract_with_regex(text) |
| |
| def extract_with_regex(self, text: str) -> InvoiceData: |
| """Enhanced regex extraction with better amount detection""" |
| invoice_data = InvoiceData() |
| invoice_data.processing_method = "regex" |
| |
| |
| patterns = { |
| 'invoice_number': [ |
| r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| r'bill\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| r'inv\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| r'ref\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| r'#\s*([A-Z0-9\-_/]{3,})', |
| r'(?:^|\s)([A-Z]{2,}\d{3,}|\d{3,}[A-Z]{2,})', |
| ], |
| 'amount': [ |
| |
| r'total\s*(?:amount)?\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| r'amount\s*(?:due|paid|total)?\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| r'grand\s*total\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| r'net\s*(?:amount|total)\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| r'sub\s*total\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| |
| |
| r'[\$βΉΒ£β¬]\s*([0-9,]+\.?\d*)', |
| |
| |
| r'([0-9,]+\.?\d*)\s*[\$βΉΒ£β¬]?\s* |
| |
| def parse_date(self, date_str: str) -> str: |
| """Parse date to YYYY-MM-DD format""" |
| if not date_str: |
| return "" |
| |
| formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] |
| |
| for fmt in formats: |
| try: |
| parsed_date = datetime.strptime(date_str, fmt) |
| return parsed_date.strftime('%Y-%m-%d') |
| except ValueError: |
| continue |
| |
| return date_str |
| |
| # =============================================================================== |
| # VECTOR STORE CLASS |
| # =============================================================================== |
| |
class VectorStore:
    """Pickle-backed embedding store with brute-force cosine search.

    Embeddings are L2-normalised at encode time, so the plain dot product
    used in semantic_search() is cosine similarity.

    Fixes over the original:
    - embedding_dimension read ``encode(["test"]).shape[0]``, which is the
      batch size (always 1), not the embedding width; now uses shape[-1].
    - two status-message literals were split across lines mid-emoji in this
      file (unterminated strings); restored to single lines.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # 1-D arrays, parallel to document_metadata
        self.document_metadata = []  # one dict per stored document
        self.embedding_dimension = None

        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialise the sentence-transformers encoder (left None on failure)."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("β οΈ Sentence Transformers not available. Vector search disabled.")
            return

        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )

            # Probe the embedding width. encode([...]) returns a (batch, dim)
            # array, so the dimension is the LAST axis; the original read
            # shape[0], which is the batch size (always 1).
            test_embedding = self.embedding_model.encode(["test"])
            self.embedding_dimension = test_embedding.shape[-1]

            st.success(f"✅ Embedding model loaded: {self.embedding_model_name}")

        except Exception as e:
            st.error(f"β Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load persisted vectors + metadata from disk, or start empty."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                # SECURITY NOTE: pickle.load executes arbitrary code from the
                # file; acceptable only because these files are written by
                # this app inside its own Space storage.
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)
                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)

                st.success(f"✅ Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("π New vector store initialized")

        except Exception as e:
            st.error(f"β Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors + metadata to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)
            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice record into one searchable "field: value" string.

        The synthetic 'id' field and empty/zero values are skipped; a
        truncated slice of the raw document text is appended when given.
        """
        text_parts = []

        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")

        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")

        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it to the in-memory store.

        Note: the caller is responsible for save_vector_store(); this only
        mutates the in-memory lists. Returns False when no model is loaded
        or embedding fails.
        """
        if not self.embedding_model:
            return False

        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalised embedding -> dot product == cosine similarity later.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)

            return True

        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List[VectorSearchResult]:
        """Return the top_k most similar stored documents for a query.

        Brute-force cosine similarity over all stored vectors; hits below a
        0.1 relevance threshold are dropped.
        """
        if not self.embedding_model or not self.vectors:
            return []

        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Score every stored vector (both sides are unit-normalised).
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))

            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # relevance threshold
                    metadata = self.document_metadata[idx]
                    results.append(VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    ))

            return results

        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
| |
| # =============================================================================== |
| # MAIN PROCESSOR CLASS |
| # =============================================================================== |
| |
| class InvoiceProcessor: |
| """Main invoice processor for Hugging Face Spaces""" |
| |
    def __init__(self):
        """Wire together storage, text extraction, AI extraction and vectors."""
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        # Vector search is optional: only built when sentence-transformers imported.
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # Session-lifetime processing counters (reset on app restart).
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }
| |
| def setup_storage(self): |
| """Setup storage paths""" |
| self.data_dir = HF_CONFIG["data_dir"] |
| self.json_path = os.path.join(self.data_dir, "invoices.json") |
| |
| # Initialize JSON storage |
| if not os.path.exists(self.json_path): |
| initial_data = { |
| "metadata": { |
| "created_at": datetime.now().isoformat(), |
| "version": "hf_v1.0", |
| "total_invoices": 0 |
| }, |
| "invoices": [], |
| "summary": { |
| "total_amount": 0.0, |
| "unique_suppliers": [], |
| "processing_stats": {"successful": 0, "failed": 0} |
| } |
| } |
| self.save_json_data(initial_data) |
| |
| def load_json_data(self) -> dict: |
| """Load invoice data from JSON""" |
| try: |
| with open(self.json_path, 'r', encoding='utf-8') as f: |
| return json.load(f) |
| except (FileNotFoundError, json.JSONDecodeError): |
| self.setup_storage() |
| return self.load_json_data() |
| |
| def save_json_data(self, data: dict): |
| """Save invoice data to JSON""" |
| try: |
| with open(self.json_path, 'w', encoding='utf-8') as f: |
| json.dump(data, f, indent=2, ensure_ascii=False) |
| except Exception as e: |
| st.error(f"Error saving data: {e}") |
| |
| def process_uploaded_file(self, uploaded_file) -> InvoiceData: |
| """Process a single uploaded file with enhanced debugging""" |
| self.processing_stats['total_processed'] += 1 |
| |
| try: |
| # Debug file info |
| file_size = len(uploaded_file.getvalue()) |
| file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown' |
| |
| st.info(f"π Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})") |
| |
| # Check file size |
| if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024: |
| error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB" |
| st.error(error_msg) |
| self.processing_stats['failed'] += 1 |
| return InvoiceData() |
| |
| # Check file type |
| if file_extension not in ['pdf', 'txt']: |
| error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)" |
| st.warning(error_msg) |
| self.processing_stats['failed'] += 1 |
| return InvoiceData() |
| |
| # Save temporarily |
| with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file: |
| file_content = uploaded_file.getvalue() |
| tmp_file.write(file_content) |
| tmp_file_path = tmp_file.name |
| |
| st.info(f"πΎ Saved temporarily to: {tmp_file_path}") |
| |
| try: |
| # Extract text |
| st.info("π Extracting text from document...") |
| text = self.document_processor.extract_text_from_document(tmp_file_path) |
| |
| if not text or not text.strip(): |
| st.warning(f"β No text extracted from {uploaded_file.name}") |
| self.processing_stats['failed'] += 1 |
| return InvoiceData() |
| |
| text_length = len(text) |
| st.info(f"π Extracted {text_length} characters of text") |
| |
| # Show text preview and extraction debug info |
| if text_length > 0: |
| with st.expander("π Text Preview & Extraction Debug", expanded=True): |
| st.text_area("Extracted Text (First 1000 chars):", value=text[:1000], height=150, disabled=True) |
| |
| # Debug amount detection |
| st.markdown("**π Amount Detection Debug:**") |
| amount_patterns = [ |
| r'total\s*(?:amount)?\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| r'[\$βΉΒ£β¬]\s*([0-9,]+\.?\d*)', |
| r'([0-9,]+\.?\d*)\s*[\$βΉΒ£β¬]?\s* |
| |
| |
| st.info("π€ Extracting invoice data using AI/Regex...") |
| invoice_data = self.ai_extractor.extract_with_ai(text) |
| invoice_data.file_path = uploaded_file.name |
| |
| |
| st.info(f"π Extraction completed with {invoice_data.extraction_confidence:.1%} confidence") |
| |
| |
| st.info("πΎ Saving extracted data...") |
| self.save_invoice_data(invoice_data, text, file_size) |
| |
| self.processing_stats['successful'] += 1 |
| st.success(f"β
Successfully processed {uploaded_file.name}") |
| |
| return invoice_data |
| |
| finally: |
| |
| try: |
| os.unlink(tmp_file_path) |
| st.info("π§Ή Cleaned up temporary file") |
| except: |
| pass |
| |
| except Exception as e: |
| error_msg = f"Error processing {uploaded_file.name}: {str(e)}" |
| st.error(error_msg) |
| self.processing_stats['failed'] += 1 |
| |
| |
| with st.expander("π Error Details", expanded=False): |
| st.code(str(e)) |
| import traceback |
| st.code(traceback.format_exc()) |
| |
| return InvoiceData() |
| |
| def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int): |
| """Save invoice data to JSON and vector store""" |
| try: |
| |
| data = self.load_json_data() |
| |
| |
| invoice_record = { |
| "id": len(data["invoices"]) + 1, |
| "invoice_number": invoice_data.invoice_number, |
| "supplier_name": invoice_data.supplier_name, |
| "buyer_name": invoice_data.buyer_name, |
| "date": invoice_data.date, |
| "amount": invoice_data.amount, |
| "quantity": invoice_data.quantity, |
| "product_description": invoice_data.product_description, |
| "file_info": { |
| "file_name": invoice_data.file_path, |
| "file_size": file_size |
| }, |
| "extraction_info": { |
| "confidence": invoice_data.extraction_confidence, |
| "method": invoice_data.processing_method, |
| "raw_text_preview": raw_text[:300] |
| }, |
| "timestamps": { |
| "created_at": datetime.now().isoformat() |
| } |
| } |
| |
| |
| data["invoices"].append(invoice_record) |
| |
| |
| self.update_summary(data) |
| |
| |
| self.save_json_data(data) |
| |
| |
| if self.vector_store: |
| self.vector_store.add_document(invoice_record, raw_text) |
| self.vector_store.save_vector_store() |
| |
| except Exception as e: |
| st.error(f"Error saving invoice data: {e}") |
| |
| def update_summary(self, data: dict): |
| """Update summary statistics""" |
| invoices = data["invoices"] |
| |
| total_amount = sum(inv.get("amount", 0) for inv in invoices) |
| unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name"))) |
| |
| data["summary"] = { |
| "total_amount": total_amount, |
| "unique_suppliers": unique_suppliers, |
| "processing_stats": { |
| "successful": self.processing_stats['successful'], |
| "failed": self.processing_stats['failed'], |
| "total_processed": self.processing_stats['total_processed'] |
| } |
| } |
| |
| data["metadata"]["last_updated"] = datetime.now().isoformat() |
| data["metadata"]["total_invoices"] = len(invoices) |
|
|
| |
| |
| |
|
|
| class ChatBot: |
| """Chatbot for invoice queries""" |
| |
    def __init__(self, processor: InvoiceProcessor):
        """Keep a handle on the processor whose data store we query."""
        self.processor = processor
| |
| def query_database(self, query: str) -> str: |
| """Process user query and return response""" |
| try: |
| data = self.processor.load_json_data() |
| invoices = data.get("invoices", []) |
| |
| if not invoices: |
| return "No invoice data found. Please upload some invoices first." |
| |
| query_lower = query.lower() |
| |
| |
| if any(phrase in query_lower for phrase in ["summary", "overview", "total"]): |
| return self.generate_summary(data) |
| |
| elif "count" in query_lower or "how many" in query_lower: |
| return self.handle_count_query(data) |
| |
| elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]): |
| return self.handle_amount_query(data) |
| |
| elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]): |
| return self.handle_supplier_query(data, query) |
| |
| elif self.processor.vector_store: |
| return self.handle_semantic_search(query) |
| |
| else: |
| return self.handle_general_query(data, query) |
| |
| except Exception as e: |
| return f"Error processing query: {e}" |
| |
    def generate_summary(self, data: dict) -> str:
        """Render a markdown overview: totals, processing stats, 5 newest invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**π Invoice System Summary**

β’ **Total Invoices**: {len(invoices):,}
β’ **Total Value**: βΉ{total_amount:,.2f}
β’ **Average Invoice**: βΉ{avg_amount:,.2f}
β’ **Unique Suppliers**: {unique_suppliers}

**π Processing Stats**
β’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**π Recent Invoices**
"""

        # Newest first by created_at timestamp; show at most five.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response
| |
    def handle_count_query(self, data: dict) -> str:
        """Report record counts, duplicate count, and first/last timestamps."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        # Empty invoice numbers are excluded from the uniqueness count.
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        # NOTE: first/last rely on the JSON list's insertion order, which
        # matches processing order.
        return f"""
**π Invoice Count Summary**

β’ **Total Records**: {total}
β’ **Unique Invoice Numbers**: {unique_numbers}
β’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**π
Processing Timeline**
β’ **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
β’ **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""
| |
    def handle_amount_query(self, data: dict) -> str:
        """Summarise amount statistics and list the top high-value invoices."""
        invoices = data.get("invoices", [])
        # Zero/missing amounts are treated as "no data" and excluded.
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # Threshold = 5th-largest amount when there are >5 positive amounts,
        # else the max, so the "high-value" list holds roughly the top five.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**π° Financial Analysis**

β’ **Total Amount**: βΉ{total_amount:,.2f}
β’ **Average Amount**: βΉ{avg_amount:,.2f}
β’ **Highest Invoice**: βΉ{max_amount:,.2f}
β’ **Lowest Invoice**: βΉ{min_amount:,.2f}

**π― High-Value Invoices (βΉ{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response
| |
    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Aggregate invoice counts and billed totals per supplier.

        The `query` argument is currently unused beyond routing — the
        response always covers all suppliers.
        """
        invoices = data.get("invoices", [])

        # Tally invoice count and summed amount per (non-empty) supplier name.
        supplier_counts = {}
        supplier_amounts = {}

        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        # Top 10 suppliers ranked by total billed amount.
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]

        response = f"""
**π’ Supplier Analysis**

β’ **Total Unique Suppliers**: {len(supplier_counts)}
β’ **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)

**π° Top Suppliers by Amount**
"""

        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - βΉ{amount:,.2f} ({count} invoices, avg: βΉ{avg:,.2f})"

        return response
| |
| def handle_semantic_search(self, query: str) -> str: |
| """Handle semantic search queries""" |
| try: |
| results = self.processor.vector_store.semantic_search(query, top_k=5) |
| |
| if not results: |
| return f"No relevant results found for '{query}'. Try different keywords." |
| |
| response = f"π **Semantic Search Results for '{query}'**\n\n" |
| |
| for i, result in enumerate(results, 1): |
| response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n" |
| response += f" β’ Similarity: {result.similarity_score:.3f}\n" |
| response += f" β’ Amount: βΉ{result.metadata.get('amount', 0):,.2f}\n" |
| response += f" β’ Preview: {result.content_preview[:100]}...\n\n" |
| |
| return response |
| |
| except Exception as e: |
| return f"Semantic search error: {e}" |
| |
| def handle_general_query(self, data: dict, query: str) -> str: |
| """Handle general queries with keyword search""" |
| invoices = data.get("invoices", []) |
| query_words = query.lower().split() |
| |
| |
| matching_invoices = [] |
| for inv in invoices: |
| text_to_search = ( |
| inv.get('supplier_name', '') + ' ' + |
| inv.get('buyer_name', '') + ' ' + |
| inv.get('product_description', '') + ' ' + |
| inv.get('extraction_info', {}).get('raw_text_preview', '') |
| ).lower() |
| |
| if any(word in text_to_search for word in query_words): |
| matching_invoices.append(inv) |
| |
| if not matching_invoices: |
| return f"No invoices found matching '{query}'. Try different keywords or check the summary." |
| |
| response = f"π **Found {len(matching_invoices)} invoices matching '{query}'**\n\n" |
| |
| for i, inv in enumerate(matching_invoices[:5], 1): |
| response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n" |
| response += f" β’ Amount: βΉ{inv.get('amount', 0):,.2f}\n" |
| response += f" β’ Date: {inv.get('date', 'N/A')}\n\n" |
| |
| if len(matching_invoices) > 5: |
| response += f"... and {len(matching_invoices) - 5} more results." |
| |
| return response |
|
|
| |
| |
| |
|
|
def create_app():
    """Main Streamlit application.

    Renders the whole single-page UI: global CSS, sidebar status/stats,
    a radio-based section switcher (Upload, Chat, Analytics, Explorer),
    and a persistent quick-chat input plus footer. All widget keys are
    namespaced with a per-session id so concurrent users don't collide.
    """

    # --- Per-session identity: short random id used to namespace widget keys ---
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]

    session_id = st.session_state.session_id

    # --- Global CSS: header banner, gradient feature cards, status colors ---
    st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        font-weight: bold;
        text-align: center;
        color: #FF6B35;
        margin-bottom: 1rem;
    }
    .feature-box {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 1rem;
        border-radius: 10px;
        color: white;
        margin: 0.5rem 0;
        text-align: center;
    }
    .status-ok { color: #28a745; font-weight: bold; }
    .status-warning { color: #ffc107; font-weight: bold; }
    .status-error { color: #dc3545; font-weight: bold; }
    </style>
    """, unsafe_allow_html=True)

    # --- Page header banner ---
    st.markdown('<h1 class="main-header">📄 AI Invoice Processing System</h1>', unsafe_allow_html=True)
    st.markdown("""
    <div style="text-align: center; margin-bottom: 2rem;">
        <p style="font-size: 1.1rem; color: #666;">
            AI-Powered Document Processing • Semantic Search • Smart Analytics • Hugging Face Spaces
        </p>
    </div>
    """, unsafe_allow_html=True)

    # --- One-time heavy initialization: processor (models, vector store) + chatbot ---
    if 'processor' not in st.session_state:
        with st.spinner("🔧 Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("✅ System initialized successfully!")
            except Exception as e:
                st.error(f"❌ Initialization failed: {e}")
                st.stop()  # abort this script run; nothing below can work

    # --- Sidebar: capability tiers, quick stats, and hard limits ---
    with st.sidebar:
        st.header("🎛️ System Status")

        processor = st.session_state.processor

        # Each capability degrades gracefully; show which tier is active
        if processor.document_processor.processors:
            st.markdown('<span class="status-ok">✅ Document Processing</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-error">❌ Document Processing</span>', unsafe_allow_html=True)

        if processor.ai_extractor.use_transformers:
            st.markdown('<span class="status-ok">✅ AI Extraction</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Regex Extraction</span>', unsafe_allow_html=True)

        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('<span class="status-ok">✅ Semantic Search</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">⚠️ Keyword Search Only</span>', unsafe_allow_html=True)

        # Headline numbers pulled from the JSON store
        st.header("📊 Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)

            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"₹{total_amount:,.2f}")
            st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}")

        except Exception as e:
            st.error(f"Stats error: {e}")

        st.header("⚙️ System Info")
        st.info(f"""
        **Session ID:** {session_id}

        **Limits:**
        • Max file size: 10MB
        • Max concurrent files: 3
        • Timeout: 30s
        """)

    # --- Section switcher (radio instead of tabs so selection survives reruns) ---
    selected_tab = st.radio(
        "Choose a section:",
        ["📤 Upload & Process", "💬 AI Chat", "📊 Analytics", "🔍 Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}"
    )

    # =====================  Upload & Process  =====================
    if selected_tab == "📤 Upload & Process":
        st.header("📤 Upload Invoice Documents")

        # Three feature cards
        col1, col2, col3 = st.columns(3)

        with col1:
            st.markdown("""
            <div class="feature-box">
                <h4>🤖 AI Extraction</h4>
                <p>Advanced NLP models extract structured data automatically</p>
            </div>
            """, unsafe_allow_html=True)

        with col2:
            st.markdown("""
            <div class="feature-box">
                <h4>🔍 Smart Search</h4>
                <p>Semantic search finds invoices using natural language</p>
            </div>
            """, unsafe_allow_html=True)

        with col3:
            st.markdown("""
            <div class="feature-box">
                <h4>📊 Analytics</h4>
                <p>Comprehensive insights and visualizations</p>
            </div>
            """, unsafe_allow_html=True)

        st.markdown("### 📁 Upload Your Invoices")

        # Per-session upload state: selected files, processing flags, and the
        # set of (name, size) hashes already processed this session
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()

        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )

        # Detect a change in the uploaded set by comparing (name, size) hashes
        if uploaded_files:
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)

            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                # New selection: store it and reset the processing flags
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("🔄 New files detected - ready for processing")

        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']

        if current_files:
            # Hard cap to keep CPU-only Spaces responsive
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"⚠️ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]

            st.info(f"📁 {len(current_files)} files selected")

            # List the selection, marking files already processed this session
            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                processed_icon = "✅" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "📄"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")

            col1, col2 = st.columns([1, 1])

            with col1:
                # Small state machine: idle -> processing -> complete
                if not is_processing and not is_complete:
                    if st.button("🚀 Process Files", type="primary", key=f"process_btn_{session_id}"):
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("🔄 Processing in progress...")
                    # Runs on the rerun triggered by the button above
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("✅ Processing completed!")
                    if st.button("🔄 Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()

            with col2:
                if st.button("🗑️ Clear Files", key=f"clear_files_{session_id}"):
                    # Drop all per-session upload state keys
                    keys_to_clear = [
                        f'uploaded_files_{session_id}',
                        f'uploaded_file_hashes_{session_id}',
                        f'processing_complete_{session_id}',
                        f'currently_processing_{session_id}',
                        f'processed_file_hashes_{session_id}'
                    ]

                    for key in keys_to_clear:
                        if key in st.session_state:
                            del st.session_state[key]

                    st.success("🗑️ Files cleared successfully!")
                    time.sleep(1)
                    st.rerun()

        else:
            st.info("📁 Please select invoice files to upload and process")

        # Summary of the five most recently stored invoices
        if is_complete:
            st.markdown("### 📊 Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]

                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"📄 {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** ₹{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")

    # =====================  AI Chat  =====================
    elif selected_tab == "💬 AI Chat":
        st.header("💬 AI Chat Interface")

        # Replay prior turns
        if st.session_state.chat_history:
            st.markdown("### 💬 Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])

        st.markdown("### ✍️ Ask a Question")

        col1, col2 = st.columns([4, 1])

        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )

        with col2:
            ask_btn = st.button("🚀 Ask", type="primary", key=f"ask_btn_{session_id}")

        if ask_btn and user_input:
            handle_chat_query(user_input)

        # Starter suggestions, shown only before the first message
        if not st.session_state.chat_history:
            st.markdown("### 💡 Try These Queries")

            col1, col2 = st.columns(2)

            with col1:
                st.markdown("**📊 Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)

            with col2:
                st.markdown("**🔍 Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)

        if st.session_state.chat_history:
            if st.button("🗑️ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()

    # =====================  Analytics  =====================
    elif selected_tab == "📊 Analytics":
        st.header("📊 Analytics Dashboard")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("📊 No data available. Upload some invoices to see analytics.")
                # NOTE(review): this return also skips the global Quick Chat
                # footer rendered at the bottom of create_app — confirm intended
                return

            # Flatten invoice dicts into a DataFrame for metrics and charts
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })

            df = pd.DataFrame(df_data)

            # Headline metrics row
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"₹{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"₹{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())

            if len(df) > 0:
                # Amount distribution histogram
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (₹)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)

                # Horizontal bar chart of top suppliers by total spend
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)

                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (₹)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)

        except Exception as e:
            st.error(f"Analytics error: {e}")

    # =====================  Data Explorer  =====================
    elif selected_tab == "🔍 Data Explorer":
        st.header("🔍 Data Explorer")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("📋 No data available. Upload some invoices first.")
                # NOTE(review): also skips the Quick Chat footer below — confirm intended
                return

            # Flatten invoices into display columns
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })

            df = pd.DataFrame(df_data)

            # Filter widgets
            col1, col2, col3 = st.columns(3)

            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")

            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")

            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")

            # Apply the selected filters
            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]

            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="₹%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )

            # Export buttons (CSV of the flattened view, JSON of the full records)
            col1, col2 = st.columns(2)

            with col1:
                if st.button("📥 Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )

            with col2:
                if st.button("📄 Export JSON", key=f"export_json_{session_id}"):
                    # Export the full invoice records matching the filtered view
                    filtered_invoices = [inv for inv in invoices
                                       if inv.get('invoice_number') in filtered_df['Invoice Number'].values]

                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }

                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )

        except Exception as e:
            st.error(f"Data explorer error: {e}")

    # --- Global quick chat + footer (rendered below every section) ---
    st.markdown("---")
    st.markdown("### 💬 Quick Chat (Works from any section)")

    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")

    if global_query:
        handle_chat_query(global_query, show_response=True)

    st.markdown("---")
    st.markdown("""
    <div style="text-align: center; color: #666;">
        <p>📄 <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p>
        <p>Built with ❤️ using Streamlit, Transformers, and AI</p>
    </div>
    """, unsafe_allow_html=True)
|
|
| |
| |
| |
|
|
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once, tracking completion in session state.

    Fingerprints each file with hash((name, size)) and skips any file already
    processed in this session; drives Streamlit progress/status/result widgets
    and flips the session's processing flags when done, then reruns the script.

    Args:
        uploaded_files: list of Streamlit UploadedFile objects.
        session_id: per-session key suffix used for all session_state entries.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### 🔄 Processing Files...")

    # Hashes of files already handled in this session (name+size fingerprint)
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']

    # Keep only files not seen before
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))

    if not files_to_process:
        st.info("✅ All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Dedicated containers so progress, status and results render in stable areas
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    successful = 0
    failed = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()

    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)

        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")

        with status_container:
            st.info(f"🔄 Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")

        try:
            result = st.session_state.processor.process_uploaded_file(uploaded_file)

            # Mark as processed regardless of extraction quality so the same
            # file is never re-run within this session
            processed_hashes.add(file_hash)

            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"✅ Successfully processed: {uploaded_file.name}")

                    # Compact three-column summary of the extracted fields
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** ₹{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")

                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"⚠️ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, ₹{result.amount}")
                    st.markdown("---")

        except Exception as e:
            failed += 1
            # Still mark as processed so a crashing file is not retried forever
            processed_hashes.add(file_hash)

            with results_container:
                st.error(f"❌ Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    # Persist the updated hash set back to session state
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("✅ Processing completed!")

    with status_container:
        # FIX: the original re-checked `successful > 0` in a nested if before
        # st.balloons(); the inner check was redundant and has been removed.
        if successful > 0:
            st.success(f"🎉 Processing complete! {successful} successful, {failed} failed")
            st.balloons()
        else:
            st.error(f"❌ Processing failed for all {failed} files. Please check file formats and content.")

    # Flip the state machine to "complete" and rerun so the UI updates
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True

    st.rerun()
|
|
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for backward compatibility; delegates to process_files_once()."""
    return process_files_once(uploaded_files, session_id)
|
|
def handle_chat_query(query, show_response=False):
    """Record a user query, ask the chatbot, append the reply, then rerun.

    Args:
        query: the user's question text.
        show_response: when True, also render the answer inline (used by the
            global quick-chat input outside the chat tab).
    """
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now()
    })

    try:
        with st.spinner("🤖 AI is analyzing..."):
            response = st.session_state.chatbot.query_database(query)

        st.session_state.chat_history.append({
            "role": "assistant",
            "content": response,
            "timestamp": datetime.now()
        })

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(response)
            st.info("💡 Switch to the 'AI Chat' section to see full conversation history!")

    except Exception as e:
        st.error(f"Chat error: {e}")
    else:
        # FIX: st.rerun() raises Streamlit's internal RerunException (an
        # Exception subclass). The original called it inside the try block,
        # so the except above swallowed the rerun and reported "Chat error".
        st.rerun()
|
|
| |
| |
| |
|
|
def main():
    """Main entry point for Hugging Face Spaces"""
    try:
        # Badge shown only when the SPACE_ID env var is present (HF Spaces)
        if IS_HF_SPACE:
            st.sidebar.info("🤗 Running on Hugging Face Spaces")

        create_app()

    except Exception as e:
        # Last-resort error banner; create_app() handles its own section errors
        st.error(f"""
        ## 🚨 Application Error

        {e}

        Please refresh the page or check the logs for more details.
        """)
|
|
if __name__ == "__main__":
    # FIX: the original read `main(),` — the stray trailing comma wrapped the
    # call's result in a throwaway 1-tuple. Harmless at runtime, but a typo.
    main()
| |
| |
| r'([0-9,]+\.?\d*)\s*(?:dollars?|rupees?|usd|inr|eur|gbp)', |
| |
| |
| r'(?:price|cost|rate)\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| |
| |
| r'(?:^|\s)([0-9]{1,3}(?:,\d{3})*\.?\d{0,2})(?=\s|$)', |
| ], |
| 'date': [ |
| r'date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| r'(?:invoice|bill)\s*date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| r'(?:^|\s)(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})(?=\s|$)', |
| r'(\d{4}[/\-\.]\d{1,2}[/\-\.]\d{1,2})', |
| r'(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d{2,4})', |
| ], |
| 'quantity': [ |
| r'qty\s*:?\s*(\d+)', |
| r'quantity\s*:?\s*(\d+)', |
| r'(?:units?|pcs?|pieces?)\s*:?\s*(\d+)', |
| r'(\d+)\s*(?:pcs?|units?|items?|pieces?)', |
| ] |
| } |
| |
| text_lower = text.lower() |
| |
| |
| for pattern in patterns['invoice_number']: |
| match = re.search(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| if match: |
| invoice_data.invoice_number = match.group(1).upper().strip() |
| break |
| |
| |
| amounts_found = [] |
| for pattern in patterns['amount']: |
| matches = re.finditer(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| for match in matches: |
| try: |
| amount_str = match.group(1).replace(',', '').replace(' ', '') |
| amount_val = float(amount_str) |
| if 0.01 <= amount_val <= 1000000: |
| amounts_found.append(amount_val) |
| except (ValueError, IndexError): |
| continue |
| |
| |
| if amounts_found: |
| |
| unique_amounts = sorted(set(amounts_found), reverse=True) |
| |
| invoice_data.amount = unique_amounts[0] |
| |
| |
| for pattern in patterns['date']: |
| match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) |
| if match: |
| invoice_data.date = self.parse_date(match.group(1)) |
| break |
| |
| |
| for pattern in patterns['quantity']: |
| match = re.search(pattern, text_lower, re.IGNORECASE) |
| if match: |
| try: |
| invoice_data.quantity = int(match.group(1)) |
| break |
| except ValueError: |
| continue |
| |
| |
| company_patterns = [ |
| r'(?:from|supplier|vendor)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| r'(?:to|buyer|client)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| r'([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:ltd|inc|corp|llc|co\.|company|pvt|private|limited)', |
| r'(?:^|\n)([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:\n|$)', |
| ] |
| |
| companies_found = [] |
| for pattern in company_patterns: |
| matches = re.findall(pattern, text, re.MULTILINE) |
| for match in matches: |
| clean_company = match.strip().title() |
| if len(clean_company) > 3 and not any(word in clean_company.lower() for word in ['total', 'amount', 'date', 'invoice']): |
| companies_found.append(clean_company) |
| |
| |
| if companies_found: |
| invoice_data.supplier_name = companies_found[0] |
| if len(companies_found) > 1: |
| invoice_data.buyer_name = companies_found[1] |
| |
| |
| desc_patterns = [ |
| r'(?:description|item|product|service)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| r'(?:for|regarding)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| ] |
| |
| for pattern in desc_patterns: |
| match = re.search(pattern, text, re.IGNORECASE) |
| if match: |
| desc = match.group(1).strip() |
| if len(desc) > 5: |
| invoice_data.product_description = desc[:200] |
| break |
| |
| |
| confidence_factors = [] |
| if invoice_data.invoice_number: |
| confidence_factors.append(0.3) |
| if invoice_data.amount > 0: |
| confidence_factors.append(0.3) |
| if invoice_data.supplier_name: |
| confidence_factors.append(0.2) |
| if invoice_data.date: |
| confidence_factors.append(0.1) |
| if invoice_data.quantity > 0: |
| confidence_factors.append(0.1) |
| |
| invoice_data.extraction_confidence = sum(confidence_factors) |
| |
| return invoice_data |
| |
| def parse_date(self, date_str: str) -> str: |
| """Parse date to YYYY-MM-DD format""" |
| if not date_str: |
| return "" |
| |
| formats = ['%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d'] |
| |
| for fmt in formats: |
| try: |
| parsed_date = datetime.strptime(date_str, fmt) |
| return parsed_date.strftime('%Y-%m-%d') |
| except ValueError: |
| continue |
| |
| return date_str |
|
|
| |
| |
| |
|
|
class VectorStore:
    """Simplified in-memory vector store for Hugging Face Spaces.

    Embeddings come from sentence-transformers, are held as a plain list of
    L2-normalized numpy vectors, and are persisted with pickle alongside a
    parallel list of per-document metadata dicts. Search is brute-force
    cosine similarity (dot product of normalized vectors).
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        # Pickle files live under the app's writable data directory
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # list of 1-D normalized embeddings
        self.document_metadata = []  # parallel list of metadata dicts
        self.embedding_dimension = None

        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Initialize the sentence-transformer model, or disable vector search."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("⚠️ Sentence Transformers not available. Vector search disabled.")
            return

        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )

            # Probe once to learn the embedding width. encode() on a *list*
            # returns shape (batch, dim); FIX: the original read shape[0],
            # which is the batch size (always 1 here), not the dimension.
            probe = self.embedding_model.encode(["test"])
            self.embedding_dimension = int(probe.shape[-1])

            st.success(f"✅ Embedding model loaded: {self.embedding_model_name}")

        except Exception as e:
            st.error(f"❌ Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load persisted vectors/metadata, or start empty on first run / error."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                # NOTE: these pickle files are app-generated, not user uploads
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)

                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)

                st.success(f"✅ Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("🆕 New vector store initialized")

        except Exception as e:
            st.error(f"❌ Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors and metadata to disk.

        Returns:
            True on success, False when either pickle write fails.
        """
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)

            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)

            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice dict (plus a raw-text snippet) into one searchable string."""
        text_parts = []

        # Every truthy field except the internal id becomes "field: value"
        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")

        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")

        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it (with metadata) to the store.

        Returns:
            True on success; False when no model is loaded or encoding fails.
        """
        if not self.embedding_model:
            return False

        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalized embedding so search can use a plain dot product
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)

            return True

        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List[VectorSearchResult]:
        """Return up to top_k results by cosine similarity, above a 0.1 floor."""
        if not self.embedding_model or not self.vectors:
            return []

        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Brute-force dot product against every stored (normalized) vector
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))

            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:  # drop near-orthogonal (irrelevant) hits
                    metadata = self.document_metadata[idx]
                    results.append(VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    ))

            return results

        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
|
|
| |
| |
| |
|
|
class InvoiceProcessor:
    """Top-level pipeline: JSON storage + text extraction + AI field extraction + vector index.

    Built for Hugging Face Spaces: all state lives under HF_CONFIG["data_dir"]
    and survives only as long as the Space's disk does.
    """

    def __init__(self):
        # Order matters: storage paths must exist before anything tries to save.
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        # Semantic search is optional; without sentence-transformers the app
        # degrades to keyword-only behaviour.
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # Per-process counters (reset on every restart; not persisted).
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    def setup_storage(self):
        """Resolve storage paths and seed an empty JSON store on first run."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")

        # Seed with an empty-but-valid document so later reads never have to
        # special-case a missing file.
        if not os.path.exists(self.json_path):
            initial_data = {
                "metadata": {
                    "created_at": datetime.now().isoformat(),
                    "version": "hf_v1.0",
                    "total_invoices": 0
                },
                "invoices": [],
                "summary": {
                    "total_amount": 0.0,
                    "unique_suppliers": [],
                    "processing_stats": {"successful": 0, "failed": 0}
                }
            }
            self.save_json_data(initial_data)

    def load_json_data(self) -> dict:
        """Load the invoice store; recreate and retry if missing or corrupt.

        NOTE(review): if save_json_data keeps failing (e.g. read-only disk)
        this recursion never terminates - confirm acceptable for Spaces.
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Re-seed the store, then retry via recursion.
            self.setup_storage()
            return self.load_json_data()

    def save_json_data(self, data: dict):
        """Write the whole invoice store back to disk (full-file overwrite)."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> InvoiceData:
        """Validate, extract and persist a single Streamlit UploadedFile.

        Emits progress via st.* as it goes. Returns a populated InvoiceData on
        success, or an empty InvoiceData() on any failure (never raises).
        """
        self.processing_stats['total_processed'] += 1

        try:
            file_size = len(uploaded_file.getvalue())
            # Extension taken from the last dot; 'unknown' when there is none.
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'

            st.info(f"π Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Guard: size limit from Space config.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # Guard: only PDF and plain-text inputs are supported.
            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # The extractors work on file paths, so spill the upload to a temp
            # file; delete=False because it is read back after this block closes.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                file_content = uploaded_file.getvalue()
                tmp_file.write(file_content)
                tmp_file_path = tmp_file.name

            st.info(f"πΎ Saved temporarily to: {tmp_file_path}")

            try:
                st.info("π Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)

                if not text or not text.strip():
                    st.warning(f"β No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"π Extracted {text_length} characters of text")

                if text_length > 0:
                    with st.expander("π Text Preview (First 500 characters)", expanded=False):
                        # Ternary binds as: (text[:500] + "...") if long else text.
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("π€ Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name

                st.info(f"π Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("πΎ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)

                self.processing_stats['successful'] += 1
                # NOTE(review): leading char is a mojibake'd checkmark emoji -
                # confirm source-file encoding.
                st.success(f"β Successfully processed {uploaded_file.name}")

                return invoice_data

            finally:
                # Best-effort temp-file cleanup on every exit path.
                try:
                    os.unlink(tmp_file_path)
                    st.info("π§Ή Cleaned up temporary file")
                except:
                    pass

        except Exception as e:
            error_msg = f"Error processing {uploaded_file.name}: {str(e)}"
            st.error(error_msg)
            self.processing_stats['failed'] += 1

            # Full traceback is tucked behind an expander to keep the UI clean.
            with st.expander("π Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())

            return InvoiceData()

    def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int):
        """Append one extracted invoice to the JSON store and the vector index."""
        try:
            data = self.load_json_data()

            # NOTE(review): sequential id = len+1 can collide if records are
            # ever deleted - confirm deletion is out of scope.
            invoice_record = {
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }

            data["invoices"].append(invoice_record)

            # Keep the denormalised summary block in sync with the record list.
            self.update_summary(data)

            self.save_json_data(data)

            # Index for semantic search when the optional store exists.
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()

        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute the summary/metadata blocks from the full invoice list (in place)."""
        invoices = data["invoices"]

        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")))

        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }

        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
|
|
| |
| |
| |
|
|
class ChatBot:
    """Rule-based chat front-end over the processor's JSON store.

    Routes each question to a specialised handler by keyword family, falling
    back to semantic search (when available) and finally to keyword matching.
    """

    def __init__(self, processor: InvoiceProcessor):
        # Shares the processor so all handlers read the same JSON store.
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Dispatch *query* to the most specific handler and return a markdown answer."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            query_lower = query.lower()

            # Intent routing: the first matching keyword family wins, so the
            # ordering below encodes priority (summary > count > amount > supplier).
            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)

            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)

            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)

            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)

            elif self.processor.vector_store:
                # NOTE(review): whenever a vector store exists this branch always
                # wins, making handle_general_query below unreachable - confirm
                # that is the intended fallback order.
                return self.handle_semantic_search(query)

            else:
                return self.handle_general_query(data, query)

        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Build the overall markdown summary: totals, stats and five most recent invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**π Invoice System Summary**

β’ **Total Invoices**: {len(invoices):,}
β’ **Total Value**: βΉ{total_amount:,.2f}
β’ **Average Invoice**: βΉ{avg_amount:,.2f}
β’ **Unique Suppliers**: {unique_suppliers}

**π Processing Stats**
β’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**π Recent Invoices**
"""

        # Newest-first by created_at timestamp; show at most five.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_count_query(self, data: dict) -> str:
        """Answer "how many" style questions with record/unique/duplicate counts."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        # Relies on insertion order: invoices[0] is oldest, invoices[-1] newest.
        return f"""
**π Invoice Count Summary**

β’ **Total Records**: {total}
β’ **Unique Invoice Numbers**: {unique_numbers}
β’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**π Processing Timeline**
β’ **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
β’ **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Summarise totals/averages/extremes and list the top high-value invoices."""
        invoices = data.get("invoices", [])
        # Zero/missing amounts are excluded from every statistic below.
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # "High value" threshold = 5th-largest amount when there are more than
        # five invoices, otherwise just the single maximum.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**π° Financial Analysis**

β’ **Total Amount**: βΉ{total_amount:,.2f}
β’ **Average Amount**: βΉ{avg_amount:,.2f}
β’ **Highest Invoice**: βΉ{max_amount:,.2f}
β’ **Lowest Invoice**: βΉ{min_amount:,.2f}

**π― High-Value Invoices (βΉ{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Aggregate invoice counts and spend per supplier and rank the top 10 by spend."""
        invoices = data.get("invoices", [])

        # Per-supplier rollups keyed by the (stripped) supplier name.
        supplier_counts = {}
        supplier_amounts = {}

        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        # Ranked by total spend, not invoice count.
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]

        response = f"""
**π’ Supplier Analysis**

β’ **Total Unique Suppliers**: {len(supplier_counts)}
β’ **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)

**π° Top Suppliers by Amount**
"""

        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - βΉ{amount:,.2f} ({count} invoices, avg: βΉ{avg:,.2f})"

        return response

    def handle_semantic_search(self, query: str) -> str:
        """Delegate to the vector store and format its top-5 hits as markdown."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)

            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"π **Semantic Search Results for '{query}'**\n\n"

            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   β’ Similarity: {result.similarity_score:.3f}\n"
                response += f"   β’ Amount: βΉ{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   β’ Preview: {result.content_preview[:100]}...\n\n"

            return response

        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Fallback keyword search over names, descriptions and raw-text previews."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        # An invoice matches if ANY query word appears in its concatenated text.
        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()

            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)

        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"π **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"

        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   β’ Amount: βΉ{inv.get('amount', 0):,.2f}\n"
            response += f"   β’ Date: {inv.get('date', 'N/A')}\n\n"

        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."

        return response
|
|
| |
| |
| |
|
|
def create_app():
    """Render the whole Streamlit UI for one script run.

    Streamlit re-executes this top-to-bottom on every interaction; all
    cross-run state (processor, chatbot, upload bookkeeping) lives in
    st.session_state, with widget keys namespaced by a per-session id.
    """

    # Short per-browser-session id; namespaces widget/state keys so reruns and
    # concurrent sessions don't collide.
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]

    session_id = st.session_state.session_id

    # Global CSS injected once per run.
    st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        font-weight: bold;
        text-align: center;
        color: #FF6B35;
        margin-bottom: 1rem;
    }
    .feature-box {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 1rem;
        border-radius: 10px;
        color: white;
        margin: 0.5rem 0;
        text-align: center;
    }
    .status-ok { color: #28a745; font-weight: bold; }
    .status-warning { color: #ffc107; font-weight: bold; }
    .status-error { color: #dc3545; font-weight: bold; }
    </style>
    """, unsafe_allow_html=True)

    # Page header.
    st.markdown('<h1 class="main-header">π AI Invoice Processing System</h1>', unsafe_allow_html=True)
    st.markdown("""
    <div style="text-align: center; margin-bottom: 2rem;">
        <p style="font-size: 1.1rem; color: #666;">
            AI-Powered Document Processing β’ Semantic Search β’ Smart Analytics β’ Hugging Face Spaces
        </p>
    </div>
    """, unsafe_allow_html=True)

    # One-time heavy initialisation (models, storage), cached in session state.
    if 'processor' not in st.session_state:
        with st.spinner("π§ Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("β System initialized successfully!")
            except Exception as e:
                st.error(f"β Initialization failed: {e}")
                st.stop()

    # ---------- Sidebar: capability status, quick stats, limits ----------
    with st.sidebar:
        st.header("ποΈ System Status")

        processor = st.session_state.processor

        # Each capability degrades gracefully; show what is actually active.
        if processor.document_processor.processors:
            st.markdown('<span class="status-ok">β Document Processing</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-error">β Document Processing</span>', unsafe_allow_html=True)

        if processor.ai_extractor.use_transformers:
            st.markdown('<span class="status-ok">β AI Extraction</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">β οΈ Regex Extraction</span>', unsafe_allow_html=True)

        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('<span class="status-ok">β Semantic Search</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">β οΈ Keyword Search Only</span>', unsafe_allow_html=True)

        st.header("π Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)

            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"βΉ{total_amount:,.2f}")
            st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}")

        except Exception as e:
            st.error(f"Stats error: {e}")

        st.header("βοΈ System Info")
        # NOTE(review): these limits are display-only here; enforcement lives in
        # process_uploaded_file (size) and the upload tab (file count).
        st.info(f"""
        **Session ID:** {session_id}

        **Limits:**
        β’ Max file size: 10MB
        β’ Max concurrent files: 3
        β’ Timeout: 30s
        """)

    # Top-level navigation (radio acts like tabs but survives reruns simply).
    selected_tab = st.radio(
        "Choose a section:",
        ["π€ Upload & Process", "π¬ AI Chat", "π Analytics", "π Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}"
    )

    # ---------- Tab 1: upload & process ----------
    if selected_tab == "π€ Upload & Process":
        st.header("π€ Upload Invoice Documents")

        # Three marketing-style feature cards.
        col1, col2, col3 = st.columns(3)

        with col1:
            st.markdown("""
            <div class="feature-box">
                <h4>π€ AI Extraction</h4>
                <p>Advanced NLP models extract structured data automatically</p>
            </div>
            """, unsafe_allow_html=True)

        with col2:
            st.markdown("""
            <div class="feature-box">
                <h4>π Smart Search</h4>
                <p>Semantic search finds invoices using natural language</p>
            </div>
            """, unsafe_allow_html=True)

        with col3:
            st.markdown("""
            <div class="feature-box">
                <h4>π Analytics</h4>
                <p>Comprehensive insights and visualizations</p>
            </div>
            """, unsafe_allow_html=True)

        st.markdown("### π Upload Your Invoices")

        # Per-session upload bookkeeping: current file list, a processing lock,
        # a completion flag, and hashes of files already processed.
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()

        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )

        if uploaded_files:
            # Hash on (name, size) to detect a changed selection across reruns.
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)

            # New selection resets the processing flags.
            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("π New files detected - ready for processing")

        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']

        if current_files:
            # Hard cap on batch size for Spaces resource limits.
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"β οΈ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]

            st.info(f"π {len(current_files)} files selected")

            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                # Checkmark for files already processed this session.
                processed_icon = "β " if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "π"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")

            col1, col2 = st.columns([1, 1])

            with col1:
                # Three-state flow: idle -> processing (via rerun) -> complete.
                if not is_processing and not is_complete:
                    if st.button("π Process Files", type="primary", key=f"process_btn_{session_id}"):
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("π Processing in progress...")
                    # Actual work happens during the rerun that sees the flag set.
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("β Processing completed!")
                    if st.button("π Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()

            with col2:
                if st.button("ποΈ Clear Files", key=f"clear_files_{session_id}"):
                    st.session_state[f'uploaded_files_{session_id}'] = None
                    st.session_state[f'uploaded_file_hashes_{session_id}'] = set()
                    st.session_state[f'processing_complete_{session_id}'] = False
                    st.session_state[f'currently_processing_{session_id}'] = False
                    st.session_state[f'processed_file_hashes_{session_id}'] = set()
                    st.rerun()

        else:
            st.info("π Please select invoice files to upload and process")

        # After a completed batch, show the five most recently saved invoices.
        if is_complete:
            st.markdown("### π Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]

                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"π {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** βΉ{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")

    # ---------- Tab 2: AI chat ----------
    elif selected_tab == "π¬ AI Chat":
        st.header("π¬ AI Chat Interface")

        if st.session_state.chat_history:
            st.markdown("### π¬ Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])

        st.markdown("### βοΈ Ask a Question")

        col1, col2 = st.columns([4, 1])

        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )

        with col2:
            ask_btn = st.button("π Ask", type="primary", key=f"ask_btn_{session_id}")

        if ask_btn and user_input:
            # handle_chat_query is defined elsewhere in this file.
            handle_chat_query(user_input)

        # Starter suggestions only shown for an empty conversation.
        if not st.session_state.chat_history:
            st.markdown("### π‘ Try These Queries")

            col1, col2 = st.columns(2)

            with col1:
                st.markdown("**π Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)

            with col2:
                st.markdown("**π Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)

        if st.session_state.chat_history:
            if st.button("ποΈ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()

    # ---------- Tab 3: analytics ----------
    elif selected_tab == "π Analytics":
        st.header("π Analytics Dashboard")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("π No data available. Upload some invoices to see analytics.")
                # NOTE(review): this early return skips the footer and global
                # chat rendered at the bottom of create_app - confirm intended.
                return

            # Flatten the nested records into a DataFrame for charting.
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })

            df = pd.DataFrame(df_data)

            # Headline metrics.
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"βΉ{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"βΉ{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())

            if len(df) > 0:
                # Distribution of invoice amounts.
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (βΉ)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)

                # Top suppliers, only when any supplier name is present.
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)

                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (βΉ)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)

        except Exception as e:
            st.error(f"Analytics error: {e}")

    # ---------- Tab 4: data explorer ----------
    elif selected_tab == "π Data Explorer":
        st.header("π Data Explorer")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("π No data available. Upload some invoices first.")
                # NOTE(review): early return also skips the footer/global chat.
                return

            # Tabular view of every stored invoice.
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })

            df = pd.DataFrame(df_data)

            # Filter controls.
            col1, col2, col3 = st.columns(3)

            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")

            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")

            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")

            # Apply filters cumulatively.
            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]

            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="βΉ%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )

            # Export of the filtered view (CSV from the DataFrame, JSON from
            # the original nested records matched by invoice number).
            col1, col2 = st.columns(2)

            with col1:
                if st.button("π₯ Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )

            with col2:
                if st.button("π Export JSON", key=f"export_json_{session_id}"):
                    filtered_invoices = [inv for inv in invoices
                                       if inv.get('invoice_number') in filtered_df['Invoice Number'].values]

                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }

                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )

        except Exception as e:
            st.error(f"Data explorer error: {e}")

    # ---------- Footer + global chat (rendered under every tab) ----------
    st.markdown("---")
    st.markdown("### π¬ Quick Chat (Works from any section)")

    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")

    if global_query:
        handle_chat_query(global_query, show_response=True)

    st.markdown("---")
    st.markdown("""
    <div style="text-align: center; color: #666;">
        <p>π <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p>
        <p>Built with β€οΈ using Streamlit, Transformers, and AI</p>
    </div>
    """, unsafe_allow_html=True)
|
|
| |
| |
| |
|
|
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once, with per-session state management.

    Files are deduplicated by a (name, size) hash stored in
    ``st.session_state[f'processed_file_hashes_{session_id}']`` so that a
    Streamlit rerun does not reprocess anything. On completion the per-session
    ``currently_processing`` flag is cleared and ``processing_complete`` set.

    Args:
        uploaded_files: list of Streamlit UploadedFile objects (may be empty).
        session_id: short per-session identifier used to namespace state keys.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### π Processing Files...")

    # Hashes of files already handled earlier in this session.
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']

    # Keep only files not seen before (dedupe by name + size).
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))

    if not files_to_process:
        st.info("✅ All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Fixed containers so progress / status / results render in stable order.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    successful = 0
    failed = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()

    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)

        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")

        with status_container:
            st.info(f"π Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")

        try:
            result = st.session_state.processor.process_uploaded_file(uploaded_file)

            # Mark as processed even if extraction was only partial, so the
            # file is not retried on the next rerun.
            processed_hashes.add(file_hash)

            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"✅ Successfully processed: {uploaded_file.name}")

                    # Compact three-column summary of the extracted fields.
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** βΉ{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")

                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"β οΈ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, βΉ{result.amount}")
                    st.markdown("---")

        except Exception as e:
            failed += 1
            # Count errored files as processed too, so they are not retried forever.
            processed_hashes.add(file_hash)

            with results_container:
                st.error(f"β Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    # Persist the updated hash set back to session state.
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("✅ Processing completed!")

    with status_container:
        if successful > 0:
            st.success(f"π Processing complete! {successful} successful, {failed} failed")
            st.balloons()
        else:
            st.error(f"β Processing failed for all {failed} files. Please check file formats and content.")

    # Flip the state flags and rerun so the UI reflects completion.
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True

    st.rerun()
|
|
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for backward compatibility.

    Delegates straight to process_files_once(), which owns the real
    deduplication and state-management logic.
    """
    return process_files_once(uploaded_files, session_id)
|
|
def handle_chat_query(query, show_response=False):
    """Record a user query, answer it via the chatbot, and store both turns.

    Appends the user message to ``st.session_state.chat_history``, asks the
    chatbot, appends the assistant reply, optionally renders the reply inline,
    then reruns the app so the chat view refreshes.
    """
    history = st.session_state.chat_history
    history.append(
        {"role": "user", "content": query, "timestamp": datetime.now()}
    )

    try:
        with st.spinner("π€ AI is analyzing..."):
            answer = st.session_state.chatbot.query_database(query)

        history.append(
            {"role": "assistant", "content": answer, "timestamp": datetime.now()}
        )

        if show_response:
            # Show the reply right where the user asked.
            with st.chat_message("assistant"):
                st.markdown(answer)
            st.info("π‘ Switch to the 'AI Chat' section to see full conversation history!")

        st.rerun()

    except Exception as e:
        st.error(f"Chat error: {e}")
|
|
| |
| |
| |
|
|
def main():
    """Main entry point for Hugging Face Spaces"""
    # Top-level boundary handler: any uncaught exception from the app is
    # rendered as a friendly error page instead of a Streamlit traceback.
    try:
        # Small banner so users know they are on the hosted Spaces deployment.
        if IS_HF_SPACE:
            st.sidebar.info("π€ Running on Hugging Face Spaces")

        create_app()

    except Exception as e:
        st.error(f"""
        ## π¨ Application Error
        
        {e}
        
        Please refresh the page or check the logs for more details.
        """)
|
|
# Script entry point. The original lines ended `main(),` with a stray `]`
# (merge/corruption artifact); restored to the canonical guard.
if __name__ == "__main__":
    main()
| |
| amounts_found = [] |
| for i, pattern in enumerate(amount_patterns): |
| matches = re.findall(pattern, text.lower(), re.IGNORECASE | re.MULTILINE) |
| if matches: |
| st.write(f"Pattern {i+1}: {matches}") |
| for match in matches: |
| try: |
| amount_val = float(match.replace(',', '')) |
| amounts_found.append(amount_val) |
| except: |
| pass |
| |
| if amounts_found: |
| st.success(f"β
Found amounts: {amounts_found}") |
| else: |
| st.warning("β οΈ No amounts detected in text") |
| |
| |
| st.markdown("**π Invoice Number Detection Debug:**") |
| inv_patterns = [ |
| r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', |
| r'#\s*([A-Z0-9\-_/]{3,})', |
| ] |
| |
| for i, pattern in enumerate(inv_patterns): |
| matches = re.findall(pattern, text.lower(), re.IGNORECASE) |
| if matches: |
| st.write(f"Invoice Pattern {i+1}: {matches}") |
| |
| |
| if st.checkbox("Show Full Extracted Text", key=f"debug_full_text_{uploaded_file.name}"): |
| st.text_area("Full Text:", value=text, height=300, disabled=True) |
| |
| |
| st.info("π€ Extracting invoice data using AI/Regex...") |
| invoice_data = self.ai_extractor.extract_with_ai(text) |
| invoice_data.file_path = uploaded_file.name |
| |
| |
| st.info(f"π Extraction completed with {invoice_data.extraction_confidence:.1%} confidence") |
| |
| |
| st.info("πΎ Saving extracted data...") |
| self.save_invoice_data(invoice_data, text, file_size) |
| |
| self.processing_stats['successful'] += 1 |
| st.success(f"β
Successfully processed {uploaded_file.name}") |
| |
| return invoice_data |
| |
| finally: |
| |
| try: |
| os.unlink(tmp_file_path) |
| st.info("π§Ή Cleaned up temporary file") |
| except: |
| pass |
| |
| except Exception as e: |
| error_msg = f"Error processing {uploaded_file.name}: {str(e)}" |
| st.error(error_msg) |
| self.processing_stats['failed'] += 1 |
| |
| |
| with st.expander("π Error Details", expanded=False): |
| st.code(str(e)) |
| import traceback |
| st.code(traceback.format_exc()) |
| |
| return InvoiceData() |
| |
| def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int): |
| """Save invoice data to JSON and vector store""" |
| try: |
| |
| data = self.load_json_data() |
| |
| |
| invoice_record = { |
| "id": len(data["invoices"]) + 1, |
| "invoice_number": invoice_data.invoice_number, |
| "supplier_name": invoice_data.supplier_name, |
| "buyer_name": invoice_data.buyer_name, |
| "date": invoice_data.date, |
| "amount": invoice_data.amount, |
| "quantity": invoice_data.quantity, |
| "product_description": invoice_data.product_description, |
| "file_info": { |
| "file_name": invoice_data.file_path, |
| "file_size": file_size |
| }, |
| "extraction_info": { |
| "confidence": invoice_data.extraction_confidence, |
| "method": invoice_data.processing_method, |
| "raw_text_preview": raw_text[:300] |
| }, |
| "timestamps": { |
| "created_at": datetime.now().isoformat() |
| } |
| } |
| |
| |
| data["invoices"].append(invoice_record) |
| |
| |
| self.update_summary(data) |
| |
| |
| self.save_json_data(data) |
| |
| |
| if self.vector_store: |
| self.vector_store.add_document(invoice_record, raw_text) |
| self.vector_store.save_vector_store() |
| |
| except Exception as e: |
| st.error(f"Error saving invoice data: {e}") |
| |
| def update_summary(self, data: dict): |
| """Update summary statistics""" |
| invoices = data["invoices"] |
| |
| total_amount = sum(inv.get("amount", 0) for inv in invoices) |
| unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name"))) |
| |
| data["summary"] = { |
| "total_amount": total_amount, |
| "unique_suppliers": unique_suppliers, |
| "processing_stats": { |
| "successful": self.processing_stats['successful'], |
| "failed": self.processing_stats['failed'], |
| "total_processed": self.processing_stats['total_processed'] |
| } |
| } |
| |
| data["metadata"]["last_updated"] = datetime.now().isoformat() |
| data["metadata"]["total_invoices"] = len(invoices) |
|
|
| |
| |
| |
|
|
class ChatBot:
    """Chatbot for invoice queries.

    Routes a free-text user question to one of several handlers (summary,
    counts, amounts, suppliers, semantic search, or plain keyword search)
    based on simple keyword matching, and returns a markdown-formatted reply.
    """

    def __init__(self, processor: InvoiceProcessor):
        # Keep a reference to the processor so handlers can read its JSON
        # store and (optionally) its vector store.
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Process user query and return response.

        Dispatch order: summary keywords, count keywords, amount keywords,
        supplier keywords, then semantic search if a vector store exists,
        otherwise a plain keyword search. Any exception is returned as an
        error string rather than raised.
        """
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            query_lower = query.lower()

            # Keyword-based intent routing; first match wins.
            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)

            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)

            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)

            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)

            elif self.processor.vector_store:
                return self.handle_semantic_search(query)

            else:
                return self.handle_general_query(data, query)

        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Generate comprehensive summary.

        Returns totals/averages from the precomputed ``summary`` section plus
        the five most recently created invoices.
        """
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**π Invoice System Summary**

β’ **Total Invoices**: {len(invoices):,}
β’ **Total Value**: βΉ{total_amount:,.2f}
β’ **Average Invoice**: βΉ{avg_amount:,.2f}
β’ **Unique Suppliers**: {unique_suppliers}

**π Processing Stats**
β’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**π Recent Invoices**
"""

        # Newest five by ISO created_at timestamp (string sort is safe here).
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_count_query(self, data: dict) -> str:
        """Handle count-related queries.

        Reports the raw record count, distinct invoice numbers, and the
        first/latest created_at dates (insertion order, not sorted).
        """
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        return f"""
**π Invoice Count Summary**

β’ **Total Records**: {total}
β’ **Unique Invoice Numbers**: {unique_numbers}
β’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**π
 Processing Timeline**
β’ **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
β’ **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Handle amount-related queries.

        Computes min/max/avg/total over positive amounts and lists the
        top high-value invoices (5th-largest amount is the threshold when
        more than five invoices exist, otherwise the maximum).
        """
        invoices = data.get("invoices", [])
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # Threshold = 5th largest amount when we have >5 invoices; else the max.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**π° Financial Analysis**

β’ **Total Amount**: βΉ{total_amount:,.2f}
β’ **Average Amount**: βΉ{avg_amount:,.2f}
β’ **Highest Invoice**: βΉ{max_amount:,.2f}
β’ **Lowest Invoice**: βΉ{min_amount:,.2f}

**π― High-Value Invoices (βΉ{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Handle supplier-related queries.

        Aggregates invoice counts and amounts per supplier and reports the
        top ten by total amount. ``query`` is currently unused but kept for
        interface symmetry with the other handlers.
        """
        invoices = data.get("invoices", [])

        # Per-supplier invoice count and amount totals.
        supplier_counts = {}
        supplier_amounts = {}

        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        # Top ten suppliers ranked by cumulative amount.
        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]

        response = f"""
**π’ Supplier Analysis**

β’ **Total Unique Suppliers**: {len(supplier_counts)}
β’ **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices)

**π° Top Suppliers by Amount**
"""

        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - βΉ{amount:,.2f} ({count} invoices, avg: βΉ{avg:,.2f})"

        return response

    def handle_semantic_search(self, query: str) -> str:
        """Handle semantic search queries.

        Delegates to the processor's vector store and formats the top five
        hits (invoice number, supplier, similarity, amount, preview).
        """
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)

            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"π **Semantic Search Results for '{query}'**\n\n"

            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   β’ Similarity: {result.similarity_score:.3f}\n"
                response += f"   β’ Amount: βΉ{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   β’ Preview: {result.content_preview[:100]}...\n\n"

            return response

        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Handle general queries with keyword search.

        Matches any query word (case-insensitive) against supplier, buyer,
        description, and the stored raw-text preview; shows up to five hits.
        """
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        # Concatenate the searchable text fields per invoice, then test for
        # any word overlap with the query.
        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()

            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)

        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"π **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"

        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   β’ Amount: βΉ{inv.get('amount', 0):,.2f}\n"
            response += f"   β’ Date: {inv.get('date', 'N/A')}\n\n"

        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."

        return response
|
|
| |
| |
| |
|
|
def create_app():
    """Main Streamlit application.

    Renders the whole UI: global styling, lazy initialization of the
    processor/chatbot in session state, the status sidebar, and four
    sections (Upload & Process, AI Chat, Analytics, Data Explorer) plus a
    footer with a global chat input. All widget keys are namespaced with a
    per-session id so multiple browser sessions do not collide.

    Fixes applied vs. the previous revision: several status strings were
    broken across physical lines (SyntaxError); they are restored to the
    intended single-line ✅ messages.
    """

    # Stable per-browser-session id used to namespace widget keys and state.
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())[:8]

    session_id = st.session_state.session_id

    # Global CSS: header, gradient feature boxes, and status badge colors.
    st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        font-weight: bold;
        text-align: center;
        color: #FF6B35;
        margin-bottom: 1rem;
    }
    .feature-box {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 1rem;
        border-radius: 10px;
        color: white;
        margin: 0.5rem 0;
        text-align: center;
    }
    .status-ok { color: #28a745; font-weight: bold; }
    .status-warning { color: #ffc107; font-weight: bold; }
    .status-error { color: #dc3545; font-weight: bold; }
    </style>
    """, unsafe_allow_html=True)

    # Page header.
    st.markdown('<h1 class="main-header">π AI Invoice Processing System</h1>', unsafe_allow_html=True)
    st.markdown("""
    <div style="text-align: center; margin-bottom: 2rem;">
        <p style="font-size: 1.1rem; color: #666;">
            AI-Powered Document Processing β’ Semantic Search β’ Smart Analytics β’ Hugging Face Spaces
        </p>
    </div>
    """, unsafe_allow_html=True)

    # One-time heavy initialization, cached in session state.
    if 'processor' not in st.session_state:
        with st.spinner("π§ Initializing AI Invoice Processor..."):
            try:
                st.session_state.processor = InvoiceProcessor()
                st.session_state.chatbot = ChatBot(st.session_state.processor)
                st.session_state.chat_history = []
                st.success("✅ System initialized successfully!")
            except Exception as e:
                st.error(f"β Initialization failed: {e}")
                st.stop()

    # ---------------- Sidebar: capability status, quick stats, limits ------
    with st.sidebar:
        st.header("ποΈ System Status")

        processor = st.session_state.processor

        # Capability badges reflect which optional dependencies loaded.
        if processor.document_processor.processors:
            st.markdown('<span class="status-ok">✅ Document Processing</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-error">β Document Processing</span>', unsafe_allow_html=True)

        if processor.ai_extractor.use_transformers:
            st.markdown('<span class="status-ok">✅ AI Extraction</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">β οΈ Regex Extraction</span>', unsafe_allow_html=True)

        if processor.vector_store and processor.vector_store.embedding_model:
            st.markdown('<span class="status-ok">✅ Semantic Search</span>', unsafe_allow_html=True)
        else:
            st.markdown('<span class="status-warning">β οΈ Keyword Search Only</span>', unsafe_allow_html=True)

        st.header("π Quick Stats")
        try:
            data = processor.load_json_data()
            total_invoices = len(data.get("invoices", []))
            total_amount = data.get("summary", {}).get("total_amount", 0)

            st.metric("Total Invoices", total_invoices)
            st.metric("Total Value", f"βΉ{total_amount:,.2f}")
            st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}")

        except Exception as e:
            st.error(f"Stats error: {e}")

        st.header("βοΈ System Info")
        st.info(f"""
        **Session ID:** {session_id}
        
        **Limits:**
        β’ Max file size: 10MB
        β’ Max concurrent files: 3
        β’ Timeout: 30s
        """)

    # ---------------- Main navigation --------------------------------------
    selected_tab = st.radio(
        "Choose a section:",
        ["π€ Upload & Process", "π¬ AI Chat", "π Analytics", "π Data Explorer"],
        horizontal=True,
        key=f"main_navigation_{session_id}"
    )

    # ---------------- Section: Upload & Process ----------------------------
    if selected_tab == "π€ Upload & Process":
        st.header("π€ Upload Invoice Documents")

        # Feature highlight boxes.
        col1, col2, col3 = st.columns(3)

        with col1:
            st.markdown("""
            <div class="feature-box">
                <h4>π€ AI Extraction</h4>
                <p>Advanced NLP models extract structured data automatically</p>
            </div>
            """, unsafe_allow_html=True)

        with col2:
            st.markdown("""
            <div class="feature-box">
                <h4>π Smart Search</h4>
                <p>Semantic search finds invoices using natural language</p>
            </div>
            """, unsafe_allow_html=True)

        with col3:
            st.markdown("""
            <div class="feature-box">
                <h4>π Analytics</h4>
                <p>Comprehensive insights and visualizations</p>
            </div>
            """, unsafe_allow_html=True)

        st.markdown("### π Upload Your Invoices")

        # Initialize per-session upload/processing state on first visit.
        if f'uploaded_files_{session_id}' not in st.session_state:
            st.session_state[f'uploaded_files_{session_id}'] = None
        if f'processing_complete_{session_id}' not in st.session_state:
            st.session_state[f'processing_complete_{session_id}'] = False
        if f'currently_processing_{session_id}' not in st.session_state:
            st.session_state[f'currently_processing_{session_id}'] = False
        if f'processed_file_hashes_{session_id}' not in st.session_state:
            st.session_state[f'processed_file_hashes_{session_id}'] = set()

        uploaded_files = st.file_uploader(
            "Choose invoice files (PDF, TXT supported)",
            type=['pdf', 'txt'],
            accept_multiple_files=True,
            help="Maximum file size: 10MB per file",
            key=f"file_uploader_stable_{session_id}"
        )

        # Detect a change in the selected file set by comparing hashes,
        # and reset the processing flags when new files arrive.
        if uploaded_files:
            current_file_hashes = set()
            for file in uploaded_files:
                file_hash = hash((file.name, file.size))
                current_file_hashes.add(file_hash)

            stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set())
            if current_file_hashes != stored_hashes:
                st.session_state[f'uploaded_files_{session_id}'] = uploaded_files
                st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes
                st.session_state[f'processing_complete_{session_id}'] = False
                st.session_state[f'currently_processing_{session_id}'] = False
                st.info("π New files detected - ready for processing")

        current_files = st.session_state[f'uploaded_files_{session_id}']
        is_processing = st.session_state[f'currently_processing_{session_id}']
        is_complete = st.session_state[f'processing_complete_{session_id}']

        if current_files:
            # Hard cap to keep the free Space responsive.
            max_files = 3
            if len(current_files) > max_files:
                st.warning(f"β οΈ Too many files selected. Processing first {max_files} files.")
                current_files = current_files[:max_files]

            st.info(f"π {len(current_files)} files selected")

            st.markdown("**Selected Files:**")
            for i, file in enumerate(current_files, 1):
                file_size_mb = len(file.getvalue()) / (1024 * 1024)
                file_hash = hash((file.name, file.size))
                processed_icon = "✅ " if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "π"
                st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)")

            col1, col2 = st.columns([1, 1])

            with col1:
                # Three-state button area: idle -> processing -> complete.
                if not is_processing and not is_complete:
                    if st.button("π Process Files", type="primary", key=f"process_btn_{session_id}"):
                        st.session_state[f'currently_processing_{session_id}'] = True
                        st.rerun()
                elif is_processing:
                    st.info("π Processing in progress...")
                    process_files_once(current_files, session_id)
                elif is_complete:
                    st.success("✅ Processing completed!")
                    if st.button("π Process Again", key=f"reprocess_btn_{session_id}"):
                        st.session_state[f'processing_complete_{session_id}'] = False
                        st.session_state[f'currently_processing_{session_id}'] = False
                        st.session_state[f'processed_file_hashes_{session_id}'] = set()
                        st.rerun()

            with col2:
                if st.button("ποΈ Clear Files", key=f"clear_files_{session_id}"):
                    # Drop all per-session upload state so the uploader resets.
                    keys_to_clear = [
                        f'uploaded_files_{session_id}',
                        f'uploaded_file_hashes_{session_id}',
                        f'processing_complete_{session_id}',
                        f'currently_processing_{session_id}',
                        f'processed_file_hashes_{session_id}'
                    ]

                    for key in keys_to_clear:
                        if key in st.session_state:
                            del st.session_state[key]

                    st.success("ποΈ Files cleared successfully!")
                    time.sleep(1)
                    st.rerun()

        else:
            st.info("π Please select invoice files to upload and process")

        # After a completed run, show the five most recent invoices.
        if is_complete:
            st.markdown("### π Recent Processing Results")
            try:
                data = st.session_state.processor.load_json_data()
                recent_invoices = sorted(
                    data.get("invoices", []),
                    key=lambda x: x.get('timestamps', {}).get('created_at', ''),
                    reverse=True
                )[:5]

                if recent_invoices:
                    for i, inv in enumerate(recent_invoices, 1):
                        with st.expander(f"π {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False):
                            col1, col2 = st.columns(2)
                            with col1:
                                st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}")
                                st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}")
                                st.write(f"**Amount:** βΉ{inv.get('amount', 0):.2f}")
                            with col2:
                                st.write(f"**Date:** {inv.get('date', 'N/A')}")
                                st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}")
                                st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}")
                else:
                    st.info("No recent processing results found.")
            except Exception as e:
                st.error(f"Error loading recent results: {e}")

    # ---------------- Section: AI Chat -------------------------------------
    elif selected_tab == "π¬ AI Chat":
        st.header("π¬ AI Chat Interface")

        if st.session_state.chat_history:
            st.markdown("### π¬ Chat History")
            for i, message in enumerate(st.session_state.chat_history):
                with st.chat_message(message["role"]):
                    st.markdown(message["content"])

        st.markdown("### βοΈ Ask a Question")

        col1, col2 = st.columns([4, 1])

        with col1:
            user_input = st.text_input(
                "Type your question:",
                placeholder="e.g., 'show me total spending'",
                key=f"chat_input_{session_id}"
            )

        with col2:
            ask_btn = st.button("π Ask", type="primary", key=f"ask_btn_{session_id}")

        if ask_btn and user_input:
            handle_chat_query(user_input)

        # Suggested starter queries, shown only before the first message.
        if not st.session_state.chat_history:
            st.markdown("### π‘ Try These Queries")

            col1, col2 = st.columns(2)

            with col1:
                st.markdown("**π Basic Queries:**")
                basic_queries = [
                    "Show me a summary of all invoices",
                    "How much have we spent in total?",
                    "Who are our top suppliers?",
                    "Find invoices with high amounts"
                ]
                for i, query in enumerate(basic_queries):
                    if st.button(query, key=f"basic_{session_id}_{i}"):
                        handle_chat_query(query)

            with col2:
                st.markdown("**π Advanced Queries:**")
                advanced_queries = [
                    "Find technology purchases",
                    "Show office supplies",
                    "Search consulting services",
                    "Recent high-value invoices"
                ]
                for i, query in enumerate(advanced_queries):
                    if st.button(query, key=f"advanced_{session_id}_{i}"):
                        handle_chat_query(query)

        if st.session_state.chat_history:
            if st.button("ποΈ Clear Chat", key=f"clear_chat_{session_id}"):
                st.session_state.chat_history = []
                st.rerun()

    # ---------------- Section: Analytics -----------------------------------
    elif selected_tab == "π Analytics":
        st.header("π Analytics Dashboard")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("π No data available. Upload some invoices to see analytics.")
                # NOTE(review): this return exits create_app entirely, so the
                # footer/global chat below are skipped when there is no data.
                return

            # Flatten records into a DataFrame for metrics and charts.
            df_data = []
            for inv in invoices:
                df_data.append({
                    'invoice_number': inv.get('invoice_number', ''),
                    'supplier_name': inv.get('supplier_name', ''),
                    'amount': inv.get('amount', 0),
                    'date': inv.get('date', ''),
                    'confidence': inv.get('extraction_info', {}).get('confidence', 0)
                })

            df = pd.DataFrame(df_data)

            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric("Total Invoices", len(df))
            with col2:
                st.metric("Total Amount", f"βΉ{df['amount'].sum():,.2f}")
            with col3:
                st.metric("Avg Amount", f"βΉ{df['amount'].mean():,.2f}")
            with col4:
                st.metric("Unique Suppliers", df['supplier_name'].nunique())

            if len(df) > 0:
                # Amount distribution histogram.
                fig_hist = px.histogram(
                    df,
                    x='amount',
                    title="Invoice Amount Distribution",
                    labels={'amount': 'Amount (βΉ)', 'count': 'Number of Invoices'}
                )
                st.plotly_chart(fig_hist, use_container_width=True)

                # Top suppliers bar chart.
                if df['supplier_name'].notna().any():
                    supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10)

                    if len(supplier_amounts) > 0:
                        fig_suppliers = px.bar(
                            x=supplier_amounts.values,
                            y=supplier_amounts.index,
                            orientation='h',
                            title="Top 10 Suppliers by Total Amount",
                            labels={'x': 'Total Amount (βΉ)', 'y': 'Supplier'}
                        )
                        st.plotly_chart(fig_suppliers, use_container_width=True)

        except Exception as e:
            st.error(f"Analytics error: {e}")

    # ---------------- Section: Data Explorer -------------------------------
    elif selected_tab == "π Data Explorer":
        st.header("π Data Explorer")

        try:
            data = st.session_state.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                st.info("π No data available. Upload some invoices first.")
                # NOTE(review): same early-return caveat as the Analytics tab.
                return

            # Flatten to display columns.
            df_data = []
            for inv in invoices:
                df_data.append({
                    'Invoice Number': inv.get('invoice_number', ''),
                    'Supplier': inv.get('supplier_name', ''),
                    'Buyer': inv.get('buyer_name', ''),
                    'Amount': inv.get('amount', 0),
                    'Date': inv.get('date', ''),
                    'Confidence': inv.get('extraction_info', {}).get('confidence', 0),
                    'Method': inv.get('extraction_info', {}).get('method', ''),
                    'File': inv.get('file_info', {}).get('file_name', ''),
                    'Created': inv.get('timestamps', {}).get('created_at', '')[:19]
                })

            df = pd.DataFrame(df_data)

            # Filters: supplier, extraction method, minimum amount.
            col1, col2, col3 = st.columns(3)

            with col1:
                suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist())
                selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}")

            with col2:
                methods = ['All'] + sorted(df['Method'].dropna().unique().tolist())
                selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}")

            with col3:
                min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}")

            filtered_df = df.copy()
            if selected_supplier != 'All':
                filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier]
            if selected_method != 'All':
                filtered_df = filtered_df[filtered_df['Method'] == selected_method]
            if min_amount > 0:
                filtered_df = filtered_df[filtered_df['Amount'] >= min_amount]

            st.dataframe(
                filtered_df,
                use_container_width=True,
                column_config={
                    "Amount": st.column_config.NumberColumn("Amount", format="βΉ%.2f"),
                    "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1)
                }
            )

            # Export buttons.
            col1, col2 = st.columns(2)

            with col1:
                if st.button("π₯ Export CSV", key=f"export_csv_{session_id}"):
                    csv_data = filtered_df.to_csv(index=False)
                    st.download_button(
                        "Download CSV",
                        csv_data,
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                        "text/csv",
                        key=f"download_csv_{session_id}"
                    )

            with col2:
                if st.button("π Export JSON", key=f"export_json_{session_id}"):
                    filtered_invoices = [inv for inv in invoices
                                       if inv.get('invoice_number') in filtered_df['Invoice Number'].values]

                    export_data = {
                        "exported_at": datetime.now().isoformat(),
                        "total_records": len(filtered_invoices),
                        "invoices": filtered_invoices
                    }

                    st.download_button(
                        "Download JSON",
                        json.dumps(export_data, indent=2),
                        f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json",
                        "application/json",
                        key=f"download_json_{session_id}"
                    )

        except Exception as e:
            st.error(f"Data explorer error: {e}")

    # ---------------- Footer: global chat + credits ------------------------
    st.markdown("---")
    st.markdown("### π¬ Quick Chat (Works from any section)")

    global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}")

    if global_query:
        handle_chat_query(global_query, show_response=True)

    st.markdown("---")
    st.markdown("""
    <div style="text-align: center; color: #666;">
        <p>π <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p>
        <p>Built with β€οΈ using Streamlit, Transformers, and AI</p>
    </div>
    """, unsafe_allow_html=True)
|
|
| |
| |
| |
|
|
def process_files_once(uploaded_files, session_id):
    """Process uploaded files exactly once per session.

    Keeps a per-session set of ``hash((name, size))`` fingerprints in
    ``st.session_state`` so files already handled are skipped, streams
    progress/status/per-file results into dedicated containers, updates
    the session processing flags, and triggers a rerun when done.

    Args:
        uploaded_files: List of Streamlit ``UploadedFile`` objects.
        session_id: Suffix used for every session-state key of this session.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### π Processing Files...")

    # Fingerprints of files processed earlier in this session.
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']

    # Keep only files we have not seen yet (keyed by name + size).
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))

    if not files_to_process:
        st.info("β All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Dedicated containers keep progress, status, and results anchored
    # at stable positions on the page while the loop runs.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    successful = 0
    failed = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()

    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)

        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")

        with status_container:
            st.info(f"π Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")

        try:
            result = st.session_state.processor.process_uploaded_file(uploaded_file)

            # Mark as processed regardless of extraction quality so a
            # rerun does not reprocess the same file.
            processed_hashes.add(file_hash)

            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"β Successfully processed: {uploaded_file.name}")

                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** βΉ{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")

                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"β οΈ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, βΉ{result.amount}")
                    st.markdown("---")

        except Exception as e:
            failed += 1
            # Failed files are also remembered so they are not retried
            # endlessly on every rerun.
            processed_hashes.add(file_hash)

            with results_container:
                st.error(f"β Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("β Processing completed!")

    with status_container:
        if successful > 0:
            st.success(f"π Processing complete! {successful} successful, {failed} failed")
            st.balloons()
        else:
            st.error(f"β Processing failed for all {failed} files. Please check file formats and content.")

    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True

    st.rerun()
|
|
def process_files(uploaded_files, session_id):
    """Backward-compatible alias; delegates to process_files_once."""
    return process_files_once(uploaded_files, session_id)
|
|
def handle_chat_query(query, show_response=False):
    """Send a user query to the chatbot and record both sides in history.

    Args:
        query: The user's natural-language question.
        show_response: When True, also render the answer inline.
    """

    def _remember(role, content):
        # Append one timestamped message to the shared conversation history.
        st.session_state.chat_history.append(
            {"role": role, "content": content, "timestamp": datetime.now()}
        )

    _remember("user", query)

    try:
        with st.spinner("π€ AI is analyzing..."):
            answer = st.session_state.chatbot.query_database(query)

        _remember("assistant", answer)

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(answer)
                st.info("π‘ Switch to the 'AI Chat' section to see full conversation history!")

        st.rerun()

    except Exception as exc:
        st.error(f"Chat error: {exc}")
|
|
| |
| |
| |
|
|
def main():
    """Entry point for Hugging Face Spaces: render the app, surface fatal errors."""
    try:
        if IS_HF_SPACE:
            st.sidebar.info("π€ Running on Hugging Face Spaces")

        create_app()

    except Exception as exc:
        st.error(f"""
        ## π¨ Application Error

        {exc}

        Please refresh the page or check the logs for more details.
        """)
|
|
if __name__ == "__main__":
    # Fixed: a stray trailing comma (`main(),`) previously wrapped the
    # call in a one-element tuple expression.
    main()
| |
| |
| r'([0-9,]+\.?\d*)\s*(?:dollars?|rupees?|usd|inr|eur|gbp)', |
| |
| |
| r'(?:price|cost|rate)\s*:?\s*[\$βΉΒ£β¬]?\s*([0-9,]+\.?\d*)', |
| |
| |
| r'(?:^|\s)([0-9]{1,3}(?:,\d{3})*\.?\d{0,2})(?=\s|$)', |
| ], |
| 'date': [ |
| r'date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| r'(?:invoice|bill)\s*date\s*:?\s*(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})', |
| r'(?:^|\s)(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})(?=\s|$)', |
| r'(\d{4}[/\-\.]\d{1,2}[/\-\.]\d{1,2})', |
| r'(\d{1,2}\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\s+\d{2,4})', |
| ], |
| 'quantity': [ |
| r'qty\s*:?\s*(\d+)', |
| r'quantity\s*:?\s*(\d+)', |
| r'(?:units?|pcs?|pieces?)\s*:?\s*(\d+)', |
| r'(\d+)\s*(?:pcs?|units?|items?|pieces?)', |
| ] |
| } |
| |
| text_lower = text.lower() |
| |
| |
| for pattern in patterns['invoice_number']: |
| match = re.search(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| if match: |
| invoice_data.invoice_number = match.group(1).upper().strip() |
| break |
| |
| |
| amounts_found = [] |
| for pattern in patterns['amount']: |
| matches = re.finditer(pattern, text_lower, re.IGNORECASE | re.MULTILINE) |
| for match in matches: |
| try: |
| amount_str = match.group(1).replace(',', '').replace(' ', '') |
| amount_val = float(amount_str) |
| if 0.01 <= amount_val <= 1000000: |
| amounts_found.append(amount_val) |
| except (ValueError, IndexError): |
| continue |
| |
| |
| if amounts_found: |
| |
| unique_amounts = sorted(set(amounts_found), reverse=True) |
| |
| invoice_data.amount = unique_amounts[0] |
| |
| |
| for pattern in patterns['date']: |
| match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) |
| if match: |
| invoice_data.date = self.parse_date(match.group(1)) |
| break |
| |
| |
| for pattern in patterns['quantity']: |
| match = re.search(pattern, text_lower, re.IGNORECASE) |
| if match: |
| try: |
| invoice_data.quantity = int(match.group(1)) |
| break |
| except ValueError: |
| continue |
| |
| |
| company_patterns = [ |
| r'(?:from|supplier|vendor)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| r'(?:to|buyer|client)\s*:?\s*([A-Z][A-Za-z\s&,\.]{3,50})', |
| r'([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:ltd|inc|corp|llc|co\.|company|pvt|private|limited)', |
| r'(?:^|\n)([A-Z][A-Za-z\s&,\.]{3,50})\s*(?:\n|$)', |
| ] |
| |
| companies_found = [] |
| for pattern in company_patterns: |
| matches = re.findall(pattern, text, re.MULTILINE) |
| for match in matches: |
| clean_company = match.strip().title() |
| if len(clean_company) > 3 and not any(word in clean_company.lower() for word in ['total', 'amount', 'date', 'invoice']): |
| companies_found.append(clean_company) |
| |
| |
| if companies_found: |
| invoice_data.supplier_name = companies_found[0] |
| if len(companies_found) > 1: |
| invoice_data.buyer_name = companies_found[1] |
| |
| |
| desc_patterns = [ |
| r'(?:description|item|product|service)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| r'(?:for|regarding)\s*:?\s*([A-Za-z0-9\s,.-]{10,200})', |
| ] |
| |
| for pattern in desc_patterns: |
| match = re.search(pattern, text, re.IGNORECASE) |
| if match: |
| desc = match.group(1).strip() |
| if len(desc) > 5: |
| invoice_data.product_description = desc[:200] |
| break |
| |
| |
| confidence_factors = [] |
| if invoice_data.invoice_number: |
| confidence_factors.append(0.3) |
| if invoice_data.amount > 0: |
| confidence_factors.append(0.3) |
| if invoice_data.supplier_name: |
| confidence_factors.append(0.2) |
| if invoice_data.date: |
| confidence_factors.append(0.1) |
| if invoice_data.quantity > 0: |
| confidence_factors.append(0.1) |
| |
| invoice_data.extraction_confidence = sum(confidence_factors) |
| |
| return invoice_data |
| |
def parse_date(self, date_str: str) -> str:
    """Normalize a date string to ``YYYY-MM-DD``.

    Accepts the separator styles (`/`, `-`, `.`) and the month-name form
    ("12 jan 2024") that the extraction regexes can produce. Returns the
    input unchanged when no known format matches, and "" for empty input.

    Args:
        date_str: Raw date text captured by the regex patterns.

    Returns:
        ISO-formatted date string, or the original text as a fallback.
    """
    if not date_str:
        return ""

    # The regex patterns allow '.' as a separator, which strptime's
    # '/'-based formats cannot parse — normalize it up front.
    candidate = date_str.strip().replace('.', '/')

    formats = [
        '%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%m-%d-%Y', '%d-%m-%Y',
        '%Y/%m/%d',
        # Month-name dates such as "12 jan 2024" (strptime matches %b
        # case-insensitively).
        '%d %b %Y', '%d %b %y',
    ]

    for fmt in formats:
        try:
            return datetime.strptime(candidate, fmt).strftime('%Y-%m-%d')
        except ValueError:
            continue

    # Preserve the original behavior: hand back the raw text unchanged.
    return date_str
|
|
| |
| |
| |
|
|
class VectorStore:
    """Pickle-backed vector store using sentence-transformer embeddings.

    Keeps one normalized embedding plus a parallel metadata dict per
    invoice and answers queries by cosine similarity (a dot product of
    normalized vectors). Intended for small corpora on Hugging Face
    Spaces; no FAISS index is used here.
    """

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedding_model_name = embedding_model
        self.vector_store_path = os.path.join(HF_CONFIG["data_dir"], "vectors.pkl")
        self.metadata_path = os.path.join(HF_CONFIG["data_dir"], "metadata.pkl")
        self.embedding_model = None
        self.vectors = []            # list of np.ndarray embeddings
        self.document_metadata = []  # parallel list of metadata dicts
        self.embedding_dimension = None

        self.setup_embedding_model()
        self.load_vector_store()

    def setup_embedding_model(self):
        """Load the sentence-transformer model, or leave search disabled."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            st.warning("β οΈ Sentence Transformers not available. Vector search disabled.")
            return

        try:
            with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
                self.embedding_model = SentenceTransformer(
                    self.embedding_model_name,
                    cache_folder=HF_CONFIG["cache_dir"]
                )

            # encode() on a list returns a (batch, dim) array, so the
            # embedding width is shape[1]. (The old code stored shape[0],
            # i.e. the batch size 1, as the dimension.)
            test_embedding = self.embedding_model.encode(["test"])
            self.embedding_dimension = int(test_embedding.shape[1])

            st.success(f"β Embedding model loaded: {self.embedding_model_name}")

        except Exception as e:
            st.error(f"β Failed to load embedding model: {e}")
            self.embedding_model = None

    def load_vector_store(self):
        """Load persisted vectors and metadata, or start empty."""
        try:
            if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
                # NOTE: these pickle files are written only by this app;
                # never point the paths at untrusted data.
                with open(self.vector_store_path, 'rb') as f:
                    self.vectors = pickle.load(f)

                with open(self.metadata_path, 'rb') as f:
                    self.document_metadata = pickle.load(f)

                st.success(f"β Vector store loaded: {len(self.document_metadata)} documents")
            else:
                self.vectors = []
                self.document_metadata = []
                st.info("π New vector store initialized")

        except Exception as e:
            st.error(f"β Error loading vector store: {e}")
            self.vectors = []
            self.document_metadata = []

    def save_vector_store(self):
        """Persist vectors and metadata to disk; returns True on success."""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.vectors, f)

            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.document_metadata, f)

            return True
        except Exception as e:
            st.error(f"Error saving vector store: {e}")
            return False

    def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
        """Flatten an invoice record (plus a raw-text snippet) into one searchable string."""
        text_parts = []

        # "field: value" pairs for every populated field except the id.
        for field, value in invoice_data.items():
            if value and field != 'id':
                text_parts.append(f"{field}: {value}")

        if raw_text:
            text_parts.append(f"content: {raw_text[:300]}")

        return " | ".join(text_parts)

    def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
        """Embed one invoice and append it to the store; returns success."""
        if not self.embedding_model:
            return False

        try:
            document_text = self.create_document_text(invoice_data, raw_text)

            # Normalized embeddings let semantic_search use a plain dot
            # product as cosine similarity.
            embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

            metadata = {
                'invoice_id': invoice_data.get('id', ''),
                'invoice_number': invoice_data.get('invoice_number', ''),
                'supplier_name': invoice_data.get('supplier_name', ''),
                'buyer_name': invoice_data.get('buyer_name', ''),
                'amount': invoice_data.get('amount', 0),
                'date': invoice_data.get('date', ''),
                'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
                'document_text': document_text[:200],
                'timestamp': datetime.now().isoformat()
            }

            self.vectors.append(embedding)
            self.document_metadata.append(metadata)

            return True

        except Exception as e:
            st.error(f"Error adding document to vector store: {e}")
            return False

    def semantic_search(self, query: str, top_k: int = 5) -> List[VectorSearchResult]:
        """Return up to top_k documents ranked by cosine similarity to query.

        Results with similarity <= 0.1 are dropped as noise. Returns an
        empty list when the model is unavailable or the store is empty.
        """
        if not self.embedding_model or not self.vectors:
            return []

        try:
            query_embedding = self.embedding_model.encode(query, normalize_embeddings=True)

            # Both sides are normalized, so the dot product is the cosine
            # similarity.
            similarities = []
            for i, doc_embedding in enumerate(self.vectors):
                similarity = np.dot(query_embedding, doc_embedding)
                similarities.append((similarity, i))

            similarities.sort(reverse=True)

            results = []
            for similarity, idx in similarities[:top_k]:
                if similarity > 0.1:
                    metadata = self.document_metadata[idx]
                    result = VectorSearchResult(
                        invoice_id=metadata.get('invoice_id', ''),
                        invoice_number=metadata.get('invoice_number', ''),
                        supplier_name=metadata.get('supplier_name', ''),
                        similarity_score=float(similarity),
                        content_preview=metadata.get('document_text', ''),
                        metadata=metadata
                    )
                    results.append(result)

            return results

        except Exception as e:
            st.error(f"Error in semantic search: {e}")
            return []
|
|
| |
| |
| |
|
|
class InvoiceProcessor:
    """Main invoice processor for Hugging Face Spaces.

    Owns the JSON invoice store, the document/AI extraction pipeline,
    and (when sentence-transformers is available) the vector store.
    """

    def __init__(self):
        self.setup_storage()
        self.document_processor = DocumentProcessor()
        self.ai_extractor = AIExtractor()
        self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None

        # In-memory counters for this process only (not persisted).
        self.processing_stats = {
            'total_processed': 0,
            'successful': 0,
            'failed': 0,
            'start_time': datetime.now()
        }

    @staticmethod
    def _initial_data() -> dict:
        """Return a fresh, empty invoice-store structure."""
        return {
            "metadata": {
                "created_at": datetime.now().isoformat(),
                "version": "hf_v1.0",
                "total_invoices": 0
            },
            "invoices": [],
            "summary": {
                "total_amount": 0.0,
                "unique_suppliers": [],
                "processing_stats": {"successful": 0, "failed": 0}
            }
        }

    def setup_storage(self):
        """Resolve storage paths and create the JSON store if missing."""
        self.data_dir = HF_CONFIG["data_dir"]
        self.json_path = os.path.join(self.data_dir, "invoices.json")

        if not os.path.exists(self.json_path):
            self.save_json_data(self._initial_data())

    def load_json_data(self) -> dict:
        """Load invoice data from JSON, rebuilding the store if unreadable.

        Fixed: the old version recursed into itself after setup_storage(),
        which loops forever when the file exists but holds corrupt JSON
        (setup_storage only writes when the file is missing). Now a fresh
        store is written unconditionally and returned.
        """
        try:
            with open(self.json_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = self._initial_data()
            self.save_json_data(data)
            return data

    def save_json_data(self, data: dict):
        """Write the invoice store to disk as pretty-printed UTF-8 JSON."""
        try:
            with open(self.json_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            st.error(f"Error saving data: {e}")

    def process_uploaded_file(self, uploaded_file) -> InvoiceData:
        """Process a single uploaded file and return the extracted data.

        Validates size and extension, spools the upload to a temp file,
        extracts text, runs AI/regex extraction, persists the result, and
        always returns an InvoiceData (empty on any failure).
        """
        self.processing_stats['total_processed'] += 1

        try:
            file_size = len(uploaded_file.getvalue())
            file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown'

            st.info(f"π Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})")

            # Enforce the Spaces upload size limit.
            if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024:
                error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB"
                st.error(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            if file_extension not in ['pdf', 'txt']:
                error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)"
                st.warning(error_msg)
                self.processing_stats['failed'] += 1
                return InvoiceData()

            # The extractors work on file paths, so spool the upload to disk.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
                tmp_file.write(uploaded_file.getvalue())
                tmp_file_path = tmp_file.name

            st.info(f"πΎ Saved temporarily to: {tmp_file_path}")

            try:
                st.info("π Extracting text from document...")
                text = self.document_processor.extract_text_from_document(tmp_file_path)

                if not text or not text.strip():
                    st.warning(f"β No text extracted from {uploaded_file.name}")
                    self.processing_stats['failed'] += 1
                    return InvoiceData()

                text_length = len(text)
                st.info(f"π Extracted {text_length} characters of text")

                if text_length > 0:
                    with st.expander("π Text Preview (First 500 characters)", expanded=False):
                        st.text(text[:500] + "..." if len(text) > 500 else text)

                st.info("π€ Extracting invoice data using AI/Regex...")
                invoice_data = self.ai_extractor.extract_with_ai(text)
                invoice_data.file_path = uploaded_file.name

                st.info(f"π Extraction completed with {invoice_data.extraction_confidence:.1%} confidence")

                st.info("πΎ Saving extracted data...")
                self.save_invoice_data(invoice_data, text, file_size)

                self.processing_stats['successful'] += 1
                st.success(f"β Successfully processed {uploaded_file.name}")

                return invoice_data

            finally:
                # Best-effort temp-file cleanup; only swallow filesystem
                # errors, not everything.
                try:
                    os.unlink(tmp_file_path)
                    st.info("π§Ή Cleaned up temporary file")
                except OSError:
                    pass

        except Exception as e:
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")
            self.processing_stats['failed'] += 1

            with st.expander("π Error Details", expanded=False):
                st.code(str(e))
                import traceback
                st.code(traceback.format_exc())

            return InvoiceData()

    def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int):
        """Append one extracted invoice to the JSON store and vector store."""
        try:
            data = self.load_json_data()

            invoice_record = {
                "id": len(data["invoices"]) + 1,
                "invoice_number": invoice_data.invoice_number,
                "supplier_name": invoice_data.supplier_name,
                "buyer_name": invoice_data.buyer_name,
                "date": invoice_data.date,
                "amount": invoice_data.amount,
                "quantity": invoice_data.quantity,
                "product_description": invoice_data.product_description,
                "file_info": {
                    "file_name": invoice_data.file_path,
                    "file_size": file_size
                },
                "extraction_info": {
                    "confidence": invoice_data.extraction_confidence,
                    "method": invoice_data.processing_method,
                    "raw_text_preview": raw_text[:300]
                },
                "timestamps": {
                    "created_at": datetime.now().isoformat()
                }
            }

            data["invoices"].append(invoice_record)

            self.update_summary(data)
            self.save_json_data(data)

            # Mirror the record into the vector store for semantic search.
            if self.vector_store:
                self.vector_store.add_document(invoice_record, raw_text)
                self.vector_store.save_vector_store()

        except Exception as e:
            st.error(f"Error saving invoice data: {e}")

    def update_summary(self, data: dict):
        """Recompute the store-level summary and metadata in place."""
        invoices = data["invoices"]

        total_amount = sum(inv.get("amount", 0) for inv in invoices)
        unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name")))

        data["summary"] = {
            "total_amount": total_amount,
            "unique_suppliers": unique_suppliers,
            "processing_stats": {
                "successful": self.processing_stats['successful'],
                "failed": self.processing_stats['failed'],
                "total_processed": self.processing_stats['total_processed']
            }
        }

        data["metadata"]["last_updated"] = datetime.now().isoformat()
        data["metadata"]["total_invoices"] = len(invoices)
|
|
| |
| |
| |
|
|
class ChatBot:
    """Keyword-routed (and optionally semantic) chatbot over the invoice store."""

    def __init__(self, processor: InvoiceProcessor):
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Route a natural-language query to the matching handler.

        Routing is by keyword, most specific intent first; unmatched
        queries fall back to semantic search when available, otherwise to
        a plain keyword scan.
        """
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            query_lower = query.lower()

            if any(phrase in query_lower for phrase in ["summary", "overview", "total"]):
                return self.generate_summary(data)

            elif "count" in query_lower or "how many" in query_lower:
                return self.handle_count_query(data)

            elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)

            elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)

            elif self.processor.vector_store:
                return self.handle_semantic_search(query)

            else:
                return self.handle_general_query(data, query)

        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Build a markdown overview: totals, stats, and five newest invoices."""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**π Invoice System Summary**

β’ **Total Invoices**: {len(invoices):,}
β’ **Total Value**: βΉ{total_amount:,.2f}
β’ **Average Invoice**: βΉ{avg_amount:,.2f}
β’ **Unique Suppliers**: {unique_suppliers}

**π Processing Stats**
β’ **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
β’ **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**π Recent Invoices**
"""

        # Newest first by creation timestamp.
        recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5]
        for i, inv in enumerate(recent, 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_count_query(self, data: dict) -> str:
        """Answer 'how many'-style questions about the invoice store."""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number')))

        first_seen = invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'
        last_seen = invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'

        return f"""
**π Invoice Count Summary**

β’ **Total Records**: {total}
β’ **Unique Invoice Numbers**: {unique_numbers}
β’ **Duplicates**: {total - unique_numbers if total > unique_numbers else 0}

**π
 Processing Timeline**
β’ **First Invoice**: {first_seen}
β’ **Latest Invoice**: {last_seen}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Summarize invoice amounts and list the high-value invoices."""
        invoices = data.get("invoices", [])
        amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # "High value" = the fifth-largest amount when there are more than
        # five invoices; otherwise only the single largest qualifies.
        high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount
        high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold]

        response = f"""
**π° Financial Analysis**

β’ **Total Amount**: βΉ{total_amount:,.2f}
β’ **Average Amount**: βΉ{avg_amount:,.2f}
β’ **Highest Invoice**: βΉ{max_amount:,.2f}
β’ **Lowest Invoice**: βΉ{min_amount:,.2f}

**π― High-Value Invoices (βΉ{high_value_threshold:,.2f}+)**
"""

        for i, inv in enumerate(high_value_invoices[:5], 1):
            response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (βΉ{inv.get('amount', 0):,.2f})"

        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Rank suppliers by invoice count and total spend."""
        invoices = data.get("invoices", [])

        supplier_counts = {}
        supplier_amounts = {}

        for inv in invoices:
            supplier = inv.get('supplier_name', '').strip()
            if supplier:
                supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
                supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10]

        # Hoisted: the old code evaluated max(..., key=...) twice.
        most_active = max(supplier_counts, key=supplier_counts.get)

        response = f"""
**π’ Supplier Analysis**

β’ **Total Unique Suppliers**: {len(supplier_counts)}
β’ **Most Active**: {most_active} ({supplier_counts[most_active]} invoices)

**π° Top Suppliers by Amount**
"""

        for i, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{i}. **{supplier}** - βΉ{amount:,.2f} ({count} invoices, avg: βΉ{avg:,.2f})"

        return response

    def handle_semantic_search(self, query: str) -> str:
        """Answer via the vector store's cosine-similarity search."""
        try:
            results = self.processor.vector_store.semantic_search(query, top_k=5)

            if not results:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"π **Semantic Search Results for '{query}'**\n\n"

            for i, result in enumerate(results, 1):
                response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n"
                response += f"   β’ Similarity: {result.similarity_score:.3f}\n"
                response += f"   β’ Amount: βΉ{result.metadata.get('amount', 0):,.2f}\n"
                response += f"   β’ Preview: {result.content_preview[:100]}...\n\n"

            return response

        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Fallback: plain keyword scan over names, descriptions, and raw text."""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        matching_invoices = []
        for inv in invoices:
            text_to_search = (
                inv.get('supplier_name', '') + ' ' +
                inv.get('buyer_name', '') + ' ' +
                inv.get('product_description', '') + ' ' +
                inv.get('extraction_info', {}).get('raw_text_preview', '')
            ).lower()

            # Any single query word matching counts as a hit.
            if any(word in text_to_search for word in query_words):
                matching_invoices.append(inv)

        if not matching_invoices:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"π **Found {len(matching_invoices)} invoices matching '{query}'**\n\n"

        for i, inv in enumerate(matching_invoices[:5], 1):
            response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n"
            response += f"   β’ Amount: βΉ{inv.get('amount', 0):,.2f}\n"
            response += f"   β’ Date: {inv.get('date', 'N/A')}\n\n"

        if len(matching_invoices) > 5:
            response += f"... and {len(matching_invoices) - 5} more results."

        return response
|
|
| |
| |
| |
|
|
| def create_app(): |
| """Main Streamlit application""" |
| |
| |
| if 'session_id' not in st.session_state: |
| st.session_state.session_id = str(uuid.uuid4())[:8] |
| |
| session_id = st.session_state.session_id |
| |
| |
| st.markdown(""" |
| <style> |
| .main-header { |
| font-size: 2.5rem; |
| font-weight: bold; |
| text-align: center; |
| color: #FF6B35; |
| margin-bottom: 1rem; |
| } |
| .feature-box { |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
| padding: 1rem; |
| border-radius: 10px; |
| color: white; |
| margin: 0.5rem 0; |
| text-align: center; |
| } |
| .status-ok { color: #28a745; font-weight: bold; } |
| .status-warning { color: #ffc107; font-weight: bold; } |
| .status-error { color: #dc3545; font-weight: bold; } |
| </style> |
| """, unsafe_allow_html=True) |
| |
| |
| st.markdown('<h1 class="main-header">π AI Invoice Processing System</h1>', unsafe_allow_html=True) |
| st.markdown(""" |
| <div style="text-align: center; margin-bottom: 2rem;"> |
| <p style="font-size: 1.1rem; color: #666;"> |
| AI-Powered Document Processing β’ Semantic Search β’ Smart Analytics β’ Hugging Face Spaces |
| </p> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| |
| if 'processor' not in st.session_state: |
| with st.spinner("π§ Initializing AI Invoice Processor..."): |
| try: |
| st.session_state.processor = InvoiceProcessor() |
| st.session_state.chatbot = ChatBot(st.session_state.processor) |
| st.session_state.chat_history = [] |
| st.success("β
System initialized successfully!") |
| except Exception as e: |
| st.error(f"β Initialization failed: {e}") |
| st.stop() |
| |
| |
| with st.sidebar: |
| st.header("ποΈ System Status") |
| |
| processor = st.session_state.processor |
| |
| |
| if processor.document_processor.processors: |
| st.markdown('<span class="status-ok">β
Document Processing</span>', unsafe_allow_html=True) |
| else: |
| st.markdown('<span class="status-error">β Document Processing</span>', unsafe_allow_html=True) |
| |
| if processor.ai_extractor.use_transformers: |
| st.markdown('<span class="status-ok">β
AI Extraction</span>', unsafe_allow_html=True) |
| else: |
| st.markdown('<span class="status-warning">β οΈ Regex Extraction</span>', unsafe_allow_html=True) |
| |
| if processor.vector_store and processor.vector_store.embedding_model: |
| st.markdown('<span class="status-ok">β
Semantic Search</span>', unsafe_allow_html=True) |
| else: |
| st.markdown('<span class="status-warning">β οΈ Keyword Search Only</span>', unsafe_allow_html=True) |
| |
| |
| st.header("π Quick Stats") |
| try: |
| data = processor.load_json_data() |
| total_invoices = len(data.get("invoices", [])) |
| total_amount = data.get("summary", {}).get("total_amount", 0) |
| |
| st.metric("Total Invoices", total_invoices) |
| st.metric("Total Value", f"βΉ{total_amount:,.2f}") |
| st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") |
| |
| except Exception as e: |
| st.error(f"Stats error: {e}") |
| |
| |
| st.header("βοΈ System Info") |
| st.info(f""" |
| **Session ID:** {session_id} |
| |
| **Limits:** |
| β’ Max file size: 10MB |
| β’ Max concurrent files: 3 |
| β’ Timeout: 30s |
| """) |
| |
| |
| selected_tab = st.radio( |
| "Choose a section:", |
| ["π€ Upload & Process", "π¬ AI Chat", "π Analytics", "π Data Explorer"], |
| horizontal=True, |
| key=f"main_navigation_{session_id}" |
| ) |
| |
| |
| |
| |
| |
| if selected_tab == "π€ Upload & Process": |
| st.header("π€ Upload Invoice Documents") |
| |
| |
| col1, col2, col3 = st.columns(3) |
| |
| with col1: |
| st.markdown(""" |
| <div class="feature-box"> |
| <h4>π€ AI Extraction</h4> |
| <p>Advanced NLP models extract structured data automatically</p> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| with col2: |
| st.markdown(""" |
| <div class="feature-box"> |
| <h4>π Smart Search</h4> |
| <p>Semantic search finds invoices using natural language</p> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| with col3: |
| st.markdown(""" |
| <div class="feature-box"> |
| <h4>π Analytics</h4> |
| <p>Comprehensive insights and visualizations</p> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| |
| st.markdown("### π Upload Your Invoices") |
| |
| |
| if f'uploaded_files_{session_id}' not in st.session_state: |
| st.session_state[f'uploaded_files_{session_id}'] = None |
| if f'processing_complete_{session_id}' not in st.session_state: |
| st.session_state[f'processing_complete_{session_id}'] = False |
| if f'currently_processing_{session_id}' not in st.session_state: |
| st.session_state[f'currently_processing_{session_id}'] = False |
| if f'processed_file_hashes_{session_id}' not in st.session_state: |
| st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| |
| |
| uploaded_files = st.file_uploader( |
| "Choose invoice files (PDF, TXT supported)", |
| type=['pdf', 'txt'], |
| accept_multiple_files=True, |
| help="Maximum file size: 10MB per file", |
| key=f"file_uploader_stable_{session_id}" |
| ) |
| |
| |
| if uploaded_files: |
| |
| current_file_hashes = set() |
| for file in uploaded_files: |
| file_hash = hash((file.name, file.size)) |
| current_file_hashes.add(file_hash) |
| |
| |
| stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set()) |
| if current_file_hashes != stored_hashes: |
| st.session_state[f'uploaded_files_{session_id}'] = uploaded_files |
| st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes |
| st.session_state[f'processing_complete_{session_id}'] = False |
| st.session_state[f'currently_processing_{session_id}'] = False |
| st.info("π New files detected - ready for processing") |
| |
| |
| current_files = st.session_state[f'uploaded_files_{session_id}'] |
| is_processing = st.session_state[f'currently_processing_{session_id}'] |
| is_complete = st.session_state[f'processing_complete_{session_id}'] |
| |
| if current_files: |
| max_files = 3 |
| if len(current_files) > max_files: |
| st.warning(f"β οΈ Too many files selected. Processing first {max_files} files.") |
| current_files = current_files[:max_files] |
| |
| st.info(f"π {len(current_files)} files selected") |
| |
| |
| st.markdown("**Selected Files:**") |
| for i, file in enumerate(current_files, 1): |
| file_size_mb = len(file.getvalue()) / (1024 * 1024) |
| file_hash = hash((file.name, file.size)) |
| processed_icon = "β
" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "π" |
| st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)") |
| |
| |
| col1, col2 = st.columns([1, 1]) |
| |
| with col1: |
| if not is_processing and not is_complete: |
| if st.button("π Process Files", type="primary", key=f"process_btn_{session_id}"): |
| st.session_state[f'currently_processing_{session_id}'] = True |
| st.rerun() |
| elif is_processing: |
| st.info("π Processing in progress...") |
| |
| process_files_once(current_files, session_id) |
| elif is_complete: |
| st.success("β
Processing completed!") |
| if st.button("π Process Again", key=f"reprocess_btn_{session_id}"): |
| st.session_state[f'processing_complete_{session_id}'] = False |
| st.session_state[f'currently_processing_{session_id}'] = False |
| st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| st.rerun() |
| |
| with col2: |
| if st.button("ποΈ Clear Files", key=f"clear_files_{session_id}"): |
| st.session_state[f'uploaded_files_{session_id}'] = None |
| st.session_state[f'uploaded_file_hashes_{session_id}'] = set() |
| st.session_state[f'processing_complete_{session_id}'] = False |
| st.session_state[f'currently_processing_{session_id}'] = False |
| st.session_state[f'processed_file_hashes_{session_id}'] = set() |
| st.rerun() |
| |
| else: |
| st.info("π Please select invoice files to upload and process") |
| |
| |
| if is_complete: |
| st.markdown("### π Recent Processing Results") |
| try: |
| data = st.session_state.processor.load_json_data() |
| recent_invoices = sorted( |
| data.get("invoices", []), |
| key=lambda x: x.get('timestamps', {}).get('created_at', ''), |
| reverse=True |
| )[:5] |
| |
| if recent_invoices: |
| for i, inv in enumerate(recent_invoices, 1): |
| with st.expander(f"π {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False): |
| col1, col2 = st.columns(2) |
| with col1: |
| st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}") |
| st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}") |
| st.write(f"**Amount:** βΉ{inv.get('amount', 0):.2f}") |
| with col2: |
| st.write(f"**Date:** {inv.get('date', 'N/A')}") |
| st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}") |
| st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}") |
| else: |
| st.info("No recent processing results found.") |
| except Exception as e: |
| st.error(f"Error loading recent results: {e}") |
| |
| |
| |
| |
| |
| elif selected_tab == "π¬ AI Chat": |
| st.header("π¬ AI Chat Interface") |
| |
| |
| if st.session_state.chat_history: |
| st.markdown("### π¬ Chat History") |
| for i, message in enumerate(st.session_state.chat_history): |
| with st.chat_message(message["role"]): |
| st.markdown(message["content"]) |
| |
| |
| st.markdown("### βοΈ Ask a Question") |
| |
| col1, col2 = st.columns([4, 1]) |
| |
| with col1: |
| user_input = st.text_input( |
| "Type your question:", |
| placeholder="e.g., 'show me total spending'", |
| key=f"chat_input_{session_id}" |
| ) |
| |
| with col2: |
| ask_btn = st.button("π Ask", type="primary", key=f"ask_btn_{session_id}") |
| |
| if ask_btn and user_input: |
| handle_chat_query(user_input) |
| |
| |
| if not st.session_state.chat_history: |
| st.markdown("### π‘ Try These Queries") |
| |
| col1, col2 = st.columns(2) |
| |
| with col1: |
| st.markdown("**π Basic Queries:**") |
| basic_queries = [ |
| "Show me a summary of all invoices", |
| "How much have we spent in total?", |
| "Who are our top suppliers?", |
| "Find invoices with high amounts" |
| ] |
| for i, query in enumerate(basic_queries): |
| if st.button(query, key=f"basic_{session_id}_{i}"): |
| handle_chat_query(query) |
| |
| with col2: |
| st.markdown("**π Advanced Queries:**") |
| advanced_queries = [ |
| "Find technology purchases", |
| "Show office supplies", |
| "Search consulting services", |
| "Recent high-value invoices" |
| ] |
| for i, query in enumerate(advanced_queries): |
| if st.button(query, key=f"advanced_{session_id}_{i}"): |
| handle_chat_query(query) |
| |
| |
| if st.session_state.chat_history: |
| if st.button("ποΈ Clear Chat", key=f"clear_chat_{session_id}"): |
| st.session_state.chat_history = [] |
| st.rerun() |
| |
| |
| |
| |
| |
| elif selected_tab == "π Analytics": |
| st.header("π Analytics Dashboard") |
| |
| try: |
| data = st.session_state.processor.load_json_data() |
| invoices = data.get("invoices", []) |
| |
| if not invoices: |
| st.info("π No data available. Upload some invoices to see analytics.") |
| return |
| |
| |
| df_data = [] |
| for inv in invoices: |
| df_data.append({ |
| 'invoice_number': inv.get('invoice_number', ''), |
| 'supplier_name': inv.get('supplier_name', ''), |
| 'amount': inv.get('amount', 0), |
| 'date': inv.get('date', ''), |
| 'confidence': inv.get('extraction_info', {}).get('confidence', 0) |
| }) |
| |
| df = pd.DataFrame(df_data) |
| |
| |
| col1, col2, col3, col4 = st.columns(4) |
| |
| with col1: |
| st.metric("Total Invoices", len(df)) |
| with col2: |
| st.metric("Total Amount", f"βΉ{df['amount'].sum():,.2f}") |
| with col3: |
| st.metric("Avg Amount", f"βΉ{df['amount'].mean():,.2f}") |
| with col4: |
| st.metric("Unique Suppliers", df['supplier_name'].nunique()) |
| |
| |
| if len(df) > 0: |
| |
| fig_hist = px.histogram( |
| df, |
| x='amount', |
| title="Invoice Amount Distribution", |
| labels={'amount': 'Amount (βΉ)', 'count': 'Number of Invoices'} |
| ) |
| st.plotly_chart(fig_hist, use_container_width=True) |
| |
| |
| if df['supplier_name'].notna().any(): |
| supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10) |
| |
| if len(supplier_amounts) > 0: |
| fig_suppliers = px.bar( |
| x=supplier_amounts.values, |
| y=supplier_amounts.index, |
| orientation='h', |
| title="Top 10 Suppliers by Total Amount", |
| labels={'x': 'Total Amount (βΉ)', 'y': 'Supplier'} |
| ) |
| st.plotly_chart(fig_suppliers, use_container_width=True) |
| |
| except Exception as e: |
| st.error(f"Analytics error: {e}") |
| |
| |
| |
| |
| |
| elif selected_tab == "π Data Explorer": |
| st.header("π Data Explorer") |
| |
| try: |
| data = st.session_state.processor.load_json_data() |
| invoices = data.get("invoices", []) |
| |
| if not invoices: |
| st.info("π No data available. Upload some invoices first.") |
| return |
| |
| |
| df_data = [] |
| for inv in invoices: |
| df_data.append({ |
| 'Invoice Number': inv.get('invoice_number', ''), |
| 'Supplier': inv.get('supplier_name', ''), |
| 'Buyer': inv.get('buyer_name', ''), |
| 'Amount': inv.get('amount', 0), |
| 'Date': inv.get('date', ''), |
| 'Confidence': inv.get('extraction_info', {}).get('confidence', 0), |
| 'Method': inv.get('extraction_info', {}).get('method', ''), |
| 'File': inv.get('file_info', {}).get('file_name', ''), |
| 'Created': inv.get('timestamps', {}).get('created_at', '')[:19] |
| }) |
| |
| df = pd.DataFrame(df_data) |
| |
| |
| col1, col2, col3 = st.columns(3) |
| |
| with col1: |
| suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist()) |
| selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}") |
| |
| with col2: |
| methods = ['All'] + sorted(df['Method'].dropna().unique().tolist()) |
| selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}") |
| |
| with col3: |
| min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}") |
| |
| |
| filtered_df = df.copy() |
| if selected_supplier != 'All': |
| filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier] |
| if selected_method != 'All': |
| filtered_df = filtered_df[filtered_df['Method'] == selected_method] |
| if min_amount > 0: |
| filtered_df = filtered_df[filtered_df['Amount'] >= min_amount] |
| |
| |
| st.dataframe( |
| filtered_df, |
| use_container_width=True, |
| column_config={ |
| "Amount": st.column_config.NumberColumn("Amount", format="βΉ%.2f"), |
| "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1) |
| } |
| ) |
| |
| |
| col1, col2 = st.columns(2) |
| |
| with col1: |
| if st.button("π₯ Export CSV", key=f"export_csv_{session_id}"): |
| csv_data = filtered_df.to_csv(index=False) |
| st.download_button( |
| "Download CSV", |
| csv_data, |
| f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", |
| "text/csv", |
| key=f"download_csv_{session_id}" |
| ) |
| |
| with col2: |
| if st.button("π Export JSON", key=f"export_json_{session_id}"): |
| filtered_invoices = [inv for inv in invoices |
| if inv.get('invoice_number') in filtered_df['Invoice Number'].values] |
| |
| export_data = { |
| "exported_at": datetime.now().isoformat(), |
| "total_records": len(filtered_invoices), |
| "invoices": filtered_invoices |
| } |
| |
| st.download_button( |
| "Download JSON", |
| json.dumps(export_data, indent=2), |
| f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json", |
| "application/json", |
| key=f"download_json_{session_id}" |
| ) |
| |
| except Exception as e: |
| st.error(f"Data explorer error: {e}") |
| |
| |
| |
| |
| |
| st.markdown("---") |
| st.markdown("### π¬ Quick Chat (Works from any section)") |
| |
| global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}") |
| |
| if global_query: |
| handle_chat_query(global_query, show_response=True) |
| |
| |
| st.markdown("---") |
| st.markdown(""" |
| <div style="text-align: center; color: #666;"> |
| <p>π <strong>AI Invoice Processing System</strong> - Optimized for Hugging Face Spaces</p> |
| <p>Built with β€οΈ using Streamlit, Transformers, and AI</p> |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| |
| |
| |
|
|
def process_files_once(uploaded_files, session_id):
    """Process uploaded invoice files exactly once per session.

    Skips any file whose (name, size) hash is already recorded in the
    per-session processed set, renders progress / status / per-file
    results into dedicated containers, then flips the session's
    processing flags and triggers a rerun.

    Args:
        uploaded_files: Streamlit UploadedFile objects to process.
        session_id: Identifier used to namespace st.session_state keys.
    """
    if not uploaded_files:
        st.error("No files to process!")
        st.session_state[f'currently_processing_{session_id}'] = False
        return

    st.markdown("### π Processing Files...")

    # Hashes of files already processed earlier in this session.
    processed_hashes = st.session_state[f'processed_file_hashes_{session_id}']

    # Keep only unseen files; (name, size) is used as a cheap file identity.
    files_to_process = []
    for file in uploaded_files:
        file_hash = hash((file.name, file.size))
        if file_hash not in processed_hashes:
            files_to_process.append((file, file_hash))

    if not files_to_process:
        st.info("✅ All files have already been processed!")
        st.session_state[f'currently_processing_{session_id}'] = False
        st.session_state[f'processing_complete_{session_id}'] = True
        return

    # Dedicated containers keep progress, status and results in stable UI areas.
    progress_container = st.container()
    status_container = st.container()
    results_container = st.container()

    successful = 0
    failed = 0

    with progress_container:
        progress_bar = st.progress(0)
        progress_text = st.empty()

    with status_container:
        st.info(f"Starting to process {len(files_to_process)} new files...")

    for i, (uploaded_file, file_hash) in enumerate(files_to_process):
        current_progress = (i + 1) / len(files_to_process)

        with progress_container:
            progress_bar.progress(current_progress)
            progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}")

        with status_container:
            st.info(f"π Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)")

        try:
            result = st.session_state.processor.process_uploaded_file(uploaded_file)

            # Mark as processed even if extraction was partial, to avoid reprocessing loops.
            processed_hashes.add(file_hash)

            with results_container:
                if result and hasattr(result, 'invoice_number') and result.invoice_number:
                    successful += 1
                    st.success(f"✅ Successfully processed: {uploaded_file.name}")

                    # Summary card for the extracted invoice fields.
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.write(f"**Invoice #:** {result.invoice_number}")
                        st.write(f"**Supplier:** {result.supplier_name or 'Not found'}")
                    with col2:
                        st.write(f"**Amount:** βΉ{result.amount:.2f}")
                        st.write(f"**Date:** {result.date or 'Not found'}")
                    with col3:
                        st.write(f"**Method:** {result.processing_method}")
                        st.write(f"**Confidence:** {result.extraction_confidence:.1%}")

                    st.markdown("---")
                else:
                    failed += 1
                    st.warning(f"β οΈ Could not extract complete data from: {uploaded_file.name}")
                    if result:
                        st.write(f"Partial data: {result.supplier_name}, βΉ{result.amount}")
                    st.markdown("---")

        except Exception as e:
            failed += 1
            # Also mark failed files as processed so a broken file is not retried forever.
            processed_hashes.add(file_hash)

            with results_container:
                st.error(f"β Error processing {uploaded_file.name}: {str(e)}")
                st.markdown("---")

    # Persist the updated processed-hash set back to session state.
    st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes

    with progress_container:
        progress_bar.progress(1.0)
        progress_text.text("✅ Processing completed!")

    with status_container:
        if successful > 0:
            st.success(f"π Processing complete! {successful} successful, {failed} failed")
            st.balloons()  # fix: inner `if successful > 0` check was redundant inside this branch
        else:
            st.error(f"β Processing failed for all {failed} files. Please check file formats and content.")

    # Flip the state flags so the UI shows the completed view after rerun.
    st.session_state[f'currently_processing_{session_id}'] = False
    st.session_state[f'processing_complete_{session_id}'] = True

    st.rerun()
|
|
def process_files(uploaded_files, session_id):
    """Deprecated alias kept for backward compatibility; delegates to process_files_once()."""
    return process_files_once(uploaded_files, session_id)
|
|
def handle_chat_query(query, show_response=False):
    """Record a user query in the chat history, ask the chatbot, and store its reply.

    When show_response is True the assistant's answer is also rendered
    inline (used by the global quick-chat box). Errors from the chatbot
    are surfaced as an in-app error message instead of raising.
    """
    history = st.session_state.chat_history
    history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now(),
    })

    try:
        # Query the chatbot backend while showing a spinner.
        with st.spinner("π€ AI is analyzing..."):
            answer = st.session_state.chatbot.query_database(query)

        history.append({
            "role": "assistant",
            "content": answer,
            "timestamp": datetime.now(),
        })

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(answer)
            st.info("π‘ Switch to the 'AI Chat' section to see full conversation history!")

        # Rerun so the chat-history section picks up the new messages.
        st.rerun()

    except Exception as e:
        st.error(f"Chat error: {e}")
|
|
| |
| |
| |
|
|
def main():
    """Main entry point for Hugging Face Spaces.

    Shows a sidebar notice when running inside a Space (IS_HF_SPACE),
    then builds the full Streamlit UI via create_app(). Any uncaught
    exception is absorbed at this top-level boundary and rendered as an
    in-app error message rather than crashing the app.
    """
    try:
        if IS_HF_SPACE:
            st.sidebar.info("π€ Running on Hugging Face Spaces")
        
        create_app()
        
    except Exception as e:
        # Top-level boundary: surface the error in the UI so the Space stays usable.
        st.error(f"""
        ## π¨ Application Error
        
        {e}
        
        Please refresh the page or check the logs for more details.
        """)
|
|
# Script entry point: launch the Streamlit app when executed directly.
if __name__ == "__main__":
    main()