| """ |
| Test the Accumulator Pattern for Microsoft Agent Framework event handling. |
| |
| This tests SPEC-17 (updated for SPEC-18): We use AgentRunUpdateEvent.data.text as the |
| sole source of streaming content, and ExecutorCompletedEvent as a completion signal. |
| |
| Event mapping (SPEC-18 migration): |
| - MagenticAgentDeltaEvent β AgentRunUpdateEvent |
| - MagenticAgentMessageEvent β ExecutorCompletedEvent |
| - MagenticFinalResultEvent β WorkflowOutputEvent |
| """ |
|
|
| import importlib |
| import sys |
| import types |
| from unittest.mock import MagicMock, patch |
|
|
| import pytest |
|
|
|
|
| |
| class MockAgentRunUpdateEvent: |
| """Simulates AgentRunUpdateEvent with streaming data.""" |
|
|
| def __init__(self, text: str, author_name: str = "TestAgent"): |
| self.data = MagicMock() |
| self.data.text = text |
| self.data.author_name = author_name |
|
|
|
|
| class MockExecutorCompletedEvent: |
| """Simulates ExecutorCompletedEvent signaling agent turn completion.""" |
|
|
| def __init__(self, executor_id: str = "TestAgent"): |
| self.executor_id = executor_id |
|
|
|
|
| class MockWorkflowOutputEvent: |
| """Simulates WorkflowOutputEvent.""" |
|
|
| def __init__(self, data=None): |
| self.data = data |
|
|
|
|
| class MockOrchestratorMessageEvent: |
| """Simulates orchestrator message event (formerly MagenticOrchestratorMessageEvent).""" |
|
|
| def __init__(self, kind: str = "user_task", message: str = "test"): |
| from agent_framework import MAGENTIC_EVENT_TYPE_ORCHESTRATOR |
|
|
| self.type = MAGENTIC_EVENT_TYPE_ORCHESTRATOR |
| self.kind = kind |
| self.message = MagicMock() |
| self.message.text = message |
|
|
|
|
| |
| def mock_use_function_invocation(func=None): |
| return func if func else lambda f: f |
|
|
|
|
| def mock_use_observability(func=None): |
| return func if func else lambda f: f |
|
|
|
|
| @pytest.fixture |
| def mock_agent_framework(): |
| """ |
| Mock the agent_framework module structure in sys.modules. |
| """ |
| |
| mock_af = types.ModuleType("agent_framework") |
| mock_af_openai = types.ModuleType("agent_framework.openai") |
| mock_af_middleware = types.ModuleType("agent_framework._middleware") |
| mock_af_tools = types.ModuleType("agent_framework._tools") |
| mock_af_types = types.ModuleType("agent_framework._types") |
| mock_af_observability = types.ModuleType("agent_framework.observability") |
|
|
| |
| mock_af.openai = mock_af_openai |
| mock_af._middleware = mock_af_middleware |
| mock_af._tools = mock_af_tools |
| mock_af._types = mock_af_types |
| mock_af.observability = mock_af_observability |
|
|
| |
| mock_af.AgentRunUpdateEvent = MockAgentRunUpdateEvent |
| mock_af.ExecutorCompletedEvent = MockExecutorCompletedEvent |
| mock_af.WorkflowOutputEvent = MockWorkflowOutputEvent |
| mock_af.MagenticOrchestratorMessageEvent = MockOrchestratorMessageEvent |
| mock_af.AgentRunResponse = MagicMock |
| mock_af.MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message" |
| |
| mock_af.ORCH_MSG_KIND_INSTRUCTION = "instruction" |
| mock_af.ORCH_MSG_KIND_TASK_LEDGER = "task_ledger" |
|
|
| |
| mock_af.MagenticBuilder = MagicMock |
| mock_af.ChatAgent = MagicMock |
| mock_af.ai_function = MagicMock |
| mock_af.BaseChatClient = MagicMock |
| mock_af.ToolProtocol = MagicMock |
| mock_af.ChatMessage = MagicMock |
| mock_af.ChatResponse = MagicMock |
| mock_af.ChatResponseUpdate = MagicMock |
| mock_af.ChatOptions = MagicMock |
| mock_af.FinishReason = MagicMock |
| mock_af.Role = MagicMock |
|
|
| |
| mock_af_openai.OpenAIChatClient = MagicMock |
| mock_af_middleware.use_chat_middleware = MagicMock |
| mock_af_tools.use_function_invocation = mock_use_function_invocation |
| mock_af_types.FunctionCallContent = MagicMock |
| mock_af_types.FunctionResultContent = MagicMock |
| mock_af_observability.use_observability = mock_use_observability |
|
|
| |
| with patch.dict( |
| sys.modules, |
| { |
| "agent_framework": mock_af, |
| "agent_framework.openai": mock_af_openai, |
| "agent_framework._middleware": mock_af_middleware, |
| "agent_framework._tools": mock_af_tools, |
| "agent_framework._types": mock_af_types, |
| "agent_framework.observability": mock_af_observability, |
| }, |
| ): |
| yield mock_af |
|
|
|
|
| @pytest.fixture(scope="module", autouse=True) |
| def cleanup_orchestrator_module(): |
| """ |
| Ensure src.orchestrators.advanced is restored to a clean state after tests. |
| This prevents 'Mock' classes from leaking into other tests via module globals. |
| """ |
| yield |
| |
| |
| import src.orchestrators.advanced |
|
|
| importlib.reload(src.orchestrators.advanced) |
|
|
|
|
| @pytest.fixture |
| def mock_orchestrator(mock_agent_framework): |
| """ |
| Create an AdvancedOrchestrator with all dependencies mocked. |
| Relies on reloading the module to pick up the mocked agent_framework. |
| """ |
| |
| import src.orchestrators.advanced |
|
|
| |
| importlib.reload(src.orchestrators.advanced) |
|
|
| from src.orchestrators.advanced import AdvancedOrchestrator |
|
|
| with ( |
| patch("src.orchestrators.advanced.get_chat_client"), |
| patch("src.orchestrators.advanced.get_embedding_service_if_available", return_value=None), |
| patch("src.orchestrators.advanced.init_magentic_state"), |
| patch("src.agents.state.ResearchMemory"), |
| patch("src.utils.service_loader.get_embedding_service", return_value=MagicMock()), |
| ): |
| orch = AdvancedOrchestrator(max_rounds=5) |
| yield orch |
|
|
|
|
| @pytest.mark.unit |
| @pytest.mark.asyncio |
| async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator): |
| """ |
| Scenario A: Standard Text Message (P2 Fix) |
| Input: Updates ("Hello", " World") -> Completed |
| Expected: Streaming events for text, NO completion events (P2 fix silences them) |
| """ |
| |
| events = [ |
| MockAgentRunUpdateEvent("Hello", author_name="searcher"), |
| MockAgentRunUpdateEvent(" World", author_name="searcher"), |
| MockExecutorCompletedEvent(executor_id="searcher"), |
| ] |
|
|
| async def mock_stream(*args, **kwargs): |
| for event in events: |
| yield event |
|
|
| mock_workflow = MagicMock() |
| mock_workflow.run_stream = mock_stream |
|
|
| with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
| generated_events = [] |
| async for event in mock_orchestrator.run("test query"): |
| generated_events.append(event) |
|
|
| |
| |
| streaming_events = [e for e in generated_events if e.type == "streaming"] |
| assert len(streaming_events) >= 1, ( |
| f"Expected streaming events, got: {[e.type for e in generated_events]}" |
| ) |
|
|
| |
| completion_events = [ |
| e |
| for e in generated_events |
| if "SearchAgent" in str(e.message) |
| and e.type not in ("streaming", "started", "progress", "thinking") |
| ] |
| assert len(completion_events) == 0, ( |
| f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in completion_events]}" |
| ) |
|
|
|
|
| @pytest.mark.unit |
| @pytest.mark.asyncio |
| async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator): |
| """ |
| Scenario B: Tool Call (No Text Deltas) - P2 Fix |
| Input: No Deltas -> Completed |
| Expected: NO completion events (P2 fix silences ExecutorCompletedEvent) |
| """ |
| |
| events = [ |
| MockExecutorCompletedEvent(executor_id="searcher"), |
| ] |
|
|
| async def mock_stream(*args, **kwargs): |
| for event in events: |
| yield event |
|
|
| mock_workflow = MagicMock() |
| mock_workflow.run_stream = mock_stream |
|
|
| with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
| generated_events = [] |
| async for event in mock_orchestrator.run("test query"): |
| generated_events.append(event) |
|
|
| |
| search_events = [ |
| e |
| for e in generated_events |
| if "SearchAgent" in str(e.message) |
| and e.type not in ("streaming", "started", "progress", "thinking") |
| ] |
|
|
| |
| assert len(search_events) == 0, ( |
| f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in search_events]}" |
| ) |
|
|
|
|
| @pytest.mark.unit |
| @pytest.mark.asyncio |
| async def test_accumulator_pattern_buffer_clearing(mock_orchestrator): |
| """ |
| Verify buffer clears between agents (P2 Fix). |
| P2 Fix: ExecutorCompletedEvent is silenced, so we verify via streaming events. |
| Agent B's streaming should NOT contain Agent A's text. |
| """ |
| |
| events = [ |
| MockAgentRunUpdateEvent("Searcher says hi", author_name="searcher"), |
| MockExecutorCompletedEvent(executor_id="searcher"), |
| MockAgentRunUpdateEvent("Judge responds", author_name="judge"), |
| MockExecutorCompletedEvent(executor_id="judge"), |
| ] |
|
|
| async def mock_stream(*args, **kwargs): |
| for event in events: |
| yield event |
|
|
| mock_workflow = MagicMock() |
| mock_workflow.run_stream = mock_stream |
|
|
| with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
| generated_events = [] |
| async for event in mock_orchestrator.run("test query"): |
| generated_events.append(event) |
|
|
| |
| |
| streaming_events = [e for e in generated_events if e.type == "streaming"] |
|
|
| |
| assert len(streaming_events) >= 2, ( |
| f"Expected streaming events, got: {[e.type for e in generated_events]}" |
| ) |
|
|
| |
| searcher_streams = [e for e in streaming_events if "Searcher" in e.message] |
| judge_streams = [e for e in streaming_events if "Judge" in e.message] |
|
|
| assert len(searcher_streams) >= 1, "Missing searcher streaming events" |
| assert len(judge_streams) >= 1, "Missing judge streaming events" |
|
|
| |
| for judge_event in judge_streams: |
| assert "Searcher" not in judge_event.message, "Buffer not cleared between agents!" |
|
|