Spaces:
Sleeping
Sleeping
| import urllib.request | |
| import fitz | |
| import re | |
| import openai | |
| import os | |
| from semantic_search import SemanticSearch | |
| recommender = SemanticSearch() | |
def download_pdf(url, output_path):
    """Download the document at *url* and save it to *output_path*.

    Only ``http`` and ``https`` URLs are accepted. ``urlretrieve`` also
    understands ``file://`` and other schemes, so an unchecked, user-supplied
    URL could otherwise be used to copy a local file into the corpus.

    Args:
        url: Address of the PDF to fetch.
        output_path: Local path the downloaded bytes are written to.

    Raises:
        ValueError: If *url* does not use the http or https scheme.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    scheme = urlsplit(url.strip()).scheme.lower()
    if scheme not in ('http', 'https'):
        raise ValueError(
            f'Unsupported URL scheme {scheme!r}; expected http or https.'
        )
    urllib.request.urlretrieve(url, output_path)
def preprocess(text):
    """Collapse every run of whitespace in *text* into a single space.

    Newlines, tabs and repeated spaces all become one ``' '``. Leading and
    trailing whitespace is collapsed as well but not removed.

    Args:
        text: Raw text, e.g. as extracted from a PDF page.

    Returns:
        The text with whitespace runs normalised to single spaces.
    """
    # Raw string: '\s' in a normal literal is an invalid escape sequence.
    # '\s+' already matches newlines, so no separate '\n' replacement is needed.
    return re.sub(r'\s+', ' ', text)
| # converts pdf to text | |
def pdf_to_text(path, start_page=1, end_page=None):
    """Extract the text of a PDF, one preprocessed string per page.

    Args:
        path: Path of the PDF file to open.
        start_page: 1-based number of the first page to read.
        end_page: 1-based number of the last page to read (inclusive).
            ``None`` or a value beyond the document length means
            "through the last page".

    Returns:
        A list with one whitespace-normalised string per page read.
    """
    doc = fitz.open(path)
    try:
        total_pages = doc.page_count
        # Clamp so an over-large end_page reads to the end instead of failing.
        if end_page is None or end_page > total_pages:
            end_page = total_pages
        return [
            preprocess(doc.load_page(page_index).get_text("text"))
            for page_index in range(start_page - 1, end_page)
        ]
    finally:
        # Release the document even if a page fails to load or parse.
        doc.close()
| # converts a text into a list of chunks | |
def text_to_chunks(texts, word_length=150, start_page=1, file_number=1):
    """Split per-page texts into labelled chunks of about *word_length* words.

    Non-ASCII characters are dropped from every page before splitting. When
    the final piece of a page is shorter than *word_length* and another page
    follows, the piece is carried over and prepended to the next page, so it
    is labelled with that later page number. The short tail of the last page
    is emitted as-is.

    Args:
        texts: One string per page, in page order.
        word_length: Maximum number of words per chunk.
        start_page: Page number used to label the first entry of *texts*.
        file_number: PDF number used in each chunk label.

    Returns:
        A list of strings of the form
        ``[PDF no. F] [Page no. P] "chunk text"``.
    """
    # split() with no argument discards empty tokens, so blank pages and the
    # gaps left by removed non-ASCII characters are not counted as words.
    pages = [
        ''.join(ch for ch in text if ord(ch) < 128).split()
        for text in texts
    ]
    last_index = len(pages) - 1

    chunks = []
    carry = []  # short tail of the previous page, waiting to be merged
    for idx, page_words in enumerate(pages):
        words = carry + page_words
        carry = []
        for start in range(0, len(words), word_length):
            piece = words[start:start + word_length]
            # Only the final piece of a page can be short; defer it to the
            # next page unless this is the last page.
            if len(piece) < word_length and idx != last_index:
                carry = piece
                continue
            body = ' '.join(piece)
            chunks.append(
                f'[PDF no. {file_number}] [Page no. {idx + start_page}]'
                + ' ' + '"' + body + '"'
            )
    return chunks
| # merges a list of pdfs into a list of chunks and fits the recommender | |
def load_recommender(paths, start_page=1):
    """Chunk one or more PDFs and fit the module-level recommender on them.

    Args:
        paths: An iterable of PDF paths, or a single path. A lone string or
            path object is treated as one file; without this, a string would
            be iterated character by character.
        start_page: 1-based page at which to start reading every PDF.

    Returns:
        The status string ``'Corpus Loaded.'``.
    """
    if isinstance(paths, (str, os.PathLike)):
        paths = [paths]

    print("working")
    chunks = []
    # PDFs are numbered from 1 in the chunk labels.
    for file_number, path in enumerate(paths, start=1):
        pages = pdf_to_text(path, start_page=start_page)
        chunks.extend(
            text_to_chunks(pages, start_page=start_page, file_number=file_number)
        )
    recommender.fit(chunks)
    return 'Corpus Loaded.'
| # calls the OpenAI API to generate a response for the given query | |
def generate_text(openAI_key, prompt, model="gpt-3.5-turbo"):
    """Send *prompt* to the OpenAI API and return the generated text.

    ``text-davinci-003`` is served through the completion endpoint; every
    other model name goes through the chat-completion endpoint.

    Args:
        openAI_key: API key, stored on the ``openai`` module before the call.
        prompt: Text sent as the completion prompt or as the user message.
        model: Name of the model to query.

    Returns:
        The text of the first choice in the API response.
    """
    openai.api_key = openAI_key
    token_budget = 256

    if model == "text-davinci-003":
        response = openai.Completion.create(
            engine=model,
            prompt=prompt,
            max_tokens=token_budget,
            n=1,
            stop=None,
            temperature=0.7,
        )
        return response.choices[0].text

    # Chat models: fixed system and assistant preamble, then the prompt.
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "assistant", "content": "Here is some initial assistant message."},
        {"role": "user", "content": prompt},
    ]
    response = openai.ChatCompletion.create(
        model=model,
        messages=conversation,
        temperature=.3,
        max_tokens=token_budget,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    return response.choices[0].message['content']
| # constructs the prompt for the given query | |
def construct_prompt(question, openAI_key):
    """Build the LLM prompt for *question* from the best-matching chunks.

    The module-level recommender is queried with the question, the results
    are passed through ``summarize_ss_results_if_needed``, and the outcome is
    placed under a ``search results:`` header followed by the answering
    instructions and the question itself.

    Args:
        question: The user's query.
        openAI_key: API key forwarded to the summarisation step.

    Returns:
        The complete prompt string.
    """
    topn_chunks = recommender(question)
    topn_chunks = summarize_ss_results_if_needed(openAI_key, topn_chunks, model="gpt-3.5-turbo")
    # A summary may come back as one string; wrap it so the loop below does
    # not walk it character by character.
    if isinstance(topn_chunks, str):
        topn_chunks = [topn_chunks]

    parts = ['search results:\n\n']
    parts.extend(c + '\n\n' for c in topn_chunks)
    parts.append(
        "Instructions: Compose a comprehensive reply to the query using the search results given. "
        "Cite each reference using [PDF Number][Page Number] notation. "
        "Only answer what is asked. The answer should be short and concise. \n\nQuery: "
    )
    parts.append(f"{question}\nAnswer:")
    prompt = ''.join(parts)

    print("prompt == " + prompt)
    return prompt
| # main function that is called when the user clicks the submit button, generates an answer for the query | |
def question_answer(chat_history, url, files, question, openAI_key, model):
    """Answer *question* from the PDF given by URL or by uploaded files.

    All inputs are validated before the corpus is downloaded or loaded, so a
    rejected request costs no network or embedding work.

    Args:
        chat_history: List of ``[question, answer]`` pairs; the new pair is
            appended to it in place.
        url: URL of a PDF to download, or an empty string.
        files: Uploaded file objects exposing a ``name`` path, or ``None``.
        question: The user's query.
        openAI_key: OpenAI API key.
        model: Name of the LLM to use.

    Returns:
        The updated *chat_history* on success, otherwise a string starting
        with ``[ERROR]:`` describing what went wrong.
    """
    try:
        files = files or []
        url = (url or '').strip()

        if openAI_key.strip() == '':
            return '[ERROR]: Please enter your Open AI Key. Get your key here : https://platform.openai.com/account/api-keys'
        if not url and not files:
            return '[ERROR]: Both URL and PDF is empty. Provide at least one.'
        # The old test `files is not []` was always true, which made the URL
        # branch unreachable; test for actual content instead.
        if url and files:
            return '[ERROR]: Both URL and PDF is provided. Please provide only one (either URL or PDF).'
        if model is None or model == '':
            return '[ERROR]: You have not selected any model. Please choose an LLM model.'
        if question.strip() == '':
            return '[ERROR]: Question field is empty'

        if url:
            download_pdf(url, 'corpus.pdf')
            # Pass a list: a bare string would be iterated per character.
            load_recommender(['corpus.pdf'])
        else:
            print(files)
            filenames = []
            for file in files:
                # Drop the 8 characters that precede the 4-character
                # extension (presumably a suffix added on upload - confirm).
                file_name = file.name[:-12] + file.name[-4:]
                os.rename(file.name, file_name)
                filenames.append(file_name)
            load_recommender(filenames)

        prompt = construct_prompt(question, openAI_key)
        answer = generate_text(openAI_key, prompt, model)
        chat_history.append([question, answer])
        return chat_history
    except openai.error.InvalidRequestError:
        return '[ERROR]: Either you do not have access to GPT4 or you have exhausted your quota!'
def summarize_ss_results_if_needed(openAI_key, chunks, model, token_limit=8000):
    """Condense the search results with the LLM when they are too long.

    Length is estimated as the number of whitespace-separated words across
    all chunks, not as real model tokens.

    Args:
        openAI_key: API key used for the summarisation request.
        chunks: List of search-result strings.
        model: Name of the model asked to summarise.
        token_limit: Word count above which the chunks are summarised.

    Returns:
        A list of strings: *chunks* itself when within the limit, otherwise
        a one-element list holding the summary. A list is returned in both
        cases so callers can iterate the result the same way.
    """
    total_tokens = sum(len(chunk.split()) for chunk in chunks)
    if total_tokens <= token_limit:
        return chunks

    print("has to summarize")
    summary_prompt = (
        "Summarize the following text, while keeping important information, facts and figures. It is also very important to keep the [PDF Number][Page number] notation intact!\n\n"
        + ''.join(c + '\n\n' for c in chunks)
    )
    print(summary_prompt)
    return [generate_text(openAI_key, summary_prompt, model=model)]