""" LangGraph workflow for pharmaceutical data management agents. """ from langgraph.graph import StateGraph, END, START from langchain_core.tools import tool from typing import Dict, Any import operator from agents.state import AgentState from agents.understanding import understanding_agent from agents.planning import planning_agent from agents.sql_generator import sql_generator_agent from agents.executor import executor_agent from agents.tools import ( tool_list_tables, tool_describe_table, tool_sample_table, tool_execute_query, tool_get_confidence ) from agents.utils.logging import log_agent_activity def create_agent_graph(anthropic_client, db): """ Create the agent workflow graph. Args: anthropic_client: The Anthropic client for calling Claude API db: The database connection Returns: Compiled LangGraph workflow and state update function """ # Wrap the agents with the anthropic client def understanding(state): return understanding_agent(anthropic_client, state) def planning(state): return planning_agent(anthropic_client, state) def sql_generation(state): return sql_generator_agent(anthropic_client, state) def execution(state): return executor_agent(anthropic_client, db, state) # Define LangGraph nodes nodes = { "understanding_agent": understanding, "planning_agent": planning, "sql_generator_agent": sql_generation, "executor_agent": execution } # Create a function to access the current state for the confidence tool state_dict = {} # This will be updated in the Streamlit app state_provider = lambda: state_dict # Create tools node with database-related tools # Tools implementation remains the same # Create Tool Agent Node def tool_handler(state): """Handle tool calls from the agent workflow.""" log_agent_activity("TOOL", state) return {} # Return empty state update since we don't modify state nodes["tools"] = tool_handler # Create the state graph workflow = StateGraph(AgentState) # Add all nodes for name, node in nodes.items(): workflow.add_node(name, node) # Set the entry point to understanding_agent workflow.add_edge(START, "understanding_agent") # Define routing functions def route_understanding(state): """Route from understanding agent.""" current_agent = state.get("current_agent", "understanding_agent") if current_agent == "planning_agent": return "planning_agent" return "understanding_agent" def route_planning(state): """Route from planning agent.""" current_agent = state.get("current_agent", "planning_agent") if current_agent == "sql_generator_agent": return "sql_generator_agent" return "planning_agent" def route_sql_generator(state): """Route from SQL generator agent.""" current_agent = state.get("current_agent", "sql_generator_agent") if current_agent == "executor_agent": return "executor_agent" return "sql_generator_agent" # Define conditional edges with our routing functions workflow.add_conditional_edges( "understanding_agent", route_understanding, { "understanding_agent": "understanding_agent", "planning_agent": "planning_agent" } ) workflow.add_conditional_edges( "planning_agent", route_planning, { "planning_agent": "planning_agent", "sql_generator_agent": "sql_generator_agent" } ) workflow.add_conditional_edges( "sql_generator_agent", route_sql_generator, { "sql_generator_agent": "sql_generator_agent", "executor_agent": "executor_agent" } ) # Executor agent finishes the workflow workflow.add_edge("executor_agent", END) # Add edges for tools - they can be called from any agent and return to that agent for agent in ["understanding_agent", "planning_agent", "sql_generator_agent", "executor_agent"]: workflow.add_edge(agent, "tools") workflow.add_edge("tools", agent) # Compile the workflow app = workflow.compile() # Update the state dictionary reference (used by the confidence tool) def update_state_dict(new_state): state_dict.clear() state_dict.update(new_state) # Return both the compiled workflow and the update function return app, update_state_dict