"""Gradio chat front end for the FORTIFIED program assistant.

The module wires together:

* a retrieval chain (Chroma retriever -> XML-citation prompt -> gpt-4o ->
  ``XMLOutputParser``),
* a question-contextualisation step (chat history + gpt-4o-mini),
* Azure Cosmos DB containers used to log chats, votes and free-text feedback,
* a Gradio ``Blocks`` UI with a role selector and a chat view.

Importing the module creates the Cosmos client, opens the Chroma store and
reads ``articles_db.csv``.
"""

import datetime
import hashlib
import logging
import os
import uuid
from typing import List

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
import chromadb
import gradio as gr
import pandas as pd
from azure.cosmos.partition_key import PartitionKey
from chromadb.config import Settings
from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.exceptions import OutputParserException
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
    trim_messages,
)
from langchain_core.output_parsers import XMLOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings

load_dotenv()

# Constants
PERSIST_DIRECTORY = "chroma_store"
K_VALUE = 5
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Returned when the model produced an empty answer.
NO_DIRECT_INFO_MESSAGE = (
    "There is no direct information in our database relevant to your inquiry. "
    "Please contact [FORTIFIED customer support](https://fortifiedhome.org/contact/) "
    "for further assistance."
)
# Returned when the model output could not be parsed as XML.
NO_INFO_MESSAGE = (
    "There is no information relevant to your inquiry in my current resources. "
    "Please contact [FORTIFIED customer support](https://fortifiedhome.org/contact/) "
    "for further assistance."
)

# In-memory chat histories keyed by session id (see get_session_history).
store = {}

# NOTE(review): the "..." after "final output:" looks like an output-format
# example (probably XML markup) that was lost before this file was reviewed.
# llm_response() reads result['answer']['cited_answer'][0]['answer'] and
# [1]['citations'], so the prompt presumably described that structure --
# confirm against the deployed prompt. The text is kept exactly as found.
xml_system = (
    "You are a helpful AI assistant for the FORTIFIED program, a voluntary "
    "initiative for construction and re-roofing designed to strengthen homes "
    "and commercial buildings against severe weather threats, including high "
    "winds, hail, hurricanes, and tornadoes. Your users may include "
    "homeowners, insurance agents, realtors, design professionals and "
    "construction professionals. When given a user question, you should "
    "consult the provided technical and general documents containing "
    "FORTIFIED standards, instructions, and program information to deliver "
    "accurate and relevant responses. Always provide clear, concise, and "
    "informed answers without unnecessary fluff. If the information necessary "
    "to answer the inquiry is not available within these documents, simply "
    "respond: ‘There is no information relevant to your inquiry in our "
    "current resources. Please contact FORTIFIED customer support for further "
    "assistance.’ Remember, you must return both an answer and citations. A "
    "citation consists of a VERBATIM quote that justifies the answer and the "
    "ID and also Source Name of the quote article. Return a citation for "
    "every quote across all articles that justify the answer. Use the "
    "following format for your final output: ... Here are the "
    "articles:{context}"
)
xml_prompt = ChatPromptTemplate.from_messages(
    [("system", xml_system), ("human", "{input}")]
)

# Deployment configuration, read from the environment / .env file.
ENV = os.getenv('ENV')
HOST = os.getenv('ACCOUNT_HOST')
MASTER_KEY = os.getenv('ACCOUNT_KEY')
DATABASE_ID = os.getenv('COSMOS_DATABASE')
CONTAINER_ID = os.getenv('COSMOS_CONTAINER')
HISTORY_CONTAINER_ID = os.getenv('COSMOS_HISTORY_CONTAINER')

client = cosmos_client.CosmosClient(
    HOST,
    {'masterKey': MASTER_KEY},
    user_agent="CosmosDBPythonQuickstart",
    user_agent_overwrite=True,
)
database = client.get_database_client(DATABASE_ID)
# `container` receives chat records; `history_container` receives votes and
# free-text feedback.
container = database.get_container_client(CONTAINER_ID)
history_container = database.get_container_client(HISTORY_CONTAINER_ID)

llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)

