# NOTE: the lines below were Hugging Face Spaces page residue ("Spaces: Sleeping")
# captured when this file was scraped; kept as a comment so the file parses as Python.
import os
import re
from pathlib import Path

import fitz  # PyMuPDF
import gradio as gr
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_chroma import Chroma
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
# SECURITY: the original line committed a live OpenAI secret key to source
# control. That key must be revoked/rotated. Read the key from the
# environment instead; an empty string will make the OpenAI clients fail
# with a clear authentication error rather than silently using a leaked key.
openai_api_key = os.environ.get("OPENAI_API_KEY", "")
def extract_text_from_pdf(pdf_file):
    """Extract the plain text of every page of a PDF.

    Args:
        pdf_file: Path (or anything ``fitz.open`` accepts) to the PDF.

    Returns:
        str: Text of all pages concatenated in page order.
    """
    # Use the document as a context manager so the underlying file handle
    # is released even if text extraction raises — the original only closed
    # it on the success path. Collect page texts and join once instead of
    # repeated string concatenation.
    parts = []
    with fitz.open(pdf_file) as document:
        for page in document:  # Document iterates its pages in order
            parts.append(page.get_text())
    return "".join(parts)
def clean_text(text):
    """Normalize extracted PDF text.

    Collapses whitespace runs to single spaces, squeezes any character
    repeated three or more times down to one, removes consecutive duplicate
    words, and strips surrounding whitespace.

    Args:
        text: Raw text, typically from PDF extraction.

    Returns:
        str: The cleaned text.
    """
    # Passes are applied in order: the duplicate-word pass relies on the
    # whitespace pass having reduced separators to single spaces first.
    passes = (
        (r'\s+', ' '),                        # collapse whitespace runs
        (r'(.)\1{2,}', r'\1'),                # squeeze chars repeated 3+ times
        (r'\b(\w+)\b(?:\s+\1\b)+', r'\1'),    # drop consecutive duplicate words
    )
    result = text
    for pattern, replacement in passes:
        result = re.sub(pattern, replacement, result)
    return result.strip()
def initialize_chatbot(cleaned_text, openai_api_key):
    """Build a conversational retrieval chain over the given text.

    Semantic-chunks the text, embeds the chunks into an in-memory Chroma
    store, and wires a GPT-4o chat model to a multi-query retriever with
    buffered conversation memory.

    Args:
        cleaned_text: Pre-cleaned document text to index.
        openai_api_key: API key for the OpenAI embedding and chat models.

    Returns:
        ConversationalRetrievalChain: Ready-to-query chain.
    """
    embedder = OpenAIEmbeddings(api_key=openai_api_key)
    # Semantic chunking uses the same embedder that indexes the chunks.
    chunks = SemanticChunker(embedder).create_documents([cleaned_text])
    store = Chroma.from_documents(documents=chunks, embedding=embedder)
    chat_model = ChatOpenAI(
        api_key=openai_api_key, temperature=0.5, model="gpt-4o", verbose=True
    )
    # Multi-query retrieval has the LLM rephrase the question to widen recall.
    multi_query = MultiQueryRetriever.from_llm(
        retriever=store.as_retriever(), llm=chat_model
    )
    history = ConversationBufferMemory(
        memory_key="chat_history", return_messages=True
    )
    return ConversationalRetrievalChain.from_llm(
        chat_model, retriever=multi_query, memory=history
    )
def answer_query(pdf_file, question):
    """Answer a question about a PDF: extract, clean, index, then query.

    Args:
        pdf_file: Path to the PDF to read.
        question: Natural-language question about the document.

    Returns:
        str: The chain's answer text.

    NOTE(review): a fresh chain (embeddings, vector store, memory) is
    rebuilt on every call, so conversation memory never carries across
    questions and each query re-embeds the whole document.
    """
    raw_text = extract_text_from_pdf(pdf_file)
    chain = initialize_chatbot(clean_text(raw_text), openai_api_key)
    return chain({"question": question})['answer']
def process_pdf_and_question(pdf_file, question, chat_history):
    """Gradio handler: validate inputs, answer, and append to the chat log.

    Args:
        pdf_file: Uploaded PDF (or None if nothing was uploaded).
        question: The user's question text.
        chat_history: Mutable list of (speaker/question, message) pairs;
            appended to in place.

    Returns:
        list: The same chat_history list, with one new entry.
    """
    if pdf_file is None:
        chat_history.append(("System", "Please upload a PDF file."))
    elif not question.strip():
        chat_history.append(("System", "Please enter a question."))
    else:
        chat_history.append((question, answer_query(pdf_file, question)))
    return chat_history
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks() as demo:
    # Per-session conversation log; the submit handler mutates it in place
    # and returns it so the Chatbot component re-renders.
    chat_history = gr.State([])
    upload = gr.File(label="Upload PDF")
    chatbot = gr.Chatbot(label="Chat History")
    question = gr.Textbox(
        label="Ask a question",
        placeholder="Type your question and hit Enter",
    )
    question.submit(
        fn=process_pdf_and_question,
        inputs=[upload, question, chat_history],
        outputs=[chatbot],
    )

if __name__ == "__main__":
    demo.launch()