pdf / embedding-model.py
navid72m's picture
Upload 9 files
43efcb9 verified
"""
Unified embedding model implementation supporting multiple backends.
"""
from typing import List, Union, Optional, Dict, Any
import logging
import numpy as np
import torch
from abc import ABC, abstractmethod
# Configure logging
logger = logging.getLogger(__name__)
class EmbeddingModel(ABC):
"""Abstract base class for embedding models."""
@abstractmethod
def embed(self, texts: Union[str, List[str]], batch_size: int = 32) -> np.ndarray:
"""
Convert text(s) to embedding vector(s).
Args:
texts: Input text(s) to embed
batch_size: Batch size for processing
Returns:
Embedding vector(s) as numpy array
"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Get the dimension of the embedding vectors."""
pass
class SentenceTransformerEmbedding(EmbeddingModel):
"""Embedding model using sentence-transformers library."""
def __init__(
self,
model_name: str = "all-MiniLM-L6-v2",
device: Optional[str] = None,
normalize: bool = True,
**kwargs
):
"""
Initialize the sentence transformer embedding model.
Args:
model_name: Sentence transformer model name or path
device: Device to run model on ('cpu', 'cuda', 'cuda:0', etc.)
normalize: Whether to L2-normalize embeddings
**kwargs: Additional arguments for the model
"""
try:
from sentence_transformers import SentenceTransformer
except ImportError:
raise ImportError(
"sentence-transformers is not installed. "
"Please install it with `pip install sentence-transformers`."
)
self.model_name = model_name
self.normalize = normalize
# Determine device
if device is None:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
else:
self.device = device
logger.info(f"Loading SentenceTransformer model: {model_name} on {self.device}")
try:
self.model = SentenceTransformer(model_name, device=self.device)
self._dimension = self.model.get_sentence_embedding_dimension()
logger.info(f"Model loaded successfully. Embedding dimension: {self._dimension}")
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
def embed(self, texts: Union[str, List[str]], batch_size: int = 32) -> np.ndarray:
"""
Convert text(s) to embedding vector(s).
Args:
texts: Input text(s) to embed
batch_size: Batch size for processing
Returns:
Embedding vector(s) as numpy array
"""
# Handle single text input
if isinstance(texts, str):
texts = [texts]
# Validate input
if not texts:
logger.warning("Empty texts provided for embedding")
return np.array([])
try:
# Generate embeddings
embeddings = self.model.encode(
texts,
batch_size=batch_size,
show_progress_bar=False,
convert_to_numpy=True
)
# Normalize if requested
if self.normalize:
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
return embeddings
except Exception as e:
logger.error(f"Error during embedding generation: {e}")
raise
@property
def dimension(self) -> int:
"""Get the dimension of the embedding vectors."""
return self._dimension
class HuggingFaceEmbedding(EmbeddingModel):
"""Embedding model using HuggingFace transformers directly."""
def __init__(
self,
model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
device: Optional[str] = None,
normalize: bool = True,
max_length: int = 512,
**kwargs
):
"""
Initialize the HuggingFace embedding model.
Args:
model_name: HuggingFace model name or path
device: Device to run model on ('cpu', 'cuda', 'cuda:0', etc.)
normalize: Whether to L2-normalize embeddings
max_length: Maximum token length for inputs
**kwargs: Additional arguments for the model
"""
try:
from transformers import AutoTokenizer, AutoModel
except ImportError:
raise ImportError(
"transformers is not installed. "
"Please install it with `pip install transformers`."
)
self.model_name = model_name
self.normalize = normalize
self.max_length = max_length
# Determine device
if device is None:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
else:
self.device = device
logger.info(f"Loading HuggingFace model: {model_name} on {self.device}")
try:
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
self.model.to(self.device)
self.model.eval()
# Get embedding dimension from model config
self._dimension = self.model.config.hidden_size
logger.info(f"Model loaded successfully. Embedding dimension: {self._dimension}")
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
def _mean_pooling(self, model_output, attention_mask):
"""Perform mean pooling on token embeddings."""
token_embeddings = model_output.last_hidden_state
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
def embed(self, texts: Union[str, List[str]], batch_size: int = 32) -> np.ndarray:
"""
Convert text(s) to embedding vector(s).
Args:
texts: Input text(s) to embed
batch_size: Batch size for processing
Returns:
Embedding vector(s) as numpy array
"""
# Handle single text input
if isinstance(texts, str):
texts = [texts]
# Validate input
if not texts:
logger.warning("Empty texts provided for embedding")
return np.array([])
try:
all_embeddings = []
# Process in batches
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
# Tokenize and move to device
inputs = self.tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors="pt"
)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Generate embeddings
with torch.no_grad():
outputs = self.model(**inputs)
embeddings = self._mean_pooling(outputs, inputs["attention_mask"])
# Normalize if requested
if self.normalize:
embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
# Move to CPU and convert to numpy
embeddings = embeddings.cpu().numpy()
all_embeddings.append(embeddings)
# Concatenate all batches
return np.vstack(all_embeddings) if all_embeddings else np.array([])
except Exception as e:
logger.error(f"Error during embedding generation: {e}")
raise
@property
def dimension(self) -> int:
"""Get the dimension of the embedding vectors."""
return self._dimension
# Factory function to create embedding models
def create_embedding_model(
backend: str = "sentence-transformers",
model_name: Optional[str] = None,
**kwargs
) -> EmbeddingModel:
"""
Factory function to create an embedding model.
Args:
backend: Backend to use ('sentence-transformers' or 'huggingface')
model_name: Model name or path
**kwargs: Additional arguments for the model
Returns:
An EmbeddingModel instance
"""
from ..config import EMBEDDING_MODEL_NAME, get_model_config
# Use config model if not specified
if model_name is None:
model_name = EMBEDDING_MODEL_NAME
# Get model-specific config
model_config = get_model_config(model_name)
# Override with provided kwargs
for k, v in kwargs.items():
model_config[k] = v
# Create the model
if backend.lower() == "sentence-transformers":
return SentenceTransformerEmbedding(model_name=model_name, **model_config)
elif backend.lower() in ["huggingface", "hf", "transformers"]:
return HuggingFaceEmbedding(model_name=model_name, **model_config)
else:
raise ValueError(f"Unsupported backend: {backend}")