# Upstream commit header (Hugging Face): cryogenic22 — "Update app.py", commit 0eeb228 (verified)
"""
Main application for Pharmaceutical Data Management Agent.
"""
import os
import sys
import streamlit as st
from anthropic import Anthropic
from dotenv import load_dotenv
# Add the current directory to the path to enable relative imports
# (lets the sibling `data`, `graph`, `agents` and generated `ui` packages
# resolve regardless of the working directory the app is launched from).
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Import data module
from data.synthetic_db import SyntheticDatabase
# Import graph module
from graph.workflow import create_agent_graph
# Create ui directory if it doesn't exist — the UI modules generated below
# are written into it before they are imported.
os.makedirs("ui", exist_ok=True)
# Source text for the generated UI modules.  On first run the app writes
# these templates to disk so they can be imported as regular modules below.
# Each constant holds the full contents of one file under ui/.

_CONVERSATION_PY = '''\
import streamlit as st


def render_conversation_tab(session_state, agent_graph, update_state_dict):
    """Render the conversation tab in the UI."""
    st.subheader("Conversation with Data Management Agent")

    # Display conversation history
    for message in session_state.conversation["messages"]:
        if message["role"] == "user":
            st.markdown(f"**You:** {message['content']}")
        else:
            st.markdown(f"**Agent:** {message['content']}")

    # Input for new message
    with st.form(key="user_input_form"):
        user_input = st.text_area(
            "What data pipeline do you need to create?",
            placeholder="e.g., I need a sales performance dashboard showing regional performance by product for the last 2 years",
        )
        submit_button = st.form_submit_button("Submit")

    if submit_button and user_input:
        # Add user message to conversation
        new_message = {"role": "user", "content": user_input}
        session_state.conversation["messages"].append(new_message)

        # Update agent state
        agent_state = session_state.agent_state.copy()
        agent_state["messages"] = agent_state["messages"] + [new_message]

        # Run the agent graph
        with st.spinner("Agent is processing..."):
            try:
                # Update the state dictionary for tools
                update_state_dict(agent_state)
                # Execute the agent workflow
                result = agent_graph.invoke(agent_state)
                # Update session state with result
                session_state.agent_state = result
                # Update the state dictionary for tools again with the result
                update_state_dict(result)
                # Update conversation with agent responses
                for message in result["messages"]:
                    if message not in session_state.conversation["messages"]:
                        session_state.conversation["messages"].append(message)
                # Update other state properties
                session_state.conversation["user_intent"] = result.get("user_intent", {})
                session_state.conversation["pipeline_plan"] = result.get("pipeline_plan", {})
                session_state.conversation["sql_queries"] = result.get("sql_queries", [])
                session_state.conversation["execution_results"] = result.get("execution_results", {})
                session_state.conversation["confidence_scores"] = result.get("confidence_scores", {})
                session_state.conversation["status"] = result.get("status", "planning")
                session_state.conversation["current_agent"] = result.get("current_agent", "understanding_agent")
                # Force refresh
                st.rerun()
            except Exception as e:
                st.error(f"Error executing agent workflow: {str(e)}")
'''

_PIPELINE_PY = '''\
import streamlit as st


def render_pipeline_tab(session_state):
    """Render the pipeline details tab in the UI."""
    st.subheader("Pipeline Details")

    # Intent Understanding
    st.markdown("### User Intent")
    if session_state.conversation["user_intent"]:
        st.markdown(session_state.conversation["user_intent"].get("description", "No intent captured yet"))
        if "confidence_scores" in session_state.conversation and "intent_understanding" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["intent_understanding"] * 100
            st.progress(score / 100, text=f"Intent Understanding Confidence: {score:.1f}%")
    else:
        st.info("No user intent has been captured yet. Start a conversation to extract intent.")

    # Pipeline Plan
    st.markdown("### Pipeline Plan")
    if session_state.conversation["pipeline_plan"]:
        st.markdown(session_state.conversation["pipeline_plan"].get("description", "No plan created yet"))
        if "confidence_scores" in session_state.conversation and "plan_quality" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["plan_quality"] * 100
            st.progress(score / 100, text=f"Plan Quality Confidence: {score:.1f}%")
    else:
        st.info("No pipeline plan has been created yet. Continue the conversation to develop a plan.")

    # SQL Queries
    st.markdown("### SQL Queries")
    if session_state.conversation["sql_queries"]:
        for i, query in enumerate(session_state.conversation["sql_queries"]):
            with st.expander(f"Query {i+1}: {query.get('name', 'Unnamed Query')}"):
                st.code(query.get("sql", ""), language="sql")
    else:
        st.info("No SQL queries have been generated yet. Continue the conversation to generate queries.")

    # Execution Results
    st.markdown("### Execution Results")
    if session_state.conversation["execution_results"] and "details" in session_state.conversation["execution_results"]:
        st.markdown(session_state.conversation["execution_results"].get("summary", ""))
        if "success_rate" in session_state.conversation["execution_results"]:
            score = session_state.conversation["execution_results"]["success_rate"] * 100
            st.progress(score / 100, text=f"Execution Success Rate: {score:.1f}%")
        results = session_state.conversation["execution_results"]["details"]
        for i, result in enumerate(results):
            status = "✅" if result["success"] else "❌"
            with st.expander(f"{status} {result.get('query_name', f'Query {i+1}')}"):
                st.markdown(f"**Result:** {result.get('result_summary', 'No summary available')}")
                st.markdown(f"**Rows Processed:** {result.get('row_count', 0)}")
    else:
        st.info("No execution results available yet. Complete the pipeline creation to see results.")
'''

