hermes / tools /delegate_tool.py
lenson78's picture
initial upload: v2026.3.23 with HF Spaces deployment
9aa5185 verified
#!/usr/bin/env python3
"""
Delegate Tool -- Subagent Architecture
Spawns child AIAgent instances with isolated context, restricted toolsets,
and their own terminal sessions. Supports single-task and batch (parallel)
modes. The parent blocks until all children complete.
Each child gets:
- A fresh conversation (no parent history)
- Its own task_id (own terminal session, file ops cache)
- A restricted toolset (configurable, with blocked tools always stripped)
- A focused system prompt built from the delegated goal + context
The parent's context only sees the delegation call and the summary result,
never the child's intermediate tool calls or reasoning.
"""
import json
import logging
logger = logging.getLogger(__name__)
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
# Tools that children must never have access to.
# These are individual tool names; _strip_blocked_tools() filters by toolset
# name using its own, separate list.
DELEGATE_BLOCKED_TOOLS = frozenset([
    "delegate_task",  # no recursive delegation
    "clarify",  # no user interaction
    "memory",  # no writes to shared MEMORY.md
    "send_message",  # no cross-platform side effects
    "execute_code",  # children should reason step-by-step, not write scripts
])
# Cap on tasks accepted per delegate_task() call; also used as the size of the
# thread pool that runs a batch.
MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2  # parent (0) -> child (1) -> grandchild rejected (2)
# Per-child iteration cap used when neither the call nor the delegation config
# provides one.
DEFAULT_MAX_ITERATIONS = 50
# Toolsets given to a child when none are requested and the parent exposes no
# enabled_toolsets.
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
def check_delegate_requirements() -> bool:
    """Report whether the delegation tool can be offered.

    Delegation depends on nothing outside the process, so the answer is
    unconditionally ``True``.
    """
    available = True
    return available
def _build_child_system_prompt(goal: str, context: Optional[str] = None) -> str:
    """Compose the system prompt handed to a child agent.

    The prompt always contains a fixed role line, the delegated ``goal`` and
    closing instructions asking for a summary. A ``CONTEXT`` section is
    inserted between goal and instructions only when ``context`` contains
    something other than whitespace.
    """
    role_line = "You are a focused subagent working on a specific delegated task."
    task_section = f"YOUR TASK:\n{goal}"
    closing_instructions = (
        "\nComplete this task using the tools available to you. "
        "When finished, provide a clear, concise summary of:\n"
        "- What you did\n"
        "- What you found or accomplished\n"
        "- Any files you created or modified\n"
        "- Any issues encountered\n\n"
        "Be thorough but concise -- your response is returned to the "
        "parent agent as a summary."
    )

    sections = [role_line, "", task_section]
    has_context = bool(context and context.strip())
    if has_context:
        sections.append(f"\nCONTEXT:\n{context}")
    sections.append(closing_instructions)
    return "\n".join(sections)
def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
    """Return ``toolsets`` without the toolset names children may not use.

    Order of the surviving names is preserved and the input list is not
    modified.
    """
    forbidden = frozenset(("delegation", "clarify", "memory", "code_execution"))
    allowed: List[str] = []
    for toolset_name in toolsets:
        if toolset_name in forbidden:
            continue
        allowed.append(toolset_name)
    return allowed
