"""
Main application for Pharmaceutical Data Management Agent.
"""

import os
import sys

import streamlit as st
from anthropic import Anthropic
from dotenv import load_dotenv

# Directory containing this script. Every project-relative path (the sys.path
# entry and the generated ``ui`` package) is anchored here so the app behaves
# the same regardless of the working directory `streamlit run` is launched from.
APP_DIR = os.path.dirname(os.path.abspath(__file__))

# Add the app directory to the import path so sibling packages (data, graph,
# ui, agents) resolve. Streamlit re-executes this script on every rerun, so
# only append when missing to avoid growing sys.path without bound.
if APP_DIR not in sys.path:
    sys.path.append(APP_DIR)

# Import data module
from data.synthetic_db import SyntheticDatabase

# Import graph module
from graph.workflow import create_agent_graph

# Location of the generated UI package.
UI_DIR = os.path.join(APP_DIR, "ui")

# ---------------------------------------------------------------------------
# Default sources for the UI modules. They are written to UI_DIR only when the
# corresponding file is missing, so local edits to ui/*.py are never clobbered.
# ---------------------------------------------------------------------------

_CONVERSATION_UI_SOURCE = """
import streamlit as st


def render_conversation_tab(session_state, agent_graph, update_state_dict):
    \"\"\"Render the conversation tab in the UI.\"\"\"
    st.subheader("Conversation with Data Management Agent")

    # Display conversation history
    for message in session_state.conversation["messages"]:
        if message["role"] == "user":
            st.markdown(f"**You:** {message['content']}")
        else:
            st.markdown(f"**Agent:** {message['content']}")

    # Input for new message
    with st.form(key="user_input_form"):
        user_input = st.text_area(
            "What data pipeline do you need to create?",
            placeholder="e.g., I need a sales performance dashboard showing regional performance by product for the last 2 years",
        )
        submit_button = st.form_submit_button("Submit")

    if submit_button and user_input:
        # Add user message to conversation
        new_message = {"role": "user", "content": user_input}
        session_state.conversation["messages"].append(new_message)

        # Update agent state
        agent_state = session_state.agent_state.copy()
        agent_state["messages"] = agent_state["messages"] + [new_message]

        # Run the agent graph
        with st.spinner("Agent is processing..."):
            try:
                # Update the state dictionary for tools
                update_state_dict(agent_state)

                # Execute the agent workflow
                result = agent_graph.invoke(agent_state)

                # Update session state with result
                session_state.agent_state = result

                # Update the state dictionary for tools again with the result
                update_state_dict(result)

                # Update conversation with agent responses
                for message in result["messages"]:
                    if message not in session_state.conversation["messages"]:
                        session_state.conversation["messages"].append(message)

                # Update other state properties
                session_state.conversation["user_intent"] = result.get("user_intent", {})
                session_state.conversation["pipeline_plan"] = result.get("pipeline_plan", {})
                session_state.conversation["sql_queries"] = result.get("sql_queries", [])
                session_state.conversation["execution_results"] = result.get("execution_results", {})
                session_state.conversation["confidence_scores"] = result.get("confidence_scores", {})
                session_state.conversation["status"] = result.get("status", "planning")
                session_state.conversation["current_agent"] = result.get("current_agent", "understanding_agent")

                # Force refresh
                st.rerun()
            except Exception as e:
                st.error(f"Error executing agent workflow: {str(e)}")
"""

