Spaces:
Runtime error
Runtime error
| import os | |
| import logging | |
| import logging.config | |
| from typing import Any | |
| from uuid import uuid4, UUID | |
| import json | |
| import sys | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage | |
| from langgraph.types import RunnableConfig | |
| from pydantic import BaseModel | |
| from pathlib import Path | |
| import subprocess | |
| # def update_repo(): | |
| # try: | |
| # subprocess.run(["git", "fetch", "origin"], check=True) | |
| # subprocess.run(["git", "reset", "--hard", "origin/main"], check=True) | |
| # subprocess.run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"], check=True) | |
| # subprocess.run([sys.executable, "app.py"], check=True) | |
| # except Exception as e: | |
| # print(f"Git update failed: {e}") | |
| # update_repo() | |
# Load the .env file before importing the graph: per the project's own note,
# tools are set up in graph.py depending on environment variables.
load_dotenv()
from graph import graph, weak_model, search_enabled  # noqa

# Number of follow-up suggestion buttons shown after an assistant response.
FOLLOWUP_QUESTION_NUMBER = 3
TRIM_MESSAGE_LENGTH = 16  # Includes tool messages
USER_INPUT_MAX_LENGTH = 10000  # Characters

# We need the same secret across runs for data persistence in the browser.
# If you store sensitive data, set BROWSER_STORAGE_SECRET in .env; the literal
# fallback keeps existing deployments (and their stored data) working.
BROWSER_STORAGE_SECRET = os.environ.get("BROWSER_STORAGE_SECRET", "itsnosecret")

# Configure logging from the JSON dictConfig file shipped next to the app.
with open("logging-config.json", "r", encoding="utf-8") as fh:
    config = json.load(fh)
logging.config.dictConfig(config)
logger = logging.getLogger(__name__)
def load_initial_greeting(filepath="greeting_prompt.txt") -> str:
    """
    Loads the initial greeting message from a specified text file.

    Args:
        filepath: Path of the UTF-8 text file holding the greeting.

    Returns:
        The file contents with surrounding whitespace stripped, or a built-in
        default greeting when the file does not exist.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        # Report through logging (same logger name as the module-level logger)
        # instead of print, so the warning follows the configured handlers.
        logging.getLogger(__name__).warning(
            "Prompt file '%s' not found. Using default.", filepath
        )
        return "Welcome to the application! (Default Greeting)"
def _message_chunk_text(content: Any) -> str:
    """Return the displayable text of a streamed message chunk's content.

    Content is either a plain string, or a list whose items are text blocks
    (dicts with ``type == "text"``) or bare strings; anything else yields "".
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts = []
    for block in content:
        if isinstance(block, dict) and block.get("type") == "text":
            parts.append(block.get("text", ""))
        elif isinstance(block, str):  # Fallback if content is list of strings
            parts.append(block)
    return "".join(parts)


async def chat_fn(user_input: str, history: dict, input_graph_state: dict, uuid: UUID, prompt: str, search_enabled: bool, download_website_text_enabled: bool):
    """
    Stream one chat turn through the langgraph graph.

    Args:
        user_input (str): The user's input message
        history (dict): The history of the conversation in gradio (unused, see note below)
        input_graph_state (dict): The current state of the graph. This includes tool call history
        uuid (UUID): The unique identifier for the current conversation. This can be used in conjunction with langgraph or for memory
        prompt (str): The system prompt
        search_enabled (bool): Whether the tavily search tool is enabled for this turn
        download_website_text_enabled (bool): Whether the download_website_text tool is enabled for this turn
    Yields:
        str: The output message
        dict|Any: The final state of the graph
        bool|Any: Whether to trigger follow up questions

    We do not use gradio history in the graph since we want the ToolMessage in the history
    ordered properly. GraphProcessingState.messages is used as history instead
    """
    try:
        input_graph_state["tools_enabled"] = {
            "download_website_text": download_website_text_enabled,
            "tavily_search_results_json": search_enabled,
        }
        if prompt:
            input_graph_state["prompt"] = prompt

        # Make sure the history list exists in both branches, and apply the
        # input length cap to every kind of user-provided text.
        messages = input_graph_state.setdefault("messages", [])
        capped_input = user_input[:USER_INPUT_MAX_LENGTH]
        if input_graph_state.get("awaiting_human_input"):
            # The previous turn paused on a human_assistance tool call: the
            # user's text is the tool result for that call.
            messages.append(
                ToolMessage(
                    tool_call_id=input_graph_state.pop("human_assistance_tool_id"),
                    content=capped_input,
                )
            )
            input_graph_state["awaiting_human_input"] = False
        else:
            # New user message
            messages.append(HumanMessage(capped_input))
        input_graph_state["messages"] = messages[-TRIM_MESSAGE_LENGTH:]

        config = RunnableConfig(
            recursion_limit=20,
            run_name="user_chat",
            configurable={"thread_id": uuid}
        )
        output: str = ""
        final_state: dict | Any = {}
        waiting_output_seq: list[str] = []
        async for stream_mode, chunk in graph.astream(
            input_graph_state,
            config=config,
            stream_mode=["values", "messages"],
        ):
            if stream_mode == "values":
                final_state = chunk
                last_message = chunk["messages"][-1]
                if hasattr(last_message, "tool_calls"):
                    for msg_tool_call in last_message.tool_calls:
                        tool_name: str = msg_tool_call["name"]
                        if tool_name == "human_assistance":
                            query = msg_tool_call["args"]["query"]
                            waiting_output_seq.append(f"🤖: {query}")
                            # Record the pending tool call on the latest graph
                            # state so the next call can answer it.
                            final_state["awaiting_human_input"] = True
                            final_state["human_assistance_tool_id"] = msg_tool_call["id"]
                            # Third item is False: the stream is pausing, not
                            # ending, so no follow-up questions are triggered.
                            yield "\n".join(waiting_output_seq), final_state, False
                            return  # Pause execution, resume in next call
                        if tool_name == "tavily_search_results_json":
                            query = msg_tool_call["args"]["query"]
                            status = f"Searching for '{query}'..."
                        # download_website_text is the name of the function defined in graph.py
                        elif tool_name == "download_website_text":
                            url = msg_tool_call["args"]["url"]
                            status = f"Downloading text from '{url}'..."
                        else:
                            status = f"Running {tool_name}..."
                        waiting_output_seq.append(status)
                        yield "\n".join(waiting_output_seq), gr.skip(), gr.skip()
            elif stream_mode == "messages":
                msg, metadata = chunk
                # assistant_node is the name we defined in the langgraph graph
                if metadata.get("langgraph_node") == "assistant_node":
                    current_chunk_text = _message_chunk_text(msg.content)
                    if current_chunk_text:  # Only add and yield if there's actually text
                        output += current_chunk_text
                        yield output, gr.skip(), gr.skip()
        # Trigger for asking follow up questions
        # + store the graph state for next iteration
        # NOTE(review): the trailing space presumably makes this final value
        # differ from the last streamed chunk — confirm before removing.
        yield output + " ", dict(final_state), True
    except Exception:
        logger.exception("Exception occurred")
        user_error_message = "There was an error processing your request. Please try again."
        yield user_error_message, gr.skip(), False
