Spaces:
Sleeping
Sleeping
import json
import os
import sys
import time
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import chromadb
import numpy as np
import streamlit as st
from chromadb.config import Settings

from config import Config
class BulletproofVectorStore:
    """
    Ultra-robust vector storage with bulletproof deletion mechanics.
    Engineering Philosophy:
    - Atomic operations with rollback capability
    - Deep diagnostic feedback for troubleshooting
    - Multiple deletion strategies with fallback mechanisms
    - State synchronization with UI refresh triggers
    """
    def __init__(self):
        # Config supplies VECTOR_DB_PATH, used both for ChromaDB persistence
        # and for the side-car JSON metadata directory.
        self.config = Config()
        # The client must exist before the collection can be fetched/created.
        self.client = self._initialize_chromadb_with_diagnostics()
        self.collection_name = "hr_knowledge_base"
        self.collection = self._get_or_create_collection_robust()
        # In-memory log of deletion operations (see _record_deletion_success).
        self.deletion_diagnostics = {"operations": [], "performance_metrics": {}}
| def _initialize_chromadb_with_diagnostics(self) -> chromadb.Client: | |
| """Initialize ChromaDB with comprehensive error diagnosis and recovery.""" | |
| try: | |
| data_dir = Path(self.config.VECTOR_DB_PATH) | |
| data_dir.mkdir(parents=True, exist_ok=True) | |
| client = chromadb.PersistentClient( | |
| path=str(data_dir), | |
| settings=Settings( | |
| anonymized_telemetry=False, | |
| allow_reset=True, | |
| # Enhanced settings for deletion reliability | |
| chroma_server_authn_credentials_file=None, | |
| chroma_server_authn_provider=None | |
| ) | |
| ) | |
| # Verify client connection with diagnostic test | |
| collections = client.list_collections() | |
| st.info(f"π ChromaDB initialized successfully. Found {len(collections)} existing collections.") | |
| return client | |
| except Exception as initialization_error: | |
| st.error(f"π¨ ChromaDB initialization failed: {str(initialization_error)}") | |
| raise | |
| def _get_or_create_collection_robust(self) -> chromadb.Collection: | |
| """Get or create collection with enhanced error handling and validation.""" | |
| try: | |
| # Attempt to get existing collection with diagnostic feedback | |
| try: | |
| collection = self.client.get_collection( | |
| name=self.collection_name, | |
| embedding_function=None | |
| ) | |
| # Validate collection integrity | |
| collection_count = collection.count() | |
| st.success(f"β Connected to existing collection with {collection_count} items") | |
| return collection | |
| except Exception as get_error: | |
| st.info(f"π Creating new collection: {str(get_error)}") | |
| # Create new collection with enhanced metadata | |
| collection = self.client.create_collection( | |
| name=self.collection_name, | |
| embedding_function=None, | |
| metadata={ | |
| "description": "BLUESCARF AI HR Knowledge Base", | |
| "created_at": time.time(), | |
| "version": "2.0_bulletproof", | |
| "deletion_engine": "enhanced" | |
| } | |
| ) | |
| st.success("π New collection created successfully") | |
| return collection | |
| except Exception as collection_error: | |
| st.error(f"π₯ Collection setup failed: {str(collection_error)}") | |
| raise | |
    def delete_document_bulletproof(self, document_hash: str) -> bool:
        """
        Bulletproof document deletion with multiple strategies and deep diagnostics.
        Architecture:
        1. Pre-deletion validation and state capture
        2. Multiple deletion strategies with fallback mechanisms
        3. Post-deletion verification and cleanup
        4. UI state synchronization and user feedback
        Args:
            document_hash: Unique document identifier
        Returns:
            bool: True if deletion successful, False otherwise
        """
        # Short random tag that correlates all log lines of this attempt.
        deletion_session_id = str(uuid.uuid4())[:8]
        operation_start = time.time()
        st.info(f"π **Deletion Engine Activated** (Session: {deletion_session_id})")
        # Phase 1: Pre-deletion diagnostics and validation (also captures the
        # chunk IDs the direct-ID strategy needs).
        validation_result = self._execute_pre_deletion_diagnostics(document_hash)
        if not validation_result["is_valid"]:
            st.error(f"β Pre-deletion validation failed: {validation_result['reason']}")
            return False
        st.success(f"β Validation passed - {validation_result['chunk_count']} chunks identified")
        # Phase 2: Execute deletion strategies, ordered least to most invasive;
        # the first one whose result verifies clean wins.
        deletion_strategies = [
            ("primary_where_clause", self._delete_via_where_clause),
            ("direct_id_deletion", self._delete_via_direct_ids),
            ("batch_deletion", self._delete_via_batch_operations),
            ("nuclear_reset", self._delete_via_collection_reset)
        ]
        for strategy_name, deletion_method in deletion_strategies:
            try:
                st.info(f"π§ Executing {strategy_name.replace('_', ' ').title()} strategy...")
                deletion_success = deletion_method(document_hash, validation_result)
                if deletion_success:
                    # Phase 3: Post-deletion verification
                    verification_result = self._execute_post_deletion_verification(document_hash)
                    if verification_result["is_clean"]:
                        # Phase 4: Cleanup and UI synchronization
                        self._execute_comprehensive_cleanup(document_hash)
                        self._trigger_ui_state_refresh()
                        operation_time = time.time() - operation_start
                        st.success(f"π **Deletion Complete!** ({operation_time:.2f}s using {strategy_name})")
                        # Record successful operation for later analytics.
                        self._record_deletion_success(deletion_session_id, strategy_name, operation_time)
                        return True
                    else:
                        st.warning(f"β οΈ {strategy_name} incomplete - trying next strategy")
                else:
                    st.warning(f"β οΈ {strategy_name} failed - trying next strategy")
            except Exception as strategy_error:
                # A crashing strategy must not abort the fallback chain.
                st.error(f"π₯ {strategy_name} error: {str(strategy_error)}")
                continue
        # All strategies failed - provide comprehensive diagnostics
        st.error("π¨ **All deletion strategies failed**")
        self._provide_failure_diagnostics(document_hash, deletion_session_id)
        return False
    def _execute_pre_deletion_diagnostics(self, document_hash: str) -> Dict[str, Any]:
        """Comprehensive pre-deletion validation with detailed diagnostics.

        Confirms the target document exists (filtered query first, full-scan
        fallback second), captures its chunk IDs for the direct-ID deletion
        strategy, and records collection / metadata-file status.

        Returns:
            Dict with keys: is_valid, chunk_count, chunk_ids, reason,
            collection_status, metadata_status.
        """
        diagnostic_result = {
            "is_valid": False,
            "chunk_count": 0,
            "chunk_ids": [],
            "reason": "",
            "collection_status": {},
            "metadata_status": {}
        }
        try:
            # Collection integrity check
            collection_count = self.collection.count()
            diagnostic_result["collection_status"] = {
                "total_items": collection_count,
                "is_accessible": True,
                "connection_healthy": True
            }
            # Document existence verification with multiple query approaches
            query_results = self.collection.get(
                where={"document_hash": document_hash},
                include=['documents', 'metadatas']
            )
            if not query_results['ids']:
                # Fallback: scan every item's metadata in case the filtered
                # query missed matches.
                all_items = self.collection.get(include=['metadatas'])
                matching_items = [
                    item_id for item_id, metadata in zip(all_items['ids'], all_items['metadatas'])
                    if metadata.get('document_hash') == document_hash
                ]
                if matching_items:
                    diagnostic_result["chunk_ids"] = matching_items
                    diagnostic_result["chunk_count"] = len(matching_items)
                    diagnostic_result["is_valid"] = True
                    st.info(f"π Found document via alternative query: {len(matching_items)} chunks")
                else:
                    # Nothing to delete: report and bail out early (the
                    # metadata-file check below is skipped in this case).
                    diagnostic_result["reason"] = "Document not found in collection"
                    return diagnostic_result
            else:
                diagnostic_result["chunk_ids"] = query_results['ids']
                diagnostic_result["chunk_count"] = len(query_results['ids'])
                diagnostic_result["is_valid"] = True
            # Metadata file verification (side-car JSON written at ingest time)
            metadata_file = Path(self.config.VECTOR_DB_PATH) / "metadata" / f"{document_hash}.json"
            diagnostic_result["metadata_status"] = {
                "file_exists": metadata_file.exists(),
                "file_path": str(metadata_file)
            }
            return diagnostic_result
        except Exception as diagnostic_error:
            diagnostic_result["reason"] = f"Diagnostic error: {str(diagnostic_error)}"
            return diagnostic_result
