"""Pinecone-backed vector store setup and a thin async wrapper around it."""

import os
from typing import Dict, List

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

from src.utils import logger

# NOTE: everything from here down to the class definition runs at import
# time: the API keys are read from the environment, the Pinecone client is
# created, and the index is created remotely if it does not exist yet.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
embeddings_model = OpenAIEmbeddings(
    api_key=OPENAI_API_KEY, model="text-embedding-ada-002"
)

PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=PINECONE_API_KEY)

index_name = "mandalaforus-index"

# Create the index only on first use; later imports find it and skip this.
existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
if index_name not in existing_indexes:
    pc.create_index(
        name=index_name,
        # Must equal the embedding model's vector size
        # (1536 for text-embedding-ada-002).
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

index = pc.Index(index_name)
vector_store = PineconeVectorStore(index=index, embedding=embeddings_model)


class VectorEmbedding:
    """Async facade over the module-level ``vector_store``.

    The class holds no state of its own; every method forwards to the shared
    ``vector_store`` created when this module is imported. It can be used as
    an async context manager (``async with VectorEmbedding() as ve: ...``),
    but entering and exiting acquire or release nothing.

    Methods:
        store_documents: Add documents to the vector store.
        search_documents: Similarity-search the store, filtered by user ID.
        delete_documents: Delete documents from the store by ID.
    """

    async def __aenter__(self):
        """Enter the async context and return this instance."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context.

        Nothing is cleaned up, and the implicit ``None`` return means an
        exception raised inside the ``async with`` body is not suppressed.
        """
        pass

    async def store_documents(self, documents: List[Document]) -> List[str]:
        """
        Store documents in vector store.

        Args:
            documents: List of document objects

        Returns:
            The value returned by ``vector_store.aadd_documents``, i.e. the
            list of IDs of the stored documents (use ``len()`` on it to get
            the number of documents stored)
        """
        return await vector_store.aadd_documents(
            documents,
        )

    async def search_documents(
        self, query: str, num_results: int = 20, user_id: str = "public"
    ) -> List[Document]:
        """
        Search documents in vector store.

        Args:
            query: Search query
            num_results: Number of results to return. Defaults to 20.
            user_id: Value passed to the store as the ``user_id`` metadata
                filter. Defaults to "public".

        Returns:
            List of matching documents, as returned by
            ``vector_store.asearch``
        """
        return await vector_store.asearch(
            query=query,
            search_type="similarity",
            k=num_results,
            filter={"user_id": user_id},
        )

    async def delete_documents(self, document_ids: List[str]) -> None:
        """
        Delete documents from vector store.

        Args:
            document_ids: List of document IDs (the string IDs the store
                uses, e.g. those returned by ``store_documents``)

        Returns:
            Whatever ``vector_store.adelete`` returns, forwarded unchanged
        """
        return await vector_store.adelete(document_ids)