Spaces:
Runtime error
Runtime error
| import os | |
| from typing import Dict, List | |
| from langchain_core.documents import Document | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_pinecone import PineconeVectorStore | |
| from pinecone import Pinecone, ServerlessSpec | |
| from src.utils import logger | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| embeddings_model = OpenAIEmbeddings( | |
| api_key=OPENAI_API_KEY, model="text-embedding-ada-002" | |
| ) | |
| PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") | |
| pc = Pinecone(api_key=PINECONE_API_KEY) | |
| index_name = "mandalaforus-index" | |
| existing_indexes = [index_info["name"] for index_info in pc.list_indexes()] | |
| if index_name not in existing_indexes: | |
| pc.create_index( | |
| name=index_name, | |
| dimension=1536, | |
| metric="cosine", | |
| spec=ServerlessSpec(cloud="aws", region="us-east-1"), | |
| ) | |
| index = pc.Index(index_name) | |
| vector_store = PineconeVectorStore(index=index, embedding=embeddings_model) | |
| class VectorEmbedding: | |
| """VectorEmbedding class provides asynchronous context management and methods to interact with a vector store. | |
| Methods: | |
| __aenter__: | |
| Asynchronous context manager entry method. | |
| __aexit__: | |
| Asynchronous context manager exit method. | |
| store_documents: | |
| Store documents in the vector store. | |
| documents (List[Document]): List of document objects. | |
| int: Number of documents stored. | |
| search_documents: | |
| Search documents in the vector store. | |
| query (str): Search query. | |
| num_results (int, optional): Number of results to return. Defaults to 20. | |
| user_id (str, optional): User ID for filtering results. Defaults to "public". | |
| List[Dict]: List of search results. | |
| delete_documents: | |
| Delete documents from the vector store. | |
| document_ids (List[int]): List of document IDs. | |
| None | |
| """ | |
| async def __aenter__(self): | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| pass | |
| async def store_documents(self, documents: List[Document]) -> int: | |
| """ | |
| Store documents in vector store. | |
| Args: | |
| documents: List of document objects | |
| Returns: | |
| Number of documents stored | |
| """ | |
| return await vector_store.aadd_documents( | |
| documents, | |
| ) | |
| async def search_documents( | |
| self, query: str, num_results: int = 20, user_id="public" | |
| ) -> List[Dict]: | |
| """ | |
| Search documents in vector store. | |
| Args: | |
| query: Search query | |
| num_results: Number of results to return | |
| Returns: | |
| List of search results | |
| """ | |
| return await vector_store.asearch( | |
| query=query, | |
| search_type="similarity", | |
| k=num_results, | |
| filter={"user_id": user_id}, | |
| ) | |
| async def delete_documents(self, document_ids: List[int]) -> None: | |
| """ | |
| Delete documents from vector store. | |
| Args: | |
| document_ids: List of document IDs | |
| """ | |
| return await vector_store.adelete(document_ids) | |