def _build_child_progress_callback(task_index: int, parent_agent, task_count: int = 1) -> Optional[callable]:
    """Create the hook through which a child reports tool activity to its parent.

    Two sinks are supported; either or both may be present on ``parent_agent``:

    * ``_delegate_spinner`` (CLI): each event is printed as a tree-view line
      above the spinner.
    * ``tool_progress_callback`` (gateway): tool names are collected and
      forwarded in groups as ``"subagent_progress"`` events.

    ``task_index`` is zero-based. A one-based ``[n] `` prefix is added to every
    line only when ``task_count`` is greater than one.

    Returns ``None`` when the parent exposes neither sink. Otherwise returns
    the callback, which carries a ``_flush`` attribute that forwards whatever
    tool names are still waiting in the gateway batch.
    """
    cli_spinner = getattr(parent_agent, '_delegate_spinner', None)
    gateway_cb = getattr(parent_agent, 'tool_progress_callback', None)
    if not (cli_spinner or gateway_cb):
        return None  # nothing to display on, so the child gets no callback

    # One-based task label, shown only when several tasks run together.
    if task_count > 1:
        label = f"[{task_index + 1}] "
    else:
        label = ""

    # Gateway events are sent in groups of this many tool names.
    flush_threshold = 5
    pending: List[str] = []

    def _clip(text, limit):
        """Shorten ``text`` to ``limit`` characters plus an ellipsis."""
        if text and len(text) > limit:
            return text[:limit] + "..."
        return text or ""

    def _show(line):
        """Print one line above the spinner; display failures are only logged."""
        try:
            cli_spinner.print_above(line)
        except Exception as e:
            logger.debug("Spinner print_above failed: %s", e)

    def _callback(tool_name: str, preview: str = None):
        # "_thinking" marks model text rather than a tool call. It is shown on
        # the CLI only and never relayed to the gateway.
        if tool_name == "_thinking":
            if cli_spinner:
                thought = _clip(preview, 55)
                _show(f" {label}├─ 💭 \"{thought}\"")
            return

        # Ordinary tool call: CLI line first, then gateway batching.
        if cli_spinner:
            snippet = _clip(preview, 35)
            from agent.display import get_tool_emoji
            icon = get_tool_emoji(tool_name)
            line = f" {label}├─ {icon} {tool_name}"
            if snippet:
                line += f" \"{snippet}\""
            _show(line)

        if not gateway_cb:
            return
        pending.append(tool_name)
        if len(pending) < flush_threshold:
            return
        joined = ", ".join(pending)
        try:
            gateway_cb("subagent_progress", f"🔀 {label}{joined}")
        except Exception as e:
            logger.debug("Parent callback failed: %s", e)
        pending.clear()

    def _flush_pending():
        """Send the tool names still waiting in the batch to the gateway."""
        if not (gateway_cb and pending):
            return
        joined = ", ".join(pending)
        try:
            gateway_cb("subagent_progress", f"🔀 {label}{joined}")
        except Exception as e:
            logger.debug("Parent callback flush failed: %s", e)
        pending.clear()

    _callback._flush = _flush_pending
    return _callback
