Spaces:
Paused
Paused
| import os | |
| from typing import List | |
| from operator import itemgetter | |
| from Chunking import ChunkingStrategy, TextLoaderAndSplitterWrapper | |
| from langchain.schema.runnable import RunnablePassthrough | |
| from langchain_openai import ChatOpenAI | |
| from langchain_openai.embeddings import OpenAIEmbeddings | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_community.vectorstores import Qdrant | |
| import chainlit as cl | |
| from chainlit.types import AskFileResponse | |
| from chainlit.cli import run_chainlit | |
| from uuid import uuid4 | |
| import tempfile | |
# --- Runtime configuration, resolved once at import time --------------------

# Indexing (rather than .get) makes a missing key fail immediately with KeyError.
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
GPT_MODEL = "gpt-4o-mini"

# Used for Langsmith: tracing is always switched on, while the project name is
# only filled in when the environment has not already provided one.
unique_id = uuid4().hex[:8]
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ.setdefault("LANGCHAIN_PROJECT", f"LangSmith LCEL RAG - {unique_id}")

# Deployment flags.
# is_azure is True as soon as AZURE_DEPLOYMENT is defined, even if it is empty.
is_azure = os.environ.get("AZURE_DEPLOYMENT") is not None
# is_azure_qdrant_inmem is True only when AZURE_QDRANT_INMEM is set and non-empty.
is_azure_qdrant_inmem = bool(os.environ.get("AZURE_QDRANT_INMEM"))
| # Utility functions | |
def save_file(file: AskFileResponse, file_ext: str, is_azure: bool) -> str:
    """Write an uploaded file to a named temporary file and return its path.

    Despite its name, ``file_ext`` is compared against MIME types
    (``"application/pdf"`` and ``"text/plain"``); it is translated here into
    the suffix given to the temporary file.

    Args:
        file: The upload; its ``content`` attribute is written in binary mode.
        file_ext: MIME type of the upload.
        is_azure: Accepted but not consulted. The target directory is chosen
            from the module-level ``is_azure_qdrant_inmem`` flag instead.

    Returns:
        Path of the temporary file. It is created with ``delete=False``, so
        it is not removed when the file object is closed.

    Raises:
        ValueError: If the MIME type is neither PDF nor plain text.
    """
    suffix_by_mime = {"application/pdf": ".pdf", "text/plain": ".txt"}
    if file_ext not in suffix_by_mime:
        raise ValueError(f"Unknown file type: {file_ext}")
    suffix = suffix_by_mime[file_ext]

    # None lets tempfile pick its default directory.
    target_dir = "/tmp" if is_azure_qdrant_inmem else None

    handle = tempfile.NamedTemporaryFile(
        mode="wb", delete=False, suffix=suffix, dir=target_dir
    )
    with handle:
        saved_path = handle.name
        handle.write(file.content)
    return saved_path
def setup_vectorstore(documents: List[str], embedding_model: OpenAIEmbeddings, is_azure: bool) -> Qdrant:
    """Build a Qdrant vector store from ``documents``.

    The store is created on the Qdrant server at ``http://qdrant:6333`` only
    when ``is_azure`` is truthy and the module-level ``is_azure_qdrant_inmem``
    flag is falsy. In every other case it is created with
    ``location=":memory:"``.

    Args:
        documents: Items forwarded to ``Qdrant.from_documents``.
            NOTE(review): annotated as ``List[str]`` but presumably these are
            LangChain document objects; confirm against the caller.
        embedding_model: Embedding model passed as ``embedding``.
        is_azure: Whether the app considers itself deployed on Azure.

    Returns:
        The vector store returned by ``Qdrant.from_documents``.
    """
    use_remote_server = is_azure and not is_azure_qdrant_inmem
    if use_remote_server:
        connection = {"url": "http://qdrant:6333"}  # Docker compose setup
    else:
        connection = {"location": ":memory:"}

    return Qdrant.from_documents(
        documents=documents,
        embedding=embedding_model,
        **connection,
    )
# Prepare the components that will form the chain

## Step 1: Create a prompt template
# The template exposes two placeholders, {context} and {question}, which the
# chain fills in before the prompt is sent to the chat model.
# NOTE(review): "Repond" below looks like a typo for "Respond". It is left
# untouched here because the text is sent to the model verbatim.
base_rag_prompt_template = """\
You are a helpful assistant that can answer questions related to the provided context. Repond I don't have that information if outside context.
Context:
{context}
Question:
{question}
"""
base_rag_prompt = ChatPromptTemplate.from_template(base_rag_prompt_template)

