# NOTE(review): the three lines below were a Hugging Face Spaces status banner
# ("Spaces: Sleeping") captured by the scrape; kept as a comment so the file parses.
# Standard library
import os       # filesystem / environment-variable access
import time     # polling delay while the Pinecone index spins up
import warnings

# Third-party
import streamlit as st  # missing in the original but used throughout (st.*)
from pinecone import Pinecone, ServerlessSpec

from langchain import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import HuggingFaceHub
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import CharacterTextSplitter
# Aliased so it does not shadow the `Pinecone` client class imported above.
from langchain.vectorstores import Pinecone as PineconeVectorStore

# Local
import keyfile

# Silence noisy deprecation warnings from the LLM/vector-store libraries.
warnings.filterwarnings("ignore")
# Prompt for the RAG chain: the retrieved passages and the user's question are
# interpolated into {context} / {question} by PromptTemplate at build time.
template = """
You are a MLOPs engineer. The user will ask you a question about Machine Learning Operations.
Use the following piece of context to answer the question.
If you don't know the answer, just say you don't know.
Keep the answer brief.
Context: {context}
Question: {question}
Answer:
"""
def setup_retrieval_qa_system(doc_directory, question, chunk_size=500, chunk_overlap=100):
    """Index the PDFs in *doc_directory* into Pinecone and answer *question*.

    Loads every file in the directory with PyPDFLoader, splits the text into
    character chunks, embeds them, stores/reuses a Pinecone index, and runs a
    "stuff"-type RetrievalQA chain backed by a HuggingFace-hosted LLM.

    Parameters
    ----------
    doc_directory : str
        Path to a directory of PDF files to index.
    question : str
        The user's natural-language question.
    chunk_size : int, optional
        Characters per chunk (default 500).
    chunk_overlap : int, optional
        Overlapping characters between consecutive chunks (default 100).

    Returns
    -------
    str
        The answer text produced by the QA chain.

    Raises
    ------
    ValueError
        If either API key is missing from ``keyfile``.
    """
    # --- Credentials (read from the local keyfile module, not .env; the
    # original called load_dotenv() which was never imported) ---------------
    hugging_face = keyfile.Hugging_face_key
    if not hugging_face:
        raise ValueError("HuggingFace API key is missing. Please set it in the .env file.")
    os.environ['HUGGINGFACEHUB_API_TOKEN'] = hugging_face

    pinecone_key = keyfile.PCToken
    if not pinecone_key:
        raise ValueError("pc API key is missing. Please set it in the .env file.")
    os.environ['PCToken'] = pinecone_key
    # BUG FIX: the original used the raw key string as if it were a client
    # (pc.list_indexes() etc.); construct an actual Pinecone client instead.
    pc = Pinecone(api_key=pinecone_key)

    # --- Serverless index setup --------------------------------------------
    cloud = os.environ.get("PINECONE_CLOUD") or "aws"
    region = os.environ.get("PINECONE_REGION") or "us-east-1"
    serv = ServerlessSpec(cloud=cloud, region=region)
    # BUG FIX: Pinecone index names must be lowercase alphanumeric/hyphens;
    # the original "Bhagya-27thoct" would be rejected.
    index_name = "bhagya-27thoct"
    index_exists = index_name in pc.list_indexes().names()
    if not index_exists:
        pc.create_index(
            name=index_name,
            dimension=768,   # must match the embedding model's output size
            metric="cosine",
            spec=serv,
        )
        # Block until the new index is ready to accept upserts.
        while not pc.describe_index(index_name).status['ready']:
            time.sleep(1)
    print("Index before inserting")
    print(pc.Index(index_name).describe_index_stats())

    # --- Load and split the documents --------------------------------------
    all_docs = []
    with st.spinner('Loading and processing documents...'):
        for file_name in os.listdir(doc_directory):
            file_path = os.path.join(doc_directory, file_name)
            loader = PyPDFLoader(file_path)
            all_docs.extend(loader.load())
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    splitted_chunks = text_splitter.split_documents(all_docs)

    # BUG FIX: the original referenced an undefined `embeddings` name and
    # pointed HuggingFaceInstructEmbeddings at Mixtral (an LLM, not an
    # embedding model). all-mpnet-base-v2 emits the 768-dim vectors the
    # index above expects.
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

    # Upsert the chunks on first creation; otherwise attach to the existing
    # index (original passed undefined `docs` here instead of the chunks).
    if not index_exists:
        docsearch = PineconeVectorStore.from_documents(splitted_chunks, embeddings, index_name=index_name)
    else:
        docsearch = PineconeVectorStore.from_existing_index(index_name, embeddings)

    # --- LLM + QA chain (original built the identical llm twice and also a
    # rag_chain it never used; both removed) --------------------------------
    model_id = "mistralai/Mixtral-8x7B-Instruct-v0.1"  # was undefined in the original
    llm = HuggingFaceHub(
        repo_id=model_id,
        model_kwargs={"temperature": 0.8, "top_k": 50},
        huggingfacehub_api_token=hugging_face,
    )
    prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"],
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=docsearch.as_retriever(),
        chain_type_kwargs={"prompt": prompt},  # original built `prompt` but never wired it in
    )
    with st.spinner('Finding the best answer...'):
        # BUG FIX: original called qa_chain.run(query) with `query` undefined.
        result = qa_chain.run(question)
    # BUG FIX: .run() returns the answer string directly; the original's
    # result['result'] would raise TypeError on a str.
    return result
def main():
    """Streamlit entry point: gather PDFs (uploads or a directory path) and a
    question, then run the retrieval-QA pipeline and render the answer."""
    st.title("📝 Document-Based Question Answering System with Groq")
    st.sidebar.header("Configuration")

    # Inputs: either uploaded PDFs or a directory path typed by the user.
    uploaded_files = st.sidebar.file_uploader("Upload PDF documents", type="pdf", accept_multiple_files=True)
    doc_directory = st.text_input("Or enter the document directory path directly:", "")
    chunk_size = st.sidebar.slider("Set chunk size", 100, 1000, 500)
    chunk_overlap = st.sidebar.slider("Set chunk overlap", 0, 200, 100)
    question = st.text_input("Enter your question:")

    # Nothing to do until the user presses the button.
    if not st.button("Get Answer"):
        return

    if uploaded_files:
        # Persist the uploads to disk so the PDF loader can read them.
        doc_directory = "/tmp/streamlit_uploaded_docs"
        os.makedirs(doc_directory, exist_ok=True)
        for file in uploaded_files:
            destination = os.path.join(doc_directory, file.name)
            with open(destination, "wb") as f:
                f.write(file.getbuffer())
    elif not doc_directory:
        st.warning("Please upload PDF files or provide a document directory.")
        return

    if not question:
        st.warning("Please provide a question.")
        return

    try:
        result = setup_retrieval_qa_system(doc_directory, question, chunk_size, chunk_overlap)
        st.success("Answer found!")
        st.write(f"**Answer:** {result}")
    except Exception as e:
        st.error(f"An error occurred: {e}")
# Launch the Streamlit app only when executed as a script, not on import.
if __name__ == "__main__":
    main()