| import urllib.request |
| import fitz |
| import re |
| import numpy as np |
| import tensorflow_hub as hub |
| import openai |
| import gradio as gr |
| import os |
| from sklearn.neighbors import NearestNeighbors |
|
|
| def download_pdf(url, output_path): |
| urllib.request.urlretrieve(url, output_path) |
|
|
|
|
| def preprocess(text): |
| text = text.replace('\n', ' ') |
| text = re.sub('\s+', ' ', text) |
| return text |
|
|
|
|
| def pdf_to_text(path, start_page=1, end_page=None): |
| doc = fitz.open(path) |
| total_pages = doc.page_count |
|
|
| if end_page is None: |
| end_page = total_pages |
|
|
| text_list = [] |
|
|
| for i in range(start_page-1, end_page): |
| text = doc.load_page(i).get_text("text") |
| text = preprocess(text) |
| text_list.append(text) |
|
|
| doc.close() |
| return text_list |
|
|
|
|
| def text_to_chunks(texts, word_length=150, start_page=1): |
| text_toks = [t.split(' ') for t in texts] |
| page_nums = [] |
| chunks = [] |
| |
| for idx, words in enumerate(text_toks): |
| for i in range(0, len(words), word_length): |
| chunk = words[i:i+word_length] |
| if (i+word_length) > len(words) and (len(chunk) < word_length) and ( |
| len(text_toks) != (idx+1)): |
| text_toks[idx+1] = chunk + text_toks[idx+1] |
| continue |
| chunk = ' '.join(chunk).strip() |
| chunk = f'[{idx+start_page}]' + ' ' + '"' + chunk + '"' |
| chunks.append(chunk) |
| return chunks |
|
|
|
|
| class SemanticSearch: |
| |
| def __init__(self): |
| self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4') |
| self.fitted = False |
| |
| |
| def fit(self, data, batch=1000, n_neighbors=5): |
| self.data = data |
| self.embeddings = self.get_text_embedding(data, batch=batch) |
| n_neighbors = min(n_neighbors, len(self.embeddings)) |
| self.nn = NearestNeighbors(n_neighbors=n_neighbors) |
| self.nn.fit(self.embeddings) |
| self.fitted = True |
| |
| |
| def __call__(self, text, return_data=True): |
| inp_emb = self.use([text]) |
| neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0] |
| |
| if return_data: |
| return [self.data[i] for i in neighbors] |
| else: |
| return neighbors |
| |
| |
| def get_text_embedding(self, texts, batch=1000): |
| embeddings = [] |
| for i in range(0, len(texts), batch): |
| text_batch = texts[i:(i+batch)] |
| emb_batch = self.use(text_batch) |
| embeddings.append(emb_batch) |
| embeddings = np.vstack(embeddings) |
| return embeddings |
|
|
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| def load_recommender(path, start_page=1): |
| global recommender |
| pdf_file = os.path.basename(path) |
| embeddings_file = f"{pdf_file}_{start_page}.npy" |
| |
| if os.path.isfile(embeddings_file): |
| embeddings = np.load(embeddings_file) |
| recommender.embeddings = embeddings |
| recommender.fitted = True |
| return "Embeddings loaded from file" |
| |
| texts = pdf_to_text(path, start_page=start_page) |
| chunks = text_to_chunks(texts, start_page=start_page) |
| recommender.fit(chunks) |
| np.save(embeddings_file, recommender.embeddings) |
| return 'Corpus Loaded.' |
|
|
|
|
|
|
| def generate_text(openAI_key,prompt, engine="text-davinci-003"): |
| openai.api_key = openAI_key |
| completions = openai.Completion.create( |
| engine=engine, |
| prompt=prompt, |
| max_tokens=512, |
| n=1, |
| stop=None, |
| temperature=0.7, |
| ) |
| message = completions.choices[0].text |
| return message |
| |
| def generate_text2(openAI_key, prompt, engine="gpt-3.5-turbo-0301"): |
| openai.api_key = openAI_key |
| messages = [{'role': 'system', 'content': 'You are a helpful assistant.'}, |
| {'role': 'user', 'content': prompt}] |
| |
| completions = openai.ChatCompletion.create( |
| model=engine, |
| messages=messages, |
| max_tokens=512, |
| n=1, |
| stop=None, |
| temperature=0.7, |
| ) |
| message = completions.choices[0].message['content'] |
| return message |
|
|
| def generate_answer(question,openAI_key): |
| topn_chunks = recommender(question) |
| prompt = "" |
| prompt += 'search results:\n\n' |
| for c in topn_chunks: |
| prompt += c + '\n\n' |
| |
| prompt += "Instructions: Compose a comprehensive reply to the query using the search results given. "\ |
| "Cite each reference using [ Page Number] notation (every result has this number at the beginning). "\ |
| "Citation should be done at the end of each sentence. If the search results mention multiple subjects "\ |
| "with the same name, create separate answers for each. Only include information found in the results and "\ |
| "don't add any additional information. Make sure the answer is correct and don't output false content. "\ |
| "If the text does not relate to the query, simply state 'Text Not Found in PDF'. Ignore outlier "\ |
| "search results which has nothing to do with the question. Only answer what is asked. The "\ |
| "answer should be short and concise. Answer step-by-step. \n\nQuery: {question}\nAnswer: " |
| |
| prompt += f"Query: {question}\nAnswer:" |
| answer = generate_text(openAI_key, prompt,"text-davinci-003") |
| return answer |
|
|
|
|
| def question_answer(url, file, question,openAI_key): |
| if openAI_key.strip()=='': |
| return '[ERROR]: Please enter you Open AI Key. Get your key here : https://platform.openai.com/account/api-keys' |
| if url.strip() == '' and file == None: |
| return '[ERROR]: Both URL and PDF is empty. Provide atleast one.' |
| |
| if url.strip() != '' and file != None: |
| return '[ERROR]: Both URL and PDF is provided. Please provide only one (eiter URL or PDF).' |
|
|
| if url.strip() != '': |
| glob_url = url |
| download_pdf(glob_url, 'corpus.pdf') |
| load_recommender('corpus.pdf') |
|
|
| else: |
| old_file_name = file.name |
| file_name = file.name |
| file_name = file_name[:-12] + file_name[-4:] |
| os.rename(old_file_name, file_name) |
| load_recommender(file_name) |
|
|
| if question.strip() == '': |
| return '[ERROR]: Question field is empty' |
|
|
| return generate_answer(question,openAI_key) |
|
|
|
|
| recommender = SemanticSearch() |
|
|
| title = 'Chat with Your PDFs' |
| description = """ Instructions |
| 1. Input your API Key |
| 2. Upload PDF""" |
|
|
| with gr.Blocks() as demo: |
|
|
| gr.Markdown(f'<center><h1>{title}</h1></center>') |
| gr.Markdown(description) |
|
|
| with gr.Row(): |
| |
| with gr.Group(): |
| gr.Markdown(f'<p style="text-align:center">Get your Open AI API key <a href="https://platform.openai.com/account/api-keys">here</a></p>') |
| openAI_key=gr.Textbox(label='Enter your OpenAI API key here') |
| url = gr.Textbox(label='Enter PDF URL here') |
| gr.Markdown("<center><h4>OR<h4></center>") |
| file = gr.File(label='Upload your PDF/ Research Paper / Book here', file_types=['.pdf']) |
| question = gr.Textbox(label='Enter your question here') |
| btn = gr.Button(value='Submit') |
| btn.style(full_width=True) |
|
|
| with gr.Group(): |
| answer = gr.Textbox(label='Answer :') |
|
|
| btn.click(question_answer, inputs=[url, file, question,openAI_key], outputs=[answer]) |
| |
| demo.launch() |
|
|
|
|
| |
|
|
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |