data_pipeline_agent / ui /conversation.py
cryogenic22's picture
Update ui/conversation.py
bf595ba verified
import streamlit as st
def render_conversation_tab(session_state, agent_graph, update_state_dict):
"""Render the conversation tab in the UI."""
st.subheader("Conversation with Data Management Agent")
# Initialize messages in conversation if not already present
if "messages" not in session_state.conversation:
session_state.conversation["messages"] = []
# Display conversation history
for message in session_state.conversation["messages"]:
if isinstance(message, dict) and "role" in message and "content" in message:
if message["role"] == "user":
st.markdown(f"**You:** {message['content']}")
else:
st.markdown(f"**Agent:** {message['content']}")
# Input for new message
with st.form(key="user_input_form"):
user_input = st.text_area("What data pipeline do you need to create?",
placeholder="e.g., I need a sales performance dashboard showing regional performance by product for the last 2 years")
submit_button = st.form_submit_button("Submit")
if submit_button and user_input:
# Add user message to conversation
new_message = {"role": "user", "content": user_input}
session_state.conversation["messages"].append(new_message)
# Update agent state - ensure messages is a list
agent_state = session_state.agent_state.copy()
if "messages" not in agent_state or not isinstance(agent_state["messages"], list):
agent_state["messages"] = []
# Add new message to agent state
agent_state["messages"].append(new_message)
# Run the agent graph
with st.spinner("Agent is processing..."):
try:
# Update the state dictionary for tools
update_state_dict(agent_state)
# Execute the agent workflow
result = agent_graph.invoke(agent_state)
# Update session state with result
session_state.agent_state = result
# Update the state dictionary for tools again with the result
update_state_dict(result)
# Ensure messages is a list in the result
if not isinstance(result.get("messages", []), list):
result["messages"] = []
# Update conversation with agent responses
for message in result.get("messages", []):
if isinstance(message, dict) and "role" in message and "content" in message:
# Check if this message isn't already in the conversation
message_exists = False
for existing_msg in session_state.conversation["messages"]:
if (isinstance(existing_msg, dict) and
existing_msg.get("role") == message.get("role") and
existing_msg.get("content") == message.get("content")):
message_exists = True
break
if not message_exists:
session_state.conversation["messages"].append(message)
# Update other state properties
session_state.conversation["user_intent"] = result.get("user_intent", {})
session_state.conversation["pipeline_plan"] = result.get("pipeline_plan", {})
session_state.conversation["sql_queries"] = result.get("sql_queries", [])
session_state.conversation["execution_results"] = result.get("execution_results", {})
session_state.conversation["confidence_scores"] = result.get("confidence_scores", {})
session_state.conversation["status"] = result.get("status", "planning")
session_state.conversation["current_agent"] = result.get("current_agent", "understanding_agent")
# Force refresh
st.rerun()
except Exception as e:
st.error(f"Error executing agent workflow: {str(e)}")