# zeta/src/embedding/embedder.py
# Last commit 9b457ed (rodrigo-moonray): "Deploy zeta-only embeddings (NV-Embed-v2 + E5-small)"
"""
Embedding generation supporting multiple model backends.
This module provides efficient batch embedding generation with automatic
model loading, caching, and device management. Supports both SentenceTransformers
models and NVIDIA NV-Embed-v2.
"""
import numpy as np
import torch
from typing import List, Optional
from tqdm import tqdm
from src.config.settings import get_settings, get_embedding_model_config, EMBEDDING_MODELS
from src.utils.logging import get_logger, log_embedding_generation
from src.ingestion.models import Chunk
import time
logger = get_logger(__name__)
class Embedder:
    """Generate embeddings using SentenceTransformers or NV-Embed models.

    The underlying model is loaded lazily on first use and cached on the
    instance. NV-Embed models (see ``NVEMBED_MODELS``) are driven through
    their native ``encode`` API via transformers with ``trust_remote_code``;
    every other model goes through SentenceTransformers. For NV-Embed,
    queries get an instruction prefix while documents are encoded bare.
    """

    # Models that need the transformers/AutoModel code path instead of
    # SentenceTransformers.
    NVEMBED_MODELS = ["nvidia/NV-Embed-v2", "nvidia/NV-Embed-v1"]

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize embedder with specified or default model.

        Args:
            model_name: Optional model identifier. If None, uses settings default.
        """
        settings = get_settings()
        self.model_name = model_name or settings.embedding_model
        self.device = settings.embedding_device
        # Pull per-model batch size / dimensions / max length from the model
        # registry; unknown models fall back to the global settings defaults.
        try:
            model_config = get_embedding_model_config(self.model_name)
            self.batch_size = model_config.get("batch_size", settings.embedding_batch_size)
            self._dimensions = model_config.get("dimensions")
            self._max_length = model_config.get("max_length", 512)
        except ValueError:
            # Fallback for models not present in the registry.
            self.batch_size = settings.embedding_batch_size
            self._dimensions = None
            self._max_length = 512
        self._model = None      # populated lazily by the `model` property
        self._tokenizer = None  # only set for NV-Embed models
        self._is_nvembed = self.model_name in self.NVEMBED_MODELS

    @property
    def model(self):
        """
        Lazy load the embedding model.

        The model is only loaded when first accessed, and then cached for reuse.

        Returns:
            Model instance (SentenceTransformer or transformers model)
        """
        if self._model is None:
            logger.info(f"Loading embedding model: {self.model_name}")
            if self._is_nvembed:
                self._load_nvembed_model()
            else:
                self._load_sentence_transformer()
            logger.info(f"Model loaded on device: {self.device}")
        return self._model

    def _load_sentence_transformer(self):
        """Load a SentenceTransformer model onto the configured device."""
        from sentence_transformers import SentenceTransformer
        self._model = SentenceTransformer(self.model_name)
        self._model.to(self.device)

    def _load_nvembed_model(self):
        """Load an NVIDIA NV-Embed model and its tokenizer via transformers."""
        from transformers import AutoModel, AutoTokenizer
        logger.info("Loading NV-Embed-v2 (this may take a moment)...")
        # fp16 only on CUDA; CPU and MPS are more reliable in fp32 for
        # this model.
        torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True
        )
        self._model = AutoModel.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            torch_dtype=torch_dtype,
        )
        self._model.to(self.device)
        self._model.eval()

    def _nvembed_encode(
        self,
        texts: List[str],
        instruction: str = "",
        max_length: Optional[int] = None,
    ) -> np.ndarray:
        """
        Encode texts using NV-Embed's native encode method.

        Triggers lazy model loading itself, so callers do not need to
        pre-touch the `model` property.

        Args:
            texts: List of texts to encode
            instruction: Instruction prefix for queries (empty for documents)
            max_length: Maximum sequence length (uses model config if None)

        Returns:
            np.ndarray: Embeddings array with one row per input text;
                empty array when `texts` is empty.
        """
        if not texts:
            # np.vstack raises on an empty list of batches; mirror
            # encode_batch's empty-input behavior instead.
            return np.array([])
        if max_length is None:
            max_length = self._max_length
        model = self.model  # ensure lazy load; hoisted out of the loop
        all_embeddings = []
        for start in tqdm(range(0, len(texts), self.batch_size), desc="Embedding"):
            batch_texts = texts[start:start + self.batch_size]
            with torch.no_grad():
                # NV-Embed's encode takes an `instruction` kwarg for queries
                # only; documents are encoded without one.
                if instruction:
                    embeddings = model.encode(
                        batch_texts,
                        instruction=instruction,
                        max_length=max_length,
                    )
                else:
                    embeddings = model.encode(
                        batch_texts,
                        max_length=max_length,
                    )
            # The model may return a tensor or a numpy array depending on
            # the remote-code revision; normalize to numpy on CPU.
            if isinstance(embeddings, torch.Tensor):
                embeddings = embeddings.cpu().numpy()
            all_embeddings.append(embeddings)
        return np.vstack(all_embeddings)

    def encode_batch(self, chunks: List["Chunk"]) -> np.ndarray:
        """
        Generate embeddings for a batch of chunks (documents).

        Processes chunks in smaller batches for memory efficiency and
        displays progress with tqdm.

        Args:
            chunks: List of chunks to embed

        Returns:
            np.ndarray: Array of embeddings with shape (num_chunks, embedding_dim);
                empty array when `chunks` is empty.
        """
        if not chunks:
            logger.warning("No chunks to embed")
            return np.array([])
        start_time = time.time()
        texts = [chunk.text for chunk in chunks]
        logger.info(f"Generating embeddings for {len(chunks)} chunks")
        if self._is_nvembed:
            # NV-Embed: documents don't need an instruction prefix.
            embeddings = self._nvembed_encode(texts, instruction="")
        else:
            # SentenceTransformers path; hoist the lazy-loading property
            # access out of the loop.
            model = self.model
            batches = []
            for start in tqdm(range(0, len(texts), self.batch_size), desc="Embedding chunks"):
                batch_texts = texts[start:start + self.batch_size]
                batches.append(model.encode(
                    batch_texts,
                    batch_size=self.batch_size,
                    show_progress_bar=False,  # outer tqdm already shows progress
                    convert_to_numpy=True,
                    normalize_embeddings=True
                ))
            embeddings = np.vstack(batches)
        # Log throughput for observability.
        duration = time.time() - start_time
        log_embedding_generation(logger, len(chunks), duration)
        return embeddings

    def encode_single(self, text: str, is_query: bool = False) -> np.ndarray:
        """
        Generate embedding for a single text.

        Args:
            text: Text to embed
            is_query: If True, applies the retrieval query instruction
                (NV-Embed models only; has no effect otherwise)

        Returns:
            np.ndarray: Embedding vector
        """
        if self._is_nvembed:
            # NV-Embed expects an instruction prefix on queries only.
            instruction = (
                "Instruct: Given a question, retrieve passages that answer the question\nQuery: "
                if is_query else ""
            )
            return self._nvembed_encode([text], instruction=instruction)[0]
        return self.model.encode(
            text,
            convert_to_numpy=True,
            normalize_embeddings=True
        )

    def get_embedding_dimension(self) -> int:
        """
        Get the dimension of embeddings produced by this model.

        Returns:
            int: Embedding dimension
        """
        # Prefer the registry-configured dimension: avoids loading the model.
        if self._dimensions is not None:
            return self._dimensions
        # Otherwise, load the model and query it.
        _ = self.model  # ensure model is loaded
        if self._is_nvembed:
            # Known output width for NV-Embed v1/v2.
            return 4096
        return self._model.get_sentence_embedding_dimension()

    def get_model_info(self) -> dict:
        """
        Get information about the current embedding model.

        Returns:
            dict: Model information including id, name, dimensions, type,
                and description.
        """
        # Keep the try narrow: only the registry lookup raises ValueError.
        try:
            config = get_embedding_model_config(self.model_name)
        except ValueError:
            # Model not in the registry: synthesize a minimal record.
            return {
                "id": self.model_name,
                "name": self.model_name.split("/")[-1],
                "dimensions": self.get_embedding_dimension(),
                "type": "unknown",
                "description": "Custom model",
            }
        return {
            "id": self.model_name,
            "name": config.get("name", self.model_name),
            "dimensions": self.get_embedding_dimension(),
            "type": config.get("type", "unknown"),
            "description": config.get("description", ""),
        }