# Hosting-page header preserved from the original source (not code):
# avimittal30 — "code corrected" — commit 7b52c77
import os
import gradio as gr
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain_community.llms import HuggingFaceHub
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
import shutil
# --- Environment and model setup ------------------------------------------
# Load secrets from a .env file in the current working directory.
load_dotenv(dotenv_path=os.path.join(os.getcwd(), '.env'))

# Directory where uploaded PDFs are stored before indexing.
DOCUMENTS_DIR = "documents"

# Set up environment variables for HuggingFace / OpenAI.
huggingface_token = os.getenv("HUGGINGFACE_API_TOKEN")

# BUG FIX: os.environ values must be strings. The original did
# `os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')`, which raises
# TypeError at import time whenever OPENAI_API_KEY is missing from the
# environment/.env. Guard both keys the same way.
_openai_key = os.getenv('OPENAI_API_KEY')
if _openai_key:
    os.environ["OPENAI_API_KEY"] = _openai_key
if huggingface_token:
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = huggingface_token

# Chat model used by the conversational retrieval chain.
llm = ChatOpenAI(temperature=0.7, model_name="gpt-3.5-turbo")

# Create a directory for document storage if it doesn't exist.
os.makedirs(DOCUMENTS_DIR, exist_ok=True)
# Function to load documents
def load_documents(directory=None):
    """Recursively find and load every PDF under *directory*.

    Args:
        directory: Root folder to scan. Defaults to ``DOCUMENTS_DIR``;
            the default is resolved at call time (rather than frozen at
            definition time) so the module constant can be changed after
            import and the function can be exercised in isolation.

    Returns:
        list: All pages produced by ``PyPDFLoader`` across every readable
        PDF. A PDF that fails to load is skipped with a logged message so
        one corrupt file cannot abort the whole batch.
    """
    if directory is None:
        directory = DOCUMENTS_DIR
    print("Entered load documents")

    # Collect absolute paths of all PDFs (case-insensitive extension match).
    pdf_files = [
        os.path.join(root, name)
        for root, _, files in os.walk(directory)
        for name in files
        if name.lower().endswith('.pdf')
    ]
    print(f"Found {len(pdf_files)} PDF files")

    documents = []
    # Process each PDF with per-file error handling (best-effort batch load).
    for pdf_path in pdf_files:
        try:
            print(f"Processing {pdf_path}")
            documents.extend(PyPDFLoader(pdf_path).load())
            print(f"Successfully loaded {pdf_path}")
        except Exception as e:
            print(f"Failed to load {pdf_path}: {str(e)}")

    print(f"Successfully loaded {len(documents)} documents")
    return documents
# Function to process documents and create vector store
def process_documents():
    """Build a FAISS vector store from every document in DOCUMENTS_DIR.

    Loads the stored PDFs, splits them into overlapping chunks, embeds
    the chunks with a MiniLM sentence-transformer, and returns the
    resulting FAISS index.
    """
    docs = load_documents()

    # Chunk the pages; the overlap preserves context across split points.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=400,
        chunk_overlap=150,
    )
    pieces = splitter.split_documents(docs)

    # Embed the chunks and index them for similarity search.
    embedder = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    return FAISS.from_documents(pieces, embedder)
# Create RAG chain
def create_chain(vector_store):
    """Wire a ConversationalRetrievalChain over *vector_store*.

    Returns ``None`` when no HuggingFace token is configured, which the
    UI callbacks treat as "chatbot disabled, show a setup message".
    """
    # Bail out early when the required token is absent.
    if not os.getenv("HUGGINGFACEHUB_API_TOKEN"):
        return None

    # Conversation state shared across turns of the chat.
    conversation_memory = ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True,
    )

    # Prompt that restricts answers to the retrieved context.
    qa_prompt = PromptTemplate.from_template("""
You are a helpful assistant for answering questions about documents.
Context information is below.
---------------------
{context}
---------------------
Given the context information and not prior knowledge, answer the question: {question}
If the context is not provided, please respond saying, no context was found
""")

    # Retrieval chain: top-3 nearest chunks feed the LLM via the prompt.
    return ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
        memory=conversation_memory,
        combine_docs_chain_kwargs={"prompt": qa_prompt},
    )
