# File size: 6,065 Bytes
# 0c02556
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
from tqdm import tqdm
from openai import OpenAI
import string
import pickle
import os
import time

from langchain_community.document_loaders import PyMuPDFLoader

class API_Interface:
    """Glue between a PDF, OpenAI embeddings/chat and a Pinecone index.

    Constructing an instance:
      1. creates the OpenAI and Pinecone clients from the supplied keys,
      2. loads ``machine_learning.pdf`` and splits it into text chunks,
      3. makes sure the Pinecone namespace ``ns<chunk_size>`` holds one vector
         per chunk (embedding and upserting them if it is missing or empty),
      4. builds a LangChain ``PineconeVectorStore`` on top of the index.

    Attributes:
        chunk_size: Value passed as ``chunk_size`` to the text splitter; also
            part of the namespace name and of the embeddings cache file name.
        embed_model: OpenAI embedding model name.
        chat_model: Default OpenAI chat model name used by ``client_chat``.
        chunked_texts: Text chunks produced from the PDF.
        chunked_pnums: Page number recorded for each entry of ``chunked_texts``.
        namespace: Pinecone namespace used by default for queries.
        vectorstore: The ``PineconeVectorStore`` used for similarity search.
    """

    # Upper bound, in seconds, on how long to poll the index stats for a
    # freshly upserted namespace before giving up instead of hanging forever.
    _NAMESPACE_WAIT_SECONDS = 300

    def __init__(self, OPEN_AI_KEY, PINECONE_KEY,
                 chunk_size:int = 1500, embed_model:str = "text-embedding-3-small", chat_model:str = "gpt-3.5-turbo"):
        """Create the API clients, chunk the PDF and prepare the vector store.

        Args:
            OPEN_AI_KEY: API key handed to the OpenAI client and to
                ``OpenAIEmbeddings``.
            PINECONE_KEY: API key handed to the Pinecone client.
            chunk_size: ``chunk_size`` for the text splitter.
            embed_model: Embedding model name.
            chat_model: Default chat model name.

        Raises:
            ValueError: If the namespace has to be populated but the PDF
                produced no text chunks.
            TimeoutError: If the namespace does not show up in the index stats
                within ``_NAMESPACE_WAIT_SECONDS`` after upserting.
        """
        self.chunk_size = chunk_size
        self.embed_model = embed_model
        self.chat_model = chat_model

        self.__client = OpenAI(api_key=OPEN_AI_KEY)
        self.__pc = Pinecone(api_key=PINECONE_KEY)
        self.__index = self.__pc.Index('eep596mp2')

        print("Chunking documents.")
        self.chunked_texts, self.chunked_pnums = self.__chunk_document()
        print("Initializing vector store.")
        self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY)

    def __chunk_document(self) -> tuple[list[str], list[int]]:
        """Load ``machine_learning.pdf`` page by page and split it into chunks.

        Returns:
            ``(chunked_texts, chunk_page_numbers)``: two lists of equal length.
            Both are empty when the loader returns no pages or no text.
        """
        loader = PyMuPDFLoader(file_path = "machine_learning.pdf", mode = "page")
        docs = loader.load()

        page_texts = [page.page_content for page in docs]
        page_numbers = [page.metadata["page"] for page in docs]

        splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=50)
        return self._chunk_pages(page_texts, page_numbers, splitter.split_text)

    @staticmethod
    def _chunk_pages(page_texts, page_numbers, split_text) -> tuple[list[str], list[int]]:
        """Split pages into chunks, carrying each page's last chunk forward.

        The last chunk of every page is not emitted immediately; it is
        prepended to the next page's text before splitting, so text that
        continues across a page break can end up in one chunk. A carried-over
        chunk is therefore attributed to the page it was finally emitted with.

        Args:
            page_texts: Text of each page, in order.
            page_numbers: Page number for each entry of ``page_texts``.
            split_text: Callable taking a string and returning a list of
                chunk strings (e.g. a text splitter's ``split_text``).

        Returns:
            ``(chunked_texts, chunk_page_numbers)`` with matching lengths.
            Empty input, or pages for which ``split_text`` returns nothing,
            contribute no chunks instead of raising.
        """
        chunked_texts, chunk_page_numbers = [], []
        pending_tail = ""      # last chunk of the previous page, not yet emitted
        pending_pnum = None    # page number to use if the tail is emitted as-is

        for text, pnum in zip(page_texts, page_numbers):
            chunks = split_text(pending_tail + " " + text)
            if not chunks:
                # Nothing to split on this page (e.g. a blank page); keep any
                # pending tail for the next page.
                continue
            chunked_texts.extend(chunks[:-1])
            chunk_page_numbers.extend([pnum] * (len(chunks) - 1))
            pending_tail, pending_pnum = chunks[-1], pnum

        # Flush the tail of the final page, if there is one.
        if pending_tail:
            chunked_texts.append(pending_tail)
            chunk_page_numbers.append(pending_pnum)

        return chunked_texts, chunk_page_numbers

    def __init_vectorstore(self, OPEN_AI_KEY):
        """Ensure the namespace is populated and build the vector store.

        The namespace name is ``ns<chunk_size>``. If it exists but reports no
        vectors it is cleared and rebuilt; if it does not exist, embeddings
        are generated and upserted in batches.

        Args:
            OPEN_AI_KEY: API key for ``OpenAIEmbeddings``.

        Returns:
            ``(namespace, vectorstore)``.

        Raises:
            ValueError: If the namespace must be populated but there are no
                chunks to upload.
            TimeoutError: If the namespace is still absent from the index
                stats after ``_NAMESPACE_WAIT_SECONDS``.
        """
        NAMESPACE = f"ns{self.chunk_size}"
        _ns = self.__index.describe_index_stats()['namespaces'].get(NAMESPACE)
        if _ns is not None and _ns.get('vector_count') in (None, 0):
            # An existing-but-empty namespace is treated as missing.
            self.__index.delete(delete_all=True, namespace=NAMESPACE)
            _ns = None

        if _ns is None:
            if not self.chunked_texts:
                # With nothing to upsert the namespace would never appear and
                # the wait loop below could never finish.
                raise ValueError("No text chunks available to populate the vector store.")

            print("... generating embeddings.")
            embeddings = self.__generate_embeddings()

            records = [
                {
                    "id": f"chunk{i}",
                    "values": embedding,
                    "metadata": {
                        "text": text,
                        "page_number": pnum,
                    },
                }
                for i, (text, pnum, embedding) in enumerate(
                    zip(self.chunked_texts, self.chunked_pnums, embeddings)
                )
            ]

            print(len(records))
            batch_size = 180
            print("... upsertting records.")
            for start in tqdm(range(0, len(records), batch_size)):
                self.__index.upsert(records[start:start + batch_size], namespace=NAMESPACE)

            # Poll until the new namespace is visible in the index stats, but
            # never wait indefinitely.
            deadline = time.monotonic() + self._NAMESPACE_WAIT_SECONDS
            while self.__index.describe_index_stats()['namespaces'].get(NAMESPACE) is None:
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"Namespace {NAMESPACE!r} did not appear in the index stats "
                        f"within {self._NAMESPACE_WAIT_SECONDS} seconds."
                    )
                time.sleep(1)

        print("Index stats:", self.__index.describe_index_stats())
        openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        vectorstore = PineconeVectorStore(self.__index, embedding=openaiembs)

        return NAMESPACE, vectorstore

    def __generate_embeddings(self):
        """Return one embedding per chunk, using an on-disk pickle cache.

        The cache file is ``ML_text_embeddings_<chunk_size>.pkl`` in the
        current working directory. A cache whose length differs from the
        number of chunks is ignored and rebuilt, because pairing it with the
        chunks would silently drop or mislabel records.

        NOTE(review): the cache name does not include ``embed_model``, so a
        cache written with a different model of the same chunk count is still
        reused. Confirm whether that is intended.

        Returns:
            A list with one embedding per entry of ``self.chunked_texts``.
        """
        cache_path = f"ML_text_embeddings_{self.chunk_size}.pkl"

        def get_embedding(text):
            text = text.replace("\n", " ")
            response = self.__client.embeddings.create(input = [text], model=self.embed_model)
            return response.data[0].embedding

        if os.path.exists(cache_path):
            print("--- found existing embeddings file. Shortcutting.")
            # SECURITY: unpickling can execute arbitrary code. This file must
            # only ever be a cache written by this method, never external data.
            with open(cache_path, "rb") as infile:
                embeddings = pickle.load(infile)
            if len(embeddings) == len(self.chunked_texts):
                return embeddings
            print("--- cached embeddings do not match the current chunks. Regenerating.")

        embeddings = [get_embedding(text) for text in tqdm(self.chunked_texts)]
        with open(cache_path, "wb") as outfile:
            pickle.dump(embeddings, outfile)

        return embeddings

    def query_pinecone_vector_store(self, query:str, top_k:int = 5, namespace:str = None):
        """Run a similarity search against the vector store.

        Args:
            query: Query text.
            top_k: Number of results requested (passed as ``k``).
            namespace: Pinecone namespace to search; defaults to
                ``self.namespace`` when falsy.

        Returns:
            A 2-tuple ``(documents, scores)`` of position-aligned tuples,
            obtained by transposing the search results. Both are empty tuples
            when the search returns nothing.

        Raises:
            ValueError: If ``namespace`` is not present in the index stats.
        """
        namespace = namespace or self.namespace
        # Explicit check rather than ``assert`` so it still runs under ``python -O``.
        if namespace not in self.__index.describe_index_stats().get('namespaces'):
            raise ValueError(f"Namespace {namespace!r} does not exist in the Pinecone index.")

        response = self.vectorstore.similarity_search_with_relevance_scores(query=query,
                                                        k=top_k,
                                                        namespace=namespace)
        if not response:
            # Keep the return shape stable so callers can always unpack two values.
            return (), ()

        return tuple(zip(*response))

    def client_chat(self, messages, model=None):
        """Send ``messages`` to the OpenAI chat completions API.

        Args:
            messages: Message list forwarded unchanged to the API.
            model: Chat model name; defaults to ``self.chat_model`` when falsy.

        Returns:
            The message content of the first choice in the response.
        """
        model = model or self.chat_model
        response = self.__client.chat.completions.create(messages=messages, model=model)
        return response.choices[0].message.content

if __name__ == "__main__":
    # Manual smoke test: build the interface and print the top matches for a
    # sample query. The constructor requires both API keys, so they are read
    # from the environment instead of being omitted.
    open_ai_key = os.environ.get("OPENAI_API_KEY")
    pinecone_key = os.environ.get("PINECONE_API_KEY")
    if not open_ai_key or not pinecone_key:
        raise SystemExit("Set OPENAI_API_KEY and PINECONE_API_KEY before running this script.")

    tester = API_Interface(open_ai_key, pinecone_key)
    my_query = "What is the backpropogation algorithm?"
    result = tester.query_pinecone_vector_store(my_query)
    # The query returns a (documents, scores) pair, not a flat list of
    # documents; an empty result is tolerated as well.
    docs = result[0] if result else ()
    for doc in docs:
        print(doc.metadata["page_number"], doc.page_content, "\n\n")