"""Vector store management and operations.""" import os # Disable MPS before importing torch to prevent meta tensor issues on Mac os.environ.setdefault("PYTORCH_ENABLE_MPS_FALLBACK", "1") os.environ.setdefault("PYTORCH_MPS_HIGH_WATERMARK_RATIO", "0.0") from pathlib import Path from typing import Dict, Any, List, Optional import torch # Disable MPS backend explicitly to prevent meta tensor issues if hasattr(torch.backends, 'mps'): # Monkey patch to disable MPS original_mps_available = torch.backends.mps.is_available torch.backends.mps.is_available = lambda: False from langchain_qdrant import QdrantVectorStore from langchain_core.embeddings import Embeddings from sentence_transformers import SentenceTransformer from langchain_huggingface import HuggingFaceEmbeddings try: from langchain.docstore.document import Document except: from langchain_core.documents import Document class MatryoshkaEmbeddings(Embeddings): """Custom embeddings class that supports Matryoshka dimension truncation.""" def __init__(self, model_name: str, truncate_dim: int = None, **kwargs): """ Initialize Matryoshka embeddings. Args: model_name: Name of the model truncate_dim: Dimension to truncate to (for Matryoshka models) **kwargs: Additional arguments (ignored for Matryoshka models) """ self.model_name = model_name self.truncate_dim = truncate_dim if truncate_dim and "matryoshka" in model_name.lower(): # Use SentenceTransformer directly for Matryoshka models # Fix for meta tensor issue: Explicitly force CPU # MPS is already disabled at module level # Explicitly pass device="cpu" to prevent MPS/CUDA detection self.model = SentenceTransformer( model_name, truncate_dim=truncate_dim, device="cpu" # Force CPU to prevent meta tensor issues ) print(f"🔧 Matryoshka model configured for {truncate_dim} dimensions") else: # Use standard HuggingFaceEmbeddings # Don't pass device parameter - let it load naturally on CPU # This prevents the meta tensor error if "model_kwargs" not in kwargs: kwargs["model_kwargs"] = {} # Remove device from model_kwargs if present to prevent meta tensor issues kwargs["model_kwargs"].pop("device", None) self.model = HuggingFaceEmbeddings(model_name=model_name, **kwargs) def embed_documents(self, texts: List[str]) -> List[List[float]]: """Embed documents.""" if self.truncate_dim and "matryoshka" in self.model_name.lower(): embeddings = self.model.encode(texts, normalize_embeddings=True) return embeddings.tolist() else: return self.model.embed_documents(texts) def embed_query(self, text: str) -> List[float]: """Embed query.""" if self.truncate_dim and "matryoshka" in self.model_name.lower(): embedding = self.model.encode([text], normalize_embeddings=True) return embedding[0].tolist() else: return self.model.embed_query(text) class VectorStoreManager: """Manages vector store operations and connections.""" def __init__(self, config: Dict[str, Any]): """ Initialize vector store manager. Args: config: Configuration dictionary """ self.config = config self.embeddings = self._create_embeddings() self.vectorstore = None # Define metadata fields that need payload indexes for filtering self.metadata_fields = [ ("metadata.year", "keyword"), ("metadata.source", "keyword"), ("metadata.filename", "keyword"), # Add more metadata fields as needed ] def _create_embeddings(self) -> HuggingFaceEmbeddings: """Create embeddings model from configuration.""" model_name = self.config["retriever"]["model"] normalize = self.config["retriever"]["normalize"] # Fix for meta tensor issue: Force CPU usage to prevent MPS/CUDA detection # The error occurs when SentenceTransformer detects MPS/CUDA and tries to move meta tensors # MPS is already disabled at module level, now we explicitly force CPU in model_kwargs model_kwargs = { "device": "cpu", # Explicitly force CPU to prevent MPS/CUDA detection "trust_remote_code": True, # Some models need this } encode_kwargs = { "normalize_embeddings": normalize, "batch_size": 100, } # For Matryoshka models, check if we need to truncate dimensions if "matryoshka" in model_name.lower(): # Check if we have a specific dimension requirement collection_name = self.config.get("qdrant", {}).get("collection_name", "") if "modernbert-embed-base-akryl-matryoshka" in collection_name: # This collection expects 768 dimensions truncate_dim = 768 print(f"🔧 Matryoshka model configured for {truncate_dim} dimensions") # Use custom MatryoshkaEmbeddings embeddings = MatryoshkaEmbeddings( model_name=model_name, truncate_dim=truncate_dim, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs, show_progress=True, ) return embeddings # Use standard HuggingFaceEmbeddings for non-Matryoshka models # Don't pass device in model_kwargs - let HuggingFaceEmbeddings handle it # but ensure we're not using meta device embeddings = HuggingFaceEmbeddings( model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs, show_progress=True, ) return embeddings def ensure_metadata_indexes(self) -> None: """ Create payload indexes for all required metadata fields. This ensures filtering works properly, especially in Qdrant Cloud. """ if not self.vectorstore: return qdrant_config = self.config["qdrant"] collection_name = qdrant_config["collection_name"] for field_name, field_type in self.metadata_fields: try: self.vectorstore.client.create_payload_index( collection_name=collection_name, field_name=field_name, field_type=field_type ) print(f"Created payload index for {field_name} ({field_type})") except Exception as e: # Index might already exist or other error - log but continue print(f"Index creation for {field_name} ({field_type}): {str(e)}") def connect_to_existing(self, force_recreate: bool = False) -> QdrantVectorStore: """ Connect to existing Qdrant collection. Args: force_recreate: If True, recreate the collection if dimension mismatch occurs Returns: QdrantVectorStore instance """ qdrant_config = self.config["qdrant"] kwargs_qdrant = { "url": qdrant_config["url"], "collection_name": qdrant_config["collection_name"], "prefer_grpc": qdrant_config.get("prefer_grpc", True), "api_key": qdrant_config.get("api_key", None), } if force_recreate: kwargs_qdrant["force_recreate"] = True self.vectorstore = QdrantVectorStore.from_existing_collection( embedding=self.embeddings, **kwargs_qdrant ) # Ensure payload indexes exist for metadata filtering self.ensure_metadata_indexes() return self.vectorstore def create_from_documents(self, documents: List[Document]) -> QdrantVectorStore: """ Create new Qdrant collection from documents. Args: documents: List of Document objects Returns: QdrantVectorStore instance """ qdrant_config = self.config["qdrant"] kwargs_qdrant = { "url": qdrant_config["url"], "collection_name": qdrant_config["collection_name"], "prefer_grpc": qdrant_config.get("prefer_grpc", True), "api_key": qdrant_config.get("api_key", None), } self.vectorstore = QdrantVectorStore.from_documents( documents=documents, embedding=self.embeddings, **kwargs_qdrant ) # Ensure payload indexes exist for metadata filtering self.ensure_metadata_indexes() return self.vectorstore def delete_collection(self) -> None: """ Delete the current Qdrant collection. Returns: QdrantVectorStore instance """ qdrant_config = self.config["qdrant"] collection_name = qdrant_config.get("collection_name") self.vectorstore.client.delete_collection( collection_name=collection_name ) return self.vectorstore def get_vectorstore(self) -> Optional[QdrantVectorStore]: """Get current vectorstore instance.""" return self.vectorstore def get_local_qdrant(config: Dict[str, Any]) -> QdrantVectorStore: """ Get local Qdrant vector store (legacy function for compatibility). Args: config: Configuration dictionary Returns: QdrantVectorStore instance """ manager = VectorStoreManager(config) return manager.connect_to_existing() def create_vectorstore(config: Dict[str, Any], documents: List[Document]) -> QdrantVectorStore: """ Create new vector store from documents. Args: config: Configuration dictionary documents: List of Document objects Returns: QdrantVectorStore instance """ manager = VectorStoreManager(config) return manager.create_from_documents(documents) def get_embeddings_model(config: Dict[str, Any]) -> HuggingFaceEmbeddings: """ Create embeddings model from configuration (legacy function). Args: config: Configuration dictionary Returns: HuggingFaceEmbeddings instance """ manager = VectorStoreManager(config) return manager.embeddings