Spaces:
Running
Running
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_openai import ChatOpenAI | |
| from langchain_chroma import Chroma | |
| import chromadb | |
| from chromadb.config import Settings | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from typing import List | |
| import datetime | |
| from langchain_core.documents import Document | |
| from langchain_core.runnables import RunnablePassthrough | |
| from langchain_core.output_parsers import XMLOutputParser | |
| import gradio as gr | |
| import pandas as pd | |
| import logging | |
| from langchain_core.exceptions import OutputParserException | |
| from langchain_core.prompts import MessagesPlaceholder | |
| import os | |
| from dotenv import load_dotenv | |
| import uuid | |
| import hashlib | |
| import azure.cosmos.cosmos_client as cosmos_client | |
| import azure.cosmos.exceptions as exceptions | |
| from azure.cosmos.partition_key import PartitionKey | |
| from langchain_community.chat_message_histories import ChatMessageHistory | |
| from langchain_core.chat_history import BaseChatMessageHistory | |
| from langchain_core.messages import ( | |
| AIMessage, | |
| HumanMessage, | |
| SystemMessage, | |
| ToolMessage, | |
| trim_messages, | |
| ) | |
load_dotenv()

# Constants
PERSIST_DIRECTORY = "chroma_store"  # on-disk location of the Chroma index
K_VALUE = 5  # number of chunks the retriever returns per query
# In-memory per-session chat histories, keyed by session id.
store = {}

# System prompt: answer only from the supplied articles and emit a
# <cited_answer> XML structure (answer + verbatim citations).
xml_system = """You are a helpful AI assistant for the FORTIFIED program, a voluntary initiative for construction
and re-roofing designed to strengthen homes and commercial buildings against severe weather threats, including
high winds, hail, hurricanes, and tornadoes. Your users may include homeowners, insurance agents, realtors,
design professionals and construction professionals. When given a user question, you should consult the provided
technical and general documents containing FORTIFIED standards, instructions, and program information to deliver
accurate and relevant responses. Always provide clear, concise, and informed answers without unnecessary fluff.
If the information necessary to answer the inquiry is not available within these
documents, simply respond: ‘There is no information relevant to your inquiry in our current resources.
Please contact FORTIFIED customer support for further assistance.’
Remember, you must return both an answer and citations. A citation consists of a VERBATIM quote that
justifies the answer and the ID and also Source Name of the quote article. Return a citation for every quote across all articles
that justify the answer. Use the following format for your final output:
<cited_answer>
<answer></answer>
<citations>
<citation><source_id></source_id><source></source><quote></quote></citation>
<citation><source_id></source_id><source></source><quote></quote></citation>
...
</citations>
</cited_answer>
Here are the articles:{context}"""
xml_prompt = ChatPromptTemplate.from_messages([("system", xml_system), ("human", "{input}")])

# Deployment configuration pulled from the environment / .env file.
ENV = os.getenv('ENV')
HOST = os.getenv('ACCOUNT_HOST')
MASTER_KEY = os.getenv('ACCOUNT_KEY')
DATABASE_ID = os.getenv('COSMOS_DATABASE')
CONTAINER_ID = os.getenv('COSMOS_CONTAINER')
HISTORY_CONTAINER_ID = os.getenv('COSMOS_HISTORY_CONTAINER')

# Cosmos DB handles: `container` receives chat logs; `history_container`
# receives votes and free-text feedback (see vote()/user_feedback()).
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}, user_agent="CosmosDBPythonQuickstart", user_agent_overwrite=True)
database = client.get_database_client(DATABASE_ID)
container = database.get_container_client(CONTAINER_ID)
history_container = database.get_container_client(HISTORY_CONTAINER_ID)