# Initialize variables for handling chat state (rebuilt by upload_file,
# lazily initialized by chat() when documents already exist on disk).
vector_store = None  # FAISS index built from the uploaded documents
chain = None         # ConversationalRetrievalChain over vector_store
chat_history = []    # (question, answer) tuples mirrored from the UI history
# Function to handle file uploads.
# (The redundant duplicate `import shutil` that sat here was removed —
# shutil is already imported at the top of the file.)
def upload_file(files):
    """Copy uploaded files into DOCUMENTS_DIR and rebuild the RAG state.

    Clears any previously stored documents, copies the new uploads in,
    then rebuilds the module-level vector store and conversational chain.

    Args:
        files: Sequence of local file paths as supplied by gr.File
            (type="filepath").

    Returns:
        str: A human-readable status message for the UI. Failures are
        reported as text rather than raised so the Gradio callback
        never crashes the interface.
    """
    print("Entered file processing:")
    print(files)
    try:
        # Clear existing documents so stale files are not re-indexed.
        for existing in os.listdir(DOCUMENTS_DIR):
            existing_path = os.path.join(DOCUMENTS_DIR, existing)
            if os.path.isfile(existing_path):
                os.remove(existing_path)

        # Copy each upload into the documents directory.
        for file in files:
            if isinstance(file, str) and os.path.isfile(file):
                file_name = os.path.basename(file)
                dest_path = os.path.join(DOCUMENTS_DIR, file_name)
                shutil.copy(file, dest_path)
                print(f"Copied {file} to {dest_path}")
            else:
                return f"Invalid file format or file not found: {file}"

        # Rebuild the retrieval state shared with chat().
        global vector_store, chain
        vector_store = process_documents()
        chain = create_chain(vector_store)
        if chain is None:
            return "Files uploaded and processed, but HuggingFace API token is missing. Set the environment variable to enable the chatbot."
        return "Files uploaded and processed successfully!"
    except Exception as e:
        # Surface the failure in the status textbox instead of crashing.
        return f"Error processing files: {str(e)}"
# Function to handle user queries
def chat(message, history):
    """Answer *message* via the RAG chain and return the updated history.

    Lazily builds the vector store from documents already on disk when
    needed. All failure modes are returned as chat messages so the
    Gradio callback never raises.
    """
    global chain, chat_history, vector_store

    # Lazy initialization: index whatever documents are already stored.
    if vector_store is None:
        docs_present = os.path.exists(DOCUMENTS_DIR) and any(
            os.path.isfile(os.path.join(DOCUMENTS_DIR, f))
            for f in os.listdir(DOCUMENTS_DIR)
        )
        if not docs_present:
            return history + [[message, "Please upload documents first to initialize the chatbot."]]
        vector_store = process_documents()
        chain = create_chain(vector_store)

    # A missing chain here means the HF token was never configured.
    if chain is None:
        return history + [[message, "HuggingFace API token is not set. Please set the HUGGINGFACE_API_TOKEN environment variable."]]

    try:
        # Mirror the UI transcript into the module-level chat history.
        if history:
            chat_history = [(turn[0], turn[1]) for turn in history]
        reply = chain({"question": message})['answer']
        return history + [[message, reply]]
    except Exception as e:
        return history + [[message, f"Error processing your request: {str(e)}"]]
# Create Gradio interface: upload column on the left, chat on the right.
with gr.Blocks(title="RAG Chatbot") as demo:
    gr.Markdown("# RAG-based Conversational Chatbot")
    gr.Markdown("Upload text documents and chat with an AI that can answer questions based on their content.")
    with gr.Row():
        # Left column: document upload and processing status.
        with gr.Column(scale=1):
            file_output = gr.Textbox(label="Upload Status")
            # NOTE(review): the label says ".txt files" but load_documents
            # only indexes PDFs — label appears stale; confirm intent.
            file_input = gr.File(
                file_count="multiple",
                label="Upload Documents (.txt files)",
                type="filepath"
            )
            upload_button = gr.Button("Process Documents")
            # upload_file returns a status string shown in file_output.
            upload_button.click(upload_file, inputs=[file_input], outputs=[file_output])
        # Right column: chat transcript and question box.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="Ask a question about your documents")
            # chat() receives (message, history) and returns the new history.
            msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot])
            clear = gr.Button("Clear")
            # Clears only the visible transcript; the chain's server-side
            # ConversationBufferMemory is not reset here.
            clear.click(lambda: [], outputs=[chatbot])

# Launch the app when run as a script.
if __name__ == "__main__":
    demo.launch()