Spaces:
Runtime error
Runtime error
| """ | |
| LangGraph workflow for pharmaceutical data management agents. | |
| """ | |
| from langgraph.graph import StateGraph, END, START | |
| from langchain_core.tools import tool | |
| from typing import Dict, Any | |
| import operator | |
| from agents.state import AgentState | |
| from agents.understanding import understanding_agent | |
| from agents.planning import planning_agent | |
| from agents.sql_generator import sql_generator_agent | |
| from agents.executor import executor_agent | |
| from agents.tools import ( | |
| tool_list_tables, | |
| tool_describe_table, | |
| tool_sample_table, | |
| tool_execute_query, | |
| tool_get_confidence | |
| ) | |
| from agents.utils.logging import log_agent_activity | |
| def create_agent_graph(anthropic_client, db): | |
| """ | |
| Create the agent workflow graph. | |
| Args: | |
| anthropic_client: The Anthropic client for calling Claude API | |
| db: The database connection | |
| Returns: | |
| Compiled LangGraph workflow and state update function | |
| """ | |
| # Wrap the agents with the anthropic client | |
| def understanding(state): | |
| return understanding_agent(anthropic_client, state) | |
| def planning(state): | |
| return planning_agent(anthropic_client, state) | |
| def sql_generation(state): | |
| return sql_generator_agent(anthropic_client, state) | |
| def execution(state): | |
| return executor_agent(anthropic_client, db, state) | |
| # Define LangGraph nodes | |
| nodes = { | |
| "understanding_agent": understanding, | |
| "planning_agent": planning, | |
| "sql_generator_agent": sql_generation, | |
| "executor_agent": execution | |
| } | |
| # Create a function to access the current state for the confidence tool | |
| state_dict = {} # This will be updated in the Streamlit app | |
| state_provider = lambda: state_dict | |
| # Create tools node with database-related tools | |
| # Tools implementation remains the same | |
| # Create Tool Agent Node | |
| def tool_handler(state): | |
| """Handle tool calls from the agent workflow.""" | |
| log_agent_activity("TOOL", state) | |
| return {} # Return empty state update since we don't modify state | |
| nodes["tools"] = tool_handler | |
| # Create the state graph | |
| workflow = StateGraph(AgentState) | |
| # Add all nodes | |
| for name, node in nodes.items(): | |
| workflow.add_node(name, node) | |
| # Set the entry point to understanding_agent | |
| workflow.add_edge(START, "understanding_agent") | |
| # Define routing functions | |
| def route_understanding(state): | |
| """Route from understanding agent.""" | |
| current_agent = state.get("current_agent", "understanding_agent") | |
| if current_agent == "planning_agent": | |
| return "planning_agent" | |
| return "understanding_agent" | |
| def route_planning(state): | |
| """Route from planning agent.""" | |
| current_agent = state.get("current_agent", "planning_agent") | |
| if current_agent == "sql_generator_agent": | |
| return "sql_generator_agent" | |
| return "planning_agent" | |
| def route_sql_generator(state): | |
| """Route from SQL generator agent.""" | |
| current_agent = state.get("current_agent", "sql_generator_agent") | |
| if current_agent == "executor_agent": | |
| return "executor_agent" | |
| return "sql_generator_agent" | |
| # Define conditional edges with our routing functions | |
| workflow.add_conditional_edges( | |
| "understanding_agent", | |
| route_understanding, | |
| { | |
| "understanding_agent": "understanding_agent", | |
| "planning_agent": "planning_agent" | |
| } | |
| ) | |
| workflow.add_conditional_edges( | |
| "planning_agent", | |
| route_planning, | |
| { | |
| "planning_agent": "planning_agent", | |
| "sql_generator_agent": "sql_generator_agent" | |
| } | |
| ) | |
| workflow.add_conditional_edges( | |
| "sql_generator_agent", | |
| route_sql_generator, | |
| { | |
| "sql_generator_agent": "sql_generator_agent", | |
| "executor_agent": "executor_agent" | |
| } | |
| ) | |
| # Executor agent finishes the workflow | |
| workflow.add_edge("executor_agent", END) | |
| # Add edges for tools - they can be called from any agent and return to that agent | |
| for agent in ["understanding_agent", "planning_agent", "sql_generator_agent", "executor_agent"]: | |
| workflow.add_edge(agent, "tools") | |
| workflow.add_edge("tools", agent) | |
| # Compile the workflow | |
| app = workflow.compile() | |
| # Update the state dictionary reference (used by the confidence tool) | |
| def update_state_dict(new_state): | |
| state_dict.clear() | |
| state_dict.update(new_state) | |
| # Return both the compiled workflow and the update function | |
| return app, update_state_dict |