|
|
import json |
|
|
import os |
|
|
from langchain.chains.combine_documents import create_stuff_documents_chain |
|
|
from langchain.chains.history_aware_retriever import create_history_aware_retriever |
|
|
from langchain.chains.retrieval import create_retrieval_chain |
|
|
from langchain_community.vectorstores import FAISS |
|
|
from langchain_community.chat_message_histories import ChatMessageHistory |
|
|
from langchain_core.chat_history import BaseChatMessageHistory |
|
|
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
|
|
from langchain_groq import ChatGroq |
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
|
import streamlit as st |
|
|
from dotenv import load_dotenv |
|
|
from langchain_core.runnables.history import RunnableWithMessageHistory |
|
|
|
|
|
# Load settings from a local .env file (when one is present) into the
# process environment before any of the keys below are read.
load_dotenv()
|
|
|
|
|
|
|
|
# Re-export the credentials only when they are actually defined.
# os.environ accepts nothing but str values, so assigning the None that
# os.getenv() returns for an unset variable would raise TypeError and
# abort the app at startup.
for _name in ("LANGCHAIN_API_KEY", "GROQ_API_KEY", "HF_TOKEN"):
    _value = os.getenv(_name)
    if _value is not None:
        os.environ[_name] = _value

# LangChain tracing switch and the project name runs are reported under.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Rag with chat history"

# Turn off parallelism in the HuggingFace tokenizers library.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
|
|
|
|
|
|
|
# Embedding model handed to the FAISS vector store when the uploaded
# documents are indexed.
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)
|
|
|
|
|
|
|
|
# JSON file that persists the next session id to hand out.
session_file = 'sessions.json'

# Create the counter file on first run. Exclusive-create mode ("x") folds
# the existence check and the creation into one step, so a file that
# appears in between is never truncated and its counter never reset.
try:
    with open(session_file, 'x') as f:
        json.dump({"current_session_id": 1}, f)
except FileExistsError:
    # The counter already exists; keep its current value.
    pass
|
|
|
|
|
def get_new_session_id(path=None):
    """Return the next unused session id and advance the stored counter.

    The counter lives in a JSON file shaped like
    ``{"current_session_id": <int>}``. The value currently stored is
    returned, and the file is rewritten with that value plus one.

    Args:
        path: Counter file to read and update. Defaults to the
            module-level ``session_file``, so existing zero-argument
            callers behave exactly as before.

    Returns:
        The session id that was stored in the file before the increment.
    """
    counter_path = session_file if path is None else path
    # "r+" lets the counter be read and rewritten through a single handle.
    with open(counter_path, 'r+') as f:
        data = json.load(f)
        session_id = data['current_session_id']
        data['current_session_id'] = session_id + 1
        # Rewind and overwrite in place; truncate() drops any leftover
        # bytes should the new payload be shorter than the old one.
        f.seek(0)
        json.dump(data, f)
        f.truncate()
    return session_id
|
|
|
|
|
|
|
|
# Page header.
st.title("Rag with chat history")

# Chat model shared by the question-rewriting step and the answering step.
llm = ChatGroq(model="llama-3.1-70b-Versatile")

# Allocate a session id only when this Streamlit session does not have
# one yet, then show it on the page.
if 'session_id' not in st.session_state:
    st.session_state['session_id'] = get_new_session_id()
session_id = st.session_state['session_id']
st.write(f"Session ID: {session_id}")

# Mapping of session id -> chat history object, kept in session state.
if 'store' not in st.session_state:
    st.session_state['store'] = {}

# PDF upload widget; several files may be selected at once.
uploaded_files = st.file_uploader(
    "Choose a PDF file",
    type="pdf",
    accept_multiple_files=True,
)
|
|
|
|
|
|
|
|
def _load_uploaded_pdfs(files):
    """Parse every uploaded PDF into one flat list of documents.

    Each upload is written to its own temporary file, because the loader
    is given a filesystem path, and that file is removed again as soon as
    it has been parsed. A private temp file per upload avoids the clashes
    a single fixed path in the working directory would cause between
    concurrent sessions, and leaves nothing behind on disk.

    Args:
        files: Uploaded file objects exposing ``getvalue()``.

    Returns:
        The documents produced by ``PyPDFLoader`` for all uploads, in
        upload order.
    """
    import tempfile  # function-scope import keeps this block self-contained

    loaded = []
    for upload in files:
        # delete=False: the handle is closed before the loader reopens the
        # path (needed on platforms that forbid opening a file twice).
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(upload.getvalue())
            tmp_path = tmp.name
        try:
            loaded.extend(PyPDFLoader(tmp_path).load())
        finally:
            os.remove(tmp_path)
    return loaded


if uploaded_files:
    documents = _load_uploaded_pdfs(uploaded_files)

    # Chunk the documents and index the chunks for similarity search.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=500)
    splits = text_splitter.split_documents(documents)
    if not splits:
        # PDFs without extractable text yield no chunks, and a vector
        # index cannot be built from an empty list.
        st.write("No readable text was found in the uploaded file(s).")
        st.stop()
    vector_store = FAISS.from_documents(documents=splits, embedding=embeddings)
    retriever = vector_store.as_retriever()

    # Step 1: rewrite the latest question so it stands alone without the
    # chat history, then retrieve with the rewritten question.
    contextualize_q_systemprompt = (
        "Given a chat history and the latest user question "
        "which might reference context in the chat history, "
        "formulate a standalone question which can be understood "
        "without the chat history. Do not answer the question, "
        "just reformulate it if needed and otherwise return it as it is."
    )
    contextualize_q_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", contextualize_q_systemprompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )
    history_aware_retriever = create_history_aware_retriever(
        llm, retriever, contextualize_q_prompt
    )

    # Step 2: answer from the retrieved context.
    system_prompt = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer the question. "
        "If you don't have enough context, you can say that you "
        "don't know. Use three sentences maximum and keep the "
        "answer concise."
        "\n\n"
        "{context}"
    )
    qa_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )
    question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

    def get_session_history(session: str) -> BaseChatMessageHistory:
        """Return the chat history for ``session``, creating it on first use.

        Histories are kept in ``st.session_state.store``, keyed by the
        session identifier that is passed in.
        """
        if session not in st.session_state.store:
            st.session_state.store[session] = ChatMessageHistory()
        return st.session_state.store[session]

    # Wrap the RAG chain so the stored history is supplied as
    # "chat_history" and each exchange is recorded back into it.
    conversational_rag_chain = RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer",
    )

    user_input = st.text_input("Ask a question")
    if user_input:
        # Same object the chain updates, so it reflects the new exchange
        # once invoke() has returned.
        session_history = get_session_history(session_id)
        response = conversational_rag_chain.invoke(
            {"input": user_input},
            config={"configurable": {"session_id": session_id}},
        )
        st.write(st.session_state.store)
        st.write("Assistant:", response["answer"])
        st.write("Chat History:", session_history.messages)
else:
    st.write("Please upload a file")