diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -491,7 +491,7 @@ Respond with JSON only: [/INST]""" r'[\$₹£€]\s*([0-9,]+\.?\d*)', # Amounts at end of lines (common in invoices) - r'([0-9,]+\.?\d*)\s*[\$₹£€]?\s*']} + r'([0-9,]+\.?\d*)\s*[\$₹£€]?\s* def parse_date(self, date_str: str) -> str: """Parse date to YYYY-MM-DD format""" @@ -778,2506 +778,10 @@ class InvoiceProcessor: text_length = len(text) st.info(f"📝 Extracted {text_length} characters of text") - # Show text preview and extraction debug info + # Show text preview if text_length > 0: - with st.expander("📄 Text Preview & Extraction Debug", expanded=True): - st.text_area("Extracted Text (First 1000 chars):", value=text[:1000], height=150, disabled=True) - - # Debug amount detection - st.markdown("**🔍 Amount Detection Debug:**") - amount_patterns = [ - r'total\s*(?:amount)?\s*:?\s*[\$₹£€]?\s*([0-9,]+\.?\d*)', - r'[\$₹£€]\s*([0-9,]+\.?\d*)', - r'([0-9,]+\.?\d*)\s*[\$₹£€]?\s*$', # ✅ Added $ for end of line - ] - - - # Extract invoice data - st.info("🤖 Extracting invoice data using AI/Regex...") - invoice_data = self.ai_extractor.extract_with_ai(text) - invoice_data.file_path = uploaded_file.name - - # Show extraction results - st.info(f"📊 Extraction completed with {invoice_data.extraction_confidence:.1%} confidence") - - # Save to storage - st.info("💾 Saving extracted data...") - self.save_invoice_data(invoice_data, text, file_size) - - self.processing_stats['successful'] += 1 - st.success(f"✅ Successfully processed {uploaded_file.name}") - - return invoice_data - - finally: - # Cleanup - try: - os.unlink(tmp_file_path) - st.info("🧹 Cleaned up temporary file") - except: - pass - - except Exception as e: - error_msg = f"Error processing {uploaded_file.name}: {str(e)}" - st.error(error_msg) - self.processing_stats['failed'] += 1 - - # Show detailed error for debugging - with st.expander("🔍 Error Details", expanded=False): - st.code(str(e)) - import traceback - st.code(traceback.format_exc()) - - 
return InvoiceData() - - def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int): - """Save invoice data to JSON and vector store""" - try: - # Load existing data - data = self.load_json_data() - - # Create invoice record - invoice_record = { - "id": len(data["invoices"]) + 1, - "invoice_number": invoice_data.invoice_number, - "supplier_name": invoice_data.supplier_name, - "buyer_name": invoice_data.buyer_name, - "date": invoice_data.date, - "amount": invoice_data.amount, - "quantity": invoice_data.quantity, - "product_description": invoice_data.product_description, - "file_info": { - "file_name": invoice_data.file_path, - "file_size": file_size - }, - "extraction_info": { - "confidence": invoice_data.extraction_confidence, - "method": invoice_data.processing_method, - "raw_text_preview": raw_text[:300] - }, - "timestamps": { - "created_at": datetime.now().isoformat() - } - } - - # Add to invoices - data["invoices"].append(invoice_record) - - # Update summary - self.update_summary(data) - - # Save JSON - self.save_json_data(data) - - # Add to vector store - if self.vector_store: - self.vector_store.add_document(invoice_record, raw_text) - self.vector_store.save_vector_store() - - except Exception as e: - st.error(f"Error saving invoice data: {e}") - - def update_summary(self, data: dict): - """Update summary statistics""" - invoices = data["invoices"] - - total_amount = sum(inv.get("amount", 0) for inv in invoices) - unique_suppliers = list(set(inv.get("supplier_name", "") for inv in invoices if inv.get("supplier_name"))) - - data["summary"] = { - "total_amount": total_amount, - "unique_suppliers": unique_suppliers, - "processing_stats": { - "successful": self.processing_stats['successful'], - "failed": self.processing_stats['failed'], - "total_processed": self.processing_stats['total_processed'] - } - } - - data["metadata"]["last_updated"] = datetime.now().isoformat() - data["metadata"]["total_invoices"] = len(invoices) - -# 
=============================================================================== -# CHATBOT CLASS -# =============================================================================== - -class ChatBot: - """Chatbot for invoice queries""" - - def __init__(self, processor: InvoiceProcessor): - self.processor = processor - - def query_database(self, query: str) -> str: - """Process user query and return response""" - try: - data = self.processor.load_json_data() - invoices = data.get("invoices", []) - - if not invoices: - return "No invoice data found. Please upload some invoices first." - - query_lower = query.lower() - - # Handle different query types - if any(phrase in query_lower for phrase in ["summary", "overview", "total"]): - return self.generate_summary(data) - - elif "count" in query_lower or "how many" in query_lower: - return self.handle_count_query(data) - - elif any(phrase in query_lower for phrase in ["amount", "value", "money", "cost"]): - return self.handle_amount_query(data) - - elif any(phrase in query_lower for phrase in ["supplier", "vendor", "company"]): - return self.handle_supplier_query(data, query) - - elif self.processor.vector_store: - return self.handle_semantic_search(query) - - else: - return self.handle_general_query(data, query) - - except Exception as e: - return f"Error processing query: {e}" - - def generate_summary(self, data: dict) -> str: - """Generate comprehensive summary""" - invoices = data.get("invoices", []) - summary = data.get("summary", {}) - - if not invoices: - return "No invoices found in the system." 
- - total_amount = summary.get("total_amount", 0) - avg_amount = total_amount / len(invoices) if invoices else 0 - unique_suppliers = len(summary.get("unique_suppliers", [])) - - response = f""" -**📊 Invoice System Summary** - -• **Total Invoices**: {len(invoices):,} -• **Total Value**: ₹{total_amount:,.2f} -• **Average Invoice**: ₹{avg_amount:,.2f} -• **Unique Suppliers**: {unique_suppliers} - -**📈 Processing Stats** -• **Successful**: {summary.get('processing_stats', {}).get('successful', 0)} -• **Failed**: {summary.get('processing_stats', {}).get('failed', 0)} - -**🔍 Recent Invoices** -""" - - # Show recent invoices - recent = sorted(invoices, key=lambda x: x.get('timestamps', {}).get('created_at', ''), reverse=True)[:5] - for i, inv in enumerate(recent, 1): - response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})" - - return response - - def handle_count_query(self, data: dict) -> str: - """Handle count-related queries""" - invoices = data.get("invoices", []) - total = len(invoices) - unique_numbers = len(set(inv.get('invoice_number', '') for inv in invoices if inv.get('invoice_number'))) - - return f""" -**📊 Invoice Count Summary** - -• **Total Records**: {total} -• **Unique Invoice Numbers**: {unique_numbers} -• **Duplicates**: {total - unique_numbers if total > unique_numbers else 0} - -**📅 Processing Timeline** -• **First Invoice**: {invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'} -• **Latest Invoice**: {invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'} -""" - - def handle_amount_query(self, data: dict) -> str: - """Handle amount-related queries""" - invoices = data.get("invoices", []) - amounts = [inv.get('amount', 0) for inv in invoices if inv.get('amount', 0) > 0] - - if not amounts: - return "No amount information found in invoices." 
- - total_amount = sum(amounts) - avg_amount = total_amount / len(amounts) - max_amount = max(amounts) - min_amount = min(amounts) - - # Find high-value invoices - high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts)-1)] if len(amounts) > 5 else max_amount - high_value_invoices = [inv for inv in invoices if inv.get('amount', 0) >= high_value_threshold] - - response = f""" -**💰 Financial Analysis** - -• **Total Amount**: ₹{total_amount:,.2f} -• **Average Amount**: ₹{avg_amount:,.2f} -• **Highest Invoice**: ₹{max_amount:,.2f} -• **Lowest Invoice**: ₹{min_amount:,.2f} - -**🎯 High-Value Invoices (₹{high_value_threshold:,.2f}+)** -""" - - for i, inv in enumerate(high_value_invoices[:5], 1): - response += f"\n{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')} (₹{inv.get('amount', 0):,.2f})" - - return response - - def handle_supplier_query(self, data: dict, query: str) -> str: - """Handle supplier-related queries""" - invoices = data.get("invoices", []) - - # Count invoices by supplier - supplier_counts = {} - supplier_amounts = {} - - for inv in invoices: - supplier = inv.get('supplier_name', '').strip() - if supplier: - supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1 - supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + inv.get('amount', 0) - - if not supplier_counts: - return "No supplier information found in invoices." - - # Sort suppliers by amount - top_suppliers = sorted(supplier_amounts.items(), key=lambda x: x[1], reverse=True)[:10] - - response = f""" -**🏢 Supplier Analysis** - -• **Total Unique Suppliers**: {len(supplier_counts)} -• **Most Active**: {max(supplier_counts, key=supplier_counts.get)} ({supplier_counts[max(supplier_counts, key=supplier_counts.get)]} invoices) - -**💰 Top Suppliers by Amount** -""" - - for i, (supplier, amount) in enumerate(top_suppliers, 1): - count = supplier_counts[supplier] - avg = amount / count if count > 0 else 0 - response += f"\n{i}. 
**{supplier}** - ₹{amount:,.2f} ({count} invoices, avg: ₹{avg:,.2f})" - - return response - - def handle_semantic_search(self, query: str) -> str: - """Handle semantic search queries""" - try: - results = self.processor.vector_store.semantic_search(query, top_k=5) - - if not results: - return f"No relevant results found for '{query}'. Try different keywords." - - response = f"🔍 **Semantic Search Results for '{query}'**\n\n" - - for i, result in enumerate(results, 1): - response += f"{i}. **{result.invoice_number}** - {result.supplier_name}\n" - response += f" • Similarity: {result.similarity_score:.3f}\n" - response += f" • Amount: ₹{result.metadata.get('amount', 0):,.2f}\n" - response += f" • Preview: {result.content_preview[:100]}...\n\n" - - return response - - except Exception as e: - return f"Semantic search error: {e}" - - def handle_general_query(self, data: dict, query: str) -> str: - """Handle general queries with keyword search""" - invoices = data.get("invoices", []) - query_words = query.lower().split() - - # Simple keyword matching - matching_invoices = [] - for inv in invoices: - text_to_search = ( - inv.get('supplier_name', '') + ' ' + - inv.get('buyer_name', '') + ' ' + - inv.get('product_description', '') + ' ' + - inv.get('extraction_info', {}).get('raw_text_preview', '') - ).lower() - - if any(word in text_to_search for word in query_words): - matching_invoices.append(inv) - - if not matching_invoices: - return f"No invoices found matching '{query}'. Try different keywords or check the summary." - - response = f"🔍 **Found {len(matching_invoices)} invoices matching '{query}'**\n\n" - - for i, inv in enumerate(matching_invoices[:5], 1): - response += f"{i}. **{inv.get('invoice_number', 'N/A')}** - {inv.get('supplier_name', 'Unknown')}\n" - response += f" • Amount: ₹{inv.get('amount', 0):,.2f}\n" - response += f" • Date: {inv.get('date', 'N/A')}\n\n" - - if len(matching_invoices) > 5: - response += f"... 
and {len(matching_invoices) - 5} more results." - - return response - -# =============================================================================== -# STREAMLIT APPLICATION -# =============================================================================== - -def create_app(): - """Main Streamlit application""" - - # Generate unique session ID for this run - if 'session_id' not in st.session_state: - st.session_state.session_id = str(uuid.uuid4())[:8] - - session_id = st.session_state.session_id - - # Custom CSS - st.markdown(""" - - """, unsafe_allow_html=True) - - # Header - st.markdown('

📄 AI Invoice Processing System

', unsafe_allow_html=True) - st.markdown(""" -
-

