# src/utils/embeddings.py
import numpy as np
import torch
import torch.nn.functional as F
from numpy.typing import NDArray
from transformers import (AutoModel, AutoTokenizer, PreTrainedModel,
PreTrainedTokenizer)
from src.config.settings import settings
from src.utils.logger import logger
class EmbeddingClient:
    """
    An embedding client that generates vector embeddings for text using a
    transformer model, mirroring the logic used for knowledge base creation.

    Embeddings are mean-pooled over tokens (masked by attention) and then
    L2-normalized, so cosine similarity between them equals the dot product.
    """

    def __init__(self, model_name: str):
        self.model_name = model_name
        # Prefer GPU when present; all inputs/models are moved to this device.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer: PreTrainedTokenizer | None = None
        self.model: PreTrainedModel | None = None
        self.dimension: int | None = None
        # True only when the model and tokenizer loaded successfully.
        self._available = self._init_embedding_model()

    def _init_embedding_model(self) -> bool:
        """Initialize the transformer model and tokenizer.

        Returns:
            True on success; on any failure the error is logged and False is
            returned so callers can degrade gracefully via ``is_available()``.
        """
        try:
            logger().info(f"Loading embedding model '{self.model_name}' on {self.device}")
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name).to(self.device)
            self.model.eval()  # inference mode: disables dropout etc.
            # Dynamically determine the embedding dimension with a probe input.
            self.dimension = self._get_embedding_dimension()
            logger().info(f"Successfully loaded model. Embedding dimension: {self.dimension}")
            return True
        except Exception as e:
            logger().error(f"Failed to load embedding model '{self.model_name}': {e}")
            return False

    def _get_embedding_dimension(self) -> int:
        """Run a test input to determine the model's output dimension.

        Raises:
            RuntimeError: if called before the model/tokenizer are loaded.
        """
        if not self.tokenizer or not self.model:
            raise RuntimeError("Model and tokenizer must be initialized.")
        test_input = self.tokenizer(
            "test", return_tensors="pt", truncation=True, padding=True
        ).to(self.device)
        with torch.no_grad():
            test_output = self.model(**test_input)
        test_embedding = self._mean_pooling(
            test_output.last_hidden_state, test_input["attention_mask"]
        )
        return test_embedding.shape[1]

    def _mean_pooling(
        self, token_embeddings: torch.Tensor, attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """Perform mean pooling on token embeddings using an attention mask.

        Args:
            token_embeddings: (batch, seq_len, dim) last hidden states.
            attention_mask: (batch, seq_len) mask; 1 for real tokens, 0 for padding.

        Returns:
            (batch, dim) sentence embeddings averaged over non-padding tokens.
        """
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        masked_embeddings = token_embeddings * input_mask_expanded
        summed_embeddings = torch.sum(masked_embeddings, 1)
        # Clamp guards against division by zero for an all-padding row.
        summed_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return summed_embeddings / summed_mask

    def embed(self, texts: str | list[str], batch_size: int = 64) -> list[list[float]]:
        """
        Generate normalized, mean-pooled embeddings for the given texts.

        Args:
            texts: A single string or a list of strings.
            batch_size: Number of texts encoded per forward pass.

        Returns:
            One embedding (list of floats) per input text, in input order.
            If the model is unavailable or a batch fails, the affected texts
            get empty lists instead of raising, keeping output aligned.
        """
        if not self.is_available() or not self.tokenizer or not self.model:
            logger().error("Embedding model is not available, cannot generate embeddings.")
            return [[] for _ in range(len(texts) if isinstance(texts, list) else 1)]
        if isinstance(texts, str):
            texts = [texts]
        all_embeddings: list[list[float]] = []
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i : i + batch_size]
            try:
                inputs = self.tokenizer(
                    batch_texts,
                    truncation=True,
                    padding=True,
                    max_length=512,
                    return_tensors="pt",
                ).to(self.device)
                with torch.no_grad():
                    # FIX: feed the tokenized `inputs` to the model. The
                    # original passed the not-yet-defined `outputs`, which
                    # raised on every batch and was silently swallowed below,
                    # yielding empty embeddings for all texts.
                    outputs = self.model(**inputs)
                attention_mask = inputs["attention_mask"]
                chunk_embeddings = self._mean_pooling(
                    outputs.last_hidden_state, attention_mask
                )
                # L2 Normalization - CRITICAL STEP FOR COMPATIBILITY
                normalized_embeddings = F.normalize(chunk_embeddings, p=2, dim=1)
                all_embeddings.extend(normalized_embeddings.cpu().numpy().tolist())
            except Exception as e:
                logger().error(f"Error during embedding generation for a batch: {e}")
                # Add empty embeddings for the failed batch
                all_embeddings.extend([[] for _ in batch_texts])
        return all_embeddings

    def is_available(self) -> bool:
        """Checks if the embedding model was loaded successfully."""
        return self._available

    def semantic_search(
        self,
        query: str,
        candidates: list[str],
        top_k: int = settings.SEMANTIC_CONTEXT_SIZE,
        threshold: float = settings.SIMILARITY_THRESHOLD,
    ) -> list[str]:
        """Find semantically similar texts using embedding-based search.

        Args:
            query: The text to search with.
            candidates: Candidate texts to rank against the query.
            top_k: Maximum number of results returned.
            threshold: Minimum cosine similarity (exclusive) to keep a result.

        Returns:
            Up to ``top_k`` candidate texts sorted by descending similarity.
            Empty when the model is unavailable, there are no candidates, or
            the query could not be embedded.
        """
        if not self.is_available() or not candidates:
            return []
        query_vector = np.array(self.embed(query)[0], dtype="float32")
        if query_vector.size == 0:
            return []
        candidate_vectors = self.embed(candidates)
        similarities = [
            (
                self._cosine_similarity(query_vector, np.array(vec, dtype="float32")),
                text,
            )
            # `if vec` skips candidates whose embedding failed (empty lists).
            for vec, text in zip(candidate_vectors, candidates) if vec
        ]
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [text for score, text in similarities[:top_k] if score > threshold]

    def get_model_info(self) -> dict:
        """Get information about the current embedding model."""
        return {
            "model_name": self.model_name,
            "dimension": self.dimension,
            "device": str(self.device),
            "available": self.is_available(),
        }

    @staticmethod
    def _cosine_similarity(
        vec_a: NDArray[np.float32], vec_b: NDArray[np.float32]
    ) -> float:
        """Calculate the cosine similarity between two vectors.

        Returns 0.0 when either vector has zero magnitude (undefined cosine).
        """
        norm_a = np.linalg.norm(vec_a)
        norm_b = np.linalg.norm(vec_b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))