#!/usr/bin/env python3 """ Delegate Tool -- Subagent Architecture Spawns child AIAgent instances with isolated context, restricted toolsets, and their own terminal sessions. Supports single-task and batch (parallel) modes. The parent blocks until all children complete. Each child gets: - A fresh conversation (no parent history) - Its own task_id (own terminal session, file ops cache) - A restricted toolset (configurable, with blocked tools always stripped) - A focused system prompt built from the delegated goal + context The parent's context only sees the delegation call and the summary result, never the child's intermediate tool calls or reasoning. """ import json import logging logger = logging.getLogger(__name__) import os import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional # Tools that children must never have access to DELEGATE_BLOCKED_TOOLS = frozenset([ "delegate_task", # no recursive delegation "clarify", # no user interaction "memory", # no writes to shared MEMORY.md "send_message", # no cross-platform side effects "execute_code", # children should reason step-by-step, not write scripts ]) MAX_CONCURRENT_CHILDREN = 3 MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2) DEFAULT_MAX_ITERATIONS = 50 DEFAULT_TOOLSETS = ["terminal", "file", "web"] def check_delegate_requirements() -> bool: """Delegation has no external requirements -- always available.""" return True def _build_child_system_prompt(goal: str, context: Optional[str] = None) -> str: """Build a focused system prompt for a child agent.""" parts = [ "You are a focused subagent working on a specific delegated task.", "", f"YOUR TASK:\n{goal}", ] if context and context.strip(): parts.append(f"\nCONTEXT:\n{context}") parts.append( "\nComplete this task using the tools available to you. " "When finished, provide a clear, concise summary of:\n" "- What you did\n" "- What you found or accomplished\n" "- Any files you created or modified\n" "- Any issues encountered\n\n" "Be thorough but concise -- your response is returned to the " "parent agent as a summary." ) return "\n".join(parts) def _strip_blocked_tools(toolsets: List[str]) -> List[str]: """Remove toolsets that contain only blocked tools.""" blocked_toolset_names = { "delegation", "clarify", "memory", "code_execution", } return [t for t in toolsets if t not in blocked_toolset_names] def _build_child_progress_callback(task_index: int, parent_agent, task_count: int = 1) -> Optional[callable]: """Build a callback that relays child agent tool calls to the parent display. Two display paths: CLI: prints tree-view lines above the parent's delegation spinner Gateway: batches tool names and relays to parent's progress callback Returns None if no display mechanism is available, in which case the child agent runs with no progress callback (identical to current behavior). """ spinner = getattr(parent_agent, '_delegate_spinner', None) parent_cb = getattr(parent_agent, 'tool_progress_callback', None) if not spinner and not parent_cb: return None # No display → no callback → zero behavior change # Show 1-indexed prefix only in batch mode (multiple tasks) prefix = f"[{task_index + 1}] " if task_count > 1 else "" # Gateway: batch tool names, flush periodically _BATCH_SIZE = 5 _batch: List[str] = [] def _callback(tool_name: str, preview: str = None): # Special "_thinking" event: model produced text content (reasoning) if tool_name == "_thinking": if spinner: short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "") try: spinner.print_above(f" {prefix}├─ 💭 \"{short}\"") except Exception as e: logger.debug("Spinner print_above failed: %s", e) # Don't relay thinking to gateway (too noisy for chat) return # Regular tool call event if spinner: short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "") from agent.display import get_tool_emoji emoji = get_tool_emoji(tool_name) line = f" {prefix}├─ {emoji} {tool_name}" if short: line += f" \"{short}\"" try: spinner.print_above(line) except Exception as e: logger.debug("Spinner print_above failed: %s", e) if parent_cb: _batch.append(tool_name) if len(_batch) >= _BATCH_SIZE: summary = ", ".join(_batch) try: parent_cb("subagent_progress", f"🔀 {prefix}{summary}") except Exception as e: logger.debug("Parent callback failed: %s", e) _batch.clear() def _flush(): """Flush remaining batched tool names to gateway on completion.""" if parent_cb and _batch: summary = ", ".join(_batch) try: parent_cb("subagent_progress", f"🔀 {prefix}{summary}") except Exception as e: logger.debug("Parent callback flush failed: %s", e) _batch.clear() _callback._flush = _flush return _callback def _build_child_agent( task_index: int, goal: str, context: Optional[str], toolsets: Optional[List[str]], model: Optional[str], max_iterations: int, parent_agent, # Credential overrides from delegation config (provider:model resolution) override_provider: Optional[str] = None, override_base_url: Optional[str] = None, override_api_key: Optional[str] = None, override_api_mode: Optional[str] = None, ): """ Build a child AIAgent on the main thread (thread-safe construction). Returns the constructed child agent without running it. When override_* params are set (from delegation config), the child uses those credentials instead of inheriting from the parent. This enables routing subagents to a different provider:model pair (e.g. cheap/fast model on OpenRouter while the parent runs on Nous Portal). """ from run_agent import AIAgent import model_tools # When no explicit toolsets given, inherit from parent's enabled toolsets # so disabled tools (e.g. web) don't leak to subagents. if toolsets: child_toolsets = _strip_blocked_tools(toolsets) elif parent_agent and getattr(parent_agent, "enabled_toolsets", None): child_toolsets = _strip_blocked_tools(parent_agent.enabled_toolsets) else: child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS) child_prompt = _build_child_system_prompt(goal, context) # Extract parent's API key so subagents inherit auth (e.g. Nous Portal). parent_api_key = getattr(parent_agent, "api_key", None) if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"): parent_api_key = parent_agent._client_kwargs.get("api_key") # Build progress callback to relay tool calls to parent display child_progress_cb = _build_child_progress_callback(task_index, parent_agent) # Share the parent's iteration budget so subagent tool calls # count toward the session-wide limit. shared_budget = getattr(parent_agent, "iteration_budget", None) # Resolve effective credentials: config override > parent inherit effective_model = model or parent_agent.model effective_provider = override_provider or getattr(parent_agent, "provider", None) effective_base_url = override_base_url or parent_agent.base_url effective_api_key = override_api_key or parent_api_key effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None) effective_acp_command = getattr(parent_agent, "acp_command", None) effective_acp_args = list(getattr(parent_agent, "acp_args", []) or []) child = AIAgent( base_url=effective_base_url, api_key=effective_api_key, model=effective_model, provider=effective_provider, api_mode=effective_api_mode, acp_command=effective_acp_command, acp_args=effective_acp_args, max_iterations=max_iterations, max_tokens=getattr(parent_agent, "max_tokens", None), reasoning_config=getattr(parent_agent, "reasoning_config", None), prefill_messages=getattr(parent_agent, "prefill_messages", None), enabled_toolsets=child_toolsets, quiet_mode=True, ephemeral_system_prompt=child_prompt, log_prefix=f"[subagent-{task_index}]", platform=parent_agent.platform, skip_context_files=True, skip_memory=True, clarify_callback=None, session_db=getattr(parent_agent, '_session_db', None), providers_allowed=parent_agent.providers_allowed, providers_ignored=parent_agent.providers_ignored, providers_order=parent_agent.providers_order, provider_sort=parent_agent.provider_sort, tool_progress_callback=child_progress_cb, iteration_budget=shared_budget, ) # Set delegation depth so children can't spawn grandchildren child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1 # Register child for interrupt propagation if hasattr(parent_agent, '_active_children'): lock = getattr(parent_agent, '_active_children_lock', None) if lock: with lock: parent_agent._active_children.append(child) else: parent_agent._active_children.append(child) return child def _run_single_child( task_index: int, goal: str, child=None, parent_agent=None, **_kwargs, ) -> Dict[str, Any]: """ Run a pre-built child agent. Called from within a thread. Returns a structured result dict. """ child_start = time.monotonic() # Get the progress callback from the child agent child_progress_cb = getattr(child, 'tool_progress_callback', None) # Restore parent tool names using the value saved before child construction # mutated the global. This is the correct parent toolset, not the child's. import model_tools _saved_tool_names = getattr(child, "_delegate_saved_tool_names", list(model_tools._last_resolved_tool_names)) try: result = child.run_conversation(user_message=goal) # Flush any remaining batched progress to gateway if child_progress_cb and hasattr(child_progress_cb, '_flush'): try: child_progress_cb._flush() except Exception as e: logger.debug("Progress callback flush failed: %s", e) duration = round(time.monotonic() - child_start, 2) summary = result.get("final_response") or "" completed = result.get("completed", False) interrupted = result.get("interrupted", False) api_calls = result.get("api_calls", 0) if interrupted: status = "interrupted" elif completed and summary: status = "completed" else: status = "failed" # Build tool trace from conversation messages (already in memory). # Uses tool_call_id to correctly pair parallel tool calls with results. tool_trace: list[Dict[str, Any]] = [] trace_by_id: Dict[str, Dict[str, Any]] = {} messages = result.get("messages") or [] if isinstance(messages, list): for msg in messages: if not isinstance(msg, dict): continue if msg.get("role") == "assistant": for tc in (msg.get("tool_calls") or []): fn = tc.get("function", {}) entry_t = { "tool": fn.get("name", "unknown"), "args_bytes": len(fn.get("arguments", "")), } tool_trace.append(entry_t) tc_id = tc.get("id") if tc_id: trace_by_id[tc_id] = entry_t elif msg.get("role") == "tool": content = msg.get("content", "") is_error = bool( content and "error" in content[:80].lower() ) result_meta = { "result_bytes": len(content), "status": "error" if is_error else "ok", } # Match by tool_call_id for parallel calls tc_id = msg.get("tool_call_id") target = trace_by_id.get(tc_id) if tc_id else None if target is not None: target.update(result_meta) elif tool_trace: # Fallback for messages without tool_call_id tool_trace[-1].update(result_meta) # Determine exit reason if interrupted: exit_reason = "interrupted" elif completed: exit_reason = "completed" else: exit_reason = "max_iterations" # Extract token counts (safe for mock objects) _input_tokens = getattr(child, "session_prompt_tokens", 0) _output_tokens = getattr(child, "session_completion_tokens", 0) _model = getattr(child, "model", None) entry: Dict[str, Any] = { "task_index": task_index, "status": status, "summary": summary, "api_calls": api_calls, "duration_seconds": duration, "model": _model if isinstance(_model, str) else None, "exit_reason": exit_reason, "tokens": { "input": _input_tokens if isinstance(_input_tokens, (int, float)) else 0, "output": _output_tokens if isinstance(_output_tokens, (int, float)) else 0, }, "tool_trace": tool_trace, } if status == "failed": entry["error"] = result.get("error", "Subagent did not produce a response.") return entry except Exception as exc: duration = round(time.monotonic() - child_start, 2) logging.exception(f"[subagent-{task_index}] failed") return { "task_index": task_index, "status": "error", "summary": None, "error": str(exc), "api_calls": 0, "duration_seconds": duration, } finally: # Restore the parent's tool names so the process-global is correct # for any subsequent execute_code calls or other consumers. import model_tools saved_tool_names = getattr(child, "_delegate_saved_tool_names", None) if isinstance(saved_tool_names, list): model_tools._last_resolved_tool_names = list(saved_tool_names) # Unregister child from interrupt propagation if hasattr(parent_agent, '_active_children'): try: lock = getattr(parent_agent, '_active_children_lock', None) if lock: with lock: parent_agent._active_children.remove(child) else: parent_agent._active_children.remove(child) except (ValueError, UnboundLocalError) as e: logger.debug("Could not remove child from active_children: %s", e) def delegate_task( goal: Optional[str] = None, context: Optional[str] = None, toolsets: Optional[List[str]] = None, tasks: Optional[List[Dict[str, Any]]] = None, max_iterations: Optional[int] = None, parent_agent=None, ) -> str: """ Spawn one or more child agents to handle delegated tasks. Supports two modes: - Single: provide goal (+ optional context, toolsets) - Batch: provide tasks array [{goal, context, toolsets}, ...] Returns JSON with results array, one entry per task. """ if parent_agent is None: return json.dumps({"error": "delegate_task requires a parent agent context."}) # Depth limit depth = getattr(parent_agent, '_delegate_depth', 0) if depth >= MAX_DEPTH: return json.dumps({ "error": ( f"Delegation depth limit reached ({MAX_DEPTH}). " "Subagents cannot spawn further subagents." ) }) # Load config cfg = _load_config() default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS) effective_max_iter = max_iterations or default_max_iter # Resolve delegation credentials (provider:model pair). # When delegation.provider is configured, this resolves the full credential # bundle (base_url, api_key, api_mode) via the same runtime provider system # used by CLI/gateway startup. When unconfigured, returns None values so # children inherit from the parent. try: creds = _resolve_delegation_credentials(cfg, parent_agent) except ValueError as exc: return json.dumps({"error": str(exc)}) # Normalize to task list if tasks and isinstance(tasks, list): task_list = tasks[:MAX_CONCURRENT_CHILDREN] elif goal and isinstance(goal, str) and goal.strip(): task_list = [{"goal": goal, "context": context, "toolsets": toolsets}] else: return json.dumps({"error": "Provide either 'goal' (single task) or 'tasks' (batch)."}) if not task_list: return json.dumps({"error": "No tasks provided."}) # Validate each task has a goal for i, task in enumerate(task_list): if not task.get("goal", "").strip(): return json.dumps({"error": f"Task {i} is missing a 'goal'."}) overall_start = time.monotonic() results = [] n_tasks = len(task_list) # Track goal labels for progress display (truncated for readability) task_labels = [t["goal"][:40] for t in task_list] # Save parent tool names BEFORE any child construction mutates the global. # _build_child_agent() calls AIAgent() which calls get_tool_definitions(), # which overwrites model_tools._last_resolved_tool_names with child's toolset. import model_tools as _model_tools _parent_tool_names = list(_model_tools._last_resolved_tool_names) # Build all child agents on the main thread (thread-safe construction) # Wrapped in try/finally so the global is always restored even if a # child build raises (otherwise _last_resolved_tool_names stays corrupted). children = [] try: for i, t in enumerate(task_list): child = _build_child_agent( task_index=i, goal=t["goal"], context=t.get("context"), toolsets=t.get("toolsets") or toolsets, model=creds["model"], max_iterations=effective_max_iter, parent_agent=parent_agent, override_provider=creds["provider"], override_base_url=creds["base_url"], override_api_key=creds["api_key"], override_api_mode=creds["api_mode"], ) # Override with correct parent tool names (before child construction mutated global) child._delegate_saved_tool_names = _parent_tool_names children.append((i, t, child)) finally: # Authoritative restore: reset global to parent's tool names after all children built _model_tools._last_resolved_tool_names = _parent_tool_names if n_tasks == 1: # Single task -- run directly (no thread pool overhead) _i, _t, child = children[0] result = _run_single_child(0, _t["goal"], child, parent_agent) results.append(result) else: # Batch -- run in parallel with per-task progress lines completed_count = 0 spinner_ref = getattr(parent_agent, '_delegate_spinner', None) with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor: futures = {} for i, t, child in children: future = executor.submit( _run_single_child, task_index=i, goal=t["goal"], child=child, parent_agent=parent_agent, ) futures[future] = i for future in as_completed(futures): try: entry = future.result() except Exception as exc: idx = futures[future] entry = { "task_index": idx, "status": "error", "summary": None, "error": str(exc), "api_calls": 0, "duration_seconds": 0, } results.append(entry) completed_count += 1 # Print per-task completion line above the spinner idx = entry["task_index"] label = task_labels[idx] if idx < len(task_labels) else f"Task {idx}" dur = entry.get("duration_seconds", 0) status = entry.get("status", "?") icon = "✓" if status == "completed" else "✗" remaining = n_tasks - completed_count completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)" if spinner_ref: try: spinner_ref.print_above(completion_line) except Exception: print(f" {completion_line}") else: print(f" {completion_line}") # Update spinner text to show remaining count if spinner_ref and remaining > 0: try: spinner_ref.update_text(f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining") except Exception as e: logger.debug("Spinner update_text failed: %s", e) # Sort by task_index so results match input order results.sort(key=lambda r: r["task_index"]) total_duration = round(time.monotonic() - overall_start, 2) return json.dumps({ "results": results, "total_duration_seconds": total_duration, }, ensure_ascii=False) def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict: """Resolve credentials for subagent delegation. If ``delegation.base_url`` is configured, subagents use that direct OpenAI-compatible endpoint. Otherwise, if ``delegation.provider`` is configured, the full credential bundle (base_url, api_key, api_mode, provider) is resolved via the runtime provider system — the same path used by CLI/gateway startup. This lets subagents run on a completely different provider:model pair. If neither base_url nor provider is configured, returns None values so the child inherits everything from the parent agent. Raises ValueError with a user-friendly message on credential failure. """ configured_model = str(cfg.get("model") or "").strip() or None configured_provider = str(cfg.get("provider") or "").strip() or None configured_base_url = str(cfg.get("base_url") or "").strip() or None configured_api_key = str(cfg.get("api_key") or "").strip() or None if configured_base_url: api_key = ( configured_api_key or os.getenv("OPENAI_API_KEY", "").strip() ) if not api_key: raise ValueError( "Delegation base_url is configured but no API key was found. " "Set delegation.api_key or OPENAI_API_KEY." ) base_lower = configured_base_url.lower() provider = "custom" api_mode = "chat_completions" if "chatgpt.com/backend-api/codex" in base_lower: provider = "openai-codex" api_mode = "codex_responses" elif "api.anthropic.com" in base_lower: provider = "anthropic" api_mode = "anthropic_messages" return { "model": configured_model, "provider": provider, "base_url": configured_base_url, "api_key": api_key, "api_mode": api_mode, } if not configured_provider: # No provider override — child inherits everything from parent return { "model": configured_model, "provider": None, "base_url": None, "api_key": None, "api_mode": None, } # Provider is configured — resolve full credentials try: from hermes_cli.runtime_provider import resolve_runtime_provider runtime = resolve_runtime_provider(requested=configured_provider) except Exception as exc: raise ValueError( f"Cannot resolve delegation provider '{configured_provider}': {exc}. " f"Check that the provider is configured (API key set, valid provider name), " f"or set delegation.base_url/delegation.api_key for a direct endpoint. " f"Available providers: openrouter, nous, zai, kimi-coding, minimax." ) from exc api_key = runtime.get("api_key", "") if not api_key: raise ValueError( f"Delegation provider '{configured_provider}' resolved but has no API key. " f"Set the appropriate environment variable or run 'hermes login'." ) return { "model": configured_model, "provider": runtime.get("provider"), "base_url": runtime.get("base_url"), "api_key": api_key, "api_mode": runtime.get("api_mode"), "command": runtime.get("command"), "args": list(runtime.get("args") or []), } def _load_config() -> dict: """Load delegation config from CLI_CONFIG or persistent config. Checks the runtime config (cli.py CLI_CONFIG) first, then falls back to the persistent config (hermes_cli/config.py load_config()) so that ``delegation.model`` / ``delegation.provider`` are picked up regardless of the entry point (CLI, gateway, cron). """ try: from cli import CLI_CONFIG cfg = CLI_CONFIG.get("delegation", {}) if cfg: return cfg except Exception: pass try: from hermes_cli.config import load_config full = load_config() return full.get("delegation", {}) except Exception: return {} # --------------------------------------------------------------------------- # OpenAI Function-Calling Schema # --------------------------------------------------------------------------- DELEGATE_TASK_SCHEMA = { "name": "delegate_task", "description": ( "Spawn one or more subagents to work on tasks in isolated contexts. " "Each subagent gets its own conversation, terminal session, and toolset. " "Only the final summary is returned -- intermediate tool results " "never enter your context window.\n\n" "TWO MODES (one of 'goal' or 'tasks' is required):\n" "1. Single task: provide 'goal' (+ optional context, toolsets)\n" "2. Batch (parallel): provide 'tasks' array with up to 3 items. " "All run concurrently and results are returned together.\n\n" "WHEN TO USE delegate_task:\n" "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n" "- Tasks that would flood your context with intermediate data\n" "- Parallel independent workstreams (research A and B simultaneously)\n\n" "WHEN NOT TO USE (use these instead):\n" "- Mechanical multi-step work with no reasoning needed -> use execute_code\n" "- Single tool call -> just call the tool directly\n" "- Tasks needing user interaction -> subagents cannot use clarify\n\n" "IMPORTANT:\n" "- Subagents have NO memory of your conversation. Pass all relevant " "info (file paths, error messages, constraints) via the 'context' field.\n" "- Subagents CANNOT call: delegate_task, clarify, memory, send_message, " "execute_code.\n" "- Each subagent gets its own terminal session (separate working directory and state).\n" "- Results are always returned as an array, one entry per task." ), "parameters": { "type": "object", "properties": { "goal": { "type": "string", "description": ( "What the subagent should accomplish. Be specific and " "self-contained -- the subagent knows nothing about your " "conversation history." ), }, "context": { "type": "string", "description": ( "Background information the subagent needs: file paths, " "error messages, project structure, constraints. The more " "specific you are, the better the subagent performs." ), }, "toolsets": { "type": "array", "items": {"type": "string"}, "description": ( "Toolsets to enable for this subagent. " "Default: inherits your enabled toolsets. " "Common patterns: ['terminal', 'file'] for code work, " "['web'] for research, ['terminal', 'file', 'web'] for " "full-stack tasks." ), }, "tasks": { "type": "array", "items": { "type": "object", "properties": { "goal": {"type": "string", "description": "Task goal"}, "context": {"type": "string", "description": "Task-specific context"}, "toolsets": { "type": "array", "items": {"type": "string"}, "description": "Toolsets for this specific task", }, }, "required": ["goal"], }, "maxItems": 3, "description": ( "Batch mode: up to 3 tasks to run in parallel. Each gets " "its own subagent with isolated context and terminal session. " "When provided, top-level goal/context/toolsets are ignored." ), }, "max_iterations": { "type": "integer", "description": ( "Max tool-calling turns per subagent (default: 50). " "Only set lower for simple tasks." ), }, }, "required": [], }, } # --- Registry --- from tools.registry import registry registry.register( name="delegate_task", toolset="delegation", schema=DELEGATE_TASK_SCHEMA, handler=lambda args, **kw: delegate_task( goal=args.get("goal"), context=args.get("context"), toolsets=args.get("toolsets"), tasks=args.get("tasks"), max_iterations=args.get("max_iterations"), parent_agent=kw.get("parent_agent")), check_fn=check_delegate_requirements, emoji="🔀", )