contextualize_q_system_prompt = (
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, "
    "formulate a standalone question which can be understood "
    "without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as is."
)
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
# Rewrites the latest user message into a standalone question.
runnable = contextualize_q_prompt | llm_mini


def format_docs_xml(docs: List[Document]) -> str:
    """Render retrieved documents into the text block injected as ``{context}``.

    Each document contributes its ``metadata['source']`` followed by its page
    content, separated by newlines.

    NOTE(review): the templates contain only newlines around the values, and
    the original comprehension enumerated the documents without using the
    index. Wrapping markup may have been lost -- confirm. The strings are kept
    exactly as found.
    """
    formatted_docs = [
        f"\n{doc.metadata['source']}\n{doc.page_content}\n" for doc in docs
    ]
    return f"\n\n\n{chr(10).join(formatted_docs)}\n"


rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs_xml(x["context"])))
    | xml_prompt
    | llm
    | XMLOutputParser()
)

settings = Settings(persist_directory=PERSIST_DIRECTORY)
vectordb = Chroma(
    embedding_function=OpenAIEmbeddings(),
    persist_directory=PERSIST_DIRECTORY,
)
retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": K_VALUE})
retrieve_docs = (lambda x: x["input"]) | retriever
# Full pipeline: {"input": question} -> adds "context" (documents) -> adds
# "answer" (parsed XML output).
chain = RunnablePassthrough.assign(context=retrieve_docs).assign(
    answer=rag_chain_from_docs
)


def get_article_info(df, file_name):
    """Look up the title and link of an article in the articles table.

    Args:
        df: DataFrame with ``FileName``, ``Title`` and ``Link`` columns.
        file_name: Source identifier. If it contains ``".pdf"`` it is matched
            against ``FileName``; otherwise it is treated as the link itself
            and matched against ``Link``.

    Returns:
        ``(title, link)`` of the first matching row.

    Raises:
        IndexError: If no row matches.
    """
    if ".pdf" in file_name:
        matches = df[df["FileName"] == file_name]
        return matches["Title"].iloc[0], matches["Link"].iloc[0]
    title = df[df["Link"] == file_name]["Title"].iloc[0]
    return title, file_name


df = pd.read_csv("articles_db.csv")


def _add_article(titles, links, title, link, known_titles=(), known_links=()):
    """Append ``title``/``link`` as a pair unless either is already known.

    Adding both values together keeps ``titles`` and ``links`` index-aligned,
    so zipping them can never pair a title with another article's link.
    """
    if title in titles or title in known_titles:
        return
    if link in links or link in known_links:
        return
    titles.append(title)
    links.append(link)