| def _delete_via_where_clause(self, document_hash: str, validation_data: Dict) -> bool: | |
| """Primary deletion strategy using WHERE clause filtering.""" | |
| try: | |
| pre_count = self.collection.count() | |
| # Execute deletion with enhanced where clause | |
| self.collection.delete(where={"document_hash": document_hash}) | |
| post_count = self.collection.count() | |
| deleted_count = pre_count - post_count | |
| st.info(f"π Where clause deletion: {deleted_count} items removed") | |
| return deleted_count > 0 | |
| except Exception as where_error: | |
| st.error(f"Where clause deletion failed: {str(where_error)}") | |
| return False | |
| def _delete_via_direct_ids(self, document_hash: str, validation_data: Dict) -> bool: | |
| """Secondary deletion strategy using direct ID targeting.""" | |
| try: | |
| chunk_ids = validation_data.get("chunk_ids", []) | |
| if not chunk_ids: | |
| return False | |
| # Delete by specific IDs in batches for reliability | |
| batch_size = 10 | |
| deleted_total = 0 | |
| for i in range(0, len(chunk_ids), batch_size): | |
| batch_ids = chunk_ids[i:i + batch_size] | |
| try: | |
| self.collection.delete(ids=batch_ids) | |
| deleted_total += len(batch_ids) | |
| st.info(f"ποΈ Batch {i//batch_size + 1}: Deleted {len(batch_ids)} chunks") | |
| except Exception as batch_error: | |
| st.warning(f"Batch deletion failed: {str(batch_error)}") | |
| continue | |
| return deleted_total > 0 | |
| except Exception as id_error: | |
| st.error(f"Direct ID deletion failed: {str(id_error)}") | |
| return False | |
| def _delete_via_batch_operations(self, document_hash: str, validation_data: Dict) -> bool: | |
| """Tertiary deletion strategy using optimized batch operations.""" | |
| try: | |
| # Get all items and filter out target document | |
| all_items = self.collection.get(include=['documents', 'metadatas']) | |
| # Identify items to keep (inverse deletion approach) | |
| items_to_keep = { | |
| 'ids': [], | |
| 'documents': [], | |
| 'metadatas': [] | |
| } | |
| for item_id, doc, metadata in zip(all_items['ids'], all_items['documents'], all_items['metadatas']): | |
| if metadata.get('document_hash') != document_hash: | |
| items_to_keep['ids'].append(item_id) | |
| items_to_keep['documents'].append(doc) | |
| items_to_keep['metadatas'].append(metadata) | |
| # Reset collection and add back only items to keep | |
| collection_metadata = self.collection.metadata | |
| self.client.delete_collection(self.collection_name) | |
| self.collection = self.client.create_collection( | |
| name=self.collection_name, | |
| embedding_function=None, | |
| metadata=collection_metadata | |
| ) | |
| # Re-add items that should be kept | |
| if items_to_keep['ids']: | |
| # Need to get embeddings back - this is complex, skip for now | |
| st.warning("Batch operation requires embedding reconstruction - skipping") | |
| return False | |
| st.info("π Batch operation completed") | |
| return True | |
| except Exception as batch_error: | |
| st.error(f"Batch operation failed: {str(batch_error)}") | |
| return False | |
| def _delete_via_collection_reset(self, document_hash: str, validation_data: Dict) -> bool: | |
| """Nuclear option: reset collection and rebuild without target document.""" | |
| try: | |
| st.warning("β οΈ **NUCLEAR OPTION**: Rebuilding entire collection") | |
| # This is a last resort - requires careful implementation | |
| # For now, return False to avoid data loss | |
| st.error("Nuclear reset not implemented for safety - manual intervention required") | |
| return False | |
| except Exception as reset_error: | |
| st.error(f"Collection reset failed: {str(reset_error)}") | |
| return False | |
| def _execute_post_deletion_verification(self, document_hash: str) -> Dict[str, Any]: | |
| """Verify deletion completion with comprehensive checks.""" | |
| verification_result = { | |
| "is_clean": False, | |
| "remaining_chunks": 0, | |
| "verification_methods": {} | |
| } | |
| try: | |
| # Method 1: WHERE clause verification | |
| where_results = self.collection.get(where={"document_hash": document_hash}) | |
| remaining_via_where = len(where_results['ids']) | |
| verification_result["verification_methods"]["where_clause"] = remaining_via_where | |
| # Method 2: Full scan verification | |
| all_items = self.collection.get(include=['metadatas']) | |
| remaining_via_scan = sum( | |
| 1 for metadata in all_items['metadatas'] | |
| if metadata.get('document_hash') == document_hash | |
| ) | |
| verification_result["verification_methods"]["full_scan"] = remaining_via_scan | |
| # Determine overall cleanliness | |
| verification_result["remaining_chunks"] = max(remaining_via_where, remaining_via_scan) | |
| verification_result["is_clean"] = verification_result["remaining_chunks"] == 0 | |
| if verification_result["is_clean"]: | |
| st.success("β Verification passed - document completely removed") | |
| else: | |
| st.warning(f"β οΈ Verification found {verification_result['remaining_chunks']} remaining chunks") | |
| return verification_result | |
| except Exception as verification_error: | |
| st.error(f"Verification failed: {str(verification_error)}") | |
| verification_result["verification_error"] = str(verification_error) | |
| return verification_result | |
| def _execute_comprehensive_cleanup(self, document_hash: str): | |
| """Execute comprehensive cleanup of metadata and cached data.""" | |
| try: | |
| # Remove metadata file | |
| metadata_file = Path(self.config.VECTOR_DB_PATH) / "metadata" / f"{document_hash}.json" | |
| if metadata_file.exists(): | |
| metadata_file.unlink() | |
| st.info("π§Ή Metadata file removed") | |
| # Clear any cached data in session state | |
| cache_keys_to_clear = [ | |
| 'admin_documents_cache', | |
| 'document_list_cache', | |
| 'admin_stats_cache' | |
| ] | |
| for key in cache_keys_to_clear: | |
| if key in st.session_state: | |
| del st.session_state[key] | |
| st.info("π Cache cleared") | |
| except Exception as cleanup_error: | |
| st.warning(f"Cleanup warning: {str(cleanup_error)}") | |
| def _trigger_ui_state_refresh(self): | |
| """Trigger comprehensive UI state refresh to reflect deletion.""" | |
| # Force refresh of admin components | |
| refresh_triggers = [ | |
| 'admin_refresh_counter', | |
| 'document_management_refresh', | |
| 'collection_stats_refresh' | |
| ] | |
| for trigger in refresh_triggers: | |
| if trigger not in st.session_state: | |
| st.session_state[trigger] = 0 | |
| st.session_state[trigger] += 1 | |
| # Set global refresh flag | |
| st.session_state.force_admin_refresh = True | |
| st.info("π UI refresh triggered") | |
| def _record_deletion_success(self, session_id: str, strategy: str, operation_time: float): | |
| """Record successful deletion for analytics and optimization.""" | |
| success_record = { | |
| "session_id": session_id, | |
| "strategy_used": strategy, | |
| "operation_time": operation_time, | |
| "timestamp": time.time(), | |
| "collection_size_after": self.collection.count() | |
| } | |
| self.deletion_diagnostics["operations"].append(success_record) | |
| st.info(f"π Operation recorded: {strategy} in {operation_time:.2f}s") | |
| def _provide_failure_diagnostics(self, document_hash: str, session_id: str): | |
| """Provide comprehensive failure diagnostics for troubleshooting.""" | |
| st.error("π¨ **DELETION FAILURE ANALYSIS**") | |
| diagnostic_data = { | |
| "session_id": session_id, | |
| "document_hash": document_hash[:16] + "...", | |
| "collection_info": { | |
| "total_items": self.collection.count(), | |
| "collection_name": self.collection_name | |
| }, | |
| "attempted_strategies": ["where_clause", "direct_ids", "batch_operations"], | |
| "system_state": { | |
| "chromadb_version": chromadb.__version__, | |
| "python_version": f"{os.sys.version_info.major}.{os.sys.version_info.minor}" | |
| } | |
| } | |
| with st.expander("π **Technical Diagnostics**", expanded=True): | |
| st.json(diagnostic_data) | |
| st.markdown("**π οΈ Troubleshooting Steps:**") | |
| st.write("1. **Verify Collection Access**: Check if collection is properly initialized") | |
| st.write("2. **Manual Verification**: Use admin panel to verify document existence") | |
| st.write("3. **System Restart**: Try refreshing the application") | |
| st.write("4. **Alternative Approach**: Use collection reset if data loss is acceptable") | |
| if st.button("π **Force Collection Refresh**", key=f"force_refresh_{session_id}"): | |
| try: | |
| self.collection = self._get_or_create_collection_robust() | |
| st.success("β Collection refreshed - try deletion again") | |
| st.rerun() | |
| except Exception as refresh_error: | |
| st.error(f"Refresh failed: {str(refresh_error)}") | |
    # Keep all other existing methods from the original VectorStore class
    # Just replace the delete_document method with delete_document_bulletproof
    def delete_document(self, document_hash: str) -> bool:
        """Wrapper method for backwards compatibility.

        Delegates to delete_document_bulletproof so existing callers that use
        the old VectorStore.delete_document name keep working unchanged.
        """
        return self.delete_document_bulletproof(document_hash)
