import asyncio

import numpy as np
import openai

from settings import Settings


class EmbeddingModel:
    """Thin wrapper around the OpenAI embeddings API.

    Holds both a sync and an async OpenAI client; embedding retrieval is
    exposed through the async methods below.
    """

    def __init__(self, settings: Settings) -> None:
        self.openai_api_key = settings.openai_api_key
        # BUGFIX: in openai>=1.0 clients capture credentials at construction
        # time, so assigning openai.api_key *after* building the clients (as
        # the original code did) never reached them. Pass the key explicitly.
        self.async_client = openai.AsyncOpenAI(api_key=self.openai_api_key)
        self.client = openai.OpenAI(api_key=self.openai_api_key)
        # Keep the legacy module-level assignment so any other code relying on
        # the global key still works (backward-compatible side effect).
        openai.api_key = self.openai_api_key
        self.name = settings.embeddings_model_name
        # Number of texts sent per embeddings request.
        self.batch_size = 1024

    async def aget_embeddings(self, list_of_text: list[str]) -> list[np.ndarray]:
        """Embed many texts, issuing batched requests concurrently.

        Args:
            list_of_text: Texts to embed. An empty list returns ``[]``.

        Returns:
            One ``np.ndarray`` per input text, in the original input order.
        """
        batches = [
            list_of_text[i : i + self.batch_size]
            for i in range(0, len(list_of_text), self.batch_size)
        ]

        async def process_batch(batch: list[str]) -> list[list[float]]:
            # One API call per batch; response.data preserves input order.
            embedding_response = await self.async_client.embeddings.create(
                input=batch, model=self.name
            )
            return [item.embedding for item in embedding_response.data]

        # Run all batch requests concurrently; asyncio.gather preserves the
        # order of the awaitables, so flattening keeps results aligned with
        # the input texts.
        results = await asyncio.gather(
            *(process_batch(batch) for batch in batches)
        )

        # Flatten the per-batch results into a single list of arrays.
        return [
            np.array(embedding)
            for batch_result in results
            for embedding in batch_result
        ]

    async def aget_embedding(self, text: str) -> np.ndarray:
        """Embed a single text and return it as a 1-D ``np.ndarray``.

        (Original annotation ``list[np.array]`` was wrong: a single array is
        returned, and ``np.array`` is a factory function, not a type.)
        """
        embedding = await self.async_client.embeddings.create(
            input=text, model=self.name
        )
        return np.array(embedding.data[0].embedding)