| |
| """ |
| Tests for the subagent delegation tool. |
| |
| Uses mock AIAgent instances to test the delegation logic without |
| requiring API keys or real LLM calls. |
| |
| Run with: python -m pytest tests/test_delegate.py -v |
| or: python tests/test_delegate.py |
| """ |
|
|
| import json |
| import os |
| import sys |
| import threading |
| import time |
| import unittest |
| from unittest.mock import MagicMock, patch |
|
|
| from tools.delegate_tool import ( |
| DELEGATE_BLOCKED_TOOLS, |
| DELEGATE_TASK_SCHEMA, |
| DelegateEvent, |
| _get_max_concurrent_children, |
| _LEGACY_EVENT_MAP, |
| MAX_DEPTH, |
| check_delegate_requirements, |
| delegate_task, |
| _build_child_agent, |
| _build_child_progress_callback, |
| _build_child_system_prompt, |
| _strip_blocked_tools, |
| _resolve_child_credential_pool, |
| _resolve_delegation_credentials, |
| ) |
|
|
|
|
| def _make_mock_parent(depth=0): |
| """Create a mock parent agent with the fields delegate_task expects.""" |
| parent = MagicMock() |
| parent.base_url = "https://openrouter.ai/api/v1" |
| parent.api_key="***" |
| parent.provider = "openrouter" |
| parent.api_mode = "chat_completions" |
| parent.model = "anthropic/claude-sonnet-4" |
| parent.platform = "cli" |
| parent.providers_allowed = None |
| parent.providers_ignored = None |
| parent.providers_order = None |
| parent.provider_sort = None |
| parent._session_db = None |
| parent._delegate_depth = depth |
| parent._active_children = [] |
| parent._active_children_lock = threading.Lock() |
| parent._print_fn = None |
| parent.tool_progress_callback = None |
| parent.thinking_callback = None |
| return parent |
|
|
|
|
class TestDelegateRequirements(unittest.TestCase):
    """Checks on the requirement probe and the published tool schema."""

    def test_always_available(self):
        """``check_delegate_requirements`` must report a truthy value."""
        available = check_delegate_requirements()
        self.assertTrue(available)

    def test_schema_valid(self):
        """The schema is named ``delegate_task`` and exposes the expected properties."""
        self.assertEqual(DELEGATE_TASK_SCHEMA["name"], "delegate_task")

        properties = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
        for expected_key in ("goal", "tasks", "context", "toolsets"):
            self.assertIn(expected_key, properties)

        # These keys must stay out of the schema.
        self.assertNotIn("max_iterations", properties)
        self.assertNotIn("maxItems", properties["tasks"])
|
|
|
|
class TestChildSystemPrompt(unittest.TestCase):
    """Checks on the text produced by ``_build_child_system_prompt``."""

    def test_goal_only(self):
        """With only a goal, the prompt has the task section and no context section."""
        text = _build_child_system_prompt("Fix the tests")
        for fragment in ("Fix the tests", "YOUR TASK"):
            self.assertIn(fragment, text)
        self.assertNotIn("CONTEXT", text)

    def test_goal_with_context(self):
        """A supplied context string is embedded under a CONTEXT heading."""
        goal = "Fix the tests"
        context = "Error: assertion failed in test_foo.py line 42"
        text = _build_child_system_prompt(goal, context)
        for fragment in ("Fix the tests", "CONTEXT", "assertion failed"):
            self.assertIn(fragment, text)

    def test_empty_context_ignored(self):
        """A whitespace-only context must not produce a CONTEXT section."""
        text = _build_child_system_prompt("Do something", " ")
        self.assertNotIn("CONTEXT", text)
|
|
|
|
class TestStripBlockedTools(unittest.TestCase):
    """Checks on the toolset filter ``_strip_blocked_tools``."""

    def test_removes_blocked_toolsets(self):
        """Only ``file`` and ``terminal`` survive from the mixed input list."""
        requested = ["terminal", "file", "delegation", "clarify", "memory", "code_execution"]
        remaining = _strip_blocked_tools(requested)
        self.assertEqual(sorted(remaining), ["file", "terminal"])

    def test_preserves_allowed_toolsets(self):
        """A list without blocked toolsets comes back with the same members."""
        requested = ["terminal", "file", "web", "browser"]
        remaining = _strip_blocked_tools(requested)
        self.assertEqual(sorted(remaining), ["browser", "file", "terminal", "web"])

    def test_empty_input(self):
        """An empty list maps to an empty list."""
        self.assertEqual(_strip_blocked_tools([]), [])
|
|
|
|
class TestDelegateTask(unittest.TestCase):
    """Tests for ``delegate_task`` argument validation, dispatch and child construction.

    Tests that patch ``tools.delegate_tool._run_single_child`` check how
    results are aggregated; tests that patch ``run_agent.AIAgent`` check
    what is passed to, and set on, the child agent.
    """

    @staticmethod
    def _build_child(parent, goal):
        """Call ``_build_child_agent`` for a single task with fixed defaults."""
        return _build_child_agent(
            task_index=0,
            goal=goal,
            context=None,
            toolsets=None,
            model=None,
            max_iterations=10,
            parent_agent=parent,
            task_count=1,
        )

    def test_no_parent_agent(self):
        """Without a parent agent the call returns a JSON error mentioning it."""
        response = json.loads(delegate_task(goal="test"))
        self.assertIn("error", response)
        self.assertIn("parent agent", response["error"])

    def test_depth_limit(self):
        """A parent at depth 2 is refused with a depth-limit error."""
        parent = _make_mock_parent(depth=2)
        response = json.loads(delegate_task(goal="test", parent_agent=parent))
        self.assertIn("error", response)
        self.assertIn("depth limit", response["error"].lower())

    def test_no_goal_or_tasks(self):
        """Omitting both ``goal`` and ``tasks`` yields an error."""
        parent = _make_mock_parent()
        response = json.loads(delegate_task(parent_agent=parent))
        self.assertIn("error", response)

    def test_empty_goal(self):
        """A whitespace-only goal yields an error."""
        parent = _make_mock_parent()
        response = json.loads(delegate_task(goal=" ", parent_agent=parent))
        self.assertIn("error", response)

    def test_task_missing_goal(self):
        """A batch entry without a ``goal`` key yields an error."""
        parent = _make_mock_parent()
        response = json.loads(
            delegate_task(tasks=[{"context": "no goal here"}], parent_agent=parent)
        )
        self.assertIn("error", response)

    @patch("tools.delegate_tool._run_single_child")
    def test_single_task_mode(self, mock_run):
        """A single goal produces exactly one result entry from one child run."""
        mock_run.return_value = {
            "task_index": 0, "status": "completed",
            "summary": "Done!", "api_calls": 3, "duration_seconds": 5.0,
        }
        parent = _make_mock_parent()
        response = json.loads(
            delegate_task(goal="Fix tests", context="error log...", parent_agent=parent)
        )
        self.assertIn("results", response)
        self.assertEqual(len(response["results"]), 1)
        only = response["results"][0]
        self.assertEqual(only["status"], "completed")
        self.assertEqual(only["summary"], "Done!")
        mock_run.assert_called_once()

    @patch("tools.delegate_tool._run_single_child")
    def test_batch_mode(self, mock_run):
        """Two batch tasks produce two ordered results plus a total duration."""
        mock_run.side_effect = [
            {"task_index": 0, "status": "completed", "summary": "Result A", "api_calls": 2, "duration_seconds": 3.0},
            {"task_index": 1, "status": "completed", "summary": "Result B", "api_calls": 4, "duration_seconds": 6.0},
        ]
        parent = _make_mock_parent()
        tasks = [
            {"goal": "Research topic A"},
            {"goal": "Research topic B"},
        ]
        response = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
        self.assertIn("results", response)
        self.assertEqual(len(response["results"]), 2)
        self.assertEqual(response["results"][0]["summary"], "Result A")
        self.assertEqual(response["results"][1]["summary"], "Result B")
        self.assertIn("total_duration_seconds", response)

    @patch("tools.delegate_tool._run_single_child")
    def test_batch_capped_at_3(self, mock_run):
        """A batch larger than ``_get_max_concurrent_children()`` is rejected outright.

        The limit is read at run time rather than hard-coded; the batch is
        sized two above it and no child may be started.
        """
        mock_run.return_value = {
            "task_index": 0, "status": "completed",
            "summary": "Done", "api_calls": 1, "duration_seconds": 1.0,
        }
        parent = _make_mock_parent()
        limit = _get_max_concurrent_children()
        tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)]
        response = json.loads(delegate_task(tasks=tasks, parent_agent=parent))

        self.assertIn("error", response)
        self.assertIn("Too many tasks", response["error"])
        mock_run.assert_not_called()

    @patch("tools.delegate_tool._run_single_child")
    def test_batch_ignores_toplevel_goal(self, mock_run):
        """When tasks array is provided, top-level goal/context/toolsets are ignored."""
        mock_run.return_value = {
            "task_index": 0, "status": "completed",
            "summary": "Done", "api_calls": 1, "duration_seconds": 1.0,
        }
        parent = _make_mock_parent()
        # The parsed value is not inspected; json.loads only confirms that the
        # response is well-formed JSON.
        json.loads(delegate_task(
            goal="This should be ignored",
            tasks=[{"goal": "Actual task"}],
            parent_agent=parent,
        ))

        # Fail with a clear message if no child was started at all.
        mock_run.assert_called()
        args, kwargs = mock_run.call_args
        # The goal may arrive as a keyword or as the second positional argument.
        positional_goal = args[1] if len(args) > 1 else None
        self.assertEqual(kwargs.get("goal", positional_goal), "Actual task")

    @patch("tools.delegate_tool._run_single_child")
    def test_failed_child_included_in_results(self, mock_run):
        """A child reporting ``status='error'`` is passed through in the results."""
        mock_run.return_value = {
            "task_index": 0, "status": "error",
            "summary": None, "error": "Something broke",
            "api_calls": 0, "duration_seconds": 0.5,
        }
        parent = _make_mock_parent()
        response = json.loads(delegate_task(goal="Break things", parent_agent=parent))
        failed = response["results"][0]
        self.assertEqual(failed["status"], "error")
        self.assertIn("Something broke", failed["error"])

    def test_depth_increments(self):
        """Verify child gets parent's depth + 1."""
        parent = _make_mock_parent(depth=0)
        child = MagicMock()
        child.run_conversation.return_value = {
            "final_response": "done", "completed": True, "api_calls": 1,
        }

        with patch("run_agent.AIAgent", return_value=child):
            delegate_task(goal="Test depth", parent_agent=parent)

        self.assertEqual(child._delegate_depth, 1)

    def test_active_children_tracking(self):
        """Verify children are registered/unregistered for interrupt propagation."""
        parent = _make_mock_parent(depth=0)
        child = MagicMock()
        child.run_conversation.return_value = {
            "final_response": "done", "completed": True, "api_calls": 1,
        }

        with patch("run_agent.AIAgent", return_value=child):
            delegate_task(goal="Test tracking", parent_agent=parent)

        # After the call returns, no child may remain registered on the parent.
        self.assertEqual(len(parent._active_children), 0)

    def test_child_inherits_runtime_credentials(self):
        """The child is constructed with the parent's endpoint, key, provider and mode."""
        parent = _make_mock_parent(depth=0)
        parent.base_url = "https://chatgpt.com/backend-api/codex"
        parent.api_key = "***"
        parent.provider = "openai-codex"
        parent.api_mode = "codex_responses"

        child = MagicMock()
        child.run_conversation.return_value = {
            "final_response": "ok",
            "completed": True,
            "api_calls": 1,
        }

        with patch("run_agent.AIAgent", return_value=child) as MockAgent:
            delegate_task(goal="Test runtime inheritance", parent_agent=parent)

        _, kwargs = MockAgent.call_args
        for field in ("base_url", "api_key", "provider", "api_mode"):
            self.assertEqual(kwargs[field], getattr(parent, field))

    def test_child_inherits_parent_print_fn(self):
        """``_build_child_agent`` copies the parent's ``_print_fn`` onto the child."""
        parent = _make_mock_parent(depth=0)
        sink = MagicMock()
        parent._print_fn = sink
        child = MagicMock()

        with patch("run_agent.AIAgent", return_value=child):
            self._build_child(parent, "Keep stdout clean")

        self.assertIs(child._print_fn, sink)

    def test_child_uses_thinking_callback_when_progress_callback_available(self):
        """Invoking the child's ``thinking_callback`` must not call the parent's progress callback."""
        parent = _make_mock_parent(depth=0)
        parent.tool_progress_callback = MagicMock()
        child = MagicMock()

        with patch("run_agent.AIAgent", return_value=child):
            self._build_child(parent, "Avoid raw child spinners")

        self.assertTrue(callable(child.thinking_callback))
        child.thinking_callback("deliberating...")
        parent.tool_progress_callback.assert_not_called()
