""" Embedding generation supporting multiple model backends. This module provides efficient batch embedding generation with automatic model loading, caching, and device management. Supports both SentenceTransformers models and NVIDIA NV-Embed-v2. """ import numpy as np import torch from typing import List, Optional from tqdm import tqdm from src.config.settings import get_settings, get_embedding_model_config, EMBEDDING_MODELS from src.utils.logging import get_logger, log_embedding_generation from src.ingestion.models import Chunk import time logger = get_logger(__name__) class Embedder: """Generate embeddings using SentenceTransformers or NV-Embed-v2.""" # Models that require special handling NVEMBED_MODELS = ["nvidia/NV-Embed-v2", "nvidia/NV-Embed-v1"] def __init__(self, model_name: Optional[str] = None): """ Initialize embedder with specified or default model. Args: model_name: Optional model identifier. If None, uses settings default. """ settings = get_settings() self.model_name = model_name or settings.embedding_model self.device = settings.embedding_device # Get model-specific config try: model_config = get_embedding_model_config(self.model_name) self.batch_size = model_config.get("batch_size", settings.embedding_batch_size) self._dimensions = model_config.get("dimensions") self._max_length = model_config.get("max_length", 512) except ValueError: # Fallback for unknown models self.batch_size = settings.embedding_batch_size self._dimensions = None self._max_length = 512 self._model = None self._tokenizer = None self._is_nvembed = self.model_name in self.NVEMBED_MODELS @property def model(self): """ Lazy load the embedding model. The model is only loaded when first accessed, and then cached for reuse. Returns: Model instance (SentenceTransformer or transformers model) """ if self._model is None: logger.info(f"Loading embedding model: {self.model_name}") if self._is_nvembed: self._load_nvembed_model() else: self._load_sentence_transformer() logger.info(f"Model loaded on device: {self.device}") return self._model def _load_sentence_transformer(self): """Load a SentenceTransformer model.""" from sentence_transformers import SentenceTransformer self._model = SentenceTransformer(self.model_name) self._model.to(self.device) def _load_nvembed_model(self): """Load NVIDIA NV-Embed-v2 model.""" from transformers import AutoModel, AutoTokenizer logger.info("Loading NV-Embed-v2 (this may take a moment)...") # Determine torch dtype based on device if self.device == "mps": # MPS works best with float32 for this model torch_dtype = torch.float32 elif self.device == "cuda": torch_dtype = torch.float16 else: torch_dtype = torch.float32 self._tokenizer = AutoTokenizer.from_pretrained( self.model_name, trust_remote_code=True ) self._model = AutoModel.from_pretrained( self.model_name, trust_remote_code=True, torch_dtype=torch_dtype, ) self._model.to(self.device) self._model.eval() def _nvembed_encode( self, texts: List[str], instruction: str = "", max_length: Optional[int] = None, ) -> np.ndarray: """ Encode texts using NV-Embed-v2's native encode method. Args: texts: List of texts to encode instruction: Instruction prefix for queries (empty for documents) max_length: Maximum sequence length (uses model config if None) Returns: np.ndarray: Embeddings array """ if max_length is None: max_length = self._max_length all_embeddings = [] for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding"): batch_texts = texts[i:i + self.batch_size] # Use NV-Embed's native encode method with torch.no_grad(): if instruction: # For queries: use instruction embeddings = self._model.encode( batch_texts, instruction=instruction, max_length=max_length, ) else: # For documents: no instruction needed embeddings = self._model.encode( batch_texts, max_length=max_length, ) # Handle both tensor and numpy outputs if isinstance(embeddings, torch.Tensor): embeddings = embeddings.cpu().numpy() all_embeddings.append(embeddings) return np.vstack(all_embeddings) def encode_batch(self, chunks: List[Chunk]) -> np.ndarray: """ Generate embeddings for a batch of chunks (documents). Processes chunks in smaller batches for memory efficiency and displays progress with tqdm. Args: chunks: List of chunks to embed Returns: np.ndarray: Array of embeddings with shape (num_chunks, embedding_dim) """ if not chunks: logger.warning("No chunks to embed") return np.array([]) start_time = time.time() # Extract text from chunks texts = [chunk.text for chunk in chunks] logger.info(f"Generating embeddings for {len(chunks)} chunks") if self._is_nvembed: # NV-Embed: documents don't need instruction prefix _ = self.model # Ensure model is loaded embeddings = self._nvembed_encode(texts, instruction="") else: # SentenceTransformers path embeddings = [] for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding chunks"): batch_texts = texts[i:i + self.batch_size] batch_embeddings = self.model.encode( batch_texts, batch_size=self.batch_size, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True ) embeddings.append(batch_embeddings) embeddings = np.vstack(embeddings) # Log performance duration = time.time() - start_time log_embedding_generation(logger, len(chunks), duration) return embeddings def encode_single(self, text: str, is_query: bool = False) -> np.ndarray: """ Generate embedding for a single text. Args: text: Text to embed is_query: If True, applies query instruction (for NV-Embed) Returns: np.ndarray: Embedding vector """ if self._is_nvembed: _ = self.model # Ensure model is loaded # NV-Embed uses instruction prefix for queries instruction = ( "Instruct: Given a question, retrieve passages that answer the question\nQuery: " if is_query else "" ) embeddings = self._nvembed_encode([text], instruction=instruction) return embeddings[0] else: embedding = self.model.encode( text, convert_to_numpy=True, normalize_embeddings=True ) return embedding def get_embedding_dimension(self) -> int: """ Get the dimension of embeddings produced by this model. Returns: int: Embedding dimension """ # Use pre-configured dimensions if available if self._dimensions: return self._dimensions # Otherwise, load model and query _ = self.model # Ensure model is loaded if self._is_nvembed: return 4096 else: return self._model.get_sentence_embedding_dimension() def get_model_info(self) -> dict: """ Get information about the current embedding model. Returns: dict: Model information including name, dimensions, etc. """ try: config = get_embedding_model_config(self.model_name) return { "id": self.model_name, "name": config.get("name", self.model_name), "dimensions": self.get_embedding_dimension(), "type": config.get("type", "unknown"), "description": config.get("description", ""), } except ValueError: return { "id": self.model_name, "name": self.model_name.split("/")[-1], "dimensions": self.get_embedding_dimension(), "type": "unknown", "description": "Custom model", }