Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import time | |
| import hashlib | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| import json | |
| import pandas as pd | |
| from datetime import datetime | |
| from document_processor import DocumentProcessor | |
| from vector_store import VectorStore | |
| from config import Config | |
| import io | |
| class AdminPanel: | |
| """ | |
| Secure administrative interface for knowledge base management. | |
| Provides document upload, deletion, and system monitoring capabilities. | |
| """ | |
| def __init__(self): | |
| self.config = Config() | |
| self.document_processor = DocumentProcessor() | |
| self.vector_store = VectorStore() | |
| self.admin_password_hash = self._get_admin_password_hash() | |
| def _get_admin_password_hash(self) -> str: | |
| """ | |
| Get or create admin password hash. | |
| Default password: 'bluescarf_admin_2024' (change this in production!) | |
| """ | |
| password_file = Path(self.config.VECTOR_DB_PATH) / "admin_password.txt" | |
| if password_file.exists(): | |
| try: | |
| with open(password_file, 'r') as f: | |
| return f.read().strip() | |
| except Exception: | |
| pass | |
| # Default password hash (SHA-256 of 'bluescarf_admin_2024') | |
| default_password = "bluescarf_admin_2024" | |
| password_hash = hashlib.sha256(default_password.encode()).hexdigest() | |
| # Save to file | |
| try: | |
| password_file.parent.mkdir(parents=True, exist_ok=True) | |
| with open(password_file, 'w') as f: | |
| f.write(password_hash) | |
| except Exception as e: | |
| st.warning(f"Could not save admin password: {str(e)}") | |
| return password_hash | |
| def _verify_admin_password(self, entered_password: str) -> bool: | |
| """ | |
| Verify admin password against stored hash. | |
| Args: | |
| entered_password: Password entered by user | |
| Returns: | |
| True if password is correct, False otherwise | |
| """ | |
| entered_hash = hashlib.sha256(entered_password.encode()).hexdigest() | |
| return entered_hash == self.admin_password_hash | |
| def _change_admin_password(self, current_password: str, new_password: str) -> bool: | |
| """ | |
| Change admin password with verification. | |
| Args: | |
| current_password: Current admin password | |
| new_password: New password to set | |
| Returns: | |
| True if password changed successfully, False otherwise | |
| """ | |
| if not self._verify_admin_password(current_password): | |
| st.error("Current password is incorrect") | |
| return False | |
| if len(new_password) < 8: | |
| st.error("New password must be at least 8 characters long") | |
| return False | |
| # Update password hash | |
| new_hash = hashlib.sha256(new_password.encode()).hexdigest() | |
| password_file = Path(self.config.VECTOR_DB_PATH) / "admin_password.txt" | |
| try: | |
| with open(password_file, 'w') as f: | |
| f.write(new_hash) | |
| self.admin_password_hash = new_hash | |
| st.success("β Admin password updated successfully") | |
| return True | |
| except Exception as e: | |
| st.error(f"Failed to update password: {str(e)}") | |
| return False | |
| def render_authentication(self) -> bool: | |
| """ | |
| Render admin authentication interface. | |
| Returns: | |
| True if authenticated, False otherwise | |
| """ | |
| if st.session_state.admin_authenticated: | |
| return True | |
| st.markdown(""" | |
| <div class="admin-section"> | |
| <h4>π Administrator Authentication</h4> | |
| <p>Enter admin password to access knowledge base management</p> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| with st.form("admin_auth_form", clear_on_submit=True): | |
| password = st.text_input( | |
| "Admin Password:", | |
| type="password", | |
| help="Default: bluescarf_admin_2024 (change in production!)" | |
| ) | |
| col1, col2 = st.columns([1, 3]) | |
| with col1: | |
| login_button = st.form_submit_button("Login", type="primary") | |
| with col2: | |
| if st.form_submit_button("Show Default Password"): | |
| st.info("Default password: `bluescarf_admin_2024`") | |
| if login_button and password: | |
| if self._verify_admin_password(password): | |
| st.session_state.admin_authenticated = True | |
| st.success("β Authentication successful!") | |
| st.rerun() | |
| else: | |
| st.error("β Invalid password") | |
| return False | |
| def render_document_upload(self): | |
| """Render document upload interface with batch processing support.""" | |
| st.markdown("### π Upload Company Documents") | |
| with st.expander("π Upload Guidelines", expanded=False): | |
| st.markdown(""" | |
| **Supported Documents:** | |
| - Company policies and procedures | |
| - Employee handbooks | |
| - Benefits information | |
| - HR guidelines and regulations | |
| - Training materials | |
| **Requirements:** | |
| - PDF format only | |
| - Maximum 50MB per file | |
| - Readable text content (not scanned images) | |
| - Company-related HR content | |
| """) | |
| # File upload interface | |
| uploaded_files = st.file_uploader( | |
| "Choose PDF files", | |
| type=['pdf'], | |
| accept_multiple_files=True, | |
| help="Upload multiple PDF files for batch processing" | |
| ) | |
| if uploaded_files: | |
| st.markdown(f"**Selected Files:** {len(uploaded_files)} PDF(s)") | |
| # Display file details | |
| file_details = [] | |
| total_size = 0 | |
| for uploaded_file in uploaded_files: | |
| file_size_mb = uploaded_file.size / (1024 * 1024) | |
| total_size += file_size_mb | |
| file_details.append({ | |
| 'Filename': uploaded_file.name, | |
| 'Size (MB)': f"{file_size_mb:.2f}", | |
| 'Status': 'β Ready' if file_size_mb <= 50 else 'β Too Large' | |
| }) | |
| df = pd.DataFrame(file_details) | |
| st.dataframe(df, use_container_width=True) | |
| # Process uploaded files | |
| col1, col2, col3 = st.columns([2, 2, 1]) | |
| with col1: | |
| process_button = st.button( | |
| f"π Process {len(uploaded_files)} Files", | |
| type="primary", | |
| disabled=total_size > 200 # 200MB total limit | |
| ) | |
| with col2: | |
| if total_size > 200: | |
| st.error(f"Total size ({total_size:.1f}MB) exceeds 200MB limit") | |
| with col3: | |
| if st.button("ποΈ Clear"): | |
| st.experimental_rerun() | |
| if process_button: | |
| self._process_uploaded_files(uploaded_files) | |
| def _process_uploaded_files(self, uploaded_files: List) -> None: | |
| """ | |
| Process multiple uploaded files with progress tracking and error handling. | |
| Args: | |
| uploaded_files: List of uploaded file objects | |
| """ | |
| success_count = 0 | |
| error_count = 0 | |
| duplicate_count = 0 | |
| # Overall progress tracking | |
| overall_progress = st.progress(0) | |
| status_placeholder = st.empty() | |
| for i, uploaded_file in enumerate(uploaded_files): | |
| try: | |
| # Update overall progress | |
| progress = i / len(uploaded_files) | |
| overall_progress.progress(progress) | |
| status_placeholder.info(f"Processing {uploaded_file.name}...") | |
| # Validate file | |
| if not self.document_processor.validate_pdf_file(uploaded_file): | |
| error_count += 1 | |
| continue | |
| # Check for duplicates | |
| doc_hash = self.document_processor.calculate_document_hash(uploaded_file) | |
| existing_docs = self.vector_store.get_documents_by_hash(doc_hash) | |
| if existing_docs: | |
| st.warning(f"β οΈ {uploaded_file.name} already exists in knowledge base") | |
| duplicate_count += 1 | |
| continue | |
| # Process document | |
| processed_doc = self.document_processor.process_document( | |
| uploaded_file, | |
| uploaded_file.name | |
| ) | |
| if processed_doc: | |
| # Add to vector store | |
| if self.vector_store.add_document(processed_doc): | |
| success_count += 1 | |
| else: | |
| error_count += 1 | |
| else: | |
| error_count += 1 | |
| except Exception as e: | |
| st.error(f"Error processing {uploaded_file.name}: {str(e)}") | |
| error_count += 1 | |
| # Final progress update | |
| overall_progress.progress(1.0) | |
| status_placeholder.empty() | |
| # Display results summary | |
| st.markdown("### π Processing Results") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("β Successful", success_count) | |
| with col2: | |
| st.metric("β οΈ Duplicates", duplicate_count) | |
| with col3: | |
| st.metric("β Errors", error_count) | |
| if success_count > 0: | |
| st.success(f"π Successfully processed {success_count} documents!") | |
| # Refresh knowledge base stats | |
| time.sleep(1) | |
| st.rerun() | |
| def render_knowledge_base_management(self): | |
| """Render knowledge base overview and management interface.""" | |
| st.markdown("### π Knowledge Base Management") | |
| # Get current statistics | |
| stats = self.vector_store.get_collection_stats() | |
| documents = self.vector_store.get_all_documents() | |
| # Display overview metrics | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.metric("π Documents", stats.get('total_documents', 0)) | |
| with col2: | |
| st.metric("π§© Chunks", stats.get('total_chunks', 0)) | |
| with col3: | |
| avg_chunks = stats.get('avg_chunks_per_doc', 0) | |
| st.metric("π Avg Chunks/Doc", f"{avg_chunks:.1f}") | |
| with col4: | |
| last_update = stats.get('latest_update', 0) | |
| if last_update: | |
| update_time = datetime.fromtimestamp(last_update).strftime("%m/%d/%Y") | |
| st.metric("π Last Update", update_time) | |
| else: | |
| st.metric("π Last Update", "None") | |
| if not documents: | |
| st.info("π No documents in knowledge base. Upload some documents to get started!") | |
| return | |
| # Document management table | |
| st.markdown("#### π Document Library") | |
| # Prepare document data for display | |
| doc_data = [] | |
| for doc in documents: | |
| processed_time = datetime.fromtimestamp( | |
| doc.get('processed_at', 0) | |
| ).strftime("%Y-%m-%d %H:%M") | |
| doc_data.append({ | |
| 'Filename': doc.get('filename', 'Unknown'), | |
| 'Type': doc.get('document_type', 'hr_policy').replace('_', ' ').title(), | |
| 'Chunks': doc.get('chunk_count', 0), | |
| 'Processed': processed_time, | |
| 'Hash': doc.get('document_hash', '')[:12] + '...' | |
| }) | |
| # Display documents table | |
| df = pd.DataFrame(doc_data) | |
| selected_rows = st.dataframe( | |
| df, | |
| use_container_width=True, | |
| hide_index=True | |
| ) | |
| # Document management actions | |
| if documents: | |
| st.markdown("#### π οΈ Management Actions") | |
| col1, col2, col3 = st.columns([2, 2, 2]) | |
| with col1: | |
| # Document selection for deletion | |
| doc_options = [ | |
| f"{doc['filename']} ({doc.get('chunk_count', 0)} chunks)" | |
| for doc in documents | |
| ] | |
| selected_doc_idx = st.selectbox( | |
| "Select document to delete:", | |
| range(len(doc_options)), | |
| format_func=lambda x: doc_options[x] | |
| ) | |
| if st.button("ποΈ Delete Selected", type="secondary"): | |
| self._delete_selected_document(documents[selected_doc_idx]) | |
| with col2: | |
| # Health check | |
| if st.button("π₯ Health Check", type="secondary"): | |
| self._perform_health_check() | |
| with col3: | |
| # Danger zone - reset knowledge base | |
| if st.button("β οΈ Reset All", type="secondary"): | |
| self._confirm_reset_knowledge_base() | |
| def _delete_selected_document(self, document: Dict[str, Any]): | |
| """ | |
| Delete selected document with confirmation. | |
| Args: | |
| document: Document metadata to delete | |
| """ | |
| doc_hash = document.get('document_hash') | |
| filename = document.get('filename', 'Unknown') | |
| if not doc_hash: | |
| st.error("Invalid document selection") | |
| return | |
| # Confirmation dialog | |
| with st.form(f"delete_confirm_{doc_hash[:8]}"): | |
| st.warning(f"β οΈ **Confirm Deletion**") | |
| st.write(f"Document: **{filename}**") | |
| st.write(f"Chunks: **{document.get('chunk_count', 0)}**") | |
| st.write("This action cannot be undone!") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| confirm_delete = st.form_submit_button("ποΈ Confirm Delete", type="primary") | |
| with col2: | |
| cancel_delete = st.form_submit_button("β Cancel") | |
| if confirm_delete: | |
| if self.vector_store.delete_document(doc_hash): | |
| st.success(f"β Successfully deleted {filename}") | |
| time.sleep(1) | |
| st.experimental_rerun() | |
| else: | |
| st.error("Failed to delete document") | |
| if cancel_delete: | |
| st.info("Deletion cancelled") | |
| st.experimental_rerun() | |
| def _perform_health_check(self): | |
| """Perform comprehensive system health check.""" | |
| with st.spinner("Performing health check..."): | |
| health_status = self.vector_store.health_check() | |
| st.markdown("#### π₯ System Health Report") | |
| if health_status.get('status') == 'healthy': | |
| st.success("β System is healthy!") | |
| elif health_status.get('status') == 'unhealthy': | |
| st.warning("β οΈ System issues detected") | |
| else: | |
| st.error("β System error") | |
| # Display detailed health metrics | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.markdown("**Storage Status:**") | |
| if health_status.get('storage_accessible'): | |
| st.success("β Storage accessible") | |
| else: | |
| st.error("β Storage issues") | |
| with col2: | |
| st.markdown("**Collection Status:**") | |
| if health_status.get('collection_healthy'): | |
| st.success("β Collection healthy") | |
| else: | |
| st.error("β Collection issues") | |
| # Additional metrics | |
| st.markdown("**System Metrics:**") | |
| metrics_data = { | |
| 'Total Documents': health_status.get('total_documents', 0), | |
| 'Total Chunks': health_status.get('total_chunks', 0), | |
| 'Last Check': datetime.fromtimestamp( | |
| health_status.get('last_check', time.time()) | |
| ).strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| for metric, value in metrics_data.items(): | |
| st.write(f"β’ **{metric}:** {value}") | |
| def _confirm_reset_knowledge_base(self): | |
| """Render knowledge base reset confirmation with safeguards.""" | |
| st.markdown("#### β οΈ **DANGER ZONE**") | |
| st.error("**Reset Knowledge Base** - This will delete ALL documents and chunks!") | |
| with st.form("reset_confirmation"): | |
| st.write("This action will:") | |
| st.write("β’ Delete all processed documents") | |
| st.write("β’ Remove all embeddings and chunks") | |
| st.write("β’ Clear document metadata") | |
| st.write("β’ **Cannot be undone!**") | |
| confirmation_text = st.text_input( | |
| "Type 'RESET BLUESCARF KNOWLEDGE BASE' to confirm:", | |
| placeholder="Type confirmation text here..." | |
| ) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| reset_button = st.form_submit_button( | |
| "π₯ RESET EVERYTHING", | |
| type="primary" | |
| ) | |
| with col2: | |
| cancel_button = st.form_submit_button("β Cancel") | |
| if reset_button: | |
| if confirmation_text == "RESET BLUESCARF KNOWLEDGE BASE": | |
| with st.spinner("Resetting knowledge base..."): | |
| if self.vector_store.reset_collection(): | |
| st.success("β Knowledge base reset successfully!") | |
| time.sleep(2) | |
| st.rerun() | |
| else: | |
| st.error("β Failed to reset knowledge base") | |
| else: | |
| st.error("β Confirmation text doesn't match. Reset cancelled.") | |
| if cancel_button: | |
| st.info("Reset cancelled") | |
| st.rerun() | |
| def render_admin_settings(self): | |
| """Render admin settings and configuration options.""" | |
| st.markdown("### βοΈ Admin Settings") | |
| # Password management | |
| with st.expander("π Password Management", expanded=False): | |
| with st.form("change_password_form"): | |
| current_password = st.text_input( | |
| "Current Password:", | |
| type="password" | |
| ) | |
| new_password = st.text_input( | |
| "New Password:", | |
| type="password", | |
| help="Minimum 8 characters" | |
| ) | |
| confirm_password = st.text_input( | |
| "Confirm New Password:", | |
| type="password" | |
| ) | |
| change_pwd_button = st.form_submit_button("Update Password") | |
| if change_pwd_button: | |
| if new_password != confirm_password: | |
| st.error("New passwords don't match") | |
| elif len(new_password) < 8: | |
| st.error("Password must be at least 8 characters") | |
| else: | |
| self._change_admin_password(current_password, new_password) | |
| # System information | |
| with st.expander("π System Information", expanded=False): | |
| stats = self.vector_store.get_collection_stats() | |
| st.json({ | |
| 'Knowledge Base Stats': stats, | |
| 'Storage Path': str(self.config.VECTOR_DB_PATH), | |
| 'Chunk Size': self.config.CHUNK_SIZE, | |
| 'Max Context Chunks': self.config.MAX_CONTEXT_CHUNKS, | |
| 'Max File Size (MB)': self.config.MAX_FILE_SIZE / (1024*1024) | |
| }) | |
| # Logout button | |
| if st.button("πͺ Logout", type="secondary"): | |
| st.session_state.admin_authenticated = False | |
| st.session_state.show_admin = False | |
| st.rerun() | |
| def render(self): | |
| """Main admin panel render method.""" | |
| if not self.render_authentication(): | |
| return | |
| st.markdown("---") | |
| st.markdown("## π§ **Administrator Panel**") | |
| # Admin navigation tabs | |
| tab1, tab2, tab3 = st.tabs([ | |
| "π Document Management", | |
| "π Knowledge Base", | |
| "βοΈ Settings" | |
| ]) | |
| with tab1: | |
| self.render_document_upload() | |
| with tab2: | |
| self.render_knowledge_base_management() | |
| with tab3: | |
| self.render_admin_settings() | |