Spaces:
Runtime error
Runtime error
| import os | |
| from typing import Any | |
| import gradio as gr | |
| import openai | |
| import pandas as pd | |
| from IPython.display import Markdown, display | |
| from langchain.document_loaders import PyPDFLoader | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.indexes import VectorstoreIndexCreator | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.llms import OpenAI | |
| from langchain.vectorstores import DocArrayInMemorySearch | |
| from uuid import uuid4 | |
# Custom stylesheet passed to gr.Blocks(css=...): renders the whole app
# container in the "IBM Plex Mono" font.
css_style = """
.gradio-container {
    font-family: "IBM Plex Mono";
}
"""
class myClass:
    """Per-session state for the document question-answering app.

    Tracks the OpenAI API key, the table of uploaded documents, the vector
    index built from those documents, and a status string shown in the UI.
    """

    def __init__(self) -> None:
        # Kept for backward compatibility; nothing in this class reads it.
        self.openapi = ""
        self.openai_api_key = ""
        self.valid_key = False
        self.docs_ready = False
        self.status = "⚠️Waiting for documents and key⚠️"
        self.uuid = uuid4()
        # Populated by validate_key / validate_dataset / get_index. They are
        # initialised here so that reading them early never raises
        # AttributeError.
        self.embedding = None
        self.llm = None
        self.dataset = None
        self.index = None

    def check_status(self):
        """Recompute ``self.status`` from ``docs_ready`` and ``valid_key``."""
        if self.docs_ready and self.valid_key:
            out = "✨Ready✨"
        elif self.docs_ready:
            out = "⚠️Waiting for key⚠️"
        elif self.valid_key:
            out = "⚠️Waiting for documents⚠️"
        else:
            out = "⚠️Waiting for documents and key⚠️"
        self.status = out

    def validate_key(self, myin):
        """Store the API key and (re)create the embedding and LLM clients.

        Args:
            myin: The key as typed by the user; surrounding whitespace is
                stripped.

        Returns:
            A one-element list holding the updated status string.

        Raises:
            TypeError: If ``myin`` is not a string.
        """
        # An explicit check instead of ``assert``: asserts are stripped when
        # Python runs with -O.
        if not isinstance(myin, str):
            raise TypeError(
                f"API key must be a string, got {type(myin).__name__}"
            )
        key = myin.strip()
        self.openai_api_key = key
        # An empty key cannot authenticate, so it must not count as valid
        # (this is also what happens when the user clears the textbox).
        self.valid_key = bool(key)
        if self.valid_key:
            self.embedding = OpenAIEmbeddings(openai_api_key=key)
            self.llm = OpenAI(openai_api_key=key)
        else:
            self.embedding = None
            self.llm = None
        # Any existing index was built with the previous embedding client.
        # Drop it; do_ask rebuilds it on demand. The index is not rebuilt
        # here because this method may run on every edit of the key field.
        self.index = None
        self.check_status()
        return [self.status]

    def request_pathname(self, files, data):
        """Merge newly uploaded files into ``data`` and refresh the status.

        Args:
            files: Uploaded file objects exposing a ``name`` attribute (the
                file path), or ``None`` when the upload was cleared.
            data: List of ``[filepath, citation string, key]`` rows. New rows
                are appended in place.

        Returns:
            A ``(DataFrame, status)`` tuple for the documents table and the
            status box.
        """
        columns = ["filepath", "citation string", "key"]
        if files is None:
            self.docs_ready = False
            self.check_status()
            return (
                pd.DataFrame(data, columns=columns),
                self.status,
            )
        # Make sure we're not duplicating things in the dataset (including
        # the same path appearing twice within one upload).
        known_paths = {row[0] for row in data}
        for file in files:
            if file.name in known_paths:
                continue
            data.append([file.name, None, None])
            known_paths.add(file.name)
        mydataset = pd.DataFrame(data, columns=columns)
        validation_button = self.validate_dataset(mydataset)
        return mydataset, validation_button

    def validate_dataset(self, dataset):
        """Record ``dataset`` and build the index once everything is ready.

        Args:
            dataset: DataFrame whose first column holds the file paths.

        Returns:
            The updated status string.
        """
        # Guard the empty table: ``iloc[-1, 0]`` raises IndexError on it.
        self.docs_ready = bool(len(dataset) > 0 and dataset.iloc[-1, 0] != "")
        self.dataset = dataset
        # The document set changed, so any previous index is stale.
        self.index = None
        self.check_status()
        if self.status == "✨Ready✨":
            self.get_index()
        return self.status

    def get_index(self):
        """Build ``self.index`` from the PDFs listed in ``self.dataset``.

        Does nothing unless both documents and a key are available.
        """
        if self.docs_ready and self.valid_key:
            loaders = [PyPDFLoader(f) for f in self.dataset["filepath"]]
            self.index = VectorstoreIndexCreator(
                vectorstore_cls=DocArrayInMemorySearch,
                embedding=self.embedding,
                text_splitter=RecursiveCharacterTextSplitter(
                    chunk_size=1000,
                    chunk_overlap=20,
                    length_function=len,
                    # Split on sentence ends; the splitter expects a list
                    # of separators, not a bare string.
                    separators=["."],
                ),
            ).from_loaders(loaders=loaders)

    def do_ask(self, question):
        """Answer ``question`` against the indexed documents.

        This is a generator that always yields exactly one string: the
        model's response when the app is ready, otherwise the current status
        message so the caller still has something to display.
        """
        if self.status != "✨Ready✨":
            yield self.status
            return
        # The index is missing when the key was entered (or changed) after
        # the documents were uploaded; build it lazily in that case.
        if self.index is None:
            self.get_index()
        response = self.index.query(question=question, llm=self.llm)
        yield response
