Spaces:
Runtime error
Runtime error
| # Import necessary modules | |
| import os | |
| import re | |
| import time | |
| from io import BytesIO | |
| from typing import Any, Dict, List | |
| import openai | |
| import streamlit as st | |
| from langchain import LLMChain, OpenAI | |
| from langchain.agents import AgentExecutor, Tool, ZeroShotAgent | |
| from langchain.chains import RetrievalQA | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.docstore.document import Document | |
| from langchain.document_loaders import PyPDFLoader | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.llms import OpenAI | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.vectorstores import VectorStore | |
| from langchain.vectorstores.faiss import FAISS | |
| from pypdf import PdfReader | |
| # Define a function to parse a PDF file and extract its text content | |
def parse_pdf(file: BytesIO) -> List[str]:
    """Extract the text of every page of a PDF and tidy its line breaks.

    Args:
        file: Binary stream that is handed straight to ``PdfReader``.

    Returns:
        One cleaned-up string per page, in the order ``PdfReader`` yields
        the pages.
    """

    def _tidy(raw_text):
        # Re-join words that were split by a hyphen at a line break.
        joined = re.sub(r"(\w+)-\n(\w+)", r"\1\2", raw_text)
        # Turn a lone newline into a space; the lookarounds leave alone any
        # newline that sits next to a "newline + whitespace char" pair.
        unwrapped = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", joined.strip())
        # Squash "newline, optional whitespace, newline" into two newlines.
        return re.sub(r"\n\s*\n", "\n\n", unwrapped)

    reader = PdfReader(file)
    return [_tidy(pdf_page.extract_text()) for pdf_page in reader.pages]
| # Define a function to convert text content to a list of documents | |
def text_to_docs(text: str) -> List[Document]:
    """Split page text into chunked ``Document`` objects with metadata.

    Args:
        text: Either a single string (treated as one page) or a list of
            strings, one per page. The annotation only says ``str``, but the
            ``isinstance`` check below exists precisely so that a list of
            page strings is accepted as well.

    Returns:
        One ``Document`` per chunk, in page order. Each document's metadata
        holds ``page`` (1-based page number), ``chunk`` (0-based index of the
        chunk within its page) and ``source`` (``"<page>-<chunk>"``).
    """
    # Take a single string as one page.
    page_texts = [text] if isinstance(text, str) else text

    # The splitter's arguments never vary, so build it once instead of once
    # per page.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000,
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        chunk_overlap=0,
    )

    doc_chunks = []
    for page_number, page_text in enumerate(page_texts, start=1):
        chunks = text_splitter.split_text(page_text)
        for chunk_index, chunk in enumerate(chunks):
            # The page number comes from the outer loop variable, not from a
            # previously built Document, so no name is rebound mid-loop.
            doc_chunks.append(
                Document(
                    page_content=chunk,
                    metadata={
                        "page": page_number,
                        "chunk": chunk_index,
                        "source": f"{page_number}-{chunk_index}",
                    },
                )
            )
    return doc_chunks
| # Define a function for the embeddings | |
def test_embed():
    """Embed the current document chunks and return a FAISS index over them.

    Takes no arguments: it reads the module-level names ``api`` (the OpenAI
    API key) and ``pages`` (the list of chunk documents), so both must be
    assigned before this function is called.

    Returns:
        The index produced by ``FAISS.from_documents(pages, embeddings)``.
    """
    embeddings = OpenAIEmbeddings(openai_api_key=api)
    # Indexing: embed every chunk and store the vectors in FAISS.
    with st.spinner("It's indexing..."):
        index = FAISS.from_documents(pages, embeddings)
    # The icon must be a real emoji; the previous value was a mis-decoded
    # (mojibake) rendering of the check-mark emoji.
    st.success("Embeddings done.", icon="✅")
    return index
# Set up the Streamlit app.
# Everything below is the page script itself: header, sidebar, PDF upload,
# API-key prompt, and the question/answer agent with conversational memory.
st.title("🤖 Document AI with Memory 🧠")
st.markdown(
    """
#### 🗨️ Chat with your PDF files 📄 + `Conversational Buffer Memory`
> *powered by [LangChain](https://langchain.readthedocs.io/en/latest/modules/memory.html#memory) +
[OpenAI](https://platform.openai.com/docs/models/gpt-3-5) + [HuggingFace](https://www.huggingface.co/)*
"""
)
st.markdown(
    """
`openai`
`langchain`
`tiktoken`
`pypdf`
`faiss-cpu`
---------
"""
)