|
|
|
|
class TestToolNamePreservation(unittest.TestCase):
    """Verify _last_resolved_tool_names is restored after subagent runs."""

    def _pin_global_tool_names(self, names):
        """Set ``model_tools._last_resolved_tool_names`` for the current test only.

        The previous value is restored through ``addCleanup``, so tests that
        run afterwards never observe the list installed here.

        Args:
            names: Tool names to install; a copy is stored on the module.

        Returns:
            The imported ``model_tools`` module.
        """
        import model_tools

        patcher = patch.object(
            model_tools, "_last_resolved_tool_names", list(names), create=True
        )
        patcher.start()
        self.addCleanup(patcher.stop)
        return model_tools

    @staticmethod
    def _build_child(parent, goal):
        """Call ``_build_child_agent`` for a single task with fixed defaults."""
        return _build_child_agent(
            task_index=0,
            goal=goal,
            context=None,
            toolsets=None,
            model=None,
            max_iterations=10,
            parent_agent=parent,
            task_count=1,
        )

    def test_global_tool_names_restored_after_delegation(self):
        """The process-global _last_resolved_tool_names must be restored
        after a subagent completes so the parent's execute_code sandbox
        generates correct imports."""
        original_tools = ["terminal", "read_file", "web_search", "execute_code", "delegate_task"]
        model_tools = self._pin_global_tool_names(original_tools)
        parent = _make_mock_parent(depth=0)

        child = MagicMock()
        child.run_conversation.return_value = {
            "final_response": "done", "completed": True, "api_calls": 1,
        }

        with patch("run_agent.AIAgent", return_value=child):
            delegate_task(goal="Test tool preservation", parent_agent=parent)

        self.assertEqual(model_tools._last_resolved_tool_names, original_tools)

    def test_global_tool_names_restored_after_child_failure(self):
        """Even when the child agent raises, the global must be restored."""
        original_tools = ["terminal", "read_file", "web_search"]
        model_tools = self._pin_global_tool_names(original_tools)
        parent = _make_mock_parent(depth=0)

        child = MagicMock()
        child.run_conversation.side_effect = RuntimeError("boom")

        with patch("run_agent.AIAgent", return_value=child):
            result = json.loads(delegate_task(goal="Crash test", parent_agent=parent))
            self.assertEqual(result["results"][0]["status"], "error")

        self.assertEqual(model_tools._last_resolved_tool_names, original_tools)

    def test_build_child_agent_does_not_raise_name_error(self):
        """Regression: _build_child_agent must not reference _saved_tool_names.

        The bug introduced by the e7844e9c merge conflict: line 235 inside
        _build_child_agent read `list(_saved_tool_names)` where that variable
        is only defined later in _run_single_child. Calling _build_child_agent
        standalone (without _run_single_child's scope) must never raise NameError.
        """
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent"):
            try:
                self._build_child(parent, "regression check")
            except NameError as exc:
                # Report as a test failure with context, not as an unhandled error.
                self.fail(
                    f"_build_child_agent raised NameError — "
                    f"_saved_tool_names leaked back into wrong scope: {exc}"
                )

    def test_saved_tool_names_set_on_child_before_run(self):
        """_run_single_child must set _delegate_saved_tool_names on the child
        from model_tools._last_resolved_tool_names before run_conversation."""
        expected_tools = ["read_file", "web_search", "execute_code"]
        self._pin_global_tool_names(expected_tools)
        parent = _make_mock_parent(depth=0)

        captured = {}
        child = MagicMock()

        def capture_and_return(user_message, task_id=None):
            # Snapshot the attribute at the moment the conversation starts.
            captured["saved"] = list(child._delegate_saved_tool_names)
            return {"final_response": "ok", "completed": True, "api_calls": 1}

        child.run_conversation.side_effect = capture_and_return

        with patch("run_agent.AIAgent", return_value=child):
            delegate_task(goal="capture test", parent_agent=parent)

        self.assertEqual(captured["saved"], expected_tools)
|
|
|
|
class TestDelegateObservability(unittest.TestCase):
    """Tests for enriched metadata returned by _run_single_child."""

    @staticmethod
    def _delegate_with_child(goal, conversation, prompt_tokens=0, completion_tokens=0):
        """Run ``delegate_task`` once against a mocked child and return the parsed JSON.

        Args:
            goal: Goal string passed to ``delegate_task``.
            conversation: Dict returned by the child's ``run_conversation``.
            prompt_tokens: Value set as ``session_prompt_tokens`` on the child.
            completion_tokens: Value set as ``session_completion_tokens`` on the child.
        """
        parent = _make_mock_parent(depth=0)
        with patch("run_agent.AIAgent") as agent_cls:
            child = MagicMock()
            child.model = "claude-sonnet-4-6"
            child.session_prompt_tokens = prompt_tokens
            child.session_completion_tokens = completion_tokens
            child.run_conversation.return_value = conversation
            agent_cls.return_value = child
            return json.loads(delegate_task(goal=goal, parent_agent=parent))

    def test_observability_fields_present(self):
        """Completed child should return tool_trace, tokens, model, exit_reason."""
        conversation = {
            "final_response": "done",
            "completed": True,
            "interrupted": False,
            "api_calls": 3,
            "messages": [
                {"role": "user", "content": "do something"},
                {"role": "assistant", "tool_calls": [
                    {"id": "tc_1", "function": {"name": "web_search", "arguments": '{"query": "test"}'}}
                ]},
                {"role": "tool", "tool_call_id": "tc_1", "content": '{"results": [1,2,3]}'},
                {"role": "assistant", "content": "done"},
            ],
        }
        payload = self._delegate_with_child(
            "Test observability", conversation,
            prompt_tokens=5000, completion_tokens=1200,
        )
        entry = payload["results"][0]

        # Core metadata.
        self.assertEqual(entry["model"], "claude-sonnet-4-6")
        self.assertEqual(entry["exit_reason"], "completed")
        self.assertEqual(entry["tokens"]["input"], 5000)
        self.assertEqual(entry["tokens"]["output"], 1200)

        # Tool trace.
        trace = entry["tool_trace"]
        self.assertEqual(len(trace), 1)
        first = trace[0]
        self.assertEqual(first["tool"], "web_search")
        self.assertIn("args_bytes", first)
        self.assertIn("result_bytes", first)
        self.assertEqual(first["status"], "ok")

    def test_tool_trace_detects_error(self):
        """Tool results containing 'error' should be marked as error status."""
        conversation = {
            "final_response": "failed",
            "completed": True,
            "interrupted": False,
            "api_calls": 1,
            "messages": [
                {"role": "assistant", "tool_calls": [
                    {"id": "tc_1", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}}
                ]},
                {"role": "tool", "tool_call_id": "tc_1", "content": "Error: command not found"},
            ],
        }
        payload = self._delegate_with_child("Test error trace", conversation)
        trace = payload["results"][0]["tool_trace"]
        self.assertEqual(trace[0]["status"], "error")

    def test_parallel_tool_calls_paired_correctly(self):
        """Parallel tool calls should each get their own result via tool_call_id matching."""
        conversation = {
            "final_response": "done",
            "completed": True,
            "interrupted": False,
            "api_calls": 1,
            "messages": [
                {"role": "assistant", "tool_calls": [
                    {"id": "tc_a", "function": {"name": "web_search", "arguments": '{"q": "a"}'}},
                    {"id": "tc_b", "function": {"name": "web_search", "arguments": '{"q": "b"}'}},
                    {"id": "tc_c", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}},
                ]},
                {"role": "tool", "tool_call_id": "tc_a", "content": '{"ok": true}'},
                {"role": "tool", "tool_call_id": "tc_b", "content": "Error: rate limited"},
                {"role": "tool", "tool_call_id": "tc_c", "content": "file1.txt\nfile2.txt"},
                {"role": "assistant", "content": "done"},
            ],
        }
        payload = self._delegate_with_child(
            "Test parallel", conversation,
            prompt_tokens=3000, completion_tokens=800,
        )
        trace = payload["results"][0]["tool_trace"]

        # One trace entry per tool call.
        self.assertEqual(len(trace), 3)

        # Entries are checked in call order: (tool name, status).
        expected = [
            ("web_search", "ok"),
            ("web_search", "error"),
            ("terminal", "ok"),
        ]
        for position, (tool_name, status) in enumerate(expected):
            self.assertEqual(trace[position]["tool"], tool_name)
            self.assertEqual(trace[position]["status"], status)
            self.assertIn("result_bytes", trace[position])

    def test_exit_reason_interrupted(self):
        """Interrupted child should report exit_reason='interrupted'."""
        conversation = {
            "final_response": "",
            "completed": False,
            "interrupted": True,
            "api_calls": 2,
            "messages": [],
        }
        payload = self._delegate_with_child("Test interrupt", conversation)
        self.assertEqual(payload["results"][0]["exit_reason"], "interrupted")

    def test_exit_reason_max_iterations(self):
        """Child that didn't complete and wasn't interrupted hit max_iterations."""
        conversation = {
            "final_response": "",
            "completed": False,
            "interrupted": False,
            "api_calls": 50,
            "messages": [],
        }
        payload = self._delegate_with_child("Test max iter", conversation)
        self.assertEqual(payload["results"][0]["exit_reason"], "max_iterations")