def validate_key(myInstance: myClass, openai_api_key):
    """Gradio callback: hand the typed API key to the session object.

    A session object is created on first use (the state starts out as
    ``None``). Returns the session followed by the values produced by
    ``myClass.validate_key`` so both can be written back to the UI.
    """
    session = myClass() if myInstance is None else myInstance
    status_outputs = session.validate_key(openai_api_key)
    return (session, *status_outputs)
def request_pathname(myInstance: myClass, files, data):
    """Gradio callback: register uploaded files with the session object.

    A session object is created on first use (the state starts out as
    ``None``). Returns the session followed by the values produced by
    ``myClass.request_pathname``.
    """
    session = myClass() if myInstance is None else myInstance
    table_and_status = session.request_pathname(files, data)
    return (session, *table_and_status)
def do_ask(myInstance: "myClass | None", question):
    """Gradio callback: answer ``question`` using the session's index.

    Always returns exactly two values, ``(session, text)``, matching the two
    outputs this callback is wired to.

    Args:
        myInstance: The session object, or ``None`` when the user clicks
            "Ask" before entering a key or uploading a document.
        question: The question text.

    Returns:
        The (unchanged) session and either the answer or, when no answer can
        be produced yet, a status message.
    """
    if myInstance is None:
        # No session yet: nothing has been entered, so report that instead
        # of failing on ``None.do_ask``.
        return None, "⚠️Waiting for documents and key⚠️"
    # ``myClass.do_ask`` is a generator; drain it so an empty result can be
    # detected instead of returning too few values.
    answers = list(myInstance.do_ask(question))
    if not answers:
        return myInstance, myInstance.status
    # If several values were produced, keep the last one.
    return myInstance, answers[-1]
# UI definition. Event listeners are registered inside the Blocks context.
# NOTE(review): the original indentation was lost, so the nesting below
# (Tab holds only the uploader, Accordion holds only the table) is an
# assumption; confirm against the intended layout.
with gr.Blocks(css=css_style) as demo:
    # Session object; starts as None and is created by the callbacks.
    myInstance = gr.State()
    # The former `openai_api_key = gr.State("")` was removed: the name was
    # rebound to the Textbox below before it was ever used.
    # `docs` and `index` are not wired to any event in this section.
    docs = gr.State()
    # Rows of [filepath, citation string, key] accumulated across uploads.
    data = gr.State([])
    index = gr.State()

    gr.Markdown(
        """
# Document Question and Answer
*By D8a.ai*
Idea based on https://huggingface.co/spaces/whitead/paper-qa
Significant advances in langchain have made it possible to simplify the code.
This tool allows you to ask questions of your uploaded text, PDF documents.
It uses OpenAI's GPT models, so you need to enter your API key below. This
tool is under active development and currently uses a lot of tokens - up to 10,000
for a single query. This is $0.10-0.20 per query, so please be careful!
* [langchain](https://github.com/hwchase17/langchain) is the main library this tool utilizes.
1. Enter API Key ([What is that?](https://platform.openai.com/account/api-keys))
2. Upload your documents
3. Ask questions
"""
    )

    openai_api_key = gr.Textbox(
        label="OpenAI API Key", placeholder="sk-...", type="password"
    )

    with gr.Tab("File upload"):
        uploaded_files = gr.File(
            label="Upload your pdf Dokument", file_count="multiple"
        )

    with gr.Accordion("See Docs:", open=False):
        dataset = gr.Dataframe(
            headers=["filepath", "citation string", "key"],
            datatype=["str", "str", "str"],
            col_count=(3, "fixed"),
            interactive=False,
            label="Documents and Citations",
            overflow_row_behaviour="paginate",
            max_rows=5,
        )

    # Read-only status line, updated by the key and upload callbacks.
    buildb = gr.Textbox(
        "⚠️Waiting for documents and key...",
        label="Status",
        interactive=False,
        show_label=True,
        max_lines=1,
    )
    query = gr.Textbox(placeholder="Enter your question here...", label="Question")
    ask = gr.Button("Ask Question")
    answer = gr.Markdown(label="Answer")

    # Every callback receives the session state first and returns it first,
    # so the (possibly newly created) session object is stored back.
    openai_api_key.change(
        validate_key, inputs=[myInstance, openai_api_key], outputs=[myInstance, buildb]
    )
    uploaded_files.change(
        request_pathname,
        inputs=[myInstance, uploaded_files, data],
        outputs=[myInstance, dataset, buildb],
    )
    ask.click(
        do_ask,
        inputs=[myInstance, query],
        outputs=[myInstance, answer],
    )

demo.queue(concurrency_count=20)
demo.launch(show_error=True)