Spaces:
Sleeping
Sleeping
| import json | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain.schema.runnable import Runnable | |
| from langchain.schema.runnable.config import RunnableConfig | |
| from langchain.memory import ChatMessageHistory | |
| from langchain_core.chat_history import BaseChatMessageHistory | |
| from langchain_core.runnables.history import RunnableWithMessageHistory | |
| from langchain.chains import create_history_aware_retriever, create_retrieval_chain | |
| from langchain.chains.combine_documents import create_stuff_documents_chain | |
| import chainlit as cl | |
| from retriever import fetch_retriever_or_load_local_retriever | |
# Run locally with: chainlit run app.py -w
# Architecture diagram of the conversational retrieval chain:
# https://python.langchain.com/v0.1/assets/images/conversational_retrieval_chain-5c7a96abe29e582bc575a0a0d63f86b0.png

# Chat model used by the chains built below; streaming is enabled so the
# answer can be forwarded to the UI as it is generated.
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    streaming=True,
)

# Local FAISS retriever with preloaded embeddings.
retriever = fetch_retriever_or_load_local_retriever()
### Contextualize question ###
# System instruction for the question-rewriting step. The adjacent literals
# are concatenated by the parser into one single-line string.
contextualize_q_system_prompt = (
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, formulate a standalone question "
    "which can be understood without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as is."
)

# Prompt layout: system instruction, then the prior turns, then the new question.
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ]
)

# Handles the empty-chat_history case as well; otherwise it runs
# prompt | llm | StrOutputParser() | retriever in sequence.
history_aware_retriever = create_history_aware_retriever(
    llm=llm,
    retriever=retriever,
    prompt=contextualize_q_prompt,
)
### Answer question ###
# System instruction for the answering step. The previous wording contained an
# orphaned sentence fragment ("... films. and eloquent answers to questions
# about movies."); it is merged here into a single well-formed sentence.
# The {context} placeholder receives the retrieved documents.
qa_system_prompt = """You're an assistant that gives eloquent answers to questions about movies and films. \
Use the following pieces of retrieved context to answer the question. \
Use three sentences maximum and keep the answer concise.
{context}"""

# Prompt layout: system instruction (with context), prior turns, new question.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)

# Chain that accepts the retrieved context alongside the conversation history
# and the query, and generates an answer.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

# Applies history_aware_retriever and question_answer_chain in sequence,
# retaining intermediate outputs such as the retrieved context.
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
### Statefully manage chat history ###
# In-memory mapping of session id -> message history for that session.
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the message history for ``session_id``, creating it on first use.

    Args:
        session_id: Key identifying the conversation.

    Returns:
        The history object stored under ``session_id``; the same object is
        returned on every later call with that id.
    """
    try:
        return store[session_id]
    except KeyError:
        # First time this session is seen: start it with an empty history.
        history = ChatMessageHistory()
        store[session_id] = history
        return history
@cl.on_chat_start
async def on_chat_start() -> None:
    """Prepare the per-session runnable when a new chat begins.

    Wraps ``rag_chain`` so that message history is loaded and saved through
    ``get_session_history``, then stores the wrapped chain in the Chainlit
    user session under the key ``"runnable"`` for ``on_message`` to pick up.

    The ``@cl.on_chat_start`` decorator registers this coroutine with
    Chainlit; without it the hook is never called and no runnable is stored.
    """
    conversational_rag_chain = RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer",
    )
    cl.user_session.set("runnable", conversational_rag_chain)
@cl.on_message
async def on_message(message: cl.Message) -> None:
    """Answer an incoming user message, streaming the reply token by token.

    Fetches the runnable stored in the user session under ``"runnable"``,
    streams it with the message text as ``input`` and the Chainlit session
    id as ``session_id``, forwards every ``"answer"`` chunk to the UI, and
    finally sends the completed message.

    The ``@cl.on_message`` decorator registers this coroutine with Chainlit;
    without it incoming messages are never routed here.

    Args:
        message: The Chainlit message the user just sent.
    """
    runnable: Runnable = cl.user_session.get("runnable")
    msg = cl.Message(content="")

    config = RunnableConfig(
        callbacks=[cl.LangchainCallbackHandler()],
        configurable={"session_id": cl.user_session.get("id")},
    )

    async for chunk in runnable.astream({"input": message.content}, config=config):
        # Replace retrieved Documents with a JSON string so the chunk is
        # JSON serializable; the context is never shown as part of the reply.
        # NOTE(review): the chunk is rewritten in place on purpose -
        # presumably something downstream of the stream reads it; confirm
        # before removing.
        if "context" in chunk:
            docs_dict = [
                {"page_content": doc.page_content, "metadata": doc.metadata}
                for doc in chunk["context"]
            ]
            chunk["context"] = json.dumps(docs_dict)

        # Only answer chunks are streamed to the user.
        if "answer" in chunk:
            await msg.stream_token(chunk["answer"])

    await msg.send()