Spaces:
Runtime error
Runtime error
| import logging | |
| from pathlib import Path | |
| _logger = logging.getLogger("semantic") | |
| from operator import itemgetter | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.runnables.base import RunnableSequence | |
| from langchain_core.vectorstores import VectorStore | |
| from langchain.retrievers.multi_query import MultiQueryRetriever | |
| from langchain_community.vectorstores import Qdrant | |
| from langchain.schema.output_parser import StrOutputParser | |
| from langchain.schema.runnable import RunnablePassthrough | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.document_loaders import PyMuPDFLoader | |
| from langchain_experimental.text_splitter import SemanticChunker | |
| from globals import ( | |
| embeddings, | |
| gpt35_model, | |
| gpt4_model, | |
| META_10K_FILE_PATH, | |
| META_SEMANTIC_COLLECTION, | |
| VECTOR_STORE_PATH | |
| ) | |
# Switch between a throw-away in-process Qdrant instance (True) and one opened
# with path=VECTOR_STORE_PATH (False).
USE_MEMORY = True

from qdrant_client import QdrantClient

# Module-wide Qdrant client, chosen once at import time from USE_MEMORY.
qclient: QdrantClient = (
    QdrantClient(":memory:") if USE_MEMORY else QdrantClient(path=VECTOR_STORE_PATH)
)

# Prompt text for the RAG chain; `{context}` and `{question}` are the two
# template variables filled in when the prompt is formatted.
RAG_PROMPT = """
Reply the user's query thoughtfully and clearly.
You should only respond to user's query if the context is related to the query.
If you are not sure how to answer, please reply "I don't know".
Respond with structure in markdown.
CONTEXT:
{context}
QUERY:
{question}
YOUR REPLY: """

# Chat prompt template built from RAG_PROMPT.
rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
class SemanticStoreFactory:
    """Build, load and cache the Qdrant vector store of semantic chunks.

    The store is created at most once per process and kept on the class, so
    every caller of :meth:`get_semantic_store` shares the same instance.
    All methods are classmethods and are meant to be called on the class
    itself; no instance is required.
    """

    # Cached store; stays None until the first successful create/load.
    _semantic_vectorstore: VectorStore = None

    @classmethod
    def __load_semantic_store(cls) -> VectorStore:
        """Open the existing collection, or create it if nothing is on disk.

        When ``VECTOR_STORE_PATH`` is a non-empty directory the collection
        ``META_SEMANTIC_COLLECTION`` is wrapped through the module-level
        ``qclient``; otherwise the store is built from scratch.

        Returns:
            A vector store bound to ``META_SEMANTIC_COLLECTION``.

        Raises:
            Exception: whatever ``qclient.get_collection`` raises when the
                collection cannot be found; ``get_semantic_store`` catches
                this and rebuilds the store.
        """
        path = Path(VECTOR_STORE_PATH)
        # is_dir() is False for a missing path, so no separate exists() check.
        if not (path.is_dir() and any(path.iterdir())):
            _logger.info("\tQdrant creating ...")
            return cls.__create_semantic_store()

        _logger.info("\tQdrant loading ...")
        # Fail fast if the collection is absent instead of handing back a
        # wrapper that would only break on the first query.
        qclient.get_collection(META_SEMANTIC_COLLECTION)
        # Wrap the already-open client rather than calling from_documents([]),
        # which needs at least one document and would target a new store.
        return Qdrant(
            client=qclient,
            collection_name=META_SEMANTIC_COLLECTION,
            embeddings=embeddings,
        )

    @classmethod
    def __create_semantic_store(cls) -> VectorStore:
        """Chunk the PDF at ``META_10K_FILE_PATH`` and index it in Qdrant.

        The document is loaded with ``PyMuPDFLoader``, split by
        ``SemanticChunker`` (percentile breakpoints) and written to the
        collection ``META_SEMANTIC_COLLECTION``, which is recreated on every
        call (``force_recreate=True``). The target is ``":memory:"`` when
        ``USE_MEMORY`` is set and ``VECTOR_STORE_PATH`` otherwise.

        Returns:
            The freshly populated vector store.
        """
        if USE_MEMORY:
            _logger.info(f"creating semantic vector store: {USE_MEMORY}")
        else:
            _logger.info(f"creating semantic vector store: {VECTOR_STORE_PATH}")
            # The storage directory is only needed for the on-disk store.
            path = Path(VECTOR_STORE_PATH)
            if not path.exists():
                path.mkdir(parents=True, exist_ok=True)
                _logger.info(f"Directory '{path}' created.")

        documents = PyMuPDFLoader(META_10K_FILE_PATH).load()
        semantic_chunker = SemanticChunker(
            embeddings=embeddings,
            breakpoint_threshold_type="percentile",
        )
        semantic_chunks = semantic_chunker.create_documents(
            [d.page_content for d in documents]
        )
        _logger.info(f"created semantic_chunks: {len(semantic_chunks)}")

        if USE_MEMORY:
            _logger.info("\t==> creating memory vectorstore ...")
            target = {"location": ":memory:"}
        else:
            # NOTE(review): this lets Qdrant.from_documents open its own client
            # on VECTOR_STORE_PATH while the module-level `qclient` may already
            # hold that folder; Qdrant local mode may refuse the second client.
            # Confirm before switching USE_MEMORY off.
            target = {"path": VECTOR_STORE_PATH}

        semantic_chunk_vectorstore = Qdrant.from_documents(
            semantic_chunks,
            embeddings,
            collection_name=META_SEMANTIC_COLLECTION,
            force_recreate=True,
            **target,
        )
        if USE_MEMORY:
            _logger.info("\t==> finished constructing vectorstore")

        _logger.info(f"\t==> return vectorstore {META_SEMANTIC_COLLECTION}")
        return semantic_chunk_vectorstore

    @classmethod
    def get_semantic_store(cls) -> VectorStore:
        """Return the shared semantic vector store, building it on first use.

        In memory mode the store is always created from the PDF. In disk mode
        the persisted collection is loaded first and the store is rebuilt only
        if loading fails.

        Returns:
            The cached vector store.
        """
        _logger.info("get_semantic_store")
        if cls._semantic_vectorstore is None:
            if USE_MEMORY:
                cls._semantic_vectorstore = cls.__create_semantic_store()
                _logger.info("received semantic_vectorstore")
            else:
                _logger.info(
                    f"Loading semantic vectorstore {META_SEMANTIC_COLLECTION} from: {VECTOR_STORE_PATH}"
                )
                try:
                    # first try to load the store
                    cls._semantic_vectorstore = cls.__load_semantic_store()
                except Exception as e:
                    # Deliberate best-effort: any load failure falls back to a
                    # full rebuild instead of aborting.
                    _logger.warning(f"cannot load: {e}")
                    cls._semantic_vectorstore = cls.__create_semantic_store()
        _logger.info("RETURNING get_semantic_store")
        return cls._semantic_vectorstore
