Spaces:
Sleeping
Sleeping
| from typing import List, Optional, Dict, Any | |
| from langchain_classic.schema import Document | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain_qdrant import QdrantVectorStore | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| from config import Config | |
| import uuid | |
| class VectorStoreManager: | |
| """Manages Qdrant vector store operations for insurance documents""" | |
| def __init__(self): | |
| """Initialize Qdrant client and embeddings""" | |
| # Validate configuration | |
| Config.validate_config() | |
| # Get configuration | |
| self.qdrant_config = Config.get_qdrant_config() | |
| self.retrieval_config = Config.get_retrieval_config() | |
| # Initialize Qdrant client | |
| self.client = QdrantClient( | |
| url=self.qdrant_config["url"], | |
| api_key=self.qdrant_config["api_key"], | |
| ) | |
| # Initialize embeddings | |
| self.embeddings = GoogleGenerativeAIEmbeddings( | |
| model=Config.EMBEDDING_MODEL, | |
| output_dimensionality=Config.EMBEDDING_DIMENSION, | |
| google_api_key=Config.GEMINI_API_KEY | |
| ) | |
| self.collection_name = self.qdrant_config["collection_name"] | |
| print("Vector store manager initialized") | |
| def create_collection(self, recreate: bool = False) -> bool: | |
| """ | |
| Create a new collection in Qdrant | |
| Args: | |
| recreate: If True, delete existing collection and create new one | |
| Returns: | |
| Boolean indicating success | |
| """ | |
| try: | |
| # Check if collection exists | |
| collections = self.client.get_collections().collections | |
| collection_exists = any(c.name == self.collection_name for c in collections) | |
| if collection_exists: | |
| if recreate: | |
| print(f"⚠ Deleting existing collection: {self.collection_name}") | |
| self.client.delete_collection(self.collection_name) | |
| else: | |
| print(f" Collection '{self.collection_name}' already exists") | |
| return True | |
| # Create new collection | |
| self.client.create_collection( | |
| collection_name=self.collection_name, | |
| vectors_config=VectorParams( | |
| size=self.qdrant_config["vector_size"], | |
| distance=Distance.COSINE | |
| ) | |
| ) | |
| print(f" Created collection: {self.collection_name}") | |
| return True | |
| except Exception as e: | |
| print(f" Error creating collection: {str(e)}") | |
| raise | |
| def add_documents(self, documents: List[Document], batch_size: int = 100) -> List[str]: | |
| """ | |
| Add documents to Qdrant vector store | |
| Args: | |
| documents: List of Document objects to add | |
| batch_size: Number of documents to process in each batch | |
| Returns: | |
| List of document IDs | |
| """ | |
| try: | |
| print(f"Adding {len(documents)} documents to vector store...") | |
| # Ensure collection exists | |
| self.create_collection(recreate=False) | |
| # Initialize vector store | |
| vector_store = QdrantVectorStore( | |
| client=self.client, | |
| collection_name=self.collection_name, | |
| embedding=self.embeddings | |
| ) | |
| # Add documents in batches | |
| all_ids = [] | |
| for i in range(0, len(documents), batch_size): | |
| batch = documents[i:i + batch_size] | |
| # Generate unique IDs for this batch | |
| batch_ids = [str(uuid.uuid4()) for _ in batch] | |
| # Add to vector store | |
| vector_store.add_documents(documents=batch, ids=batch_ids) | |
| all_ids.extend(batch_ids) | |
| print(f" Processed batch {i//batch_size + 1}/{(len(documents)-1)//batch_size + 1}") | |
| print(f" Successfully added {len(documents)} documents") | |
| return all_ids | |
| except Exception as e: | |
| print(f" Error adding documents: {str(e)}") | |
| raise | |
| def similarity_search( | |
| self, | |
| query: str, | |
| k: Optional[int] = None, | |
| filter_dict: Optional[Dict[str, Any]] = None | |
| ) -> List[Document]: | |
| """ | |
| Search for similar documents using semantic similarity | |
| Args: | |
| query: Search query string | |
| k: Number of results to return (default from config) | |
| filter_dict: Optional metadata filters (e.g., {"section_type": "exclusions"}) | |
| Returns: | |
| List of most similar Documents | |
| """ | |
| try: | |
| if k is None: | |
| k = self.retrieval_config["top_k"] | |
| # Initialize vector store for querying | |
| vector_store = QdrantVectorStore( | |
| client=self.client, | |
| collection_name=self.collection_name, | |
| embedding=self.embeddings | |
| ) | |
| if filter_dict: | |
| # Get more results than needed | |
| results = vector_store.similarity_search(query=query, k=k*3) | |
| # Filter by metadata | |
| filtered_results = [] | |
| for doc in results: | |
| match = True | |
| for key, value in filter_dict.items(): | |
| if doc.metadata.get(key) != value: | |
| match = False | |
| break | |
| if match: | |
| filtered_results.append(doc) | |
| # Stop when we have enough results | |
| if len(filtered_results) >= k: | |
| break | |
| return filtered_results[:k] | |
| else: | |
| results = vector_store.similarity_search(query=query, k=k) | |
| return results | |
| except Exception as e: | |
| print(f" Error during similarity search: {str(e)}") | |
| raise | |
| def similarity_search_with_score( | |
| self, | |
| query: str, | |
| k: Optional[int] = None, | |
| score_threshold: Optional[float] = None | |
| ) -> List[tuple[Document, float]]: | |
| """ | |
| Search with similarity scores | |
| Args: | |
| query: Search query string | |
| k: Number of results to return | |
| score_threshold: Minimum similarity score (default from config) | |
| Returns: | |
| List of (Document, score) tuples | |
| """ | |
| try: | |
| if k is None: | |
| k = self.retrieval_config["top_k"] | |
| if score_threshold is None: | |
| score_threshold = self.retrieval_config["similarity_threshold"] | |
| # Initialize vector store | |
| vector_store = QdrantVectorStore( | |
| client=self.client, | |
| collection_name=self.collection_name, | |
| embedding=self.embeddings | |
| ) | |
| # Search with scores | |
| results = vector_store.similarity_search_with_score(query=query, k=k) | |
| # Filter by score threshold | |
| filtered_results = [ | |
| (doc, score) for doc, score in results | |
| if score >= score_threshold | |
| ] | |
| print(f" Found {len(filtered_results)} results above threshold {score_threshold}") | |
| return filtered_results | |
| except Exception as e: | |
| print(f" Error during similarity search with score: {str(e)}") | |
| raise | |
| def search_by_section_type( | |
| self, | |
| query: str, | |
| section_type: str, | |
| k: Optional[int] = None | |
| ) -> List[Document]: | |
| """ | |
| Search within a specific section type (e.g., 'exclusions', 'addons') | |
| Args: | |
| query: Search query string | |
| section_type: Type of section to search in | |
| k: Number of results to return | |
| Returns: | |
| List of Documents from specified section type | |
| """ | |
| filter_dict = {"section_type": section_type} | |
| return self.similarity_search(query=query, k=k, filter_dict=filter_dict) | |
| def get_collection_info(self) -> Dict: | |
| """ | |
| Get information about the current collection | |
| Returns: | |
| Dictionary with collection statistics | |
| """ | |
| try: | |
| collection_info = self.client.get_collection(self.collection_name) | |
| return { | |
| "name": self.collection_name, | |
| "vectors_count": collection_info.vectors_count, | |
| "points_count": collection_info.points_count, | |
| "status": collection_info.status, | |
| } | |
| except Exception as e: | |
| print(f" Error getting collection info: {str(e)}") | |
| return {} | |
| def delete_collection(self) -> bool: | |
| """ | |
| Delete the current collection | |
| Returns: | |
| Boolean indicating success | |
| """ | |
| try: | |
| self.client.delete_collection(self.collection_name) | |
| print(f" Deleted collection: {self.collection_name}") | |
| return True | |
| except Exception as e: | |
| print(f" Error deleting collection: {str(e)}") | |
| return False | |
| def get_retriever(self, **kwargs): | |
| """ | |
| Get a LangChain retriever object for use in chains | |
| Args: | |
| **kwargs: Additional arguments for retriever configuration | |
| Returns: | |
| VectorStoreRetriever object | |
| """ | |
| vector_store = QdrantVectorStore( | |
| client=self.client, | |
| collection_name=self.collection_name, | |
| embedding=self.embeddings | |
| ) | |
| # Set default search kwargs | |
| search_kwargs = { | |
| "k": self.retrieval_config["top_k"] | |
| } | |
| search_kwargs.update(kwargs) | |
| return vector_store.as_retriever(search_kwargs=search_kwargs) | |