def clear():
    """Return a blank state mapping together with a freshly generated uuid."""
    fresh_state = dict()
    fresh_uuid = uuid4()
    return fresh_state, fresh_uuid
# Schema passed to `with_structured_output` when asking the weak model for
# follow-up suggestions.
# NOTE(review): langchain presumably forwards the class docstring to the model
# as the schema description — confirm before rewording it.
class FollowupQuestions(BaseModel):
    """Model for langchain to use for structured output for followup questions"""
    # The suggested questions, one string per follow-up button.
    questions: list[str]
async def populate_followup_questions(end_of_chat_response: bool, messages: list[dict[str, Any]], uuid: UUID):
    """
    Fill the follow-up suggestion buttons once an assistant response has finished.

    This function gets called a lot due to the asynchronous nature of streaming.
    Only populate followup questions if streaming has completed and the message is coming from the assistant.

    Args:
        end_of_chat_response: True when the chat stream signalled its final yield.
        messages: Chat history as a list of message dicts (each has a "role" key).
        uuid: Conversation id, used as the langgraph thread id for the model call.

    Returns:
        FOLLOWUP_QUESTION_NUMBER button updates followed by False, which resets
        the end-of-response flag.
    """
    if not end_of_chat_response or not messages or messages[-1]["role"] != "assistant":
        return *[gr.skip() for _ in range(FOLLOWUP_QUESTION_NUMBER)], False
    config = RunnableConfig(
        run_name="populate_followup_questions",
        configurable={"thread_id": uuid}
    )
    weak_model_with_config = weak_model.with_config(config)
    follow_up_questions = await weak_model_with_config.with_structured_output(FollowupQuestions).ainvoke([
        ("system", f"suggest {FOLLOWUP_QUESTION_NUMBER} followup questions for the user to ask the assistant. Refrain from asking personal questions."),
        *messages,
    ])
    # The model may return more or fewer questions than requested. Suggestions
    # are optional, so show what fits and hide the rest instead of failing the
    # whole event after a successful chat response.
    questions = follow_up_questions.questions[:FOLLOWUP_QUESTION_NUMBER]
    if len(follow_up_questions.questions) != FOLLOWUP_QUESTION_NUMBER:
        logger.warning(
            "Expected %d followup questions, got %d",
            FOLLOWUP_QUESTION_NUMBER,
            len(follow_up_questions.questions),
        )
    buttons = [
        gr.Button(question, visible=True, elem_classes="chat-tab")
        for question in questions
    ]
    buttons.extend(
        gr.Button(visible=False)
        for _ in range(FOLLOWUP_QUESTION_NUMBER - len(questions))
    )
    return *buttons, False
async def summarize_chat(end_of_chat_response: bool, messages: list, sidebar_summaries: dict, uuid: UUID):
    """Summarize chat for tab names.

    Args:
        end_of_chat_response: True when the assistant response has finished.
        messages: Chat history as a list of message dicts ("role"/"content").
        sidebar_summaries: Mapping of chat uuid -> tab title; updated in place.
        uuid: Id of the chat to summarize.

    Returns:
        ``(gr.skip(), gr.skip())`` when nothing should be updated, otherwise
        ``(sidebar_summaries, False)`` with a title stored for ``uuid``.
    """
    should_return = (
        not end_of_chat_response
        or not messages
        or messages[-1]["role"] != "assistant"
        # This is a bug with gradio: the state can arrive as a function object
        or isinstance(sidebar_summaries, type(lambda x: x))
        # Already created summary
        or uuid in sidebar_summaries
    )
    if should_return:
        return gr.skip(), gr.skip()

    # Keep only messages with non-blank *text* content. Content that is not a
    # string (e.g. a file/component dict) cannot be stripped or summarized.
    filtered_messages = [
        msg for msg in messages
        if isinstance(msg, dict)
        and isinstance(msg.get("content"), str)
        and msg["content"].strip()
    ]
    # If we don't have any valid messages after filtering, provide a default summary
    if not filtered_messages:
        sidebar_summaries.setdefault(uuid, "Chat History")
        return sidebar_summaries, False

    config = RunnableConfig(
        run_name="summarize_chat",
        configurable={"thread_id": uuid}
    )
    try:
        weak_model_with_config = weak_model.with_config(config)
        summary_response = await weak_model_with_config.ainvoke([
            ("system", "summarize this chat in 7 tokens or less. Refrain from using periods"),
            *filtered_messages,
        ])
        sidebar_summaries.setdefault(uuid, summary_response.content)
    except Exception as e:
        # Best effort: a failed summary must not break the chat, so log the
        # traceback and fall back to a generic title.
        logger.exception("Error summarizing chat: %s", e)
        sidebar_summaries.setdefault(uuid, "Previous Chat")
    return sidebar_summaries, False
