# NOTE(review): stray Hugging Face Spaces page chrome ("Spaces / Sleeping")
# was scraped into this file's header — it is not part of the module.
import asyncio
import logging
from typing import Dict, List, Optional, Union

import numpy as np
from datasets import Dataset
from openai import AsyncOpenAI
from sklearn.metrics.pairwise import cosine_similarity

from src.api.exceptions import OpenAIError

# Structured logging: timestamped records tagged with logger name and level.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class EmbeddingService:
    """Create OpenAI text embeddings for datasets or lists of strings.

    Requests are issued in batches and throttled by a semaphore so that at
    most ``max_concurrent_requests`` API calls are in flight at once. The
    service also offers a cosine-similarity search over precomputed
    embeddings stored in a Hugging Face ``Dataset``.
    """

    def __init__(
        self,
        openai_api_key: str,
        model: str = "text-embedding-3-small",
        batch_size: int = 10,
        max_concurrent_requests: int = 10,  # Limit to 10 concurrent requests
    ):
        """
        Args:
            openai_api_key: API key used to authenticate with OpenAI.
            model: Name of the OpenAI embedding model to use.
            batch_size: Number of texts gathered per request batch.
            max_concurrent_requests: Upper bound on simultaneous API calls.
        """
        self.client = AsyncOpenAI(api_key=openai_api_key)
        self.model = model
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)  # Rate limiter
        # Progress counters; (re)initialized by the create_embeddings helpers.
        self.total_requests = 0  # Total number of requests to process
        self.completed_requests = 0  # Number of completed requests

    async def get_embedding(self, text: str) -> List[float]:
        """Generate an embedding for a single text using OpenAI.

        Newlines are collapsed to spaces before the request is sent.

        Raises:
            OpenAIError: If the underlying API call fails for any reason.
        """
        text = text.replace("\n", " ")
        try:
            async with self.semaphore:  # Acquire a semaphore slot
                response = await self.client.embeddings.create(
                    input=[text], model=self.model
                )
                self.completed_requests += 1  # Increment completed requests
                self._log_progress()  # Log progress
                return response.data[0].embedding
        except Exception as e:
            logger.error(f"Failed to generate embedding: {e}")
            # Chain the original exception so the root cause stays visible.
            raise OpenAIError(f"OpenAI API error: {e}") from e

    async def create_embeddings(
        self,
        data: Union[Dataset, List[str]],
        target_column: Optional[str] = None,
        output_column: str = "embeddings",
    ) -> Union[Dataset, List[List[float]]]:
        """
        Create embeddings for either a Dataset or a list of strings.

        Args:
            data: Either a Dataset or a list of strings.
            target_column: The column in the Dataset to generate embeddings
                for (required if data is a Dataset).
            output_column: The column to store embeddings in the Dataset
                (default: "embeddings").

        Returns:
            If data is a Dataset, returns the Dataset with the embeddings column.
            If data is a list of strings, returns a list of embeddings.

        Raises:
            ValueError: If data is a Dataset but target_column is missing.
            TypeError: If data is neither a Dataset nor a list.
        """
        if isinstance(data, Dataset):
            if not target_column:
                raise ValueError("target_column is required when data is a Dataset.")
            return await self._create_embeddings_for_dataset(
                data, target_column, output_column
            )
        elif isinstance(data, list):
            return await self._create_embeddings_for_texts(data)
        else:
            raise TypeError(
                "data must be either a Hugging Face Dataset or a list of strings."
            )

    async def _create_embeddings_for_dataset(
        self, dataset: Dataset, target_column: str, output_column: str
    ) -> Dataset:
        """Create embeddings for the target column and attach them to the Dataset."""
        logger.info("Generating embeddings for Dataset...")
        self.total_requests = len(dataset)  # Set total number of requests
        self.completed_requests = 0  # Reset completed requests counter
        embeddings: List[List[float]] = []
        for i in range(0, len(dataset), self.batch_size):
            # Slicing a Dataset yields a dict of column-name -> list of values.
            batch = dataset[i : i + self.batch_size]
            batch_embeddings = await asyncio.gather(
                *[self.get_embedding(text) for text in batch[target_column]]
            )
            embeddings.extend(batch_embeddings)
        dataset = dataset.add_column(output_column, embeddings)
        return dataset

    async def _create_embeddings_for_texts(self, texts: List[str]) -> List[List[float]]:
        """Create embeddings for a list of strings, preserving input order."""
        logger.info("Generating embeddings for list of texts...")
        self.total_requests = len(texts)  # Set total number of requests
        self.completed_requests = 0  # Reset completed requests counter
        batches = [
            texts[i : i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]
        embeddings: List[List[float]] = []
        for batch in batches:
            batch_embeddings = await asyncio.gather(
                *[self.get_embedding(text) for text in batch]
            )
            embeddings.extend(batch_embeddings)
        return embeddings

    def _log_progress(self) -> None:
        """Log the progress of embedding generation.

        No-op when no requests are scheduled, which guards against a
        ZeroDivisionError for empty inputs or direct get_embedding calls
        made before a batch run has initialized the counters.
        """
        if self.total_requests == 0:
            return
        progress = (self.completed_requests / self.total_requests) * 100
        logger.info(
            f"Progress: {self.completed_requests}/{self.total_requests} ({progress:.2f}%)"
        )

    async def search_embeddings(
        self,
        query_embeddings: List[List[float]],
        dataset: Dataset,
        embedding_column: str,
        target_column: str,
        num_results: int,
        additional_columns: Optional[List[str]] = None,
    ) -> Dict[str, List]:
        """
        Perform a cosine similarity search between query embeddings and dataset embeddings.

        Args:
            query_embeddings: List of embeddings for the query texts.
            dataset: The dataset to search in.
            embedding_column: The column in the dataset containing embeddings.
            target_column: The column to return in the results.
            num_results: The number of results to return.
            additional_columns: List of additional columns to include in the results.

        Returns:
            A dictionary of lists containing the target column values, their
            similarity scores, and any additional columns specified. Results
            for all queries are concatenated, best match first per query.
        """
        dataset_embeddings = np.array(dataset[embedding_column])
        query_matrix = np.array(query_embeddings)
        # Compute cosine similarity: one row of scores per query.
        similarities = cosine_similarity(query_matrix, dataset_embeddings)

        # Materialize each needed column once: indexing a Dataset column
        # rebuilds the entire list on every access, so doing it inside the
        # per-index loop below would be quadratic.
        target_values = dataset[target_column]
        extra_values = {
            column: dataset[column] for column in (additional_columns or [])
        }

        # Initialize the results dictionary.
        results: Dict[str, List] = {target_column: [], "similarity": []}
        for column in extra_values:
            results[column] = []

        # Collect the top-k results for each query, highest similarity first.
        for query_similarities in similarities:
            top_k_indices = np.argsort(query_similarities)[-num_results:][::-1]
            for idx in top_k_indices:
                results[target_column].append(target_values[idx])
                results["similarity"].append(float(query_similarities[idx]))
                for column, values in extra_values.items():
                    results[column].append(values[idx])
        return results