# Main answering model, plus a cheaper model used only for query rewriting.
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_mini = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Prompt that condenses the latest user message into a standalone question
# given the chat history; it must NOT answer the question.
contextualize_q_system_prompt = (
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, "
    "formulate a standalone question which can be understood "
    "without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as is."
)
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)
# Query-rewriting chain: contextualize prompt -> gpt-4o-mini.
runnable = contextualize_q_prompt | llm_mini
def format_docs_xml(docs: List[Document]) -> str:
    """Render retrieved documents as the XML <sources> listing the system
    prompt expects: one <source> element per document, carrying its id,
    source name, and page content."""
    pieces = []
    for idx, doc in enumerate(docs):
        pieces.append(
            '<source id="{}">\n<source>{}</source>\n'
            '<article_snippet>{}</article_snippet>\n</source>'.format(
                idx, doc.metadata["source"], doc.page_content
            )
        )
    return "\n\n<sources>\n" + "\n".join(pieces) + "\n</sources>"
# RAG answer chain: format the retrieved docs as XML, fill the cited-answer
# prompt, call the main LLM, and parse its XML into nested dicts/lists.
rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs_xml(x["context"])))
    | xml_prompt
    | llm
    | XMLOutputParser()
)
# NOTE(review): `settings` is built but never passed to the Chroma client
# below — presumably a leftover; confirm before removing.
settings = Settings(persist_directory=PERSIST_DIRECTORY)
vectordb = Chroma(embedding_function=OpenAIEmbeddings(), persist_directory=PERSIST_DIRECTORY)
# MMR retriever returning up to K_VALUE diverse chunks per query.
retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": K_VALUE})
# Pulls the "input" string out of the chain payload and feeds the retriever.
retrieve_docs = (lambda x: x["input"]) | retriever
# Full pipeline: retrieve docs into "context", then produce "answer".
chain = RunnablePassthrough.assign(context=retrieve_docs).assign(
    answer=rag_chain_from_docs
)
def get_article_info(df, file_name):
    """Map a retrieved source file name to its display (title, link).

    PDF sources are looked up by the ``FileName`` column and use the CSV's
    ``Link`` value; anything else is assumed to already be a URL, is looked
    up by the ``Link`` column, and the URL itself is returned as the link.

    Raises IndexError when ``file_name`` has no matching row in ``df``
    (unchanged from the original behavior).
    """
    if ".pdf" in file_name:
        # Compute the boolean mask and filter once instead of twice.
        row = df.loc[df["FileName"] == file_name].iloc[0]
        return row["Title"], row["Link"]
    row = df.loc[df["Link"] == file_name].iloc[0]
    return row["Title"], file_name