def vectordb_search(query):
    """Return a markdown bullet list of the articles retrieved for ``query``."""
    titles, links = [], []
    for item in retriever.invoke(query):
        source = item.metadata["source"].replace("Articles/", "")
        title, link = get_article_info(df, source)
        _add_article(titles, links, title, link)
    return "\n".join([f"- [{title}]({link})" for title, link in zip(titles, links)])


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the in-memory chat history for ``session_id``, creating it if needed."""
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


def generate_unique_string():
    """Return a SHA-256 hex digest derived from the MAC address and CPU info."""
    # Get the MAC address as a hexadecimal string
    mac_str = f'{uuid.getnode():012x}'
    # Get the CPU info as a unique identifier for the machine
    try:
        with open('/proc/cpuinfo') as f:
            cpu_info = f.read()
    except FileNotFoundError:
        # Fallback to MAC if CPU info is not accessible
        cpu_info = str(uuid.getnode())
    # Combine the MAC and CPU info, and hash it for uniqueness
    return hashlib.sha256((mac_str + cpu_info).encode()).hexdigest()


def _timestamp() -> str:
    """Return the current local time formatted for the Cosmos records."""
    return datetime.datetime.now().strftime(TIMESTAMP_FORMAT)


# Id of the most recent chat record; vote() and user_feedback() attach it to
# their records.
# NOTE(review): this is a single module-level value shared by every session,
# so with concurrent users a vote may be attributed to another user's
# response.
RESPONSE_ID = ""


def llm_response(query, session_id):
    """Answer ``query`` and return the reply as markdown.

    The query is first rewritten into a standalone question using the recent
    session history, then answered by the retrieval chain. Cited articles are
    appended as numbered links, followed by other retrieved articles as
    further reading. Every call writes one chat record to ``container``.

    Args:
        query: The user's message.
        session_id: Key of the in-memory chat history to use.

    Returns:
        The markdown reply, or a fallback message when the model returned an
        empty answer or output that could not be parsed.
    """
    global RESPONSE_ID
    RESPONSE_ID = str(uuid.uuid4())
    chat = {
        "id": RESPONSE_ID,
        "partitionKey": "FortifiedGeneral",
        "user": query,
        "env": ENV,
    }
    titles, links, res_titles, res_links = [], [], [], []
    config = {"configurable": {"thread_id": session_id}}
    # Bound before the try so the finally clause can always log something,
    # including on the early return and on unexpected errors.
    answer = ""
    try:
        history = get_session_history(session_id)
        # token_counter=len counts messages, so this keeps the last 5 messages.
        filtered_history = trim_messages(
            history.messages,
            strategy="last",
            token_counter=len,
            max_tokens=5,
            start_on="human",
            end_on=("human", "tool"),
            include_system=True,
        )
        modified_query = runnable.invoke(
            {"input": query, "chat_history": filtered_history}
        ).content
        result = chain.invoke({"input": modified_query}, config=config)
        cited_answer = result['answer']['cited_answer']
        answer = cited_answer[0]["answer"]
        if not answer:
            # Empty answers are not added to the session history.
            answer = NO_DIRECT_INFO_MESSAGE
            return answer

        history.add_user_message(modified_query)
        history.add_ai_message(answer)

        answer_with_citations = f"{answer}"
        citations = cited_answer[1]['citations']
        if citations:
            for citation in citations:
                try:
                    source = citation['citation'][1]["source"].replace("Articles/", "")
                    title, link = get_article_info(df, source)
                except (TypeError, KeyError, IndexError):
                    # Skip citations that lack the expected keys or do not
                    # match a known article.
                    continue
                _add_article(titles, links, title, link)

            # NOTE(review): the related-article search is placed in the
            # "citations present" branch; the original indentation was lost,
            # so confirm it should not also run for uncited answers.
            for res_item in retriever.invoke(query):
                source = res_item.metadata["source"].replace("Articles/", "")
                res_title, res_link = get_article_info(df, source)
                _add_article(
                    res_titles, res_links, res_title, res_link,
                    known_titles=titles, known_links=links,
                )

            # Append numbered citation markers to the answer text
            answer_with_citations += "".join(
                f" [[{i}]({link})] " for i, link in enumerate(links, start=1)
            )

        markdown_list = answer_with_citations
        if links:
            markdown_list += "\n\nCitations:\n" + "\n".join(
                [
                    f"[{i}]: [{title}]({link})"
                    for i, (title, link) in enumerate(zip(titles, links), start=1)
                ]
            )
        if res_links:
            markdown_list += "\n\n\nHere is a list of articles that can provide more information about your inquiry:\n"
            markdown_list += "\n".join(
                [
                    f"- [{res_title}]({res_link})"
                    for res_title, res_link in zip(res_titles, res_links)
                ]
            )
    except OutputParserException:
        markdown_list = NO_INFO_MESSAGE
        answer = markdown_list
    finally:
        chat["ai"] = answer
        chat["timestamp"] = _timestamp()
        container.create_item(body=chat)
    return markdown_list


def vote(value, data: gr.LikeData):
    """Record an up/down vote for a chatbot message in ``history_container``.

    Args:
        value: Current chatbot value supplied by Gradio (unused).
        data: Like event; ``data.value[0]`` is the voted text and
            ``data.liked`` is the vote direction.
    """
    # Store only the first line of the voted message.
    # NOTE(review): the original also called .split('', 1) first, which always
    # raises ValueError (empty separator); the intended separator appears to
    # have been lost, so that call was dropped.
    chat_vote = {
        "id": str(uuid.uuid4()),
        "chat_id": RESPONSE_ID,
        "partitionKey": "FortifiedGeneralVotes",
        "response": data.value[0].split('\n', 1)[0],
        "env": ENV,
        "timestamp": _timestamp(),
        "vote": "upvote" if data.liked else "downvote",
    }
    history_container.create_item(body=chat_vote)


def show_feedback_column(visible):
    """Toggle the feedback column.

    Args:
        visible: Whether the column is currently shown.

    Returns:
        Updates for the feedback column and its textbox, plus the new state.
    """
    if visible:
        # If visible, hide the column
        return gr.update(visible=False), gr.update(value=""), False
    # If not visible, show the column and clear the Textbox
    return gr.update(visible=True), "", True


def user_feedback(value, ):
    """Store free-text feedback, then hide the column and clear the textbox."""
    chat_feedback = {
        "id": str(uuid.uuid4()),
        "chat_id": RESPONSE_ID,
        "partitionKey": "FortifiedGeneralFeedback",
        "feedback": value,
        "env": ENV,
        "timestamp": _timestamp(),
    }
    history_container.create_item(body=chat_feedback)
    return gr.update(visible=False), ""


with gr.Blocks() as demo:
    gr.Markdown("## FORTIFIED AI Assistant!")

    # Landing view: the role buttons sit between two empty spacer columns.
    with gr.Row(visible=True) as role_selector:
        with gr.Column(scale=4):
            pass
        with gr.Column(scale=2):
            gr.Markdown("### Choose your role:")
            homeowner = gr.Button("Homeowner", variant="primary")
            design_pro = gr.Button("Design Professionals", variant="primary")
            evaluator = gr.Button("Evaluator", variant="primary")
        with gr.Column(scale=4):
            pass

    # Chat view: hidden until a role button is clicked.
    with gr.Row(visible=False) as chat_container:
        with gr.Column():
            session_id = gr.Textbox(label="Session ID", visible=False)
            gr.Markdown(
                "### I'll try to answer any questions related to FORTIFIED program. Tell me what's on your mind?")
            chatbot = gr.Chatbot(type="messages", height=400)
            chatbot.like(vote, chatbot, None)
            msg = gr.Textbox(
                label="Hit the Enter to send your question",
                placeholder="What's on your mind?",
                show_copy_button=True,
            )
            with gr.Row():
                send = gr.Button("Send", variant="primary", scale=3)
                feedback = gr.Button("Feedback", variant="stop", scale=1)
            with gr.Column(visible=False, elem_id="feedback_column") as feedback_column:
                usr_msg = gr.Textbox(
                    label="Submit feedback to IBHS",
                    info="What went wrong?",
                    placeholder="Give us as much detail as you can!",
                    lines=3,
                )
                usr_submit = gr.Button("Submit", variant="secondary")

            def user(user_message, history: list):
                """Clear the input box and append the user's message to the chat."""
                return "", history + [{"role": "user", "content": user_message}]

            def bot(history: list, session_id_i):
                """Generate the reply, then yield it one character at a time."""
                bot_message = llm_response(history[-1]['content'], session_id_i)
                history.append({"role": "assistant", "content": ""})
                for character in bot_message:
                    history[-1]['content'] += character
                    yield history

            feedback_column_state = gr.State(False)

            msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
                bot, [chatbot, session_id], chatbot
            )
            send.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
                bot, [chatbot, session_id], chatbot
            )
            feedback.click(
                fn=show_feedback_column,
                inputs=[feedback_column_state],
                outputs=[feedback_column, usr_msg, feedback_column_state],
            )
            usr_submit.click(user_feedback, usr_msg, outputs=[feedback_column, usr_msg])

            gr.Markdown("*Our chatbot is constantly learning and improving to better serve you!*")

    def start():
        """Show the chat view, hide the role selector and set a new session id."""
        unique_id = str(uuid.uuid4())
        return {
            chat_container: gr.update(visible=True),
            role_selector: gr.update(visible=False),
            session_id: gr.update(value=unique_id),
        }

    homeowner.click(start, [], [chat_container, role_selector, session_id])
    design_pro.click(start, [], [chat_container, role_selector, session_id])
    evaluator.click(start, [], [chat_container, role_selector, session_id])

if __name__ == "__main__":
    demo.launch()