| | """ |
| | Embedding generation supporting multiple model backends. |
| | |
| | This module provides efficient batch embedding generation with automatic |
| | model loading, caching, and device management. Supports both SentenceTransformers |
| | models and NVIDIA NV-Embed-v2. |
| | """ |
| |
|
| | import numpy as np |
| | import torch |
| | from typing import List, Optional |
| | from tqdm import tqdm |
| | from src.config.settings import get_settings, get_embedding_model_config, EMBEDDING_MODELS |
| | from src.utils.logging import get_logger, log_embedding_generation |
| | from src.ingestion.models import Chunk |
| | import time |
| |
|
| | logger = get_logger(__name__) |
| |
|
| |
|
class Embedder:
    """Generate embeddings using SentenceTransformers or NV-Embed-v2."""

    NVEMBED_MODELS = ["nvidia/NV-Embed-v2", "nvidia/NV-Embed-v1"]

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize the embedder with the specified or default model.

        Args:
            model_name: Optional model identifier. If None, uses the settings default.
        """
        settings = get_settings()
        self.model_name = model_name or settings.embedding_model
        self.device = settings.embedding_device

        # Pull per-model configuration from the registry when available.
        try:
            model_config = get_embedding_model_config(self.model_name)
            self.batch_size = model_config.get("batch_size", settings.embedding_batch_size)
            self._dimensions = model_config.get("dimensions")
            self._max_length = model_config.get("max_length", 512)
        except ValueError:
            # Unknown model: fall back to global defaults.
            self.batch_size = settings.embedding_batch_size
            self._dimensions = None
            self._max_length = 512

        # Model and tokenizer are loaded lazily on first use.
        self._model = None
        self._tokenizer = None
        self._is_nvembed = self.model_name in self.NVEMBED_MODELS
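
    # For reference, a registry entry returned by get_embedding_model_config is
    # assumed to look roughly like the sketch below; the keys match those read
    # in __init__ and get_model_info, but the actual schema lives in
    # src.config.settings (illustrative values only):
    #
    #     {
    #         "name": "NV-Embed-v2",
    #         "type": "nvembed",
    #         "dimensions": 4096,
    #         "max_length": 512,
    #         "batch_size": 8,
    #         "description": "...",
    #     }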
|
    @property
    def model(self):
        """
        Lazily load the embedding model.

        The model is loaded on first access, then cached for reuse.

        Returns:
            Model instance (SentenceTransformer or transformers model)
        """
        if self._model is None:
            logger.info(f"Loading embedding model: {self.model_name}")

            if self._is_nvembed:
                self._load_nvembed_model()
            else:
                self._load_sentence_transformer()

            logger.info(f"Model loaded on device: {self.device}")
        return self._model

    def _load_sentence_transformer(self):
        """Load a SentenceTransformer model."""
        from sentence_transformers import SentenceTransformer

        self._model = SentenceTransformer(self.model_name)
        self._model.to(self.device)
|
    def _load_nvembed_model(self):
        """Load an NVIDIA NV-Embed model via transformers."""
        from transformers import AutoModel, AutoTokenizer

        logger.info("Loading NV-Embed-v2 (this may take a moment)...")

        # Choose a dtype suited to the device.
        if self.device == "mps":
            # MPS has incomplete float16 support; use float32 for stability.
            torch_dtype = torch.float32
        elif self.device == "cuda":
            torch_dtype = torch.float16
        else:
            torch_dtype = torch.float32

        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
        )

        self._model = AutoModel.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            torch_dtype=torch_dtype,
        )
        self._model.to(self.device)
        self._model.eval()
|
    def _nvembed_encode(
        self,
        texts: List[str],
        instruction: str = "",
        max_length: Optional[int] = None,
    ) -> np.ndarray:
        """
        Encode texts using NV-Embed's native encode method.

        Args:
            texts: List of texts to encode
            instruction: Instruction prefix for queries (empty for documents)
            max_length: Maximum sequence length (uses model config if None)

        Returns:
            np.ndarray: Embeddings array
        """
        if max_length is None:
            max_length = self._max_length

        all_embeddings = []

        for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding"):
            batch_texts = texts[i:i + self.batch_size]

            # Run inference without gradient tracking.
            with torch.no_grad():
                if instruction:
                    # Query path: prepend the retrieval instruction.
                    embeddings = self._model.encode(
                        batch_texts,
                        instruction=instruction,
                        max_length=max_length,
                    )
                else:
                    # Document path: no instruction prefix.
                    embeddings = self._model.encode(
                        batch_texts,
                        max_length=max_length,
                    )

            # encode() may return a torch tensor; convert to a CPU numpy array.
            if isinstance(embeddings, torch.Tensor):
                embeddings = embeddings.cpu().numpy()

            all_embeddings.append(embeddings)

        return np.vstack(all_embeddings)
|
    def encode_batch(self, chunks: List[Chunk]) -> np.ndarray:
        """
        Generate embeddings for a batch of chunks (documents).

        Processes chunks in smaller batches for memory efficiency and
        displays progress with tqdm.

        Args:
            chunks: List of chunks to embed

        Returns:
            np.ndarray: Array of embeddings with shape (num_chunks, embedding_dim)
        """
        if not chunks:
            logger.warning("No chunks to embed")
            return np.array([])

        start_time = time.time()

        # Extract the raw text from each chunk.
        texts = [chunk.text for chunk in chunks]

        logger.info(f"Generating embeddings for {len(chunks)} chunks")

        if self._is_nvembed:
            # NV-Embed path: trigger lazy loading, then use the native encoder.
            _ = self.model
            embeddings = self._nvembed_encode(texts, instruction="")
        else:
            # SentenceTransformers path: encode in batches with normalization.
            embeddings = []
            for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding chunks"):
                batch_texts = texts[i:i + self.batch_size]

                batch_embeddings = self.model.encode(
                    batch_texts,
                    batch_size=self.batch_size,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                    normalize_embeddings=True,
                )

                embeddings.append(batch_embeddings)

            embeddings = np.vstack(embeddings)

        # Log throughput for observability.
        duration = time.time() - start_time
        log_embedding_generation(logger, len(chunks), duration)

        return embeddings
|
    def encode_single(self, text: str, is_query: bool = False) -> np.ndarray:
        """
        Generate an embedding for a single text.

        Args:
            text: Text to embed
            is_query: If True, applies the query instruction (for NV-Embed)

        Returns:
            np.ndarray: Embedding vector
        """
        if self._is_nvembed:
            _ = self.model
            # NV-Embed is instruction-tuned: queries get a retrieval
            # instruction, documents are embedded as-is.
            instruction = (
                "Instruct: Given a question, retrieve passages that answer the question\nQuery: "
                if is_query else ""
            )
            embeddings = self._nvembed_encode([text], instruction=instruction)
            return embeddings[0]
        else:
            embedding = self.model.encode(
                text,
                convert_to_numpy=True,
                normalize_embeddings=True,
            )
            return embedding
|
    def get_embedding_dimension(self) -> int:
        """
        Get the dimension of embeddings produced by this model.

        Returns:
            int: Embedding dimension
        """
        # Prefer the configured dimension so we avoid loading the model.
        if self._dimensions:
            return self._dimensions

        # Otherwise load the model and ask it directly.
        _ = self.model

        if self._is_nvembed:
            # NV-Embed-v2 produces 4096-dimensional embeddings.
            return 4096
        else:
            return self._model.get_sentence_embedding_dimension()
|
    def get_model_info(self) -> dict:
        """
        Get information about the current embedding model.

        Returns:
            dict: Model information including name, dimensions, etc.
        """
        try:
            config = get_embedding_model_config(self.model_name)
            return {
                "id": self.model_name,
                "name": config.get("name", self.model_name),
                "dimensions": self.get_embedding_dimension(),
                "type": config.get("type", "unknown"),
                "description": config.get("description", ""),
            }
        except ValueError:
            # Model not in the registry: synthesize minimal info from the identifier.
            return {
                "id": self.model_name,
                "name": self.model_name.split("/")[-1],
                "dimensions": self.get_embedding_dimension(),
                "type": "unknown",
                "description": "Custom model",
            }
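
if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative only): exercises lazy loading
    # and single-text encoding. Assumes the configured default model is
    # available locally or downloadable; not part of the module's public API.
    embedder = Embedder()
    print(embedder.get_model_info())
    query_vec = embedder.encode_single(
        "What is retrieval-augmented generation?", is_query=True
    )
    print(f"Query embedding shape: {query_vec.shape}")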
|