## Step 2: Create the embeddings model instance used to vectorise documents
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

## Step 3: Create the OpenAI chat model
# Take the model name from the module-level GPT_MODEL constant so that there
# is a single place to change it.
base_llm = ChatOpenAI(model=GPT_MODEL, tags=["base_llm"])
@cl.on_chat_start
async def on_chat_start():
    """Greet the user, ingest one uploaded file, and build the session's RAG chain.

    Steps, each announced to the user with a chat message:

    1. Ask for a single text or PDF upload, re-asking until one is received.
    2. Save the upload to a temporary file and split it into chunks.
    3. Embed the chunks into a Qdrant vector store and derive a retriever.
    4. Assemble the retrieval-augmented QA chain and store it in the user
       session under the key ``"chain"``.

    Registered through ``@cl.on_chat_start`` so that Chainlit runs it when a
    chat session starts.

    Raises:
        ValueError: Propagated from ``save_file`` when the uploaded file's
            type is neither ``text/plain`` nor ``application/pdf``.
    """
    await cl.Message(
        content="Welcome to the Chat with Files app powered by LCEL and OpenAI - RAG!"
    ).send()

    # Wait for the user to upload a file; ask again for as long as the
    # request comes back as None.
    files = None
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a text or a pdf file to begin!",
            accept=["text/plain", "application/pdf"],
            max_size_mb=10,
            max_files=1,
            timeout=180,
        ).send()
    uploaded = files[0]

    ## Load file and split into chunks
    await cl.Message(content=f"Processing `{uploaded.name}`...").send()
    current_file_path = save_file(uploaded, uploaded.type, is_azure)
    loader_splitter = TextLoaderAndSplitterWrapper(
        ChunkingStrategy.RECURSIVE_CHARACTER_CHAR_SPLITTER, current_file_path
    )
    documents = loader_splitter.load_documents()
    await cl.Message(content=" Data Chunked...").send()

    ## Vectorising the documents
    qdrant_vectorstore = setup_vectorstore(documents, embedding_model, is_azure)
    qdrant_retriever = qdrant_vectorstore.as_retriever()
    await cl.Message(content=" Created Vector store").send()

    # Create the chain for the new chat session.
    retrieval_augmented_qa_chain = (
        # INVOKE CHAIN WITH: {"question" : "<<SOME USER QUESTION>>"}
        # "question" : populated by getting the value of the "question" key
        # "context"  : populated by getting the value of the "question" key
        #              and piping it into qdrant_retriever
        {"context": itemgetter("question") | qdrant_retriever, "question": itemgetter("question")}
        # Re-assigns "context" to its own value from the previous step,
        # leaving "question" untouched.
        | RunnablePassthrough.assign(context=itemgetter("context"))
        # "response" : the "context" and "question" values format the prompt,
        #              which is piped into the LLM
        # "context"  : carried over from the previous step so that callers
        #              can see what was retrieved
        | {"response": base_rag_prompt | base_llm, "context": itemgetter("context")}
    )

    # Let the user know that the system is ready
    await cl.Message(
        content=f"Processing `{uploaded.name}` done. You can now ask questions!"
    ).send()

    cl.user_session.set("chain", retrieval_augmented_qa_chain)
@cl.on_message
async def main(message: cl.Message):
    """Answer one user message with the RAG chain stored in the user session.

    Registered through ``@cl.on_message`` so that Chainlit calls it for each
    incoming user message.

    Args:
        message: The incoming chat message; its ``content`` is used as the
            question passed to the chain.
    """
    chain = cl.user_session.get("chain")
    if chain is None:
        # No chain was stored for this session (for example, file ingestion
        # did not complete), so there is nothing to query yet.
        await cl.Message(
            content="Please upload a text or a pdf file before asking questions."
        ).send()
        return

    # Await the chain's async entry point so that this handler does not block
    # the event loop while retrieval and the model call are in flight.
    response = await chain.ainvoke(
        {"question": message.content},
        config={"tags": ["Demo Run"]},
    )

    # The chain returns a dict; the model's reply lives under "response".
    await cl.Message(content=response["response"].content).send()
# Script entry point: when this file is executed directly (rather than
# imported), pass its own path to chainlit.cli.run_chainlit.
if __name__ == "__main__":
    run_chainlit(__file__)