# src/utils/embeddings.py

import numpy as np
import torch
import torch.nn.functional as F
from numpy.typing import NDArray
from transformers import (AutoModel, AutoTokenizer, PreTrainedModel,
                          PreTrainedTokenizer)

from src.config.settings import settings
from src.utils.logger import logger


class EmbeddingClient:
	"""
	An embedding client that generates vector embeddings for text using a
	transformer model, mirroring the logic used for knowledge base creation.
	"""

	def __init__(self, model_name: str):
		self.model_name = model_name
		self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
		self.tokenizer: PreTrainedTokenizer | None = None
		self.model: PreTrainedModel | None = None
		self.dimension: int | None = None
		self._available = self._init_embedding_model()

	def _init_embedding_model(self) -> bool:
		"""Initializes the transformer model and tokenizer."""
		try:
			logger().info(f"Loading embedding model '{self.model_name}' on {self.device}")
			self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
			self.model = AutoModel.from_pretrained(self.model_name).to(self.device)
			self.model.eval()

			# Dynamically determine the embedding dimension
			self.dimension = self._get_embedding_dimension()
			logger().info(f"Successfully loaded model. Embedding dimension: {self.dimension}")
			return True
		except Exception as e:
			logger().error(f"Failed to load embedding model '{self.model_name}': {e}")
			return False

	def _get_embedding_dimension(self) -> int:
		"""Runs a test input to determine the model's output dimension."""
		if not self.tokenizer or not self.model:
			raise RuntimeError("Model and tokenizer must be initialized.")

		test_input = self.tokenizer(
			"test", return_tensors="pt", truncation=True, padding=True
		).to(self.device)

		with torch.no_grad():
			test_output = self.model(**test_input)
			test_embedding = self._mean_pooling(
				test_output.last_hidden_state, test_input["attention_mask"]
			)
		return test_embedding.shape[1]

	def _mean_pooling(
		self, token_embeddings: torch.Tensor, attention_mask: torch.Tensor
	) -> torch.Tensor:
		"""Performs mean pooling on token embeddings using an attention mask."""
		input_mask_expanded = (
			attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
		)
		masked_embeddings = token_embeddings * input_mask_expanded
		summed_embeddings = torch.sum(masked_embeddings, 1)
		summed_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
		return summed_embeddings / summed_mask

	def embed(self, texts: str | list[str], batch_size: int = 64) -> list[list[float]]:
		"""
		Generates normalized, mean-pooled embeddings for the given texts.
		Returns an empty list per input text if the model is unavailable or a batch fails.
		"""
		if not self.is_available() or not self.tokenizer or not self.model:
			logger().error("Embedding model is not available, cannot generate embeddings.")
			return [[] for _ in range(len(texts) if isinstance(texts, list) else 1)]

		if isinstance(texts, str):
			texts = [texts]

		all_embeddings = []
		for i in range(0, len(texts), batch_size):
			batch_texts = texts[i : i + batch_size]
			try:
				inputs = self.tokenizer(
					batch_texts,
					truncation=True,
					padding=True,
					max_length=512,
					return_tensors="pt",
				).to(self.device)

				with torch.no_grad():
					outputs = self.model(**inputs)

				attention_mask = inputs["attention_mask"]
				chunk_embeddings = self._mean_pooling(
					outputs.last_hidden_state, attention_mask
				)

				# L2-normalize the embeddings; this keeps them compatible with the
				# knowledge base vectors (see class docstring) and makes cosine
				# similarity a plain dot product.
				normalized_embeddings = F.normalize(chunk_embeddings, p=2, dim=1)

				all_embeddings.extend(normalized_embeddings.cpu().numpy().tolist())

			except Exception as e:
				logger().error(f"Error during embedding generation for a batch: {e}")
				# Add empty embeddings for the failed batch
				all_embeddings.extend([[] for _ in batch_texts])

		return all_embeddings

	def is_available(self) -> bool:
		"""Checks if the embedding model was loaded successfully."""
		return self._available

	def semantic_search(
		self,
		query: str,
		candidates: list[str],
		top_k: int = settings.SEMANTIC_CONTEXT_SIZE,
		threshold: float = settings.SIMILARITY_THRESHOLD,
	) -> list[str]:
		"""Finds semantically similar texts using embedding-based search."""
		if not self.is_available() or not candidates:
			return []

		query_vector = np.array(self.embed(query)[0], dtype="float32")
		if query_vector.size == 0:
			return []

		candidate_vectors = self.embed(candidates)

		similarities = [
			(
				self._cosine_similarity(query_vector, np.array(vec, dtype="float32")),
				text,
			)
			for vec, text in zip(candidate_vectors, candidates) if vec
		]

		similarities.sort(key=lambda x: x[0], reverse=True)
		return [text for score, text in similarities[:top_k] if score > threshold]

	def get_model_info(self) -> dict:
		"""Get information about the current embedding model."""
		return {
			"model_name": self.model_name,
			"dimension": self.dimension,
			"device": str(self.device),
			"available": self.is_available(),
		}

	@staticmethod
	def _cosine_similarity(
		vec_a: NDArray[np.float32], vec_b: NDArray[np.float32]
	) -> float:
		"""Calculates the cosine similarity between two vectors."""
		norm_a = np.linalg.norm(vec_a)
		norm_b = np.linalg.norm(vec_b)
		if norm_a == 0 or norm_b == 0:
			return 0.0
		return float(np.dot(vec_a, vec_b) / (norm_a * norm_b))
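

if __name__ == "__main__":
	# Minimal usage sketch, not part of the original module. The checkpoint name
	# below is an assumption for illustration; in practice the model name would
	# come from the application's configuration.
	client = EmbeddingClient("sentence-transformers/all-MiniLM-L6-v2")
	if client.is_available():
		print(client.get_model_info())
		vectors = client.embed(["hello world", "how are you?"])
		print(f"{len(vectors)} embeddings of dimension {len(vectors[0])}")
		print(client.semantic_search("greeting", ["hi there", "stock prices fell"], top_k=1))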