cryogenic22's picture
Update graph/workflow.py
01db754 verified
"""
LangGraph workflow for pharmaceutical data management agents.
"""
from langgraph.graph import StateGraph, END, START
from langchain_core.tools import tool
from typing import Dict, Any
import operator
from agents.state import AgentState
from agents.understanding import understanding_agent
from agents.planning import planning_agent
from agents.sql_generator import sql_generator_agent
from agents.executor import executor_agent
from agents.tools import (
tool_list_tables,
tool_describe_table,
tool_sample_table,
tool_execute_query,
tool_get_confidence
)
from agents.utils.logging import log_agent_activity
def create_agent_graph(anthropic_client, db):
"""
Create the agent workflow graph.
Args:
anthropic_client: The Anthropic client for calling Claude API
db: The database connection
Returns:
Compiled LangGraph workflow and state update function
"""
# Wrap the agents with the anthropic client
def understanding(state):
return understanding_agent(anthropic_client, state)
def planning(state):
return planning_agent(anthropic_client, state)
def sql_generation(state):
return sql_generator_agent(anthropic_client, state)
def execution(state):
return executor_agent(anthropic_client, db, state)
# Define LangGraph nodes
nodes = {
"understanding_agent": understanding,
"planning_agent": planning,
"sql_generator_agent": sql_generation,
"executor_agent": execution
}
# Create a function to access the current state for the confidence tool
state_dict = {} # This will be updated in the Streamlit app
state_provider = lambda: state_dict
# Create tools node with database-related tools
# Tools implementation remains the same
# Create Tool Agent Node
def tool_handler(state):
"""Handle tool calls from the agent workflow."""
log_agent_activity("TOOL", state)
return {} # Return empty state update since we don't modify state
nodes["tools"] = tool_handler
# Create the state graph
workflow = StateGraph(AgentState)
# Add all nodes
for name, node in nodes.items():
workflow.add_node(name, node)
# Set the entry point to understanding_agent
workflow.add_edge(START, "understanding_agent")
# Define routing functions
def route_understanding(state):
"""Route from understanding agent."""
current_agent = state.get("current_agent", "understanding_agent")
if current_agent == "planning_agent":
return "planning_agent"
return "understanding_agent"
def route_planning(state):
"""Route from planning agent."""
current_agent = state.get("current_agent", "planning_agent")
if current_agent == "sql_generator_agent":
return "sql_generator_agent"
return "planning_agent"
def route_sql_generator(state):
"""Route from SQL generator agent."""
current_agent = state.get("current_agent", "sql_generator_agent")
if current_agent == "executor_agent":
return "executor_agent"
return "sql_generator_agent"
# Define conditional edges with our routing functions
workflow.add_conditional_edges(
"understanding_agent",
route_understanding,
{
"understanding_agent": "understanding_agent",
"planning_agent": "planning_agent"
}
)
workflow.add_conditional_edges(
"planning_agent",
route_planning,
{
"planning_agent": "planning_agent",
"sql_generator_agent": "sql_generator_agent"
}
)
workflow.add_conditional_edges(
"sql_generator_agent",
route_sql_generator,
{
"sql_generator_agent": "sql_generator_agent",
"executor_agent": "executor_agent"
}
)
# Executor agent finishes the workflow
workflow.add_edge("executor_agent", END)
# Add edges for tools - they can be called from any agent and return to that agent
for agent in ["understanding_agent", "planning_agent", "sql_generator_agent", "executor_agent"]:
workflow.add_edge(agent, "tools")
workflow.add_edge("tools", agent)
# Compile the workflow
app = workflow.compile()
# Update the state dictionary reference (used by the confidence tool)
def update_state_dict(new_state):
state_dict.clear()
state_dict.update(new_state)
# Return both the compiled workflow and the update function
return app, update_state_dict