"""Unit tests for how AdvancedOrchestrator surfaces workflow events.

Covers three behaviours: ExecutorCompletedEvent is not turned into UI noise,
internal orchestrator messages (task ledger / instruction) are filtered out of
the event stream, and reporter completion does not trigger a fallback synthesis.
"""
from unittest.mock import MagicMock
import pytest
from agent_framework import (
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
ORCH_MSG_KIND_INSTRUCTION,
ORCH_MSG_KIND_TASK_LEDGER,
AgentRunUpdateEvent,
ExecutorCompletedEvent,
)
from src.orchestrators.advanced import REPORTER_AGENT_ID, AdvancedOrchestrator
async def _empty_async_generator(query: object):
    """Async generator that yields nothing, for mocking ``_init_workflow_events``.

    Args:
        query: Ignored. Present so the stub can be called with a single
            positional argument.

    Yields:
        Nothing. Iterating the returned async generator ends immediately.
    """
    return
    # Unreachable on purpose: the mere presence of ``yield`` in the body is what
    # makes Python compile this ``async def`` as an async generator rather than
    # a plain coroutine, so it can be consumed with ``async for``.
    yield
@pytest.mark.unit
@pytest.mark.asyncio
async def test_executor_completed_event_is_silenced():
    """Verify ExecutorCompletedEvent does not surface completion noise in the UI.

    The mocked workflow streams two ExecutorCompletedEvents (one for
    "ManagerAgent", one for the reporter). The events produced by
    ``orchestrator.run`` must contain no "progress" event mentioning
    "task completed" and no "judging" event mentioning "ManagerAgent".
    """
    orchestrator = AdvancedOrchestrator()
    # Mock the workflow build to return our custom event stream
    mock_workflow = MagicMock()

    # The stream consists solely of two executor-completion events.
    async def event_stream(task):
        # 1. Completion event (should be ignored)
        yield ExecutorCompletedEvent(executor_id="ManagerAgent", data=None)
        # 2. Reporter completion (should set flag but yield nothing)
        yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None)

    mock_workflow.run_stream = event_stream
    orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
    # Mock init services to avoid side effects
    orchestrator._init_workflow_events = _empty_async_generator
    orchestrator._init_embedding_service = MagicMock(return_value=None)
    orchestrator._create_task_prompt = MagicMock(return_value="task")

    events = [event async for event in orchestrator.run("query")]

    # Collect offenders first so a failure reports exactly which messages leaked,
    # instead of a bare assert buried inside a conditional loop.
    completion_noise = [
        e.message
        for e in events
        if e.type == "progress" and "task completed" in e.message
    ]
    assert not completion_noise, (
        f"Unexpected 'task completed' progress events: {completion_noise}"
    )

    manager_judging = [
        e.message
        for e in events
        if e.type == "judging" and "ManagerAgent" in e.message
    ]
    assert not manager_judging, (
        f"Unexpected judging events from manager completion: {manager_judging}"
    )
@pytest.mark.unit
@pytest.mark.asyncio
async def test_internal_messages_are_filtered():
    """Verify internal task_ledger/instruction messages are filtered.

    The mocked workflow streams three AgentRunUpdateEvents: a task-ledger
    message, an instruction message, and a normal agent message. Only the
    normal message text may appear in the events yielded by
    ``orchestrator.run``.
    """
    orchestrator = AdvancedOrchestrator()
    mock_workflow = MagicMock()

    def make_update(executor_id, text, properties, author_name=None):
        """Build an AgentRunUpdateEvent whose ``data`` is a configured MagicMock."""
        update = AgentRunUpdateEvent(executor_id=executor_id, data=MagicMock())
        update.data.text = text
        # Only set author_name when given; otherwise leave the MagicMock's
        # auto-created attribute untouched.
        if author_name is not None:
            update.data.author_name = author_name
        update.data.additional_properties = properties
        return update

    async def event_stream(task):
        # 1. Task Ledger (Should be skipped)
        yield make_update(
            "Manager",
            '{"some": "json"}',
            {
                "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
                "orchestrator_message_kind": ORCH_MSG_KIND_TASK_LEDGER,
            },
        )
        # 2. Instruction (Should be skipped)
        yield make_update(
            "Manager",
            "Internal instruction to agent",
            {
                "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
                "orchestrator_message_kind": ORCH_MSG_KIND_INSTRUCTION,
            },
        )
        # 3. Normal agent message (SHOULD pass through)
        # The streaming block filters task_ledger/instruction but passes agent content.
        yield make_update("Searcher", "I found something", {}, author_name="Searcher")

    mock_workflow.run_stream = event_stream
    orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
    orchestrator._init_workflow_events = _empty_async_generator
    orchestrator._init_embedding_service = MagicMock(return_value=None)
    orchestrator._create_task_prompt = MagicMock(return_value="task")

    events = [event async for event in orchestrator.run("query")]

    # 1. Verify we got the normal message
    streaming_messages = [e.message for e in events if e.type == "streaming"]
    assert "I found something" in streaming_messages, (
        f"Normal agent message missing from streaming events: {streaming_messages}"
    )

    # 2. Verify we did NOT get the internal messages
    all_messages = [e.message for e in events]
    # The JSON from task_ledger should be filtered
    assert not any('{"some": "json"}' in msg for msg in all_messages), (
        f"Task ledger JSON leaked into events: {all_messages}"
    )
    # The instruction text should be filtered
    assert not any("Internal instruction to agent" in msg for msg in all_messages), (
        f"Internal instruction leaked into events: {all_messages}"
    )
@pytest.mark.unit
@pytest.mark.asyncio
async def test_reporter_ran_tracking_still_works():
    """Verify reporter completion is still tracked even though UI events are silenced.

    The flag itself is not inspected. The check is indirect: the mocked
    workflow streams a single ExecutorCompletedEvent for the reporter, and the
    test asserts that no event of type "synthesis", and no event whose message
    mentions "fallback", is produced by ``orchestrator.run``.
    """
    orchestrator = AdvancedOrchestrator()
    mock_workflow = MagicMock()

    async def event_stream(task):
        # Reporter completion event - should set internal flag
        yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None)

    mock_workflow.run_stream = event_stream
    orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
    orchestrator._init_workflow_events = _empty_async_generator
    orchestrator._init_embedding_service = MagicMock(return_value=None)
    orchestrator._create_task_prompt = MagicMock(return_value="task")

    # Run the workflow
    events = [event async for event in orchestrator.run("query")]

    # The key assertion: No "synthesis" fallback should have been triggered
    # If reporter_ran was NOT set, we'd see a fallback synthesis event
    fallback_events = [
        e for e in events if e.type == "synthesis" or "fallback" in e.message.lower()
    ]
    assert len(fallback_events) == 0, (
        f"Fallback synthesis triggered - reporter_ran tracking broken: {fallback_events}"
    )