- AI-Powered Document Processing • Semantic Search • Smart Analytics • Hugging Face Spaces -

-
- """, unsafe_allow_html=True) - - # Initialize processor - if 'processor' not in st.session_state: - with st.spinner("🔧 Initializing AI Invoice Processor..."): - try: - st.session_state.processor = InvoiceProcessor() - st.session_state.chatbot = ChatBot(st.session_state.processor) - st.session_state.chat_history = [] - st.success("✅ System initialized successfully!") - except Exception as e: - st.error(f"❌ Initialization failed: {e}") - st.stop() - - # Sidebar - with st.sidebar: - st.header("🎛️ System Status") - - processor = st.session_state.processor - - # Component status - if processor.document_processor.processors: - st.markdown('✅ Document Processing', unsafe_allow_html=True) - else: - st.markdown('❌ Document Processing', unsafe_allow_html=True) - - if processor.ai_extractor.use_transformers: - st.markdown('✅ AI Extraction', unsafe_allow_html=True) - else: - st.markdown('⚠️ Regex Extraction', unsafe_allow_html=True) - - if processor.vector_store and processor.vector_store.embedding_model: - st.markdown('✅ Semantic Search', unsafe_allow_html=True) - else: - st.markdown('⚠️ Keyword Search Only', unsafe_allow_html=True) - - # Quick stats - st.header("📊 Quick Stats") - try: - data = processor.load_json_data() - total_invoices = len(data.get("invoices", [])) - total_amount = data.get("summary", {}).get("total_amount", 0) - - st.metric("Total Invoices", total_invoices) - st.metric("Total Value", f"₹{total_amount:,.2f}") - st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") - - except Exception as e: - st.error(f"Stats error: {e}") - - # System info - st.header("⚙️ System Info") - st.info(f""" - **Session ID:** {session_id} - - **Limits:** - • Max file size: 10MB - • Max concurrent files: 3 - • Timeout: 30s - """) - - # Main navigation - selected_tab = st.radio( - "Choose a section:", - ["📤 Upload & Process", "💬 AI Chat", "📊 Analytics", "📋 Data Explorer"], - horizontal=True, - 
key=f"main_navigation_{session_id}" - ) - - # ------------------------------------------------------------------------- - # UPLOAD & PROCESS SECTION - # ------------------------------------------------------------------------- - - if selected_tab == "📤 Upload & Process": - st.header("📤 Upload Invoice Documents") - - # Feature highlights - col1, col2, col3 = st.columns(3) - - with col1: - st.markdown(""" -
-

🤖 AI Extraction

-

Advanced NLP models extract structured data automatically

-
- """, unsafe_allow_html=True) - - with col2: - st.markdown(""" -
-

🔍 Smart Search

-

Semantic search finds invoices using natural language

-
- """, unsafe_allow_html=True) - - with col3: - st.markdown(""" -
-

📊 Analytics

-

Comprehensive insights and visualizations

