"""Streamlit RAG app: answer MLOps questions over a directory of PDF documents.

Loads PDFs, splits them into chunks, embeds and upserts them into a Pinecone
serverless index, then answers the user's question with a HuggingFace-hosted
LLM through a LangChain RetrievalQA chain.
"""

# Standard library: filesystem paths, env vars, index-readiness polling.
import os
import time
import warnings

# Silence noisy deprecation warnings from langchain / pinecone.
warnings.filterwarnings("ignore")

import streamlit as st
from dotenv import load_dotenv

# Project-local module holding the API tokens (Hugging_face_key, PCToken).
import keyfile

from langchain import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import HuggingFaceHub
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Pinecone as PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

# Prompt used for every query; {context} and {question} are filled in by the chain.
template = """
You are a MLOPs engineer. The user will ask you a question about Machine Learning Operations.
Use the following piece of context to answer the question.
If you don't know the answer, just say you don't know.
Keep the answer brief.
Context: {context}
Question: {question}
Answer:
"""


def setup_retrieval_qa_system(doc_directory, question, chunk_size=500, chunk_overlap=100):
    """Index the PDFs under *doc_directory* in Pinecone and answer *question*.

    Parameters
    ----------
    doc_directory : str
        Directory containing the PDF files to load and index.
    question : str
        The user's natural-language question.
    chunk_size : int, optional
        Character count per text chunk (default 500).
    chunk_overlap : int, optional
        Overlap between consecutive chunks (default 100).

    Returns
    -------
    str
        The LLM's answer text.

    Raises
    ------
    ValueError
        If the HuggingFace or Pinecone API key is missing.
    """
    load_dotenv()

    hugging_face = keyfile.Hugging_face_key
    if not hugging_face:
        raise ValueError("HuggingFace API key is missing. Please set it in the .env file.")
    os.environ['HUGGINGFACEHUB_API_TOKEN'] = hugging_face

    pinecone_api_key = keyfile.PCToken
    if not pinecone_api_key:
        raise ValueError("pc API key is missing. Please set it in the .env file.")
    os.environ['PCToken'] = pinecone_api_key

    # BUG FIX: the raw token string was previously used as if it were the
    # client object; construct a real Pinecone client from the API key.
    pc = Pinecone(api_key=pinecone_api_key)

    # Serverless spec: cloud/region from the environment, with sane defaults.
    cloud = os.environ.get("PINECONE_CLOUD") or "aws"
    region = os.environ.get("PINECONE_REGION") or "us-east-1"
    serv = ServerlessSpec(cloud=cloud, region=region)

    # BUG FIX: Pinecone index names must be lowercase alphanumeric + '-'
    # (the original "Bhagya-27thoct" would be rejected by the service).
    index_name = "bhagya-27thoct"

    # Create the index if it does not exist yet, then poll until it is ready.
    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,
            dimension=768,  # must match the embedding model's output size
            metric="cosine",
            spec=serv,
        )
        while not pc.describe_index(index_name).status['ready']:
            time.sleep(1)

    # Sanity log: index stats before inserting new vectors.
    print("Index before inserting")
    print(pc.Index(index_name).describe_index_stats())

    # Load every PDF in the directory into a flat list of documents.
    all_docs = []
    with st.spinner('Loading and processing documents...'):
        for file_name in os.listdir(doc_directory):
            file_path = os.path.join(doc_directory, file_name)
            loader = PyPDFLoader(file_path)
            all_docs.extend(loader.load())

    # Split into overlapping chunks for embedding.
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    splitted_chunks = text_splitter.split_documents(all_docs)

    # BUG FIX: Mixtral is a chat model, not an embedding model. Use a
    # 768-dimensional sentence-transformer to match the index dimension above.
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

    # Upsert the chunks into Pinecone and get a retriever-backed store.
    docsearch = PineconeVectorStore.from_documents(
        splitted_chunks, embeddings, index_name=index_name
    )

    llm = HuggingFaceHub(
        repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1",
        model_kwargs={"temperature": 0.8, "top_k": 50},
        huggingfacehub_api_token=hugging_face,
    )

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # "stuff" chain: pack retrieved chunks into the prompt's {context} slot.
    # BUG FIX: the prompt was previously built but never wired into the chain.
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=docsearch.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
    )

    with st.spinner('Finding the best answer...'):
        # BUG FIX: was `qa_chain.run(query)` with `query` undefined.
        result = qa_chain.run(question)

    # BUG FIX: RetrievalQA.run() returns the answer string directly, so the
    # original `result['result']` indexing would raise TypeError.
    return result


def main():
    """Streamlit UI: upload PDFs (or point at a directory) and ask a question."""
    st.title("📝 Document-Based Question Answering System with Groq")
    st.sidebar.header("Configuration")

    # File uploader for PDFs
    uploaded_files = st.sidebar.file_uploader(
        "Upload PDF documents", type="pdf", accept_multiple_files=True
    )

    # Get the document directory from the user
    doc_directory = st.text_input("Or enter the document directory path directly:", "")

    # Set chunk size and overlap
    chunk_size = st.sidebar.slider("Set chunk size", 100, 1000, 500)
    chunk_overlap = st.sidebar.slider("Set chunk overlap", 0, 200, 100)

    # Input for the question
    question = st.text_input("Enter your question:")

    # Button to trigger the QA system
    if st.button("Get Answer"):
        if uploaded_files:
            # Persist uploaded files to a temp directory so the QA setup
            # can read them from disk like any other document directory.
            doc_directory = "/tmp/streamlit_uploaded_docs"
            os.makedirs(doc_directory, exist_ok=True)
            for file in uploaded_files:
                with open(os.path.join(doc_directory, file.name), "wb") as f:
                    f.write(file.getbuffer())
        elif not doc_directory:
            st.warning("Please upload PDF files or provide a document directory.")
            return

        if question:
            try:
                result = setup_retrieval_qa_system(
                    doc_directory, question, chunk_size, chunk_overlap
                )
                st.success("Answer found!")
                st.write(f"**Answer:** {result}")
            except Exception as e:
                # Surface any failure (missing keys, network, parsing) in the UI.
                st.error(f"An error occurred: {e}")
        else:
            st.warning("Please provide a question.")


if __name__ == "__main__":
    main()