Spaces:
Sleeping
Sleeping
| """ | |
| Vector store using ChromaDB for local storage. | |
| Supports efficient similarity search and filtering. | |
| """ | |
| import chromadb | |
| from chromadb.config import Settings as ChromaSettings | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| from pathlib import Path | |
| from app.config import settings | |
| from app.rag.embeddings import get_embedding_service | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class VectorStore:
    """
    Vector store using ChromaDB for persistent local storage.

    Wraps a single ChromaDB collection and exposes add, similarity search,
    delete (by filter or by ID), statistics, and a full reset.
    """

    # Metadata applied whenever the collection is (re)created, kept in one
    # place so __init__ and clear_collection cannot drift apart.
    _COLLECTION_METADATA = {"hnsw:space": "cosine"}  # Use cosine similarity

    def __init__(
        self,
        persist_directory: Path = settings.VECTORDB_DIR,
        collection_name: str = settings.COLLECTION_NAME
    ):
        """
        Initialize the vector store.

        Args:
            persist_directory: Directory to persist the database
            collection_name: Name of the collection to use
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name

        # Initialize ChromaDB client with persistence
        self.client = chromadb.PersistentClient(
            path=str(persist_directory),
            settings=ChromaSettings(
                anonymized_telemetry=False,
                allow_reset=True
            )
        )

        # Get or create collection (a copy of the metadata is passed so the
        # class-level constant can never be mutated through the client).
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata=dict(self._COLLECTION_METADATA)
        )
        logger.info(
            "Vector store initialized. Collection: %s, Items: %s",
            collection_name,
            self.collection.count(),
        )

    @staticmethod
    def _build_where(
        filter_dict: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """
        Translate a flat ``{field: value}`` filter into a ChromaDB ``where``.

        Args:
            filter_dict: Flat equality conditions, or None/empty for no filter

        Returns:
            None when there are no conditions, the single condition as-is
            when there is exactly one, otherwise the conditions combined
            under the ``$and`` operator.
        """
        if not filter_dict:
            return None
        if len(filter_dict) == 1:
            # Single condition - use directly
            return dict(filter_dict)
        # Multiple conditions - ChromaDB requires an explicit $and operator
        return {"$and": [{key: value} for key, value in filter_dict.items()]}

    def add_documents(
        self,
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: List[Dict[str, Any]],
        ids: List[str]
    ) -> None:
        """
        Add documents to the vector store.

        Args:
            documents: List of document texts
            embeddings: List of embedding vectors
            metadatas: List of metadata dictionaries
            ids: List of unique document IDs

        Raises:
            ValueError: If the four lists do not all have the same length
        """
        if not documents:
            logger.warning("No documents to add")
            return

        # The four lists are parallel; fail early with a readable message
        # instead of relying on the backend to reject the mismatch.
        expected = len(documents)
        if not (len(embeddings) == len(metadatas) == len(ids) == expected):
            raise ValueError(
                "documents, embeddings, metadatas and ids must have the same "
                f"length (got {expected}, {len(embeddings)}, "
                f"{len(metadatas)}, {len(ids)})"
            )

        # ChromaDB doesn't accept None values in metadata
        # NOTE(review): a metadata dict whose values are all None becomes {}
        # here; some ChromaDB versions reject empty dicts - confirm.
        clean_metadatas = [
            {key: value for key, value in meta.items() if value is not None}
            for meta in metadatas
        ]

        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=clean_metadatas,
            ids=ids
        )
        logger.info("Added %s documents to vector store", len(documents))

    def search(
        self,
        query_embedding: List[float],
        top_k: int = settings.TOP_K,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query_embedding: Query embedding vector
            top_k: Number of results to return
            filter_dict: Optional filter criteria (e.g., {"kb_id": "123"})

        Returns:
            List of dicts with keys 'id', 'content', 'metadata' and
            'similarity_score' (1 - distance, clamped to the range 0-1),
            in the order returned by ChromaDB.
        """
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=self._build_where(filter_dict),
            include=["documents", "metadatas", "distances"]
        )

        if not results or not results['ids'] or not results['ids'][0]:
            return []

        # Only one query embedding is sent, so only row 0 of each field is
        # relevant. Any field may be missing/empty, in which case defaults
        # are used for every hit.
        documents = results['documents'][0] if results['documents'] else None
        metadatas = results['metadatas'][0] if results['metadatas'] else None
        distances = results['distances'][0] if results['distances'] else None

        formatted_results = []
        for i, doc_id in enumerate(results['ids'][0]):
            # ChromaDB returns distances, convert to similarity
            # For cosine distance: similarity = 1 - distance
            distance = distances[i] if distances else 0
            similarity = 1 - distance
            formatted_results.append({
                'id': doc_id,
                'content': documents[i] if documents else "",
                # Records stored without metadata come back as None;
                # normalise to {} so callers can always index into it.
                'metadata': (metadatas[i] if metadatas else None) or {},
                # Clamp to 0-1; float bounds keep the type consistent
                'similarity_score': max(0.0, min(1.0, similarity))
            })
        return formatted_results

    def delete_by_filter(self, filter_dict: Dict[str, Any]) -> int:
        """
        Delete documents matching a filter.

        Args:
            filter_dict: Filter criteria (must contain at least one condition)

        Returns:
            Number of documents deleted

        Raises:
            ValueError: If filter_dict is empty; an empty filter cannot be
                expressed as a valid ChromaDB where clause, and silently
                matching everything would be dangerous for a delete.
        """
        if not filter_dict:
            raise ValueError("delete_by_filter requires a non-empty filter_dict")

        # First, find matching documents. Only the IDs are needed, so no
        # additional fields are requested.
        results = self.collection.get(
            where=self._build_where(filter_dict),
            include=[]
        )

        matched_ids = results['ids'] if results else None
        if not matched_ids:
            return 0

        self.collection.delete(ids=matched_ids)
        logger.info("Deleted %s documents matching filter", len(matched_ids))
        return len(matched_ids)

    def delete_by_ids(self, ids: List[str]) -> None:
        """Delete documents by their IDs; does nothing when ``ids`` is empty."""
        if ids:
            self.collection.delete(ids=ids)
            logger.info("Deleted %s documents by ID", len(ids))

    def get_stats(
        self,
        tenant_id: Optional[str] = None,  # CRITICAL: Multi-tenant isolation
        kb_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get statistics about the vector store.

        Args:
            tenant_id: Tenant ID for multi-tenant isolation. Callers that
                filter are expected to supply it; this method does not
                enforce that.
            kb_id: Optional knowledge base ID to filter
            user_id: Optional user ID to filter

        Returns:
            With at least one filter: a dict with 'total_chunks',
            'file_names' (unique, in first-seen order), 'tenant_id',
            'kb_id' and 'user_id'.
            With no filter: a dict with 'total_chunks' for the whole
            collection and 'collection_name'.
        """
        # Falsy values (None, "") are treated as "not filtering on this field".
        candidates = {
            "tenant_id": tenant_id,  # CRITICAL: Multi-tenant isolation
            "kb_id": kb_id,
            "user_id": user_id,
        }
        filter_dict = {key: value for key, value in candidates.items() if value}

        if not filter_dict:
            return {
                "total_chunks": self.collection.count(),
                "collection_name": self.collection_name
            }

        results = self.collection.get(
            where=self._build_where(filter_dict),
            include=["metadatas"]
        )
        count = len(results['ids']) if results and results['ids'] else 0

        # Get unique file names. A dict is used as an ordered set so the
        # output order is deterministic; entries without metadata (None)
        # are skipped instead of raising.
        file_names: Dict[Any, None] = {}
        if results and results['metadatas']:
            for meta in results['metadatas']:
                if meta and 'file_name' in meta:
                    file_names[meta['file_name']] = None

        return {
            "total_chunks": count,
            "file_names": list(file_names),
            "tenant_id": tenant_id,
            "kb_id": kb_id,
            "user_id": user_id
        }

    def clear_collection(self) -> None:
        """Clear all documents by dropping and recreating the collection."""
        self.client.delete_collection(self.collection_name)
        # get_or_create tolerates the collection having been recreated by
        # someone else between the delete and this call.
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            metadata=dict(self._COLLECTION_METADATA)
        )
        logger.info("Cleared collection: %s", self.collection_name)
# Process-wide VectorStore singleton; stays None until first requested.
_vector_store: Optional[VectorStore] = None


def get_vector_store() -> VectorStore:
    """
    Return the shared VectorStore instance, creating it on first use.

    The instance is built with VectorStore's default arguments and cached
    in the module-level ``_vector_store`` variable, so later calls return
    the same object.
    """
    global _vector_store
    if _vector_store is not None:
        return _vector_store
    _vector_store = VectorStore()
    return _vector_store