-
- """, unsafe_allow_html=True) - - # File upload - st.markdown("### 📁 Upload Your Invoices") - - # Initialize session state for files if not exists - if f'uploaded_files_{session_id}' not in st.session_state: - st.session_state[f'uploaded_files_{session_id}'] = None - if f'processing_complete_{session_id}' not in st.session_state: - st.session_state[f'processing_complete_{session_id}'] = False - if f'currently_processing_{session_id}' not in st.session_state: - st.session_state[f'currently_processing_{session_id}'] = False - if f'processed_file_hashes_{session_id}' not in st.session_state: - st.session_state[f'processed_file_hashes_{session_id}'] = set() - - # File uploader with stable key - uploaded_files = st.file_uploader( - "Choose invoice files (PDF, TXT supported)", - type=['pdf', 'txt'], - accept_multiple_files=True, - help="Maximum file size: 10MB per file", - key=f"file_uploader_stable_{session_id}" - ) - - # Store uploaded files in session state only if they're new - if uploaded_files: - # Create file hashes to detect if files have changed - current_file_hashes = set() - for file in uploaded_files: - file_hash = hash((file.name, file.size)) - current_file_hashes.add(file_hash) - - # Check if files have changed - stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set()) - if current_file_hashes != stored_hashes: - st.session_state[f'uploaded_files_{session_id}'] = uploaded_files - st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes - st.session_state[f'processing_complete_{session_id}'] = False - st.session_state[f'currently_processing_{session_id}'] = False - st.info("📄 New files detected - ready for processing") - - # Get files from session state - current_files = st.session_state[f'uploaded_files_{session_id}'] - is_processing = st.session_state[f'currently_processing_{session_id}'] - is_complete = st.session_state[f'processing_complete_{session_id}'] - - if current_files: - max_files = 3 - if 
len(current_files) > max_files: - st.warning(f"⚠️ Too many files selected. Processing first {max_files} files.") - current_files = current_files[:max_files] - - st.info(f"📊 {len(current_files)} files selected") - - # Show file names - st.markdown("**Selected Files:**") - for i, file in enumerate(current_files, 1): - file_size_mb = len(file.getvalue()) / (1024 * 1024) - file_hash = hash((file.name, file.size)) - processed_icon = "✅" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "📄" - st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)") - - # Process button - only show if not currently processing - col1, col2 = st.columns([1, 1]) - - with col1: - if not is_processing and not is_complete: - if st.button("🚀 Process Files", type="primary", key=f"process_btn_{session_id}"): - st.session_state[f'currently_processing_{session_id}'] = True - st.rerun() - elif is_processing: - st.info("🔄 Processing in progress...") - # Actually process the files here - process_files_once(current_files, session_id) - elif is_complete: - st.success("✅ Processing completed!") - if st.button("🔄 Process Again", key=f"reprocess_btn_{session_id}"): - st.session_state[f'processing_complete_{session_id}'] = False - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processed_file_hashes_{session_id}'] = set() - st.rerun() - - with col2: - if st.button("🗑️ Clear Files", key=f"clear_files_{session_id}"): - # Clear all session state related to files - keys_to_clear = [ - f'uploaded_files_{session_id}', - f'uploaded_file_hashes_{session_id}', - f'processing_complete_{session_id}', - f'currently_processing_{session_id}', - f'processed_file_hashes_{session_id}' - ] - - for key in keys_to_clear: - if key in st.session_state: - del st.session_state[key] - - st.success("🗑️ Files cleared successfully!") - time.sleep(1) # Brief pause to show message - st.rerun() - - else: - st.info("👆 Please select invoice files to upload and 
process") - - # Show processing results if completed - if is_complete: - st.markdown("### 📋 Recent Processing Results") - try: - data = st.session_state.processor.load_json_data() - recent_invoices = sorted( - data.get("invoices", []), - key=lambda x: x.get('timestamps', {}).get('created_at', ''), - reverse=True - )[:5] - - if recent_invoices: - for i, inv in enumerate(recent_invoices, 1): - with st.expander(f"📄 {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False): - col1, col2 = st.columns(2) - with col1: - st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}") - st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}") - st.write(f"**Amount:** ₹{inv.get('amount', 0):.2f}") - with col2: - st.write(f"**Date:** {inv.get('date', 'N/A')}") - st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}") - st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}") - else: - st.info("No recent processing results found.") - except Exception as e: - st.error(f"Error loading recent results: {e}") - - # ------------------------------------------------------------------------- - # AI CHAT SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "💬 AI Chat": - st.header("💬 AI Chat Interface") - - # Display chat history - if st.session_state.chat_history: - st.markdown("### 💬 Chat History") - for i, message in enumerate(st.session_state.chat_history): - with st.chat_message(message["role"]): - st.markdown(message["content"]) - - # Chat input - st.markdown("### ✍️ Ask a Question") - - col1, col2 = st.columns([4, 1]) - - with col1: - user_input = st.text_input( - "Type your question:", - placeholder="e.g., 'show me total spending'", - key=f"chat_input_{session_id}" - ) - - with col2: - ask_btn = st.button("🚀 Ask", type="primary", key=f"ask_btn_{session_id}") - - if ask_btn and user_input: - handle_chat_query(user_input) - - 
# Suggested queries - if not st.session_state.chat_history: - st.markdown("### 💡 Try These Queries") - - col1, col2 = st.columns(2) - - with col1: - st.markdown("**📊 Basic Queries:**") - basic_queries = [ - "Show me a summary of all invoices", - "How much have we spent in total?", - "Who are our top suppliers?", - "Find invoices with high amounts" - ] - for i, query in enumerate(basic_queries): - if st.button(query, key=f"basic_{session_id}_{i}"): - handle_chat_query(query) - - with col2: - st.markdown("**🔍 Advanced Queries:**") - advanced_queries = [ - "Find technology purchases", - "Show office supplies", - "Search consulting services", - "Recent high-value invoices" - ] - for i, query in enumerate(advanced_queries): - if st.button(query, key=f"advanced_{session_id}_{i}"): - handle_chat_query(query) - - # Clear chat - if st.session_state.chat_history: - if st.button("🗑️ Clear Chat", key=f"clear_chat_{session_id}"): - st.session_state.chat_history = [] - st.rerun() - - # ------------------------------------------------------------------------- - # ANALYTICS SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "📊 Analytics": - st.header("📊 Analytics Dashboard") - - try: - data = st.session_state.processor.load_json_data() - invoices = data.get("invoices", []) - - if not invoices: - st.info("📊 No data available. 
Upload some invoices to see analytics.") - return - - # Convert to DataFrame - df_data = [] - for inv in invoices: - df_data.append({ - 'invoice_number': inv.get('invoice_number', ''), - 'supplier_name': inv.get('supplier_name', ''), - 'amount': inv.get('amount', 0), - 'date': inv.get('date', ''), - 'confidence': inv.get('extraction_info', {}).get('confidence', 0) - }) - - df = pd.DataFrame(df_data) - - # Key metrics - col1, col2, col3, col4 = st.columns(4) - - with col1: - st.metric("Total Invoices", len(df)) - with col2: - st.metric("Total Amount", f"₹{df['amount'].sum():,.2f}") - with col3: - st.metric("Avg Amount", f"₹{df['amount'].mean():,.2f}") - with col4: - st.metric("Unique Suppliers", df['supplier_name'].nunique()) - - # Visualizations - if len(df) > 0: - # Amount distribution - fig_hist = px.histogram( - df, - x='amount', - title="Invoice Amount Distribution", - labels={'amount': 'Amount (₹)', 'count': 'Number of Invoices'} - ) - st.plotly_chart(fig_hist, use_container_width=True) - - # Top suppliers - if df['supplier_name'].notna().any(): - supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10) - - if len(supplier_amounts) > 0: - fig_suppliers = px.bar( - x=supplier_amounts.values, - y=supplier_amounts.index, - orientation='h', - title="Top 10 Suppliers by Total Amount", - labels={'x': 'Total Amount (₹)', 'y': 'Supplier'} - ) - st.plotly_chart(fig_suppliers, use_container_width=True) - - except Exception as e: - st.error(f"Analytics error: {e}") - - # ------------------------------------------------------------------------- - # DATA EXPLORER SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "📋 Data Explorer": - st.header("📋 Data Explorer") - - try: - data = st.session_state.processor.load_json_data() - invoices = data.get("invoices", []) - - if not invoices: - st.info("📊 No data available. 
Upload some invoices first.") - return - - # Convert to DataFrame - df_data = [] - for inv in invoices: - df_data.append({ - 'Invoice Number': inv.get('invoice_number', ''), - 'Supplier': inv.get('supplier_name', ''), - 'Buyer': inv.get('buyer_name', ''), - 'Amount': inv.get('amount', 0), - 'Date': inv.get('date', ''), - 'Confidence': inv.get('extraction_info', {}).get('confidence', 0), - 'Method': inv.get('extraction_info', {}).get('method', ''), - 'File': inv.get('file_info', {}).get('file_name', ''), - 'Created': inv.get('timestamps', {}).get('created_at', '')[:19] - }) - - df = pd.DataFrame(df_data) - - # Filters - col1, col2, col3 = st.columns(3) - - with col1: - suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist()) - selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}") - - with col2: - methods = ['All'] + sorted(df['Method'].dropna().unique().tolist()) - selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}") - - with col3: - min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}") - - # Apply filters - filtered_df = df.copy() - if selected_supplier != 'All': - filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier] - if selected_method != 'All': - filtered_df = filtered_df[filtered_df['Method'] == selected_method] - if min_amount > 0: - filtered_df = filtered_df[filtered_df['Amount'] >= min_amount] - - # Display data - st.dataframe( - filtered_df, - use_container_width=True, - column_config={ - "Amount": st.column_config.NumberColumn("Amount", format="₹%.2f"), - "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1) - } - ) - - # Export options - col1, col2 = st.columns(2) - - with col1: - if st.button("📥 Export CSV", key=f"export_csv_{session_id}"): - csv_data = filtered_df.to_csv(index=False) - st.download_button( - "Download CSV", - csv_data, - 
f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", - "text/csv", - key=f"download_csv_{session_id}" - ) - - with col2: - if st.button("📄 Export JSON", key=f"export_json_{session_id}"): - filtered_invoices = [inv for inv in invoices - if inv.get('invoice_number') in filtered_df['Invoice Number'].values] - - export_data = { - "exported_at": datetime.now().isoformat(), - "total_records": len(filtered_invoices), - "invoices": filtered_invoices - } - - st.download_button( - "Download JSON", - json.dumps(export_data, indent=2), - f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json", - "application/json", - key=f"download_json_{session_id}" - ) - - except Exception as e: - st.error(f"Data explorer error: {e}") - - # ------------------------------------------------------------------------- - # GLOBAL CHAT INPUT - # ------------------------------------------------------------------------- - - st.markdown("---") - st.markdown("### 💬 Quick Chat (Works from any section)") - - global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}") - - if global_query: - handle_chat_query(global_query, show_response=True) - - # Footer - st.markdown("---") - st.markdown(""" -
-

🚀 AI Invoice Processing System - Optimized for Hugging Face Spaces

-

Built with ❤️ using Streamlit, Transformers, and AI

