Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import logging | |
| from langchain_core.exceptions import OutputParserException | |
| import os | |
| from dotenv import load_dotenv | |
| import azure.cosmos.cosmos_client as cosmos_client | |
| from langchain.chains import create_retrieval_chain | |
| import datetime | |
| import uuid | |
| from LiteratureAgent import RoofCoverChatbot | |
| from Refiner import RefinementPipeline | |
| from helpers import get_article_info | |
| load_dotenv() | |
| refiner = RefinementPipeline() | |
| literature_agent = RoofCoverChatbot() | |
| ENV = os.getenv('ENV') | |
| HOST = os.getenv('ACCOUNT_HOST') | |
| MASTER_KEY = os.getenv('ACCOUNT_KEY') | |
| DATABASE_ID = os.getenv('COSMOS_DATABASE') | |
| CONTAINER_ID = os.getenv('COSMOS_CONTAINER') | |
| HISTORY_CONTAINER_ID = os.getenv('COSMOS_HISTORY_CONTAINER') | |
| client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}, user_agent="CosmosDBPythonQuickstart", user_agent_overwrite=True) | |
| database = client.get_database_client(DATABASE_ID) | |
| container = database.get_container_client(CONTAINER_ID) | |
| history_container = database.get_container_client(HISTORY_CONTAINER_ID) | |
| df = pd.read_csv("articles_db.csv") | |
| def initialize_session(session_id): | |
| # If no session_id exists, generate a new one | |
| if session_id is None: | |
| session_id = str(uuid.uuid4()) | |
| return session_id | |
| def llm_response(query, session_id): | |
| chat = {} | |
| titles, links, res_titles, res_links = [], [], [], [] | |
| session_id = initialize_session(session_id) | |
| chat["id"] = str(uuid.uuid4()) | |
| chat["chat_id"] = session_id | |
| chat["partitionKey"] = "RoofingRoadmap" | |
| chat["user"] = query | |
| chat["env"] = ENV | |
| answer = None | |
| if 'f wave' in query.lower() or 'f-wave' in query.lower() or 'fwave' in query.lower(): | |
| query = query.replace('f wave', 'f-wave shingle').replace('f-wave', 'f-wave shingle').replace('fwave', | |
| 'f-wave shingle') | |
| try: | |
| response = literature_agent.get_response(query) | |
| enhanced_query = refiner.invoke(question=query, answer=response) | |
| try: | |
| initial_answer = response['answer']['cited_answer'][0].get("answer", "Nothing") | |
| except Exception as e: | |
| initial_answer = "Nothing" | |
| if enhanced_query.get("enhanced_answer") == "Nothing" and initial_answer == "Nothing": | |
| answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." | |
| return answer | |
| if enhanced_query.get("enhanced_answer") != "Nothing": | |
| answer = enhanced_query['enhanced_answer'] | |
| else: | |
| answer = response | |
| citations = response['answer']['cited_answer'][1].get('citations', []) | |
| original_citations = [] | |
| if citations: | |
| for citation in citations: | |
| try: | |
| # edited_item = citation['citation'][1]["source"].replace("\\", "/").replace("Articles/", "").replace("Articles\\", "") | |
| original_citations.append(citation['citation'][1]["source"]) | |
| title, link = get_article_info(df, citation['citation'][1]["source"]) | |
| if title not in titles: | |
| titles.append(title) | |
| # if link not in links: | |
| links.append(link) | |
| except Exception as e: | |
| continue | |
| try: | |
| question_search = literature_agent.get_extra_resources(query, original_citations) | |
| except Exception as e: | |
| question_search = [] | |
| if question_search: | |
| for res_item in question_search: | |
| res_title, res_link = get_article_info(df, res_item.metadata["source"]) | |
| if res_title not in res_titles and res_title not in titles: | |
| res_titles.append(res_title) | |
| res_links.append(res_link) | |
| if len(res_titles) == 5: | |
| break | |
| except Exception as e: | |
| answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." | |
| return answer | |
| finally: | |
| if answer is None: | |
| answer = "Your search is beyond the scope of this tool at this time. Please explore the rest of [IBHS website](https://ibhs.org) to find research on this topic." | |
| chat["ai"] = answer | |
| chat["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| container.create_item(body=chat) | |
| # Build the answer with superscript citations | |
| answer_with_citations = f"{answer}" | |
| for i, (title, link) in enumerate(zip(titles, links), start=1): | |
| answer_with_citations += f" <sup>[[{i}]({link})]</sup> " | |
| # Build the references section with clickable links | |
| if not links: | |
| markdown_list = f"{answer_with_citations}" | |
| else: | |
| citations_section = "\n\nCitations:\n" + "\n".join( | |
| [f"[{i}]: [{title}]({link})" for i, (title, link) in enumerate(zip(titles, links), start=1)] | |
| ) | |
| markdown_list = f"{answer_with_citations}{citations_section}" | |
| # Combine answer and citations for final markdown output | |
| if not res_links and not links: | |
| markdown_list += f"\n\n\nHere is a list of articles that can provide more information about your inquiry:\n" | |
| markdown_list += "\n".join(["- [IBHS Website](https://ibhs.org)", "- [FORTIFIED Website](https://fortifiedhome.org/roof/)" ]) | |
| else: | |
| markdown_list += f"\n\n\nHere is a list of articles that can provide more information about your inquiry:\n" | |
| markdown_list += "\n".join([f"- [{res_title}]({res_link})" for res_title, res_link in zip(res_titles, res_links)]) | |
| return markdown_list | |
| def vote(value, data: gr.LikeData, session_id: str = None): | |
| session_id = initialize_session(session_id) | |
| chat_vote = {} | |
| chat_vote["id"] = str(uuid.uuid4()) | |
| chat_vote["chat_id"] = session_id | |
| chat_vote["partitionKey"] = "RoofingRoadmapVotes" | |
| chat_vote["response"] = data.value[0].split('<sup>', 1)[0].split('\n', 1)[0] | |
| chat_vote["env"] = ENV | |
| chat_vote["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| if data.liked: | |
| chat_vote["vote"] = "upvote" | |
| else: | |
| chat_vote["vote"] = "downvote" | |
| history_container.create_item(body=chat_vote) | |
| def show_feedback_column(visible): | |
| if visible: | |
| # If visible, hide the column | |
| return gr.update(visible=False), gr.update(value=""), False | |
| else: | |
| # If not visible, show the column and clear the Textbox | |
| return gr.update(visible=True), "", True | |
| def user_feedback(value, session_id): | |
| session_id = initialize_session(session_id) | |
| chat_feedback = {} | |
| chat_feedback["id"] = str(uuid.uuid4()) | |
| chat_feedback["chat_id"] = session_id | |
| chat_feedback["partitionKey"] = "RoofingRoadmapFeedback" | |
| chat_feedback["feedback"] = value | |
| chat_feedback["env"] = ENV | |
| chat_feedback["timestamp"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| history_container.create_item(body=chat_feedback) | |
| return gr.update(visible=False), "", session_id | |
| with gr.Blocks() as demo: | |
| session_id = gr.State(None) | |
| gr.Markdown("## Find literature to answer your question!") | |
| gr.Markdown("### Ask a question about the wind and hail performance of asphalt shingle, metal, and tile roofs.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| chatbot = gr.Chatbot(type="messages", height=400) | |
| chatbot.like(vote, [chatbot, session_id], None) | |
| msg = gr.Textbox(label="Hit the Enter to send your question", placeholder="What's on your mind?", show_copy_button=True) | |
| with gr.Row(): | |
| send = gr.Button("Send", variant="secondary", scale=3) | |
| feedback = gr.Button("Feedback", variant="stop", scale=1) | |
| with gr.Column(visible=False, elem_id="feedback_column") as feedback_column: | |
| usr_msg = gr.Textbox(label="Submit feedback to IBHS", info="What went wrong?", placeholder="Give us as much detail as you can!", lines=3) | |
| usr_submit = gr.Button("Submit", variant="secondary") | |
| def user(user_message, history: list): | |
| return "", history + [{"role": "user", "content": user_message}] | |
| def bot(history: list, session_id_i): | |
| if session_id_i is None: | |
| session_id_i = initialize_session(session_id_i) | |
| bot_message = llm_response(history[-1]['content'], session_id_i) | |
| history.append({"role": "assistant", "content": ""}) | |
| for character in bot_message: | |
| history[-1]['content'] += character | |
| yield history, session_id_i | |
| feedback_column_state = gr.State(False) | |
| msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(bot, [chatbot, session_id], [chatbot,session_id]) | |
| send.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(bot, [chatbot, session_id], [chatbot, session_id]) | |
| feedback.click(fn=show_feedback_column, inputs=[feedback_column_state], outputs=[feedback_column, usr_msg, feedback_column_state]) | |
| usr_submit.click(user_feedback, [usr_msg, session_id], outputs=[feedback_column, usr_msg, session_id]) | |
| gr.Markdown("*Our chatbot is constantly learning and improving to better serve you!*") | |
| gr.Markdown("#### Additional questions? Contact IBHS Membership Manager Larry Scott at [lscott@ibhs.org]().") | |
| if __name__ == "__main__": | |
| demo.launch() |