|
|
|
|
class TestBlockedTools(unittest.TestCase):
    """Checks on module-level constants and limit getters of the delegation tool."""

    def test_blocked_tools_constant(self):
        """Every tool listed here must be present in ``DELEGATE_BLOCKED_TOOLS``."""
        must_be_blocked = ("delegate_task", "clarify", "memory", "send_message", "execute_code")
        for tool_name in must_be_blocked:
            self.assertIn(tool_name, DELEGATE_BLOCKED_TOOLS)

    def test_constants(self):
        """Pin the values of the concurrency and spawn-depth settings."""
        from tools.delegate_tool import (
            _MAX_SPAWN_DEPTH_CAP,
            _MIN_SPAWN_DEPTH,
            _get_max_spawn_depth,
            _get_orchestrator_enabled,
        )

        self.assertEqual(_get_max_concurrent_children(), 3)
        self.assertEqual(MAX_DEPTH, 1)
        self.assertEqual(_get_max_spawn_depth(), 1)
        self.assertTrue(_get_orchestrator_enabled())
        self.assertEqual(_MIN_SPAWN_DEPTH, 1)
        self.assertEqual(_MAX_SPAWN_DEPTH_CAP, 3)
|
|
|
|
class TestDelegationCredentialResolution(unittest.TestCase):
    """Tests for provider:model credential resolution in delegation config."""

    @staticmethod
    def _resolve(cfg):
        """Resolve ``cfg`` against a fresh depth-0 mock parent."""
        return _resolve_delegation_credentials(cfg, _make_mock_parent(depth=0))

    def test_no_provider_returns_none_credentials(self):
        """When delegation.provider is empty, all credentials are None (inherit parent)."""
        creds = self._resolve({"model": "", "provider": ""})
        for field in ("provider", "base_url", "api_key", "api_mode", "model"):
            self.assertIsNone(creds[field])

    def test_model_only_no_provider(self):
        """When only model is set (no provider), model is returned but credentials are None."""
        creds = self._resolve({"model": "google/gemini-3-flash-preview", "provider": ""})
        self.assertEqual(creds["model"], "google/gemini-3-flash-preview")
        for field in ("provider", "base_url", "api_key"):
            self.assertIsNone(creds[field])

    @patch("hermes_cli.runtime_provider.resolve_runtime_provider")
    def test_provider_resolves_full_credentials(self, mock_resolve):
        """When delegation.provider is set, full credentials are resolved."""
        mock_resolve.return_value = {
            "provider": "openrouter",
            "base_url": "https://openrouter.ai/api/v1",
            "api_key": "sk-or-test-key",
            "api_mode": "chat_completions",
        }
        creds = self._resolve(
            {"model": "google/gemini-3-flash-preview", "provider": "openrouter"}
        )
        expected_pairs = (
            ("model", "google/gemini-3-flash-preview"),
            ("provider", "openrouter"),
            ("base_url", "https://openrouter.ai/api/v1"),
            ("api_key", "sk-or-test-key"),
            ("api_mode", "chat_completions"),
        )
        for field, value in expected_pairs:
            self.assertEqual(creds[field], value)
        mock_resolve.assert_called_once_with(requested="openrouter")

    def test_direct_endpoint_uses_configured_base_url_and_api_key(self):
        """An explicit ``base_url`` yields provider ``custom`` with the configured key."""
        creds = self._resolve({
            "model": "qwen2.5-coder",
            "provider": "openrouter",
            "base_url": "http://localhost:1234/v1",
            "api_key": "local-key",
        })
        expected_pairs = (
            ("model", "qwen2.5-coder"),
            ("provider", "custom"),
            ("base_url", "http://localhost:1234/v1"),
            ("api_key", "local-key"),
            ("api_mode", "chat_completions"),
        )
        for field, value in expected_pairs:
            self.assertEqual(creds[field], value)

    def test_direct_endpoint_falls_back_to_openai_api_key_env(self):
        """Without a configured key, ``OPENAI_API_KEY`` from the environment is used."""
        cfg = {
            "model": "qwen2.5-coder",
            "base_url": "http://localhost:1234/v1",
        }
        env_overrides = {"OPENAI_API_KEY": "env-openai-key"}
        with patch.dict(os.environ, env_overrides, clear=False):
            creds = self._resolve(cfg)
        self.assertEqual(creds["api_key"], "env-openai-key")
        self.assertEqual(creds["provider"], "custom")

    def test_direct_endpoint_does_not_fall_back_to_openrouter_api_key_env(self):
        """``OPENROUTER_API_KEY`` alone is not accepted; the error names ``OPENAI_API_KEY``."""
        cfg = {
            "model": "qwen2.5-coder",
            "base_url": "http://localhost:1234/v1",
        }
        env_overrides = {
            "OPENROUTER_API_KEY": "env-openrouter-key",
            "OPENAI_API_KEY": "",
        }
        with patch.dict(os.environ, env_overrides, clear=False):
            with self.assertRaises(ValueError) as ctx:
                self._resolve(cfg)
        self.assertIn("OPENAI_API_KEY", str(ctx.exception))

    @patch("hermes_cli.runtime_provider.resolve_runtime_provider")
    def test_nous_provider_resolves_nous_credentials(self, mock_resolve):
        """Nous provider resolves Nous Portal base_url and api_key."""
        mock_resolve.return_value = {
            "provider": "nous",
            "base_url": "https://inference-api.nousresearch.com/v1",
            "api_key": "nous-agent-key-xyz",
            "api_mode": "chat_completions",
        }
        creds = self._resolve({"model": "hermes-3-llama-3.1-8b", "provider": "nous"})
        self.assertEqual(creds["provider"], "nous")
        self.assertEqual(creds["base_url"], "https://inference-api.nousresearch.com/v1")
        self.assertEqual(creds["api_key"], "nous-agent-key-xyz")
        mock_resolve.assert_called_once_with(requested="nous")

    @patch("hermes_cli.runtime_provider.resolve_runtime_provider")
    def test_provider_resolution_failure_raises_valueerror(self, mock_resolve):
        """When provider resolution fails, ValueError is raised with helpful message."""
        mock_resolve.side_effect = RuntimeError("OPENROUTER_API_KEY not set")
        with self.assertRaises(ValueError) as ctx:
            self._resolve({"model": "some-model", "provider": "openrouter"})
        message = str(ctx.exception)
        self.assertIn("openrouter", message.lower())
        self.assertIn("Cannot resolve", message)

    @patch("hermes_cli.runtime_provider.resolve_runtime_provider")
    def test_provider_resolves_but_no_api_key_raises(self, mock_resolve):
        """When provider resolves but has no API key, ValueError is raised."""
        mock_resolve.return_value = {
            "provider": "openrouter",
            "base_url": "https://openrouter.ai/api/v1",
            "api_key": "",
            "api_mode": "chat_completions",
        }
        with self.assertRaises(ValueError) as ctx:
            self._resolve({"model": "some-model", "provider": "openrouter"})
        self.assertIn("no API key", str(ctx.exception))

    def test_missing_config_keys_inherit_parent(self):
        """When config dict has no model/provider keys at all, inherits parent."""
        creds = self._resolve({"max_iterations": 45})
        self.assertIsNone(creds["model"])
        self.assertIsNone(creds["provider"])