class SemanticRAGChainFactory:
    """Build and cache the multi-query RAG chain over the semantic store.

    The chain is constructed once and kept on the class; the method is a
    classmethod and is meant to be called on the class itself.
    """

    # Cached chain; stays None until it has been constructed.
    _chain: RunnableSequence = None

    @classmethod
    def get_semantic_rag_chain(cls) -> RunnableSequence:
        """Return the shared RAG chain, constructing it on first use.

        The chain is invoked with ``{"question": "<user question>"}`` and
        yields a dict with two keys: ``"response"`` (the output of
        ``rag_prompt | gpt4_model``) and ``"context"`` (the retrieved
        documents).

        Returns:
            The cached chain, or ``None`` if no semantic store was available
            to build it from.
        """
        if cls._chain is None:
            _logger.info("creating SemanticRAGChainFactory")
            semantic_store = SemanticStoreFactory.get_semantic_store()
            if semantic_store is not None:
                semantic_chunk_retriever = semantic_store.as_retriever()
                # MultiQueryRetriever is built from gpt4_model on top of the
                # base retriever.
                semantic_mquery_retriever = MultiQueryRetriever.from_llm(
                    retriever=semantic_chunk_retriever,
                    llm=gpt4_model,
                )
                cls._chain = (
                    # INVOKE CHAIN WITH: {"question" : "<<SOME USER QUESTION>>"}
                    # "question" : taken from the "question" key of the input
                    # "context"  : the "question" value piped into the retriever
                    {
                        "context": itemgetter("question") | semantic_mquery_retriever,
                        "question": itemgetter("question"),
                    }
                    # Re-assigns "context" to itself; the mapping is passed
                    # through unchanged to the next step.
                    | RunnablePassthrough.assign(context=itemgetter("context"))
                    # "response" : prompt formatted with "context"/"question",
                    #              then piped into the LLM
                    # "context"  : carried over from the previous step
                    | {
                        "response": rag_prompt | gpt4_model,
                        "context": itemgetter("context"),
                    }
                )
                _logger.info("\t_chain constructed")
        return cls._chain