# Set up the sidebar
st.sidebar.markdown(
    """
### Steps:
1. Upload PDF File
2. Enter Your Secret Key for Embeddings
3. Perform Q&A
**Note : File content and API key not stored in any form.**
"""
)

# Allow the user to upload a PDF file
uploaded_file = st.file_uploader("**Upload Your PDF File**", type=["pdf"])
if uploaded_file:
    name_of_file = uploaded_file.name
    doc = parse_pdf(uploaded_file)
    # `pages` and (below) `api` stay module-level names on purpose:
    # test_embed() reads both of them as globals.
    pages = text_to_docs(doc)
    if pages:
        # Allow the user to select an entry and view its content.
        # NOTE: `pages` holds the chunks returned by text_to_docs(), so the
        # "page" picked here is really a chunk index (1-based).
        with st.expander("Show Page Content", expanded=False):
            page_sel = st.number_input(
                label="Select Page", min_value=1, max_value=len(pages), step=1
            )
            # Explicit st.write instead of relying on bare-expression magic.
            st.write(pages[page_sel - 1])

        # Use OpenAI API key from environment or allow the user to enter it
        api = os.environ.get("OPENAI_API_KEY") or st.text_input(
            "**Enter OpenAI API Key**",
            type="password",
            placeholder="sk-",
            help="https://platform.openai.com/account/api-keys",
        )
        if api:
            # Streamlit re-executes this script on every widget interaction.
            # Without a cache each typed question would re-embed the whole
            # document (paid API calls). Keep the index in the session's
            # in-memory state and rebuild it only when the uploaded file
            # changes.
            doc_key = (name_of_file, hash(uploaded_file.getvalue()))
            if st.session_state.get("index_key") != doc_key:
                st.session_state.index = test_embed()
                st.session_state.index_key = doc_key
            index = st.session_state.index

            # Set up the question-answering system
            qa = RetrievalQA.from_chain_type(
                llm=OpenAI(openai_api_key=api),
                chain_type="stuff",
                retriever=index.as_retriever(),
            )

            # Set up the conversational agent
            tools = [
                Tool(
                    name="PDF QA System",
                    func=qa.run,
                    description="Useful for when you need to answer questions about the aspects asked. Input may be a partial or fully formed question.",
                )
            ]
            prefix = """Have a conversation with a human, answering the following questions as best you can based on the context and memory available.
You have access to a single tool:"""
            # A stray double quote after "Begin!" was removed from the prompt.
            suffix = """Begin!
{chat_history}
Question: {input}
{agent_scratchpad}"""
            prompt = ZeroShotAgent.create_prompt(
                tools,
                prefix=prefix,
                suffix=suffix,
                input_variables=["input", "chat_history", "agent_scratchpad"],
            )

            # Create the memory once per session so the chat history
            # survives reruns.
            # NOTE(review): the history is not cleared when a different PDF
            # is uploaded - confirm whether that is intended.
            if "memory" not in st.session_state:
                st.session_state.memory = ConversationBufferMemory(
                    memory_key="chat_history"
                )

            # NOTE(review): "gpt-3.5-turbo" is a chat model driven through the
            # completion-style `OpenAI` wrapper - confirm this is intended for
            # the installed LangChain version.
            llm_chain = LLMChain(
                llm=OpenAI(
                    temperature=0, openai_api_key=api, model_name="gpt-3.5-turbo"
                ),
                prompt=prompt,
            )
            agent = ZeroShotAgent(llm_chain=llm_chain, tools=tools, verbose=True)
            agent_chain = AgentExecutor.from_agent_and_tools(
                agent=agent, tools=tools, verbose=True, memory=st.session_state.memory
            )

            # Allow the user to enter a query and generate a response
            query = st.text_input(
                "**What's on your mind?**",
                placeholder=f"Ask me anything from {name_of_file}",
            )
            if query:
                with st.spinner(f"Generating Answer to your Query : `{query}` "):
                    res = agent_chain.run(query)
                st.info(res, icon="🤖")

            # Allow the user to view the conversation history and other
            # information stored in the agent's memory. This lives inside
            # `if api:` because the memory object is only created there.
            with st.expander("History/Memory"):
                st.write(st.session_state.memory)