|
|
|
|
| class TestDelegationProviderIntegration(unittest.TestCase): |
| """Integration tests: delegation config → _run_single_child → AIAgent construction.""" |
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_config_provider_credentials_reach_child_agent(self, mock_creds, mock_cfg):
    """When delegation.provider is configured, child agent gets resolved credentials."""
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": "google/gemini-3-flash-preview",
        "provider": "openrouter",
    }
    mock_creds.return_value = {
        "model": "google/gemini-3-flash-preview",
        "provider": "openrouter",
        "base_url": "https://openrouter.ai/api/v1",
        "api_key": "sk-or-delegation-key",
        "api_mode": "chat_completions",
    }
    parent = _make_mock_parent(depth=0)

    child = MagicMock()
    child.run_conversation.return_value = {
        "final_response": "done", "completed": True, "api_calls": 1,
    }

    with patch("run_agent.AIAgent", return_value=child) as agent_cls:
        delegate_task(goal="Test provider routing", parent_agent=parent)

    # Keyword arguments the child agent class was constructed with.
    ctor_kwargs = agent_cls.call_args.kwargs
    expected_pairs = (
        ("model", "google/gemini-3-flash-preview"),
        ("provider", "openrouter"),
        ("base_url", "https://openrouter.ai/api/v1"),
        ("api_key", "sk-or-delegation-key"),
        ("api_mode", "chat_completions"),
    )
    for field, value in expected_pairs:
        self.assertEqual(ctor_kwargs[field], value)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_cross_provider_delegation(self, mock_creds, mock_cfg):
    """Parent on Nous, subagent on OpenRouter — full credential switch."""
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": "google/gemini-3-flash-preview",
        "provider": "openrouter",
    }
    mock_creds.return_value = {
        "model": "google/gemini-3-flash-preview",
        "provider": "openrouter",
        "base_url": "https://openrouter.ai/api/v1",
        "api_key": "sk-or-key",
        "api_mode": "chat_completions",
    }

    parent = _make_mock_parent(depth=0)
    parent.provider = "nous"
    parent.base_url = "https://inference-api.nousresearch.com/v1"
    parent.api_key = "nous-key-abc"

    child = MagicMock()
    child.run_conversation.return_value = {
        "final_response": "done", "completed": True, "api_calls": 1,
    }

    with patch("run_agent.AIAgent", return_value=child) as agent_cls:
        delegate_task(goal="Cross-provider test", parent_agent=parent)

    ctor_kwargs = agent_cls.call_args.kwargs

    # The child must use the resolved credentials, not the parent's.
    self.assertEqual(ctor_kwargs["provider"], "openrouter")
    self.assertEqual(ctor_kwargs["base_url"], "https://openrouter.ai/api/v1")
    self.assertEqual(ctor_kwargs["api_key"], "sk-or-key")
    self.assertNotEqual(ctor_kwargs["base_url"], parent.base_url)
    self.assertNotEqual(ctor_kwargs["api_key"], parent.api_key)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_direct_endpoint_credentials_reach_child_agent(self, mock_creds, mock_cfg):
    """Resolved direct-endpoint credentials are passed to the child agent constructor."""
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": "qwen2.5-coder",
        "base_url": "http://localhost:1234/v1",
        "api_key": "local-key",
    }
    mock_creds.return_value = {
        "model": "qwen2.5-coder",
        "provider": "custom",
        "base_url": "http://localhost:1234/v1",
        "api_key": "local-key",
        "api_mode": "chat_completions",
    }
    parent = _make_mock_parent(depth=0)

    child = MagicMock()
    child.run_conversation.return_value = {
        "final_response": "done", "completed": True, "api_calls": 1,
    }

    with patch("run_agent.AIAgent", return_value=child) as agent_cls:
        delegate_task(goal="Direct endpoint test", parent_agent=parent)

    ctor_kwargs = agent_cls.call_args.kwargs
    expected_pairs = (
        ("model", "qwen2.5-coder"),
        ("provider", "custom"),
        ("base_url", "http://localhost:1234/v1"),
        ("api_key", "local-key"),
        ("api_mode", "chat_completions"),
    )
    for field, value in expected_pairs:
        self.assertEqual(ctor_kwargs[field], value)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_empty_config_inherits_parent(self, mock_creds, mock_cfg):
    """When delegation config is empty, child inherits parent credentials."""
    mock_cfg.return_value = {"max_iterations": 45, "model": "", "provider": ""}
    # Every resolved field is None.
    mock_creds.return_value = dict.fromkeys(
        ("model", "provider", "base_url", "api_key", "api_mode")
    )
    parent = _make_mock_parent(depth=0)

    child = MagicMock()
    child.run_conversation.return_value = {
        "final_response": "done", "completed": True, "api_calls": 1,
    }

    with patch("run_agent.AIAgent", return_value=child) as agent_cls:
        delegate_task(goal="Test inherit", parent_agent=parent)

    ctor_kwargs = agent_cls.call_args.kwargs
    for field in ("model", "provider", "base_url"):
        self.assertEqual(ctor_kwargs[field], getattr(parent, field))
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_credential_error_returns_json_error(self, mock_creds, mock_cfg):
    """A ValueError from credential resolution surfaces as a JSON ``error`` payload."""
    mock_cfg.return_value = {"model": "bad-model", "provider": "nonexistent"}
    mock_creds.side_effect = ValueError(
        "Cannot resolve delegation provider 'nonexistent': Unknown provider"
    )
    parent = _make_mock_parent(depth=0)

    raw_reply = delegate_task(goal="Should fail", parent_agent=parent)
    payload = json.loads(raw_reply)

    self.assertIn("error", payload)
    error_text = payload["error"]
    for fragment in ("Cannot resolve", "nonexistent"):
        self.assertIn(fragment, error_text)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_batch_mode_all_children_get_credentials(self, mock_creds, mock_cfg):
    """Each child built for a batch receives the same resolved credential overrides."""
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": "meta-llama/llama-4-scout",
        "provider": "openrouter",
    }
    mock_creds.return_value = {
        "model": "meta-llama/llama-4-scout",
        "provider": "openrouter",
        "base_url": "https://openrouter.ai/api/v1",
        "api_key": "sk-or-batch",
        "api_mode": "chat_completions",
    }
    parent = _make_mock_parent(depth=0)

    build_patch = patch("tools.delegate_tool._build_child_agent")
    run_patch = patch("tools.delegate_tool._run_single_child")
    with build_patch as build_child, run_patch as run_child:
        build_child.return_value = MagicMock()
        run_child.return_value = {
            "task_index": 0, "status": "completed",
            "summary": "Done", "api_calls": 1, "duration_seconds": 1.0
        }

        batch = [{"goal": "Task A"}, {"goal": "Task B"}]
        delegate_task(tasks=batch, parent_agent=parent)

    # One build per task, each carrying the full override set.
    self.assertEqual(build_child.call_count, 2)
    forwarded = (
        ("model", "meta-llama/llama-4-scout"),
        ("override_provider", "openrouter"),
        ("override_base_url", "https://openrouter.ai/api/v1"),
        ("override_api_key", "sk-or-batch"),
        ("override_api_mode", "chat_completions"),
    )
    for build_call in build_child.call_args_list:
        for name, value in forwarded:
            self.assertEqual(build_call.kwargs.get(name), value)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_delegation_acp_runtime_reaches_child_agent(self, mock_creds, mock_cfg):
    """The resolved ACP command and args are handed to ``_build_child_agent``."""
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": "copilot-model",
        "provider": "copilot-acp",
    }
    mock_creds.return_value = {
        "model": "copilot-model",
        "provider": "copilot-acp",
        "base_url": "acp://copilot",
        "api_key": "copilot-acp",
        "api_mode": "chat_completions",
        "command": "custom-copilot",
        "args": ["--stdio-custom"],
    }
    parent = _make_mock_parent(depth=0)

    build_patch = patch("tools.delegate_tool._build_child_agent")
    run_patch = patch("tools.delegate_tool._run_single_child")
    with build_patch as build_child, run_patch as run_child:
        build_child.return_value = MagicMock()
        run_child.return_value = {
            "task_index": 0, "status": "completed",
            "summary": "Done", "api_calls": 1, "duration_seconds": 1.0
        }

        delegate_task(goal="ACP delegation test", parent_agent=parent)

    forwarded = build_child.call_args.kwargs
    expectations = (
        ("override_provider", "copilot-acp"),
        ("override_base_url", "acp://copilot"),
        ("override_api_key", "copilot-acp"),
        ("override_api_mode", "chat_completions"),
        ("override_acp_command", "custom-copilot"),
        ("override_acp_args", ["--stdio-custom"]),
    )
    for name, value in expectations:
        self.assertEqual(forwarded.get(name), value)
|
|
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_model_only_no_provider_inherits_parent_credentials(self, mock_creds, mock_cfg):
    """A model-only override swaps the model and leaves provider/base_url as the parent's."""
    override_model = "google/gemini-3-flash-preview"
    mock_cfg.return_value = {
        "max_iterations": 45,
        "model": override_model,
        "provider": "",
    }
    resolved = dict.fromkeys(("provider", "base_url", "api_key", "api_mode"))
    resolved["model"] = override_model
    mock_creds.return_value = resolved
    parent = _make_mock_parent(depth=0)

    with patch("run_agent.AIAgent") as agent_cls:
        child_stub = MagicMock()
        child_stub.run_conversation.return_value = {
            "final_response": "done", "completed": True, "api_calls": 1
        }
        agent_cls.return_value = child_stub

        delegate_task(goal="Model only test", parent_agent=parent)

    ctor_kwargs = agent_cls.call_args.kwargs
    # The model comes from the delegation config ...
    self.assertEqual(ctor_kwargs["model"], "google/gemini-3-flash-preview")
    # ... while the endpoint identity still matches the parent.
    for inherited in ("provider", "base_url"):
        self.assertEqual(ctor_kwargs[inherited], getattr(parent, inherited))
