# NOTE(review): removed non-code paste residue ("Spaces:" / "Runtime error"
# lines) that preceded the module docstring and made the file unparseable.
"""
Main application for Pharmaceutical Data Management Agent.
"""
import os
import sys
import streamlit as st
from anthropic import Anthropic
from dotenv import load_dotenv

# Add the current directory to the path to enable relative imports: the
# sibling packages (data, graph, ui, agents) must resolve regardless of the
# working directory Streamlit is launched from. This must run BEFORE the
# project imports below.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import data module — synthetic in-memory database used by the app.
from data.synthetic_db import SyntheticDatabase

# Import graph module — builds the agent workflow graph; per main() it
# returns both the graph and a state-dict updater callable.
from graph.workflow import create_agent_graph

# Create ui directory if it doesn't exist; ensure_ui_files_exist() writes
# the generated UI modules into it before they are imported further below.
os.makedirs("ui", exist_ok=True)
| # Create ui files if they don't exist | |
def ensure_ui_files_exist():
    """Create the UI module files under ``ui/`` if they don't exist.

    Generates four Streamlit renderer modules (``conversation``, ``pipeline``,
    ``agent_workflow``, ``db_explorer``) so they can be imported by this
    application. Files that already exist are left untouched, so manual edits
    to the generated modules survive application restarts.

    Returns:
        None. Side effect only: writes files into the ``ui/`` directory.
    """

    def _write_if_missing(path, source):
        # Write `source` to `path` only when the file is absent.
        # UTF-8 is explicit because the payloads contain emoji; the platform
        # default encoding (e.g. cp1252 on Windows) would fail or corrupt them.
        if not os.path.exists(path):
            with open(path, "w", encoding="utf-8") as f:
                f.write(source)

    # Be self-sufficient: create the target directory if needed (also done at
    # module import time, but repeating with exist_ok=True is harmless).
    os.makedirs("ui", exist_ok=True)

    # Conversation UI
    _write_if_missing("ui/conversation.py", """
import streamlit as st

def render_conversation_tab(session_state, agent_graph, update_state_dict):
    \"\"\"Render the conversation tab in the UI.\"\"\"
    st.subheader("Conversation with Data Management Agent")

    # Display conversation history
    for message in session_state.conversation["messages"]:
        if message["role"] == "user":
            st.markdown(f"**You:** {message['content']}")
        else:
            st.markdown(f"**Agent:** {message['content']}")

    # Input for new message
    with st.form(key="user_input_form"):
        user_input = st.text_area("What data pipeline do you need to create?",
            placeholder="e.g., I need a sales performance dashboard showing regional performance by product for the last 2 years")
        submit_button = st.form_submit_button("Submit")

    if submit_button and user_input:
        # Add user message to conversation
        new_message = {"role": "user", "content": user_input}
        session_state.conversation["messages"].append(new_message)

        # Update agent state
        agent_state = session_state.agent_state.copy()
        agent_state["messages"] = agent_state["messages"] + [new_message]

        # Run the agent graph
        with st.spinner("Agent is processing..."):
            try:
                # Update the state dictionary for tools
                update_state_dict(agent_state)

                # Execute the agent workflow
                result = agent_graph.invoke(agent_state)

                # Update session state with result
                session_state.agent_state = result

                # Update the state dictionary for tools again with the result
                update_state_dict(result)

                # Update conversation with agent responses
                for message in result["messages"]:
                    if message not in session_state.conversation["messages"]:
                        session_state.conversation["messages"].append(message)

                # Update other state properties
                session_state.conversation["user_intent"] = result.get("user_intent", {})
                session_state.conversation["pipeline_plan"] = result.get("pipeline_plan", {})
                session_state.conversation["sql_queries"] = result.get("sql_queries", [])
                session_state.conversation["execution_results"] = result.get("execution_results", {})
                session_state.conversation["confidence_scores"] = result.get("confidence_scores", {})
                session_state.conversation["status"] = result.get("status", "planning")
                session_state.conversation["current_agent"] = result.get("current_agent", "understanding_agent")

                # Force refresh
                st.rerun()
            except Exception as e:
                st.error(f"Error executing agent workflow: {str(e)}")
""")

    # Pipeline UI
    _write_if_missing("ui/pipeline.py", """
import streamlit as st

def render_pipeline_tab(session_state):
    \"\"\"Render the pipeline details tab in the UI.\"\"\"
    st.subheader("Pipeline Details")

    # Intent Understanding
    st.markdown("### User Intent")
    if session_state.conversation["user_intent"]:
        st.markdown(session_state.conversation["user_intent"].get("description", "No intent captured yet"))
        if "confidence_scores" in session_state.conversation and "intent_understanding" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["intent_understanding"] * 100
            st.progress(score / 100, text=f"Intent Understanding Confidence: {score:.1f}%")
    else:
        st.info("No user intent has been captured yet. Start a conversation to extract intent.")

    # Pipeline Plan
    st.markdown("### Pipeline Plan")
    if session_state.conversation["pipeline_plan"]:
        st.markdown(session_state.conversation["pipeline_plan"].get("description", "No plan created yet"))
        if "confidence_scores" in session_state.conversation and "plan_quality" in session_state.conversation["confidence_scores"]:
            score = session_state.conversation["confidence_scores"]["plan_quality"] * 100
            st.progress(score / 100, text=f"Plan Quality Confidence: {score:.1f}%")
    else:
        st.info("No pipeline plan has been created yet. Continue the conversation to develop a plan.")

    # SQL Queries
    st.markdown("### SQL Queries")
    if session_state.conversation["sql_queries"]:
        for i, query in enumerate(session_state.conversation["sql_queries"]):
            with st.expander(f"Query {i+1}: {query.get('name', 'Unnamed Query')}"):
                st.code(query.get("sql", ""), language="sql")
    else:
        st.info("No SQL queries have been generated yet. Continue the conversation to generate queries.")

    # Execution Results
    st.markdown("### Execution Results")
    if session_state.conversation["execution_results"] and "details" in session_state.conversation["execution_results"]:
        st.markdown(session_state.conversation["execution_results"].get("summary", ""))
        if "success_rate" in session_state.conversation["execution_results"]:
            score = session_state.conversation["execution_results"]["success_rate"] * 100
            st.progress(score / 100, text=f"Execution Success Rate: {score:.1f}%")
        results = session_state.conversation["execution_results"]["details"]
        for i, result in enumerate(results):
            status = "✅" if result["success"] else "❌"
            with st.expander(f"{status} {result.get('query_name', f'Query {i+1}')}"):
                st.markdown(f"**Result:** {result.get('result_summary', 'No summary available')}")
                st.markdown(f"**Rows Processed:** {result.get('row_count', 0)}")
    else:
        st.info("No execution results available yet. Complete the pipeline creation to see results.")
""")

    # Agent Workflow UI
    _write_if_missing("ui/agent_workflow.py", """
import streamlit as st

def render_workflow_tab(session_state):
    \"\"\"Render the agent workflow visualization tab in the UI.\"\"\"
    st.subheader("Agent Workflow Visualization")

    # Display current agent and status
    current_agent = session_state.conversation.get("current_agent", "understanding_agent")
    status = session_state.conversation.get("status", "planning")
    st.markdown(f"**Current State:** {status.title()}")
    st.markdown(f"**Current Agent:** {current_agent.replace('_', ' ').title()}")

    # Visualize the workflow
    col1, col2, col3, col4 = st.columns(4)

    # Determine which agent is active
    understanding_active = current_agent == "understanding_agent"
    planning_active = current_agent == "planning_agent"
    sql_active = current_agent == "sql_generator_agent"
    executor_active = current_agent == "executor_agent"

    # Show the workflow visualization
    with col1:
        if understanding_active:
            st.markdown("### 🔍 **Understanding**")
        else:
            st.markdown("### 🔍 Understanding")
        st.markdown("Extracts user intent and asks clarification questions")
        if "user_intent" in session_state.conversation and session_state.conversation["user_intent"]:
            st.success("Completed")
        elif understanding_active:
            st.info("In Progress")
        else:
            st.warning("Not Started")
    with col2:
        if planning_active:
            st.markdown("### 📋 **Planning**")
        else:
            st.markdown("### 📋 Planning")
        st.markdown("Creates data pipeline plan with sources and transformations")
        if "pipeline_plan" in session_state.conversation and session_state.conversation["pipeline_plan"]:
            st.success("Completed")
        elif planning_active:
            st.info("In Progress")
        elif understanding_active:
            st.warning("Not Started")
        else:
            st.success("Completed")
    with col3:
        if sql_active:
            st.markdown("### 💻 **SQL Generation**")
        else:
            st.markdown("### 💻 SQL Generation")
        st.markdown("Converts plan into executable SQL queries")
        if "sql_queries" in session_state.conversation and session_state.conversation["sql_queries"]:
            st.success("Completed")
        elif sql_active:
            st.info("In Progress")
        elif understanding_active or planning_active:
            st.warning("Not Started")
        else:
            st.success("Completed")
    with col4:
        if executor_active:
            st.markdown("### ⚙️ **Execution**")
        else:
            st.markdown("### ⚙️ Execution")
        st.markdown("Executes queries and reports results")
        if "execution_results" in session_state.conversation and session_state.conversation["execution_results"]:
            st.success("Completed")
        elif executor_active:
            st.info("In Progress")
        elif understanding_active or planning_active or sql_active:
            st.warning("Not Started")
        else:
            st.success("Completed")

    # Overall confidence score
    if "confidence_scores" in session_state.conversation and "overall" in session_state.conversation["confidence_scores"]:
        st.markdown("### Overall Pipeline Confidence")
        score = session_state.conversation["confidence_scores"]["overall"] * 100
        st.progress(score / 100, text=f"{score:.1f}%")

        # Workflow decision points
        if status == "complete":
            if score > 80:
                st.success("✅ High confidence - Pipeline can be deployed automatically")
            else:
                st.warning("⚠️ Medium confidence - Human review recommended before deployment")

    # Add human review section for pending approval status
    if status == "pending_approval":
        st.markdown("### 👤 Human Review Required")
        st.info("This pipeline requires human review before deployment")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("✅ Approve Pipeline"):
                # Update state to approved
                session_state.conversation["status"] = "approved"
                # Trigger execution to continue
                st.rerun()
        with col2:
            if st.button("❌ Reject Pipeline"):
                # Update state to rejected
                session_state.conversation["status"] = "rejected"
                st.error("Pipeline rejected. Please provide feedback to refine the pipeline.")
""")

    # DB Explorer UI
    _write_if_missing("ui/db_explorer.py", """
import streamlit as st
import pandas as pd

def render_db_explorer_tab(session_state):
    \"\"\"Render the database explorer tab in the UI.\"\"\"
    st.subheader("Database Explorer")

    # Get tables by category
    tables = session_state.db.get_tables()

    # Display tables by category
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### Raw Data Tables")
        for table in tables["raw_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
        st.markdown("### Staging Tables")
        for table in tables["staging_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
    with col2:
        st.markdown("### Analytics Ready Data")
        for table in tables["ard_tables"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))
        st.markdown("### Data Products")
        for table in tables["data_products"]:
            with st.expander(table):
                sample = session_state.db.get_table_sample(table, 3)
                st.dataframe(pd.DataFrame(sample))

    # SQL Query Executor
    st.markdown("### Query Explorer")
    with st.form(key="sql_form"):
        sql_query = st.text_area("Enter SQL Query", height=100,
            placeholder="SELECT * FROM ARD_SALES_PERFORMANCE WHERE region = 'North' LIMIT 5")
        run_sql = st.form_submit_button("Run Query")

    if run_sql and sql_query:
        with st.spinner("Executing query..."):
            result = session_state.db.execute_query(sql_query)
            if "error" in result:
                st.error(f"Error executing query: {result['error']}")
            elif "data" in result:
                st.dataframe(pd.DataFrame(result["data"]))
                st.success(f"Query returned {len(result['data'])} rows")
            elif "tables" in result:
                st.write(result["tables"])
            elif "schema" in result:
                st.write(f"Schema for {result['table']}:")
                st.dataframe(pd.DataFrame(result["schema"]))
""")
| # Now import UI modules after ensuring they exist | |
# Generate the UI module files first so the imports below can resolve.
ensure_ui_files_exist()

