Spaces:
Build error
Build error
| import os | |
| import gradio as gr | |
| import numpy as np | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.document_loaders import DirectoryLoader, PyPDFLoader | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.prompts import PromptTemplate | |
| from langchain_community.llms import HuggingFaceHub | |
| from dotenv import load_dotenv | |
| from langchain_openai import ChatOpenAI | |
| import shutil | |
| # Define directory variable | |
| load_dotenv(dotenv_path=os.path.join(os.getcwd(), '.env')) | |
| DOCUMENTS_DIR = "documents" | |
| # Set up environment variables for HuggingFace | |
| huggingface_token = os.getenv("HUGGINGFACE_API_TOKEN") | |
| os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY') | |
| if huggingface_token: | |
| os.environ["HUGGINGFACEHUB_API_TOKEN"] = huggingface_token | |
| # # Remove the existing documents directory if it exists | |
| # if os.path.exists(DOCUMENTS_DIR): | |
| # shutil.rmtree(DOCUMENTS_DIR) | |
| llm = ChatOpenAI(temperature=0.7, model_name="gpt-3.5-turbo") | |
| # Create a directory for document storage if it doesn't exist | |
| os.makedirs(DOCUMENTS_DIR, exist_ok=True) | |
| # Function to load documents | |
| def load_documents(directory=DOCUMENTS_DIR): | |
| print("Entered load documents") | |
| documents = [] | |
| # Find all PDF files | |
| pdf_files = [] | |
| for root, _, files in os.walk(directory): | |
| for file in files: | |
| if file.lower().endswith('.pdf'): | |
| pdf_files.append(os.path.join(root, file)) | |
| print(f"Found {len(pdf_files)} PDF files") | |
| # Process each PDF with error handling | |
| for pdf_path in pdf_files: | |
| try: | |
| print(f"Processing {pdf_path}") | |
| loader = PyPDFLoader(pdf_path) | |
| file_documents = loader.load() | |
| documents.extend(file_documents) | |
| print(f"Successfully loaded {pdf_path}") | |
| except Exception as e: | |
| print(f"Failed to load {pdf_path}: {str(e)}") | |
| print(f"Successfully loaded {len(documents)} documents") | |
| return documents | |
| # Function to process documents and create vector store | |
| def process_documents(): | |
| documents = load_documents() | |
| # Split documents into chunks | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=400, | |
| chunk_overlap=150 | |
| ) | |
| chunks = text_splitter.split_documents(documents) | |
| # Create embeddings | |
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
| # Create vector store | |
| vector_store = FAISS.from_documents(chunks, embeddings) | |
| return vector_store | |
| # Create RAG chain | |
| def create_chain(vector_store): | |
| if not os.getenv("HUGGINGFACEHUB_API_TOKEN"): | |
| return None | |
| # llm = HuggingFaceHub( | |
| # repo_id="google/flan-t5-large", | |
| # model_kwargs={"temperature": 0.5, "max_length": 512} | |
| # ) | |
| memory = ConversationBufferMemory( | |
| memory_key="chat_history", | |
| return_messages=True | |
| ) | |
| qa_prompt = PromptTemplate.from_template(""" | |
| You are a helpful assistant for answering questions about documents. | |
| Context information is below. | |
| --------------------- | |
| {context} | |
| --------------------- | |
| Given the context information and not prior knowledge, answer the question: {question} | |
| If the context is not provided, please respond saying, no context was found | |
| """) | |
| chain = ConversationalRetrievalChain.from_llm( | |
| llm=llm, | |
| retriever=vector_store.as_retriever(search_kwargs={"k": 3}), | |
| memory=memory, | |
| combine_docs_chain_kwargs={"prompt": qa_prompt} | |
| ) | |
| return chain | |
| # Initialize variables for handling chat state | |
| vector_store = None | |
| chain = None | |
| chat_history = [] | |
| # Function to handle file uploads | |
| import shutil | |
| def upload_file(files): | |
| print("Entered file processing:") | |
| print(files) | |
| try: | |
| # Clear existing documents if uploading new ones | |
| for f in os.listdir(DOCUMENTS_DIR): | |
| file_path = os.path.join(DOCUMENTS_DIR, f) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| # Process uploaded files | |
| for file in files: | |
| if isinstance(file, str) and os.path.isfile(file): | |
| file_name = os.path.basename(file) | |
| dest_path = os.path.join(DOCUMENTS_DIR, file_name) | |
| shutil.copy(file, dest_path) | |
| print(f"Copied {file} to {dest_path}") | |
| else: | |
| return f"Invalid file format or file not found: {file}" | |
| # Process documents and create vector store | |
| global vector_store, chain | |
| vector_store = process_documents() | |
| chain = create_chain(vector_store) | |
| if chain is None: | |
| return "Files uploaded and processed, but HuggingFace API token is missing. Set the environment variable to enable the chatbot." | |
| return "Files uploaded and processed successfully!" | |
| except Exception as e: | |
| return f"Error processing files: {str(e)}" | |
| # Function to handle user queries | |
| def chat(message, history): | |
| global chain, chat_history, vector_store | |
| if vector_store is None: | |
| if os.path.exists(DOCUMENTS_DIR) and any(os.path.isfile(os.path.join(DOCUMENTS_DIR, f)) for f in os.listdir(DOCUMENTS_DIR)): | |
| vector_store = process_documents() | |
| chain = create_chain(vector_store) | |
| else: | |
| return history + [[message, "Please upload documents first to initialize the chatbot."]] | |
| if chain is None: | |
| return history + [[message, "HuggingFace API token is not set. Please set the HUGGINGFACE_API_TOKEN environment variable."]] | |
| try: | |
| if history: | |
| chat_history = [(turn[0], turn[1]) for turn in history] | |
| response = chain({"question": message}) | |
| answer = response['answer'] | |
| return history + [[message, answer]] | |
| except Exception as e: | |
| error_message = f"Error processing your request: {str(e)}" | |
| return history + [[message, error_message]] | |
| # Create Gradio interface | |
| with gr.Blocks(title="RAG Chatbot") as demo: | |
| gr.Markdown("# RAG-based Conversational Chatbot") | |
| gr.Markdown("Upload text documents and chat with an AI that can answer questions based on their content.") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| file_output = gr.Textbox(label="Upload Status") | |
| file_input = gr.File( | |
| file_count="multiple", | |
| label="Upload Documents (.txt files)", | |
| type="filepath" | |
| ) | |
| upload_button = gr.Button("Process Documents") | |
| upload_button.click(upload_file, inputs=[file_input], outputs=[file_output]) | |
| with gr.Column(scale=2): | |
| chatbot = gr.Chatbot(height=400) | |
| msg = gr.Textbox(label="Ask a question about your documents") | |
| msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot]) | |
| clear = gr.Button("Clear") | |
| clear.click(lambda: [], outputs=[chatbot]) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.launch() | |