|
|
|
|
class TestChildCredentialPoolResolution(unittest.TestCase):
    """Tests for ``_resolve_child_credential_pool`` and what ``_build_child_agent`` forwards."""

    @staticmethod
    def _parent_with_pool():
        """Return ``(parent, pool)`` where ``pool`` is the parent's mock credential pool."""
        parent = _make_mock_parent()
        pool = MagicMock()
        parent._credential_pool = pool
        return parent, pool

    @staticmethod
    def _spawn_child(parent, goal, toolsets):
        """Build one child against a patched AIAgent; return ``(agent_cls, child)``."""
        with patch("run_agent.AIAgent") as agent_cls:
            child = MagicMock()
            agent_cls.return_value = child

            _build_child_agent(
                task_index=0,
                goal=goal,
                context=None,
                toolsets=toolsets,
                model=None,
                max_iterations=10,
                parent_agent=parent,
                task_count=1,
            )
        return agent_cls, child

    def test_same_provider_shares_parent_pool(self):
        parent, shared_pool = self._parent_with_pool()

        resolved = _resolve_child_credential_pool("openrouter", parent)

        self.assertIs(resolved, shared_pool)

    def test_no_provider_inherits_parent_pool(self):
        parent, shared_pool = self._parent_with_pool()

        resolved = _resolve_child_credential_pool(None, parent)

        self.assertIs(resolved, shared_pool)

    def test_different_provider_loads_own_pool(self):
        parent, _parent_pool = self._parent_with_pool()
        loaded_pool = MagicMock()
        loaded_pool.has_credentials.return_value = True

        with patch("agent.credential_pool.load_pool", return_value=loaded_pool):
            resolved = _resolve_child_credential_pool("anthropic", parent)

        self.assertIs(resolved, loaded_pool)

    def test_different_provider_empty_pool_returns_none(self):
        parent, _parent_pool = self._parent_with_pool()
        loaded_pool = MagicMock()
        loaded_pool.has_credentials.return_value = False

        with patch("agent.credential_pool.load_pool", return_value=loaded_pool):
            resolved = _resolve_child_credential_pool("anthropic", parent)

        self.assertIsNone(resolved)

    def test_different_provider_load_failure_returns_none(self):
        parent, _parent_pool = self._parent_with_pool()

        with patch("agent.credential_pool.load_pool", side_effect=Exception("disk error")):
            resolved = _resolve_child_credential_pool("anthropic", parent)

        self.assertIsNone(resolved)

    def test_build_child_agent_assigns_parent_pool_when_shared(self):
        parent, shared_pool = self._parent_with_pool()

        _agent_cls, child = self._spawn_child(
            parent, "Test pool assignment", ["terminal"]
        )

        self.assertEqual(child._credential_pool, shared_pool)

    @patch("tools.delegate_tool._load_config", return_value={})
    def test_build_child_agent_preserves_mcp_toolsets_by_default(self, mock_cfg):
        parent = _make_mock_parent()
        parent.enabled_toolsets = ["web", "browser", "mcp-MiniMax"]

        agent_cls, _child = self._spawn_child(
            parent, "Test narrowed toolsets", ["web", "browser"]
        )

        # The parent's MCP toolset is kept although the request did not name it.
        self.assertEqual(
            agent_cls.call_args.kwargs["enabled_toolsets"],
            ["web", "browser", "mcp-MiniMax"],
        )

    @patch(
        "tools.delegate_tool._load_config",
        return_value={"inherit_mcp_toolsets": False},
    )
    def test_build_child_agent_strict_intersection_when_opted_out(self, mock_cfg):
        parent = _make_mock_parent()
        parent.enabled_toolsets = ["web", "browser", "mcp-MiniMax"]

        agent_cls, _child = self._spawn_child(
            parent, "Test narrowed toolsets", ["web", "browser"]
        )

        # With inherit_mcp_toolsets disabled only the requested toolsets remain.
        self.assertEqual(
            agent_cls.call_args.kwargs["enabled_toolsets"],
            ["web", "browser"],
        )
|
|
|
|
class TestChildCredentialLeasing(unittest.TestCase):
    """Tests that ``_run_single_child`` leases a pooled credential and gives it back."""

    def test_run_single_child_acquires_and_releases_lease(self):
        from tools.delegate_tool import _run_single_child

        entry = MagicMock()
        entry.id = "cred-b"

        child = MagicMock()
        child._credential_pool = MagicMock()
        pool = child._credential_pool
        pool.acquire_lease.return_value = "cred-b"
        pool.current.return_value = entry
        child.run_conversation.return_value = {
            "final_response": "done",
            "completed": True,
            "interrupted": False,
            "api_calls": 1,
            "messages": [],
        }

        outcome = _run_single_child(
            task_index=0,
            goal="Investigate rate limits",
            child=child,
            parent_agent=_make_mock_parent(),
        )

        self.assertEqual(outcome["status"], "completed")
        # Re-read the pool off the child, exactly as the assertions always have.
        leased_from = child._credential_pool
        leased_from.acquire_lease.assert_called_once_with()
        child._swap_credential.assert_called_once_with(entry)
        leased_from.release_lease.assert_called_once_with("cred-b")

    def test_run_single_child_releases_lease_after_failure(self):
        from tools.delegate_tool import _run_single_child

        child = MagicMock()
        child._credential_pool = MagicMock()
        pool = child._credential_pool
        pool.acquire_lease.return_value = "cred-a"
        pool.current.return_value = MagicMock(id="cred-a")
        child.run_conversation.side_effect = RuntimeError("boom")

        outcome = _run_single_child(
            task_index=1,
            goal="Trigger failure",
            child=child,
            parent_agent=_make_mock_parent(),
        )

        # The failure is reported as a status and the lease is still returned.
        self.assertEqual(outcome["status"], "error")
        child._credential_pool.release_lease.assert_called_once_with("cred-a")
|
|
|
|
class TestDelegateHeartbeat(unittest.TestCase):
    """Heartbeat propagates child activity to parent during delegation.

    Without the heartbeat, the gateway inactivity timeout fires because the
    parent's _last_activity_ts freezes when delegate_task starts.
    """

    @staticmethod
    def _parent_recording_touches():
        """Return ``(parent, log)``; every ``_touch_activity`` description lands in ``log``."""
        parent = _make_mock_parent()
        log = []

        def record(desc):
            log.append(desc)

        parent._touch_activity = record
        return parent, log

    @staticmethod
    def _child_reporting(current_tool, api_call_count, max_iterations, last_activity_desc):
        """Return a mock child whose activity summary carries the given fields."""
        child = MagicMock()
        child.get_activity_summary.return_value = {
            "current_tool": current_tool,
            "api_call_count": api_call_count,
            "max_iterations": max_iterations,
            "last_activity_desc": last_activity_desc,
        }
        return child

    @staticmethod
    def _run_with_fast_heartbeat(child, parent, goal):
        """Run ``_run_single_child`` with the heartbeat interval patched to 0.05s."""
        from tools.delegate_tool import _run_single_child

        with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
            return _run_single_child(
                task_index=0,
                goal=goal,
                child=child,
                parent_agent=parent,
            )

    def test_heartbeat_touches_parent_activity_during_child_run(self):
        """Parent's _touch_activity is called while child.run_conversation blocks."""
        parent, touches = self._parent_recording_touches()
        child = self._child_reporting("terminal", 3, 50, "executing tool: terminal")

        # Block for several patched intervals so the heartbeat gets a chance to fire.
        def slow_run(**kwargs):
            time.sleep(0.25)
            return {"final_response": "done", "completed": True, "api_calls": 3}

        child.run_conversation.side_effect = slow_run

        self._run_with_fast_heartbeat(child, parent, "Test heartbeat")

        self.assertGreater(len(touches), 0,
                           "Heartbeat did not propagate activity to parent")
        mentions_tool = any("terminal" in desc for desc in touches)
        self.assertTrue(
            mentions_tool,
            f"Heartbeat descriptions should include child tool info: {touches}")

    def test_heartbeat_stops_after_child_completes(self):
        """Heartbeat thread is cleaned up when the child finishes."""
        parent, touches = self._parent_recording_touches()
        child = self._child_reporting(None, 1, 50, "done")
        child.run_conversation.return_value = {
            "final_response": "done", "completed": True, "api_calls": 1,
        }

        self._run_with_fast_heartbeat(child, parent, "Test cleanup")

        # Wait a few intervals and confirm no further touches arrive.
        settled = len(touches)
        time.sleep(0.15)
        self.assertEqual(len(touches), settled,
                         "Heartbeat continued firing after child completed")

    def test_heartbeat_stops_after_child_error(self):
        """Heartbeat thread is cleaned up even when the child raises."""
        parent, touches = self._parent_recording_touches()
        child = self._child_reporting("web_search", 2, 50, "executing tool: web_search")

        def slow_fail(**kwargs):
            time.sleep(0.15)
            raise RuntimeError("network timeout")

        child.run_conversation.side_effect = slow_fail

        outcome = self._run_with_fast_heartbeat(child, parent, "Test error cleanup")

        self.assertEqual(outcome["status"], "error")

        # Wait a few intervals and confirm no further touches arrive.
        settled = len(touches)
        time.sleep(0.15)
        self.assertEqual(len(touches), settled,
                         "Heartbeat continued firing after child error")

    def test_heartbeat_includes_child_activity_desc_when_no_tool(self):
        """When child has no current_tool, heartbeat uses last_activity_desc."""
        parent, touches = self._parent_recording_touches()
        child = self._child_reporting(None, 5, 90, "API call #5 completed")

        def slow_run(**kwargs):
            time.sleep(0.15)
            return {"final_response": "done", "completed": True, "api_calls": 5}

        child.run_conversation.side_effect = slow_run

        self._run_with_fast_heartbeat(child, parent, "Test desc fallback")

        self.assertGreater(len(touches), 0)
        carries_desc = any("API call #5 completed" in desc for desc in touches)
        self.assertTrue(
            carries_desc,
            f"Heartbeat should include last_activity_desc: {touches}")