_PIPELINE_UI_SOURCE = """
import streamlit as st


def render_pipeline_tab(session_state):
    \"\"\"Render the pipeline details tab in the UI.\"\"\"
    st.subheader("Pipeline Details")

    # Intent Understanding
    st.markdown("### User Intent")
    if session_state.conversation["user_intent"]:
        st.markdown(session_state.conversation["user_intent"].get("description", "No intent captured yet"))
        if "confidence_scores" in session_state.conversation and "intent_understanding" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["intent_understanding"] * 100
            st.progress(score / 100, text=f"Intent Understanding Confidence: {score:.1f}%")
    else:
        st.info("No user intent has been captured yet. Start a conversation to extract intent.")

    # Pipeline Plan
    st.markdown("### Pipeline Plan")
    if session_state.conversation["pipeline_plan"]:
        st.markdown(session_state.conversation["pipeline_plan"].get("description", "No plan created yet"))
        if "confidence_scores" in session_state.conversation and "plan_quality" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["plan_quality"] * 100
            st.progress(score / 100, text=f"Plan Quality Confidence: {score:.1f}%")
    else:
        st.info("No pipeline plan has been created yet. Continue the conversation to develop a plan.")

    # SQL Queries
    st.markdown("### SQL Queries")
    if session_state.conversation["sql_queries"]:
        for i, query in enumerate(session_state.conversation["sql_queries"]):
            with st.expander(f"Query {i+1}: {query.get('name', 'Unnamed Query')}"):
                st.code(query.get("sql", ""), language="sql")
    else:
        st.info("No SQL queries have been generated yet. Continue the conversation to generate queries.")

    # Execution Results
    st.markdown("### Execution Results")
    if session_state.conversation["execution_results"] and "details" in session_state.conversation["execution_results"]:
        st.markdown(session_state.conversation["execution_results"].get("summary", ""))
        if "success_rate" in session_state.conversation["execution_results"]:
            score = session_state.conversation["execution_results"]["success_rate"] * 100
            st.progress(score / 100, text=f"Execution Success Rate: {score:.1f}%")

        results = session_state.conversation["execution_results"]["details"]
        for i, result in enumerate(results):
            status = "✅" if result["success"] else "❌"
            with st.expander(f"{status} {result.get('query_name', f'Query {i+1}')}"):
                st.markdown(f"**Result:** {result.get('result_summary', 'No summary available')}")
                st.markdown(f"**Rows Processed:** {result.get('row_count', 0)}")
    else:
        st.info("No execution results available yet. Complete the pipeline creation to see results.")
"""

