|
|
""" |
|
|
Test the Accumulator Pattern for Microsoft Agent Framework event handling. |
|
|
|
|
|
This tests SPEC-17 (updated for SPEC-18): We use AgentRunUpdateEvent.data.text as the |
|
|
sole source of streaming content, and ExecutorCompletedEvent as a completion signal. |
|
|
|
|
|
Event mapping (SPEC-18 migration): |
|
|
- MagenticAgentDeltaEvent β AgentRunUpdateEvent |
|
|
- MagenticAgentMessageEvent β ExecutorCompletedEvent |
|
|
- MagenticFinalResultEvent β WorkflowOutputEvent |
|
|
""" |
|
|
|
|
|
import importlib |
|
|
import sys |
|
|
import types |
|
|
from unittest.mock import MagicMock, patch |
|
|
|
|
|
import pytest |
|
|
|
|
|
|
|
|
|
|
|
class MockAgentRunUpdateEvent: |
|
|
"""Simulates AgentRunUpdateEvent with streaming data.""" |
|
|
|
|
|
def __init__(self, text: str, author_name: str = "TestAgent"): |
|
|
self.data = MagicMock() |
|
|
self.data.text = text |
|
|
self.data.author_name = author_name |
|
|
|
|
|
|
|
|
class MockExecutorCompletedEvent: |
|
|
"""Simulates ExecutorCompletedEvent signaling agent turn completion.""" |
|
|
|
|
|
def __init__(self, executor_id: str = "TestAgent"): |
|
|
self.executor_id = executor_id |
|
|
|
|
|
|
|
|
class MockWorkflowOutputEvent: |
|
|
"""Simulates WorkflowOutputEvent.""" |
|
|
|
|
|
def __init__(self, data=None): |
|
|
self.data = data |
|
|
|
|
|
|
|
|
class MockOrchestratorMessageEvent: |
|
|
"""Simulates orchestrator message event (formerly MagenticOrchestratorMessageEvent).""" |
|
|
|
|
|
def __init__(self, kind: str = "user_task", message: str = "test"): |
|
|
from agent_framework import MAGENTIC_EVENT_TYPE_ORCHESTRATOR |
|
|
|
|
|
self.type = MAGENTIC_EVENT_TYPE_ORCHESTRATOR |
|
|
self.kind = kind |
|
|
self.message = MagicMock() |
|
|
self.message.text = message |
|
|
|
|
|
|
|
|
|
|
|
def mock_use_function_invocation(func=None): |
|
|
return func if func else lambda f: f |
|
|
|
|
|
|
|
|
def mock_use_observability(func=None): |
|
|
return func if func else lambda f: f |
|
|
|
|
|
|
|
|
@pytest.fixture |
|
|
def mock_agent_framework(): |
|
|
""" |
|
|
Mock the agent_framework module structure in sys.modules. |
|
|
""" |
|
|
|
|
|
mock_af = types.ModuleType("agent_framework") |
|
|
mock_af_openai = types.ModuleType("agent_framework.openai") |
|
|
mock_af_middleware = types.ModuleType("agent_framework._middleware") |
|
|
mock_af_tools = types.ModuleType("agent_framework._tools") |
|
|
mock_af_types = types.ModuleType("agent_framework._types") |
|
|
mock_af_observability = types.ModuleType("agent_framework.observability") |
|
|
|
|
|
|
|
|
mock_af.openai = mock_af_openai |
|
|
mock_af._middleware = mock_af_middleware |
|
|
mock_af._tools = mock_af_tools |
|
|
mock_af._types = mock_af_types |
|
|
mock_af.observability = mock_af_observability |
|
|
|
|
|
|
|
|
mock_af.AgentRunUpdateEvent = MockAgentRunUpdateEvent |
|
|
mock_af.ExecutorCompletedEvent = MockExecutorCompletedEvent |
|
|
mock_af.WorkflowOutputEvent = MockWorkflowOutputEvent |
|
|
mock_af.MagenticOrchestratorMessageEvent = MockOrchestratorMessageEvent |
|
|
mock_af.AgentRunResponse = MagicMock |
|
|
mock_af.MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message" |
|
|
|
|
|
mock_af.ORCH_MSG_KIND_INSTRUCTION = "instruction" |
|
|
mock_af.ORCH_MSG_KIND_TASK_LEDGER = "task_ledger" |
|
|
|
|
|
|
|
|
mock_af.MagenticBuilder = MagicMock |
|
|
mock_af.ChatAgent = MagicMock |
|
|
mock_af.ai_function = MagicMock |
|
|
mock_af.BaseChatClient = MagicMock |
|
|
mock_af.ToolProtocol = MagicMock |
|
|
mock_af.ChatMessage = MagicMock |
|
|
mock_af.ChatResponse = MagicMock |
|
|
mock_af.ChatResponseUpdate = MagicMock |
|
|
mock_af.ChatOptions = MagicMock |
|
|
mock_af.FinishReason = MagicMock |
|
|
mock_af.Role = MagicMock |
|
|
|
|
|
|
|
|
mock_af_openai.OpenAIChatClient = MagicMock |
|
|
mock_af_middleware.use_chat_middleware = MagicMock |
|
|
mock_af_tools.use_function_invocation = mock_use_function_invocation |
|
|
mock_af_types.FunctionCallContent = MagicMock |
|
|
mock_af_types.FunctionResultContent = MagicMock |
|
|
mock_af_observability.use_observability = mock_use_observability |
|
|
|
|
|
|
|
|
with patch.dict( |
|
|
sys.modules, |
|
|
{ |
|
|
"agent_framework": mock_af, |
|
|
"agent_framework.openai": mock_af_openai, |
|
|
"agent_framework._middleware": mock_af_middleware, |
|
|
"agent_framework._tools": mock_af_tools, |
|
|
"agent_framework._types": mock_af_types, |
|
|
"agent_framework.observability": mock_af_observability, |
|
|
}, |
|
|
): |
|
|
yield mock_af |
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True) |
|
|
def cleanup_orchestrator_module(): |
|
|
""" |
|
|
Ensure src.orchestrators.advanced is restored to a clean state after tests. |
|
|
This prevents 'Mock' classes from leaking into other tests via module globals. |
|
|
""" |
|
|
yield |
|
|
|
|
|
|
|
|
import src.orchestrators.advanced |
|
|
|
|
|
importlib.reload(src.orchestrators.advanced) |
|
|
|
|
|
|
|
|
@pytest.fixture |
|
|
def mock_orchestrator(mock_agent_framework): |
|
|
""" |
|
|
Create an AdvancedOrchestrator with all dependencies mocked. |
|
|
Relies on reloading the module to pick up the mocked agent_framework. |
|
|
""" |
|
|
|
|
|
import src.orchestrators.advanced |
|
|
|
|
|
|
|
|
importlib.reload(src.orchestrators.advanced) |
|
|
|
|
|
from src.orchestrators.advanced import AdvancedOrchestrator |
|
|
|
|
|
with ( |
|
|
patch("src.orchestrators.advanced.get_chat_client"), |
|
|
patch("src.orchestrators.advanced.get_embedding_service_if_available", return_value=None), |
|
|
patch("src.orchestrators.advanced.init_magentic_state"), |
|
|
patch("src.agents.state.ResearchMemory"), |
|
|
patch("src.utils.service_loader.get_embedding_service", return_value=MagicMock()), |
|
|
): |
|
|
orch = AdvancedOrchestrator(max_rounds=5) |
|
|
yield orch |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator): |
|
|
""" |
|
|
Scenario A: Standard Text Message (P2 Fix) |
|
|
Input: Updates ("Hello", " World") -> Completed |
|
|
Expected: Streaming events for text, NO completion events (P2 fix silences them) |
|
|
""" |
|
|
|
|
|
events = [ |
|
|
MockAgentRunUpdateEvent("Hello", author_name="searcher"), |
|
|
MockAgentRunUpdateEvent(" World", author_name="searcher"), |
|
|
MockExecutorCompletedEvent(executor_id="searcher"), |
|
|
] |
|
|
|
|
|
async def mock_stream(*args, **kwargs): |
|
|
for event in events: |
|
|
yield event |
|
|
|
|
|
mock_workflow = MagicMock() |
|
|
mock_workflow.run_stream = mock_stream |
|
|
|
|
|
with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
|
|
generated_events = [] |
|
|
async for event in mock_orchestrator.run("test query"): |
|
|
generated_events.append(event) |
|
|
|
|
|
|
|
|
|
|
|
streaming_events = [e for e in generated_events if e.type == "streaming"] |
|
|
assert len(streaming_events) >= 1, ( |
|
|
f"Expected streaming events, got: {[e.type for e in generated_events]}" |
|
|
) |
|
|
|
|
|
|
|
|
completion_events = [ |
|
|
e |
|
|
for e in generated_events |
|
|
if "SearchAgent" in str(e.message) |
|
|
and e.type not in ("streaming", "started", "progress", "thinking") |
|
|
] |
|
|
assert len(completion_events) == 0, ( |
|
|
f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in completion_events]}" |
|
|
) |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator): |
|
|
""" |
|
|
Scenario B: Tool Call (No Text Deltas) - P2 Fix |
|
|
Input: No Deltas -> Completed |
|
|
Expected: NO completion events (P2 fix silences ExecutorCompletedEvent) |
|
|
""" |
|
|
|
|
|
events = [ |
|
|
MockExecutorCompletedEvent(executor_id="searcher"), |
|
|
] |
|
|
|
|
|
async def mock_stream(*args, **kwargs): |
|
|
for event in events: |
|
|
yield event |
|
|
|
|
|
mock_workflow = MagicMock() |
|
|
mock_workflow.run_stream = mock_stream |
|
|
|
|
|
with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
|
|
generated_events = [] |
|
|
async for event in mock_orchestrator.run("test query"): |
|
|
generated_events.append(event) |
|
|
|
|
|
|
|
|
search_events = [ |
|
|
e |
|
|
for e in generated_events |
|
|
if "SearchAgent" in str(e.message) |
|
|
and e.type not in ("streaming", "started", "progress", "thinking") |
|
|
] |
|
|
|
|
|
|
|
|
assert len(search_events) == 0, ( |
|
|
f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in search_events]}" |
|
|
) |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_accumulator_pattern_buffer_clearing(mock_orchestrator): |
|
|
""" |
|
|
Verify buffer clears between agents (P2 Fix). |
|
|
P2 Fix: ExecutorCompletedEvent is silenced, so we verify via streaming events. |
|
|
Agent B's streaming should NOT contain Agent A's text. |
|
|
""" |
|
|
|
|
|
events = [ |
|
|
MockAgentRunUpdateEvent("Searcher says hi", author_name="searcher"), |
|
|
MockExecutorCompletedEvent(executor_id="searcher"), |
|
|
MockAgentRunUpdateEvent("Judge responds", author_name="judge"), |
|
|
MockExecutorCompletedEvent(executor_id="judge"), |
|
|
] |
|
|
|
|
|
async def mock_stream(*args, **kwargs): |
|
|
for event in events: |
|
|
yield event |
|
|
|
|
|
mock_workflow = MagicMock() |
|
|
mock_workflow.run_stream = mock_stream |
|
|
|
|
|
with patch.object(mock_orchestrator, "_build_workflow", return_value=mock_workflow): |
|
|
generated_events = [] |
|
|
async for event in mock_orchestrator.run("test query"): |
|
|
generated_events.append(event) |
|
|
|
|
|
|
|
|
|
|
|
streaming_events = [e for e in generated_events if e.type == "streaming"] |
|
|
|
|
|
|
|
|
assert len(streaming_events) >= 2, ( |
|
|
f"Expected streaming events, got: {[e.type for e in generated_events]}" |
|
|
) |
|
|
|
|
|
|
|
|
searcher_streams = [e for e in streaming_events if "Searcher" in e.message] |
|
|
judge_streams = [e for e in streaming_events if "Judge" in e.message] |
|
|
|
|
|
assert len(searcher_streams) >= 1, "Missing searcher streaming events" |
|
|
assert len(judge_streams) >= 1, "Missing judge streaming events" |
|
|
|
|
|
|
|
|
for judge_event in judge_streams: |
|
|
assert "Searcher" not in judge_event.message, "Buffer not cleared between agents!" |
|
|
|