|
|
|
|
class TestDelegationReasoningEffort(unittest.TestCase):
    """Tests for delegation.reasoning_effort config override."""

    @staticmethod
    def _child_reasoning_config(agent_cls, cfg_mock, delegated_effort, parent_reasoning):
        """Build one child and return the ``reasoning_config`` passed to AIAgent.

        ``delegated_effort`` is the configured ``reasoning_effort`` value and
        ``parent_reasoning`` is assigned to the parent's ``reasoning_config``.
        """
        cfg_mock.return_value = {
            "max_iterations": 50,
            "reasoning_effort": delegated_effort,
        }
        agent_cls.return_value = MagicMock()
        parent = _make_mock_parent()
        parent.reasoning_config = parent_reasoning

        _build_child_agent(
            task_index=0, goal="test", context=None, toolsets=None,
            model=None, max_iterations=50, parent_agent=parent,
            task_count=1,
        )
        return agent_cls.call_args[1]["reasoning_config"]

    @patch("tools.delegate_tool._load_config")
    @patch("run_agent.AIAgent")
    def test_inherits_parent_reasoning_when_no_override(self, MockAgent, mock_cfg):
        """With no delegation.reasoning_effort, child inherits parent's config."""
        passed = self._child_reasoning_config(
            MockAgent, mock_cfg, "", {"enabled": True, "effort": "xhigh"}
        )
        self.assertEqual(passed, {"enabled": True, "effort": "xhigh"})

    @patch("tools.delegate_tool._load_config")
    @patch("run_agent.AIAgent")
    def test_override_reasoning_effort_from_config(self, MockAgent, mock_cfg):
        """delegation.reasoning_effort overrides the parent's level."""
        passed = self._child_reasoning_config(
            MockAgent, mock_cfg, "low", {"enabled": True, "effort": "xhigh"}
        )
        self.assertEqual(passed, {"enabled": True, "effort": "low"})

    @patch("tools.delegate_tool._load_config")
    @patch("run_agent.AIAgent")
    def test_override_reasoning_effort_none_disables(self, MockAgent, mock_cfg):
        """delegation.reasoning_effort: 'none' disables thinking for subagents."""
        passed = self._child_reasoning_config(
            MockAgent, mock_cfg, "none", {"enabled": True, "effort": "high"}
        )
        self.assertEqual(passed, {"enabled": False})

    @patch("tools.delegate_tool._load_config")
    @patch("run_agent.AIAgent")
    def test_invalid_reasoning_effort_falls_back_to_parent(self, MockAgent, mock_cfg):
        """Invalid delegation.reasoning_effort falls back to parent's config."""
        passed = self._child_reasoning_config(
            MockAgent, mock_cfg, "banana", {"enabled": True, "effort": "medium"}
        )
        self.assertEqual(passed, {"enabled": True, "effort": "medium"})
|
|
|
|
| |
| |
| |
|
|
class TestDispatchDelegateTask(unittest.TestCase):
    """Tests for the _dispatch_delegate_task helper and full param forwarding."""

    @patch("tools.delegate_tool._load_config", return_value={})
    @patch("tools.delegate_tool._resolve_delegation_credentials")
    def test_acp_args_forwarded(self, mock_creds, mock_cfg):
        """acp_command and acp_args arrive at ``_build_child_agent`` as overrides."""
        mock_creds.return_value = dict.fromkeys(
            ("provider", "base_url", "api_key", "api_mode", "model")
        )
        parent = _make_mock_parent(depth=0)

        # Child stub carrying the fields this test has always configured.
        child_stub = MagicMock()
        child_stub.configure_mock(**{
            "run_conversation.return_value": {
                "final_response": "done", "completed": True,
                "api_calls": 1, "messages": [],
            },
            "_delegate_saved_tool_names": [],
            "_credential_pool": None,
            "session_prompt_tokens": 0,
            "session_completion_tokens": 0,
            "model": "test",
        })

        with patch("tools.delegate_tool._build_child_agent") as build_child:
            build_child.return_value = child_stub

            delegate_task(
                goal="test",
                acp_command="claude",
                acp_args=["--acp", "--stdio"],
                parent_agent=parent,
            )

        forwarded = build_child.call_args.kwargs
        self.assertEqual(forwarded["override_acp_command"], "claude")
        self.assertEqual(forwarded["override_acp_args"], ["--acp", "--stdio"])
|
|
class TestDelegateEventEnum(unittest.TestCase):
    """Tests for DelegateEvent enum and back-compat aliases.

    All checks use ``self.assert*`` rather than bare ``assert`` so they still
    run under ``python -O`` and report a useful message on failure.
    """

    @staticmethod
    def _parent_with_spinner(relay=None):
        """Return a mock parent with a mock spinner and the given progress relay.

        ``relay`` is assigned to ``parent.tool_progress_callback``; ``None``
        matches the default that ``_make_mock_parent`` already sets.
        """
        parent = _make_mock_parent()
        parent._delegate_spinner = MagicMock()
        parent.tool_progress_callback = relay
        return parent

    def _assert_thinking_rendered(self, parent):
        """Fail unless some spinner ``print_above`` call contains the thinking emoji."""
        rendered = parent._delegate_spinner.print_above.call_args_list
        self.assertTrue(
            any("💭" in str(entry) for entry in rendered),
            f"Expected a thinking line on the spinner, got: {rendered}",
        )

    def test_enum_values_are_strings(self):
        for event in DelegateEvent:
            # subTest names the offending member and keeps checking the rest.
            with self.subTest(event=event):
                self.assertIsInstance(event.value, str)
                self.assertTrue(event.value.startswith("delegate."))

    def test_legacy_map_covers_all_old_names(self):
        expected_legacy = {"_thinking", "reasoning.available",
                           "tool.started", "tool.completed", "subagent_progress"}
        self.assertEqual(set(_LEGACY_EVENT_MAP.keys()), expected_legacy)

    def test_legacy_map_values_are_delegate_events(self):
        for old_name, event in _LEGACY_EVENT_MAP.items():
            with self.subTest(legacy_name=old_name):
                self.assertIsInstance(event, DelegateEvent)

    def test_progress_callback_normalises_tool_started(self):
        """_build_child_progress_callback handles tool.started via enum."""
        parent = self._parent_with_spinner(relay=MagicMock())

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        self.assertIsNotNone(cb)

        cb("tool.started", tool_name="terminal", preview="ls")
        parent._delegate_spinner.print_above.assert_called()

    def test_progress_callback_normalises_thinking(self):
        """Both _thinking and reasoning.available route to TASK_THINKING."""
        parent = self._parent_with_spinner()

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)

        cb("_thinking", tool_name=None, preview="pondering...")
        self._assert_thinking_rendered(parent)

        # Reset so the second legacy name is judged on its own output.
        parent._delegate_spinner.print_above.reset_mock()
        cb("reasoning.available", tool_name=None, preview="hmm")
        self._assert_thinking_rendered(parent)

    def test_progress_callback_tool_completed_is_noop(self):
        """tool.completed is normalised but produces no display output."""
        parent = self._parent_with_spinner()

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        cb("tool.completed", tool_name="terminal")
        parent._delegate_spinner.print_above.assert_not_called()

    def test_progress_callback_ignores_unknown_events(self):
        """Unknown event types are silently ignored."""
        parent = self._parent_with_spinner()

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        cb("some.unknown.event", tool_name="x")
        parent._delegate_spinner.print_above.assert_not_called()

    def test_progress_callback_accepts_enum_value_directly(self):
        """cb(DelegateEvent.TASK_THINKING, ...) must route to the thinking
        branch. Pre-fix the callback only handled legacy strings via
        _LEGACY_EVENT_MAP.get and silently dropped enum-typed callers."""
        parent = self._parent_with_spinner()

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        cb(DelegateEvent.TASK_THINKING, preview="pondering")

        self._assert_thinking_rendered(parent)

    def test_progress_callback_accepts_new_style_string(self):
        """cb('delegate.task_thinking', ...) — the string form of the
        enum value — must route to the thinking branch too, so new-style
        emitters don't have to import DelegateEvent."""
        parent = self._parent_with_spinner()

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        cb("delegate.task_thinking", preview="hmm")

        self._assert_thinking_rendered(parent)

    def test_progress_callback_task_progress_not_misrendered(self):
        """'subagent_progress' (legacy name for TASK_PROGRESS) carries a
        pre-batched summary in the tool_name slot. Before the fix, this
        fell through to the TASK_TOOL_STARTED rendering path, treating
        the summary string as a tool name. After the fix: distinct
        render (no tool-start emoji lookup) and pass-through relay
        upward (no re-batching).

        Regression path only reachable once nested orchestration is
        enabled: nested orchestrators relay subagent_progress from
        grandchildren upward through this callback.
        """
        parent = self._parent_with_spinner(relay=MagicMock())

        cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
        cb("subagent_progress", tool_name="🔀 [1] terminal, file")

        calls = parent._delegate_spinner.print_above.call_args_list
        # The summary is rendered with the progress prefix in front of it.
        self.assertTrue(
            any("🔀 🔀 [1] terminal, file" in str(c) for c in calls),
            f"Progress summary was not rendered as expected: {calls}",
        )
        # The event is relayed upward exactly once.
        parent.tool_progress_callback.assert_called_once()
        # No spinner line carries the ⚡ marker.
        self.assertFalse(
            any("⚡" in str(c) for c in calls),
            f"Progress summary was rendered with the ⚡ marker: {calls}",
        )
