from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Pinecone from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_pinecone import PineconeVectorStore from pinecone import Pinecone from tqdm import tqdm from openai import OpenAI import string import pickle import os import time from langchain_community.document_loaders import PyMuPDFLoader class API_Interface: def __init__(self, OPEN_AI_KEY, PINECONE_KEY, chunk_size:int = 1500, embed_model:str = "text-embedding-3-small", chat_model:str = "gpt-3.5-turbo"): self.chunk_size = chunk_size self.embed_model = embed_model self.chat_model = chat_model # with open("open_ai_key.txt") as infile: # OPEN_AI_KEY = infile.readline().strip() # with open("pinecone_key.txt") as infile: # PINECONE_KEY = infile.readline().strip() self.__client = OpenAI(api_key=OPEN_AI_KEY) self.__pc = Pinecone(api_key=PINECONE_KEY) self.__index = self.__pc.Index('eep596mp2') print("Chunking documents.") self.chunked_texts, self.chunked_pnums = self.__chunk_document() print("Initializing vector store.") self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY) def __chunk_document(self) -> tuple[list[str], list[int]]: loader = PyMuPDFLoader(file_path = "machine_learning.pdf", mode = "page") docs = loader.load() page_texts = [page.page_content for page in docs] # Extract page_content page_numbers = [page.metadata["page"] for page in docs] # Extract metadata["page"] splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=50) chunked_texts, chunk_page_numbers = [], [] previous_page_tail = "" for text, pnum in zip(page_texts, page_numbers): chunks = splitter.split_text(previous_page_tail + " " + text) chunked_texts.extend(chunks[:-1]) chunk_page_numbers.extend([pnum]*(len(chunks)-1)) previous_page_tail = chunks[-1] chunked_texts.append(chunks[-1]) chunk_page_numbers.append(pnum) return chunked_texts, chunk_page_numbers def __init_vectorstore(self, OPEN_AI_KEY): NAMESPACE = f"ns{self.chunk_size}" _ns = self.__index.describe_index_stats()['namespaces'].get(NAMESPACE) if _ns is not None and _ns.get('vector_count') in (None, 0): self.__index.delete(delete_all=True, namespace=NAMESPACE) _ns = None if _ns is None: print("... generating embeddings.") embeddings = self.__generate_embeddings() records = [] for i, (text, pnum, embedding) in enumerate(zip(self.chunked_texts, self.chunked_pnums, embeddings)): records.append({ "id": f"chunk{i}", "values": embedding, "metadata": { "text": text, "page_number": pnum } }) print(len(records)) batch_size = 180 print("... upsertting records.") for b in tqdm(range((len(records)-1)//batch_size+1)): self.__index.upsert(records[b*batch_size:(b+1)*batch_size], namespace=NAMESPACE) # print(b+1, "/", (len(records)-1)//batch_size+1) while self.__index.describe_index_stats()['namespaces'].get(NAMESPACE) is None: time.sleep(1) print("Index stats:", self.__index.describe_index_stats()) openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model) vectorstore = PineconeVectorStore(self.__index, embedding=openaiembs) return NAMESPACE, vectorstore def __generate_embeddings(self): def get_embedding(text): text = text.replace("\n", " ") # text = text.replace(string.punctuation, "") response = self.__client.embeddings.create(input = [text], model=self.embed_model) return response.data[0].embedding if not os.path.exists(f"ML_text_embeddings_{self.chunk_size}.pkl"): embeddings = [] for text in tqdm(self.chunked_texts): embeddings.append(get_embedding(text)) with open(f"ML_text_embeddings_{self.chunk_size}.pkl", "wb") as outfile: pickle.dump(embeddings, outfile) else: print("--- found existing embeddings file. Shortcutting.") with open(f"ML_text_embeddings_{self.chunk_size}.pkl", "rb") as infile: embeddings:list[list[float]] = pickle.load(infile) return embeddings def query_pinecone_vector_store(self, query:str, top_k:int = 5, namespace:str = None): namespace = namespace or self.namespace assert namespace in self.__index.describe_index_stats().get('namespaces') response = self.vectorstore.similarity_search_with_relevance_scores(query=query, k=top_k, namespace=namespace) return tuple(zip(*response)) def client_chat(self, messages, model=None): model = model or self.chat_model response = self.__client.chat.completions.create(messages=messages, model=model) return response.choices[0].message.content if __name__ == "__main__": tester = API_Interface() my_query = "What is the backpropogation algorithm?" response = tester.query_pinecone_vector_store(my_query) for doc in response: print(doc.metadata["page_number"], doc.page_content, "\n\n")