async def new_tab(uuid, gradio_graph, messages, tabs, prompt, sidebar_summaries):
    """Archive the current chat and return the state for a brand-new one.

    The current chat is stored in ``tabs`` under ``uuid`` (graph state,
    messages and prompt), after making sure it has a sidebar summary.

    Returns, in order: new uuid, empty graph state, chatbot messages holding
    only the greeting, updated ``tabs``, the default system prompt, the
    sidebar summaries, then one hidden button per follow-up slot.
    """
    if uuid not in sidebar_summaries:
        sidebar_summaries, _ = await summarize_chat(True, messages, sidebar_summaries, uuid)

    tabs[uuid] = {"graph": gradio_graph, "messages": messages, "prompt": prompt}

    hidden_buttons = [gr.Button(visible=False) for _ in range(FOLLOWUP_QUESTION_NUMBER)]

    # Every new chat opens with the greeting, in the list-of-dicts "messages" format.
    greeting_history = [{"role": "assistant", "content": load_initial_greeting()}]

    return (
        uuid4(),
        {},
        greeting_history,
        tabs,
        "You are a helpful assistant.",
        sidebar_summaries,
        *hidden_buttons,
    )
def switch_tab(selected_uuid, tabs, gradio_graph, uuid, messages, prompt):
    """Save the current chat into ``tabs`` and load the chat ``selected_uuid``.

    Args:
        selected_uuid: Id of the chat to switch to (looked up in ``tabs``).
        tabs: Mapping of chat id -> {"graph", "messages", "prompt"}; updated in place.
        gradio_graph: Graph state of the chat currently on screen.
        uuid: Id of the chat currently on screen.
        messages: Chatbot messages of the chat currently on screen.
        prompt: System prompt of the chat currently on screen.

    Returns:
        A tuple of 5 + FOLLOWUP_QUESTION_NUMBER values: graph state, uuid,
        messages, tabs, prompt, then one hidden button per follow-up slot.
        When the selected chat is unknown, every position is ``gr.skip()``.
    """
    # I don't know of another way to lookup uuid other than
    # by the button value
    # Save current state
    if messages:
        tabs[uuid] = {
            "graph": gradio_graph,
            "messages": messages,
            "prompt": prompt
        }
    if selected_uuid not in tabs:
        logger.error(f"Could not find the selected tab in offloaded_tabs_data_storage {selected_uuid}")
        # Return one skip per value of the normal return below so both
        # branches have the same arity (the old 4-tuple did not).
        return tuple(gr.skip() for _ in range(5 + FOLLOWUP_QUESTION_NUMBER))
    selected_tab_state = tabs[selected_uuid]
    selected_graph = selected_tab_state["graph"]
    selected_messages = selected_tab_state["messages"]
    selected_prompt = selected_tab_state.get("prompt", "")
    # Suggestions belong to the chat being left, so hide them all.
    suggestion_buttons = [gr.Button(visible=False) for _ in range(FOLLOWUP_QUESTION_NUMBER)]
    return selected_graph, selected_uuid, selected_messages, tabs, selected_prompt, *suggestion_buttons
def delete_tab(current_chat_uuid, selected_uuid, sidebar_summaries, tabs):
    """Remove the chat ``selected_uuid`` from the stored tabs and sidebar titles.

    Returns the updated ``sidebar_summaries`` and ``tabs`` plus a chatbot
    value: an empty ``dict()`` when the deleted chat is the one currently
    open, otherwise ``gr.skip()`` so the chatbot is left untouched.
    """
    deleting_open_chat = current_chat_uuid == selected_uuid
    output_messages = dict() if deleting_open_chat else gr.skip()

    # pop(..., None) removes the entry when present and is a no-op otherwise.
    tabs.pop(selected_uuid, None)
    sidebar_summaries.pop(selected_uuid, None)
    return sidebar_summaries, tabs, output_messages
def submit_edit_tab(selected_uuid, sidebar_summaries, text):
    """Store ``text`` as the sidebar title of chat ``selected_uuid``.

    Mutates and returns ``sidebar_summaries``; the second return value is an
    empty string (presumably to clear the edit textbox — verify against caller).
    """
    sidebar_summaries.update({selected_uuid: text})
    emptied_text = ""
    return sidebar_summaries, emptied_text
def load_mesh(mesh_file_name):
    """Pass the given mesh file name through unchanged."""
    selected_mesh = mesh_file_name
    return selected_mesh
def display_initial_greeting(is_new_user_state_value: bool):
    """
    Build the initial chat history and the updated 'new user' flag.

    Returns a ``(history, flag)`` pair. For a new user the history is a single
    tuple-style entry ``(None, greeting)`` (no user message, bot greeting);
    otherwise it is empty. The flag is always False afterwards.
    """
    if not is_new_user_state_value:
        # Already greeted (or not a new user): nothing to show, flag stays False.
        return [], False

    greeting_message_text = load_initial_greeting()
    # Greeting shown, so the returned flag is False.
    return [(None, greeting_message_text)], False
def get_sorted_3d_model_files():
    """
    List the 3D model files in ./generated_3d_models, newest first.

    Entries are kept when their suffix (case-insensitive) is .obj, .glb or
    .gltf, and ordered by descending ``st_ctime``. If the directory is
    missing it is created and an empty list is returned.
    """
    models_dir = Path("./generated_3d_models")
    if not models_dir.exists():
        # Create dir if it doesn't exist; there is nothing to list yet.
        models_dir.mkdir(parents=True, exist_ok=True)
        return []

    model_suffixes = {".obj", ".glb", ".gltf"}
    found = [
        entry for entry in models_dir.glob("*")
        if entry.suffix.lower() in model_suffixes
    ]
    found.sort(key=lambda entry: entry.stat().st_ctime, reverse=True)
    return found
def get_3d_model_examples():
    """
    Return the sorted 3D model files as a list of path strings
    (for the gr.Examples component).
    """
    return list(map(str, get_sorted_3d_model_files()))
def get_latest_3d_model():
    """
    Return the path string of the first (newest) sorted 3D model file,
    or None when there are no model files.
    """
    candidates = get_sorted_3d_model_files()
    return str(candidates[0]) if candidates else None