|
|
|
|
class TestConcurrencyDefaults(unittest.TestCase):
    """Tests for the concurrency default and no hard ceiling."""

    def _assert_limit(self, expected):
        """Assert that ``_get_max_concurrent_children()`` currently returns ``expected``."""
        limit = _get_max_concurrent_children()
        self.assertEqual(limit, expected)

    @patch("tools.delegate_tool._load_config", return_value={})
    def test_default_is_three(self, mock_cfg):
        # Run with an emptied environment.
        with patch.dict(os.environ, {}, clear=True):
            self._assert_limit(3)

    @patch("tools.delegate_tool._load_config",
           return_value={"max_concurrent_children": 10})
    def test_no_upper_ceiling(self, mock_cfg):
        """Users can raise concurrency as high as they want — no hard cap."""
        self._assert_limit(10)

    @patch("tools.delegate_tool._load_config",
           return_value={"max_concurrent_children": 100})
    def test_very_high_values_honored(self, mock_cfg):
        self._assert_limit(100)

    @patch("tools.delegate_tool._load_config",
           return_value={"max_concurrent_children": 0})
    def test_zero_clamped_to_one(self, mock_cfg):
        """A configured value of zero is raised to the floor of 1."""
        self._assert_limit(1)

    @patch("tools.delegate_tool._load_config", return_value={})
    def test_env_var_honored_uncapped(self, mock_cfg):
        env_override = {"DELEGATION_MAX_CONCURRENT_CHILDREN": "12"}
        with patch.dict(os.environ, env_override):
            self._assert_limit(12)

    @patch("tools.delegate_tool._load_config",
           return_value={"max_concurrent_children": 6})
    def test_configured_value_returned(self, mock_cfg):
        self._assert_limit(6)
|
|
|
|
| |
| |
| |
|
|
class TestMaxSpawnDepth(unittest.TestCase):
    """Tests for _get_max_spawn_depth clamping and fallback behavior."""

    def _assert_depth(self, expected_depth):
        """Assert the resolved spawn depth without inspecting log output."""
        from tools.delegate_tool import _get_max_spawn_depth

        self.assertEqual(_get_max_spawn_depth(), expected_depth)

    def _assert_clamped_with_warning(self, expected_depth, expected_fragment):
        """Assert the resolved depth and that a WARNING mentions ``expected_fragment``."""
        import logging
        from tools.delegate_tool import _get_max_spawn_depth

        with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as captured:
            depth = _get_max_spawn_depth()

        self.assertEqual(depth, expected_depth)
        warned = any(expected_fragment in line for line in captured.output)
        self.assertTrue(warned)

    @patch("tools.delegate_tool._load_config", return_value={})
    def test_max_spawn_depth_defaults_to_1(self, mock_cfg):
        self._assert_depth(1)

    @patch("tools.delegate_tool._load_config",
           return_value={"max_spawn_depth": 0})
    def test_max_spawn_depth_clamped_below_one(self, mock_cfg):
        self._assert_clamped_with_warning(1, "clamping to 1")

    @patch("tools.delegate_tool._load_config",
           return_value={"max_spawn_depth": 99})
    def test_max_spawn_depth_clamped_above_three(self, mock_cfg):
        self._assert_clamped_with_warning(3, "clamping to 3")

    @patch("tools.delegate_tool._load_config",
           return_value={"max_spawn_depth": "not-a-number"})
    def test_max_spawn_depth_invalid_falls_back_to_default(self, mock_cfg):
        self._assert_depth(1)
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
# Marks "no role argument supplied" so _run_with_mock_child can omit the
# keyword entirely instead of passing a value. Defined ahead of the class
# that uses it so the file reads top-down.
_SENTINEL = object()


class TestOrchestratorRoleSchema(unittest.TestCase):
    """Tests that the role param reaches the child via dispatch."""

    @patch("tools.delegate_tool._resolve_delegation_credentials")
    @patch("tools.delegate_tool._load_config",
           return_value={"max_spawn_depth": 2})
    def _run_with_mock_child(self, role_arg, mock_cfg, mock_creds):
        """Run ``delegate_task`` against a patched AIAgent and return the mock child.

        ``role_arg`` is forwarded as ``role=...`` unless it is ``_SENTINEL``,
        in which case ``delegate_task`` is called without a role. The two
        trailing parameters are injected by the ``@patch`` decorators.
        """
        mock_creds.return_value = {
            "provider": None, "base_url": None,
            "api_key": None, "api_mode": None, "model": None,
        }
        parent = _make_mock_parent(depth=0)
        with patch("run_agent.AIAgent") as MockAgent:
            # Shared module helper instead of a field-for-field copy of it.
            mock_child = _make_role_mock_child()
            MockAgent.return_value = mock_child
            kwargs = {"goal": "test", "parent_agent": parent}
            if role_arg is not _SENTINEL:
                kwargs["role"] = role_arg
            delegate_task(**kwargs)
        return mock_child

    def test_default_role_is_leaf(self):
        child = self._run_with_mock_child(_SENTINEL)
        self.assertEqual(child._delegate_role, "leaf")

    def test_explicit_orchestrator_role_stashed(self):
        """role='orchestrator' is stashed on the child as ``_delegate_role``.

        Only the plumbing is checked here, not any toolset change.
        """
        child = self._run_with_mock_child("orchestrator")
        self.assertEqual(child._delegate_role, "orchestrator")

    def test_unknown_role_coerces_to_leaf(self):
        """role='nonsense' is coerced to 'leaf' and a warning is logged."""
        import logging
        with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
            child = self._run_with_mock_child("nonsense")
        self.assertEqual(child._delegate_role, "leaf")
        self.assertTrue(any("coercing" in m.lower() for m in cm.output))

    def test_schema_has_role_top_level_and_per_task(self):
        """``role`` is declared at the top level and on each ``tasks`` item."""
        # DELEGATE_TASK_SCHEMA is already imported at module level.
        props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
        self.assertIn("role", props)
        self.assertEqual(props["role"]["enum"], ["leaf", "orchestrator"])
        task_props = props["tasks"]["items"]["properties"]
        self.assertIn("role", task_props)
        self.assertEqual(task_props["role"]["enum"], ["leaf", "orchestrator"])
|
|
|
|
| |
| |
| |
|
|
|
|
| def _make_role_mock_child(): |
| """Helper: mock child with minimal fields for delegate_task to process.""" |
| mock_child = MagicMock() |
| mock_child.run_conversation.return_value = { |
| "final_response": "done", "completed": True, |
| "api_calls": 1, "messages": [], |
| } |
| mock_child._delegate_saved_tool_names = [] |
| mock_child._credential_pool = None |
| mock_child.session_prompt_tokens = 0 |
| mock_child.session_completion_tokens = 0 |
| mock_child.model = "test" |
| return mock_child |
|
|
|
|
| class TestOrchestratorRoleBehavior(unittest.TestCase): |
| """Tests that role='orchestrator' actually changes toolset + prompt.""" |
|
|
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
       return_value={"max_spawn_depth": 2})
def test_orchestrator_role_keeps_delegation_at_depth_1(
    self, mock_cfg, mock_creds
):
    """With max_spawn_depth=2, an orchestrator child of a depth-0 parent
    is constructed with 'delegation' among its enabled toolsets and keeps
    the 'orchestrator' role."""
    mock_creds.return_value = dict.fromkeys(
        ("provider", "base_url", "api_key", "api_mode", "model")
    )
    parent = _make_mock_parent(depth=0)
    parent.enabled_toolsets = ["terminal", "file"]

    with patch("run_agent.AIAgent") as agent_cls:
        child_stub = _make_role_mock_child()
        agent_cls.return_value = child_stub
        delegate_task(goal="test", role="orchestrator", parent_agent=parent)

    granted_toolsets = agent_cls.call_args[1]["enabled_toolsets"]
    self.assertIn("delegation", granted_toolsets)
    self.assertEqual(child_stub._delegate_role, "orchestrator")
