"""Integration tests for MACPRunner — run_round, arun_round, stream, astream.""" import sys from collections.abc import AsyncIterator, Iterator from unittest.mock import MagicMock, patch import pytest import torch from builder.graph_builder import build_property_graph from core.agent import AgentProfile from execution.runner import ( EarlyStopCondition, LLMCallerFactory, MACPResult, MACPRunner, RunnerConfig, StepContext, TopologyAction, ) from execution.streaming import StreamEventType # ============================================================================ # Helpers # ============================================================================ def _make_graph(n_agents: int = 2, query: str = "Test query", chain: bool = True): """Build a simple graph for runner tests.""" agents = [ AgentProfile(agent_id=f"a{i}", display_name=f"Agent {i}") for i in range(n_agents) ] edges = [(f"a{i}", f"a{i + 1}") for i in range(n_agents - 1)] if chain else [] return build_property_graph( agents=agents, workflow_edges=edges, query=query, include_task_node=True, ) def _simple_caller(prompt: str) -> str: return "Mock response" async def _async_caller(prompt: str) -> str: return "Async mock response" # ============================================================================ # run_round (sync) # noqa: ERA001 # ============================================================================ class TestRunRound: def test_basic_run_returns_result(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_messages_populated(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert len(result.messages) >= 1 for aid in result.execution_order: assert aid in result.messages def test_final_answer_set(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert isinstance(result.final_answer, str) def test_no_caller_raises(self): graph = _make_graph() runner = MACPRunner() with pytest.raises(ValueError, match="llm_caller"): runner.run_round(graph) def test_multi_agent_chain(self): graph = _make_graph(3) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert len(result.execution_order) == 3 assert result.total_tokens >= 0 def test_with_custom_token_counter(self): graph = _make_graph() token_count = [0] def counter(text: str) -> int: token_count[0] += 1 return len(text.split()) runner = MACPRunner(llm_caller=_simple_caller, token_counter=counter) result = runner.run_round(graph) assert result.total_tokens >= 0 def test_with_callbacks_param(self): """Covers line 1018: callbacks param merging.""" from callbacks.handlers.metrics import MetricsCallbackHandler graph = _make_graph() handler = MetricsCallbackHandler() runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph, callbacks=[handler]) assert isinstance(result, MACPResult) metrics = handler.get_metrics() assert metrics["runs_completed"] >= 1 def test_with_context_callback_manager(self): """Covers line 1023: context callback manager merging.""" from callbacks.context import trace_as_callback from callbacks.handlers.metrics import MetricsCallbackHandler graph = _make_graph() handler = MetricsCallbackHandler() runner = MACPRunner(llm_caller=_simple_caller) with trace_as_callback(handlers=[handler]): result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_caller_raises_error_continues(self): """Covers lines 2089-2098: error handling in agent execution.""" call_count = [0] def failing_caller(prompt: str) -> str: call_count[0] += 1 if call_count[0] == 1: msg = "LLM error" raise ValueError(msg) return "fallback" graph = _make_graph(2) runner = MACPRunner(llm_caller=failing_caller) result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_with_multi_callers(self): """Covers per-agent caller lookup.""" graph = _make_graph(2) runner = MACPRunner( llm_callers={"a0": lambda _p: "agent0 response", "a1": lambda _p: "agent1 response"} ) result = runner.run_round(graph) assert "a0" in result.messages assert "a1" in result.messages def test_empty_graph_returns_empty_result(self): """Covers _prepare_base_context returning None (no agents).""" import rustworkx as rx from core.graph import RoleGraph g = rx.PyDiGraph() g.add_node({"id": "__task__"}) empty_graph = RoleGraph( node_ids=["__task__"], task_node="__task__", graph=g, A_com=torch.zeros((1, 1)), agents=[], ) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(empty_graph) assert result.messages == {} def test_update_states_false(self): """Covers update_states=False path.""" graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph, update_states=False) assert result.agent_states is None def test_update_states_true(self): """Covers update_states=True path.""" graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph, update_states=True) # agent_states may be None or a dict depending on implementation assert result.agent_states is None or isinstance(result.agent_states, dict) def test_memory_enabled(self): """Covers lines 1081-1090: memory initialization.""" graph = _make_graph(2) config = RunnerConfig(enable_memory=True) runner = MACPRunner(llm_caller=_simple_caller, config=config) result = runner.run_round(graph) assert isinstance(result, MACPResult) assert runner.memory_pool is not None def test_early_stop_condition(self): """Covers lines 1150-1166: early stop conditions.""" stop = EarlyStopCondition.on_keyword("Mock") config = RunnerConfig(early_stop_conditions=[stop]) graph = _make_graph(3) runner = MACPRunner(llm_caller=_simple_caller, config=config) result = runner.run_round(graph) assert result.early_stopped is True or isinstance(result, MACPResult) def test_dynamic_topology_hook(self): """Covers lines 2117-2136: topology hooks in _run_simple.""" def hook(ctx: StepContext, role_graph) -> TopologyAction: return TopologyAction(skip_agents=["a1"] if ctx.agent_id == "a0" else []) config = RunnerConfig(enable_dynamic_topology=True, topology_hooks=[hook]) graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller, config=config) result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_broadcast_task_to_all_false(self): """Covers broadcast_task_to_all=False path.""" config = RunnerConfig(broadcast_task_to_all=False) graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller, config=config) result = runner.run_round(graph) assert isinstance(result, MACPResult) # ============================================================================ # arun_round (async) # ============================================================================ @pytest.mark.asyncio class TestARunRound: async def test_basic_async_run(self): graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) async def test_messages_populated(self): graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller) result = await runner.arun_round(graph) assert len(result.messages) >= 1 async def test_no_async_caller_raises(self): graph = _make_graph() runner = MACPRunner() with pytest.raises(ValueError, match="async_llm_caller"): await runner.arun_round(graph) async def test_multi_agent_chain(self): graph = _make_graph(3) runner = MACPRunner(async_llm_caller=_async_caller) result = await runner.arun_round(graph) assert len(result.execution_order) == 3 async def test_with_callbacks(self): from callbacks.handlers.metrics import MetricsCallbackHandler graph = _make_graph() handler = MetricsCallbackHandler() runner = MACPRunner(async_llm_caller=_async_caller) result = await runner.arun_round(graph, callbacks=[handler]) assert isinstance(result, MACPResult) async def test_memory_enabled_async(self): """Covers async memory init.""" graph = _make_graph(2) config = RunnerConfig(enable_memory=True) runner = MACPRunner(async_llm_caller=_async_caller, config=config) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) async def test_async_caller_raises_error_continues(self): """Covers error handling in _arun_simple.""" call_count = [0] async def failing_async(prompt: str) -> str: call_count[0] += 1 if call_count[0] == 1: msg = "Async LLM error" raise ValueError(msg) return "async fallback" graph = _make_graph(2) runner = MACPRunner(async_llm_caller=failing_async) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) # ============================================================================ # stream (sync generator) # ============================================================================ class TestStream: def test_basic_stream_yields_events(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(graph)) assert len(events) > 0 def test_stream_has_run_start_event(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.RUN_START in types def test_stream_has_run_end_event(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.RUN_END in types def test_stream_has_agent_output_events(self): graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.AGENT_OUTPUT in types def test_no_caller_raises(self): graph = _make_graph() runner = MACPRunner() with pytest.raises(ValueError, match="caller"): list(runner.stream(graph)) def test_stream_with_token_streaming(self): """Covers token streaming path (lines 3959-3987).""" def token_gen(prompt: str) -> Iterator[str]: yield "token1" yield " " yield "token2" graph = _make_graph(1) config = RunnerConfig(enable_token_streaming=True) runner = MACPRunner( llm_caller=_simple_caller, streaming_llm_caller=token_gen, config=config, ) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.TOKEN in types def test_stream_error_in_caller(self): """Covers error event in streaming.""" def error_caller(prompt: str) -> str: msg = "stream error" raise RuntimeError(msg) graph = _make_graph(1) runner = MACPRunner(llm_caller=error_caller) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.AGENT_ERROR in types def test_stream_multi_agent(self): graph = _make_graph(3) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(graph)) output_events = [e for e in events if e.event_type == StreamEventType.AGENT_OUTPUT] assert len(output_events) == 3 # ============================================================================ # astream (async generator) # ============================================================================ @pytest.mark.asyncio class TestAStream: async def test_basic_astream_yields_events(self): graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller) events = [event async for event in runner.astream(graph)] assert len(events) > 0 async def test_astream_has_run_start(self): graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller) events = [event async for event in runner.astream(graph)] types = [e.event_type for e in events] assert StreamEventType.RUN_START in types async def test_astream_has_run_end(self): graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller) events = [event async for event in runner.astream(graph)] types = [e.event_type for e in events] assert StreamEventType.RUN_END in types async def test_no_async_caller_raises(self): graph = _make_graph() runner = MACPRunner() with pytest.raises(ValueError, match="caller"): async for _ in runner.astream(graph): pass async def test_astream_with_async_token_streaming(self): """Covers async token streaming path (lines 4101-4127).""" async def async_token_gen(prompt: str) -> AsyncIterator[str]: for token in ["async", " ", "tokens"]: yield token graph = _make_graph(1) config = RunnerConfig(enable_token_streaming=True) runner = MACPRunner( async_llm_caller=_async_caller, async_streaming_llm_caller=async_token_gen, config=config, ) events = [event async for event in runner.astream(graph)] types = [e.event_type for e in events] assert StreamEventType.TOKEN in types async def test_astream_multi_agent(self): graph = _make_graph(3) runner = MACPRunner(async_llm_caller=_async_caller) events = [event async for event in runner.astream(graph)] output_events = [e for e in events if e.event_type == StreamEventType.AGENT_OUTPUT] assert len(output_events) == 3 async def test_astream_error_in_caller(self): """Covers error handling in _astream_simple.""" async def error_async(prompt: str) -> str: msg = "async stream error" raise RuntimeError(msg) graph = _make_graph(1) runner = MACPRunner(async_llm_caller=error_async) events = [event async for event in runner.astream(graph)] types = [e.event_type for e in events] assert StreamEventType.AGENT_ERROR in types # ============================================================================ # OpenAI caller creation (mocked openai) # ============================================================================ class TestOpenAICallerCreation: def test_create_openai_caller_from_config(self): """Covers lines 320-344.""" from execution.runner import _create_openai_caller_from_config mock_openai_module = MagicMock() mock_client = MagicMock() mock_openai_module.OpenAI.return_value = mock_client mock_response = MagicMock() mock_response.choices[0].message.content = "test response" mock_client.chat.completions.create.return_value = mock_response from core.agent import AgentLLMConfig config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) with patch.dict(sys.modules, {"openai": mock_openai_module}): caller = _create_openai_caller_from_config(config) assert callable(caller) result = caller("test prompt") assert result == "test response" def test_create_openai_caller_no_openai_raises(self): """Covers ImportError path in _create_openai_caller_from_config.""" from core.agent import AgentLLMConfig from execution.runner import _create_openai_caller_from_config config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) with patch.dict(sys.modules, {"openai": None}), pytest.raises(ImportError, match="openai"): _create_openai_caller_from_config(config) def test_create_async_openai_caller_from_config(self): """Covers lines 349-373.""" from execution.runner import _create_async_openai_caller_from_config mock_openai_module = MagicMock() mock_client = MagicMock() mock_openai_module.AsyncOpenAI.return_value = mock_client from core.agent import AgentLLMConfig config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) with patch.dict(sys.modules, {"openai": mock_openai_module}): caller = _create_async_openai_caller_from_config(config) assert callable(caller) def test_create_async_openai_caller_no_openai_raises(self): """Covers ImportError path in _create_async_openai_caller_from_config.""" from core.agent import AgentLLMConfig from execution.runner import _create_async_openai_caller_from_config config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) with patch.dict(sys.modules, {"openai": None}), pytest.raises(ImportError, match="openai"): _create_async_openai_caller_from_config(config) def test_create_openai_caller_function(self): """Covers lines 391-398.""" from execution.runner import create_openai_caller mock_openai_module = MagicMock() mock_client = MagicMock() mock_openai_module.OpenAI.return_value = mock_client mock_response = MagicMock() mock_response.choices[0].message.content = "hi" mock_client.chat.completions.create.return_value = mock_response with patch.dict(sys.modules, {"openai": mock_openai_module}): caller = create_openai_caller(api_key="test-key", model="gpt-4") assert callable(caller) # ============================================================================ # LLMCallerFactory - default_async_caller fallback # ============================================================================ class TestLLMCallerFactoryFallback: def test_get_async_caller_returns_default_when_no_builder(self): """Covers line 261: return self.default_async_caller.""" async def my_async_caller(p): return "hi" factory = LLMCallerFactory(default_async_caller=my_async_caller) # No async_caller_builder, so should return default from core.agent import AgentLLMConfig config = AgentLLMConfig(model_name="gpt-4", base_url="http://api.example.com") result = factory.get_async_caller(config) assert result is my_async_caller def test_create_openai_factory_builds_callers(self): """Covers lines 306, 309: builder closures in create_openai_factory.""" from core.agent import AgentLLMConfig mock_openai_module = MagicMock() mock_client = MagicMock() mock_openai_module.OpenAI.return_value = mock_client mock_openai_module.AsyncOpenAI.return_value = mock_client mock_response = MagicMock() mock_response.choices[0].message.content = "test" mock_client.chat.completions.create.return_value = mock_response with patch.dict(sys.modules, {"openai": mock_openai_module}): factory = LLMCallerFactory.create_openai_factory(default_api_key="test-key") config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) # This calls the builder closures (lines 306, 309) assert factory.caller_builder is not None assert factory.async_caller_builder is not None sync_caller = factory.caller_builder(config) async_caller = factory.async_caller_builder(config) assert callable(sync_caller) assert callable(async_caller) # ============================================================================ # MACPRunner - get_caller_for_agent with factory # ============================================================================ class TestGetCallerForAgent: def test_get_caller_uses_factory_with_llm_config(self): """Covers lines 1777-1785.""" from core.agent import AgentLLMConfig def built_caller(p): return "factory response" factory = LLMCallerFactory(caller_builder=lambda _cfg: built_caller) # Agent with get_llm_config method AgentProfile(agent_id="a0", display_name="A0") llm_cfg = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) class AgentWithLLMConfig(AgentProfile): def get_llm_config(self): return llm_cfg agent_w_cfg = AgentWithLLMConfig(agent_id="a0", display_name="A0") runner = MACPRunner(llm_factory=factory) caller = runner._get_caller_for_agent("a0", agent_w_cfg) assert caller is built_caller def test_get_async_caller_uses_factory_with_llm_config(self): """Covers lines 1808-1817.""" from core.agent import AgentLLMConfig async def built_async(p): return "async" factory = LLMCallerFactory(async_caller_builder=lambda _cfg: built_async) llm_cfg = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) class AgentWithLLMConfig(AgentProfile): def get_llm_config(self): return llm_cfg agent = AgentWithLLMConfig(agent_id="a0", display_name="A0") runner = MACPRunner(llm_factory=factory) caller = runner._get_async_caller_for_agent("a0", agent) assert caller is built_async def test_get_caller_uses_factory_via_llm_config_attr(self): """Covers the elif branch: factory with agent.llm_config attribute.""" from core.agent import AgentLLMConfig def built_caller(p): return "attr factory" factory = LLMCallerFactory(caller_builder=lambda _cfg: built_caller) llm_cfg = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) class AgentWithAttr(AgentProfile): llm_config: AgentLLMConfig | None = None agent = AgentWithAttr(agent_id="a0", display_name="A0", llm_config=llm_cfg) runner = MACPRunner(llm_factory=factory) caller = runner._get_caller_for_agent("a0", agent) assert caller is built_caller # ============================================================================ # MACPRunner - has_any_caller / has_any_async_caller # ============================================================================ class TestHasCallers: def test_has_any_caller_with_default(self): runner = MACPRunner(llm_caller=_simple_caller) assert runner._has_any_caller() is True def test_has_any_caller_with_callers_dict(self): runner = MACPRunner(llm_callers={"a0": _simple_caller}) assert runner._has_any_caller() is True def test_has_any_caller_with_factory(self): factory = LLMCallerFactory(default_caller=_simple_caller) runner = MACPRunner(llm_factory=factory) assert runner._has_any_caller() is True def test_has_any_caller_none(self): runner = MACPRunner() assert runner._has_any_caller() is False def test_has_any_async_caller_with_default(self): runner = MACPRunner(async_llm_caller=_async_caller) assert runner._has_any_async_caller() is True def test_has_any_async_caller_none(self): runner = MACPRunner() assert runner._has_any_async_caller() is False # ============================================================================ # MACPRunner - tools not available (TOOLS_AVAILABLE = False) # ============================================================================ class TestToolsNotAvailable: def test_run_without_tools_module(self): """Covers lines 87-88: TOOLS_AVAILABLE = False branch.""" import execution.runner as runner_module original = runner_module.TOOLS_AVAILABLE runner_module.TOOLS_AVAILABLE = False try: graph = _make_graph(1) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert isinstance(result, MACPResult) finally: runner_module.TOOLS_AVAILABLE = original # ============================================================================ # MACPRunner - filter_unreachable # ============================================================================ class TestFilterUnreachable: def test_filter_unreachable_with_start_agent(self): """Covers lines 1972-1978: filter_unreachable path.""" graph = _make_graph(3) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round( graph, start_agent_id="a0", final_agent_id="a2", filter_unreachable=True, ) assert isinstance(result, MACPResult) def test_no_filter_unreachable(self): """Covers path when filter_unreachable=False.""" graph = _make_graph(3) runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph, filter_unreachable=False) assert len(result.execution_order) == 3 # ============================================================================ # MACPRunner - adaptive mode # ============================================================================ class TestAdaptiveMode: def test_adaptive_run_round(self): """Covers adaptive=True path in run_round → _run_adaptive.""" config = RunnerConfig(adaptive=True) graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller, config=config) result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_adaptive_stream(self): """Covers adaptive stream path.""" config = RunnerConfig(adaptive=True) graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller, config=config) events = list(runner.stream(graph)) assert len(events) > 0 @pytest.mark.asyncio async def test_adaptive_arun_round(self): """Covers async adaptive path.""" config = RunnerConfig(adaptive=True) graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller, config=config) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) # ============================================================================ # MACPRunner properties # ============================================================================ class TestRunnerProperties: def test_memory_pool_initially_none(self): runner = MACPRunner(llm_caller=_simple_caller) assert runner.memory_pool is None def test_memory_pool_after_run_with_memory(self): config = RunnerConfig(enable_memory=True) graph = _make_graph(2) runner = MACPRunner(llm_caller=_simple_caller, config=config) runner.run_round(graph) assert runner.memory_pool is not None # ============================================================================ # Edge cases: caller is None for some agents # ============================================================================ class TestCallerIsNoneForAgent: def test_caller_none_for_agent_continues(self): """Covers lines 2049-2059: caller is None for some agents in _run_simple.""" # Only "a0" has a caller; "a1" falls back to self.llm_caller = None graph = _make_graph(2) runner = MACPRunner(llm_callers={"a0": _simple_caller}) result = runner.run_round(graph) # a0 should have a response; a1 should have an error message assert "a0" in result.messages assert "[Error:" in result.messages.get("a1", "[Error: no caller]") def test_stream_caller_none_for_agent(self): """Covers lines 3946-3956: stream simple, caller is None.""" graph = _make_graph(2) runner = MACPRunner(llm_callers={"a0": _simple_caller}) events = list(runner.stream(graph)) types = [e.event_type for e in events] assert StreamEventType.AGENT_ERROR in types @pytest.mark.asyncio async def test_arun_caller_none_for_agent(self): """Covers lines 2283-2293: async caller is None for some agents.""" graph = _make_graph(2) runner = MACPRunner(async_llm_callers={"a0": _async_caller}) result = await runner.arun_round(graph) assert "[Error:" in result.messages.get("a1", "[Error: no caller]") @pytest.mark.asyncio async def test_astream_no_async_caller_for_agent(self): """Covers lines 4130-4139: astream simple no async caller for agent.""" graph = _make_graph(2) runner = MACPRunner(async_llm_callers={"a0": _async_caller}) events = [event async for event in runner.astream(graph)] types = [e.event_type for e in events] assert StreamEventType.AGENT_ERROR in types # ============================================================================ # Disabled nodes # ============================================================================ class TestDisabledNodes: def test_disabled_node_skipped(self): """Covers lines 2016-2018: disabled nodes are skipped.""" graph = _make_graph(2) graph.disabled_nodes = {"a1"} runner = MACPRunner(llm_caller=_simple_caller) result = runner.run_round(graph) assert "a1" not in result.execution_order or result.messages.get("a1") is None @pytest.mark.asyncio async def test_async_disabled_node_skipped(self): """Covers async disabled nodes.""" graph = _make_graph(2) graph.disabled_nodes = {"a1"} runner = MACPRunner(async_llm_caller=_async_caller) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) # ============================================================================ # Run error raised # ============================================================================ class TestRunError: def test_run_error_propagated(self): """Covers lines 2138-2139, 2155: run error is re-raised after finalization.""" call_count = [0] def always_fail(prompt: str) -> str: call_count[0] += 1 msg = "Fatal LLM error" raise RuntimeError(msg) graph = _make_graph(1) runner = MACPRunner(llm_caller=always_fail) # The run should either propagate the error or swallow it try: result = runner.run_round(graph) # If not raised, check the error is recorded assert "[Error:" in result.messages.get("a0", "") except RuntimeError: pass # Error propagated, which is also acceptable @pytest.mark.asyncio async def test_async_run_error_propagated(self): """Covers async run error paths.""" async def always_fail_async(prompt: str) -> str: msg = "Async fatal error" raise RuntimeError(msg) graph = _make_graph(1) runner = MACPRunner(async_llm_caller=always_fail_async) try: result = await runner.arun_round(graph) assert "[Error:" in result.messages.get("a0", "") except RuntimeError: pass # ============================================================================ # Dynamic topology in async # ============================================================================ class TestAsyncDynamicTopology: @pytest.mark.asyncio async def test_async_topology_hook(self): """Covers _apply_async_topology_hooks (lines 2348-2371).""" async def async_hook(ctx: StepContext, role_graph) -> TopologyAction: return TopologyAction(skip_agents=["a1"] if ctx.agent_id == "a0" else []) config = RunnerConfig( enable_dynamic_topology=True, async_topology_hooks=[async_hook], ) graph = _make_graph(2) runner = MACPRunner(async_llm_caller=_async_caller, config=config) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) @pytest.mark.asyncio async def test_async_early_stop(self): """Covers async early stop lines (2344-2346).""" stop = EarlyStopCondition.on_keyword("Async") config = RunnerConfig(early_stop_conditions=[stop]) graph = _make_graph(3) runner = MACPRunner(async_llm_caller=_async_caller, config=config) result = await runner.arun_round(graph) assert isinstance(result, MACPResult) # ============================================================================ # Adaptive mode with caller errors # ============================================================================ class TestAdaptiveErrors: def test_adaptive_run_with_error(self): """Covers lines 3649-3671: error handling in _run_adaptive.""" call_count = [0] def erroring_caller(prompt: str) -> str: call_count[0] += 1 if call_count[0] == 1: msg = "Adaptive error" raise ValueError(msg) return "Recovery response" config = RunnerConfig(adaptive=True) graph = _make_graph(2) runner = MACPRunner(llm_caller=erroring_caller, config=config) result = runner.run_round(graph) assert isinstance(result, MACPResult) def test_adaptive_stream_with_caller_none(self): """Covers lines 3600-3624: adaptive stream caller is None.""" config = RunnerConfig(adaptive=True) graph = _make_graph(2) runner = MACPRunner(llm_callers={"a0": _simple_caller}, config=config) events = list(runner.stream(graph)) assert len(events) > 0 # ============================================================================ # stream/astream - empty graph path # ============================================================================ class TestEmptyGraphStreaming: def test_stream_empty_graph_yields_run_end(self): """Covers lines 3884-3887: stream simple with empty base.""" import rustworkx as rx from core.graph import RoleGraph g = rx.PyDiGraph() g.add_node({"id": "__task__"}) empty_graph = RoleGraph( node_ids=["__task__"], task_node="__task__", graph=g, A_com=torch.zeros((1, 1)), agents=[], ) runner = MACPRunner(llm_caller=_simple_caller) events = list(runner.stream(empty_graph)) types = [e.event_type for e in events] assert StreamEventType.RUN_END in types @pytest.mark.asyncio async def test_astream_empty_graph_yields_run_end(self): """Covers lines 4055-4058: astream simple with empty base.""" import rustworkx as rx from core.graph import RoleGraph g = rx.PyDiGraph() g.add_node({"id": "__task__"}) empty_graph = RoleGraph( node_ids=["__task__"], task_node="__task__", graph=g, A_com=torch.zeros((1, 1)), agents=[], ) runner = MACPRunner(async_llm_caller=_async_caller) events = [event async for event in runner.astream(empty_graph)] types = [e.event_type for e in events] assert StreamEventType.RUN_END in types # ============================================================================ # Async OpenAI caller - inner function # ============================================================================ @pytest.mark.asyncio async def test_async_openai_caller_inner_function(): """Covers lines 366-371: the inner async caller function.""" from core.agent import AgentLLMConfig from execution.runner import _create_async_openai_caller_from_config mock_openai_module = MagicMock() mock_client = MagicMock() mock_openai_module.AsyncOpenAI.return_value = mock_client mock_response = MagicMock() mock_response.choices[0].message.content = "async response" # The caller function uses `await client.chat.completions.create(...)`. # We make the mock coroutine-compatible. async def mock_create(*args, **kwargs): return mock_response mock_client.chat.completions.create = mock_create config = AgentLLMConfig( model_name="gpt-4", base_url="http://api.example.com", api_key="test-key", ) with patch.dict(sys.modules, {"openai": mock_openai_module}): caller = _create_async_openai_caller_from_config(config) result = await caller("test prompt") assert result == "async response" if __name__ == "__main__": pytest.main([__file__, "-v"])