DeepBoner / tests /unit /test_orchestrator_noise.py
VibecoderMcSwaggins's picture
fix(tests): Address CodeRabbit review feedback
c9c58c4
from unittest.mock import MagicMock
import pytest
from agent_framework import (
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
ORCH_MSG_KIND_INSTRUCTION,
ORCH_MSG_KIND_TASK_LEDGER,
AgentRunUpdateEvent,
ExecutorCompletedEvent,
)
from src.orchestrators.advanced import REPORTER_AGENT_ID, AdvancedOrchestrator
async def _empty_async_generator(query):
"""Empty async generator for mocking _init_workflow_events."""
return
yield # Makes this an async generator that yields nothing
@pytest.mark.unit
@pytest.mark.asyncio
async def test_executor_completed_event_is_silenced():
"""Verify ExecutorCompletedEvent produces NO UI events."""
orchestrator = AdvancedOrchestrator()
# Mock the workflow build to return our custom event stream
mock_workflow = MagicMock()
# Create a stream of events: Start -> ExecutorCompleted -> End
async def event_stream(task):
# 1. Completion event (should be ignored)
yield ExecutorCompletedEvent(executor_id="ManagerAgent", data=None)
# 2. Reporter completion (should set flag but yield nothing)
yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None)
mock_workflow.run_stream = event_stream
orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
# Mock init services to avoid side effects
orchestrator._init_workflow_events = _empty_async_generator
orchestrator._init_embedding_service = MagicMock(return_value=None)
orchestrator._create_task_prompt = MagicMock(return_value="task")
events = []
async for event in orchestrator.run("query"):
events.append(event)
# Assertions
# We should have NO "progress" events with "task completed" message
for event in events:
if event.type == "progress":
assert "task completed" not in event.message
# We should have NO "judging" events from the manager completion
if event.type == "judging":
assert "ManagerAgent" not in event.message
@pytest.mark.unit
@pytest.mark.asyncio
async def test_internal_messages_are_filtered():
"""Verify internal task_ledger/instruction messages are filtered."""
orchestrator = AdvancedOrchestrator()
mock_workflow = MagicMock()
async def event_stream(task):
# 1. Task Ledger (Should be skipped)
ledger_update = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock())
ledger_update.data.text = '{"some": "json"}'
ledger_update.data.additional_properties = {
"magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
"orchestrator_message_kind": ORCH_MSG_KIND_TASK_LEDGER,
}
yield ledger_update
# 2. Instruction (Should be skipped)
instruction = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock())
instruction.data.text = "Internal instruction to agent"
instruction.data.additional_properties = {
"magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
"orchestrator_message_kind": ORCH_MSG_KIND_INSTRUCTION,
}
yield instruction
# 3. Normal agent message (SHOULD pass through)
# The streaming block filters task_ledger/instruction but passes agent content.
normal_msg = AgentRunUpdateEvent(executor_id="Searcher", data=MagicMock())
normal_msg.data.text = "I found something"
normal_msg.data.author_name = "Searcher"
normal_msg.data.additional_properties = {}
yield normal_msg
mock_workflow.run_stream = event_stream
orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
orchestrator._init_workflow_events = _empty_async_generator
orchestrator._init_embedding_service = MagicMock(return_value=None)
orchestrator._create_task_prompt = MagicMock(return_value="task")
events = []
async for event in orchestrator.run("query"):
events.append(event)
# Assertions
# 1. Verify we got the normal message
streaming_messages = [e.message for e in events if e.type == "streaming"]
assert "I found something" in streaming_messages
# 2. Verify we did NOT get the internal messages
all_messages = [e.message for e in events]
# The JSON from task_ledger should be filtered
assert not any('{"some": "json"}' in msg for msg in all_messages)
# The instruction text should be filtered
assert not any("Internal instruction to agent" in msg for msg in all_messages)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_reporter_ran_tracking_still_works():
"""Verify internal state.reporter_ran is set correctly even though UI events are silenced."""
orchestrator = AdvancedOrchestrator()
mock_workflow = MagicMock()
async def event_stream(task):
# Reporter completion event - should set internal flag
yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None)
mock_workflow.run_stream = event_stream
orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
orchestrator._init_workflow_events = _empty_async_generator
orchestrator._init_embedding_service = MagicMock(return_value=None)
orchestrator._create_task_prompt = MagicMock(return_value="task")
# Run the workflow
events = []
async for event in orchestrator.run("query"):
events.append(event)
# The key assertion: No "synthesis" fallback should have been triggered
# If reporter_ran was NOT set, we'd see a fallback synthesis event
fallback_events = [
e for e in events if e.type == "synthesis" or "fallback" in e.message.lower()
]
assert len(fallback_events) == 0, (
f"Fallback synthesis triggered - reporter_ran tracking broken: {fallback_events}"
)