| # Include all other original methods here for completeness | |
| def add_document(self, processed_doc: Dict[str, Any]) -> bool: | |
| """Add processed document with chunks and embeddings to vector store.""" | |
| try: | |
| # Check if document already exists | |
| existing_docs = self.get_documents_by_hash(processed_doc['document_hash']) | |
| if existing_docs: | |
| st.warning(f"Document {processed_doc['filename']} already exists in knowledge base") | |
| return False | |
| # Prepare data for ChromaDB | |
| chunk_ids = [] | |
| embeddings = [] | |
| documents = [] | |
| metadatas = [] | |
| for i, chunk in enumerate(processed_doc['chunks']): | |
| # Generate unique ID for each chunk | |
| chunk_id = f"{processed_doc['document_hash']}_{i}" | |
| chunk_ids.append(chunk_id) | |
| # Extract embedding | |
| embeddings.append(chunk['embedding']) | |
| # Store chunk content | |
| documents.append(chunk['content']) | |
| # Prepare metadata (ChromaDB doesn't support nested objects) | |
| metadata = { | |
| 'source': processed_doc['filename'], | |
| 'document_hash': processed_doc['document_hash'], | |
| 'chunk_index': chunk['metadata']['chunk_index'], | |
| 'chunk_type': chunk['metadata']['chunk_type'], | |
| 'processed_at': chunk['metadata'].get('processed_at', time.time()), | |
| 'content_length': len(chunk['content']), | |
| 'document_type': chunk['metadata'].get('document_type', 'hr_policy') | |
| } | |
| # Add section header if available | |
| if 'section_header' in chunk['metadata']: | |
| metadata['section_header'] = chunk['metadata']['section_header'] | |
| metadatas.append(metadata) | |
| # Add to collection in batch for efficiency | |
| self.collection.add( | |
| ids=chunk_ids, | |
| embeddings=embeddings, | |
| documents=documents, | |
| metadatas=metadatas | |
| ) | |
| # Store document-level metadata separately | |
| self._store_document_metadata(processed_doc) | |
| st.success(f"β Added {len(chunk_ids)} chunks from {processed_doc['filename']} to knowledge base") | |
| return True | |
| except Exception as e: | |
| st.error(f"Failed to add document to vector store: {str(e)}") | |
| return False | |
| def _store_document_metadata(self, processed_doc: Dict[str, Any]): | |
| """Store document-level metadata for management and tracking.""" | |
| try: | |
| metadata_dir = Path(self.config.VECTOR_DB_PATH) / "metadata" | |
| metadata_dir.mkdir(exist_ok=True) | |
| metadata_file = metadata_dir / f"{processed_doc['document_hash']}.json" | |
| doc_metadata = { | |
| 'filename': processed_doc['filename'], | |
| 'document_hash': processed_doc['document_hash'], | |
| 'chunk_count': processed_doc['chunk_count'], | |
| 'total_tokens': processed_doc['total_tokens'], | |
| 'processed_at': time.time(), | |
| 'metadata': processed_doc['metadata'] | |
| } | |
| with open(metadata_file, 'w') as f: | |
| json.dump(doc_metadata, f, indent=2) | |
| except Exception as e: | |
| st.warning(f"Failed to store document metadata: {str(e)}") | |
| def similarity_search(self, query: str, k: int = 5, filter_metadata: Optional[Dict] = None) -> List[Dict[str, Any]]: | |
| """Perform semantic similarity search with advanced filtering and ranking.""" | |
| try: | |
| # Import here to avoid loading model at startup | |
| from sentence_transformers import SentenceTransformer | |
| # Generate query embedding | |
| embedding_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| query_embedding = embedding_model.encode([query], normalize_embeddings=True)[0].tolist() | |
| # Perform similarity search | |
| results = self.collection.query( | |
| query_embeddings=[query_embedding], | |
| n_results=min(k * 2, 20), # Get more results for re-ranking | |
| where=filter_metadata, | |
| include=['documents', 'metadatas', 'distances'] | |
| ) | |
| if not results['documents'][0]: | |
| return [] | |
| # Process and rank results | |
| processed_results = [] | |
| for i, (doc, metadata, distance) in enumerate(zip( | |
| results['documents'][0], | |
| results['metadatas'][0], | |
| results['distances'][0] | |
| )): | |
| # Convert distance to similarity score | |
| similarity_score = 1.0 - distance | |
| # Apply content-based scoring | |
| content_score = self._calculate_content_relevance(query, doc) | |
| # Combine scores with weighting | |
| final_score = (similarity_score * 0.7) + (content_score * 0.3) | |
| processed_results.append({ | |
| 'content': doc, | |
| 'metadata': metadata, | |
| 'similarity_score': similarity_score, | |
| 'content_score': content_score, | |
| 'final_score': final_score, | |
| 'rank': i + 1 | |
| }) | |
| # Sort by final score and return top k | |
| processed_results.sort(key=lambda x: x['final_score'], reverse=True) | |
| return processed_results[:k] | |
| except Exception as e: | |
| st.error(f"Similarity search failed: {str(e)}") | |
| return [] | |
| def _calculate_content_relevance(self, query: str, content: str) -> float: | |
| """Calculate content-based relevance score using keyword matching and context analysis.""" | |
| try: | |
| query_words = set(query.lower().split()) | |
| content_words = set(content.lower().split()) | |
| # Keyword overlap score | |
| common_words = query_words.intersection(content_words) | |
| keyword_score = len(common_words) / len(query_words) if query_words else 0 | |
| # Length penalty for very short chunks | |
| length_score = min(len(content) / 200, 1.0) | |
| # Section header bonus | |
| if any(word in content.lower()[:100] for word in ['policy', 'procedure', 'guidelines']): | |
| header_bonus = 0.1 | |
| else: | |
| header_bonus = 0 | |
| return min(keyword_score + length_score * 0.3 + header_bonus, 1.0) | |
| except Exception: | |
| return 0.5 # Default score if calculation fails | |
| def get_documents_by_hash(self, document_hash: str) -> List[Dict[str, Any]]: | |
| """Retrieve all chunks for a specific document by hash.""" | |
| try: | |
| results = self.collection.get( | |
| where={"document_hash": document_hash}, | |
| include=['documents', 'metadatas'] | |
| ) | |
| chunks = [] | |
| for doc, metadata in zip(results['documents'], results['metadatas']): | |
| chunks.append({ | |
| 'content': doc, | |
| 'metadata': metadata | |
| }) | |
| return chunks | |
| except Exception as e: | |
| st.error(f"Failed to retrieve document: {str(e)}") | |
| return [] | |
| def get_all_documents(self) -> List[Dict[str, Any]]: | |
| """Get metadata for all documents in the knowledge base.""" | |
| try: | |
| # Get unique documents from collection | |
| results = self.collection.get(include=['metadatas']) | |
| if not results['metadatas']: | |
| return [] | |
| # Group by document hash | |
| documents = {} | |
| for metadata in results['metadatas']: | |
| doc_hash = metadata['document_hash'] | |
| if doc_hash not in documents: | |
| documents[doc_hash] = { | |
| 'document_hash': doc_hash, | |
| 'filename': metadata['source'], | |
| 'document_type': metadata.get('document_type', 'hr_policy'), | |
| 'processed_at': metadata.get('processed_at', 0), | |
| 'chunk_count': 0 | |
| } | |
| documents[doc_hash]['chunk_count'] += 1 | |
| # Load additional metadata from files | |
| metadata_dir = Path(self.config.VECTOR_DB_PATH) / "metadata" | |
| if metadata_dir.exists(): | |
| for metadata_file in metadata_dir.glob("*.json"): | |
| try: | |
| with open(metadata_file, 'r') as f: | |
| file_metadata = json.load(f) | |
| doc_hash = file_metadata['document_hash'] | |
| if doc_hash in documents: | |
| documents[doc_hash].update(file_metadata) | |
| except Exception as e: | |
| continue | |
| return list(documents.values()) | |
| except Exception as e: | |
| st.error(f"Failed to retrieve documents: {str(e)}") | |
| return [] | |
| def get_document_count(self) -> int: | |
| """Get total number of documents in knowledge base.""" | |
| try: | |
| documents = self.get_all_documents() | |
| return len(documents) | |
| except Exception: | |
| return 0 | |
| def get_total_chunks(self) -> int: | |
| """Get total number of chunks in knowledge base.""" | |
| try: | |
| collection_info = self.collection.count() | |
| return collection_info | |
| except Exception: | |
| return 0 | |
| def get_collection_stats(self) -> Dict[str, Any]: | |
| """Get comprehensive statistics about the knowledge base.""" | |
| try: | |
| documents = self.get_all_documents() | |
| total_chunks = self.get_total_chunks() | |
| if not documents: | |
| return { | |
| 'total_documents': 0, | |
| 'total_chunks': 0, | |
| 'avg_chunks_per_doc': 0, | |
| 'document_types': {}, | |
| 'latest_update': None | |
| } | |
| # Calculate statistics | |
| document_types = {} | |
| latest_update = 0 | |
| for doc in documents: | |
| doc_type = doc.get('document_type', 'unknown') | |
| document_types[doc_type] = document_types.get(doc_type, 0) + 1 | |
| processed_at = doc.get('processed_at', 0) | |
| if processed_at > latest_update: | |
| latest_update = processed_at | |
| avg_chunks = total_chunks / len(documents) if documents else 0 | |
| return { | |
| 'total_documents': len(documents), | |
| 'total_chunks': total_chunks, | |
| 'avg_chunks_per_doc': round(avg_chunks, 1), | |
| 'document_types': document_types, | |
| 'latest_update': latest_update, | |
| 'storage_path': str(self.config.VECTOR_DB_PATH) | |
| } | |
| except Exception as e: | |
| st.error(f"Failed to get collection stats: {str(e)}") | |
| return {} | |
| def reset_collection(self) -> bool: | |
| """Reset the entire knowledge base (use with caution).""" | |
| try: | |
| # Delete collection | |
| self.client.delete_collection(self.collection_name) | |
| # Recreate collection | |
| self.collection = self._get_or_create_collection_robust() | |
| # Clean up metadata files | |
| metadata_dir = Path(self.config.VECTOR_DB_PATH) / "metadata" | |
| if metadata_dir.exists(): | |
| for metadata_file in metadata_dir.glob("*.json"): | |
| metadata_file.unlink() | |
| st.success("β Knowledge base reset successfully") | |
| return True | |
| except Exception as e: | |
| st.error(f"Failed to reset collection: {str(e)}") | |
| return False | |
| def health_check(self) -> Dict[str, Any]: | |
| """Perform health check on vector store system.""" | |
| try: | |
| # Check collection accessibility | |
| collection_healthy = True | |
| try: | |
| self.collection.count() | |
| except Exception: | |
| collection_healthy = False | |
| # Check storage path | |
| storage_accessible = Path(self.config.VECTOR_DB_PATH).exists() | |
| # Get basic stats | |
| stats = self.get_collection_stats() | |
| return { | |
| 'collection_healthy': collection_healthy, | |
| 'storage_accessible': storage_accessible, | |
| 'total_documents': stats.get('total_documents', 0), | |
| 'total_chunks': stats.get('total_chunks', 0), | |
| 'last_check': time.time(), | |
| 'status': 'healthy' if (collection_healthy and storage_accessible) else 'unhealthy' | |
| } | |
| except Exception as e: | |
| return { | |
| 'status': 'error', | |
| 'error_message': str(e), | |
| 'last_check': time.time() | |
| } | |
# Replace the original VectorStore with our bulletproof version
# Callers importing VectorStore transparently get the bulletproof engine.
VectorStore = BulletproofVectorStore