def _build_child_agent(
    task_index: int,
    goal: str,
    context: Optional[str],
    toolsets: Optional[List[str]],
    model: Optional[str],
    max_iterations: int,
    parent_agent,
    # Credential overrides from delegation config (provider:model resolution)
    override_provider: Optional[str] = None,
    override_base_url: Optional[str] = None,
    override_api_key: Optional[str] = None,
    override_api_mode: Optional[str] = None,
):
    """
    Construct (but do not run) the child AIAgent for one delegated task.

    Toolsets are chosen in this order: the explicit ``toolsets`` argument,
    else the parent's ``enabled_toolsets``, else ``DEFAULT_TOOLSETS``; blocked
    toolsets are stripped from whichever list is used.

    Model and credentials come from ``model`` / ``override_*`` when those are
    set and otherwise from the parent. The child shares the parent's
    ``iteration_budget`` and session database, gets a delegation depth one
    greater than the parent's, and is appended to the parent's
    ``_active_children`` list when the parent has one.

    Returns the constructed child agent.
    """
    from run_agent import AIAgent
    import model_tools  # not referenced below; import kept as in the original

    # Pick the toolset source: explicit request > parent's enabled set > default.
    # Falling back to the parent's set keeps tools it has disabled away from
    # the child.
    requested_toolsets = toolsets
    if not requested_toolsets and parent_agent:
        requested_toolsets = getattr(parent_agent, "enabled_toolsets", None)
    if not requested_toolsets:
        requested_toolsets = DEFAULT_TOOLSETS
    child_toolsets = _strip_blocked_tools(requested_toolsets)

    system_prompt = _build_child_system_prompt(goal, context)

    # The parent's key may live on the agent itself or in its client kwargs.
    inherited_key = getattr(parent_agent, "api_key", None)
    if not inherited_key and hasattr(parent_agent, "_client_kwargs"):
        inherited_key = parent_agent._client_kwargs.get("api_key")

    progress_cb = _build_child_progress_callback(task_index, parent_agent)

    # Reusing the parent's budget object makes child tool calls count toward
    # the same session-wide limit.
    budget = getattr(parent_agent, "iteration_budget", None)

    # Overrides win; anything not overridden is inherited from the parent.
    agent_kwargs = {
        "base_url": override_base_url or parent_agent.base_url,
        "api_key": override_api_key or inherited_key,
        "model": model or parent_agent.model,
        "provider": override_provider or getattr(parent_agent, "provider", None),
        "api_mode": override_api_mode or getattr(parent_agent, "api_mode", None),
        "acp_command": getattr(parent_agent, "acp_command", None),
        "acp_args": list(getattr(parent_agent, "acp_args", []) or []),
        "max_iterations": max_iterations,
        "max_tokens": getattr(parent_agent, "max_tokens", None),
        "reasoning_config": getattr(parent_agent, "reasoning_config", None),
        "prefill_messages": getattr(parent_agent, "prefill_messages", None),
        "enabled_toolsets": child_toolsets,
        "quiet_mode": True,
        "ephemeral_system_prompt": system_prompt,
        "log_prefix": f"[subagent-{task_index}]",
        "platform": parent_agent.platform,
        "skip_context_files": True,
        "skip_memory": True,
        "clarify_callback": None,
        "session_db": getattr(parent_agent, '_session_db', None),
        "providers_allowed": parent_agent.providers_allowed,
        "providers_ignored": parent_agent.providers_ignored,
        "providers_order": parent_agent.providers_order,
        "provider_sort": parent_agent.provider_sort,
        "tool_progress_callback": progress_cb,
        "iteration_budget": budget,
    }
    child = AIAgent(**agent_kwargs)

    # Record how deep in the delegation chain this agent sits.
    child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1

    # Make the child reachable from the parent for interrupt propagation.
    if hasattr(parent_agent, '_active_children'):
        registry_lock = getattr(parent_agent, '_active_children_lock', None)
        if not registry_lock:
            parent_agent._active_children.append(child)
        else:
            with registry_lock:
                parent_agent._active_children.append(child)
    return child
def _build_tool_trace(messages: Any) -> List[Dict[str, Any]]:
    """Summarise the tool calls found in a conversation's message list.

    One entry is produced per assistant tool call (``tool`` name and
    ``args_bytes``). When the matching ``tool`` message is found, the entry
    also gets ``result_bytes`` and a ``status`` of ``"ok"`` or ``"error"``.
    Results are paired by ``tool_call_id``; a result without a known id is
    attached to the most recent call.

    Malformed items (non-dict messages or calls, missing/None arguments,
    non-string content) are tolerated rather than raising.
    """
    trace: List[Dict[str, Any]] = []
    by_call_id: Dict[Any, Dict[str, Any]] = {}
    if not isinstance(messages, list):
        return trace

    def _text_len_source(value: Any) -> str:
        # Sizes and the error heuristic work on text; coerce anything else.
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        if role == "assistant":
            for call in (msg.get("tool_calls") or []):
                if not isinstance(call, dict):
                    continue
                fn = call.get("function")
                if not isinstance(fn, dict):
                    fn = {}
                item = {
                    "tool": fn.get("name", "unknown"),
                    "args_bytes": len(_text_len_source(fn.get("arguments"))),
                }
                trace.append(item)
                call_id = call.get("id")
                if call_id:
                    by_call_id[call_id] = item
        elif role == "tool":
            text = _text_len_source(msg.get("content"))
            # Heuristic: a result is an error if "error" appears near its start.
            failed = "error" in text[:80].lower()
            outcome = {
                "result_bytes": len(text),
                "status": "error" if failed else "ok",
            }
            call_id = msg.get("tool_call_id")
            target = by_call_id.get(call_id) if call_id else None
            if target is not None:
                target.update(outcome)
            elif trace:
                # No usable id: attach to the most recent call.
                trace[-1].update(outcome)
    return trace


