Spaces:
Sleeping
Sleeping
| from openai import OpenAI | |
| import gradio as gr | |
| import json | |
| from bot_actions import functions_dictionary, save_record | |
| import os | |
| CSS =""" | |
| .contain { display: flex; flex-direction: column; } | |
| .svelte-vt1mxs div:first-child { flex-grow: 1; overflow: auto;} | |
| #chatbot { flex-grow: 1; overflow: auto;} | |
| footer {display: none !important;} | |
| .app.svelte-182fdeq.svelte-182fdeq { | |
| max-width: 100vw !important; | |
| } | |
| #main_container { | |
| height: 95vh; | |
| } | |
| #markup_container { | |
| height: 100%; | |
| overflow:auto; | |
| } | |
| """ | |
| openAIToken = os.environ['openAIToken'] | |
| assistantId = os.environ['assistantId'] | |
| initial_message = os.environ['initialMessage'] | |
| client = OpenAI(api_key=openAIToken) | |
| def handle_requires_action(data): | |
| actions_results = [] | |
| for tool in data.required_action.submit_tool_outputs.tool_calls: | |
| function_name = tool.function.name | |
| function_args = json.loads(tool.function.arguments) | |
| print(function_name) | |
| print(function_args) | |
| try: | |
| result = functions_dictionary[tool.function.name](**function_args) | |
| print("Function result:", result) | |
| actions_results.append({"tool_output" : {"tool_call_id": tool.id, "output": result["message"]}}) | |
| except Exception as e: | |
| print(e) | |
| # Submit all tool_outputs at the same time | |
| return actions_results | |
| def create_thread_openai(sessionStorage): | |
| streaming_thread = client.beta.threads.create() | |
| sessionStorage["threadId"] = streaming_thread.id | |
| return sessionStorage | |
| def add_message_to_openai(text, threadId): | |
| print("User message: ", text) | |
| return client.beta.threads.messages.create( | |
| thread_id=threadId, | |
| role="user", | |
| content=text | |
| ) | |
| def transform_suggestions_into_list(string_of_suggestions): | |
| local_message = None | |
| parts = string_of_suggestions.split('#s#') | |
| list_of_suggestions = json.loads(parts[0]) | |
| list_of_suggestions = [ x for x in list_of_suggestions if x] | |
| if len(parts) > 1: | |
| local_message = parts[1] | |
| return list_of_suggestions, local_message | |
| def create_suggestions_list(suggestions): | |
| update_show = [gr.update(visible=True, value=w) for w in suggestions] | |
| update_hide = [gr.update(visible=False, value="") for _ in range(6-len(suggestions))] | |
| return update_show + update_hide | |
| def process_text_chunk(text, storage): | |
| print(text, end="", flush=True) | |
| local_message = None | |
| if "[" in text: | |
| storage["is_loading_suggestions"] = True | |
| if "#" in text and storage["is_loading_suggestions"] != True: | |
| storage["is_loading_markup"] = True | |
| if "<" in text: | |
| storage["is_loading_suggestions"] = False | |
| storage["is_loading_markup"] = False | |
| storage["is_loading_svg"] = True | |
| if storage["is_loading_suggestions"] == True or storage["is_loading_markup"] == True or storage["is_loading_svg"] == True: | |
| accumulative_string = storage["accumulative_string"] + text | |
| if storage["is_loading_suggestions"] == True: | |
| if "#s#" in accumulative_string: | |
| storage["is_loading_suggestions"] = False | |
| list_of_suggestions, local_message = transform_suggestions_into_list(accumulative_string) | |
| storage["list_of_suggestions"] = list_of_suggestions | |
| accumulative_string = "" | |
| elif "]" in accumulative_string and "]#" not in accumulative_string and not accumulative_string.endswith("]"): | |
| storage["is_loading_suggestions"] = False | |
| local_message = accumulative_string | |
| accumulative_string = "" | |
| elif storage["is_loading_markup"]: | |
| if "#p#" in accumulative_string: | |
| parts = accumulative_string.split("#p#") | |
| if len(parts) > 2: | |
| accumulative_string = parts[0] + parts[2] | |
| storage["markup_string"] = parts[1] | |
| storage["is_loading_markup"] = False | |
| else: | |
| local_message = parts[0] | |
| accumulative_string = "#p#" + parts[1] | |
| storage["markup_string"] = parts[1] | |
| elif "#" in accumulative_string and "#p" not in accumulative_string and not accumulative_string.endswith("#"): | |
| storage["is_loading_markup"] = False | |
| local_message = accumulative_string | |
| accumulative_string = "" | |
| else: | |
| if "<" in accumulative_string and "<s" not in accumulative_string and not accumulative_string.endswith("<"): | |
| storage["is_loading_svg"] = False | |
| local_message = accumulative_string | |
| accumulative_string = "" | |
| elif "<svg" in accumulative_string: | |
| parts = accumulative_string.split("<svg") | |
| if "#p#" in parts[0]: | |
| info_parts = parts[0].split('#p#') | |
| local_message = info_parts[0] | |
| else: | |
| local_message = parts[0] | |
| if "</svg>" in parts[1]: | |
| svg_ending = ("<svg" + parts[1]).split('</svg>') | |
| storage["svg"] = svg_ending[0] + '</svg>' | |
| accumulative_string = svg_ending[1] | |
| storage["is_loading_svg"] = False | |
| else: | |
| accumulative_string = "<svg" + parts[1] | |
| storage["svg"] = accumulative_string | |
| storage["accumulative_string"] = accumulative_string | |
| else: | |
| local_message = text | |
| return local_message, storage | |
| def handle_events(threadId, chat_history, storage): | |
| storage.update({ | |
| "list_of_suggestions" : [], | |
| "is_loading_suggestions" : False, | |
| "is_loading_markup" : False, | |
| "is_loading_svg": False, | |
| "accumulative_string" : "", | |
| "markup_string": "", | |
| "svg": "" | |
| }) | |
| try: | |
| with client.beta.threads.runs.stream( | |
| thread_id=threadId, | |
| assistant_id=assistantId | |
| ) as stream: | |
| for event in stream: | |
| if event.event == "thread.message.delta" and event.data.delta.content: | |
| text = event.data.delta.content[0].text.value | |
| local_message, storage = process_text_chunk(text, storage) | |
| if local_message is not None: | |
| chat_history[-1][1] += local_message | |
| yield [ chat_history, storage, storage["markup_string"], storage["svg"]] | |
| if event.event == 'thread.run.requires_action': | |
| result = handle_requires_action(event.data) | |
| tool_outputs = [x["tool_output"] for x in result] | |
| with client.beta.threads.runs.submit_tool_outputs_stream( | |
| thread_id=stream.current_run.thread_id, | |
| run_id=event.data.id, | |
| tool_outputs=tool_outputs, | |
| ) as action_stream: | |
| for text in action_stream.text_deltas: | |
| local_message, storage = process_text_chunk(text, storage) | |
| if local_message is not None: | |
| chat_history[-1][1] += local_message | |
| yield [chat_history, storage, storage["markup_string"], storage["svg"]] | |
| action_stream.close() | |
| stream.until_done() | |
| print("") | |
| return [chat_history, storage, storage["markup_string"], storage["svg"]] | |
| except Exception as e: | |
| print(e) | |
| chat_history[-1][1] = "Error occured during processing your message. Please try again" | |
| yield [chat_history, storage, storage["markup_string"], storage["svg"]] | |
| def initiate_chatting(chat_history, storage): | |
| threadId = storage["threadId"] | |
| chat_history = [[None, ""]] | |
| add_message_to_openai(initial_message, threadId) | |
| for response in handle_events(threadId, chat_history, storage): | |
| yield response | |
| def respond_on_user_msg(chat_history, storage): | |
| message = chat_history[-1][0] | |
| threadId = storage["threadId"] | |
| print("Responding for threadId: ", threadId) | |
| chat_history[-1][1] = "" | |
| add_message_to_openai(message, threadId) | |
| for response in handle_events(threadId, chat_history, storage): | |
| yield response | |
| def create_application(): | |
| with gr.Blocks(css=CSS, fill_height=True) as demo: | |
| storage = gr.State({"list_of_suggestions": [], "is_loading_suggestions": False, "is_loading_markup": False, "accumulative_string": "", "markup_string": ""}) | |
| btn_list = [] | |
| with gr.Row(elem_id="main_container"): | |
| with gr.Column(scale=4): | |
| chatbot = gr.Chatbot(label="Facility managment bot", line_breaks=False, height=300, show_label=False, show_share_button=False, elem_id="chatbot") | |
| with gr.Row(): | |
| for i in range(6): | |
| btn = gr.Button(visible=False, size="sm") | |
| btn_list.append(btn) | |
| msg = gr.Textbox(label="Answer", interactive=False) | |
| with gr.Column(scale=1, elem_id="markup_container"): | |
| markdown = gr.Markdown(label="Bullet-list", value="# Facility information") | |
| with gr.Row(variant="compact"): | |
| svg_container = gr.HTML(label="SVG Container", value="""""") | |
| def user(user_message, history): | |
| return "", history + [[user_message, None]] | |
| def update_suggestions(storage): | |
| list_of_suggestions = storage['list_of_suggestions'] or [] | |
| btn_list = create_suggestions_list(list_of_suggestions) | |
| return btn_list | |
| def hide_suggestions(): | |
| return [gr.update(visible=False, value="") for _ in range(6)] | |
| def disable_msg(): | |
| message_box = gr.Textbox(value=None, interactive=False) | |
| return message_box | |
| def enable_msg(): | |
| message_box = gr.Textbox(value=None, interactive=True) | |
| return message_box | |
| add_user_message_flow = [user, [msg,chatbot], [msg,chatbot]] | |
| chat_response_flow = [respond_on_user_msg, [chatbot, storage], [chatbot, storage, markdown, svg_container]] | |
| update_suggestions_flow = [update_suggestions, storage, btn_list] | |
| hide_suggestions_flow = [hide_suggestions, None, btn_list] | |
| disable_msg_flow = [disable_msg, None, msg] | |
| enable_msg_flow = [enable_msg, None, msg] | |
| msg.submit(*add_user_message_flow | |
| ).then(*hide_suggestions_flow | |
| ).then(*disable_msg_flow | |
| ).then(*chat_response_flow | |
| ).then(*update_suggestions_flow | |
| ).then(*enable_msg_flow) | |
| for sug_btn in btn_list: | |
| add_suggestion_message_flow = [user, [sug_btn, chatbot], [msg, chatbot]] | |
| sug_btn.click(*add_suggestion_message_flow | |
| ).then(*hide_suggestions_flow | |
| ).then(*disable_msg_flow | |
| ).then(*chat_response_flow | |
| ).then(*update_suggestions_flow | |
| ).then(*enable_msg_flow) | |
| demo.load(create_thread_openai, inputs=storage, outputs=storage | |
| ).then(initiate_chatting, inputs=[chatbot, storage], outputs=[chatbot, storage, markdown, svg_container] | |
| ).then(*update_suggestions_flow | |
| ).then(*enable_msg_flow) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_application() | |
| demo.launch() |