Hivenet_ComputeAgent / ComputeAgent /nodes /ReAct /human_approval_node.py
carraraig's picture
finish (#8)
5dd4236 verified
"""
Human Approval Node for ReAct Pattern - Enhanced Version
This module implements an enhanced human-in-the-loop approval system for the ReAct workflow
using LangGraph's interrupt() for API-friendly tool approval with argument modification support.
Key Features:
- LangGraph interrupt() for clean API integration
- Individual tool approval/rejection/modification
- Argument modification support with re-execution
- Batch approval support
- State management for approved/rejected/modified tools
Enhanced Capabilities:
- Approve: Execute tool with original arguments
- Reject: Skip tool execution
- Modify: Change tool arguments and re-execute reasoning
- Batch operations: Approve/reject/modify multiple tools at once
State Updates:
After approval, the state is updated with:
- approved_tool_calls: List of tools approved for execution (may include modified args)
- rejected_tool_calls: List of tools rejected by user
- modified_tool_calls: List of tools with modified arguments
- needs_re_reasoning: Flag to indicate if agent should re-reason with modified tools
- pending_tool_calls: Cleared after approval process
Example API Request:
>>> # Approve all tools
>>> user_decision = {
... "action": "approve_all"
... }
>>> # Approve specific tools
>>> user_decision = {
... "action": "approve_selected",
... "tool_indices": [0, 2] # Approve tools at index 0 and 2
... }
>>> # Reject all tools
>>> user_decision = {
... "action": "reject_all"
... }
>>> # Modify tool arguments
>>> user_decision = {
... "action": "modify_and_approve",
... "modifications": [
... {
... "tool_index": 0,
... "new_args": {"query": "modified search query"},
... "approve": True
... },
... {
... "tool_index": 1,
... "new_args": {"calculation": "2+2"},
... "approve": False
... }
... ]
... }
>>> # Request re-reasoning with tool context
>>> user_decision = {
... "action": "request_re_reasoning",
... "feedback": "Please search for more recent information"
... }
"""
from typing import Dict, Any, List
import logging
from langgraph.types import interrupt
logger = logging.getLogger("ReAct Human Approval")
def _get_tools_from_registry(workflow_id: int):
"""
Get tools from the global registry using workflow ID.
"""
from ComputeAgent.graph.graph_ReAct import _TOOLS_REGISTRY
tools = _TOOLS_REGISTRY.get(workflow_id)
if tools is None:
logger.warning(f"⚠️ Tools not found in registry for workflow_id: {workflow_id}")
return []
return tools
async def human_approval_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Enhanced node that handles human approval for tool execution using LangGraph interrupt.
Supports:
1. Approve all tools
2. Approve selected tools by index
3. Reject all tools
4. Reject selected tools by index
5. Modify tool arguments and approve/reject
6. Request re-reasoning with feedback
Args:
state: Current ReAct state with pending tool calls
Returns:
Updated state with approved, rejected, and/or modified tool calls
"""
pending_tools = state.get("pending_tool_calls", [])
if not pending_tools:
logger.info("ℹ️ No pending tool calls for approval")
return state
logger.info(f"πŸ‘€ Requesting human approval for {len(pending_tools)} tool call(s)")
# Get tools from registry for description lookup
workflow_id = state.get("workflow_id")
tools = _get_tools_from_registry(workflow_id) if workflow_id else []
# Prepare approval data to send to user
approval_data = {
"tool_calls": [
{
"index": i,
"id": tool.get("id"),
"name": tool.get("name"),
"args": tool.get("args"),
"description": _get_tool_description(tool.get("name"), tools)
}
for i, tool in enumerate(pending_tools)
],
"query": state.get("query", ""),
"total_tools": len(pending_tools)
}
# ✨ USE INTERRUPT - This pauses execution and waits for user input
user_decision = interrupt(approval_data)
logger.info(f"πŸ“₯ Received tool approval decision: {user_decision.get('action', 'unknown')}")
# Process the approval decision
return await _process_tool_approval_decision(state, pending_tools, user_decision)
async def _process_tool_approval_decision(
state: Dict[str, Any],
pending_tools: List[Dict[str, Any]],
user_decision: Dict[str, Any]
) -> Dict[str, Any]:
"""
Process user's tool approval decision and update state accordingly.
Args:
state: Current workflow state
pending_tools: List of pending tool calls
user_decision: User's decision dictionary
Returns:
Updated state with approval results
"""
action = user_decision.get("action", "reject_all")
approved_calls = []
rejected_calls = []
modified_calls = []
needs_re_reasoning = False
# Handle different action types
if action == "approve_all":
logger.info("βœ… User approved all tools")
approved_calls = pending_tools.copy()
elif action == "approve_selected":
tool_indices = user_decision.get("tool_indices", [])
logger.info(f"βœ… User approved tools at indices: {tool_indices}")
for i, tool in enumerate(pending_tools):
if i in tool_indices:
approved_calls.append(tool)
else:
rejected_calls.append(tool)
elif action == "reject_all":
logger.info("❌ User rejected all tools")
rejected_calls = pending_tools.copy()
elif action == "reject_selected":
tool_indices = user_decision.get("tool_indices", [])
logger.info(f"❌ User rejected tools at indices: {tool_indices}")
for i, tool in enumerate(pending_tools):
if i in tool_indices:
rejected_calls.append(tool)
else:
approved_calls.append(tool)
elif action == "modify_and_approve":
modifications = user_decision.get("modifications", [])
logger.info(f"πŸ”§ User requested modifications for {len(modifications)} tool(s)")
# Create a mapping of tool indices to modifications
mod_map = {mod["tool_index"]: mod for mod in modifications}
for i, tool in enumerate(pending_tools):
if i in mod_map:
mod = mod_map[i]
modified_tool = tool.copy()
# Update arguments
modified_tool["args"] = mod.get("new_args", tool["args"])
modified_calls.append({
"original": tool,
"modified": modified_tool,
"index": i
})
# Decide if this modified tool should be approved or rejected
if mod.get("approve", True):
approved_calls.append(modified_tool)
logger.info(f"βœ… Modified and approved tool at index {i}: {modified_tool['name']}")
else:
rejected_calls.append(modified_tool)
logger.info(f"❌ Modified but rejected tool at index {i}: {modified_tool['name']}")
else:
# No modification for this tool, keep original
approved_calls.append(tool)
elif action == "request_re_reasoning":
logger.info("πŸ”„ User requested re-reasoning")
needs_re_reasoning = True
rejected_calls = pending_tools.copy() # Reject current tools
# Store user feedback for re-reasoning
state["re_reasoning_feedback"] = user_decision.get("feedback", "")
else:
logger.warning(f"⚠️ Unknown action '{action}', defaulting to reject all")
rejected_calls = pending_tools.copy()
# Update state with approval results
updated_state = state.copy()
updated_state["approved_tool_calls"] = approved_calls
updated_state["rejected_tool_calls"] = rejected_calls
updated_state["modified_tool_calls"] = modified_calls
updated_state["needs_re_reasoning"] = needs_re_reasoning
updated_state["pending_tool_calls"] = [] # Clear pending calls
updated_state["current_step"] = "human_approval_complete"
# NOTE: Don't remove tools here - tool_execution needs them next
# Tools are only removed in terminal nodes (generate, tool_rejection_exit)
logger.info(
f"πŸ“Š Approval results: "
f"{len(approved_calls)} approved, "
f"{len(rejected_calls)} rejected, "
f"{len(modified_calls)} modified, "
f"re-reasoning: {needs_re_reasoning}"
)
return updated_state
def _get_tool_description(tool_name: str, tools: List[Any]) -> str:
"""
Get concise description for a tool by name (first sentence only).
Args:
tool_name: Name of the tool
tools: List of available tool objects
Returns:
First sentence of tool description or empty string
"""
for tool in tools:
if hasattr(tool, 'name') and tool.name == tool_name:
full_description = getattr(tool, 'description', '')
if full_description:
# Extract first sentence (split by period, newline, or question mark)
import re
# Split by sentence-ending punctuation
sentences = re.split(r'[.!?\n]+', full_description)
# Return first non-empty sentence
first_sentence = next((s.strip() for s in sentences if s.strip()), '')
return first_sentence if first_sentence else full_description
return ''
return ''