Spaces:
Sleeping
Sleeping
| # app.py | |
| from typing import List, Union, Optional | |
| from dotenv import load_dotenv, find_dotenv | |
| from langchain.callbacks import get_openai_callback | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.schema import (SystemMessage, HumanMessage, AIMessage) | |
| from langchain.llms import LlamaCpp | |
| from langchain.embeddings import LlamaCppEmbeddings | |
| from langchain.callbacks.manager import CallbackManager | |
| from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler | |
| from langchain.text_splitter import TokenTextSplitter | |
| from langchain.prompts import PromptTemplate | |
| from langchain.vectorstores import Qdrant | |
| from PyPDF2 import PdfReader | |
| import streamlit as st | |
| from ctransformers import AutoModelForCausalLM | |
| from dl_hf_model import dl_hf_model | |
| from loguru import logger | |
# Prompt skeleton used when PDF context is available. The retrieved context is
# fenced by triple backquotes, and the user question is wrapped in "[][][][]"
# markers so that extract_userquesion_part_only() can later recover the bare
# question for display.
PROMPT_TEMPLATE = (
    "\n"
    "Use the following pieces of context enclosed by triple backquotes to answer the question at the end.\n"
    "\n\n\n"
    "Context:\n"
    "```\n"
    "{context}\n"
    "```\n"
    "\n\n\n"
    "Question: [][][][]{question}[][][][]\n"
    "\n\n"
    "Answer:"
)
def init_page() -> None:
    """
    Configure the Streamlit page and label the sidebar.

    Sets the page title to "Personal ChatGPT" and gives the sidebar the
    heading "Options".
    """
    page_settings = {"page_title": "Personal ChatGPT"}
    st.set_page_config(**page_settings)

    sidebar = st.sidebar
    sidebar.title("Options")
def init_messages() -> None:
    """
    Create or reset the chat history kept in Streamlit session state.

    Renders a "Clear Conversation" button in the sidebar. When the button is
    pressed, or when no history exists yet, ``st.session_state.messages`` is
    reset to a single system prompt and ``st.session_state.costs`` to an empty
    list. Otherwise the existing history is left untouched.
    """
    clear_button = st.sidebar.button("Clear Conversation", key="clear")
    if not clear_button and "messages" in st.session_state:
        # History already exists and no reset was requested.
        return

    system_prompt = (
        "You are a helpful AI QA assistant. "
        "When answering questions, use the context enclosed by triple backquotes if it is relevant. "
        "If you don't know the answer, just say that you don't know, "
        "don't try to make up an answer. "
        # Fixed typo ("mardkown") in the instruction sent to the model.
        "Reply your answer in markdown format."
    )
    st.session_state.messages = [SystemMessage(content=system_prompt)]
    st.session_state.costs = []
def get_pdf_text() -> Optional[List[str]]:
    """
    Render the PDF uploader and return the uploaded document as text chunks.

    Returns:
        The text of all pages, joined by blank lines and split by
        TokenTextSplitter into chunks of 100 tokens with no overlap, or None
        while no file has been uploaded.
    """
    st.header("Document Upload")
    uploaded_file = st.file_uploader(
        label="Here, upload your PDF file you want ChatGPT to use to answer",
        type="pdf"
    )
    if not uploaded_file:
        return None

    pdf_reader = PdfReader(uploaded_file)
    # `or ""` keeps the join safe should a page yield no text at all.
    text = "\n\n".join(page.extract_text() or "" for page in pdf_reader.pages)
    text_splitter = TokenTextSplitter(chunk_size=100, chunk_overlap=0)
    return text_splitter.split_text(text)
def build_vectore_store(
        texts: Optional[List[str]],
        embeddings: Union[OpenAIEmbeddings, LlamaCppEmbeddings]) \
        -> Optional[Qdrant]:
    """
    Store the embedding vectors of text chunks into vector store (Qdrant).

    Args:
        texts: Text chunks to index, as returned by get_pdf_text(); may be
            None or empty when no document has been uploaded.
        embeddings: Embedding model used to vectorise the chunks.

    Returns:
        A Qdrant store holding the chunks in the "my_collection" collection,
        or None when there is nothing to index.
    """
    if not texts:
        return None

    with st.spinner("Loading PDF ..."):
        qdrant = Qdrant.from_texts(
            texts,
            embeddings,
            path=":memory:",
            collection_name="my_collection",
            # Rebuild the collection on every call so a newly uploaded
            # document replaces the previous one.
            force_recreate=True
        )
    st.success("File Loaded Successfully!!")
    return qdrant
def select_llm() -> tuple[str, float]:
    """
    Read user selection of parameters in Streamlit sidebar.

    Returns:
        A ``(model_name, temperature)`` pair: the model chosen with the
        "Choose LLM:" radio button and the value of the "Temperature:" slider
        (0.0 to 1.0 in steps of 0.01, default 0.0).
    """
    model_options = (
        "llama-2-7b-chat.ggmlv3.q2_K",
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4",
    )
    model_name = st.sidebar.radio("Choose LLM:", model_options)
    temperature = st.sidebar.slider("Temperature:", min_value=0.0,
                                    max_value=1.0, value=0.0, step=0.01)
    return model_name, temperature