-
- """, unsafe_allow_html=True) - -# =============================================================================== -# HELPER FUNCTIONS -# =============================================================================== - -def process_files_once(uploaded_files, session_id): - """Process uploaded files only once with proper state management""" - if not uploaded_files: - st.error("No files to process!") - st.session_state[f'currently_processing_{session_id}'] = False - return - - st.markdown("### 🔄 Processing Files...") - - # Get already processed file hashes - processed_hashes = st.session_state[f'processed_file_hashes_{session_id}'] - - # Filter out already processed files - files_to_process = [] - for file in uploaded_files: - file_hash = hash((file.name, file.size)) - if file_hash not in processed_hashes: - files_to_process.append((file, file_hash)) - - if not files_to_process: - st.info("✅ All files have already been processed!") - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processing_complete_{session_id}'] = True - return - - # Create containers for dynamic updates - progress_container = st.container() - status_container = st.container() - results_container = st.container() - - successful = 0 - failed = 0 - - # Show progress - with progress_container: - progress_bar = st.progress(0) - progress_text = st.empty() - - with status_container: - st.info(f"Starting to process {len(files_to_process)} new files...") - - # Process each file only once - for i, (uploaded_file, file_hash) in enumerate(files_to_process): - current_progress = (i + 1) / len(files_to_process) - - with progress_container: - progress_bar.progress(current_progress) - progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}") - - with status_container: - st.info(f"🔄 Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)") - - try: - # Process the file - result = 
st.session_state.processor.process_uploaded_file(uploaded_file) - - # Mark file as processed regardless of result - processed_hashes.add(file_hash) - - # Show result immediately - with results_container: - if result and hasattr(result, 'invoice_number') and result.invoice_number: - successful += 1 - st.success(f"✅ Successfully processed: {uploaded_file.name}") - - # Show extracted data - col1, col2, col3 = st.columns(3) - with col1: - st.write(f"**Invoice #:** {result.invoice_number}") - st.write(f"**Supplier:** {result.supplier_name or 'Not found'}") - with col2: - st.write(f"**Amount:** ₹{result.amount:.2f}") - st.write(f"**Date:** {result.date or 'Not found'}") - with col3: - st.write(f"**Method:** {result.processing_method}") - st.write(f"**Confidence:** {result.extraction_confidence:.1%}") - - st.markdown("---") - else: - failed += 1 - st.warning(f"⚠️ Could not extract complete data from: {uploaded_file.name}") - if result: - st.write(f"Partial data: {result.supplier_name}, ₹{result.amount}") - st.markdown("---") - - except Exception as e: - failed += 1 - # Still mark as processed to avoid reprocessing - processed_hashes.add(file_hash) - - with results_container: - st.error(f"❌ Error processing {uploaded_file.name}: {str(e)}") - st.markdown("---") - - # Update session state - st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes - - # Final summary - with progress_container: - progress_bar.progress(1.0) - progress_text.text("✅ Processing completed!") - - with status_container: - if successful > 0: - st.success(f"🎉 Processing complete! {successful} successful, {failed} failed") - if successful > 0: - st.balloons() - else: - st.error(f"❌ Processing failed for all {failed} files. 
def process_files(uploaded_files, session_id):
    """Legacy function - redirect to process_files_once"""
    # Kept only for backward compatibility with older call sites.
    return process_files_once(uploaded_files, session_id)


def handle_chat_query(query, show_response=False):
    """Handle chat query"""
    # Record the user's turn first so it survives even if the bot errors out.
    st.session_state.chat_history.append({
        "role": "user",
        "content": query,
        "timestamp": datetime.now(),
    })

    try:
        with st.spinner("🤖 AI is analyzing..."):
            answer = st.session_state.chatbot.query_database(query)

        st.session_state.chat_history.append({
            "role": "assistant",
            "content": answer,
            "timestamp": datetime.now(),
        })

        if show_response:
            with st.chat_message("assistant"):
                st.markdown(answer)
            st.info("💡 Switch to the 'AI Chat' section to see full conversation history!")

        # Refresh the page so the chat history pane picks up both new turns.
        st.rerun()

    except Exception as exc:
        st.error(f"Chat error: {exc}")

# ===============================================================================
# MAIN ENTRY POINT
# ===============================================================================

def main():
    """Main entry point for Hugging Face Spaces"""
    try:
        if IS_HF_SPACE:
            st.sidebar.info("🤗 Running on Hugging Face Spaces")

        create_app()

    except Exception as exc:
        st.error(f"""
        ## 🚨 Application Error

        {exc}

        Please refresh the page or check the logs for more details.
        """)


if __name__ == "__main__":
    main()
def parse_date(self, date_str: str) -> str:
    """Parse date to YYYY-MM-DD format.

    Tries a fixed list of common formats in priority order; ambiguous
    day/month strings resolve to the first format that parses
    (%m/%d/%Y is tried before %d/%m/%Y). Returns "" for empty input and
    the original string unchanged when no format matches.
    """
    if not date_str:
        return ""

    known_formats = (
        '%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y',
        '%m-%d-%Y', '%d-%m-%Y', '%Y/%m/%d',
    )
    for candidate in known_formats:
        try:
            return datetime.strptime(date_str, candidate).strftime('%Y-%m-%d')
        except ValueError:
            pass

    # Unrecognized format: hand back the raw value rather than guessing.
    return date_str
def setup_embedding_model(self):
    """Initialize the sentence transformer model.

    On success sets self.embedding_model and self.embedding_dimension;
    on failure (or when sentence-transformers is unavailable) leaves
    self.embedding_model as None so vector search stays disabled.
    """
    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        st.warning("⚠️ Sentence Transformers not available. Vector search disabled.")
        return

    try:
        with st.spinner(f"Loading embedding model: {self.embedding_model_name}..."):
            self.embedding_model = SentenceTransformer(
                self.embedding_model_name,
                cache_folder=HF_CONFIG["cache_dir"]
            )

            # Probe the model once to learn the embedding width.
            # BUG FIX: encode() on a *list* returns a (batch, dim) matrix,
            # so the dimension is shape[1]; the original read shape[0],
            # which is always 1 for this single-item batch.
            test_embedding = self.embedding_model.encode(["test"])
            self.embedding_dimension = int(test_embedding.shape[1])

        st.success(f"✅ Embedding model loaded: {self.embedding_model_name}")

    except Exception as e:
        st.error(f"❌ Failed to load embedding model: {e}")
        self.embedding_model = None

def load_vector_store(self):
    """Load existing vector store from disk, or start an empty one.

    NOTE: uses pickle on app-local files only; do not point these paths
    at untrusted data.
    """
    try:
        if os.path.exists(self.vector_store_path) and os.path.exists(self.metadata_path):
            with open(self.vector_store_path, 'rb') as f:
                self.vectors = pickle.load(f)

            with open(self.metadata_path, 'rb') as f:
                self.document_metadata = pickle.load(f)

            st.success(f"✅ Vector store loaded: {len(self.document_metadata)} documents")
        else:
            self.vectors = []
            self.document_metadata = []
            st.info("📄 New vector store initialized")

    except Exception as e:
        # Corrupt store: fall back to an empty one rather than crashing the app.
        st.error(f"❌ Error loading vector store: {e}")
        self.vectors = []
        self.document_metadata = []

def save_vector_store(self):
    """Persist vectors and metadata to disk; returns True on success."""
    try:
        with open(self.vector_store_path, 'wb') as f:
            pickle.dump(self.vectors, f)

        with open(self.metadata_path, 'wb') as f:
            pickle.dump(self.document_metadata, f)

        return True
    except Exception as e:
        st.error(f"Error saving vector store: {e}")
        return False

def create_document_text(self, invoice_data: dict, raw_text: str = "") -> str:
    """Create searchable text from invoice data.

    Joins every truthy field (except 'id') as "field: value" segments,
    plus the first 300 chars of raw_text as a "content:" segment.
    """
    text_parts = []

    for field, value in invoice_data.items():
        if value and field != 'id':
            text_parts.append(f"{field}: {value}")

    if raw_text:
        text_parts.append(f"content: {raw_text[:300]}")

    return " | ".join(text_parts)

def add_document(self, invoice_data: dict, raw_text: str = "") -> bool:
    """Embed one invoice record and append it to the in-memory store.

    Returns False (no-op) when no embedding model is loaded. The caller
    is responsible for calling save_vector_store() afterwards.
    """
    if not self.embedding_model:
        return False

    try:
        document_text = self.create_document_text(invoice_data, raw_text)

        # Normalized embeddings let semantic_search use a plain dot
        # product as cosine similarity.
        embedding = self.embedding_model.encode(document_text, normalize_embeddings=True)

        metadata = {
            'invoice_id': invoice_data.get('id', ''),
            'invoice_number': invoice_data.get('invoice_number', ''),
            'supplier_name': invoice_data.get('supplier_name', ''),
            'buyer_name': invoice_data.get('buyer_name', ''),
            'amount': invoice_data.get('amount', 0),
            'date': invoice_data.get('date', ''),
            'file_name': invoice_data.get('file_info', {}).get('file_name', ''),
            'document_text': document_text[:200],
            'timestamp': datetime.now().isoformat()
        }

        self.vectors.append(embedding)
        self.document_metadata.append(metadata)

        return True

    except Exception as e:
        st.error(f"Error adding document to vector store: {e}")
        return False
- # Sort by similarity - similarities.sort(reverse=True) - - # Return top results - results = [] - for similarity, idx in similarities[:top_k]: - if similarity > 0.1: # Relevance threshold - metadata = self.document_metadata[idx] - result = VectorSearchResult( - invoice_id=metadata.get('invoice_id', ''), - invoice_number=metadata.get('invoice_number', ''), - supplier_name=metadata.get('supplier_name', ''), - similarity_score=float(similarity), - content_preview=metadata.get('document_text', ''), - metadata=metadata - ) - results.append(result) - - return results - - except Exception as e: - st.error(f"Error in semantic search: {e}") - return [] - -# =============================================================================== -# MAIN PROCESSOR CLASS -# =============================================================================== - -class InvoiceProcessor: - """Main invoice processor for Hugging Face Spaces""" - - def __init__(self): - self.setup_storage() - self.document_processor = DocumentProcessor() - self.ai_extractor = AIExtractor() - self.vector_store = VectorStore() if SENTENCE_TRANSFORMERS_AVAILABLE else None - - # Initialize stats - self.processing_stats = { - 'total_processed': 0, - 'successful': 0, - 'failed': 0, - 'start_time': datetime.now() - } - - def setup_storage(self): - """Setup storage paths""" - self.data_dir = HF_CONFIG["data_dir"] - self.json_path = os.path.join(self.data_dir, "invoices.json") - - # Initialize JSON storage - if not os.path.exists(self.json_path): - initial_data = { - "metadata": { - "created_at": datetime.now().isoformat(), - "version": "hf_v1.0", - "total_invoices": 0 - }, - "invoices": [], - "summary": { - "total_amount": 0.0, - "unique_suppliers": [], - "processing_stats": {"successful": 0, "failed": 0} - } - } - self.save_json_data(initial_data) - - def load_json_data(self) -> dict: - """Load invoice data from JSON""" - try: - with open(self.json_path, 'r', encoding='utf-8') as f: - return json.load(f) - except 
(FileNotFoundError, json.JSONDecodeError): - self.setup_storage() - return self.load_json_data() - - def save_json_data(self, data: dict): - """Save invoice data to JSON""" - try: - with open(self.json_path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=2, ensure_ascii=False) - except Exception as e: - st.error(f"Error saving data: {e}") - - def process_uploaded_file(self, uploaded_file) -> InvoiceData: - """Process a single uploaded file with enhanced debugging""" - self.processing_stats['total_processed'] += 1 - - try: - # Debug file info - file_size = len(uploaded_file.getvalue()) - file_extension = uploaded_file.name.split('.')[-1].lower() if '.' in uploaded_file.name else 'unknown' - - st.info(f"📄 Processing: {uploaded_file.name} ({file_size/1024:.1f} KB, .{file_extension})") - - # Check file size - if file_size > HF_CONFIG["max_file_size_mb"] * 1024 * 1024: - error_msg = f"File too large: {file_size / 1024 / 1024:.2f}MB > {HF_CONFIG['max_file_size_mb']}MB" - st.error(error_msg) - self.processing_stats['failed'] += 1 - return InvoiceData() - - # Check file type - if file_extension not in ['pdf', 'txt']: - error_msg = f"Unsupported file type: .{file_extension} (supported: PDF, TXT)" - st.warning(error_msg) - self.processing_stats['failed'] += 1 - return InvoiceData() - - # Save temporarily - with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file: - file_content = uploaded_file.getvalue() - tmp_file.write(file_content) - tmp_file_path = tmp_file.name - - st.info(f"💾 Saved temporarily to: {tmp_file_path}") - - try: - # Extract text - st.info("🔍 Extracting text from document...") - text = self.document_processor.extract_text_from_document(tmp_file_path) - - if not text or not text.strip(): - st.warning(f"❌ No text extracted from {uploaded_file.name}") - self.processing_stats['failed'] += 1 - return InvoiceData() - - text_length = len(text) - st.info(f"📝 Extracted {text_length} characters of text") - - # Show text preview 
- if text_length > 0: - with st.expander("📄 Text Preview (First 500 characters)", expanded=False): - st.text(text[:500] + "..." if len(text) > 500 else text) - - # Extract invoice data - st.info("🤖 Extracting invoice data using AI/Regex...") - invoice_data = self.ai_extractor.extract_with_ai(text) - invoice_data.file_path = uploaded_file.name - - # Show extraction results - st.info(f"📊 Extraction completed with {invoice_data.extraction_confidence:.1%} confidence") - - # Save to storage - st.info("💾 Saving extracted data...") - self.save_invoice_data(invoice_data, text, file_size) - - self.processing_stats['successful'] += 1 - st.success(f"✅ Successfully processed {uploaded_file.name}") - - return invoice_data - - finally: - # Cleanup - try: - os.unlink(tmp_file_path) - st.info("🧹 Cleaned up temporary file") - except: - pass - - except Exception as e: - error_msg = f"Error processing {uploaded_file.name}: {str(e)}" - st.error(error_msg) - self.processing_stats['failed'] += 1 - - # Show detailed error for debugging - with st.expander("🔍 Error Details", expanded=False): - st.code(str(e)) - import traceback - st.code(traceback.format_exc()) - - return InvoiceData() - - def save_invoice_data(self, invoice_data: InvoiceData, raw_text: str, file_size: int): - """Save invoice data to JSON and vector store""" - try: - # Load existing data - data = self.load_json_data() - - # Create invoice record - invoice_record = { - "id": len(data["invoices"]) + 1, - "invoice_number": invoice_data.invoice_number, - "supplier_name": invoice_data.supplier_name, - "buyer_name": invoice_data.buyer_name, - "date": invoice_data.date, - "amount": invoice_data.amount, - "quantity": invoice_data.quantity, - "product_description": invoice_data.product_description, - "file_info": { - "file_name": invoice_data.file_path, - "file_size": file_size - }, - "extraction_info": { - "confidence": invoice_data.extraction_confidence, - "method": invoice_data.processing_method, - "raw_text_preview": 
class ChatBot:
    """Chatbot for invoice queries"""

    def __init__(self, processor: InvoiceProcessor):
        # Keep a handle on the processor so every query reads live JSON data.
        self.processor = processor

    def query_database(self, query: str) -> str:
        """Route a natural-language query to the matching handler."""
        try:
            data = self.processor.load_json_data()
            invoices = data.get("invoices", [])

            if not invoices:
                return "No invoice data found. Please upload some invoices first."

            normalized = query.lower()

            # Intent routing: first match wins, same priority order as before.
            if any(word in normalized for word in ["summary", "overview", "total"]):
                return self.generate_summary(data)
            if "count" in normalized or "how many" in normalized:
                return self.handle_count_query(data)
            if any(word in normalized for word in ["amount", "value", "money", "cost"]):
                return self.handle_amount_query(data)
            if any(word in normalized for word in ["supplier", "vendor", "company"]):
                return self.handle_supplier_query(data, query)
            if self.processor.vector_store:
                return self.handle_semantic_search(query)
            return self.handle_general_query(data, query)

        except Exception as e:
            return f"Error processing query: {e}"

    def generate_summary(self, data: dict) -> str:
        """Generate comprehensive summary"""
        invoices = data.get("invoices", [])
        summary = data.get("summary", {})

        if not invoices:
            return "No invoices found in the system."

        total_amount = summary.get("total_amount", 0)
        avg_amount = total_amount / len(invoices) if invoices else 0
        unique_suppliers = len(summary.get("unique_suppliers", []))

        response = f"""