_AGENT_WORKFLOW_UI_SOURCE = """
import streamlit as st


def render_workflow_tab(session_state):
    \"\"\"Render the agent workflow visualization tab in the UI.\"\"\"
    st.subheader("Agent Workflow Visualization")

    # Display current agent and status
    current_agent = session_state.conversation.get("current_agent", "understanding_agent")
    status = session_state.conversation.get("status", "planning")

    st.markdown(f"**Current State:** {status.title()}")
    st.markdown(f"**Current Agent:** {current_agent.replace('_', ' ').title()}")

    # Visualize the workflow
    col1, col2, col3, col4 = st.columns(4)

    # Determine which agent is active
    understanding_active = current_agent == "understanding_agent"
    planning_active = current_agent == "planning_agent"
    sql_active = current_agent == "sql_generator_agent"
    executor_active = current_agent == "executor_agent"

    # Show the workflow visualization
    with col1:
        if understanding_active:
            st.markdown("### 🔍 **Understanding**")
        else:
            st.markdown("### 🔍 Understanding")
        st.markdown("Extracts user intent and asks clarification questions")
        if "user_intent" in session_state.conversation and session_state.conversation["user_intent"]:
            st.success("Completed")
        elif understanding_active:
            st.info("In Progress")
        else:
            st.warning("Not Started")

    with col2:
        if planning_active:
            st.markdown("### 📋 **Planning**")
        else:
            st.markdown("### 📋 Planning")
        st.markdown("Creates data pipeline plan with sources and transformations")
        if "pipeline_plan" in session_state.conversation and session_state.conversation["pipeline_plan"]:
            st.success("Completed")
        elif planning_active:
            st.info("In Progress")
        elif understanding_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    with col3:
        if sql_active:
            st.markdown("### 💻 **SQL Generation**")
        else:
            st.markdown("### 💻 SQL Generation")
        st.markdown("Converts plan into executable SQL queries")
        if "sql_queries" in session_state.conversation and session_state.conversation["sql_queries"]:
            st.success("Completed")
        elif sql_active:
            st.info("In Progress")
        elif understanding_active or planning_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    with col4:
        if executor_active:
            st.markdown("### ⚙️ **Execution**")
        else:
            st.markdown("### ⚙️ Execution")
        st.markdown("Executes queries and reports results")
        if "execution_results" in session_state.conversation and session_state.conversation["execution_results"]:
            st.success("Completed")
        elif executor_active:
            st.info("In Progress")
        elif understanding_active or planning_active or sql_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    # Overall confidence score
    if "confidence_scores" in session_state.conversation and "overall" in session_state.conversation["confidence_scores"]:
        st.markdown("### Overall Pipeline Confidence")
        score = session_state.conversation["confidence_scores"]["overall"] * 100
        st.progress(score / 100, text=f"{score:.1f}%")

        # Workflow decision points (kept inside this branch: they need `score`)
        if status == "complete":
            if score > 80:
                st.success("✅ High confidence - Pipeline can be deployed automatically")
            else:
                st.warning("⚠️ Medium confidence - Human review recommended before deployment")

    # Add human review section for pending approval status
    if status == "pending_approval":
        st.markdown("### 👤 Human Review Required")
        st.info("This pipeline requires human review before deployment")

        col1, col2 = st.columns(2)
        with col1:
            if st.button("✅ Approve Pipeline"):
                # Update state to approved
                session_state.conversation["status"] = "approved"
                # Rerun so the UI reflects the new status
                st.rerun()
        with col2:
            if st.button("❌ Reject Pipeline"):
                # Update state to rejected
                session_state.conversation["status"] = "rejected"
                st.error("Pipeline rejected. Please provide feedback to refine the pipeline.")
"""

_DB_EXPLORER_UI_SOURCE = """
import streamlit as st
import pandas as pd


def render_db_explorer_tab(session_state):
    \"\"\"Render the database explorer tab in the UI.\"\"\"
    st.subheader("Database Explorer")

    # Get tables by category
    tables = session_state.db.get_tables()

    # Display tables by category
    col1, col2 = st.columns(2)

    with col1:
        st.markdown("### Raw Data Tables")
        for table in tables["raw_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

        st.markdown("### Staging Tables")
        for table in tables["staging_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

    with col2:
        st.markdown("### Analytics Ready Data")
        for table in tables["ard_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

        st.markdown("### Data Products")
        for table in tables["data_products"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

    # SQL Query Executor
    st.markdown("### Query Explorer")
    with st.form(key="sql_form"):
        sql_query = st.text_area(
            "Enter SQL Query",
            height=100,
            placeholder="SELECT * FROM ARD_SALES_PERFORMANCE WHERE region = 'North' LIMIT 5",
        )
        run_sql = st.form_submit_button("Run Query")

    if run_sql and sql_query:
        with st.spinner("Executing query..."):
            result = session_state.db.execute_query(sql_query)

            if "error" in result:
                st.error(f"Error executing query: {result['error']}")
            elif "data" in result:
                st.dataframe(pd.DataFrame(result["data"]))
                st.success(f"Query returned {len(result['data'])} rows")
            elif "tables" in result:
                st.write(result["tables"])
            elif "schema" in result:
                st.write(f"Schema for {result['table']}:")
                st.dataframe(pd.DataFrame(result["schema"]))
"""

# File name (inside UI_DIR) -> default module source.
_UI_MODULE_SOURCES = {
    "conversation.py": _CONVERSATION_UI_SOURCE,
    "pipeline.py": _PIPELINE_UI_SOURCE,
    "agent_workflow.py": _AGENT_WORKFLOW_UI_SOURCE,
    "db_explorer.py": _DB_EXPLORER_UI_SOURCE,
}