def load_llm(model_name: str, temperature: float) -> Union[ChatOpenAI, LlamaCpp]:
    """
    Load the chat LLM that matches the selected model name.

    Names starting with "gpt-" yield a ChatOpenAI client; names starting with
    "llama-2-" yield a LlamaCpp model read from ``./models/<model_name>.bin``
    with a 2048-token context window and output streamed to stdout.

    Args:
        model_name: Model identifier as chosen in the sidebar.
        temperature: Sampling temperature.

    Raises:
        ValueError: If ``model_name`` matches neither supported prefix
            (previously this silently returned None).
    """
    if model_name.startswith("gpt-"):
        return ChatOpenAI(temperature=temperature, model_name=model_name)

    if model_name.startswith("llama-2-"):
        callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
        return LlamaCpp(
            model_path=f"./models/{model_name}.bin",
            # NOTE(review): the sampling settings are passed under `input`;
            # confirm LlamaCpp honours this keyword rather than expecting
            # them as top-level arguments.
            input={"temperature": temperature,
                   "max_length": 2048,
                   "top_p": 1
                   },
            n_ctx=2048,
            callback_manager=callback_manager,
            verbose=False,
        )

    raise ValueError(f"Unsupported model name: {model_name!r}")
def load_embeddings(model_name: str) -> Union[OpenAIEmbeddings, LlamaCppEmbeddings]:
    """
    Load the embedding model that matches the selected model name.

    Names starting with "gpt-" yield OpenAIEmbeddings; names starting with
    "llama-2-" yield LlamaCppEmbeddings read from
    ``./models/<model_name>.bin``.

    Raises:
        ValueError: If ``model_name`` matches neither supported prefix
            (previously this silently returned None).
    """
    if model_name.startswith("gpt-"):
        return OpenAIEmbeddings()
    if model_name.startswith("llama-2-"):
        return LlamaCppEmbeddings(model_path=f"./models/{model_name}.bin")
    raise ValueError(f"Unsupported model name: {model_name!r}")
def get_answer(llm, messages) -> tuple[str, float]:
    """
    Get the AI answer to user questions.

    Args:
        llm: Either a ChatOpenAI chat model or a text-completion model that
            is callable with a prompt string (LlamaCpp, or the ctransformers
            model that main() loads).
        messages: Conversation so far as langchain.schema message objects.

    Returns:
        A ``(answer, cost)`` pair. The cost is the total reported by the
        OpenAI callback for ChatOpenAI and 0.0 for every other model.
    """
    if isinstance(llm, ChatOpenAI):
        with get_openai_callback() as cb:
            answer = llm(messages)
        return answer.content, cb.total_cost

    # Every non-OpenAI model is treated as a plain completion model fed with
    # a Llama-2 formatted prompt. The original only handled LlamaCpp, so the
    # ctransformers model passed by main() fell through and returned None,
    # which broke the `answer, cost = ...` unpacking in the caller.
    prompt = llama_v2_prompt(convert_langchainschema_to_dict(messages))
    return llm(prompt), 0.0
def find_role(message: Union[SystemMessage, HumanMessage, AIMessage]) -> str:
    """
    Map a langchain.schema message object to its chat role name.

    Returns "system", "user" or "assistant" for SystemMessage, HumanMessage
    and AIMessage respectively.

    Raises:
        TypeError: If the message is none of the three known types.
    """
    # Checked in this order, first match wins.
    role_by_type = (
        (SystemMessage, "system"),
        (HumanMessage, "user"),
        (AIMessage, "assistant"),
    )
    for message_type, role in role_by_type:
        if isinstance(message, message_type):
            return role
    raise TypeError("Unknown message type.")
def convert_langchainschema_to_dict(
        messages: List[Union[SystemMessage, HumanMessage, AIMessage]]) \
        -> List[dict]:
    """
    Turn a list of langchain.schema chat messages into plain dictionaries.

    Each message becomes ``{"role": <role name>, "content": <message text>}``,
    with the role resolved by find_role(); the order of messages is kept.
    """
    converted = []
    for message in messages:
        entry = {"role": find_role(message), "content": message.content}
        converted.append(entry)
    return converted
