Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Delegate Tool -- Subagent Architecture | |
| Spawns child AIAgent instances with isolated context, restricted toolsets, | |
| and their own terminal sessions. Supports single-task and batch (parallel) | |
| modes. The parent blocks until all children complete. | |
| Each child gets: | |
| - A fresh conversation (no parent history) | |
| - Its own task_id (own terminal session, file ops cache) | |
| - A restricted toolset (configurable, with blocked tools always stripped) | |
| - A focused system prompt built from the delegated goal + context | |
| The parent's context only sees the delegation call and the summary result, | |
| never the child's intermediate tool calls or reasoning. | |
| """ | |
| import json | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| import os | |
| import threading | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import Any, Dict, List, Optional | |
| from toolsets import TOOLSETS | |
# Individual tools a child agent must never be given.
DELEGATE_BLOCKED_TOOLS = frozenset({
    "delegate_task",  # no recursive delegation
    "clarify",  # no user interaction
    "memory",  # no writes to shared MEMORY.md
    "send_message",  # no cross-platform side effects
    "execute_code",  # children should reason step-by-step, not write scripts
})

# Toolset names that are never advertised as subagent toolsets, regardless of
# which tools they contain.
_EXCLUDED_TOOLSET_NAMES = frozenset(("debugging", "safe", "delegation", "moa", "rl"))

# Sorted names of the toolsets a subagent may be offered. A toolset is dropped
# when it is explicitly excluded above, when its name carries the "hermes-"
# prefix, or when every tool it lists is blocked (a toolset that lists no
# tools at all counts as fully blocked).
_SUBAGENT_TOOLSETS = sorted(
    toolset_name
    for toolset_name, definition in TOOLSETS.items()
    if not (
        toolset_name in _EXCLUDED_TOOLSET_NAMES
        or toolset_name.startswith("hermes-")
        or all(tool in DELEGATE_BLOCKED_TOOLS for tool in definition.get("tools", []))
    )
)

# Human-readable, single-quoted, comma-separated rendering of the list above.
_TOOLSET_LIST_STR = ", ".join(map("'{}'".format, _SUBAGENT_TOOLSETS))

# Fallback for the number of children allowed in one delegate_task call when
# neither config nor environment provides a value.
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3

# delegate_task refuses to run when the calling agent's _delegate_depth has
# reached this value: parent (0) -> child (1) -> grandchild rejected (2).
MAX_DEPTH = 2
def _get_max_concurrent_children() -> int:
    """Return the maximum number of children one delegate_task call may spawn.

    Resolution order (first usable value wins):

    1. ``max_concurrent_children`` from the mapping returned by
       ``_load_config()``.
    2. The ``DELEGATION_MAX_CONCURRENT_CHILDREN`` environment variable.
    3. ``_DEFAULT_MAX_CONCURRENT_CHILDREN``.

    A value that cannot be converted to ``int`` is reported with a warning
    and the next source is consulted. Any accepted value is clamped to a
    minimum of 1.
    """
    cfg = _load_config()
    val = cfg.get("max_concurrent_children")
    if val is not None:
        try:
            return max(1, int(val))
        except (TypeError, ValueError):
            # The env var is still consulted below, so do not claim that the
            # default is being used at this point.
            logger.warning(
                "delegation.max_concurrent_children=%r is not a valid integer; "
                "falling back to DELEGATION_MAX_CONCURRENT_CHILDREN or default %d",
                val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
            )

    env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
    if env_val:
        try:
            return max(1, int(env_val))
        except (TypeError, ValueError):
            # Previously ignored silently; surface the misconfiguration.
            logger.warning(
                "DELEGATION_MAX_CONCURRENT_CHILDREN=%r is not a valid integer; "
                "using default %d",
                env_val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
            )

    return _DEFAULT_MAX_CONCURRENT_CHILDREN
# Per-child iteration cap used by delegate_task when neither the caller nor
# the delegation config supplies max_iterations.
DEFAULT_MAX_ITERATIONS = 50

# Seconds between parent activity heartbeats during delegation.
_HEARTBEAT_INTERVAL = 30

# Toolsets assumed for the parent when its own toolsets cannot be determined.
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
def check_delegate_requirements() -> bool:
    """Report whether delegation is usable.

    Delegation has no external requirements, so this is unconditionally True.
    """
    return True
def _build_child_system_prompt(
    goal: str,
    context: Optional[str] = None,
    *,
    workspace_path: Optional[str] = None,
) -> str:
    """Assemble the system prompt handed to a child agent.

    The prompt always contains the role preamble, the delegated ``goal`` and
    the closing reporting instructions. A CONTEXT section is inserted only
    when ``context`` has non-whitespace content, and a WORKSPACE PATH section
    only when ``workspace_path`` has non-whitespace content. Sections are
    joined with newlines.
    """
    sections = [
        "You are a focused subagent working on a specific delegated task.",
        "",
        f"YOUR TASK:\n{goal}",
    ]

    has_context = bool(context and context.strip())
    if has_context:
        sections.append(f"\nCONTEXT:\n{context}")

    has_workspace = bool(workspace_path and str(workspace_path).strip())
    if has_workspace:
        workspace_section = (
            "\nWORKSPACE PATH:\n"
            f"{workspace_path}\n"
            "Use this exact path for local repository/workdir operations unless the task explicitly says otherwise."
        )
        sections.append(workspace_section)

    # Reporting instructions plus the rule against guessing container paths.
    closing_section = (
        "\nComplete this task using the tools available to you. "
        "When finished, provide a clear, concise summary of:\n"
        "- What you did\n"
        "- What you found or accomplished\n"
        "- Any files you created or modified\n"
        "- Any issues encountered\n\n"
        "Important workspace rule: Never assume a repository lives at /workspace/... or any other container-style path unless the task/context explicitly gives that path. "
        "If no exact local path is provided, discover it first before issuing git/workdir-specific commands.\n\n"
        "Be thorough but concise -- your response is returned to the "
        "parent agent as a summary."
    )
    sections.append(closing_section)

    return "\n".join(sections)
def _resolve_workspace_hint(parent_agent) -> Optional[str]:
    """Return a best-effort local workspace directory for child prompts.

    Candidates are tried in order: the ``TERMINAL_CWD`` environment variable,
    ``parent_agent._subdirectory_hints.working_dir``,
    ``parent_agent.terminal_cwd`` and ``parent_agent.cwd``. Each non-empty
    candidate is user-expanded and made absolute; the first one that is an
    existing directory is returned. ``None`` is returned when nothing
    qualifies, so that no made-up path is ever injected into a prompt.
    """
    subdirectory_hints = getattr(parent_agent, "_subdirectory_hints", None)
    raw_candidates = (
        os.getenv("TERMINAL_CWD"),
        getattr(subdirectory_hints, "working_dir", None),
        getattr(parent_agent, "terminal_cwd", None),
        getattr(parent_agent, "cwd", None),
    )

    for raw in raw_candidates:
        if not raw:
            continue
        try:
            resolved = os.path.abspath(os.path.expanduser(str(raw)))
        except Exception:
            # A candidate that cannot be normalised is simply not a hint.
            continue
        if os.path.isabs(resolved) and os.path.isdir(resolved):
            return resolved

    return None
def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
    """Return ``toolsets`` without the toolset names children may never use.

    The input order is preserved and the input list is not modified.
    """
    never_for_children = frozenset(
        ("delegation", "clarify", "memory", "code_execution")
    )
    allowed = []
    for toolset_name in toolsets:
        if toolset_name in never_for_children:
            continue
        allowed.append(toolset_name)
    return allowed
def _build_child_progress_callback(task_index: int, parent_agent, task_count: int = 1) -> Optional[callable]:
    """Create the callback that mirrors a child's activity on the parent display.

    Two sinks are supported, each optional:

    * ``parent_agent._delegate_spinner`` (CLI): a tree-view line is printed
      above the spinner for thinking and tool-start events.
    * ``parent_agent.tool_progress_callback`` (gateway): tool names are
      collected and relayed in groups of five as ``"subagent_progress"``.

    When neither sink exists ``None`` is returned and the child runs without
    a progress callback. The returned callable carries a ``_flush`` attribute
    that relays any tool names still waiting in the current group.
    """
    spinner = getattr(parent_agent, '_delegate_spinner', None)
    parent_cb = getattr(parent_agent, 'tool_progress_callback', None)
    if not (spinner or parent_cb):
        return None  # nothing to display on -> behave as if no callback existed

    # The 1-indexed task prefix is only shown when several tasks run together.
    prefix = f"[{task_index + 1}] " if task_count > 1 else ""

    group_size = 5
    pending: List[str] = []

    def _print_above(line: str) -> None:
        # Display problems must never disturb the child run.
        try:
            spinner.print_above(line)
        except Exception as e:
            logger.debug("Spinner print_above failed: %s", e)

    def _relay_pending(failure_message: str) -> None:
        # Send the collected tool names to the gateway and start a new group.
        joined = ", ".join(pending)
        try:
            parent_cb("subagent_progress", f"🔀 {prefix}{joined}")
        except Exception as e:
            logger.debug(failure_message, e)
        pending.clear()

    def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
        # Known event types: "tool.started", "tool.completed",
        # "reasoning.available", "_thinking", "subagent_progress".
        if event_type in ("_thinking", "reasoning.available"):
            # Thinking is shown on the CLI only; it is too noisy for chat.
            thought = preview or tool_name or ""
            if spinner:
                clipped = thought if len(thought) <= 55 else thought[:55] + "..."
                _print_above(f' {prefix}├─ 💭 "{clipped}"')
            return

        if event_type == "tool.completed":
            # The line was already shown when the tool started.
            return

        # Everything else is handled as a tool start.
        if spinner:
            if preview and len(preview) > 35:
                clipped = preview[:35] + "..."
            else:
                clipped = preview or ""
            from agent.display import get_tool_emoji
            emoji = get_tool_emoji(tool_name or "")
            line = f" {prefix}├─ {emoji} {tool_name}"
            if clipped:
                line += f' "{clipped}"'
            _print_above(line)

        if parent_cb:
            pending.append(tool_name or "")
            if len(pending) >= group_size:
                _relay_pending("Parent callback failed: %s")

    def _flush():
        """Flush remaining batched tool names to gateway on completion."""
        if parent_cb and pending:
            _relay_pending("Parent callback flush failed: %s")

    _callback._flush = _flush
    return _callback
def _build_child_agent(
    task_index: int,
    goal: str,
    context: Optional[str],
    toolsets: Optional[List[str]],
    model: Optional[str],
    max_iterations: int,
    parent_agent,
    # Credential overrides from delegation config (provider:model resolution)
    override_provider: Optional[str] = None,
    override_base_url: Optional[str] = None,
    override_api_key: Optional[str] = None,
    override_api_mode: Optional[str] = None,
    # ACP transport overrides — lets a non-ACP parent spawn ACP child agents
    override_acp_command: Optional[str] = None,
    override_acp_args: Optional[List[str]] = None,
):
    """
    Construct (but do not run) the child AIAgent for one delegated task.

    Intended to be called on the main thread so construction is not raced.

    Toolsets: requested ``toolsets`` are intersected with what the parent has;
    without a request the child inherits the parent's toolsets. Blocked
    toolsets are always removed.

    Credentials: every ``override_*`` value that is set wins over the value
    inherited from ``parent_agent``, which allows children to run on a
    different provider:model pair than the parent.

    Side effects: the child is appended to ``parent_agent._active_children``
    (under ``_active_children_lock`` when present) if the parent tracks
    children. Returns the constructed child.
    """
    from run_agent import AIAgent

    # ---- Which toolsets does the parent effectively have? ----------------
    # enabled_toolsets=None means "all tools enabled", in which case the
    # effective toolsets are derived from the tool names the parent loaded.
    parent_enabled = getattr(parent_agent, "enabled_toolsets", None)
    if parent_enabled is not None:
        parent_toolsets = set(parent_enabled)
    elif parent_agent and hasattr(parent_agent, "valid_tool_names"):
        import model_tools
        parent_toolsets = set()
        for loaded_tool in parent_agent.valid_tool_names:
            owning_toolset = model_tools.get_toolset_for_tool(loaded_tool)
            if owning_toolset is not None:
                parent_toolsets.add(owning_toolset)
    else:
        parent_toolsets = set(DEFAULT_TOOLSETS)

    # ---- Which toolsets does the child get? ------------------------------
    if toolsets:
        # A subagent must not gain tools the parent lacks.
        requested = [t for t in toolsets if t in parent_toolsets]
    elif parent_agent and parent_enabled is not None:
        requested = parent_enabled
    elif parent_toolsets:
        requested = sorted(parent_toolsets)
    else:
        requested = DEFAULT_TOOLSETS
    child_toolsets = _strip_blocked_tools(requested)

    # ---- Prompt ----------------------------------------------------------
    child_prompt = _build_child_system_prompt(
        goal,
        context,
        workspace_path=_resolve_workspace_hint(parent_agent),
    )

    # ---- Parent API key, so the child inherits auth ----------------------
    parent_api_key = getattr(parent_agent, "api_key", None)
    if not parent_api_key and hasattr(parent_agent, "_client_kwargs"):
        parent_api_key = parent_agent._client_kwargs.get("api_key")

    # ---- Progress / thinking relays to the parent display ----------------
    child_progress_cb = _build_child_progress_callback(task_index, parent_agent)

    child_thinking_cb = None
    if child_progress_cb:
        def _relay_thinking(text: str) -> None:
            if not text:
                return
            try:
                child_progress_cb("_thinking", text)
            except Exception as e:
                logger.debug("Child thinking callback relay failed: %s", e)

        child_thinking_cb = _relay_thinking

    # ---- Effective credentials: config override > parent inherit ---------
    effective_model = model or parent_agent.model
    effective_provider = override_provider or getattr(parent_agent, "provider", None)
    effective_base_url = override_base_url or parent_agent.base_url
    effective_api_key = override_api_key or parent_api_key
    effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None)
    effective_acp_command = override_acp_command or getattr(parent_agent, "acp_command", None)
    if override_acp_args is not None:
        effective_acp_args = list(override_acp_args)
    else:
        effective_acp_args = list(getattr(parent_agent, "acp_args", []) or [])

    # ---- Reasoning config: delegation override > parent inherit ----------
    child_reasoning = getattr(parent_agent, "reasoning_config", None)
    try:
        delegation_cfg = _load_config()
        delegation_effort = str(delegation_cfg.get("reasoning_effort") or "").strip()
        if delegation_effort:
            from hermes_constants import parse_reasoning_effort
            parsed_effort = parse_reasoning_effort(delegation_effort)
            if parsed_effort is None:
                logger.warning(
                    "Unknown delegation.reasoning_effort '%s', inheriting parent level",
                    delegation_effort,
                )
            else:
                child_reasoning = parsed_effort
    except Exception as exc:
        logger.debug("Could not load delegation reasoning_effort: %s", exc)

    # ---- Construct the child ---------------------------------------------
    # Each subagent gets its own iteration budget capped at max_iterations,
    # so the total across parent + subagents can exceed the parent's own cap.
    child = AIAgent(
        base_url=effective_base_url,
        api_key=effective_api_key,
        model=effective_model,
        provider=effective_provider,
        api_mode=effective_api_mode,
        acp_command=effective_acp_command,
        acp_args=effective_acp_args,
        max_iterations=max_iterations,
        max_tokens=getattr(parent_agent, "max_tokens", None),
        reasoning_config=child_reasoning,
        prefill_messages=getattr(parent_agent, "prefill_messages", None),
        enabled_toolsets=child_toolsets,
        quiet_mode=True,
        ephemeral_system_prompt=child_prompt,
        log_prefix=f"[subagent-{task_index}]",
        platform=parent_agent.platform,
        skip_context_files=True,
        skip_memory=True,
        clarify_callback=None,
        thinking_callback=child_thinking_cb,
        session_db=getattr(parent_agent, '_session_db', None),
        parent_session_id=getattr(parent_agent, 'session_id', None),
        providers_allowed=parent_agent.providers_allowed,
        providers_ignored=parent_agent.providers_ignored,
        providers_order=parent_agent.providers_order,
        provider_sort=parent_agent.provider_sort,
        tool_progress_callback=child_progress_cb,
        iteration_budget=None,  # fresh budget per subagent
    )
    child._print_fn = getattr(parent_agent, '_print_fn', None)

    # One level deeper than the parent; delegate_task checks this value.
    child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1

    # Share a credential pool when one can be resolved so the child can
    # rotate credentials on rate limits instead of being pinned to one key.
    shared_pool = _resolve_child_credential_pool(effective_provider, parent_agent)
    if shared_pool is not None:
        child._credential_pool = shared_pool

    # Register the child with the parent for interrupt propagation.
    if hasattr(parent_agent, '_active_children'):
        children_lock = getattr(parent_agent, '_active_children_lock', None)
        if children_lock:
            with children_lock:
                parent_agent._active_children.append(child)
        else:
            parent_agent._active_children.append(child)

    return child
def _safe_len(value: Any) -> int:
    """Return ``len(value)``; ``None`` counts as 0 and unsized values use their ``str()`` length."""
    if value is None:
        return 0
    try:
        return len(value)
    except TypeError:
        return len(str(value))


def _build_tool_trace(messages: Any) -> List[Dict[str, Any]]:
    """Summarise the tool calls found in a child's conversation messages.

    One entry is produced per assistant tool call (``tool`` name and
    ``args_bytes``). Each tool message adds ``result_bytes`` and ``status``
    to the entry with the same ``tool_call_id``, or to the most recent entry
    when no id matches. Malformed items (non-dict messages or tool calls,
    missing/None/non-string payloads) are tolerated rather than raised on, so
    that a bad trace can never turn a finished child run into an error.
    """
    trace: List[Dict[str, Any]] = []
    by_call_id: Dict[str, Dict[str, Any]] = {}
    if not isinstance(messages, list):
        return trace

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        if role == "assistant":
            for tc in (msg.get("tool_calls") or []):
                if not isinstance(tc, dict):
                    continue
                fn = tc.get("function")
                if not isinstance(fn, dict):
                    fn = {}
                call_entry = {
                    "tool": fn.get("name", "unknown"),
                    "args_bytes": _safe_len(fn.get("arguments", "")),
                }
                trace.append(call_entry)
                tc_id = tc.get("id")
                if tc_id:
                    by_call_id[tc_id] = call_entry
        elif role == "tool":
            content = msg.get("content", "")
            # Only the first 80 characters are inspected for the word "error".
            if isinstance(content, str):
                head = content[:80]
            elif content:
                head = str(content)[:80]
            else:
                head = ""
            result_meta = {
                "result_bytes": _safe_len(content),
                "status": "error" if "error" in head.lower() else "ok",
            }
            # Pair by tool_call_id so parallel calls get the right result.
            tc_id = msg.get("tool_call_id")
            target = by_call_id.get(tc_id) if tc_id else None
            if target is not None:
                target.update(result_meta)
            elif trace:
                # Fallback for messages without a usable tool_call_id.
                trace[-1].update(result_meta)
    return trace


def _run_single_child(
    task_index: int,
    goal: str,
    child=None,
    parent_agent=None,
    **_kwargs,
) -> Dict[str, Any]:
    """
    Run a pre-built child agent and return a structured result dict.

    Called directly for a single task and from a worker thread in batch mode.

    While the child runs, a daemon heartbeat thread calls
    ``parent_agent._touch_activity`` every ``_HEARTBEAT_INTERVAL`` seconds so
    the parent is not considered idle. If the child has a credential pool, a
    lease is acquired for the duration of the run.

    Returns a dict with ``task_index``, ``status`` ("completed",
    "interrupted", "failed" or "error"), ``summary``, ``api_calls`` and
    ``duration_seconds``; non-error results also carry ``model``,
    ``exit_reason``, ``tokens`` and ``tool_trace``. Any exception raised
    while leasing a credential or running the child is logged and reported
    as a ``status == "error"`` result instead of propagating.

    Cleanup always runs: heartbeat stopped, lease released, the process-global
    ``model_tools._last_resolved_tool_names`` restored from
    ``child._delegate_saved_tool_names``, the child unregistered from
    ``parent_agent._active_children`` and closed.
    """
    child_start = time.monotonic()

    # The progress callback was attached to the child when it was built.
    child_progress_cb = getattr(child, 'tool_progress_callback', None)

    import model_tools

    child_pool = getattr(child, '_credential_pool', None)
    leased_cred_id = None

    # Heartbeat: periodically propagate child activity to the parent so an
    # inactivity timeout on the parent does not fire while the child works.
    _heartbeat_stop = threading.Event()

    def _heartbeat_loop():
        while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
            if parent_agent is None:
                continue
            touch = getattr(parent_agent, '_touch_activity', None)
            if not touch:
                continue
            # Prefer detail from the child's own activity tracker.
            desc = f"delegate_task: subagent {task_index} working"
            try:
                child_summary = child.get_activity_summary()
                child_tool = child_summary.get("current_tool")
                child_iter = child_summary.get("api_call_count", 0)
                child_max = child_summary.get("max_iterations", 0)
                if child_tool:
                    desc = (f"delegate_task: subagent running {child_tool} "
                            f"(iteration {child_iter}/{child_max})")
                else:
                    child_desc = child_summary.get("last_activity_desc", "")
                    if child_desc:
                        desc = (f"delegate_task: subagent {child_desc} "
                                f"(iteration {child_iter}/{child_max})")
            except Exception:
                # Best effort: fall back to the generic description.
                pass
            try:
                touch(desc)
            except Exception:
                # A failing heartbeat must never take the child down.
                pass

    _heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)

    try:
        # Started inside the try block so the cleanup in ``finally`` runs
        # even when starting the thread or leasing a credential fails.
        _heartbeat_thread.start()

        if child_pool is not None:
            leased_cred_id = child_pool.acquire_lease()
            if leased_cred_id is not None:
                try:
                    leased_entry = child_pool.current()
                    if leased_entry is not None and hasattr(child, '_swap_credential'):
                        child._swap_credential(leased_entry)
                except Exception as exc:
                    logger.debug("Failed to bind child to leased credential: %s", exc)

        result = child.run_conversation(user_message=goal)

        # Flush any remaining batched progress to the gateway.
        if child_progress_cb and hasattr(child_progress_cb, '_flush'):
            try:
                child_progress_cb._flush()
            except Exception as e:
                logger.debug("Progress callback flush failed: %s", e)

        duration = round(time.monotonic() - child_start, 2)
        summary = result.get("final_response") or ""
        completed = result.get("completed", False)
        interrupted = result.get("interrupted", False)
        api_calls = result.get("api_calls", 0)

        if interrupted:
            status = "interrupted"
        elif summary:
            # A summary means the subagent produced usable output;
            # exit_reason tells the parent *how* the task ended.
            status = "completed"
        else:
            status = "failed"

        tool_trace = _build_tool_trace(result.get("messages") or [])

        if interrupted:
            exit_reason = "interrupted"
        elif completed:
            exit_reason = "completed"
        else:
            exit_reason = "max_iterations"

        # Token counts and model are type-checked so mock objects are safe.
        _input_tokens = getattr(child, "session_prompt_tokens", 0)
        _output_tokens = getattr(child, "session_completion_tokens", 0)
        _model = getattr(child, "model", None)

        entry: Dict[str, Any] = {
            "task_index": task_index,
            "status": status,
            "summary": summary,
            "api_calls": api_calls,
            "duration_seconds": duration,
            "model": _model if isinstance(_model, str) else None,
            "exit_reason": exit_reason,
            "tokens": {
                "input": _input_tokens if isinstance(_input_tokens, (int, float)) else 0,
                "output": _output_tokens if isinstance(_output_tokens, (int, float)) else 0,
            },
            "tool_trace": tool_trace,
        }
        if status == "failed":
            entry["error"] = result.get("error", "Subagent did not produce a response.")
        return entry

    except Exception as exc:
        duration = round(time.monotonic() - child_start, 2)
        # Module logger (not the root logger) with lazy %-formatting.
        logger.exception("[subagent-%s] failed", task_index)
        return {
            "task_index": task_index,
            "status": "error",
            "summary": None,
            "error": str(exc),
            "api_calls": 0,
            "duration_seconds": duration,
        }

    finally:
        # Stop the heartbeat so it does not keep touching parent activity
        # after the child has finished (or failed).
        _heartbeat_stop.set()
        if _heartbeat_thread.is_alive():
            _heartbeat_thread.join(timeout=5)

        if child_pool is not None and leased_cred_id is not None:
            try:
                child_pool.release_lease(leased_cred_id)
            except Exception as exc:
                logger.debug("Failed to release credential lease: %s", exc)

        # Restore the parent's tool names so the process-global is correct
        # for any subsequent consumer.
        saved_tool_names = getattr(child, "_delegate_saved_tool_names", None)
        if isinstance(saved_tool_names, list):
            model_tools._last_resolved_tool_names = list(saved_tool_names)

        # Unregister the child from interrupt propagation.
        if hasattr(parent_agent, '_active_children'):
            try:
                lock = getattr(parent_agent, '_active_children_lock', None)
                if lock:
                    with lock:
                        parent_agent._active_children.remove(child)
                else:
                    parent_agent._active_children.remove(child)
            except (ValueError, UnboundLocalError) as e:
                logger.debug("Could not remove child from active_children: %s", e)

        # Close tool resources so nothing the child started outlives the
        # delegation.
        try:
            if hasattr(child, 'close'):
                child.close()
        except Exception:
            logger.debug("Failed to close child agent after delegation", exc_info=True)
def _discard_unstarted_child(child, parent_agent) -> None:
    """Unregister and close a child that was built but will never be run."""
    if hasattr(parent_agent, '_active_children'):
        lock = getattr(parent_agent, '_active_children_lock', None)
        try:
            if lock:
                with lock:
                    parent_agent._active_children.remove(child)
            else:
                parent_agent._active_children.remove(child)
        except ValueError as e:
            logger.debug("Could not remove child from active_children: %s", e)
    try:
        if hasattr(child, 'close'):
            child.close()
    except Exception:
        logger.debug("Failed to close unstarted child agent", exc_info=True)


def delegate_task(
    goal: Optional[str] = None,
    context: Optional[str] = None,
    toolsets: Optional[List[str]] = None,
    tasks: Optional[List[Dict[str, Any]]] = None,
    max_iterations: Optional[int] = None,
    acp_command: Optional[str] = None,
    acp_args: Optional[List[str]] = None,
    parent_agent=None,
) -> str:
    """
    Spawn one or more child agents to handle delegated tasks.

    Supports two modes:
    - Single: provide goal (+ optional context, toolsets)
    - Batch: provide tasks array [{goal, context, toolsets}, ...]; a
      non-empty ``tasks`` list takes precedence over ``goal``. Per-task
      ``toolsets``, ``acp_command`` and ``acp_args`` fall back to the
      top-level arguments.

    All children are built first, then run: directly for a single task, in a
    thread pool for a batch. The call blocks until every child has finished.

    Returns a JSON string with ``results`` (one entry per task, in input
    order) and ``total_duration_seconds``. Invalid input (no parent agent,
    too many tasks, a task that is not a dict or has no non-blank string
    ``goal``, unresolvable delegation credentials) yields an error payload
    instead of raising.
    """
    if parent_agent is None:
        return tool_error("delegate_task requires a parent agent context.")

    # Depth limit
    depth = getattr(parent_agent, '_delegate_depth', 0)
    if depth >= MAX_DEPTH:
        return json.dumps({
            "error": (
                f"Delegation depth limit reached ({MAX_DEPTH}). "
                "Subagents cannot spawn further subagents."
            )
        })

    # Load config
    cfg = _load_config()
    default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS)
    effective_max_iter = max_iterations or default_max_iter

    # Resolve delegation credentials (provider:model pair); the resolver
    # signals a credential problem with ValueError.
    try:
        creds = _resolve_delegation_credentials(cfg, parent_agent)
    except ValueError as exc:
        return tool_error(str(exc))

    # Normalize to task list
    max_children = _get_max_concurrent_children()
    if tasks and isinstance(tasks, list):
        if len(tasks) > max_children:
            return tool_error(
                f"Too many tasks: {len(tasks)} provided, but "
                f"max_concurrent_children is {max_children}. "
                f"Either reduce the task count, split into multiple "
                f"delegate_task calls, or increase "
                f"delegation.max_concurrent_children in config.yaml."
            )
        task_list = tasks
    elif goal and isinstance(goal, str) and goal.strip():
        task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
    else:
        return tool_error("Provide either 'goal' (single task) or 'tasks' (batch).")

    if not task_list:
        return tool_error("No tasks provided.")

    # Validate each task has a goal. Entries come from model-generated
    # arguments, so a non-dict task or a null/non-string goal must produce a
    # tool error rather than an AttributeError.
    for i, task in enumerate(task_list):
        task_goal = task.get("goal") if isinstance(task, dict) else None
        if not isinstance(task_goal, str) or not task_goal.strip():
            return tool_error(f"Task {i} is missing a 'goal'.")

    overall_start = time.monotonic()
    results = []
    n_tasks = len(task_list)

    # Goal labels for progress display (truncated for readability)
    task_labels = [t["goal"][:40] for t in task_list]

    # Save parent tool names BEFORE any child construction mutates the
    # process-global model_tools._last_resolved_tool_names.
    import model_tools as _model_tools
    _parent_tool_names = list(_model_tools._last_resolved_tool_names)

    # Build all child agents on the calling thread. The global is restored in
    # ``finally`` even if a build raises; in that case the children that were
    # already built are unregistered and closed so they do not leak.
    children = []
    try:
        for i, t in enumerate(task_list):
            child = _build_child_agent(
                task_index=i, goal=t["goal"], context=t.get("context"),
                toolsets=t.get("toolsets") or toolsets, model=creds["model"],
                max_iterations=effective_max_iter, parent_agent=parent_agent,
                override_provider=creds["provider"], override_base_url=creds["base_url"],
                override_api_key=creds["api_key"],
                override_api_mode=creds["api_mode"],
                override_acp_command=t.get("acp_command") or acp_command,
                override_acp_args=t.get("acp_args") or acp_args,
            )
            # Remember the parent's tool names on the child so the runner can
            # restore the global after the child finishes.
            child._delegate_saved_tool_names = _parent_tool_names
            children.append((i, t, child))
    except Exception:
        for _i, _t, built_child in children:
            _discard_unstarted_child(built_child, parent_agent)
        raise
    finally:
        # Authoritative restore: reset global to parent's tool names.
        _model_tools._last_resolved_tool_names = _parent_tool_names

    if n_tasks == 1:
        # Single task -- run directly (no thread pool overhead)
        _i, _t, child = children[0]
        result = _run_single_child(0, _t["goal"], child, parent_agent)
        results.append(result)
    else:
        # Batch -- run in parallel with per-task progress lines
        completed_count = 0
        spinner_ref = getattr(parent_agent, '_delegate_spinner', None)

        with ThreadPoolExecutor(max_workers=max_children) as executor:
            futures = {}
            for i, t, child in children:
                future = executor.submit(
                    _run_single_child,
                    task_index=i,
                    goal=t["goal"],
                    child=child,
                    parent_agent=parent_agent,
                )
                futures[future] = i

            for future in as_completed(futures):
                try:
                    entry = future.result()
                except Exception as exc:
                    idx = futures[future]
                    entry = {
                        "task_index": idx,
                        "status": "error",
                        "summary": None,
                        "error": str(exc),
                        "api_calls": 0,
                        "duration_seconds": 0,
                    }
                results.append(entry)
                completed_count += 1

                # Print per-task completion line above the spinner
                idx = entry["task_index"]
                label = task_labels[idx] if idx < len(task_labels) else f"Task {idx}"
                dur = entry.get("duration_seconds", 0)
                status = entry.get("status", "?")
                icon = "✓" if status == "completed" else "✗"
                remaining = n_tasks - completed_count
                completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)"
                if spinner_ref:
                    try:
                        spinner_ref.print_above(completion_line)
                    except Exception:
                        print(f" {completion_line}")
                else:
                    print(f" {completion_line}")

                # Update spinner text to show remaining count
                if spinner_ref and remaining > 0:
                    try:
                        spinner_ref.update_text(f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining")
                    except Exception as e:
                        logger.debug("Spinner update_text failed: %s", e)

        # Sort by task_index so results match input order
        results.sort(key=lambda r: r["task_index"])

    # Notify parent's memory provider of delegation outcomes (best effort).
    if parent_agent and hasattr(parent_agent, '_memory_manager') and parent_agent._memory_manager:
        for entry in results:
            try:
                _task_goal = task_list[entry["task_index"]]["goal"] if entry["task_index"] < len(task_list) else ""
                parent_agent._memory_manager.on_delegation(
                    task=_task_goal,
                    result=entry.get("summary", "") or "",
                    child_session_id=getattr(children[entry["task_index"]][2], "session_id", "") if entry["task_index"] < len(children) else "",
                )
            except Exception as exc:
                # A failing hook must not fail the delegation, but leave a trace.
                logger.debug("Memory provider on_delegation hook failed: %s", exc)

    total_duration = round(time.monotonic() - overall_start, 2)

    return json.dumps({
        "results": results,
        "total_duration_seconds": total_duration,
    }, ensure_ascii=False)
def _resolve_child_credential_pool(effective_provider: Optional[str], parent_agent):
    """Pick the credential pool a child agent should use, if any.

    * No ``effective_provider``: the parent's pool (possibly ``None``).
    * Same provider as the parent and the parent has a pool: that pool is
      shared, so cooldown state and rotation stay synchronized.
    * Otherwise: the provider's own pool is loaded via
      ``agent.credential_pool.load_pool`` and returned only when it reports
      credentials.
    * ``None`` when no pool is available or loading fails; the child then
      keeps its fixed inherited credential.
    """
    parent_pool = getattr(parent_agent, "_credential_pool", None)
    if not effective_provider:
        return parent_pool

    parent_provider = getattr(parent_agent, "provider", None) or ""
    shares_provider = effective_provider == parent_provider
    if parent_pool is not None and shares_provider:
        return parent_pool

    try:
        from agent.credential_pool import load_pool
        provider_pool = load_pool(effective_provider)
        if provider_pool is not None and provider_pool.has_credentials():
            return provider_pool
    except Exception as exc:
        logger.debug(
            "Could not load credential pool for child provider '%s': %s",
            effective_provider,
            exc,
        )

    return None
| def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict: | |
| """Resolve credentials for subagent delegation. | |
| If ``delegation.base_url`` is configured, subagents use that direct | |
| OpenAI-compatible endpoint. Otherwise, if ``delegation.provider`` is | |
| configured, the full credential bundle (base_url, api_key, api_mode, | |
| provider) is resolved via the runtime provider system — the same path used | |
| by CLI/gateway startup. This lets subagents run on a completely different | |
| provider:model pair. | |
| If neither base_url nor provider is configured, returns None values so the | |
| child inherits everything from the parent agent. | |
| Raises ValueError with a user-friendly message on credential failure. | |
| """ | |
| configured_model = str(cfg.get("model") or "").strip() or None | |
| configured_provider = str(cfg.get("provider") or "").strip() or None | |
| configured_base_url = str(cfg.get("base_url") or "").strip() or None | |
| configured_api_key = str(cfg.get("api_key") or "").strip() or None | |
| if configured_base_url: | |
| api_key = ( | |
| configured_api_key | |
| or os.getenv("OPENAI_API_KEY", "").strip() | |
| ) | |
| if not api_key: | |
| raise ValueError( | |
| "Delegation base_url is configured but no API key was found. " | |
| "Set delegation.api_key or OPENAI_API_KEY." | |
| ) | |
| base_lower = configured_base_url.lower() | |
| provider = "custom" | |
| api_mode = "chat_completions" | |
| if "chatgpt.com/backend-api/codex" in base_lower: | |
| provider = "openai-codex" | |
| api_mode = "codex_responses" | |
| elif "api.anthropic.com" in base_lower: | |
| provider = "anthropic" | |
| api_mode = "anthropic_messages" | |
| return { | |
| "model": configured_model, | |
| "provider": provider, | |
| "base_url": configured_base_url, | |
| "api_key": api_key, | |
| "api_mode": api_mode, | |
| } | |
| if not configured_provider: | |
| # No provider override — child inherits everything from parent | |
| return { | |
| "model": configured_model, | |
| "provider": None, | |
| "base_url": None, | |
| "api_key": None, | |
| "api_mode": None, | |
| } | |
| # Provider is configured — resolve full credentials | |
| try: | |
| from hermes_cli.runtime_provider import resolve_runtime_provider | |
| runtime = resolve_runtime_provider(requested=configured_provider) | |
| except Exception as exc: | |
| raise ValueError( | |
| f"Cannot resolve delegation provider '{configured_provider}': {exc}. " | |
| f"Check that the provider is configured (API key set, valid provider name), " | |
| f"or set delegation.base_url/delegation.api_key for a direct endpoint. " | |
| f"Available providers: openrouter, nous, zai, kimi-coding, minimax." | |
| ) from exc | |
| api_key = runtime.get("api_key", "") | |
| if not api_key: | |
| raise ValueError( | |
| f"Delegation provider '{configured_provider}' resolved but has no API key. " | |
| f"Set the appropriate environment variable or run 'hermes auth'." | |
| ) | |
| return { | |
| "model": configured_model, | |
| "provider": runtime.get("provider"), | |
| "base_url": runtime.get("base_url"), | |
| "api_key": api_key, | |
| "api_mode": runtime.get("api_mode"), | |
| "command": runtime.get("command"), | |
| "args": list(runtime.get("args") or []), | |
| } | |
| def _load_config() -> dict: | |
| """Load delegation config from CLI_CONFIG or persistent config. | |
| Checks the runtime config (cli.py CLI_CONFIG) first, then falls back | |
| to the persistent config (hermes_cli/config.py load_config()) so that | |
| ``delegation.model`` / ``delegation.provider`` are picked up regardless | |
| of the entry point (CLI, gateway, cron). | |
| """ | |
| try: | |
| from cli import CLI_CONFIG | |
| cfg = CLI_CONFIG.get("delegation", {}) | |
| if cfg: | |
| return cfg | |
| except Exception: | |
| pass | |
| try: | |
| from hermes_cli.config import load_config | |
| full = load_config() | |
| return full.get("delegation", {}) | |
| except Exception: | |
| return {} | |
| # --------------------------------------------------------------------------- | |
| # OpenAI Function-Calling Schema | |
| # --------------------------------------------------------------------------- | |
# Function-calling schema advertised to the model for the ``delegate_task``
# tool. The handler registered for this tool reads: goal, context, toolsets,
# tasks, max_iterations, acp_command and acp_args. No argument is marked
# required at the schema level; the description tells the model that one of
# 'goal' or 'tasks' must be supplied.
DELEGATE_TASK_SCHEMA = {
    "name": "delegate_task",
    "description": (
        "Spawn one or more subagents to work on tasks in isolated contexts. "
        "Each subagent gets its own conversation, terminal session, and toolset. "
        "Only the final summary is returned -- intermediate tool results "
        "never enter your context window.\n\n"
        "TWO MODES (one of 'goal' or 'tasks' is required):\n"
        "1. Single task: provide 'goal' (+ optional context, toolsets)\n"
        # The batch limit is configurable, so it is described the same way as
        # in the 'tasks' parameter below rather than as a fixed "3".
        "2. Batch (parallel): provide 'tasks' array (item limit configurable "
        "via delegation.max_concurrent_children, default 3). "
        "All run concurrently and results are returned together.\n\n"
        "WHEN TO USE delegate_task:\n"
        "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
        "- Tasks that would flood your context with intermediate data\n"
        "- Parallel independent workstreams (research A and B simultaneously)\n\n"
        "WHEN NOT TO USE (use these instead):\n"
        "- Mechanical multi-step work with no reasoning needed -> use execute_code\n"
        "- Single tool call -> just call the tool directly\n"
        "- Tasks needing user interaction -> subagents cannot use clarify\n\n"
        "IMPORTANT:\n"
        "- Subagents have NO memory of your conversation. Pass all relevant "
        "info (file paths, error messages, constraints) via the 'context' field.\n"
        "- Subagents CANNOT call: delegate_task, clarify, memory, send_message, "
        "execute_code.\n"
        "- Each subagent gets its own terminal session (separate working directory and state).\n"
        "- Results are always returned as an array, one entry per task."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            # --- Single-task mode -------------------------------------
            "goal": {
                "type": "string",
                "description": (
                    "What the subagent should accomplish. Be specific and "
                    "self-contained -- the subagent knows nothing about your "
                    "conversation history."
                ),
            },
            "context": {
                "type": "string",
                "description": (
                    "Background information the subagent needs: file paths, "
                    "error messages, project structure, constraints. The more "
                    "specific you are, the better the subagent performs."
                ),
            },
            "toolsets": {
                "type": "array",
                "items": {"type": "string"},
                "description": (
                    "Toolsets to enable for this subagent. "
                    "Default: inherits your enabled toolsets. "
                    f"Available toolsets: {_TOOLSET_LIST_STR}. "
                    "Common patterns: ['terminal', 'file'] for code work, "
                    "['web'] for research, ['browser'] for web interaction, "
                    "['terminal', 'file', 'web'] for full-stack tasks."
                ),
            },
            # --- Batch mode -------------------------------------------
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "goal": {"type": "string", "description": "Task goal"},
                        "context": {"type": "string", "description": "Task-specific context"},
                        "toolsets": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": (
                                f"Toolsets for this specific task. Available: {_TOOLSET_LIST_STR}. "
                                "Use 'web' for network access, 'terminal' for shell, "
                                "'browser' for web interaction."
                            ),
                        },
                        "acp_command": {
                            "type": "string",
                            "description": (
                                "Per-task ACP command override (e.g. 'claude'). "
                                "Overrides the top-level acp_command for this task only."
                            ),
                        },
                        "acp_args": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Per-task ACP args override.",
                        },
                    },
                    "required": ["goal"],
                },
                # No maxItems — the runtime limit is configurable via
                # delegation.max_concurrent_children (default 3) and
                # enforced with a clear error in delegate_task().
                "description": (
                    "Batch mode: tasks to run in parallel (limit configurable via "
                    "delegation.max_concurrent_children, default 3). Each gets "
                    "its own subagent with isolated context and terminal session. "
                    "When provided, top-level goal/context/toolsets are ignored."
                ),
            },
            # --- Options applying to every child ----------------------
            "max_iterations": {
                "type": "integer",
                "description": (
                    "Max tool-calling turns per subagent (default: 50). "
                    "Only set lower for simple tasks."
                ),
            },
            "acp_command": {
                "type": "string",
                "description": (
                    "Override ACP command for child agents (e.g. 'claude', 'copilot'). "
                    "When set, children use ACP subprocess transport instead of inheriting "
                    "the parent's transport. Enables spawning Claude Code (claude --acp --stdio) "
                    "or other ACP-capable agents from any parent, including Discord/Telegram/CLI."
                ),
            },
            "acp_args": {
                "type": "array",
                "items": {"type": "string"},
                "description": (
                    "Arguments for the ACP command (default: ['--acp', '--stdio']). "
                    "Only used when acp_command is set. "
                    "Example: ['--acp', '--stdio', '--model', 'claude-opus-4-6']"
                ),
            },
        },
        "required": [],
    },
}
| # --- Registry --- | |
| from tools.registry import registry, tool_error | |
# Register the tool with the shared registry. The handler copies each
# schema-declared argument out of the model-supplied ``args`` dict (absent
# keys become None) and passes along the ``parent_agent`` keyword it receives.
registry.register(
    name="delegate_task",
    toolset="delegation",
    schema=DELEGATE_TASK_SCHEMA,
    handler=lambda args, **kw: delegate_task(
        parent_agent=kw.get("parent_agent"),
        **{
            arg_name: args.get(arg_name)
            for arg_name in (
                "goal",
                "context",
                "toolsets",
                "tasks",
                "max_iterations",
                "acp_command",
                "acp_args",
            )
        },
    ),
    check_fn=check_delegate_requirements,
    emoji="🔀",
)