import gradio as gr import pandas as pd import logging from langchain_core.exceptions import OutputParserException import os from dotenv import load_dotenv import azure.cosmos.cosmos_client as cosmos_client from langchain.chains import create_retrieval_chain import datetime import uuid from LiteratureAgent import RoofCoverChatbot from Refiner import RefinementPipeline from helpers import get_article_info load_dotenv() refiner = RefinementPipeline() literature_agent = RoofCoverChatbot() ENV = os.getenv('ENV') HOST = os.getenv('ACCOUNT_HOST') MASTER_KEY = os.getenv('ACCOUNT_KEY') DATABASE_ID = os.getenv('COSMOS_DATABASE') CONTAINER_ID = os.getenv('COSMOS_CONTAINER') HISTORY_CONTAINER_ID = os.getenv('COSMOS_HISTORY_CONTAINER') client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}, user_agent="CosmosDBPythonQuickstart", user_agent_overwrite=True) database = client.get_database_client(DATABASE_ID) container = database.get_container_client(CONTAINER_ID) history_container = database.get_container_client(HISTORY_CONTAINER_ID) df = pd.read_csv("articles_db.csv") def initialize_session(session_id): # If no session_id exists, generate a new one if session_id is None: session_id = str(uuid.uuid4()) return session_id def llm_response(query, session_id): chat = {} titles, links, res_titles, res_links = [], [], [], [] session_id = initialize_session(session_id) chat["id"] = str(uuid.uuid4()) chat["chat_id"] = session_id chat["partitionKey"] = "RoofingRoadmap" chat["user"] = query chat["env"] = ENV answer = None if 'f wave' in query.lower() or 'f-wave' in query.lower() or 'fwave' in query.lower(): query = query.replace('f wave', 'f-wave shingle').replace('f-wave', 'f-wave shingle').replace('fwave', 'f-wave shingle') try: response = literature_agent.get_response(query) enhanced_query = refiner.invoke(question=query, answer=response) try: initial_answer = response['answer']['cited_answer'][0].get("answer", "Nothing") except Exception as e: initial_answer = "Nothing" if enhanced_query.get("enhanced_answer") == "Nothing" and initial_answer == "Nothing": answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." return answer if enhanced_query.get("enhanced_answer") != "Nothing": answer = enhanced_query['enhanced_answer'] else: answer = response citations = response['answer']['cited_answer'][1].get('citations', []) original_citations = [] if citations: for citation in citations: try: # edited_item = citation['citation'][1]["source"].replace("\\", "/").replace("Articles/", "").replace("Articles\\", "") original_citations.append(citation['citation'][1]["source"]) title, link = get_article_info(df, citation['citation'][1]["source"]) if title not in titles: titles.append(title) # if link not in links: links.append(link) except Exception as e: continue try: question_search = literature_agent.get_extra_resources(query, original_citations) except Exception as e: question_search = [] if question_search: for res_item in question_search: res_title, res_link = get_article_info(df, res_item.metadata["source"]) if res_title not in res_titles and res_title not in titles: res_titles.append(res_title) res_links.append(res_link) if len(res_titles) == 5: break except Exception as e: answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." return answer finally: if answer is None: answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." chat["ai"] = answer chat["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') container.create_item(body=chat) # Build the answer with superscript citations answer_with_citations = f"{answer}" for i, (title, link) in enumerate(zip(titles, links), start=1): answer_with_citations += f" [[{i}]({link})] " # Build the references section with clickable links if not links: markdown_list = f"{answer_with_citations}" else: citations_section = "\n\nCitations:\n" + "\n".join( [f"[{i}]: [{title}]({link})" for i, (title, link) in enumerate(zip(titles, links), start=1)] ) markdown_list = f"{answer_with_citations}{citations_section}" # Combine answer and citations for final markdown output if not res_links and not links: markdown_list += f"\n\n\nHere is a list of articles that can provide more information about your inquiry:\n" markdown_list += "\n".join(["- [IBHS Website](https://ibhs.org)", "- [FORTIFIED Website](https://fortifiedhome.org/roof/)" ]) else: markdown_list += f"\n\n\nHere is a list of articles that can provide more information about your inquiry:\n" markdown_list += "\n".join([f"- [{res_title}]({res_link})" for res_title, res_link in zip(res_titles, res_links)]) return markdown_list def vote(value, data: gr.LikeData, session_id: str = None): session_id = initialize_session(session_id) chat_vote = {} chat_vote["id"] = str(uuid.uuid4()) chat_vote["chat_id"] = session_id chat_vote["partitionKey"] = "RoofingRoadmapVotes" chat_vote["response"] = data.value[0].split('', 1)[0].split('\n', 1)[0] chat_vote["env"] = ENV chat_vote["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') if data.liked: chat_vote["vote"] = "upvote" else: chat_vote["vote"] = "downvote" history_container.create_item(body=chat_vote) def show_feedback_column(visible): if visible: # If visible, hide the column return gr.update(visible=False), gr.update(value=""), False else: # If not visible, show the column and clear the Textbox return gr.update(visible=True), "", True def user_feedback(value, session_id): session_id = initialize_session(session_id) chat_feedback = {} chat_feedback["id"] = str(uuid.uuid4()) chat_feedback["chat_id"] = session_id chat_feedback["partitionKey"] = "RoofingRoadmapFeedback" chat_feedback["feedback"] = value chat_feedback["env"] = ENV chat_feedback["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') history_container.create_item(body=chat_feedback) return gr.update(visible=False), "", session_id with gr.Blocks() as demo: session_id = gr.State(None) gr.Markdown("## Find literature to answer your question!") gr.Markdown("### Ask a question about the wind and hail performance of asphalt shingle, metal, and tile roofs.") with gr.Row(): with gr.Column(): chatbot = gr.Chatbot(type="messages", height=400) chatbot.like(vote, [chatbot, session_id], None) msg = gr.Textbox(label="Hit the Enter to send your question", placeholder="What's on your mind?", show_copy_button=True) with gr.Row(): send = gr.Button("Send", variant="secondary", scale=3) feedback = gr.Button("Feedback", variant="stop", scale=1) with gr.Column(visible=False, elem_id="feedback_column") as feedback_column: usr_msg = gr.Textbox(label="Submit feedback to IBHS", info="What went wrong?", placeholder="Give us as much detail as you can!", lines=3) usr_submit = gr.Button("Submit", variant="secondary") def user(user_message, history: list): return "", history + [{"role": "user", "content": user_message}] def bot(history: list, session_id_i): if session_id_i is None: session_id_i = initialize_session(session_id_i) bot_message = llm_response(history[-1]['content'], session_id_i) history.append({"role": "assistant", "content": ""}) for character in bot_message: history[-1]['content'] += character yield history, session_id_i feedback_column_state = gr.State(False) msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(bot, [chatbot, session_id], [chatbot,session_id]) send.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(bot, [chatbot, session_id], [chatbot, session_id]) feedback.click(fn=show_feedback_column, inputs=[feedback_column_state], outputs=[feedback_column, usr_msg, feedback_column_state]) usr_submit.click(user_feedback, [usr_msg, session_id], outputs=[feedback_column, usr_msg, session_id]) gr.Markdown("*Our chatbot is constantly learning and improving to better serve you!*") gr.Markdown("#### Additional questions? Contact IBHS Membership Manager Larry Scott at [lscott@ibhs.org]().") if __name__ == "__main__": demo.launch()