**📊 Invoice System Summary**

• **Total Invoices**: {len(invoices):,}
• **Total Value**: ₹{total_amount:,.2f}
• **Average Invoice**: ₹{avg_amount:,.2f}
• **Unique Suppliers**: {unique_suppliers}

**📈 Processing Stats**
• **Successful**: {summary.get('processing_stats', {}).get('successful', 0)}
• **Failed**: {summary.get('processing_stats', {}).get('failed', 0)}

**🔍 Recent Invoices**
"""

        newest_first = sorted(
            invoices,
            key=lambda rec: rec.get('timestamps', {}).get('created_at', ''),
            reverse=True,
        )
        for rank, rec in enumerate(newest_first[:5], 1):
            response += f"\n{rank}. **{rec.get('invoice_number', 'N/A')}** - {rec.get('supplier_name', 'Unknown')} (₹{rec.get('amount', 0):,.2f})"

        return response

    def handle_count_query(self, data: dict) -> str:
        """Handle count-related queries"""
        invoices = data.get("invoices", [])
        total = len(invoices)
        unique_numbers = len({rec.get('invoice_number', '') for rec in invoices if rec.get('invoice_number')})
        duplicates = total - unique_numbers if total > unique_numbers else 0
        first_seen = invoices[0].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'
        last_seen = invoices[-1].get('timestamps', {}).get('created_at', 'N/A')[:10] if invoices else 'N/A'

        return f"""
**📊 Invoice Count Summary**

• **Total Records**: {total}
• **Unique Invoice Numbers**: {unique_numbers}
• **Duplicates**: {duplicates}

