"""RAG-based conversational chatbot.

Upload PDF documents through a Gradio UI; they are chunked, embedded with a
sentence-transformers model, indexed in FAISS, and queried through a
ConversationalRetrievalChain backed by an OpenAI chat model.
"""

import os
import shutil

import gradio as gr
import numpy as np
from dotenv import load_dotenv
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import DirectoryLoader, PyPDFLoader
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI

# Load API keys from a .env file in the working directory, if present.
load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env"))

DOCUMENTS_DIR = "documents"

# Propagate API keys into the environment ONLY when they are actually set:
# assigning None into os.environ raises TypeError at import time.
huggingface_token = os.getenv("HUGGINGFACE_API_TOKEN")
if huggingface_token:
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = huggingface_token

openai_api_key = os.getenv("OPENAI_API_KEY")
if openai_api_key:
    os.environ["OPENAI_API_KEY"] = openai_api_key

# Shared LLM for the retrieval chain (OpenAI, not HuggingFace).
llm = ChatOpenAI(temperature=0.7, model_name="gpt-3.5-turbo")

# Create a directory for document storage if it doesn't exist.
os.makedirs(DOCUMENTS_DIR, exist_ok=True)


def load_documents(directory=DOCUMENTS_DIR):
    """Recursively load every PDF under *directory*.

    Each PDF is loaded with PyPDFLoader; files that fail to parse are
    skipped with a printed warning rather than aborting the whole batch.

    Returns:
        list: LangChain Document objects from all successfully loaded PDFs.
    """
    print("Entered load documents")
    documents = []

    # Collect all PDF paths first so we can report the count up front.
    pdf_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.lower().endswith(".pdf"):
                pdf_files.append(os.path.join(root, file))

    print(f"Found {len(pdf_files)} PDF files")

    # Best-effort loading: one corrupt PDF must not sink the rest.
    for pdf_path in pdf_files:
        try:
            print(f"Processing {pdf_path}")
            loader = PyPDFLoader(pdf_path)
            file_documents = loader.load()
            documents.extend(file_documents)
            print(f"Successfully loaded {pdf_path}")
        except Exception as e:
            print(f"Failed to load {pdf_path}: {str(e)}")

    print(f"Successfully loaded {len(documents)} documents")
    return documents


def process_documents():
    """Load, chunk, and embed the stored documents into a FAISS index.

    Returns:
        FAISS: an in-memory vector store over the chunked documents.
    """
    documents = load_documents()

    # Small chunks with generous overlap keep retrieval granular while
    # preserving context across chunk boundaries.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=400,
        chunk_overlap=150,
    )
    chunks = text_splitter.split_documents(documents)

    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    return FAISS.from_documents(chunks, embeddings)


def create_chain(vector_store):
    """Build a ConversationalRetrievalChain over *vector_store*.

    Args:
        vector_store: FAISS index produced by process_documents().

    Returns:
        ConversationalRetrievalChain, or None when no OpenAI API key is
        configured (the chain's LLM is ChatOpenAI, so that is the key
        that actually matters — not the HuggingFace token).
    """
    if not os.getenv("OPENAI_API_KEY"):
        return None

    memory = ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True,
    )

    qa_prompt = PromptTemplate.from_template("""
    You are a helpful assistant for answering questions about documents.
    Context information is below.
    ---------------------
    {context}
    ---------------------
    Given the context information and not prior knowledge, answer the question: {question}
    If the context is not provided, please respond saying, no context was found
    """)

    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
        memory=memory,
        combine_docs_chain_kwargs={"prompt": qa_prompt},
    )
    return chain


# Mutable chat state shared between the upload and chat handlers.
vector_store = None
chain = None
chat_history = []


def upload_file(files):
    """Gradio handler: copy uploaded files into DOCUMENTS_DIR and re-index.

    Clears any previously stored documents first, then rebuilds the vector
    store and chain from the new uploads.

    Args:
        files: list of temp-file paths supplied by gr.File(type="filepath").

    Returns:
        str: human-readable status message for the UI.
    """
    print("Entered file processing:")
    print(files)
    try:
        # Replace, don't accumulate: drop previously uploaded documents.
        for f in os.listdir(DOCUMENTS_DIR):
            file_path = os.path.join(DOCUMENTS_DIR, f)
            if os.path.isfile(file_path):
                os.remove(file_path)

        for file in files:
            if isinstance(file, str) and os.path.isfile(file):
                file_name = os.path.basename(file)
                dest_path = os.path.join(DOCUMENTS_DIR, file_name)
                shutil.copy(file, dest_path)
                print(f"Copied {file} to {dest_path}")
            else:
                return f"Invalid file format or file not found: {file}"

        global vector_store, chain
        vector_store = process_documents()
        chain = create_chain(vector_store)

        if chain is None:
            return (
                "Files uploaded and processed, but the OpenAI API key is "
                "missing. Set the OPENAI_API_KEY environment variable to "
                "enable the chatbot."
            )
        return "Files uploaded and processed successfully!"
    except Exception as e:
        return f"Error processing files: {str(e)}"


def chat(message, history):
    """Gradio handler: answer *message* using the retrieval chain.

    Lazily rebuilds the vector store from DOCUMENTS_DIR when documents
    exist on disk but no index has been created yet (e.g. after a restart).

    Args:
        message: the user's question.
        history: list of [user, bot] message pairs from the Chatbot widget.

    Returns:
        Updated history including the new exchange.
    """
    global chain, chat_history, vector_store

    if vector_store is None:
        has_docs = os.path.exists(DOCUMENTS_DIR) and any(
            os.path.isfile(os.path.join(DOCUMENTS_DIR, f))
            for f in os.listdir(DOCUMENTS_DIR)
        )
        if has_docs:
            vector_store = process_documents()
            chain = create_chain(vector_store)
        else:
            return history + [
                [message, "Please upload documents first to initialize the chatbot."]
            ]

    if chain is None:
        return history + [
            [
                message,
                "OpenAI API key is not set. Please set the OPENAI_API_KEY "
                "environment variable.",
            ]
        ]

    try:
        # Mirror the widget history into tuples; the chain's own memory is
        # what actually feeds the conversation, so this is best-effort.
        if history:
            chat_history = [(turn[0], turn[1]) for turn in history]

        response = chain({"question": message})
        answer = response["answer"]
        return history + [[message, answer]]
    except Exception as e:
        error_message = f"Error processing your request: {str(e)}"
        return history + [[message, error_message]]


# Gradio interface: upload column on the left, chat column on the right.
with gr.Blocks(title="RAG Chatbot") as demo:
    gr.Markdown("# RAG-based Conversational Chatbot")
    gr.Markdown(
        "Upload text documents and chat with an AI that can answer questions "
        "based on their content."
    )

    with gr.Row():
        with gr.Column(scale=1):
            file_output = gr.Textbox(label="Upload Status")
            # Only PDFs are actually indexed (see load_documents), so the
            # label says .pdf, not .txt.
            file_input = gr.File(
                file_count="multiple",
                label="Upload Documents (.pdf files)",
                type="filepath",
            )
            upload_button = gr.Button("Process Documents")
            upload_button.click(upload_file, inputs=[file_input], outputs=[file_output])

        with gr.Column(scale=2):
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="Ask a question about your documents")
            msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot])
            clear = gr.Button("Clear")
            clear.click(lambda: [], outputs=[chatbot])


if __name__ == "__main__":
    demo.launch()