def update_3d_models_on_load():
    """
    Pick the latest 3D model for the viewer and refresh the examples radio list.

    Returns the newest model path (or None) and a ``gr.update`` that sets the
    Radio choices to all model paths with the newest one selected.
    """
    print("\n🍱🍱 Loading generated 3d models")
    example_paths = [str(model_file) for model_file in get_sorted_3d_model_files()]
    # The list is already newest-first, so the first entry is the latest model.
    latest_model = example_paths[0] if example_paths else None
    return latest_model, gr.update(choices=example_paths, value=latest_model)
def update_generated_image_on_state_change(state: dict):
    """
    Checks the langgraph state for a generated image URL and updates the UI if found.

    Args:
        state: The langgraph state dict; may be empty or None before the first turn.

    Returns:
        The image URL when the state holds a truthy
        "generated_image_url_from_dalle" value, otherwise ``gr.skip()`` so the
        image component is left unchanged.
    """
    # Guard against a missing state, like the other update_* display helpers do.
    # The key comes from your `prompt_planning_node`
    image_url = state.get("generated_image_url_from_dalle") if state else None
    if image_url:
        print(f"🖼️ Found image URL in state, updating UI: {image_url}")
        return image_url
    # If the key doesn't exist or is None, don't update the image component.
    print(f"🖼️ Image is not generated yet")
    return gr.skip()
| # #! fix this shit | |
| # def update_prompt_with_last_ai_message(state: dict): | |
| # """ | |
| # Finds the last AI message WITH VISIBLE CONTENT in the state and updates the prompt textbox. | |
| # """ | |
| # if not state or "messages" not in state or not state["messages"]: | |
| # print('🧙🧙 No messages found to display') | |
| # return gr.skip() | |
| # # Iterate backwards through the history to find the latest AI message with text | |
| # for message in reversed(state["messages"]): | |
| # print("\n") | |
| # print('message in message state--> ', message) | |
| # # ======================= ✨ START OF CHANGES ✨ ======================= | |
| # # FIX: Make the check robust. Handle both AIMessage objects (live state) | |
| # # and dictionaries (state restored from browser). | |
| # is_ai_message = isinstance(message, AIMessage) or \ | |
| # (isinstance(message, dict) and message.get("type") == "ai") | |
| # if is_ai_message: | |
| # text_content = "" | |
| # # Use dictionary-style access, which is safer and works for both objects and dicts. | |
| # msg_content = message.get("content") if isinstance(message, dict) else message.content | |
| # # Tool calls can be in different locations after deserialization. | |
| # tool_calls = [] | |
| # if isinstance(message, dict): | |
| # # Check both common locations for tool calls in a deserialized message dict | |
| # tool_calls = message.get("tool_calls") or message.get("additional_kwargs", {}).get("tool_calls", []) | |
| # else: | |
| # tool_calls = getattr(message, "tool_calls", []) | |
| # # Check for regular text content | |
| # if isinstance(msg_content, str) and msg_content.strip(): | |
| # text_content = msg_content | |
| # # ALSO check for text inside a human_assistance tool call | |
| # elif tool_calls: | |
| # for tool_call in tool_calls: | |
| # if tool_call.get("name") == "human_assistance": | |
| # args = tool_call.get("args", {}) | |
| # # The arguments might be a stringified JSON, so we parse it safely. | |
| # if isinstance(args, str): | |
| # try: | |
| # args = json.loads(args) | |
| # except json.JSONDecodeError: | |
| # continue # Skip if args are not valid JSON | |
| # query = args.get("query") | |
| # if query: | |
| # text_content = query | |
| # break # Found the query in the tool call | |
| # # If we found any displayable text, update the textbox and exit. | |
| # if text_content: | |
| # print(f"🤖 Found last displayable AI message: '{text_content}'") | |
| # return gr.update(value=text_content) | |
| # # ======================== ✨ END OF CHANGES ✨ ======================== | |
| # # If the loop finishes without finding any displayable AI message | |
| # print('🧙🧙 Retunring without any found messages') | |
| # return gr.skip() | |
def update_build_plan_display(state: dict):
    """
    Searches the message history for the build plan and updates the Markdown display.

    Args:
        state: The langgraph state dict; expected to hold a "messages" list of
            AIMessage objects and/or deserialized message dicts.

    Returns:
        ``gr.update(value=...)`` with the full content of the most recent AI
        message whose text contains "Materials", or ``gr.skip()`` when no such
        message exists.
    """
    print('\n📝 Loading build plan')
    if not state or "messages" not in state:
        print('\n📝 Start the chat to create a build plan')
        return gr.skip()
    # Search backwards through messages to find the one with the plan
    for message in reversed(state.get("messages") or []):
        # Handle both live AIMessage objects and deserialized dicts
        is_ai_message = isinstance(message, AIMessage) or \
            (isinstance(message, dict) and message.get("type") == "ai")
        if not is_ai_message:
            continue
        content = message.get("content") if isinstance(message, dict) else message.content
        # "Materials" is the keyword used to recognise the build plan message
        if isinstance(content, str) and "Materials" in content:
            print(f"📝 Found and displaying Build Plan.")
            return gr.update(value=content)  # Update the Markdown component
    print('📝 Build plan is not generated yet')
    return gr.skip()  # Return skip if no plan is found
def update_model_prompt_display(state: dict):
    """
    Find the 3D model prompt in the message history and push it to the Textbox.

    Walks the messages newest-first and, for the first AI message whose string
    content contains the marker "ACCURATE PROMPT FOR MODEL GENERATING:",
    returns ``gr.update`` with the stripped text following the marker.
    Returns ``gr.skip()`` when the state has no messages or no match.
    """
    if not state or "messages" not in state:
        return gr.skip()

    prompt_marker = "ACCURATE PROMPT FOR MODEL GENERATING:"
    for message in reversed(state.get("messages", [])):
        # Deserialized messages are dicts tagged type == "ai"; live ones are AIMessage objects.
        if isinstance(message, dict):
            if message.get("type") != "ai":
                continue
            content = message.get("content")
        elif isinstance(message, AIMessage):
            content = message.content
        else:
            continue

        if not isinstance(content, str) or prompt_marker not in content:
            continue

        # Everything after the first occurrence of the marker is the prompt.
        _, _, after_marker = content.partition(prompt_marker)
        print(f"🧙🧙 Found and displaying 3D Model Prompt.")
        return gr.update(value=after_marker.strip())  # Update the Textbox component
    return gr.skip()  # Return skip if no prompt is found