_AGENT_WORKFLOW_PY = '''\
import streamlit as st


def render_workflow_tab(session_state):
    """Render the agent workflow visualization tab in the UI."""
    st.subheader("Agent Workflow Visualization")

    # Display current agent and status
    current_agent = session_state.conversation.get("current_agent", "understanding_agent")
    status = session_state.conversation.get("status", "planning")
    st.markdown(f"**Current State:** {status.title()}")
    st.markdown(f"**Current Agent:** {current_agent.replace('_', ' ').title()}")

    # Visualize the workflow
    col1, col2, col3, col4 = st.columns(4)

    # Determine which agent is active
    understanding_active = current_agent == "understanding_agent"
    planning_active = current_agent == "planning_agent"
    sql_active = current_agent == "sql_generator_agent"
    executor_active = current_agent == "executor_agent"

    # Show the workflow visualization
    with col1:
        if understanding_active:
            st.markdown("### 🔍 **Understanding**")
        else:
            st.markdown("### 🔍 Understanding")
        st.markdown("Extracts user intent and asks clarification questions")
        if "user_intent" in session_state.conversation and session_state.conversation["user_intent"]:
            st.success("Completed")
        elif understanding_active:
            st.info("In Progress")
        else:
            st.warning("Not Started")

    with col2:
        if planning_active:
            st.markdown("### 📋 **Planning**")
        else:
            st.markdown("### 📋 Planning")
        st.markdown("Creates data pipeline plan with sources and transformations")
        if "pipeline_plan" in session_state.conversation and session_state.conversation["pipeline_plan"]:
            st.success("Completed")
        elif planning_active:
            st.info("In Progress")
        elif understanding_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    with col3:
        if sql_active:
            st.markdown("### 💻 **SQL Generation**")
        else:
            st.markdown("### 💻 SQL Generation")
        st.markdown("Converts plan into executable SQL queries")
        if "sql_queries" in session_state.conversation and session_state.conversation["sql_queries"]:
            st.success("Completed")
        elif sql_active:
            st.info("In Progress")
        elif understanding_active or planning_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    with col4:
        if executor_active:
            st.markdown("### ⚙️ **Execution**")
        else:
            st.markdown("### ⚙️ Execution")
        st.markdown("Executes queries and reports results")
        if "execution_results" in session_state.conversation and session_state.conversation["execution_results"]:
            st.success("Completed")
        elif executor_active:
            st.info("In Progress")
        elif understanding_active or planning_active or sql_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    # Overall confidence score
    if "confidence_scores" in session_state.conversation and "overall" in session_state.conversation["confidence_scores"]:
        st.markdown("### Overall Pipeline Confidence")
        score = session_state.conversation["confidence_scores"]["overall"] * 100
        st.progress(score / 100, text=f"{score:.1f}%")
        # Workflow decision points
        if status == "complete":
            if score > 80:
                st.success("✅ High confidence - Pipeline can be deployed automatically")
            else:
                st.warning("⚠️ Medium confidence - Human review recommended before deployment")

    # Add human review section for pending approval status
    if status == "pending_approval":
        st.markdown("### 👤 Human Review Required")
        st.info("This pipeline requires human review before deployment")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("✅ Approve Pipeline"):
                # Update state to approved
                session_state.conversation["status"] = "approved"
                # Trigger execution to continue
                st.rerun()
        with col2:
            if st.button("❌ Reject Pipeline"):
                # Update state to rejected
                session_state.conversation["status"] = "rejected"
                st.error("Pipeline rejected. Please provide feedback to refine the pipeline.")
'''

_DB_EXPLORER_PY = '''\
import streamlit as st
import pandas as pd


def render_db_explorer_tab(session_state):
    """Render the database explorer tab in the UI."""
    st.subheader("Database Explorer")

    # Get tables by category
    tables = session_state.db.get_tables()

    # Display tables by category
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Raw Data Tables")
        for table in tables["raw_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
        st.markdown("### Staging Tables")
        for table in tables["staging_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
    with col2:
        st.markdown("### Analytics Ready Data")
        for table in tables["ard_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
        st.markdown("### Data Products")
        for table in tables["data_products"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

    # SQL Query Executor
    st.markdown("### Query Explorer")
    with st.form(key="sql_form"):
        sql_query = st.text_area(
            "Enter SQL Query",
            height=100,
            placeholder="SELECT * FROM ARD_SALES_PERFORMANCE WHERE region = 'North' LIMIT 5",
        )
        run_sql = st.form_submit_button("Run Query")

    if run_sql and sql_query:
        with st.spinner("Executing query..."):
            result = session_state.db.execute_query(sql_query)
            if "error" in result:
                st.error(f"Error executing query: {result['error']}")
            elif "data" in result:
                st.dataframe(pd.DataFrame(result["data"]))
                st.success(f"Query returned {len(result['data'])} rows")
            elif "tables" in result:
                st.write(result["tables"])
            elif "schema" in result:
                st.write(f"Schema for {result['table']}:")
                st.dataframe(pd.DataFrame(result["schema"]))
'''