def _write_if_missing(path, content):
    """Create ``path`` containing ``content`` unless the file already exists.

    Exclusive-create mode ("x") makes the existence check and the creation a
    single step, and an existing file is left untouched. UTF-8 is written
    explicitly because the generated sources contain emoji that a non-UTF-8
    platform default encoding (e.g. cp1252) cannot encode.
    """
    try:
        with open(path, "x", encoding="utf-8") as f:
            f.write(content)
    except FileExistsError:
        pass


def ensure_ui_files_exist():
    """Create the ``ui`` package directory and any missing UI module files.

    Files that already exist are never overwritten, so local edits to the
    ``ui`` modules survive restarts.
    """
    os.makedirs(UI_DIR, exist_ok=True)
    for filename, source in _UI_MODULE_SOURCES.items():
        _write_if_missing(os.path.join(UI_DIR, filename), source)


# Now import UI modules after ensuring they exist
ensure_ui_files_exist()

# Import UI modules
from ui.conversation import render_conversation_tab
from ui.pipeline import render_pipeline_tab
from ui.agent_workflow import render_workflow_tab
from ui.db_explorer import render_db_explorer_tab

# Import agent state
from agents.state import AgentState

# Load environment variables from .env file if it exists
load_dotenv()


def main():
    """Main application entry point.

    Configures the page, checks that ``ANTHROPIC_API_KEY`` is available, builds
    the per-session objects (database, agent graph) on first run, seeds the
    conversation/agent state, and renders the four tabs.
    """
    # Set page configuration (before any other Streamlit command)
    st.set_page_config(page_title="Pharmaceutical Data Agent", page_icon="💊", layout="wide")

    # os.environ only accepts str values, so a missing key must be handled
    # explicitly rather than surfacing later as an opaque TypeError.
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        st.error(
            "ANTHROPIC_API_KEY is not set. Add it to your environment or a .env file "
            "and restart the app."
        )
        st.stop()

    # Initialize database with synthetic data (once per session)
    if "db" not in st.session_state:
        st.session_state.db = SyntheticDatabase()

    # Initialize agent graph (once per session). The Anthropic client is only
    # needed to build the graph, so it is created here instead of every rerun.
    if "agent_graph" not in st.session_state or "update_state_dict" not in st.session_state:
        anthropic_client = Anthropic(api_key=api_key)
        st.session_state.agent_graph, st.session_state.update_state_dict = create_agent_graph(
            anthropic_client, st.session_state.db
        )

    # Initialize conversation state
    if "conversation" not in st.session_state:
        st.session_state.conversation = {
            "messages": [],
            "user_intent": {},
            "data_context": {},
            "pipeline_plan": {},
            "sql_queries": [],
            "execution_results": {},
            "confidence_scores": {},
            "status": "planning",
            "current_agent": "understanding_agent",  # Always a string, not a list
        }

    # Initialize agent state for new runs
    if "agent_state" not in st.session_state:
        st.session_state.agent_state = AgentState(
            messages=[],
            user_intent={},
            data_context={},
            pipeline_plan={},
            sql_queries=[],
            execution_results={},
            confidence_scores={},
            status="planning",
            current_agent="understanding_agent",  # Always a string, not a list
        )

    # Main content area
    st.title("Pharmaceutical Data Management Agent")

    # Tabs for different views
    tab1, tab2, tab3, tab4 = st.tabs(
        ["Conversation", "Pipeline Details", "Agent Workflow", "Database Explorer"]
    )

    with tab1:
        render_conversation_tab(
            st.session_state,
            st.session_state.agent_graph,
            st.session_state.update_state_dict,
        )

    with tab2:
        render_pipeline_tab(st.session_state)

    with tab3:
        render_workflow_tab(st.session_state)

    with tab4:
        render_db_explorer_tab(st.session_state)


if __name__ == "__main__":
    main()