import urllib.request
import fitz
import re
import numpy as np
import tensorflow_hub as hub
import openai
import gradio as gr
import os
import zipfile
from sklearn.neighbors import NearestNeighbors

openai.api_key = os.getenv('OpenAPI')

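# Download a single PDF from a URL to a local path.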
def download_pdf(url, output_path):
    urllib.request.urlretrieve(url, output_path)

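# Extract only the PDF members of the uploaded zip archive into a flat 'pdfs' directory.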
def extract_zip(file):
    os.makedirs('pdfs', exist_ok=True)
    with zipfile.ZipFile(file, 'r') as zip_ref:
        for member in zip_ref.namelist():
            filename = os.path.basename(member)
            if filename.endswith('.pdf'):
                # Write each PDF directly under 'pdfs', ignoring any folder structure
                # inside the archive, so that os.listdir('pdfs') finds every file.
                with zip_ref.open(member) as src, open(os.path.join('pdfs', filename), 'wb') as dst:
                    dst.write(src.read())

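# Collapse newlines and repeated whitespace so each page becomes one clean line of text.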
def preprocess(text):
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    return text

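# Read a PDF with PyMuPDF (fitz) and return a list of preprocessed page texts.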
def pdf_to_text(path, start_page=1, end_page=None):
    doc = fitz.open(path)
    total_pages = doc.page_count

    if end_page is None:
        end_page = total_pages

    text_list = []
    for i in range(start_page - 1, end_page):
        text = doc.load_page(i).get_text("text")
        text = preprocess(text)
        text_list.append(text)

    doc.close()
    return text_list

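# Split page texts into ~word_length-word chunks. A short trailing remainder is carried
# over into the next page's tokens so no chunk ends up much shorter than word_length,
# and each chunk is prefixed with the page number it came from.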
def text_to_chunks(texts, word_length=150, start_page=1):
    text_toks = [t.split(' ') for t in texts]
    chunks = []

    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i:i + word_length]
            if (i + word_length) > len(words) and (len(chunk) < word_length) and (
                    len(text_toks) != (idx + 1)):
                text_toks[idx + 1] = chunk + text_toks[idx + 1]
                continue
            chunk = ' '.join(chunk).strip()
            chunk = f'[Page no. {idx + start_page}]' + ' ' + '"' + chunk + '"'
            chunks.append(chunk)
    return chunks

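# Semantic retriever: embeds chunks with the Universal Sentence Encoder and retrieves
# the chunks nearest to a query with scikit-learn's NearestNeighbors.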
class SemanticSearch:

    def __init__(self):
        self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4')
        self.fitted = False

    def fit(self, data, batch=1000, n_neighbors=5):
        self.data = data
        self.embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(self.embeddings))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors)
        self.nn.fit(self.embeddings)
        self.fitted = True

    def __call__(self, text, return_data=True):
        inp_emb = self.use([text])
        neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0]

        if return_data:
            return [self.data[i] for i in neighbors]
        else:
            return neighbors

    def get_text_embedding(self, texts, batch=1000):
        embeddings = []
        for i in range(0, len(texts), batch):
            text_batch = texts[i:(i + batch)]
            emb_batch = self.use(text_batch)
            embeddings.append(emb_batch)
        embeddings = np.vstack(embeddings)
        return embeddings

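# A single global recommender instance; load_recommender re-fits it on the chunks
# extracted from the supplied PDF paths each time a request comes in.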
recommender = SemanticSearch()


def load_recommender(paths, start_page=1):
    global recommender
    chunks = []
    for path in paths:
        if path.endswith('.pdf'):
            texts = pdf_to_text(path, start_page=start_page)
            chunks += text_to_chunks(texts, start_page=start_page)
    recommender.fit(chunks)
    return 'Corpus Loaded.'

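# Send the assembled prompt to the OpenAI Completions API (legacy 'davinci' engine).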
def generate_text(prompt, engine="davinci"):
    completions = openai.Completion.create(
        engine=engine,
        prompt=prompt,
        max_tokens=512,
        n=1,
        stop=None,
        temperature=0.7,
    )
    message = completions.choices[0].text
    return message

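# Build the prompt from the top retrieved chunks, followed by the answering
# instructions and the user's question, then generate the reply.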
def generate_answer(question):
    topn_chunks = recommender(question)
    prompt = ""
    prompt += 'search results:\n\n'
    for c in topn_chunks:
        prompt += c + '\n\n'

    prompt += "Compose a comprehensive reply to the query using the search results given. "\
              "Only include information found in the results. "\
              "If the text does not relate to the query, simply state 'Text Not Found in Body of Knowledge'. "\
              f"\n\nQuery: {question}\n Answer: "

    answer = generate_text(prompt, "davinci")
    return answer

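# Gradio callback: validate the inputs, collect PDFs from URLs and/or the uploaded zip,
# index them, and answer the question.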
def question_answer(urls, file, question):
    if urls.strip() == '' and file is None:
        return '[ERROR]: Both URLs and PDFs are empty. Provide at least one.'

    paths = []
    if urls.strip() != '':
        urls = urls.split(',')  # split the URLs string into a list of URLs
        for i, url in enumerate(urls):
            # Give each downloaded PDF its own filename so later URLs do not overwrite earlier ones.
            path = f'corpus_{i}.pdf'
            download_pdf(url.strip(), path)
            paths.append(path)

    if file is not None:
        extract_zip(file.name)  # extract the PDFs from the zip file
        for pdf_file in os.listdir('pdfs'):
            paths.append(os.path.join('pdfs', pdf_file))

    load_recommender(paths)

    if question.strip() == '':
        return '[ERROR]: Question field is empty'

    return generate_answer(question)

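# Gradio UI: PDF URLs and/or a zip of PDFs in, generated answer out.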
title = 'Cognitive AI Agent - Asks the Expert'
description = """This cognitive agent allows you to chat with your PDF files as a single corpus of knowledge. Add your relevant PDFs to a zip file and upload. 🛑PROOF OF CONCEPT🛑"""

iface = gr.Interface(
    fn=question_answer,
    inputs=[
        gr.inputs.Textbox(label="Enter PDF URLs here, separated by commas"),
        gr.inputs.File(label="Upload a zip file containing PDF files"),
        gr.inputs.Textbox(label="Enter your question here"),
    ],
    outputs=gr.outputs.Textbox(label="Generated Answer"),
    title=title,
    description=description,
)

iface.launch()
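# A minimal smoke test bypassing the UI (a sketch: assumes a local file named
# sample.pdf exists and the 'OpenAPI' environment variable holds a valid key):
#   load_recommender(['sample.pdf'])
#   print(generate_answer('What is the main topic of this document?'))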