Spaces:
Sleeping
Sleeping
| import uuid | |
| import os | |
| from pinecone import PineconeAsyncio as AsyncPinecone | |
| from openai import AsyncOpenAI | |
class PineconeClient:
    """Async helper that embeds text with OpenAI and stores/searches it in Pinecone.

    Intended to be used as an async context manager; ``upsert`` and ``query``
    rely on the index connection opened by ``__aenter__``::

        async with PineconeClient() as client:
            await client.upsert(["some text"], [{"source": "notes"}])
            hits = await client.query("a question")

    Configuration is read from environment variables:

    * ``PINECONE_API_KEY`` - key passed to the Pinecone client.
    * ``PINECONE_INDEX`` - value handed to ``IndexAsyncio``.
    * ``OPENAI_API_KEY`` - key passed to the OpenAI client.
    * ``OPENAI_EMBEDDING_MODEL`` - embedding model name
      (default ``text-embedding-3-small``).
    * ``PINECONE_UPSERT_BATCH_SIZE`` - vectors per upsert request (default 50).
    """

    def __init__(self):
        """Read configuration from the environment; no Pinecone connection is opened here."""
        self.index_name = os.getenv("PINECONE_INDEX")
        self.embedding_model = os.getenv(
            "OPENAI_EMBEDDING_MODEL", "text-embedding-3-small"
        )
        self.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        # Populated by __aenter__ and cleared again by __aexit__.
        self.client = None
        self.collection = None
        self._client_ctx = None
        self._collection_ctx = None
        self._pinecone_upsert_batch_size = int(
            os.getenv("PINECONE_UPSERT_BATCH_SIZE", "50")
        )

    async def __aenter__(self):
        """Open the Pinecone client and the index connection, then return ``self``."""
        client_ctx = AsyncPinecone(api_key=os.getenv("PINECONE_API_KEY"))
        client = await client_ctx.__aenter__()
        try:
            # NOTE(review): IndexAsyncio appears to expect the index *host*;
            # confirm that PINECONE_INDEX holds a host and not just a name.
            collection_ctx = client.IndexAsyncio(self.index_name)
            collection = await collection_ctx.__aenter__()
        except BaseException as exc:
            # __aexit__ is never invoked when __aenter__ raises, so the
            # already-open client has to be closed here to avoid a leak.
            await client_ctx.__aexit__(type(exc), exc, exc.__traceback__)
            raise
        self._client_ctx = client_ctx
        self.client = client
        self._collection_ctx = collection_ctx
        self.collection = collection
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the index connection, then the client, and reset all handles."""
        try:
            if self._collection_ctx is not None:
                await self._collection_ctx.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Close the client and clear state even if closing the index failed.
            try:
                if self._client_ctx is not None:
                    await self._client_ctx.__aexit__(exc_type, exc_value, traceback)
            finally:
                self.client = None
                self.collection = None
                self._client_ctx = None
                self._collection_ctx = None

    async def _get_text_embedding(self, text: str) -> list[float]:
        """Return the embedding of a single ``text`` from the configured model."""
        response = await self.openai.embeddings.create(
            input=text,
            model=self.embedding_model,
        )
        return response.data[0].embedding

    async def _get_batch_embeddings(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding per entry of ``texts`` using a single API request."""
        if not texts:
            # Nothing to embed; skip the request entirely.
            return []
        response = await self.openai.embeddings.create(
            input=texts,
            model=self.embedding_model,
        )
        return [item.embedding for item in response.data]

    async def text_splitter(self, text: str, splitter: str = "\n\n") -> list[str]:
        """Split ``text`` on every occurrence of ``splitter`` (blank line by default)."""
        return text.split(splitter)

    async def upsert(self, texts: list[str], metadatas: list[dict] = None):
        """Embed ``texts`` and upsert them into the index in batches.

        Args:
            texts: Documents to store. Nothing happens when empty.
            metadatas: Optional metadata dicts, one per text. An ``"id"`` key,
                when present, is used as the vector id and left out of the
                stored metadata; otherwise a random UUID4 string is generated.
                The caller's dicts are not modified.

        Raises:
            ValueError: If ``texts`` and ``metadatas`` differ in length.
        """
        if not texts:
            return
        if metadatas is None:
            metadatas = [{} for _ in texts]
        if len(texts) != len(metadatas):
            raise ValueError("texts and metadatas must have the same length")

        ids = []
        stored_metas = []
        for meta in metadatas:
            # Work on a shallow copy so removing "id" never alters caller data.
            entry = dict(meta)
            ids.append(entry.pop("id") if "id" in entry else str(uuid.uuid4()))
            stored_metas.append(entry)

        embeddings = await self._get_batch_embeddings(texts)
        vectors = [
            {
                "id": id_,
                "values": embedding,
                # The source text rides along in metadata so query() can return it.
                "metadata": {**meta, "_document": text},
            }
            for id_, embedding, text, meta in zip(ids, embeddings, texts, stored_metas)
        ]

        # A zero or negative configured size would break or silently empty the
        # range below, dropping every vector; fall back to one per request.
        batch_size = max(1, self._pinecone_upsert_batch_size)
        for start in range(0, len(vectors), batch_size):
            await self.collection.upsert(vectors=vectors[start : start + batch_size])

    async def query(self, query: str, n_results: int = 5) -> dict:
        """Embed ``query`` and return the ``n_results`` best matches from the index.

        Returns:
            A dict of four parallel lists: ``"ids"``, ``"documents"`` (the text
            stored under ``_document``, or ``""`` when absent), ``"metadatas"``
            (match metadata without ``_document``) and ``"distances"``.
            ``"distances"`` carries each match's ``score`` field unchanged;
            how to interpret it presumably depends on the index metric.
        """
        query_embedding = await self._get_text_embedding(query)
        results = await self.collection.query(
            vector=query_embedding,
            top_k=n_results,
            include_metadata=True,
            include_values=False,
        )

        ids = []
        documents = []
        metadatas = []
        distances = []
        for match in results["matches"]:
            # Matches stored without metadata may expose None or no field at all.
            try:
                raw_meta = match["metadata"]
            except (KeyError, AttributeError):
                raw_meta = None
            # Copy before popping so the response object is left untouched.
            meta = dict(raw_meta or {})
            ids.append(match["id"])
            documents.append(meta.pop("_document", ""))
            metadatas.append(meta)
            distances.append(match["score"])

        return {
            "ids": ids,
            "documents": documents,
            "metadatas": metadatas,
            "distances": distances,
        }