|
|
| @patch("tools.delegate_tool._resolve_delegation_credentials") |
| @patch("tools.delegate_tool._load_config", |
| return_value={"max_spawn_depth": 2}) |
| def test_orchestrator_blocked_at_max_spawn_depth( |
| self, mock_cfg, mock_creds |
| ): |
| """Parent at depth 1 with max_spawn_depth=2 spawns child |
| at depth 2 (the floor); role='orchestrator' degrades to leaf.""" |
| mock_creds.return_value = { |
| "provider": None, "base_url": None, |
| "api_key": None, "api_mode": None, "model": None, |
| } |
| parent = _make_mock_parent(depth=1) |
| parent.enabled_toolsets = ["terminal", "delegation"] |
| with patch("run_agent.AIAgent") as MockAgent: |
| mock_child = _make_role_mock_child() |
| MockAgent.return_value = mock_child |
| delegate_task(goal="test", role="orchestrator", parent_agent=parent) |
| kwargs = MockAgent.call_args[1] |
| self.assertNotIn("delegation", kwargs["enabled_toolsets"]) |
| self.assertEqual(mock_child._delegate_role, "leaf") |
|
|
| @patch("tools.delegate_tool._resolve_delegation_credentials") |
| @patch("tools.delegate_tool._load_config", return_value={}) |
| def test_orchestrator_blocked_at_default_flat_depth( |
| self, mock_cfg, mock_creds |
| ): |
| """With default max_spawn_depth=1 (flat), role='orchestrator' |
| on a depth-0 parent produces a depth-1 child that is already at |
| the floor — the role degrades to 'leaf' and the delegation |
| toolset is stripped. This is the new default posture.""" |
| mock_creds.return_value = { |
| "provider": None, "base_url": None, |
| "api_key": None, "api_mode": None, "model": None, |
| } |
| parent = _make_mock_parent(depth=0) |
| parent.enabled_toolsets = ["terminal", "file", "delegation"] |
| with patch("run_agent.AIAgent") as MockAgent: |
| mock_child = _make_role_mock_child() |
| MockAgent.return_value = mock_child |
| delegate_task(goal="test", role="orchestrator", parent_agent=parent) |
| kwargs = MockAgent.call_args[1] |
| self.assertNotIn("delegation", kwargs["enabled_toolsets"]) |
| self.assertEqual(mock_child._delegate_role, "leaf") |
|
|
| @patch("tools.delegate_tool._resolve_delegation_credentials") |
| def test_orchestrator_enabled_false_forces_leaf(self, mock_creds): |
| """Kill switch delegation.orchestrator_enabled=false overrides |
| role='orchestrator'.""" |
| mock_creds.return_value = { |
| "provider": None, "base_url": None, |
| "api_key": None, "api_mode": None, "model": None, |
| } |
| parent = _make_mock_parent(depth=0) |
| parent.enabled_toolsets = ["terminal", "delegation"] |
| with patch("tools.delegate_tool._load_config", |
| return_value={"orchestrator_enabled": False}): |
| with patch("run_agent.AIAgent") as MockAgent: |
| mock_child = _make_role_mock_child() |
| MockAgent.return_value = mock_child |
| delegate_task(goal="test", role="orchestrator", |
| parent_agent=parent) |
| kwargs = MockAgent.call_args[1] |
| self.assertNotIn("delegation", kwargs["enabled_toolsets"]) |
| self.assertEqual(mock_child._delegate_role, "leaf") |
|
|
| |
|
|
| def test_leaf_prompt_does_not_mention_delegation(self): |
| prompt = _build_child_system_prompt( |
| "Fix tests", role="leaf", |
| max_spawn_depth=2, child_depth=1, |
| ) |
| self.assertNotIn("delegate_task", prompt) |
| self.assertNotIn("Orchestrator Role", prompt) |
|
|
| def test_orchestrator_prompt_mentions_delegation_capability(self): |
| prompt = _build_child_system_prompt( |
| "Survey approaches", role="orchestrator", |
| max_spawn_depth=2, child_depth=1, |
| ) |
| self.assertIn("delegate_task", prompt) |
| self.assertIn("Orchestrator Role", prompt) |
| |
| self.assertIn("depth 1", prompt) |
| self.assertIn("max_spawn_depth=2", prompt) |
|
|
| def test_orchestrator_prompt_at_depth_floor_says_children_are_leaves(self): |
| """With max_spawn_depth=2 and child_depth=1, the orchestrator's |
| own children would be at depth 2 (the floor) → must be leaves.""" |
| prompt = _build_child_system_prompt( |
| "Survey", role="orchestrator", |
| max_spawn_depth=2, child_depth=1, |
| ) |
| self.assertIn("MUST be leaves", prompt) |
|
|
| def test_orchestrator_prompt_below_floor_allows_more_nesting(self): |
| """With max_spawn_depth=3 and child_depth=1, the orchestrator's |
| own children can themselves be orchestrators (depth 2 < 3).""" |
| prompt = _build_child_system_prompt( |
| "Deep work", role="orchestrator", |
| max_spawn_depth=3, child_depth=1, |
| ) |
| self.assertIn("can themselves be orchestrators", prompt) |
|
|
| |
|
|
| @patch("tools.delegate_tool._resolve_delegation_credentials") |
| @patch("tools.delegate_tool._load_config", |
| return_value={"max_spawn_depth": 2}) |
| def test_batch_mode_per_task_role_override(self, mock_cfg, mock_creds): |
| """Per-task role beats top-level; no top-level role → "leaf". |
| |
| tasks=[{role:'orchestrator'},{role:'leaf'},{}] → first gets |
| delegation, second and third don't. Requires max_spawn_depth>=2 |
| (raised explicitly here) since the new default is 1 (flat). |
| """ |
| mock_creds.return_value = { |
| "provider": None, "base_url": None, |
| "api_key": None, "api_mode": None, "model": None, |
| } |
| parent = _make_mock_parent(depth=0) |
| parent.enabled_toolsets = ["terminal", "file", "delegation"] |
| built_toolsets = [] |
|
|
| def _factory(*a, **kw): |
| m = _make_role_mock_child() |
| built_toolsets.append(kw.get("enabled_toolsets")) |
| return m |
|
|
| with patch("run_agent.AIAgent", side_effect=_factory): |
| delegate_task( |
| tasks=[ |
| {"goal": "A", "role": "orchestrator"}, |
| {"goal": "B", "role": "leaf"}, |
| {"goal": "C"}, |
| ], |
| parent_agent=parent, |
| ) |
| self.assertIn("delegation", built_toolsets[0]) |
| self.assertNotIn("delegation", built_toolsets[1]) |
| self.assertNotIn("delegation", built_toolsets[2]) |
|
|
| @patch("tools.delegate_tool._resolve_delegation_credentials") |
| @patch("tools.delegate_tool._load_config", |
| return_value={"max_spawn_depth": 2}) |
| def test_intersection_preserves_delegation_bound( |
| self, mock_cfg, mock_creds |
| ): |
| """Design decision: orchestrator capability is granted by role, |
| NOT inherited from the parent's toolset. A parent without |
| 'delegation' in its enabled_toolsets can still spawn an |
| orchestrator child — the re-add in _build_child_agent runs |
| unconditionally for orchestrators (when max_spawn_depth allows). |
| |
| If you want to change to "parent must have delegation too", |
| update _build_child_agent to check parent_toolsets before the |
| re-add and update this test to match. |
| """ |
| mock_creds.return_value = { |
| "provider": None, "base_url": None, |
| "api_key": None, "api_mode": None, "model": None, |
| } |
| parent = _make_mock_parent(depth=0) |
| parent.enabled_toolsets = ["terminal", "file"] |
| with patch("run_agent.AIAgent") as MockAgent: |
| mock_child = _make_role_mock_child() |
| MockAgent.return_value = mock_child |
| delegate_task(goal="test", role="orchestrator", |
| parent_agent=parent) |
| self.assertIn("delegation", MockAgent.call_args[1]["enabled_toolsets"]) |
|
|
|
|
class TestOrchestratorEndToEnd(unittest.TestCase):
    """End-to-end: parent -> orchestrator -> two-leaf nested orchestration.

    Covers the acceptance gate: parent delegates to an orchestrator
    child; the orchestrator delegates to two leaf grandchildren; the
    role/toolset/depth chain all resolve correctly.

    Mock strategy: a single AIAgent patch with a side_effect factory
    that keys on the child's ephemeral_system_prompt. Orchestrator
    prompts contain the string "Orchestrator Role" (see
    _build_child_system_prompt), leaves don't. The orchestrator
    mock's run_conversation recursively calls delegate_task with
    tasks=[{goal:...},{goal:...}] to spawn two leaves. This keeps
    the test in one patch context and avoids depth-indexed nesting.
    """

    @patch("tools.delegate_tool._resolve_delegation_credentials")
    @patch("tools.delegate_tool._load_config",
           return_value={"max_spawn_depth": 2})
    def test_end_to_end_nested_orchestration(self, mock_cfg, mock_creds):
        """One orchestrator plus two leaves are built, in that order.

        Asserts that exactly three agents are constructed, that the first
        has 'delegation' enabled and an orchestrator prompt, and that the
        two that follow have neither.
        """
        mock_creds.return_value = {
            "provider": None, "base_url": None,
            "api_key": None, "api_mode": None, "model": None,
        }
        parent = _make_mock_parent(depth=0)
        parent.enabled_toolsets = ["terminal", "file", "delegation"]

        # One record per constructed agent, in construction order.
        built_agents: list = []

        def _factory(*a, **kw):
            prompt = kw.get("ephemeral_system_prompt", "") or ""
            is_orchestrator = "Orchestrator Role" in prompt
            m = _make_role_mock_child()
            built_agents.append({
                "enabled_toolsets": list(kw.get("enabled_toolsets") or []),
                "is_orchestrator_prompt": is_orchestrator,
            })

            if is_orchestrator:
                # The orchestrator mock is itself passed as parent_agent
                # below, so give it the same parent-side fields that
                # _make_mock_parent sets on a top-level parent.
                m._delegate_depth = 1
                m._delegate_role = "orchestrator"
                m._active_children = []
                m._active_children_lock = threading.Lock()
                m._session_db = None
                m.platform = "cli"
                m.enabled_toolsets = ["terminal", "file", "delegation"]
                m.api_key = "***"
                m.base_url = ""
                m.provider = None
                m.api_mode = None
                m.providers_allowed = None
                m.providers_ignored = None
                m.providers_order = None
                m.provider_sort = None
                m._print_fn = None
                m.tool_progress_callback = None
                m.thinking_callback = None

                def _orchestrator_run(user_message=None, task_id=None):
                    # Re-enter delegate_task with this mock (captured by
                    # the closure) as the parent to spawn the two leaves.
                    delegate_task(
                        tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}],
                        parent_agent=m,
                    )
                    return {
                        "final_response": "orchestrated 2 workers",
                        "completed": True, "api_calls": 1,
                        "messages": [],
                    }
                m.run_conversation.side_effect = _orchestrator_run

            return m

        with patch("run_agent.AIAgent", side_effect=_factory) as MockAgent:
            delegate_task(
                goal="top-level orchestration",
                role="orchestrator",
                parent_agent=parent,
            )

        # Three constructions in total: the orchestrator and its two leaves.
        self.assertEqual(MockAgent.call_count, 3)
        # First built: the orchestrator, which keeps the delegation toolset.
        self.assertIn("delegation", built_agents[0]["enabled_toolsets"])
        self.assertTrue(built_agents[0]["is_orchestrator_prompt"])
        # Second and third built: leaves, with no delegation capability.
        self.assertNotIn("delegation", built_agents[1]["enabled_toolsets"])
        self.assertFalse(built_agents[1]["is_orchestrator_prompt"])
        self.assertNotIn("delegation", built_agents[2]["enabled_toolsets"])
        self.assertFalse(built_agents[2]["is_orchestrator_prompt"])
|
|
|
|
# Allow running this module directly (python tests/test_delegate.py), as the
# module docstring advertises, in addition to running it through pytest.
| if __name__ == "__main__": |
| unittest.main() |
|
|