| """ |
| Human Approval Node for ReAct Pattern - Enhanced Version |
| |
| This module implements an enhanced human-in-the-loop approval system for the ReAct workflow |
| using LangGraph's interrupt() for API-friendly tool approval with argument modification support. |
| |
| Key Features: |
| - LangGraph interrupt() for clean API integration |
| - Individual tool approval/rejection/modification |
| - Argument modification support with re-execution |
| - Batch approval support |
| - State management for approved/rejected/modified tools |
| |
| Enhanced Capabilities: |
| - Approve: Execute tool with original arguments |
| - Reject: Skip tool execution |
| - Modify: Change tool arguments and re-execute reasoning |
| - Batch operations: Approve/reject/modify multiple tools at once |
| |
| State Updates: |
| After approval, the state is updated with: |
| - approved_tool_calls: List of tools approved for execution (may include modified args) |
| - rejected_tool_calls: List of tools rejected by user |
| - modified_tool_calls: List of tools with modified arguments |
| - needs_re_reasoning: Flag to indicate if agent should re-reason with modified tools |
| - pending_tool_calls: Cleared after approval process |
| |
| Example API Request: |
| >>> # Approve all tools |
| >>> user_decision = { |
| ... "action": "approve_all" |
| ... } |
| |
| >>> # Approve specific tools |
| >>> user_decision = { |
| ... "action": "approve_selected", |
| ... "tool_indices": [0, 2] # Approve tools at index 0 and 2 |
| ... } |
| |
| >>> # Reject all tools |
| >>> user_decision = { |
| ... "action": "reject_all" |
| ... } |
| |
| >>> # Modify tool arguments |
| >>> user_decision = { |
| ... "action": "modify_and_approve", |
| ... "modifications": [ |
| ... { |
| ... "tool_index": 0, |
| ... "new_args": {"query": "modified search query"}, |
| ... "approve": True |
| ... }, |
| ... { |
| ... "tool_index": 1, |
| ... "new_args": {"calculation": "2+2"}, |
| ... "approve": False |
| ... } |
| ... ] |
| ... } |
| |
| >>> # Request re-reasoning with tool context |
| >>> user_decision = { |
| ... "action": "request_re_reasoning", |
| ... "feedback": "Please search for more recent information" |
| ... } |
| """ |
|
|
| from typing import Dict, Any, List |
| import logging |
| from langgraph.types import interrupt |
|
|
| logger = logging.getLogger("ReAct Human Approval") |
|
|
|
|
| def _get_tools_from_registry(workflow_id: int): |
| """ |
| Get tools from the global registry using workflow ID. |
| """ |
| from ComputeAgent.graph.graph_ReAct import _TOOLS_REGISTRY |
| tools = _TOOLS_REGISTRY.get(workflow_id) |
| if tools is None: |
| logger.warning(f"β οΈ Tools not found in registry for workflow_id: {workflow_id}") |
| return [] |
| return tools |
|
|
|
|
| async def human_approval_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Enhanced node that handles human approval for tool execution using LangGraph interrupt. |
| |
| Supports: |
| 1. Approve all tools |
| 2. Approve selected tools by index |
| 3. Reject all tools |
| 4. Reject selected tools by index |
| 5. Modify tool arguments and approve/reject |
| 6. Request re-reasoning with feedback |
| |
| Args: |
| state: Current ReAct state with pending tool calls |
| |
| Returns: |
| Updated state with approved, rejected, and/or modified tool calls |
| """ |
| pending_tools = state.get("pending_tool_calls", []) |
|
|
| if not pending_tools: |
| logger.info("βΉοΈ No pending tool calls for approval") |
| return state |
|
|
| logger.info(f"π€ Requesting human approval for {len(pending_tools)} tool call(s)") |
|
|
| |
| workflow_id = state.get("workflow_id") |
| tools = _get_tools_from_registry(workflow_id) if workflow_id else [] |
|
|
| |
| approval_data = { |
| "tool_calls": [ |
| { |
| "index": i, |
| "id": tool.get("id"), |
| "name": tool.get("name"), |
| "args": tool.get("args"), |
| "description": _get_tool_description(tool.get("name"), tools) |
| } |
| for i, tool in enumerate(pending_tools) |
| ], |
| "query": state.get("query", ""), |
| "total_tools": len(pending_tools) |
| } |
| |
| |
| user_decision = interrupt(approval_data) |
| |
| logger.info(f"π₯ Received tool approval decision: {user_decision.get('action', 'unknown')}") |
| |
| |
| return await _process_tool_approval_decision(state, pending_tools, user_decision) |
|
|
|
|
| async def _process_tool_approval_decision( |
| state: Dict[str, Any], |
| pending_tools: List[Dict[str, Any]], |
| user_decision: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| """ |
| Process user's tool approval decision and update state accordingly. |
| |
| Args: |
| state: Current workflow state |
| pending_tools: List of pending tool calls |
| user_decision: User's decision dictionary |
| |
| Returns: |
| Updated state with approval results |
| """ |
| action = user_decision.get("action", "reject_all") |
| |
| approved_calls = [] |
| rejected_calls = [] |
| modified_calls = [] |
| needs_re_reasoning = False |
| |
| |
| if action == "approve_all": |
| logger.info("β
User approved all tools") |
| approved_calls = pending_tools.copy() |
| |
| elif action == "approve_selected": |
| tool_indices = user_decision.get("tool_indices", []) |
| logger.info(f"β
User approved tools at indices: {tool_indices}") |
| |
| for i, tool in enumerate(pending_tools): |
| if i in tool_indices: |
| approved_calls.append(tool) |
| else: |
| rejected_calls.append(tool) |
| |
| elif action == "reject_all": |
| logger.info("β User rejected all tools") |
| rejected_calls = pending_tools.copy() |
| |
| elif action == "reject_selected": |
| tool_indices = user_decision.get("tool_indices", []) |
| logger.info(f"β User rejected tools at indices: {tool_indices}") |
| |
| for i, tool in enumerate(pending_tools): |
| if i in tool_indices: |
| rejected_calls.append(tool) |
| else: |
| approved_calls.append(tool) |
| |
| elif action == "modify_and_approve": |
| modifications = user_decision.get("modifications", []) |
| logger.info(f"π§ User requested modifications for {len(modifications)} tool(s)") |
| |
| |
| mod_map = {mod["tool_index"]: mod for mod in modifications} |
| |
| for i, tool in enumerate(pending_tools): |
| if i in mod_map: |
| mod = mod_map[i] |
| modified_tool = tool.copy() |
| |
| |
| modified_tool["args"] = mod.get("new_args", tool["args"]) |
| modified_calls.append({ |
| "original": tool, |
| "modified": modified_tool, |
| "index": i |
| }) |
| |
| |
| if mod.get("approve", True): |
| approved_calls.append(modified_tool) |
| logger.info(f"β
Modified and approved tool at index {i}: {modified_tool['name']}") |
| else: |
| rejected_calls.append(modified_tool) |
| logger.info(f"β Modified but rejected tool at index {i}: {modified_tool['name']}") |
| else: |
| |
| approved_calls.append(tool) |
| |
| elif action == "request_re_reasoning": |
| logger.info("π User requested re-reasoning") |
| needs_re_reasoning = True |
| rejected_calls = pending_tools.copy() |
| |
| |
| state["re_reasoning_feedback"] = user_decision.get("feedback", "") |
| |
| else: |
| logger.warning(f"β οΈ Unknown action '{action}', defaulting to reject all") |
| rejected_calls = pending_tools.copy() |
| |
| |
| updated_state = state.copy() |
| updated_state["approved_tool_calls"] = approved_calls |
| updated_state["rejected_tool_calls"] = rejected_calls |
| updated_state["modified_tool_calls"] = modified_calls |
| updated_state["needs_re_reasoning"] = needs_re_reasoning |
| updated_state["pending_tool_calls"] = [] |
| updated_state["current_step"] = "human_approval_complete" |
|
|
| |
| |
|
|
| logger.info( |
| f"π Approval results: " |
| f"{len(approved_calls)} approved, " |
| f"{len(rejected_calls)} rejected, " |
| f"{len(modified_calls)} modified, " |
| f"re-reasoning: {needs_re_reasoning}" |
| ) |
|
|
| return updated_state |
|
|
|
|
| def _get_tool_description(tool_name: str, tools: List[Any]) -> str: |
| """ |
| Get concise description for a tool by name (first sentence only). |
| |
| Args: |
| tool_name: Name of the tool |
| tools: List of available tool objects |
| |
| Returns: |
| First sentence of tool description or empty string |
| """ |
| for tool in tools: |
| if hasattr(tool, 'name') and tool.name == tool_name: |
| full_description = getattr(tool, 'description', '') |
| if full_description: |
| |
| import re |
| |
| sentences = re.split(r'[.!?\n]+', full_description) |
| |
| first_sentence = next((s.strip() for s in sentences if s.strip()), '') |
| return first_sentence if first_sentence else full_description |
| return '' |
| return '' |