File size: 3,184 Bytes
040da4c
3bdfcb1
 
 
040da4c
 
3bdfcb1
 
0966b24
040da4c
 
3bdfcb1
 
 
040da4c
 
 
 
 
 
 
3bdfcb1
 
 
 
 
 
040da4c
 
 
 
a540238
 
adb221d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a540238
 
 
 
 
 
 
b4a5816
a540238
 
 
 
 
 
 
 
 
b4a5816
a540238
 
 
 
0966b24
a540238
 
 
 
 
 
 
 
 
 
 
 
0966b24
 
 
 
a540238
3bdfcb1
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
from typing import Dict, List

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

from src.utils import logger

# Credentials come from the environment; either value is None when the
# corresponding variable is not set.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

# Embedding model used by the vector store for both documents and queries.
embeddings_model = OpenAIEmbeddings(
    api_key=OPENAI_API_KEY,
    model="text-embedding-ada-002",
)

# Pinecone client and the name of the single index this module works with.
pc = Pinecone(api_key=PINECONE_API_KEY)
index_name = "mandalaforus-index"

# Create the index on first use; an already existing index is left untouched.
existing_indexes = [info["name"] for info in pc.list_indexes()]
if index_name not in existing_indexes:
    # NOTE(review): dimension=1536 is presumably chosen to match the vector
    # size produced by the embedding model above - confirm when changing it.
    pc.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

# Shared handles used by VectorEmbedding below.
index = pc.Index(index_name)
vector_store = PineconeVectorStore(index=index, embedding=embeddings_model)


class VectorEmbedding:
    """Async-context-manager facade over the module-level ``vector_store``.

    The class keeps no state of its own: every method forwards to the shared
    ``vector_store`` object, and entering or leaving the context performs no
    setup or teardown.

    Methods:
        __aenter__:
            Return this instance when the ``async with`` block is entered.

        __aexit__:
            Do nothing on exit; exceptions raised inside the block propagate.

        store_documents:
            Add documents to the vector store.
                documents (List[Document]): Documents to add.
                List[str]: IDs of the added documents.

        search_documents:
            Run a similarity search against the vector store.
                query (str): Search query.
                num_results (int, optional): Number of results to return. Defaults to 20.
                user_id (str, optional): User ID for filtering results. Defaults to "public".
                List[Document]: Matching documents.

        delete_documents:
            Delete documents from the vector store.
                document_ids (List[str]): IDs of the documents to delete.
                None
    """

    async def __aenter__(self):
        """Enter the async context and hand back this instance."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context.

        Nothing is released here, and the implicit ``None`` return means an
        exception raised inside the ``async with`` block is not suppressed.
        """
        pass

    async def store_documents(self, documents: List[Document]) -> List[str]:
        """
        Store documents in vector store.

        Args:
            documents: List of document objects

        Returns:
            The value returned by ``vector_store.aadd_documents``: the IDs of
            the stored documents (use ``len()`` on it for a count)
        """
        return await vector_store.aadd_documents(documents)

    async def search_documents(
        self, query: str, num_results: int = 20, user_id: str = "public"
    ) -> List[Document]:
        """
        Search documents in vector store.

        Args:
            query: Search query
            num_results: Number of results to return
            user_id: Value forwarded as ``filter={"user_id": user_id}``;
                presumably restricts results to documents whose ``user_id``
                metadata matches (confirm against the store's filter rules)

        Returns:
            List of matching documents, as returned by ``vector_store.asearch``
        """
        return await vector_store.asearch(
            query=query,
            search_type="similarity",
            k=num_results,
            filter={"user_id": user_id},
        )

    async def delete_documents(self, document_ids: List[str]) -> None:
        """
        Delete documents from vector store.

        Args:
            document_ids: List of document IDs (the string IDs handed back by
                ``store_documents``)

        Returns:
            Whatever ``vector_store.adelete`` returns, passed through
            unchanged (expected to be ``None`` for this store)
        """
        return await vector_store.adelete(document_ids)