"""Retrieval-augmented QA over two solar-eclipse PDFs.

Chunks a prose PDF and a tabular PDF, embeds the chunks with OpenAI
(embeddings are cached to local pickle files), upserts them into two
namespaces of one Pinecone index, and exposes similarity search plus a
thin chat-completion wrapper.
"""

from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from langchain_community.document_loaders import PyMuPDFLoader
from pinecone import Pinecone
from openai import OpenAI
from tqdm import tqdm
import os
import pickle
import time

# NOTE(review): the original also imported `Pinecone` from
# langchain_community.vectorstores and `string`; the former was immediately
# shadowed by `pinecone.Pinecone` and the latter was never used, so both
# imports were dropped.


class API_Interface:
    """Index and query eclipse PDFs via OpenAI embeddings and Pinecone."""

    # Column header prepended to every flattened table chunk (must match the
    # column order produced by __chunk_tables).
    TABLE_HEADER = "Catalog Number, Canon Plate, Calendar Date, Terrestrial Dynamical Time of Greatest Eclipse, UT - TD (s), Luna Number, Saros Number, Eclipse Type, QLE, Gamma, Eclipse Magnitude, Latitude, Longitude, Sun Altitude, Sun Azimuth, Path Width (km), Central Line Duration"

    def __init__(self, OPEN_AI_KEY, PINECONE_KEY, chunk_size: int = 1500,
                 embed_model: str = "text-embedding-3-small",
                 chat_model: str = "gpt-3.5-turbo",
                 index_name: str = "eep596mp2",
                 doc_path: str = "Solar Eclipse Information.pdf",
                 table_path: str = "Solar Eclipse Table.pdf"):
        """Build clients, chunk both PDFs, and ensure both namespaces exist.

        Args:
            OPEN_AI_KEY: OpenAI API key.
            PINECONE_KEY: Pinecone API key.
            chunk_size: character size of prose chunks (also keys the cache
                file and the document namespace name).
            embed_model: OpenAI embedding model name.
            chat_model: default chat-completion model for client_chat().
            index_name: Pinecone index to use.
            doc_path: path to the prose PDF.
            table_path: path to the table PDF.
        """
        self.chunk_size = chunk_size
        self.embed_model = embed_model
        self.chat_model = chat_model
        self.doc_path = doc_path
        self.table_path = table_path

        self.__client = OpenAI(api_key=OPEN_AI_KEY)
        self.__pc = Pinecone(api_key=PINECONE_KEY)
        self.__index = self.__pc.Index(index_name)

        print("Chunking documents.")
        self.chunked_texts, self.chunked_pnums = self.__chunk_document()
        self.table_texts, self.table_pnums = self.__chunk_tables()
        print("Initializing vector store.")
        self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY)
        print("Initializing table store.")
        self.tablespace, self.tablestore = self.__init_tablestore(OPEN_AI_KEY)

    def __chunk_document(self) -> tuple[list[str], list[int]]:
        """Split the prose PDF into overlapping chunks tagged with page numbers.

        The last chunk of each page is both kept for that page and prepended
        to the next page's text, so content spanning a page break stays
        retrievable.

        Returns:
            (chunk_texts, chunk_page_numbers), parallel lists.
        """
        loader = PyMuPDFLoader(file_path=self.doc_path, mode="page")
        docs = loader.load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size,
                                                  chunk_overlap=500)
        chunked_texts: list[str] = []
        chunk_page_numbers: list[int] = []
        previous_page_tail = ""
        for page in docs:
            pnum = page.metadata["page"]
            chunks = splitter.split_text(previous_page_tail + " " + page.page_content)
            if not chunks:  # blank page: nothing to index, keep the old tail
                continue
            # All but the last chunk belong solely to this page ...
            chunked_texts.extend(chunks[:-1])
            chunk_page_numbers.extend([pnum] * (len(chunks) - 1))
            # ... the last chunk is kept AND carried into the next page.
            previous_page_tail = chunks[-1]
            chunked_texts.append(chunks[-1])
            chunk_page_numbers.append(pnum)
        return chunked_texts, chunk_page_numbers

    def __chunk_tables(self) -> tuple[list[str], list[int]]:
        """Flatten each page of the table PDF into one CSV-like text chunk.

        Parsing relies on the PDF's text-extraction order: values appear one
        per line after a literal "km" marker; a row ends at a value ending in
        "W"/"E" (longitude). Partial eclipses ("P…" type) have two fewer
        trailing columns, hence the different negative resets of the column
        counter.

        Returns:
            (table_texts, table_page_numbers), parallel lists.
        """
        tabler = PyMuPDFLoader(file_path=self.table_path, mode="page")
        tables = tabler.load()
        table_texts = []
        for page in tables:
            content = page.page_content
            start = content.find("km")  # header ends at the "km" unit label
            values = content[start + 3:].split("\n")
            text = ""
            idv = 0                 # column counter within the current row
            dates = [None, None]    # first and last year seen on this page
            partial_flag = False
            for val in values:
                if idv == 2:
                    # Column 2 is the calendar-date year.
                    if dates[0] is None:
                        dates[0] = val
                    else:
                        dates[1] = val
                if idv % 16 == 4:
                    # NOTE(review): looks like a no-op replace; presumably it
                    # originally collapsed a non-breaking/double space in the
                    # time column — confirm against the source PDF.
                    val = val.replace(" ", " ")
                text += val + " "
                idv += 1
                if val.startswith("P"):
                    partial_flag = True  # partial eclipse: shorter row
                if val.endswith("W") or val.endswith("E"):
                    # Longitude marks the row end; back the counter up so it
                    # reaches 0 after the remaining trailing columns.
                    idv = -2 if partial_flag else -4
                if idv == 0:
                    text += "\n"         # row complete
                    partial_flag = False
            table_texts.append(
                f"Solar eclipses between {dates[0]} and {dates[1]}:\n\n"
                + self.TABLE_HEADER + "\n" + text)
        table_numbers = [page.metadata["page"] for page in tables]
        return table_texts, table_numbers

    def __namespace_is_empty(self, namespace: str) -> bool:
        """Return True if the namespace is absent or holds zero vectors.

        A present-but-empty namespace is deleted so it can be repopulated.
        """
        ns = self.__index.describe_index_stats()['namespaces'].get(namespace)
        if ns is not None and ns.get('vector_count') in (None, 0):
            self.__index.delete(delete_all=True, namespace=namespace)
            ns = None
        return ns is None

    def __populate_namespace(self, namespace: str, texts: list[str],
                             pnums: list[int],
                             embeddings: list[list[float]]) -> None:
        """Upsert (text, page_number, embedding) records into a namespace.

        Blocks until the namespace becomes visible in the index stats.
        """
        records = [
            {"id": f"chunk{i}",
             "values": emb,
             "metadata": {"text": text, "page_number": pnum}}
            for i, (text, pnum, emb) in enumerate(zip(texts, pnums, embeddings))
        ]
        print(len(records))
        if not records:
            return  # nothing to upsert; avoid waiting forever below
        batch_size = 180
        print("... upserting records.")
        for b in tqdm(range((len(records) - 1) // batch_size + 1)):
            self.__index.upsert(records[b * batch_size:(b + 1) * batch_size],
                                namespace=namespace)
        # Pinecone upserts are eventually consistent; poll until visible.
        while self.__index.describe_index_stats()['namespaces'].get(namespace) is None:
            time.sleep(1)
        print("Index stats:", self.__index.describe_index_stats())

    def __init_vectorstore(self, OPEN_AI_KEY):
        """Ensure the document namespace is populated; return (name, store)."""
        namespace = f"ns_eclipse_{self.chunk_size}"
        if self.__namespace_is_empty(namespace):
            print("... generating embeddings.")
            self.__generate_embeddings()
            self.__populate_namespace(namespace, self.chunked_texts,
                                      self.chunked_pnums, self.embeddings)
        openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        return namespace, PineconeVectorStore(self.__index, embedding=openaiembs)

    def __init_tablestore(self, OPEN_AI_KEY):
        """Ensure the table namespace is populated; return (name, store)."""
        namespace = "ts_eclipse"
        if self.__namespace_is_empty(namespace):
            print("... generating table embeddings.")
            self.__generate_table_embeddings()
            self.__populate_namespace(namespace, self.table_texts,
                                      self.table_pnums, self.tmbeddings)
        openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        return namespace, PineconeVectorStore(self.__index, embedding=openaiembs)

    def __get_embedding(self, text: str) -> list[float]:
        """Embed one text with the configured OpenAI embedding model."""
        text = text.replace("\n", " ")
        response = self.__client.embeddings.create(input=[text],
                                                   model=self.embed_model)
        return response.data[0].embedding

    def __load_or_create_embeddings(self, cache_path: str,
                                    texts: list[str]) -> list[list[float]]:
        """Load embeddings from a pickle cache, or compute and cache them.

        The cache file is local and self-written; unpickling it is trusted.
        """
        if os.path.exists(cache_path):
            print("--- found existing embeddings file. Shortcutting.")
            with open(cache_path, "rb") as infile:
                return pickle.load(infile)
        embeddings = [self.__get_embedding(text) for text in tqdm(texts)]
        with open(cache_path, "wb") as outfile:
            pickle.dump(embeddings, outfile)
        return embeddings

    def __generate_embeddings(self) -> None:
        """Populate self.embeddings for the prose chunks (cached on disk)."""
        cache_path = f"eclipse_text_embeddings_{self.chunk_size}.pkl"
        self.embeddings: list[list[float]] = \
            self.__load_or_create_embeddings(cache_path, self.chunked_texts)

    def __generate_table_embeddings(self) -> None:
        """Populate self.tmbeddings for the table chunks (cached on disk)."""
        cache_path = "eclipse_table_embeddings.pkl"
        self.tmbeddings: list[list[float]] = \
            self.__load_or_create_embeddings(cache_path, self.table_texts)

    def query_pinecone_vector_store(self, query: str, top_k_docs: int = 5,
                                    top_k_tbls: int = 5, namespace: str = None,
                                    tablespace: str = None):
        """Similarity-search both namespaces for `query`.

        Args:
            query: natural-language query text.
            top_k_docs: number of prose chunks to retrieve.
            top_k_tbls: number of table chunks to retrieve.
            namespace / tablespace: override the default namespaces.

        Returns:
            [doc_results, table_results], where each element is either an
            empty tuple (no hits) or a pair (documents, relevance_scores).

        Raises:
            ValueError: if either namespace does not exist on the index.
        """
        namespace = namespace or self.namespace
        tablespace = tablespace or self.tablespace
        existing = self.__index.describe_index_stats().get('namespaces')
        # Raise (not assert) so validation survives `python -O`.
        if namespace not in existing:
            raise ValueError(f"Unknown namespace: {namespace!r}")
        if tablespace not in existing:
            raise ValueError(f"Unknown table namespace: {tablespace!r}")
        doc_hits = self.vectorstore.similarity_search_with_relevance_scores(
            query=query, k=top_k_docs, namespace=namespace)
        table_hits = self.vectorstore.similarity_search_with_relevance_scores(
            query=query, k=top_k_tbls, namespace=tablespace)
        # zip(*pairs) transposes [(doc, score), ...] into (docs, scores).
        return [tuple(zip(*doc_hits)), tuple(zip(*table_hits))]

    def client_chat(self, messages, model=None):
        """Run one chat completion; returns the assistant message content."""
        model = model or self.chat_model
        response = self.__client.chat.completions.create(messages=messages,
                                                         model=model)
        return response.choices[0].message.content


def _read_key(env_var: str, path: str) -> str:
    """Resolve an API key from the environment, falling back to a key file."""
    key = os.environ.get(env_var)
    if key:
        return key
    with open(path) as infile:
        return infile.readline().strip()


if __name__ == "__main__":
    # The original demo called API_Interface() with no arguments, which
    # raises TypeError; keys are now resolved from env vars or key files.
    open_ai_key = _read_key("OPENAI_API_KEY", "open_ai_key.txt")
    pinecone_key = _read_key("PINECONE_API_KEY", "pinecone_key.txt")
    tester = API_Interface(open_ai_key, pinecone_key)
    my_query = "What is the backpropogation algorithm?"
    doc_results, table_results = tester.query_pinecone_vector_store(my_query)
    if doc_results:
        docs, scores = doc_results
        for doc in docs:
            print(doc.metadata["page_number"], doc.page_content, "\n\n")