# Custom stylesheet passed to gr.Blocks(css=CSS): styles the follow-up
# buttons, the sidebar chat tabs (including the active tab) and the
# "New Chat" button.
CSS = """
footer {visibility: visible}
.followup-question-button {font-size: 12px }
.chat-tab {
font-size: 12px;
padding-inline: 0;
}
.chat-tab.active {
background-color: #654343;
}
#new-chat-button { background-color: #0f0f11; color: white; }
.tab-button-control {
min-width: 0;
padding-left: 0;
padding-right: 0;
}
"""
# JavaScript run in the browser after a follow-up button click succeeds: it
# clicks the first <button> inside the element with id "chat-textbox", which
# submits the text that the click handler just placed into the chat textbox.
# We set the ChatInterface textbox id to chat-textbox for this to work
TRIGGER_CHATINTERFACE_BUTTON = """
function triggerChatButtonClick() {
// Find the div with id "chat-textbox"
const chatTextbox = document.getElementById("chat-textbox");
if (!chatTextbox) {
console.error("Error: Could not find element with id 'chat-textbox'");
return;
}
// Find the button that is a descendant of the div
const button = chatTextbox.querySelector("button");
if (!button) {
console.error("Error: No button found inside the chat-textbox element");
return;
}
// Trigger the click event
button.click();
}"""
# JavaScript that toggles the "sidebar-collapsed" class on the sidebar element.
# The selector targets the stable "sidebar" class only: the previous value
# ".sidebar svelte-7y53u7 open" was parsed as a descendant selector for
# <svelte-7y53u7> / <open> elements and therefore never matched anything.
# NOTE(review): no use of this constant is visible in this part of the file.
TOGGLE_SIDEBAR_JS = """
function toggleSidebarVisibility() {
console.log("Called the side bar funnction");
const sidebar = document.querySelector(".sidebar");
if (!sidebar) {
console.error("Error: Could not find the sidebar element");
return;
}
sidebar.classList.toggle("sidebar-collapsed");
}
"""
| if __name__ == "__main__": | |
| logger.info("Loading Interface") | |
| with gr.Blocks(title="DIYO is here",fill_height=True, css=CSS, elem_id="main-app") as demo: | |
| is_new_user_for_greeting = gr.State(True) | |
| chatbot_message_storage = gr.State([]) | |
| model_update_trigger = gr.State() | |
| current_prompt_state = gr.BrowserState( | |
| storage_key="current_prompt_state", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| current_uuid_state = gr.BrowserState( | |
| uuid4, | |
| storage_key="current_uuid_state", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| current_langgraph_state = gr.BrowserState( | |
| dict(), | |
| storage_key="current_langgraph_state", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| end_of_assistant_response_state = gr.State( | |
| bool(), | |
| ) | |
| # [uuid] -> summary of chat | |
| sidebar_names_state = gr.BrowserState( | |
| dict(), | |
| storage_key="sidebar_names_state", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| # [uuid] -> {"graph": gradio_graph, "messages": messages} | |
| offloaded_tabs_data_storage = gr.BrowserState( | |
| dict(), | |
| storage_key="offloaded_tabs_data_storage", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| chatbot_message_storage = gr.BrowserState( | |
| [], | |
| storage_key="chatbot_message_storage", | |
| secret=BROWSER_STORAGE_SECRET, | |
| ) | |
| with gr.Row(elem_id="header-image"): | |
| header_image = gr.Image(value="https://yqewezudxihyadvmfovd.supabase.co/storage/v1/object/public/product_images/hodabass/header.png") | |
| with gr.Row(elem_id="system-prompt-input"): | |
| prompt_textbox = gr.Textbox(show_label=False, interactive=True,visible=False) | |
| with gr.Row(elem_id="image_prompt"): | |
| with gr.Column(scale=1): | |
| build_plan_display = gr.Markdown(label="Build Plan", elem_id="build-plan") | |
| model_prompt_display = gr.Textbox(label="Image generation prompt", interactive=False, lines=7, elem_id="model-prompt") | |
| with gr.Column(scale=1): | |
| generated_image = gr.Image() | |
| with gr.Row(): | |
| checkbox_search_enabled = gr.Checkbox( | |
| value=True, | |
| label="Enable search", | |
| show_label=True, | |
| visible=search_enabled, | |
| scale=1, | |
| ) | |
| checkbox_download_website_text = gr.Checkbox( | |
| value=True, | |
| show_label=True, | |
| label="Enable downloading text from urls", | |
| scale=1, | |
| ) | |
| with gr.Row(): | |
| guidelines = gr.Textbox(lines=2, label="ℹ️Readmeℹ️",value="Just a heads-up! 💡 You can tell the agent to finalize the plan at the any stage of the process to immediately start generating model. 💡 It takes an average of 300 seconds to generate a 3d model. 💡 Click the logs button to view the progress. 💡 If the header component isnt visible on screen please resize your browser window twice. 💡New chat need to be started once a 3d model has been generated. ✨") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| model_3d_output = gr.Model3D( | |
| clear_color=[0.0, 0.0, 0.0, 0.0], | |
| label="3D Model", | |
| ) | |
| with gr.Column(scale=1): | |
| # Input for the 3D model | |
| # Using UploadButton is often clearer for users than a clickable Model3D input | |
| model_3d_upload_button = gr.UploadButton( | |
| "Upload 3D Model (.obj, .glb, .gltf)", | |
| file_types=[".obj", ".glb", ".gltf"], | |
| # scale=0 # make it take less space if needed | |
| ) | |
| output_list = gr.Radio( | |
| label="Example 3D Models", | |
| choices=[], # Will be populated on load | |
| type="value", | |
| info="Select a model to view. The list updates on page load." | |
| ) | |
| with gr.Row(): | |
| multimodal = False | |
| textbox_component = ( | |
| gr.MultimodalTextbox if multimodal else gr.Textbox | |
| ) | |
| textbox = textbox_component( | |
| show_label=False, | |
| label="Message", | |
| placeholder="Type a message...", | |
| scale=1, | |
| autofocus=True, | |
| submit_btn=True, | |
| stop_btn=True, | |
| elem_id="chat-textbox", | |
| lines=1, | |
| ) | |
| chatbot = gr.Chatbot( | |
| type="messages", | |
| scale=0, | |
| show_copy_button=True, | |
| editable="all", | |
| elem_classes="main-chatbox" | |
| ) | |
| tab_edit_uuid_state = gr.State( | |
| str() | |
| ) | |
| prompt_textbox.change(lambda prompt: prompt, inputs=[prompt_textbox], outputs=[current_prompt_state]) | |
| MAX_CHATS_IN_SIDEBAR = 25 | |
| with gr.Sidebar() as sidebar: | |
| new_chat_button = gr.Button("New Chat", elem_id="new-chat-button") | |
| # --- This is the "Component Pool" --- | |
| # We create a fixed number of rows for our chats and hide them initially. | |
| sidebar_chat_rows = [] | |
| for i in range(MAX_CHATS_IN_SIDEBAR): | |
| with gr.Row(visible=False, elem_classes="chat-tab-row") as row: | |
| # We need a gr.State to hold the unique ID for each chat in the list | |
| chat_id_state = gr.State() | |
| # The main button to switch to the chat | |
| chat_button = gr.Button(scale=2) | |
| # The delete button for the chat | |
| delete_button = gr.Button("🗑", scale=0, elem_classes=["tab-button-control"]) | |
| sidebar_chat_rows.append({ | |
| "row": row, | |
| "chat_id_state": chat_id_state, | |
| "chat_button": chat_button, | |
| "delete_button": delete_button | |
| }) | |
| def update_sidebar_display(sidebar_summaries, all_tabs_data, active_uuid): | |
| updates = [] | |
| # Use reversed to show the newest chats on top | |
| chat_uuids_to_display = list(reversed(list(all_tabs_data.keys()))) | |
| for i in range(MAX_CHATS_IN_SIDEBAR): | |
| if i < len(chat_uuids_to_display): | |
| # If we have a chat for this slot, we make the row visible and update it. | |
| chat_uuid = chat_uuids_to_display[i] | |
| chat_name = sidebar_summaries.get(chat_uuid, f"Chat {i+1}") | |
| elem_classes = ["chat-tab", "active"] if chat_uuid == active_uuid else ["chat-tab"] | |
| updates.extend([ | |
| gr.update(visible=True), # Show the row | |
| chat_uuid, # Set the UUID for this row | |
| gr.update(value=chat_name, elem_classes=elem_classes), # Update the button text/style | |
| gr.update(visible=True) # Show the delete button | |
| ]) | |
| else: | |
| # If there's no chat for this slot, we hide the entire row. | |
| updates.extend([ | |
| gr.update(visible=False), # Hide the row | |
| None, # Clear the state | |
| gr.update(value=""), # Clear button text | |
| gr.update(visible=False) # Hide delete button | |
| ]) | |
| return updates | |
| chat_interface = gr.ChatInterface( | |
| chatbot=chatbot, | |
| fn=chat_fn, | |
| additional_inputs=[ | |
| current_langgraph_state, | |
| current_uuid_state, | |
| prompt_textbox, | |
| checkbox_search_enabled, | |
| checkbox_download_website_text, | |
| ], | |
| additional_outputs=[ | |
| current_langgraph_state, | |
| end_of_assistant_response_state, | |
| ], | |
| multimodal=multimodal, | |
| textbox=textbox, | |
| ) | |
| with gr.Row(): | |
| followup_question_buttons = [] | |
| for i in range(FOLLOWUP_QUESTION_NUMBER): | |
| btn = gr.Button(f"Button {i+1}", visible=False) | |
| followup_question_buttons.append(btn) | |
| # --- Define all event listeners ONCE, outside of any update function --- | |
| for components in sidebar_chat_rows: | |
| # Event listener for switching to a different chat | |
| components["chat_button"].click( | |
| fn=switch_tab, | |
| inputs=[ | |
| components["chat_id_state"], # The UUID of the clicked tab | |
| offloaded_tabs_data_storage, | |
| current_langgraph_state, | |
| current_uuid_state, | |
| chatbot, | |
| prompt_textbox | |
| ], | |
| outputs=[ | |
| current_langgraph_state, | |
| current_uuid_state, | |
| chat_interface.chatbot, | |
| offloaded_tabs_data_storage, | |
| prompt_textbox, | |
| *followup_question_buttons, | |
| ] | |
| ) | |
| # Event listener for deleting a chat | |
| components["delete_button"].click( | |
| fn=delete_tab, | |
| inputs=[ | |
| current_uuid_state, | |
| components["chat_id_state"], # The UUID of the tab to delete | |
| sidebar_names_state, | |
| offloaded_tabs_data_storage | |
| ], | |
| outputs=[ | |
| sidebar_names_state, | |
| offloaded_tabs_data_storage, | |
| chat_interface.chatbot | |
| ] | |
| ) | |
# --- This section replaces the @gr.render decorator ---
# Re-run `update_sidebar_display` whenever any of these components changes.
sidebar_update_triggers = [
    sidebar_names_state,
    offloaded_tabs_data_storage,
    current_uuid_state,
]
# Flatten every component of every sidebar row into one outputs list.
sidebar_update_outputs = [
    component
    for row_components in sidebar_chat_rows
    for component in row_components.values()
]
# Same listener configuration for each trigger and for the initial page load.
sidebar_refresh_kwargs = dict(
    fn=update_sidebar_display,
    inputs=[sidebar_names_state, offloaded_tabs_data_storage, current_uuid_state],
    outputs=sidebar_update_outputs,
    show_progress="hidden",
)
for trigger in sidebar_update_triggers:
    trigger.change(**sidebar_refresh_kwargs)
# Also trigger an update when the app loads
demo.load(**sidebar_refresh_kwargs)
# Clicking "New Chat" calls `new_tab` with the current session components and
# writes its results back to the session components and follow-up buttons.
new_tab_inputs = [
    current_uuid_state,
    current_langgraph_state,
    chatbot,
    offloaded_tabs_data_storage,
    prompt_textbox,
    sidebar_names_state,
]
new_tab_outputs = [
    current_uuid_state,
    current_langgraph_state,
    chat_interface.chatbot_value,
    offloaded_tabs_data_storage,
    prompt_textbox,
    sidebar_names_state,
    *followup_question_buttons,
]
new_chat_button.click(fn=new_tab, inputs=new_tab_inputs, outputs=new_tab_outputs)
def click_followup_button(btn):
    """
    Forward the clicked follow-up button's value and hide all follow-up buttons.

    Args:
        btn: Value Gradio supplies for the clicked button (presumably its
            label text -- confirm against the listener wiring).

    Returns:
        A tuple whose first item is ``btn`` unchanged, followed by one hidden
        ``gr.Button`` update per entry in ``followup_question_buttons``.
    """
    hidden_buttons = [gr.Button(visible=False) for _ in followup_question_buttons]
    return (btn, *hidden_buttons)
# Every follow-up button writes to the chat textbox and to all follow-up
# buttons, so the outputs list is built once.
followup_click_outputs = [chat_interface.textbox, *followup_question_buttons]
for btn in followup_question_buttons:
    click_event = btn.click(
        fn=click_followup_button,
        inputs=[btn],
        outputs=followup_click_outputs,
    )
    # After a successful click, run the TRIGGER_CHATINTERFACE_BUTTON script
    # in the browser; the Python side is a no-op.
    click_event.success(lambda: None, js=TRIGGER_CHATINTERFACE_BUTTON)
# Pass the value of `output_list` straight through to the 3D viewer.
output_list.change(
    fn=lambda selected: selected,
    inputs=output_list,
    outputs=model_3d_output,
)

# On each chatbot change, run `populate_followup_questions`; it updates the
# follow-up buttons and `end_of_assistant_response_state`.
followup_inputs = [end_of_assistant_response_state, chatbot, current_uuid_state]
followup_outputs = [*followup_question_buttons, end_of_assistant_response_state]
chatbot.change(
    fn=populate_followup_questions,
    inputs=followup_inputs,
    outputs=followup_outputs,
    trigger_mode="multiple",
)

# On each chatbot change, run `summarize_chat`; it updates the sidebar names
# and `end_of_assistant_response_state`.
summary_inputs = [
    end_of_assistant_response_state,
    chatbot,
    sidebar_names_state,
    current_uuid_state,
]
summary_outputs = [sidebar_names_state, end_of_assistant_response_state]
chatbot.change(
    fn=summarize_chat,
    inputs=summary_inputs,
    outputs=summary_outputs,
    trigger_mode="multiple",
)

# Copy the chatbot value into `chatbot_message_storage` unchanged.
chatbot.change(
    fn=lambda history: history,
    inputs=[chatbot],
    outputs=[chatbot_message_storage],
    trigger_mode="always_last",
)
# Refresh the 3D viewer and the model list whenever the graph state changes.
# NOTE(review): `update_3d_models_on_load` is defined further down in this
# file; confirm the name is already bound when this statement executes.
current_langgraph_state.change(
    fn=update_3d_models_on_load,
    inputs=None,  # The function doesn't need any inputs
    outputs=[model_3d_output, output_list],
    show_progress="hidden",
)

# The remaining listeners share one shape: graph state in, one display out.
# Disabled listener (kept for reference):
#   update_prompt_with_last_ai_message -> last_AI_message
state_display_refreshers = (
    (update_generated_image_on_state_change, generated_image),
    (update_build_plan_display, build_plan_display),
    (update_model_prompt_display, model_prompt_display),
)
for refresh_fn, display_component in state_display_refreshers:
    current_langgraph_state.change(
        fn=refresh_fn,
        inputs=[current_langgraph_state],
        outputs=[display_component],
        show_progress="hidden",
    )

# Pass an uploaded file through `load_mesh` into the 3D viewer.
model_3d_upload_button.upload(
    fn=load_mesh,
    inputs=model_3d_upload_button,
    outputs=model_3d_output,
)
def handle_initial_greeting_load(current_is_new_user_flag: bool, existing_chat_history: list):
    """
    Decide whether to put the initial greeting at the front of the chat history.

    Args:
        current_is_new_user_flag: True when the greeting should be added.
        existing_chat_history: Messages already in the chat; any non-list
            value is treated as an empty history when the greeting is added.

    Returns:
        A ``(chat_history, is_new_user_flag)`` tuple. The flag is always
        ``False``. For a new user the history starts with an assistant
        message containing the text from ``load_initial_greeting()``;
        otherwise the history is returned unchanged.
    """
    if not current_is_new_user_flag:
        logger.info("loading existing chat")
        return existing_chat_history, False

    greeting_entry = {"role": "assistant", "content": load_initial_greeting()}
    # Guard the concatenation below against a missing or malformed history.
    prior_messages = existing_chat_history if isinstance(existing_chat_history, list) else []
    logger.info("starting new chat")
    return [greeting_entry] + prior_messages, False
def load_messages(messages):
    """Return ``messages`` unchanged (identity pass-through)."""
    return messages
def load_prompt(current_prompt):
    """Return ``current_prompt`` unchanged (identity pass-through)."""
    return current_prompt
def update_3d_models_on_load():
    """
    Pick the 3D model to display and refresh the list of selectable models.

    Returns:
        A ``(model_path, update)`` tuple. ``model_path`` is the first entry
        from ``get_sorted_3d_model_files()`` as a string, or ``None`` when
        there are no files (presumably the newest model -- confirm the sort
        order of that helper). ``update`` is a ``gr.update`` that sets the
        choices to every file path and selects ``model_path``.
    """
    model_paths = [str(model_file) for model_file in get_sorted_3d_model_files()]
    newest_model = model_paths[0] if model_paths else None
    # The same path goes to the viewer and becomes the selected choice.
    return newest_model, gr.update(choices=model_paths, value=newest_model)
| # @demo.load(inputs=None,outputs=[model_3d_output]) | |
| # def get_latest_3d_model(): | |
| # """ | |
| # Returns the path to the most recently created 3D model. | |
| # """ | |
| # sorted_files = get_sorted_3d_model_files() | |
| # if sorted_files: | |
| # return str(sorted_files[0]) | |
| # return None | |
| # @demo.load(inputs=None,outputs=[output_list]) | |
| # def get_3d_model_examples(): | |
| # """ | |
| # Returns a list of file paths for the gr.Examples component. | |
| # """ | |
| # return [str(file) for file in get_sorted_3d_model_files()] | |
| #!environment variable GRADIO_SERVER_NAME is overridden here | |
| #*on local host | |
| # demo.launch(server_name="127.0.0.1", server_port=8080, share=True) | |
| #*in gradio space without docker | |
| # demo.launch(server_name="0.0.0.0", server_port=7860, share=True) | |
| #*on docker container inside local host | |
| # demo.launch(height=1500,server_name="0.0.0.0", server_port=8080, share=True) | |
| #*in gradio space with docker (the active configuration) | |
| demo.launch(height=2500) | |
| # with gr.Sidebar() as sidebar: | |
| # @gr.render(inputs=[tab_edit_uuid_state, end_of_assistant_response_state, sidebar_names_state, current_uuid_state, chatbot, offloaded_tabs_data_storage]) | |
| # def render_chats(tab_uuid_edit, end_of_chat_response, sidebar_summaries, active_uuid, messages, tabs): | |
| # current_tab_button_text = "" | |
| # if active_uuid not in sidebar_summaries: | |
| # current_tab_button_text = "Current Chat" | |
| # elif active_uuid not in tabs: | |
| # current_tab_button_text = sidebar_summaries[active_uuid] | |
| # if current_tab_button_text: | |
| # unique_id = f"current-tab-{active_uuid}-{uuid4()}" | |
| # gr.Button( | |
| # current_tab_button_text, | |
| # elem_classes=["chat-tab", "active"], | |
| # elem_id=unique_id # Add unique elem_id | |
| # ) | |
| # for chat_uuid, tab in reversed(tabs.items()): | |
| # elem_classes = ["chat-tab"] | |
| # if chat_uuid == active_uuid: | |
| # elem_classes.append("active") | |
| # button_uuid_state = gr.State(chat_uuid) | |
| # with gr.Row(): | |
| # clear_tab_button = gr.Button( | |
| # "🗑", | |
| # scale=0, | |
| # elem_classes=["tab-button-control"], | |
| # elem_id=f"delete-btn-{chat_uuid}-{uuid4()}" # Add unique ID | |
| # ) | |
| # clear_tab_button.click( | |
| # fn=delete_tab, | |
| # inputs=[ | |
| # current_uuid_state, | |
| # button_uuid_state, | |
| # sidebar_names_state, | |
| # offloaded_tabs_data_storage | |
| # ], | |
| # outputs=[ | |
| # sidebar_names_state, | |
| # offloaded_tabs_data_storage, | |
| # chat_interface.chatbot_value | |
| # ] | |
| # ) | |
| # chat_button_text = sidebar_summaries.get(chat_uuid) | |
| # if not chat_button_text: | |
| # chat_button_text = str(chat_uuid) | |
| # if chat_uuid != tab_uuid_edit: | |
| # set_edit_tab_button = gr.Button( | |
| # "✎", | |
| # scale=0, | |
| # elem_classes=["tab-button-control"], | |
| # elem_id=f"edit-btn-{chat_uuid}-{uuid4()}" # Add unique ID | |
| # ) | |
| # set_edit_tab_button.click( | |
| # fn=lambda x: x, | |
| # inputs=[button_uuid_state], | |
| # outputs=[tab_edit_uuid_state] | |
| # ) | |
| # chat_tab_button = gr.Button( | |
| # chat_button_text, | |
| # elem_id=f"chat-{chat_uuid}-{uuid4()}", # Add truly unique ID | |
| # elem_classes=elem_classes, | |
| # scale=2 | |
| # ) | |
| # chat_tab_button.click( | |
| # fn=switch_tab, | |
| # inputs=[ | |
| # button_uuid_state, | |
| # offloaded_tabs_data_storage, | |
| # current_langgraph_state, | |
| # current_uuid_state, | |
| # chatbot, | |
| # prompt_textbox | |
| # ], | |
| # outputs=[ | |
| # current_langgraph_state, | |
| # current_uuid_state, | |
| # chat_interface.chatbot_value, | |
| # offloaded_tabs_data_storage, | |
| # prompt_textbox, | |
| # *followup_question_buttons | |
| # ] | |
| # ) | |
| # else: | |
| # chat_tab_text = gr.Textbox( | |
| # chat_button_text, | |
| # scale=2, | |
| # interactive=True, | |
| # show_label=False, | |
| # elem_id=f"edit-text-{chat_uuid}-{uuid4()}" # Add unique ID | |
| # ) | |
| # chat_tab_text.submit( | |
| # fn=submit_edit_tab, | |
| # inputs=[ | |
| # button_uuid_state, | |
| # sidebar_names_state, | |
| # chat_tab_text | |
| # ], | |
| # outputs=[ | |
| # sidebar_names_state, | |
| # tab_edit_uuid_state | |
| # ] | |
| # ) | |
| # # ) | |
| # # return chat_tabs, sidebar_summaries | |
| # new_chat_button = gr.Button("New Chat", elem_id="new-chat-button") | |
| # chatbot.clear(fn=clear, outputs=[current_langgraph_state, current_uuid_state]) | |
| # chat_interface = gr.ChatInterface( | |
| # chatbot=chatbot, | |
| # fn=chat_fn, | |
| # additional_inputs=[ | |
| # current_langgraph_state, | |
| # current_uuid_state, | |
| # prompt_textbox, | |
| # checkbox_search_enabled, | |
| # checkbox_download_website_text, | |
| # ], | |
| # additional_outputs=[ | |
| # current_langgraph_state, | |
| # end_of_assistant_response_state | |
| # ], | |
| # type="messages", | |
| # multimodal=multimodal, | |
| # textbox=textbox, | |
| # ) |