|
|
from unittest.mock import MagicMock |
|
|
|
|
|
import pytest |
|
|
from agent_framework import ( |
|
|
MAGENTIC_EVENT_TYPE_ORCHESTRATOR, |
|
|
ORCH_MSG_KIND_INSTRUCTION, |
|
|
ORCH_MSG_KIND_TASK_LEDGER, |
|
|
AgentRunUpdateEvent, |
|
|
ExecutorCompletedEvent, |
|
|
) |
|
|
|
|
|
from src.orchestrators.advanced import REPORTER_AGENT_ID, AdvancedOrchestrator |
|
|
|
|
|
|
|
|
async def _empty_async_generator(query): |
|
|
"""Empty async generator for mocking _init_workflow_events.""" |
|
|
return |
|
|
yield |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_executor_completed_event_is_silenced(): |
|
|
"""Verify ExecutorCompletedEvent produces NO UI events.""" |
|
|
orchestrator = AdvancedOrchestrator() |
|
|
|
|
|
|
|
|
mock_workflow = MagicMock() |
|
|
|
|
|
|
|
|
async def event_stream(task): |
|
|
|
|
|
yield ExecutorCompletedEvent(executor_id="ManagerAgent", data=None) |
|
|
|
|
|
yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None) |
|
|
|
|
|
mock_workflow.run_stream = event_stream |
|
|
orchestrator._build_workflow = MagicMock(return_value=mock_workflow) |
|
|
|
|
|
|
|
|
orchestrator._init_workflow_events = _empty_async_generator |
|
|
orchestrator._init_embedding_service = MagicMock(return_value=None) |
|
|
orchestrator._create_task_prompt = MagicMock(return_value="task") |
|
|
|
|
|
events = [] |
|
|
async for event in orchestrator.run("query"): |
|
|
events.append(event) |
|
|
|
|
|
|
|
|
|
|
|
for event in events: |
|
|
if event.type == "progress": |
|
|
assert "task completed" not in event.message |
|
|
|
|
|
if event.type == "judging": |
|
|
assert "ManagerAgent" not in event.message |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_internal_messages_are_filtered(): |
|
|
"""Verify internal task_ledger/instruction messages are filtered.""" |
|
|
orchestrator = AdvancedOrchestrator() |
|
|
mock_workflow = MagicMock() |
|
|
|
|
|
async def event_stream(task): |
|
|
|
|
|
ledger_update = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock()) |
|
|
ledger_update.data.text = '{"some": "json"}' |
|
|
ledger_update.data.additional_properties = { |
|
|
"magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR, |
|
|
"orchestrator_message_kind": ORCH_MSG_KIND_TASK_LEDGER, |
|
|
} |
|
|
yield ledger_update |
|
|
|
|
|
|
|
|
instruction = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock()) |
|
|
instruction.data.text = "Internal instruction to agent" |
|
|
instruction.data.additional_properties = { |
|
|
"magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR, |
|
|
"orchestrator_message_kind": ORCH_MSG_KIND_INSTRUCTION, |
|
|
} |
|
|
yield instruction |
|
|
|
|
|
|
|
|
|
|
|
normal_msg = AgentRunUpdateEvent(executor_id="Searcher", data=MagicMock()) |
|
|
normal_msg.data.text = "I found something" |
|
|
normal_msg.data.author_name = "Searcher" |
|
|
normal_msg.data.additional_properties = {} |
|
|
yield normal_msg |
|
|
|
|
|
mock_workflow.run_stream = event_stream |
|
|
orchestrator._build_workflow = MagicMock(return_value=mock_workflow) |
|
|
|
|
|
orchestrator._init_workflow_events = _empty_async_generator |
|
|
orchestrator._init_embedding_service = MagicMock(return_value=None) |
|
|
orchestrator._create_task_prompt = MagicMock(return_value="task") |
|
|
|
|
|
events = [] |
|
|
async for event in orchestrator.run("query"): |
|
|
events.append(event) |
|
|
|
|
|
|
|
|
|
|
|
streaming_messages = [e.message for e in events if e.type == "streaming"] |
|
|
assert "I found something" in streaming_messages |
|
|
|
|
|
|
|
|
all_messages = [e.message for e in events] |
|
|
|
|
|
assert not any('{"some": "json"}' in msg for msg in all_messages) |
|
|
|
|
|
assert not any("Internal instruction to agent" in msg for msg in all_messages) |
|
|
|
|
|
|
|
|
@pytest.mark.unit |
|
|
@pytest.mark.asyncio |
|
|
async def test_reporter_ran_tracking_still_works(): |
|
|
"""Verify internal state.reporter_ran is set correctly even though UI events are silenced.""" |
|
|
orchestrator = AdvancedOrchestrator() |
|
|
mock_workflow = MagicMock() |
|
|
|
|
|
async def event_stream(task): |
|
|
|
|
|
yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None) |
|
|
|
|
|
mock_workflow.run_stream = event_stream |
|
|
orchestrator._build_workflow = MagicMock(return_value=mock_workflow) |
|
|
|
|
|
orchestrator._init_workflow_events = _empty_async_generator |
|
|
orchestrator._init_embedding_service = MagicMock(return_value=None) |
|
|
orchestrator._create_task_prompt = MagicMock(return_value="task") |
|
|
|
|
|
|
|
|
events = [] |
|
|
async for event in orchestrator.run("query"): |
|
|
events.append(event) |
|
|
|
|
|
|
|
|
|
|
|
fallback_events = [ |
|
|
e for e in events if e.type == "synthesis" or "fallback" in e.message.lower() |
|
|
] |
|
|
assert len(fallback_events) == 0, ( |
|
|
f"Fallback synthesis triggered - reporter_ran tracking broken: {fallback_events}" |
|
|
) |
|
|
|