def llama_v2_prompt(messages: List[dict]) -> str:
    """
    Convert the messages in list of dictionary format to Llama2 compliant
    format.

    Args:
        messages: Chat history as ``{"role": ..., "content": ...}`` dicts.
            If the first entry is not a system message, a default system
            prompt is prepended. The list itself is not modified.

    Returns:
        The prompt string: every completed (instruction, answer) pair is
        rendered as ``<s>[INST] ... [/INST] ... </s>``, followed by the last
        message as an open ``<s>[INST] ... [/INST]`` instruction. The system
        prompt is embedded in the first instruction between ``<<SYS>>`` tags.

    Raises:
        ValueError: If ``messages`` is empty or holds only a system message,
            i.e. there is nothing for the model to answer.
    """
    B_INST, E_INST = "[INST]", "[/INST]"
    B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"
    BOS, EOS = "<s>", "</s>"
    DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful, respectful and honest assistant. "
        "Always answer as helpfully as possible, while being safe. "
        "Please ensure that your responses are socially unbiased and positive in nature. "
        "If a question does not make any sense, or is not factually coherent, "
        "explain why instead of answering something not correct. "
        "If you don't know the answer to a question, please don't share false information."
    )

    if not messages:
        raise ValueError("messages must contain at least one message.")

    if messages[0]["role"] != "system":
        messages = [
            {
                "role": "system",
                "content": DEFAULT_SYSTEM_PROMPT,
            }
        ] + messages

    if len(messages) < 2:
        raise ValueError(
            "messages must contain at least one message after the system prompt.")

    # Llama-2 has no separate system turn: fold the system prompt into the
    # first message that follows it.
    system_message, first_message = messages[0], messages[1]
    messages = [
        {
            "role": first_message["role"],
            "content": B_SYS + system_message["content"] + E_SYS + first_message["content"],
        }
    ] + messages[2:]

    # Even positions are instructions, odd positions their answers.
    messages_list = [
        f"{BOS}{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} {EOS}"
        for prompt, answer in zip(messages[::2], messages[1::2])
    ]
    # The final message is left open for the model to complete.
    messages_list.append(
        f"{BOS}{B_INST} {(messages[-1]['content']).strip()} {E_INST}")
    return "".join(messages_list)
def extract_userquesion_part_only(content):
    """
    Recover the bare user question from a prompt that also carries PDF context.

    The question is expected to sit between two "[][][][]" markers. When the
    content holds exactly two markers, the text between them is returned;
    in every other case the content is returned unchanged.
    """
    marker = "[][][][]"
    if content.count(marker) != 2:
        return content
    _, question, _ = content.split(marker)
    return question
def main() -> None:
    """
    Run the Streamlit PDF question-answering app.

    Steps, in order: load environment variables, draw the page and sidebar,
    download and load the Llama-2 chat model, build the embedding model and
    (if a PDF was uploaded) the vector store, then handle one chat turn and
    render the chat history and the per-answer costs.

    Raises:
        SystemExit: With exit code 1 if the model file cannot be downloaded.
    """
    load_dotenv(find_dotenv())
    init_page()

    # NOTE: the sidebar choice only selects the embedding backend below. The
    # chat model is always the Llama-2 GGML file fetched from `url`, and
    # `temperature` is not applied to it.
    model_name, temperature = select_llm()
    url = "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGML/blob/main/llama-2-7b-chat.ggmlv3.q2_K.bin"
    try:
        model_loc, file_size = dl_hf_model(url)
    except Exception as exc_:
        # Without the model file the app cannot do anything useful.
        logger.error(exc_)
        raise SystemExit(1) from exc_

    llm = AutoModelForCausalLM.from_pretrained(
        model_loc,
        model_type="llama",
    )
    logger.info(f"done load llm {model_loc=} {file_size=}G")

    embeddings = load_embeddings(model_name)
    texts = get_pdf_text()
    qdrant = build_vectore_store(texts, embeddings)
    init_messages()

    st.header("Personal ChatGPT")
    # Supervise user input
    if user_input := st.chat_input("Input your question!"):
        if qdrant:
            # Join the retrieved chunks into plain text; passing the list
            # itself would put its Python repr (brackets, quotes, escaped
            # newlines) into the prompt.
            context = "\n\n".join(
                c.page_content
                for c in qdrant.similarity_search(user_input, k=10))
            user_input_w_context = PromptTemplate(
                template=PROMPT_TEMPLATE,
                input_variables=["context", "question"]) \
                .format(context=context, question=user_input)
        else:
            user_input_w_context = user_input
        st.session_state.messages.append(
            HumanMessage(content=user_input_w_context))
        with st.spinner("ChatGPT is typing ..."):
            answer, cost = get_answer(llm, st.session_state.messages)
        st.session_state.messages.append(AIMessage(content=answer))
        st.session_state.costs.append(cost)

    # Display chat history (system messages are not shown)
    messages = st.session_state.get("messages", [])
    for message in messages:
        if isinstance(message, AIMessage):
            with st.chat_message("assistant"):
                st.markdown(message.content)
        elif isinstance(message, HumanMessage):
            with st.chat_message("user"):
                # Show only the question, not the PDF context sent with it.
                st.markdown(extract_userquesion_part_only(message.content))

    costs = st.session_state.get("costs", [])
    st.sidebar.markdown("## Costs")
    st.sidebar.markdown(f"**Total cost: ${sum(costs):.5f}**")
    for cost in costs:
        st.sidebar.markdown(f"- ${cost:.5f}")


# Launch with: streamlit run app.py
if __name__ == "__main__":
    main()