def _run_single_child(
    task_index: int,
    goal: str,
    child=None,
    parent_agent=None,
    **_kwargs,
) -> Dict[str, Any]:
    """
    Run a pre-built child agent and return a structured result dict.

    Called directly for a single task and from a worker thread for batches.
    ``child.run_conversation(user_message=goal)`` is executed and its result
    is condensed into a dict with ``task_index``, ``status``, ``summary``,
    ``api_calls``, ``duration_seconds``, ``model``, ``exit_reason``,
    ``tokens`` and ``tool_trace``. ``status`` is ``"interrupted"``,
    ``"completed"`` (finished with a non-empty response) or ``"failed"``;
    failed results also carry ``error``.

    Never raises for ordinary exceptions: if the run fails, a dict with
    ``status == "error"`` and the exception text is returned instead.

    On every exit path the pending progress batch is flushed, the
    process-global tool-name list is restored from
    ``child._delegate_saved_tool_names`` (when that is a list), and the child
    is removed from ``parent_agent._active_children``.
    """
    child_start = time.monotonic()
    child_progress_cb = getattr(child, 'tool_progress_callback', None)

    try:
        result = child.run_conversation(user_message=goal)

        duration = round(time.monotonic() - child_start, 2)
        summary = result.get("final_response") or ""
        completed = result.get("completed", False)
        interrupted = result.get("interrupted", False)
        api_calls = result.get("api_calls", 0)

        if interrupted:
            status = "interrupted"
        elif completed and summary:
            status = "completed"
        else:
            status = "failed"

        # The trace is diagnostic only; a problem building it must not discard
        # the child's actual result.
        try:
            tool_trace = _build_tool_trace(result.get("messages"))
        except Exception as exc:
            logger.debug("[subagent-%s] could not build tool trace: %s", task_index, exc)
            tool_trace = []

        if interrupted:
            exit_reason = "interrupted"
        elif completed:
            exit_reason = "completed"
        else:
            exit_reason = "max_iterations"

        # isinstance checks keep the output JSON-serialisable even when the
        # child is a mock object whose attributes are not plain values.
        _input_tokens = getattr(child, "session_prompt_tokens", 0)
        _output_tokens = getattr(child, "session_completion_tokens", 0)
        _model = getattr(child, "model", None)

        entry: Dict[str, Any] = {
            "task_index": task_index,
            "status": status,
            "summary": summary,
            "api_calls": api_calls,
            "duration_seconds": duration,
            "model": _model if isinstance(_model, str) else None,
            "exit_reason": exit_reason,
            "tokens": {
                "input": _input_tokens if isinstance(_input_tokens, (int, float)) else 0,
                "output": _output_tokens if isinstance(_output_tokens, (int, float)) else 0,
            },
            "tool_trace": tool_trace,
        }
        if status == "failed":
            entry["error"] = result.get("error", "Subagent did not produce a response.")
        return entry

    except Exception as exc:
        duration = round(time.monotonic() - child_start, 2)
        logger.exception("[subagent-%s] failed", task_index)
        return {
            "task_index": task_index,
            "status": "error",
            "summary": None,
            "error": str(exc),
            "api_calls": 0,
            "duration_seconds": duration,
        }

    finally:
        # Forward any tool names still batched for the gateway, whether the
        # run succeeded or not.
        if child_progress_cb and hasattr(child_progress_cb, '_flush'):
            try:
                child_progress_cb._flush()
            except Exception as e:
                logger.debug("Progress callback flush failed: %s", e)

        # Restore the parent's tool names so the process-global is correct
        # for any subsequent execute_code calls or other consumers.
        saved_tool_names = getattr(child, "_delegate_saved_tool_names", None)
        if isinstance(saved_tool_names, list):
            import model_tools
            model_tools._last_resolved_tool_names = list(saved_tool_names)

        # Unregister child from interrupt propagation.
        if hasattr(parent_agent, '_active_children'):
            try:
                lock = getattr(parent_agent, '_active_children_lock', None)
                if lock:
                    with lock:
                        parent_agent._active_children.remove(child)
                else:
                    parent_agent._active_children.remove(child)
            except ValueError as e:
                logger.debug("Could not remove child from active_children: %s", e)
def _unregister_children(parent_agent, children) -> None:
    """Remove ``children`` from the parent's interrupt-propagation registry."""
    if not hasattr(parent_agent, '_active_children'):
        return
    lock = getattr(parent_agent, '_active_children_lock', None)
    for child in children:
        try:
            if lock:
                with lock:
                    parent_agent._active_children.remove(child)
            else:
                parent_agent._active_children.remove(child)
        except ValueError as e:
            logger.debug("Could not remove child from active_children: %s", e)