**📅 Processing Timeline**
• **First Invoice**: {first_seen}
• **Latest Invoice**: {last_seen}
"""

    def handle_amount_query(self, data: dict) -> str:
        """Handle amount-related queries"""
        invoices = data.get("invoices", [])
        amounts = [rec.get('amount', 0) for rec in invoices if rec.get('amount', 0) > 0]

        if not amounts:
            return "No amount information found in invoices."

        total_amount = sum(amounts)
        avg_amount = total_amount / len(amounts)
        max_amount = max(amounts)
        min_amount = min(amounts)

        # Threshold = 5th-highest amount when there are more than 5 invoices,
        # otherwise the maximum (so only the top invoice qualifies).
        if len(amounts) > 5:
            high_value_threshold = sorted(amounts, reverse=True)[min(4, len(amounts) - 1)]
        else:
            high_value_threshold = max_amount
        high_value_invoices = [rec for rec in invoices if rec.get('amount', 0) >= high_value_threshold]

        response = f"""
**💰 Financial Analysis**

• **Total Amount**: ₹{total_amount:,.2f}
• **Average Amount**: ₹{avg_amount:,.2f}
• **Highest Invoice**: ₹{max_amount:,.2f}
• **Lowest Invoice**: ₹{min_amount:,.2f}

**🎯 High-Value Invoices (₹{high_value_threshold:,.2f}+)**
"""

        for rank, rec in enumerate(high_value_invoices[:5], 1):
            response += f"\n{rank}. **{rec.get('invoice_number', 'N/A')}** - {rec.get('supplier_name', 'Unknown')} (₹{rec.get('amount', 0):,.2f})"

        return response

    def handle_supplier_query(self, data: dict, query: str) -> str:
        """Handle supplier-related queries"""
        invoices = data.get("invoices", [])

        supplier_counts = {}
        supplier_amounts = {}
        for rec in invoices:
            supplier = rec.get('supplier_name', '').strip()
            if not supplier:
                continue
            supplier_counts[supplier] = supplier_counts.get(supplier, 0) + 1
            supplier_amounts[supplier] = supplier_amounts.get(supplier, 0) + rec.get('amount', 0)

        if not supplier_counts:
            return "No supplier information found in invoices."

        top_suppliers = sorted(supplier_amounts.items(), key=lambda item: item[1], reverse=True)[:10]
        most_active = max(supplier_counts, key=supplier_counts.get)

        response = f"""
**🏢 Supplier Analysis**

• **Total Unique Suppliers**: {len(supplier_counts)}
• **Most Active**: {most_active} ({supplier_counts[most_active]} invoices)

**💰 Top Suppliers by Amount**
"""

        for rank, (supplier, amount) in enumerate(top_suppliers, 1):
            count = supplier_counts[supplier]
            avg = amount / count if count > 0 else 0
            response += f"\n{rank}. **{supplier}** - ₹{amount:,.2f} ({count} invoices, avg: ₹{avg:,.2f})"

        return response

    def handle_semantic_search(self, query: str) -> str:
        """Handle semantic search queries"""
        try:
            hits = self.processor.vector_store.semantic_search(query, top_k=5)

            if not hits:
                return f"No relevant results found for '{query}'. Try different keywords."

            response = f"🔍 **Semantic Search Results for '{query}'**\n\n"
            for rank, hit in enumerate(hits, 1):
                response += f"{rank}. **{hit.invoice_number}** - {hit.supplier_name}\n"
                response += f"   • Similarity: {hit.similarity_score:.3f}\n"
                response += f"   • Amount: ₹{hit.metadata.get('amount', 0):,.2f}\n"
                response += f"   • Preview: {hit.content_preview[:100]}...\n\n"

            return response

        except Exception as e:
            return f"Semantic search error: {e}"

    def handle_general_query(self, data: dict, query: str) -> str:
        """Handle general queries with keyword search"""
        invoices = data.get("invoices", [])
        query_words = query.lower().split()

        # Simple keyword matching over the searchable text fields.
        matches = []
        for rec in invoices:
            haystack = ' '.join([
                rec.get('supplier_name', ''),
                rec.get('buyer_name', ''),
                rec.get('product_description', ''),
                rec.get('extraction_info', {}).get('raw_text_preview', ''),
            ]).lower()
            if any(word in haystack for word in query_words):
                matches.append(rec)

        if not matches:
            return f"No invoices found matching '{query}'. Try different keywords or check the summary."

        response = f"🔍 **Found {len(matches)} invoices matching '{query}'**\n\n"
        for rank, rec in enumerate(matches[:5], 1):
            response += f"{rank}. **{rec.get('invoice_number', 'N/A')}** - {rec.get('supplier_name', 'Unknown')}\n"
            response += f"   • Amount: ₹{rec.get('amount', 0):,.2f}\n"
            response += f"   • Date: {rec.get('date', 'N/A')}\n\n"

        if len(matches) > 5:
            response += f"... and {len(matches) - 5} more results."

        return response

📄 AI Invoice Processing System

', unsafe_allow_html=True) - st.markdown(""" -
-

- AI-Powered Document Processing • Semantic Search • Smart Analytics • Hugging Face Spaces -

-
- """, unsafe_allow_html=True) - - # Initialize processor - if 'processor' not in st.session_state: - with st.spinner("🔧 Initializing AI Invoice Processor..."): - try: - st.session_state.processor = InvoiceProcessor() - st.session_state.chatbot = ChatBot(st.session_state.processor) - st.session_state.chat_history = [] - st.success("✅ System initialized successfully!") - except Exception as e: - st.error(f"❌ Initialization failed: {e}") - st.stop() - - # Sidebar - with st.sidebar: - st.header("🎛️ System Status") - - processor = st.session_state.processor - - # Component status - if processor.document_processor.processors: - st.markdown('✅ Document Processing', unsafe_allow_html=True) - else: - st.markdown('❌ Document Processing', unsafe_allow_html=True) - - if processor.ai_extractor.use_transformers: - st.markdown('✅ AI Extraction', unsafe_allow_html=True) - else: - st.markdown('⚠️ Regex Extraction', unsafe_allow_html=True) - - if processor.vector_store and processor.vector_store.embedding_model: - st.markdown('✅ Semantic Search', unsafe_allow_html=True) - else: - st.markdown('⚠️ Keyword Search Only', unsafe_allow_html=True) - - # Quick stats - st.header("📊 Quick Stats") - try: - data = processor.load_json_data() - total_invoices = len(data.get("invoices", [])) - total_amount = data.get("summary", {}).get("total_amount", 0) - - st.metric("Total Invoices", total_invoices) - st.metric("Total Value", f"₹{total_amount:,.2f}") - st.metric("Success Rate", f"{processor.processing_stats['successful']}/{processor.processing_stats['total_processed']}") - - except Exception as e: - st.error(f"Stats error: {e}") - - # System info - st.header("⚙️ System Info") - st.info(f""" - **Session ID:** {session_id} - - **Limits:** - • Max file size: 10MB - • Max concurrent files: 3 - • Timeout: 30s - """) - - # Main navigation - selected_tab = st.radio( - "Choose a section:", - ["📤 Upload & Process", "💬 AI Chat", "📊 Analytics", "📋 Data Explorer"], - horizontal=True, - 
key=f"main_navigation_{session_id}" - ) - - # ------------------------------------------------------------------------- - # UPLOAD & PROCESS SECTION - # ------------------------------------------------------------------------- - - if selected_tab == "📤 Upload & Process": - st.header("📤 Upload Invoice Documents") - - # Feature highlights - col1, col2, col3 = st.columns(3) - - with col1: - st.markdown(""" -
-

🤖 AI Extraction

-

Advanced NLP models extract structured data automatically

-
- """, unsafe_allow_html=True) - - with col2: - st.markdown(""" -
-

🔍 Smart Search

-

Semantic search finds invoices using natural language

-
- """, unsafe_allow_html=True) - - with col3: - st.markdown(""" -
-

📊 Analytics

-

Comprehensive insights and visualizations