| df = pd.read_csv("articles_db.csv") | |
def vectordb_search(query):
    """Return a markdown bullet list of unique articles matching ``query``.

    Runs the MMR retriever, maps each hit's source file to its display
    title/link via the articles CSV, and de-duplicates results.

    Fix: the original de-duplicated ``titles`` and ``links`` in two
    independent lists and then ``zip``ped them, so a duplicate in one list
    but not the other shifted every later pairing, linking titles to the
    wrong URLs. Pairs are now de-duplicated together, keyed by title
    (first occurrence wins).
    """
    articles = {}  # title -> link
    for item in retriever.invoke(query):
        file_name = item.metadata["source"].replace("Articles/", "")
        title, link = get_article_info(df, file_name)
        if title not in articles:
            articles[title] = link
    return "\n".join(f"- [{title}]({link})" for title, link in articles.items())
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the in-memory chat history for ``session_id``, creating an
    empty one on first use."""
    history = store.get(session_id)
    if history is None:
        history = ChatMessageHistory()
        store[session_id] = history
    return history
def generate_unique_string():
    """Build a stable per-machine identifier.

    Combines the MAC address with the contents of /proc/cpuinfo (when
    readable) and hashes the result with SHA-256, returning a 64-character
    hex digest. On platforms without /proc, the MAC alone is used.
    """
    node = uuid.getnode()
    mac_hex = format(node, "012x")  # MAC address as 12 hex digits
    try:
        with open('/proc/cpuinfo') as cpu_file:
            cpu_details = cpu_file.read()
    except FileNotFoundError:
        # No /proc on this platform; fall back to the MAC address again.
        cpu_details = str(node)
    return hashlib.sha256((mac_hex + cpu_details).encode()).hexdigest()
| RESPONSE_ID = "" | |
| def llm_response(query, session_id): | |
| global RESPONSE_ID | |
| RESPONSE_ID = str(uuid.uuid4()) | |
| chat = {} | |
| chat["id"] = RESPONSE_ID | |
| chat["partitionKey"] = "FortifiedGeneral" | |
| chat["user"] = query | |
| chat["env"] = ENV | |
| titles, links, res_titles, res_links = [], [], [], [] | |
| unique_id = session_id | |
| config = {"configurable": {"thread_id": unique_id }} | |
| try: | |
| # modified_query = runnable.invoke({"input": query, "chat_history": history.messages}).content | |
| filtered_history = trim_messages(get_session_history(unique_id).messages, strategy="last", token_counter=len, max_tokens=5, | |
| start_on="human", end_on=("human", "tool"), include_system=True,) | |
| modified_query = runnable.invoke({"input": query, "chat_history": filtered_history}).content | |
| result = chain.invoke({"input": modified_query}, config=config) | |
| if not result['answer']['cited_answer'][0]["answer"]: | |
| return "There is no direct information in our database relevant to your inquiry. Please contact [FORTIFIED customer support](https://fortifiedhome.org/contact/) for further assistance." | |
| answer = result['answer']['cited_answer'][0]["answer"] | |
| history = get_session_history(unique_id) | |
| history.add_user_message(modified_query) | |
| history.add_ai_message(answer) | |
| if not result['answer']['cited_answer'][1]['citations']: | |
| answer_with_citations = f"{answer}" | |
| else: | |
| citations = result['answer']['cited_answer'][1]['citations'] | |
| for citation in citations: | |
| try: | |
| edited_item = citation['citation'][1]["source"].replace("Articles/", "") | |
| title, link = get_article_info(df, edited_item) | |
| if title not in titles: | |
| titles.append(title) | |
| if link not in links: | |
| links.append(link) | |
| except (TypeError, KeyError, IndexError): | |
| # Handle the error or simply pass if citation does not have the expected keys | |
| continue | |
| question_search = retriever.invoke(query) | |
| for res_item in question_search: | |
| edited_item = res_item.metadata["source"].replace("Articles/", "") | |
| res_title, res_link = get_article_info(df, edited_item) | |
| if res_title not in res_titles and res_title not in titles: | |
| res_titles.append(res_title) | |
| if res_link not in res_links and res_link not in links: | |
| res_links.append(res_link) | |
| # Build the answer with superscript citations | |
| answer_with_citations = f"{answer}" | |
| for i, (title, link) in enumerate(zip(titles, links), start=1): | |
| answer_with_citations += f" <sup>[[{i}]({link})]</sup> " # Append superscript citation numbers to the answer text | |
| if not links: | |
| markdown_list = f"{answer_with_citations}" | |
| else: | |
| citations_section = "\n\nCitations:\n" + "\n".join( | |
| [f"[{i}]: [{title}]({link})" for i, (title, link) in enumerate(zip(titles, links), start=1)] | |
| ) | |
| markdown_list = f"{answer_with_citations}{citations_section}" | |
| if not res_links: | |
| return markdown_list | |
| else: | |
| markdown_list += f"\n\n\nHere is a list of articles that can provide more information about your inquiry:\n" | |
| markdown_list += "\n".join([f"- [{res_title}]({res_link})" for res_title, res_link in zip(res_titles, res_links)]) | |
| except OutputParserException: | |
| markdown_list = "There is no information relevant to your inquiry in my current resources. Please contact [FORTIFIED customer support](https://fortifiedhome.org/contact/) for further assistance." | |
| answer = markdown_list | |
| finally: | |
| chat["ai"] = answer | |
| chat["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| container.create_item(body=chat) | |
| return markdown_list | |
def vote(value, data: gr.LikeData):
    """Record an up/down vote on the most recent response in Cosmos DB.

    The stored ``response`` is the first line of the liked message, with any
    superscript citation markup stripped off.
    """
    record = {
        "id": str(uuid.uuid4()),
        "chat_id": RESPONSE_ID,
        "partitionKey": "FortifiedGeneralVotes",
        "response": data.value[0].split('<sup>', 1)[0].split('\n', 1)[0],
        "env": ENV,
        "timestamp": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "vote": "upvote" if data.liked else "downvote",
    }
    history_container.create_item(body=record)
def show_feedback_column(visible):
    """Toggle the feedback panel.

    Returns updates for (feedback column visibility, feedback textbox value,
    new visibility state).
    """
    if not visible:
        # Hidden -> show the panel with a cleared textbox.
        return gr.update(visible=True), "", True
    # Shown -> hide the panel and clear the textbox.
    return gr.update(visible=False), gr.update(value=""), False
def user_feedback(value):
    """Persist free-text user feedback to Cosmos DB, then hide and clear
    the feedback panel."""
    record = {
        "id": str(uuid.uuid4()),
        "chat_id": RESPONSE_ID,
        "partitionKey": "FortifiedGeneralFeedback",
        "feedback": value,
        "env": ENV,
        "timestamp": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
    history_container.create_item(body=record)
    return gr.update(visible=False), ""
# ---- Gradio UI -----------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("## FORTIFIED AI Assistant!")
    # Landing view: role picker shown first, hidden once a role is chosen.
    with gr.Row(visible=True) as role_selector:
        with gr.Column(scale=4):
            pass
        with gr.Column(scale=2):
            gr.Markdown("### Choose your role:")
            homeowner = gr.Button("Homeowner", variant="primary")
            design_pro = gr.Button("Design Professionals", variant="primary")
            evaluator = gr.Button("Evaluator", variant="primary")
        with gr.Column(scale=4):
            pass
    # Chat view: hidden until a role is selected (see start()).
    with gr.Row(visible=False) as chat_container:
        with gr.Column():
            # Hidden textbox carrying the per-visit session id used to key
            # chat history in `store`.
            session_id = gr.Textbox(label="Session ID", visible=False)
            gr.Markdown(
                "### I'll try to answer any questions related to FORTIFIED program. Tell me what's on your mind?")
            chatbot = gr.Chatbot(type="messages", height=400)
            # Thumbs up/down on a message records a vote to Cosmos DB.
            chatbot.like(vote, chatbot, None)
            msg = gr.Textbox(label="Hit the Enter to send your question", placeholder="What's on your mind?", show_copy_button=True)
            with gr.Row():
                send = gr.Button("Send", variant="primary", scale=3)
                feedback = gr.Button("Feedback", variant="stop", scale=1)
            # Collapsible free-text feedback form toggled by the Feedback button.
            with gr.Column(visible=False, elem_id="feedback_column") as feedback_column:
                usr_msg = gr.Textbox(label="Submit feedback to IBHS", info="What went wrong?", placeholder="Give us as much detail as you can!", lines=3)
                usr_submit = gr.Button("Submit", variant="secondary")

    def user(user_message, history: list):
        # Echo the user's message into the chat log and clear the input box.
        return "", history + [{"role": "user", "content": user_message}]

    def bot(history: list, session_id_i):
        # Generate the full answer, then stream it to the UI character by
        # character (yield inside the loop per the standard Gradio streaming
        # pattern — original indentation was lost in this paste; confirm).
        bot_message = llm_response(history[-1]['content'], session_id_i)
        history.append({"role": "assistant", "content": ""})
        for character in bot_message:
            history[-1]['content'] += character
            yield history

    feedback_column_state = gr.State(False)
    # Enter in the textbox and the Send button trigger the same user -> bot flow.
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, [chatbot, session_id], chatbot
    )
    send.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, [chatbot, session_id], chatbot
    )
    feedback.click(fn=show_feedback_column, inputs=[feedback_column_state],
                   outputs=[feedback_column, usr_msg, feedback_column_state])
    usr_submit.click(user_feedback, usr_msg, outputs=[feedback_column, usr_msg])
    gr.Markdown("*Our chatbot is constantly learning and improving to better serve you!*")

    def start():
        # Swap from the role picker to the chat view and mint a session id.
        unique_id = uuid.uuid4()
        return {
            chat_container: gr.update(visible=True),
            role_selector: gr.update(visible=False),
            session_id: gr.update(value=unique_id),
        }

    # All three role buttons behave identically today: they just open the chat.
    homeowner.click(start, [], [chat_container, role_selector, session_id])
    design_pro.click(start, [], [chat_container, role_selector, session_id])
    evaluator.click(start, [], [chat_container, role_selector, session_id])

if __name__ == "__main__":
    demo.launch()