Spaces:
Sleeping
Sleeping
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_community.vectorstores import Pinecone | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_pinecone import PineconeVectorStore | |
| from pinecone import Pinecone | |
| from tqdm import tqdm | |
| from openai import OpenAI | |
| import string | |
| import pickle | |
| import os | |
| import time | |
| from langchain_community.document_loaders import PyMuPDFLoader | |
class API_Interface:
    """Retrieval helper built on the OpenAI client and a Pinecone index.

    Constructing an instance loads ``machine_learning.pdf``, splits it into
    text chunks, makes sure those chunks are embedded and stored in the
    ``eep596mp2`` Pinecone index (under a namespace derived from the chunk
    size), and wraps the index in a LangChain ``PineconeVectorStore`` so it
    can be searched with :meth:`query_pinecone_vector_store`.
    """

    def __init__(self, OPEN_AI_KEY, PINECONE_KEY,
                 chunk_size: int = 1500,
                 embed_model: str = "text-embedding-3-small",
                 chat_model: str = "gpt-3.5-turbo"):
        """Connect to both services and prepare the vector store.

        Args:
            OPEN_AI_KEY: API key handed to the OpenAI client and to the
                LangChain embedding wrapper.
            PINECONE_KEY: API key handed to the Pinecone client.
            chunk_size: Maximum chunk size given to the text splitter; it
                also determines the Pinecone namespace and the name of the
                local embeddings cache file.
            embed_model: Name of the OpenAI embedding model.
            chat_model: Default chat model used by :meth:`client_chat`.
        """
        self.chunk_size = chunk_size
        self.embed_model = embed_model
        self.chat_model = chat_model

        self.__client = OpenAI(api_key=OPEN_AI_KEY)
        # ``Pinecone`` is whatever the file's last import of that name bound.
        self.__pc = Pinecone(api_key=PINECONE_KEY)
        self.__index = self.__pc.Index('eep596mp2')

        print("Chunking documents.")
        self.chunked_texts, self.chunked_pnums = self.__chunk_document()
        print("Initializing vector store.")
        self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY)

    def __chunk_document(self) -> tuple[list[str], list[int]]:
        """Load the PDF page by page and split it into chunks.

        Returns:
            ``(chunk_texts, chunk_page_numbers)``: two parallel lists. Both
            are empty when the PDF yields no text at all.
        """
        loader = PyMuPDFLoader(file_path="machine_learning.pdf", mode="page")
        docs = loader.load()
        page_texts = [page.page_content for page in docs]
        page_numbers = [page.metadata["page"] for page in docs]
        splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=50)
        return self._chunk_pages(splitter.split_text, page_texts, page_numbers)

    @staticmethod
    def _chunk_pages(split_text, page_texts, page_numbers):
        """Split pages into chunks, carrying each page's last chunk forward.

        The final chunk of a page is not emitted immediately; it is prepended
        to the next page's text so a passage that straddles a page break is
        not cut in two. A carried-over chunk is therefore labelled with the
        page on which it ends.

        Args:
            split_text: Callable mapping a string to a list of chunk strings.
            page_texts: Text of each page, in order.
            page_numbers: Page identifier for each entry of ``page_texts``.

        Returns:
            ``(chunk_texts, chunk_page_numbers)`` as two parallel lists.
        """
        chunked_texts, chunk_page_numbers = [], []
        pending_tail = ""
        last_pnum = None  # page of the chunk currently held in pending_tail
        for text, pnum in zip(page_texts, page_numbers):
            chunks = split_text(pending_tail + " " + text)
            if not chunks:
                # Nothing carried over and the page produced no chunks
                # (e.g. a blank page before any text): skip it rather than
                # failing on chunks[-1].
                continue
            chunked_texts.extend(chunks[:-1])
            chunk_page_numbers.extend([pnum] * (len(chunks) - 1))
            pending_tail = chunks[-1]
            last_pnum = pnum
        if last_pnum is not None:
            # Flush the chunk still being held back after the last page.
            chunked_texts.append(pending_tail)
            chunk_page_numbers.append(last_pnum)
        return chunked_texts, chunk_page_numbers

    def __init_vectorstore(self, OPEN_AI_KEY):
        """Populate the Pinecone namespace if needed and build the vector store.

        If the namespace for this chunk size is missing from the index stats
        (or is reported with no vectors), the chunks are embedded and
        upserted in batches, and the method waits until the namespace shows
        up in the index stats.

        Args:
            OPEN_AI_KEY: API key for the LangChain ``OpenAIEmbeddings`` wrapper.

        Returns:
            ``(namespace, vectorstore)``.

        Raises:
            ValueError: If the namespace must be populated but there are no
                chunks to upload.
            TimeoutError: If the namespace does not appear in the index stats
                within the wait limit after upserting.
        """
        NAMESPACE = f"ns{self.chunk_size}"
        _ns = self.__index.describe_index_stats()['namespaces'].get(NAMESPACE)
        if _ns is not None and _ns.get('vector_count') in (None, 0):
            # Namespace exists but holds nothing usable: clear and rebuild it.
            self.__index.delete(delete_all=True, namespace=NAMESPACE)
            _ns = None

        if _ns is None:
            print("... generating embeddings.")
            embeddings = self.__generate_embeddings()
            records = [
                {
                    "id": f"chunk{i}",
                    "values": embedding,
                    "metadata": {
                        "text": text,
                        "page_number": pnum
                    }
                }
                for i, (text, pnum, embedding)
                in enumerate(zip(self.chunked_texts, self.chunked_pnums, embeddings))
            ]
            print(len(records))
            if not records:
                # With nothing to upsert the namespace would never appear and
                # the wait loop below could not finish.
                raise ValueError(f"No chunks available to populate namespace {NAMESPACE!r}.")

            batch_size = 180
            print("... upsertting records.")
            for start in tqdm(range(0, len(records), batch_size)):
                self.__index.upsert(records[start:start + batch_size], namespace=NAMESPACE)

            # Poll until the namespace is reported, but give up eventually
            # instead of hanging forever.
            max_wait_seconds = 120
            deadline = time.monotonic() + max_wait_seconds
            while self.__index.describe_index_stats()['namespaces'].get(NAMESPACE) is None:
                if time.monotonic() > deadline:
                    raise TimeoutError(
                        f"Namespace {NAMESPACE!r} did not appear in the index stats "
                        f"within {max_wait_seconds} seconds."
                    )
                time.sleep(1)

        print("Index stats:", self.__index.describe_index_stats())
        openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        vectorstore = PineconeVectorStore(self.__index, embedding=openaiembs)
        return NAMESPACE, vectorstore

    def __embed_text(self, text):
        """Return the embedding of one chunk, with newlines replaced by spaces."""
        text = text.replace("\n", " ")
        response = self.__client.embeddings.create(input=[text], model=self.embed_model)
        return response.data[0].embedding

    def __generate_embeddings(self):
        """Return one embedding per chunk, using a local pickle file as a cache.

        The cache file is named after the chunk size only. A cached file is
        reused only when it holds exactly one entry per current chunk;
        otherwise the embeddings are recomputed and the file is rewritten.
        NOTE(review): the cache name does not include the embedding model, so
        a file written with a different ``embed_model`` but the same number
        of chunks would still be reused.
        """
        cache_path = f"ML_text_embeddings_{self.chunk_size}.pkl"
        if os.path.exists(cache_path):
            print("--- found existing embeddings file. Shortcutting.")
            # SECURITY: unpickling can execute arbitrary code; this is only
            # acceptable because the file is one this program wrote itself.
            with open(cache_path, "rb") as infile:
                embeddings = pickle.load(infile)
            if len(embeddings) == len(self.chunked_texts):
                return embeddings
            # A stale cache would otherwise be truncated silently by zip().
            print("--- cached embeddings do not match the current chunks. Regenerating.")

        embeddings = [self.__embed_text(text) for text in tqdm(self.chunked_texts)]
        with open(cache_path, "wb") as outfile:
            pickle.dump(embeddings, outfile)
        return embeddings

    def query_pinecone_vector_store(self, query: str, top_k: int = 5, namespace: str = None):
        """Run a similarity search against the vector store.

        Args:
            query: Text to search for.
            top_k: Number of results requested.
            namespace: Namespace to search; defaults to this instance's own.

        Returns:
            The search results transposed with ``zip``: the first element
            holds the first item of every result and the second holds the
            second item of every result. An empty tuple when there are no
            results. (Per the LangChain docs the results are
            ``(Document, score)`` pairs, so this is ``(documents, scores)``.)

        Raises:
            AssertionError: If ``namespace`` is not present in the index.
        """
        namespace = namespace or self.namespace
        # Raised explicitly so the check still runs under ``python -O``.
        if namespace not in self.__index.describe_index_stats().get('namespaces'):
            raise AssertionError(f"Namespace {namespace!r} does not exist in the index.")
        response = self.vectorstore.similarity_search_with_relevance_scores(query=query,
                                                                            k=top_k,
                                                                            namespace=namespace)
        return tuple(zip(*response))

    def client_chat(self, messages, model=None):
        """Send ``messages`` to the chat completions API and return the reply text.

        Args:
            messages: Message list passed straight to the OpenAI client.
            model: Chat model to use; defaults to ``self.chat_model``.

        Returns:
            The content of the first choice's message.
        """
        model = model or self.chat_model
        response = self.__client.chat.completions.create(messages=messages, model=model)
        return response.choices[0].message.content
if __name__ == "__main__":
    # Smoke test: build the interface and print the top matches for a query.
    # API_Interface requires both API keys, so read them from the environment;
    # a missing variable fails immediately with a KeyError naming it.
    tester = API_Interface(os.environ["OPENAI_API_KEY"], os.environ["PINECONE_API_KEY"])
    my_query = "What is the backpropogation algorithm?"
    response = tester.query_pinecone_vector_store(my_query)
    if response:
        # The query returns (documents, scores), not a flat list of documents.
        docs, _scores = response
        for doc in docs:
            print(doc.metadata["page_number"], doc.page_content, "\n\n")
    else:
        # An empty tuple means the search produced no matches.
        print("No results.")