-
- """, unsafe_allow_html=True) - - # File upload - st.markdown("### 📁 Upload Your Invoices") - - # Initialize session state for files if not exists - if f'uploaded_files_{session_id}' not in st.session_state: - st.session_state[f'uploaded_files_{session_id}'] = None - if f'processing_complete_{session_id}' not in st.session_state: - st.session_state[f'processing_complete_{session_id}'] = False - if f'currently_processing_{session_id}' not in st.session_state: - st.session_state[f'currently_processing_{session_id}'] = False - if f'processed_file_hashes_{session_id}' not in st.session_state: - st.session_state[f'processed_file_hashes_{session_id}'] = set() - - # File uploader with stable key - uploaded_files = st.file_uploader( - "Choose invoice files (PDF, TXT supported)", - type=['pdf', 'txt'], - accept_multiple_files=True, - help="Maximum file size: 10MB per file", - key=f"file_uploader_stable_{session_id}" - ) - - # Store uploaded files in session state only if they're new - if uploaded_files: - # Create file hashes to detect if files have changed - current_file_hashes = set() - for file in uploaded_files: - file_hash = hash((file.name, file.size)) - current_file_hashes.add(file_hash) - - # Check if files have changed - stored_hashes = st.session_state.get(f'uploaded_file_hashes_{session_id}', set()) - if current_file_hashes != stored_hashes: - st.session_state[f'uploaded_files_{session_id}'] = uploaded_files - st.session_state[f'uploaded_file_hashes_{session_id}'] = current_file_hashes - st.session_state[f'processing_complete_{session_id}'] = False - st.session_state[f'currently_processing_{session_id}'] = False - st.info("📄 New files detected - ready for processing") - - # Get files from session state - current_files = st.session_state[f'uploaded_files_{session_id}'] - is_processing = st.session_state[f'currently_processing_{session_id}'] - is_complete = st.session_state[f'processing_complete_{session_id}'] - - if current_files: - max_files = 3 - if 
len(current_files) > max_files: - st.warning(f"⚠️ Too many files selected. Processing first {max_files} files.") - current_files = current_files[:max_files] - - st.info(f"📊 {len(current_files)} files selected") - - # Show file names - st.markdown("**Selected Files:**") - for i, file in enumerate(current_files, 1): - file_size_mb = len(file.getvalue()) / (1024 * 1024) - file_hash = hash((file.name, file.size)) - processed_icon = "✅" if file_hash in st.session_state[f'processed_file_hashes_{session_id}'] else "📄" - st.write(f"{processed_icon} {i}. {file.name} ({file_size_mb:.2f} MB)") - - # Process button - only show if not currently processing - col1, col2 = st.columns([1, 1]) - - with col1: - if not is_processing and not is_complete: - if st.button("🚀 Process Files", type="primary", key=f"process_btn_{session_id}"): - st.session_state[f'currently_processing_{session_id}'] = True - st.rerun() - elif is_processing: - st.info("🔄 Processing in progress...") - # Actually process the files here - process_files_once(current_files, session_id) - elif is_complete: - st.success("✅ Processing completed!") - if st.button("🔄 Process Again", key=f"reprocess_btn_{session_id}"): - st.session_state[f'processing_complete_{session_id}'] = False - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processed_file_hashes_{session_id}'] = set() - st.rerun() - - with col2: - if st.button("🗑️ Clear Files", key=f"clear_files_{session_id}"): - st.session_state[f'uploaded_files_{session_id}'] = None - st.session_state[f'uploaded_file_hashes_{session_id}'] = set() - st.session_state[f'processing_complete_{session_id}'] = False - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processed_file_hashes_{session_id}'] = set() - st.rerun() - - else: - st.info("👆 Please select invoice files to upload and process") - - # Show processing results if completed - if is_complete: - st.markdown("### 📋 Recent Processing Results") - try: - 
data = st.session_state.processor.load_json_data() - recent_invoices = sorted( - data.get("invoices", []), - key=lambda x: x.get('timestamps', {}).get('created_at', ''), - reverse=True - )[:5] - - if recent_invoices: - for i, inv in enumerate(recent_invoices, 1): - with st.expander(f"📄 {inv.get('invoice_number', f'Invoice {i}')} - {inv.get('supplier_name', 'Unknown')}", expanded=False): - col1, col2 = st.columns(2) - with col1: - st.write(f"**Invoice #:** {inv.get('invoice_number', 'N/A')}") - st.write(f"**Supplier:** {inv.get('supplier_name', 'N/A')}") - st.write(f"**Amount:** ₹{inv.get('amount', 0):.2f}") - with col2: - st.write(f"**Date:** {inv.get('date', 'N/A')}") - st.write(f"**Method:** {inv.get('extraction_info', {}).get('method', 'N/A')}") - st.write(f"**Confidence:** {inv.get('extraction_info', {}).get('confidence', 0):.1%}") - else: - st.info("No recent processing results found.") - except Exception as e: - st.error(f"Error loading recent results: {e}") - - # ------------------------------------------------------------------------- - # AI CHAT SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "💬 AI Chat": - st.header("💬 AI Chat Interface") - - # Display chat history - if st.session_state.chat_history: - st.markdown("### 💬 Chat History") - for i, message in enumerate(st.session_state.chat_history): - with st.chat_message(message["role"]): - st.markdown(message["content"]) - - # Chat input - st.markdown("### ✍️ Ask a Question") - - col1, col2 = st.columns([4, 1]) - - with col1: - user_input = st.text_input( - "Type your question:", - placeholder="e.g., 'show me total spending'", - key=f"chat_input_{session_id}" - ) - - with col2: - ask_btn = st.button("🚀 Ask", type="primary", key=f"ask_btn_{session_id}") - - if ask_btn and user_input: - handle_chat_query(user_input) - - # Suggested queries - if not st.session_state.chat_history: - st.markdown("### 💡 Try These Queries") - - col1, col2 = 
st.columns(2) - - with col1: - st.markdown("**📊 Basic Queries:**") - basic_queries = [ - "Show me a summary of all invoices", - "How much have we spent in total?", - "Who are our top suppliers?", - "Find invoices with high amounts" - ] - for i, query in enumerate(basic_queries): - if st.button(query, key=f"basic_{session_id}_{i}"): - handle_chat_query(query) - - with col2: - st.markdown("**🔍 Advanced Queries:**") - advanced_queries = [ - "Find technology purchases", - "Show office supplies", - "Search consulting services", - "Recent high-value invoices" - ] - for i, query in enumerate(advanced_queries): - if st.button(query, key=f"advanced_{session_id}_{i}"): - handle_chat_query(query) - - # Clear chat - if st.session_state.chat_history: - if st.button("🗑️ Clear Chat", key=f"clear_chat_{session_id}"): - st.session_state.chat_history = [] - st.rerun() - - # ------------------------------------------------------------------------- - # ANALYTICS SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "📊 Analytics": - st.header("📊 Analytics Dashboard") - - try: - data = st.session_state.processor.load_json_data() - invoices = data.get("invoices", []) - - if not invoices: - st.info("📊 No data available. 
Upload some invoices to see analytics.") - return - - # Convert to DataFrame - df_data = [] - for inv in invoices: - df_data.append({ - 'invoice_number': inv.get('invoice_number', ''), - 'supplier_name': inv.get('supplier_name', ''), - 'amount': inv.get('amount', 0), - 'date': inv.get('date', ''), - 'confidence': inv.get('extraction_info', {}).get('confidence', 0) - }) - - df = pd.DataFrame(df_data) - - # Key metrics - col1, col2, col3, col4 = st.columns(4) - - with col1: - st.metric("Total Invoices", len(df)) - with col2: - st.metric("Total Amount", f"₹{df['amount'].sum():,.2f}") - with col3: - st.metric("Avg Amount", f"₹{df['amount'].mean():,.2f}") - with col4: - st.metric("Unique Suppliers", df['supplier_name'].nunique()) - - # Visualizations - if len(df) > 0: - # Amount distribution - fig_hist = px.histogram( - df, - x='amount', - title="Invoice Amount Distribution", - labels={'amount': 'Amount (₹)', 'count': 'Number of Invoices'} - ) - st.plotly_chart(fig_hist, use_container_width=True) - - # Top suppliers - if df['supplier_name'].notna().any(): - supplier_amounts = df.groupby('supplier_name')['amount'].sum().sort_values(ascending=False).head(10) - - if len(supplier_amounts) > 0: - fig_suppliers = px.bar( - x=supplier_amounts.values, - y=supplier_amounts.index, - orientation='h', - title="Top 10 Suppliers by Total Amount", - labels={'x': 'Total Amount (₹)', 'y': 'Supplier'} - ) - st.plotly_chart(fig_suppliers, use_container_width=True) - - except Exception as e: - st.error(f"Analytics error: {e}") - - # ------------------------------------------------------------------------- - # DATA EXPLORER SECTION - # ------------------------------------------------------------------------- - - elif selected_tab == "📋 Data Explorer": - st.header("📋 Data Explorer") - - try: - data = st.session_state.processor.load_json_data() - invoices = data.get("invoices", []) - - if not invoices: - st.info("📊 No data available. 
Upload some invoices first.") - return - - # Convert to DataFrame - df_data = [] - for inv in invoices: - df_data.append({ - 'Invoice Number': inv.get('invoice_number', ''), - 'Supplier': inv.get('supplier_name', ''), - 'Buyer': inv.get('buyer_name', ''), - 'Amount': inv.get('amount', 0), - 'Date': inv.get('date', ''), - 'Confidence': inv.get('extraction_info', {}).get('confidence', 0), - 'Method': inv.get('extraction_info', {}).get('method', ''), - 'File': inv.get('file_info', {}).get('file_name', ''), - 'Created': inv.get('timestamps', {}).get('created_at', '')[:19] - }) - - df = pd.DataFrame(df_data) - - # Filters - col1, col2, col3 = st.columns(3) - - with col1: - suppliers = ['All'] + sorted(df['Supplier'].dropna().unique().tolist()) - selected_supplier = st.selectbox("Filter by Supplier", suppliers, key=f"supplier_filter_{session_id}") - - with col2: - methods = ['All'] + sorted(df['Method'].dropna().unique().tolist()) - selected_method = st.selectbox("Filter by Method", methods, key=f"method_filter_{session_id}") - - with col3: - min_amount = st.number_input("Min Amount", min_value=0.0, value=0.0, key=f"amount_filter_{session_id}") - - # Apply filters - filtered_df = df.copy() - if selected_supplier != 'All': - filtered_df = filtered_df[filtered_df['Supplier'] == selected_supplier] - if selected_method != 'All': - filtered_df = filtered_df[filtered_df['Method'] == selected_method] - if min_amount > 0: - filtered_df = filtered_df[filtered_df['Amount'] >= min_amount] - - # Display data - st.dataframe( - filtered_df, - use_container_width=True, - column_config={ - "Amount": st.column_config.NumberColumn("Amount", format="₹%.2f"), - "Confidence": st.column_config.ProgressColumn("Confidence", min_value=0, max_value=1) - } - ) - - # Export options - col1, col2 = st.columns(2) - - with col1: - if st.button("📥 Export CSV", key=f"export_csv_{session_id}"): - csv_data = filtered_df.to_csv(index=False) - st.download_button( - "Download CSV", - csv_data, - 
f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.csv", - "text/csv", - key=f"download_csv_{session_id}" - ) - - with col2: - if st.button("📄 Export JSON", key=f"export_json_{session_id}"): - filtered_invoices = [inv for inv in invoices - if inv.get('invoice_number') in filtered_df['Invoice Number'].values] - - export_data = { - "exported_at": datetime.now().isoformat(), - "total_records": len(filtered_invoices), - "invoices": filtered_invoices - } - - st.download_button( - "Download JSON", - json.dumps(export_data, indent=2), - f"invoices_{datetime.now().strftime('%Y%m%d_%H%M')}.json", - "application/json", - key=f"download_json_{session_id}" - ) - - except Exception as e: - st.error(f"Data explorer error: {e}") - - # ------------------------------------------------------------------------- - # GLOBAL CHAT INPUT - # ------------------------------------------------------------------------- - - st.markdown("---") - st.markdown("### 💬 Quick Chat (Works from any section)") - - global_query = st.chat_input("Ask about your invoices...", key=f"global_chat_{session_id}") - - if global_query: - handle_chat_query(global_query, show_response=True) - - # Footer - st.markdown("---") - st.markdown(""" -
-