def delegate_task(
    goal: Optional[str] = None,
    context: Optional[str] = None,
    toolsets: Optional[List[str]] = None,
    tasks: Optional[List[Dict[str, Any]]] = None,
    max_iterations: Optional[int] = None,
    parent_agent=None,
) -> str:
    """
    Spawn one or more child agents to handle delegated tasks.

    Supports two modes:
    - Single: provide goal (+ optional context, toolsets)
    - Batch: provide tasks array [{goal, context, toolsets}, ...]; when given,
      ``tasks`` takes precedence over ``goal``. At most
      ``MAX_CONCURRENT_CHILDREN`` tasks are run; extra tasks are ignored and
      reported in a ``"warning"`` field of the response.

    A task without its own ``toolsets`` falls back to the top-level
    ``toolsets`` argument. ``max_iterations`` falls back to the delegation
    config and then to ``DEFAULT_MAX_ITERATIONS``.

    Returns a JSON string. On success it holds ``results`` (one entry per
    task, in input order) and ``total_duration_seconds``. Invalid input, a
    missing parent, the depth limit, or a credential-resolution failure yield
    ``{"error": ...}`` instead. Exceptions raised while constructing a child
    propagate to the caller after already-built children are unregistered.
    """
    if parent_agent is None:
        return json.dumps({"error": "delegate_task requires a parent agent context."})

    # Depth limit. ``depth`` is the caller's depth, so the child would sit at
    # depth + 1; refuse when that would reach MAX_DEPTH. (Comparing the
    # caller's own depth let a depth-1 subagent spawn a grandchild.)
    depth = getattr(parent_agent, '_delegate_depth', 0)
    if depth + 1 >= MAX_DEPTH:
        return json.dumps({
            "error": (
                f"Delegation depth limit reached ({MAX_DEPTH}). "
                "Subagents cannot spawn further subagents."
            )
        })

    # Load config; tolerate a loader that hands back nothing.
    cfg = _load_config() or {}
    default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS)
    effective_max_iter = max_iterations or default_max_iter

    # Resolve delegation credentials (provider:model pair). Unconfigured
    # values come back as None so children inherit from the parent.
    try:
        creds = _resolve_delegation_credentials(cfg, parent_agent)
    except ValueError as exc:
        return json.dumps({"error": str(exc)})

    # Normalize to task list
    dropped = 0
    if tasks and isinstance(tasks, list):
        dropped = max(0, len(tasks) - MAX_CONCURRENT_CHILDREN)
        if dropped:
            logger.warning(
                "delegate_task received %d tasks; only the first %d will run",
                len(tasks), MAX_CONCURRENT_CHILDREN,
            )
        task_list = tasks[:MAX_CONCURRENT_CHILDREN]
    elif goal and isinstance(goal, str) and goal.strip():
        task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
    else:
        return json.dumps({"error": "Provide either 'goal' (single task) or 'tasks' (batch)."})

    if not task_list:
        return json.dumps({"error": "No tasks provided."})

    # Validate each task has a goal. Tasks come from model-generated JSON, so
    # a non-dict entry or a null / non-string goal must be reported, not crash.
    for i, task in enumerate(task_list):
        task_goal = task.get("goal") if isinstance(task, dict) else None
        if not isinstance(task_goal, str) or not task_goal.strip():
            return json.dumps({"error": f"Task {i} is missing a 'goal'."})

    overall_start = time.monotonic()
    results = []
    n_tasks = len(task_list)

    # Track goal labels for progress display (truncated for readability)
    task_labels = [t["goal"][:40] for t in task_list]

    # Save parent tool names BEFORE any child construction mutates the global.
    # _build_child_agent() calls AIAgent() which calls get_tool_definitions(),
    # which overwrites model_tools._last_resolved_tool_names with child's toolset.
    import model_tools as _model_tools
    _parent_tool_names = list(_model_tools._last_resolved_tool_names)

    # Build all child agents on the calling thread before any of them runs.
    children = []
    try:
        for i, t in enumerate(task_list):
            child = _build_child_agent(
                task_index=i, goal=t["goal"], context=t.get("context"),
                toolsets=t.get("toolsets") or toolsets, model=creds["model"],
                max_iterations=effective_max_iter, parent_agent=parent_agent,
                override_provider=creds["provider"], override_base_url=creds["base_url"],
                override_api_key=creds["api_key"],
                override_api_mode=creds["api_mode"],
            )
            children.append((i, t, child))
            # Give the child the parent's tool names, captured before
            # construction mutated the global.
            child._delegate_saved_tool_names = _parent_tool_names
            if n_tasks > 1:
                # _build_child_agent builds the progress callback without a
                # task count, so the "[n] " batch prefix is never enabled.
                # Rebuild it with the real count.
                # NOTE(review): assumes AIAgent reads tool_progress_callback
                # from the instance at call time -- confirm in run_agent.
                child.tool_progress_callback = _build_child_progress_callback(
                    i, parent_agent, n_tasks
                )
    except Exception:
        # Children built so far are registered on the parent but will never
        # run, so nothing else would unregister them.
        _unregister_children(parent_agent, [c for _, _, c in children])
        raise
    finally:
        # Authoritative restore: reset global to parent's tool names whether
        # or not every child was built.
        _model_tools._last_resolved_tool_names = _parent_tool_names

    if n_tasks == 1:
        # Single task -- run directly (no thread pool overhead)
        _i, _t, child = children[0]
        result = _run_single_child(0, _t["goal"], child, parent_agent)
        results.append(result)
    else:
        # Batch -- run in parallel with per-task progress lines
        completed_count = 0
        spinner_ref = getattr(parent_agent, '_delegate_spinner', None)

        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
            futures = {}
            for i, t, child in children:
                future = executor.submit(
                    _run_single_child,
                    task_index=i,
                    goal=t["goal"],
                    child=child,
                    parent_agent=parent_agent,
                )
                futures[future] = i

            for future in as_completed(futures):
                try:
                    entry = future.result()
                except Exception as exc:
                    idx = futures[future]
                    entry = {
                        "task_index": idx,
                        "status": "error",
                        "summary": None,
                        "error": str(exc),
                        "api_calls": 0,
                        "duration_seconds": 0,
                    }
                results.append(entry)
                completed_count += 1

                # Print per-task completion line above the spinner
                idx = entry["task_index"]
                label = task_labels[idx] if idx < len(task_labels) else f"Task {idx}"
                dur = entry.get("duration_seconds", 0)
                status = entry.get("status", "?")
                icon = "✓" if status == "completed" else "✗"
                remaining = n_tasks - completed_count
                completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)"
                if spinner_ref:
                    try:
                        spinner_ref.print_above(completion_line)
                    except Exception:
                        print(f" {completion_line}")
                else:
                    print(f" {completion_line}")

                # Update spinner text to show remaining count
                if spinner_ref and remaining > 0:
                    try:
                        spinner_ref.update_text(f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining")
                    except Exception as e:
                        logger.debug("Spinner update_text failed: %s", e)

        # Sort by task_index so results match input order
        results.sort(key=lambda r: r["task_index"])

    total_duration = round(time.monotonic() - overall_start, 2)
    payload: Dict[str, Any] = {
        "results": results,
        "total_duration_seconds": total_duration,
    }
    if dropped:
        # Tell the caller that part of its request was not executed.
        payload["warning"] = (
            f"Only the first {MAX_CONCURRENT_CHILDREN} tasks were run; "
            f"{dropped} extra task(s) were ignored."
        )
    return json.dumps(payload, ensure_ascii=False)
def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict:
    """Work out which model and credentials delegated children should use.

    Three cases, checked in this order:

    1. ``base_url`` is set in ``cfg``: children talk to that endpoint
       directly. The key comes from ``cfg["api_key"]`` or the
       ``OPENAI_API_KEY`` environment variable; provider and API mode are
       inferred from the URL.
    2. Neither ``base_url`` nor ``provider`` is set: every credential field is
       ``None`` so the child inherits from the parent (``model`` is still
       passed through if configured).
    3. ``provider`` is set: the full bundle is obtained from
       ``hermes_cli.runtime_provider.resolve_runtime_provider``.

    Blank or whitespace-only config values count as unset. ``parent_agent``
    is accepted but not consulted here.

    Raises ValueError with a user-facing message when a key is missing or the
    provider cannot be resolved.
    """
    def _setting(name):
        # Normalise a config entry to a stripped string, or None when empty.
        return str(cfg.get(name) or "").strip() or None

    model_name = _setting("model")
    provider_name = _setting("provider")
    endpoint = _setting("base_url")
    explicit_key = _setting("api_key")

    # Case 1: direct endpoint.
    if endpoint:
        key = explicit_key or os.getenv("OPENAI_API_KEY", "").strip()
        if not key:
            raise ValueError(
                "Delegation base_url is configured but no API key was found. "
                "Set delegation.api_key or OPENAI_API_KEY."
            )

        lowered = endpoint.lower()
        if "chatgpt.com/backend-api/codex" in lowered:
            provider, api_mode = "openai-codex", "codex_responses"
        elif "api.anthropic.com" in lowered:
            provider, api_mode = "anthropic", "anthropic_messages"
        else:
            provider, api_mode = "custom", "chat_completions"

        return {
            "model": model_name,
            "provider": provider,
            "base_url": endpoint,
            "api_key": key,
            "api_mode": api_mode,
        }

    # Case 2: nothing to override, so the child inherits from the parent.
    if not provider_name:
        inherited = dict.fromkeys(("provider", "base_url", "api_key", "api_mode"))
        return {"model": model_name, **inherited}

    # Case 3: named provider. The import sits inside the try so an
    # unavailable resolver is reported the same way as a failed resolution.
    try:
        from hermes_cli.runtime_provider import resolve_runtime_provider
        runtime = resolve_runtime_provider(requested=provider_name)
    except Exception as exc:
        raise ValueError(
            f"Cannot resolve delegation provider '{provider_name}': {exc}. "
            f"Check that the provider is configured (API key set, valid provider name), "
            f"or set delegation.base_url/delegation.api_key for a direct endpoint. "
            f"Available providers: openrouter, nous, zai, kimi-coding, minimax."
        ) from exc

    resolved_key = runtime.get("api_key", "")
    if not resolved_key:
        raise ValueError(
            f"Delegation provider '{provider_name}' resolved but has no API key. "
            f"Set the appropriate environment variable or run 'hermes login'."
        )

    return {
        "model": model_name,
        "provider": runtime.get("provider"),
        "base_url": runtime.get("base_url"),
        "api_key": resolved_key,
        "api_mode": runtime.get("api_mode"),
        "command": runtime.get("command"),
        "args": list(runtime.get("args") or []),
    }
def _load_config() -> dict:
    """Load the ``delegation`` config section, always returning a dict.

    The runtime config (``cli.CLI_CONFIG``) is consulted first. If that is
    unavailable or has no usable ``delegation`` section, the persistent config
    (``hermes_cli.config.load_config()``) is used, so delegation settings are
    picked up regardless of the entry point (CLI, gateway, cron).

    Loading is best-effort: any failure is logged at debug level and treated
    as "not configured". A section that is missing, empty, ``None`` or not a
    mapping yields ``{}`` so callers can safely call ``.get`` on the result.
    """
    try:
        from cli import CLI_CONFIG
        cfg = CLI_CONFIG.get("delegation", {})
        if cfg and isinstance(cfg, dict):
            return cfg
    except Exception as exc:
        # Expected whenever the CLI module is not the entry point.
        logger.debug("Runtime delegation config unavailable: %s", exc)

    try:
        from hermes_cli.config import load_config
        full = load_config()
        cfg = full.get("delegation") if isinstance(full, dict) else None
        # An empty "delegation:" key loads as None; normalise it to {}.
        return cfg if isinstance(cfg, dict) else {}
    except Exception as exc:
        logger.debug("Persistent delegation config unavailable: %s", exc)
        return {}
# ---------------------------------------------------------------------------
# OpenAI Function-Calling Schema
# ---------------------------------------------------------------------------
# Tool definition advertised to the model. The limits quoted in the text
# ("up to 3", "default: 50") and the list of tools subagents cannot call are
# plain literals; they are not derived from MAX_CONCURRENT_CHILDREN,
# DEFAULT_MAX_ITERATIONS or DELEGATE_BLOCKED_TOOLS, so keep them in sync by
# hand when those constants change.
DELEGATE_TASK_SCHEMA = {
    "name": "delegate_task",
    "description": (
        "Spawn one or more subagents to work on tasks in isolated contexts. "
        "Each subagent gets its own conversation, terminal session, and toolset. "
        "Only the final summary is returned -- intermediate tool results "
        "never enter your context window.\n\n"
        "TWO MODES (one of 'goal' or 'tasks' is required):\n"
        "1. Single task: provide 'goal' (+ optional context, toolsets)\n"
        "2. Batch (parallel): provide 'tasks' array with up to 3 items. "
        "All run concurrently and results are returned together.\n\n"
        "WHEN TO USE delegate_task:\n"
        "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
        "- Tasks that would flood your context with intermediate data\n"
        "- Parallel independent workstreams (research A and B simultaneously)\n\n"
        "WHEN NOT TO USE (use these instead):\n"
        "- Mechanical multi-step work with no reasoning needed -> use execute_code\n"
        "- Single tool call -> just call the tool directly\n"
        "- Tasks needing user interaction -> subagents cannot use clarify\n\n"
        "IMPORTANT:\n"
        "- Subagents have NO memory of your conversation. Pass all relevant "
        "info (file paths, error messages, constraints) via the 'context' field.\n"
        "- Subagents CANNOT call: delegate_task, clarify, memory, send_message, "
        "execute_code.\n"
        "- Each subagent gets its own terminal session (separate working directory and state).\n"
        "- Results are always returned as an array, one entry per task."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            # Single-task mode: goal (+ optional context / toolsets).
            "goal": {
                "type": "string",
                "description": (
                    "What the subagent should accomplish. Be specific and "
                    "self-contained -- the subagent knows nothing about your "
                    "conversation history."
                ),
            },
            "context": {
                "type": "string",
                "description": (
                    "Background information the subagent needs: file paths, "
                    "error messages, project structure, constraints. The more "
                    "specific you are, the better the subagent performs."
                ),
            },
            "toolsets": {
                "type": "array",
                "items": {"type": "string"},
                "description": (
                    "Toolsets to enable for this subagent. "
                    "Default: inherits your enabled toolsets. "
                    "Common patterns: ['terminal', 'file'] for code work, "
                    "['web'] for research, ['terminal', 'file', 'web'] for "
                    "full-stack tasks."
                ),
            },
            # Batch mode: each item mirrors the single-task fields.
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "goal": {"type": "string", "description": "Task goal"},
                        "context": {"type": "string", "description": "Task-specific context"},
                        "toolsets": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Toolsets for this specific task",
                        },
                    },
                    "required": ["goal"],
                },
                "maxItems": 3,
                "description": (
                    "Batch mode: up to 3 tasks to run in parallel. Each gets "
                    "its own subagent with isolated context and terminal session. "
                    "When provided, top-level goal/context/toolsets are ignored."
                ),
            },
            "max_iterations": {
                "type": "integer",
                "description": (
                    "Max tool-calling turns per subagent (default: 50). "
                    "Only set lower for simple tasks."
                ),
            },
        },
        # Nothing is required at schema level: the "goal or tasks" rule is
        # enforced at call time by delegate_task().
        "required": [],
    },
}
# --- Registry ---
# Registering at import time makes the tool available as soon as this module
# is loaded. The import sits here, after the definitions it registers.
from tools.registry import registry
registry.register(
    name="delegate_task",
    toolset="delegation",
    schema=DELEGATE_TASK_SCHEMA,
    # Adapter from the registry's (args dict, **kw) call to delegate_task's
    # keyword arguments. The calling agent is taken from kw["parent_agent"];
    # when it is absent delegate_task returns an error payload.
    handler=lambda args, **kw: delegate_task(
        goal=args.get("goal"),
        context=args.get("context"),
        toolsets=args.get("toolsets"),
        tasks=args.get("tasks"),
        max_iterations=args.get("max_iterations"),
        parent_agent=kw.get("parent_agent")),
    check_fn=check_delegate_requirements,
    emoji="🔀",
)