def _write_if_missing(path, content):
    """Write *content* to *path* unless the file already exists.

    Never overwrites an existing file, so local edits to generated modules
    survive restarts.  Writes as UTF-8 explicitly because the templates
    contain emoji, which would fail under a cp1252 default encoding.
    """
    parent = os.path.dirname(path)
    if parent:
        # Robustness: create the parent directory so the function also works
        # standalone, without the module-level os.makedirs("ui") call.
        os.makedirs(parent, exist_ok=True)
    if not os.path.exists(path):
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)


def ensure_ui_files_exist():
    """Create UI module files if they don't exist.

    Materialises ui/conversation.py, ui/pipeline.py, ui/agent_workflow.py and
    ui/db_explorer.py from the module-level templates above so they can be
    imported as ordinary modules.
    """
    _write_if_missing("ui/conversation.py", _CONVERSATION_PY)
    _write_if_missing("ui/pipeline.py", _PIPELINE_PY)
    _write_if_missing("ui/agent_workflow.py", _AGENT_WORKFLOW_PY)
    _write_if_missing("ui/db_explorer.py", _DB_EXPLORER_PY)
# Now import UI modules after ensuring they exist — the generated files must
# be on disk before the `from ui. ...` imports below can succeed.
ensure_ui_files_exist()
# Import UI modules
from ui.conversation import render_conversation_tab
from ui.pipeline import render_pipeline_tab
from ui.agent_workflow import render_workflow_tab
from ui.db_explorer import render_db_explorer_tab
# Import agent state
from agents.state import AgentState
# Load environment variables from .env file if it exists
# (supplies ANTHROPIC_API_KEY for local development; a no-op when absent).
load_dotenv()
def main():
    """Main application entry point.

    Configures the Streamlit page, validates the Anthropic API key, builds
    the per-session synthetic database and agent graph, initialises session
    state, and renders the four UI tabs.
    """
    # Set page configuration (must be the first Streamlit call)
    st.set_page_config(page_title="Pharmaceutical Data Agent", page_icon="💊", layout="wide")

    # Fail fast with a readable message when the key is missing.  The original
    # code did os.environ["ANTHROPIC_API_KEY"] = api_key, which raises an
    # opaque TypeError when the variable is unset (None is not a str) and is a
    # pointless self-assignment otherwise.
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        st.error("ANTHROPIC_API_KEY is not set. Add it to your environment or a .env file.")
        st.stop()

    # Initialize Anthropic client; it reads the API key from the environment.
    anthropic_client = Anthropic()

    # Initialize database with synthetic data (once per session)
    if "db" not in st.session_state:
        st.session_state.db = SyntheticDatabase()

    # Initialize agent graph and the tools' state-dict updater (once per session)
    if "agent_graph" not in st.session_state or "update_state_dict" not in st.session_state:
        st.session_state.agent_graph, st.session_state.update_state_dict = create_agent_graph(
            anthropic_client,
            st.session_state.db,
        )

    # Initialize the conversation state that backs the UI tabs
    if "conversation" not in st.session_state:
        st.session_state.conversation = {
            "messages": [],
            "user_intent": {},
            "data_context": {},
            "pipeline_plan": {},
            "sql_queries": [],
            "execution_results": {},
            "confidence_scores": {},
            "status": "planning",
            "current_agent": "understanding_agent",  # always a string, not a list
        }

    # Initialize agent state for new runs
    if "agent_state" not in st.session_state:
        st.session_state.agent_state = AgentState(
            messages=[],
            user_intent={},
            data_context={},
            pipeline_plan={},
            sql_queries=[],
            execution_results={},
            confidence_scores={},
            status="planning",
            current_agent="understanding_agent",  # always a string, not a list
        )

    # Main content area
    st.title("Pharmaceutical Data Management Agent")

    # Tabs for different views
    tab1, tab2, tab3, tab4 = st.tabs(
        ["Conversation", "Pipeline Details", "Agent Workflow", "Database Explorer"]
    )
    with tab1:
        render_conversation_tab(
            st.session_state,
            st.session_state.agent_graph,
            st.session_state.update_state_dict,
        )
    with tab2:
        render_pipeline_tab(st.session_state)
    with tab3:
        render_workflow_tab(st.session_state)
    with tab4:
        render_db_explorer_tab(st.session_state)


if __name__ == "__main__":
    main()