🚀 AI Invoice Processing System - Optimized for Hugging Face Spaces

-

Built with ❤️ using Streamlit, Transformers, and AI

-
- """, unsafe_allow_html=True) - -# =============================================================================== -# HELPER FUNCTIONS -# =============================================================================== - -def process_files_once(uploaded_files, session_id): - """Process uploaded files only once with proper state management""" - if not uploaded_files: - st.error("No files to process!") - st.session_state[f'currently_processing_{session_id}'] = False - return - - st.markdown("### 🔄 Processing Files...") - - # Get already processed file hashes - processed_hashes = st.session_state[f'processed_file_hashes_{session_id}'] - - # Filter out already processed files - files_to_process = [] - for file in uploaded_files: - file_hash = hash((file.name, file.size)) - if file_hash not in processed_hashes: - files_to_process.append((file, file_hash)) - - if not files_to_process: - st.info("✅ All files have already been processed!") - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processing_complete_{session_id}'] = True - return - - # Create containers for dynamic updates - progress_container = st.container() - status_container = st.container() - results_container = st.container() - - successful = 0 - failed = 0 - - # Show progress - with progress_container: - progress_bar = st.progress(0) - progress_text = st.empty() - - with status_container: - st.info(f"Starting to process {len(files_to_process)} new files...") - - # Process each file only once - for i, (uploaded_file, file_hash) in enumerate(files_to_process): - current_progress = (i + 1) / len(files_to_process) - - with progress_container: - progress_bar.progress(current_progress) - progress_text.text(f"Processing file {i+1}/{len(files_to_process)}: {uploaded_file.name}") - - with status_container: - st.info(f"🔄 Processing: {uploaded_file.name} ({len(uploaded_file.getvalue())/1024:.1f} KB)") - - try: - # Process the file - result = 
st.session_state.processor.process_uploaded_file(uploaded_file) - - # Mark file as processed regardless of result - processed_hashes.add(file_hash) - - # Show result immediately - with results_container: - if result and hasattr(result, 'invoice_number') and result.invoice_number: - successful += 1 - st.success(f"✅ Successfully processed: {uploaded_file.name}") - - # Show extracted data - col1, col2, col3 = st.columns(3) - with col1: - st.write(f"**Invoice #:** {result.invoice_number}") - st.write(f"**Supplier:** {result.supplier_name or 'Not found'}") - with col2: - st.write(f"**Amount:** ₹{result.amount:.2f}") - st.write(f"**Date:** {result.date or 'Not found'}") - with col3: - st.write(f"**Method:** {result.processing_method}") - st.write(f"**Confidence:** {result.extraction_confidence:.1%}") - - st.markdown("---") - else: - failed += 1 - st.warning(f"⚠️ Could not extract complete data from: {uploaded_file.name}") - if result: - st.write(f"Partial data: {result.supplier_name}, ₹{result.amount}") - st.markdown("---") - - except Exception as e: - failed += 1 - # Still mark as processed to avoid reprocessing - processed_hashes.add(file_hash) - - with results_container: - st.error(f"❌ Error processing {uploaded_file.name}: {str(e)}") - st.markdown("---") - - # Update session state - st.session_state[f'processed_file_hashes_{session_id}'] = processed_hashes - - # Final summary - with progress_container: - progress_bar.progress(1.0) - progress_text.text("✅ Processing completed!") - - with status_container: - if successful > 0: - st.success(f"🎉 Processing complete! {successful} successful, {failed} failed") - if successful > 0: - st.balloons() - else: - st.error(f"❌ Processing failed for all {failed} files. 
Please check file formats and content.") - - # Update processing state - st.session_state[f'currently_processing_{session_id}'] = False - st.session_state[f'processing_complete_{session_id}'] = True - - # Force rerun to update UI - st.rerun() - -def process_files(uploaded_files, session_id): - """Legacy function - redirect to process_files_once""" - return process_files_once(uploaded_files, session_id) - -def handle_chat_query(query, show_response=False): - """Handle chat query""" - st.session_state.chat_history.append({ - "role": "user", - "content": query, - "timestamp": datetime.now() - }) - - try: - with st.spinner("🤖 AI is analyzing..."): - response = st.session_state.chatbot.query_database(query) - - st.session_state.chat_history.append({ - "role": "assistant", - "content": response, - "timestamp": datetime.now() - }) - - if show_response: - with st.chat_message("assistant"): - st.markdown(response) - st.info("💡 Switch to the 'AI Chat' section to see full conversation history!") - - st.rerun() - - except Exception as e: - st.error(f"Chat error: {e}") - -# =============================================================================== -# MAIN ENTRY POINT -# =============================================================================== - -def main(): - """Main entry point for Hugging Face Spaces""" - try: - if IS_HF_SPACE: - st.sidebar.info("🤗 Running on Hugging Face Spaces") - - create_app() - - except Exception as e: - st.error(f""" - ## 🚨 Application Error - - {e} - - Please refresh the page or check the logs for more details. 
- """) - -if __name__ == "__main__": - main(), - ] - - amounts_found = [] - for i, pattern in enumerate(amount_patterns): - matches = re.findall(pattern, text.lower(), re.IGNORECASE | re.MULTILINE) - if matches: - st.write(f"Pattern {i+1}: {matches}") - for match in matches: - try: - amount_val = float(match.replace(',', '')) - amounts_found.append(amount_val) - except: - pass - - if amounts_found: - st.success(f"✅ Found amounts: {amounts_found}") - else: - st.warning("⚠️ No amounts detected in text") - - # Debug invoice number detection - st.markdown("**🔍 Invoice Number Detection Debug:**") - inv_patterns = [ - r'invoice\s*(?:no|number|#)?\s*:?\s*([A-Z0-9\-_/]+)', - r'#\s*([A-Z0-9\-_/]{3,})', - ] - - for i, pattern in enumerate(inv_patterns): - matches = re.findall(pattern, text.lower(), re.IGNORECASE) - if matches: - st.write(f"Invoice Pattern {i+1}: {matches}") - - # Show full text for manual inspection - if st.checkbox("Show Full Extracted Text", key=f"debug_full_text_{uploaded_file.name}"): - st.text_area("Full Text:", value=text, height=300, disabled=True) + with st.expander("📄 Text Preview (First 500 characters)", expanded=False): + st.text(text[:500] + "..." if len(text) > 500 else text) # Extract invoice data st.info("🤖 Extracting invoice data using AI/Regex...")