# Import UI modules (the files generated above).
from ui.conversation import render_conversation_tab
from ui.pipeline import render_pipeline_tab
from ui.agent_workflow import render_workflow_tab
from ui.db_explorer import render_db_explorer_tab

# Import agent state container shared with the workflow graph.
from agents.state import AgentState

# Load environment variables (e.g. ANTHROPIC_API_KEY) from a .env file if it exists.
load_dotenv()
def main():
    """Main application entry point.

    Configures the Streamlit page, validates the Anthropic API key, lazily
    initializes per-session resources (synthetic database, agent graph,
    conversation state, agent state) in ``st.session_state``, and renders
    the four UI tabs.
    """
    # Set page configuration (must be the first Streamlit call in the script).
    st.set_page_config(page_title="Pharmaceutical Data Agent", page_icon="💊", layout="wide")

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    # Fail fast with a readable message when the key is missing; the original
    # `os.environ[...] = None` assignment would raise a TypeError instead.
    if not api_key:
        st.error("ANTHROPIC_API_KEY is not set. Add it to your environment or a .env file.")
        st.stop()
    # Set the API key as an environment variable for downstream consumers.
    os.environ["ANTHROPIC_API_KEY"] = api_key

    # Initialize Anthropic client using default settings
    anthropic_client = Anthropic()  # Uses API key from environment variable

    # Initialize database with synthetic data (once per session).
    if "db" not in st.session_state:
        st.session_state.db = SyntheticDatabase()

    # Initialize agent graph and the state-dict updater its tools rely on.
    if "agent_graph" not in st.session_state or "update_state_dict" not in st.session_state:
        st.session_state.agent_graph, st.session_state.update_state_dict = create_agent_graph(
            anthropic_client,
            st.session_state.db
        )

    # Initialize conversation state shown in the UI.
    if "conversation" not in st.session_state:
        st.session_state.conversation = {
            "messages": [],
            "user_intent": {},
            "data_context": {},
            "pipeline_plan": {},
            "sql_queries": [],
            "execution_results": {},
            "confidence_scores": {},
            "status": "planning",
            "current_agent": "understanding_agent"  # Always a string, not a list
        }

    # Initialize agent state for new runs.
    if "agent_state" not in st.session_state:
        st.session_state.agent_state = AgentState(
            messages=[],
            user_intent={},
            data_context={},
            pipeline_plan={},
            sql_queries=[],
            execution_results={},
            confidence_scores={},
            status="planning",
            current_agent="understanding_agent"  # Always a string, not a list
        )

    # Main content area
    st.title("Pharmaceutical Data Management Agent")

    # Tabs for different views
    tab1, tab2, tab3, tab4 = st.tabs(["Conversation", "Pipeline Details", "Agent Workflow", "Database Explorer"])
    with tab1:
        render_conversation_tab(
            st.session_state,
            st.session_state.agent_graph,
            st.session_state.update_state_dict
        )
    with tab2:
        render_pipeline_tab(st.session_state)
    with tab3:
        render_workflow_tab(st.session_state)
    with tab4:
        render_db_explorer_tab(st.session_state)
# Run the app when executed directly (e.g. via `streamlit run`).
if __name__ == "__main__":
    main()