VibecoderMcSwaggins commited on
Commit
fc78d2d
·
1 Parent(s): b6a1a09

fix(orchestrator): P2 - Silence ExecutorCompletedEvent UI noise

Browse files

## Problem
After report synthesis, extra "JUDGING: ManagerAgent" and "PROGRESS: Step 11"
events appeared in the UI, confusing users. Root cause: we were treating
`ExecutorCompletedEvent` as a UI event when it's actually internal framework
bookkeeping (auto-emitted by MS Agent Framework for every executor).

## Solution
1. **Silence ExecutorCompletedEvent**: Remove UI event emission, keep only
internal state tracking (reporter_ran flag)
2. **Add metadata filtering**: Filter out `task_ledger` and `instruction`
messages from AgentRunUpdateEvent stream
3. **Remove dead code**: Delete unused `_handle_completion_event` and
`_get_event_type_for_agent` methods

## Changes
- src/orchestrators/advanced.py: Silence completion events, add metadata filter
- tests/unit/test_orchestrator_noise.py: New regression tests
- tests/unit/orchestrators/test_accumulator_pattern.py: Update expectations
- docs/bugs/P2_EXECUTOR_COMPLETED_EVENT_UI_NOISE.md: Full bug documentation

## Validation
- Senior review confirmed analysis (external agent audit)
- All 304 unit tests pass
- Aligns with MS Agent Framework sample patterns

Closes: P2 ExecutorCompletedEvent UI Noise bug

docs/bugs/P2_EXECUTOR_COMPLETED_EVENT_UI_NOISE.md ADDED
@@ -0,0 +1,351 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # P2 Bug: ExecutorCompletedEvent UI Noise
2
+
3
+ **Status**: VALIDATED - Ready for Implementation
4
+ **Discovered**: 2025-12-05
5
+ **Senior Review**: 2025-12-05 (External agent audit confirmed analysis)
6
+ **Severity**: P2 (UX noise, confusing but not blocking)
7
+ **Component**: `src/orchestrators/advanced.py`
8
+
9
+ ---
10
+
11
+ ## Symptom
12
+
13
+ After the report synthesis completes, extra events appear in the UI:
14
+
15
+ ```text
16
+ 📝 **SYNTHESIZING**: Synthesizing research findings...
17
+ [...full report content...]
18
+
19
+ 🧠 **JUDGING**: ManagerAgent: Action completed (Tool Call)
20
+ ⏱️ **PROGRESS**: Step 11: ManagerAgent task completed
21
+ ```
22
+
23
+ The "JUDGING" and "PROGRESS" events appear AFTER the report is already displayed, creating confusion.
24
+
25
+ ---
26
+
27
+ ## Root Cause Analysis
28
+
29
+ ### The Misunderstanding
30
+
31
+ We're treating `ExecutorCompletedEvent` as a **UI event** when it's actually an **internal framework bookkeeping event**.
32
+
33
+ ### Microsoft Agent Framework Design
34
+
35
+ Looking at `agent_framework/_workflows/_executor.py` (lines 266-281):
36
+
37
+ ```python
38
+ # This is auto-emitted by the framework - NOT for UI consumption
39
+ with _framework_event_origin():
40
+ completed_event = ExecutorCompletedEvent(self.id, sent_messages if sent_messages else None)
41
+ await context.add_event(completed_event)
42
+ ```
43
+
44
+ The framework emits `ExecutorCompletedEvent` automatically after every executor handler completes. This includes:
45
+ - SearchAgent completing a search
46
+ - JudgeAgent completing evaluation
47
+ - ReportAgent completing synthesis
48
+ - **ManagerAgent completing coordination** (this is the problem)
49
+
50
+ ### What the MS Framework Sample Does
51
+
52
+ From `samples/getting_started/workflows/orchestration/magentic.py`:
53
+
54
+ ```python
55
+ async for event in workflow.run_stream(task):
56
+ if isinstance(event, AgentRunUpdateEvent):
57
+ # Handle streaming with metadata
58
+ props = event.data.additional_properties if event.data else None
59
+ event_type = props.get("magentic_event_type") if props else None
60
+ # ...
61
+ elif isinstance(event, WorkflowOutputEvent):
62
+ # Handle final output
63
+ output = output_messages[-1].text
64
+ ```
65
+
66
+ They only handle:
67
+ 1. `AgentRunUpdateEvent` - for streaming content (with `magentic_event_type` metadata)
68
+ 2. `WorkflowOutputEvent` - for final output
69
+
70
+ **They do NOT emit UI events for `ExecutorCompletedEvent`.**
71
+
72
+ ### Our Problematic Code
73
+
74
+ In `src/orchestrators/advanced.py`:
75
+
76
+ ```python
77
+ # Line 348-368: We emit UI events for EVERY ExecutorCompletedEvent
78
+ if isinstance(event, ExecutorCompletedEvent):
79
+ state.iteration += 1
80
+
81
+ comp_event, prog_event = self._handle_completion_event(...)
82
+ yield comp_event # <-- WRONG: UI event for internal framework event
83
+ yield prog_event # <-- WRONG: More noise
84
+ ```
85
+
86
+ ### Why the Manager Fires a Completion Event
87
+
88
+ The workflow execution order:
89
+ 1. ReportAgent streams its output (`AgentRunUpdateEvent`)
90
+ 2. ReportAgent handler completes → `ExecutorCompletedEvent(reporter)` (we display this)
91
+ 3. Manager orchestrator handler completes → `ExecutorCompletedEvent(manager)` (we display this too!)
92
+ 4. `WorkflowOutputEvent` (final)
93
+
94
+ The Manager is also an executor in the framework. When it finishes coordinating (after ReportAgent returns), it fires its own `ExecutorCompletedEvent`. We're incorrectly emitting UI events for this.
95
+
96
+ ---
97
+
98
+ ## Impact
99
+
100
+ 1. **User Confusion**: Extra "JUDGING: ManagerAgent" events after the report
101
+ 2. **UX Noise**: Progress events that don't add value
102
+ 3. **Incorrect Semantics**: Manager completions displayed as agent activity
103
+ 4. **No Functional Bug**: The workflow completes correctly, just noisy
104
+
105
+ ---
106
+
107
+ ## The Fix
108
+
109
+ ### Stop Emitting UI Events for ExecutorCompletedEvent
110
+
111
+ Remove UI event emission for `ExecutorCompletedEvent` entirely. Keep internal state tracking only.
112
+
113
+ **Before (buggy):**
114
+
115
+ ```python
116
+ if isinstance(event, ExecutorCompletedEvent):
117
+ state.iteration += 1
118
+ agent_name = getattr(event, "executor_id", "") or "unknown"
119
+ if REPORTER_AGENT_ID in agent_name.lower():
120
+ state.reporter_ran = True
121
+
122
+ comp_event, prog_event = self._handle_completion_event(...)
123
+ yield comp_event # <-- REMOVE: Emits UI noise
124
+ yield prog_event # <-- REMOVE: Emits UI noise
125
+ ```
126
+
127
+ **After (correct):**
128
+
129
+ ```python
130
+ if isinstance(event, ExecutorCompletedEvent):
131
+ # Internal state tracking only - NO UI events
132
+ agent_name = getattr(event, "executor_id", "") or "unknown"
133
+ if REPORTER_AGENT_ID in agent_name.lower():
134
+ state.reporter_ran = True
135
+ state.current_message_buffer = ""
136
+ continue # Skip to next event - do not yield anything
137
+ ```
138
+
139
+ **Key changes:**
140
+ 1. Remove `yield comp_event` and `yield prog_event`
141
+ 2. Remove `state.iteration += 1` (iteration counter becomes meaningless without UI events)
142
+ 3. Keep `state.reporter_ran` tracking (needed for fallback synthesis logic)
143
+ 4. Add `continue` to skip to next event
144
+
145
+ **Why this is correct:**
146
+ - Aligns with MS framework design (their sample ignores `ExecutorCompletedEvent`)
147
+ - Eliminates all completion noise including trailing "ManagerAgent" events
148
+ - The streaming events (`AgentRunUpdateEvent`) already provide real-time feedback
149
+ - `WorkflowOutputEvent` signals completion
150
+
151
+ ### Additional Fix: Add Metadata Filtering to AgentRunUpdateEvent
152
+
153
+ The senior review identified a gap: we're not filtering `AgentRunUpdateEvent` by `magentic_event_type`.
154
+
155
+ **Current (incomplete):**
156
+
157
+ ```python
158
+ if isinstance(event, AgentRunUpdateEvent):
159
+ if event.data and hasattr(event.data, "text") and event.data.text:
160
+ yield AgentEvent(type="streaming", message=event.data.text)
161
+ ```
162
+
163
+ **Should be:**
164
+
165
+ ```python
166
+ if isinstance(event, AgentRunUpdateEvent):
167
+ if event.data and hasattr(event.data, "text") and event.data.text:
168
+ # Check metadata to filter internal orchestrator messages
169
+ props = getattr(event.data, "additional_properties", None) or {}
170
+ event_type = props.get("magentic_event_type")
171
+ msg_kind = props.get("orchestrator_message_kind")
172
+
173
+ # Filter out internal orchestrator messages (task_ledger, instruction)
174
+ if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
175
+ if msg_kind in ("task_ledger", "instruction"):
176
+ continue # Skip internal coordination messages
177
+
178
+ yield AgentEvent(type="streaming", message=event.data.text)
179
+ ```
180
+
181
+ **Why this matters:**
182
+ - Prevents internal JSON blobs from being displayed
183
+ - Filters out raw planning/instruction prompts not meant for users
184
+ - Aligns with how MS sample consumes events
185
+
186
+ ---
187
+
188
+ ## Related Code Locations
189
+
190
+ - `src/orchestrators/advanced.py` line 348-368: ExecutorCompletedEvent handling
191
+ - `src/orchestrators/advanced.py` line 437-469: `_handle_completion_event` method
192
+ - MS Framework: `python/packages/core/agent_framework/_workflows/_executor.py` line 277-281
193
+ - MS Framework: `python/packages/core/agent_framework/_workflows/_magentic.py` line 1962-1976
194
+
195
+ ---
196
+
197
+ ## Related Issues
198
+
199
+ - P2 Round Counter Semantic Mismatch (FIXED) - Changed display from "Round X/Y" to "Step N"
200
+ - This bug explains why step count was confusing - we count internal events too
201
+
202
+ ---
203
+
204
+ ## Framework Event Architecture Deep Dive
205
+
206
+ ### Event Categories in MS Agent Framework
207
+
208
+ The framework has distinct event categories with different purposes:
209
+
210
+ #### 1. Workflow Lifecycle Events (Framework-emitted, internal)
211
+
212
+ | Event | Purpose | UI Relevant? |
213
+ |-------|---------|--------------|
214
+ | `WorkflowStartedEvent` | Run begins | No |
215
+ | `WorkflowStatusEvent` | State transitions (IN_PROGRESS, IDLE, FAILED) | No |
216
+ | `WorkflowFailedEvent` | Error with structured details | Maybe (errors) |
217
+
218
+ #### 2. Superstep Events (Framework-emitted, internal)
219
+
220
+ | Event | Purpose | UI Relevant? |
221
+ |-------|---------|--------------|
222
+ | `SuperStepStartedEvent` | Pregel superstep begins | No |
223
+ | `SuperStepCompletedEvent` | Pregel superstep ends | No |
224
+
225
+ #### 3. Executor Events (Framework-emitted automatically, internal)
226
+
227
+ | Event | Purpose | UI Relevant? |
228
+ |-------|---------|--------------|
229
+ | `ExecutorInvokedEvent` | Handler starts | No |
230
+ | `ExecutorCompletedEvent` | Handler completes | **NO** |
231
+ | `ExecutorFailedEvent` | Handler errors | Maybe (errors) |
232
+
233
+ #### 4. Application Events (User-code emitted via ctx.add_event, UI-facing)
234
+
235
+ | Event | Purpose | UI Relevant? |
236
+ |-------|---------|--------------|
237
+ | `AgentRunUpdateEvent` | Streaming content | **YES** |
238
+ | `AgentRunEvent` | Complete agent response | Yes |
239
+ | `WorkflowOutputEvent` | Final workflow output | **YES** |
240
+ | `RequestInfoEvent` | HITL request | Yes |
241
+
242
+ ### Metadata Pattern in AgentRunUpdateEvent
243
+
244
+ The MS framework uses `additional_properties` in `AgentRunUpdateEvent.data` for classification:
245
+
246
+ ```python
247
+ # Orchestrator message
248
+ additional_properties={
249
+ "magentic_event_type": "orchestrator_message",
250
+ "orchestrator_message_kind": "user_task" | "task_ledger" | "instruction" | "notice",
251
+ "orchestrator_id": "...",
252
+ }
253
+
254
+ # Agent streaming
255
+ additional_properties={
256
+ "magentic_event_type": "agent_delta",
257
+ "agent_id": "searcher" | "judge" | ...,
258
+ }
259
+ ```
260
+
261
+ ### What We Should Handle for UI
262
+
263
+ 1. **`AgentRunUpdateEvent`** with metadata filtering:
264
+ - `magentic_event_type: "agent_delta"` → Display agent streaming
265
+ - `magentic_event_type: "orchestrator_message"` → Filter by `orchestrator_message_kind`:
266
+ - `"user_task"` → Show (task assignment)
267
+ - `"instruction"` → Filter out (internal)
268
+ - `"task_ledger"` → Filter out (internal)
269
+ - `"notice"` → Maybe show (warnings)
270
+
271
+ 2. **`WorkflowOutputEvent`** → Final output
272
+
273
+ ### What We Should NOT Handle for UI
274
+
275
+ - `ExecutorCompletedEvent` - Internal bookkeeping
276
+ - `ExecutorInvokedEvent` - Internal bookkeeping
277
+ - `SuperStepStartedEvent/CompletedEvent` - Internal iteration
278
+ - `WorkflowStatusEvent` - Internal state machine
279
+
280
+ ---
281
+
282
+ ## Required Import Changes
283
+
284
+ **Current imports:**
285
+
286
+ ```python
287
+ from agent_framework import (
288
+ MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
289
+ AgentRunUpdateEvent,
290
+ ExecutorCompletedEvent, # Keep for internal tracking
291
+ MagenticBuilder,
292
+ WorkflowOutputEvent,
293
+ )
294
+ ```
295
+
296
+ **Add these imports for metadata filtering:**
297
+
298
+ ```python
299
+ from agent_framework import (
300
+ MAGENTIC_EVENT_TYPE_AGENT_DELTA, # For agent streaming detection
301
+ ORCH_MSG_KIND_INSTRUCTION, # Filter internal messages
302
+ ORCH_MSG_KIND_TASK_LEDGER, # Filter internal messages
303
+ )
304
+ ```
305
+
306
+ ---
307
+
308
+ ## Test Cases
309
+
310
+ ```python
311
+ def test_no_executor_completed_events_in_ui():
312
+ """UI should not emit any events from ExecutorCompletedEvent."""
313
+ # Run workflow to completion
314
+ # Collect all yielded AgentEvent objects
315
+ # Assert NONE have type "progress" with "task completed" message
316
+ # Assert NONE have type matching completion patterns
317
+ pass
318
+
319
+ def test_internal_messages_filtered_from_streaming():
320
+ """Internal orchestrator messages should be filtered from UI stream."""
321
+ # Run workflow and collect all yielded events
322
+ # Assert no events contain "task_ledger" content
323
+ # Assert no events contain raw instruction prompts
324
+ # Assert no JSON blobs in streaming output
325
+ pass
326
+
327
+ def test_reporter_ran_tracking_still_works():
328
+ """Internal state.reporter_ran should still be set correctly."""
329
+ # Run workflow to completion
330
+ # Verify fallback synthesis is NOT triggered (reporter did run)
331
+ # This ensures we didn't break internal tracking when removing UI events
332
+ pass
333
+ ```
334
+
335
+ ---
336
+
337
+ ## Why the Free Tier "Works"
338
+
339
+ The user asked why the free tier seems to work despite expectations. The answer:
340
+
341
+ 1. **The framework handles orchestration** - The MS Agent Framework manages the workflow (planning, progress tracking, agent coordination)
342
+ 2. **The LLM just provides reasoning** - The model generates text, but the framework decides when to delegate, when to stop, etc.
343
+ 3. **The "bugs" are in our UI layer** - The orchestration works correctly; we're just displaying internal events
344
+
345
+ The free tier works because:
346
+ - `MagenticBuilder` creates the workflow graph
347
+ - `StandardMagenticManager` handles planning and progress evaluation
348
+ - The framework routes messages between executors
349
+ - The LLM quality affects answer quality, not workflow execution
350
+
351
+ Our UI noise (trailing events) is a bug in how we consume framework events, not a framework bug.
src/orchestrators/advanced.py CHANGED
@@ -18,11 +18,13 @@ Design Patterns:
18
  import asyncio
19
  from collections.abc import AsyncGenerator
20
  from dataclasses import dataclass
21
- from typing import TYPE_CHECKING, Any, Literal
22
 
23
  import structlog
24
  from agent_framework import (
25
  MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
 
 
26
  AgentRunUpdateEvent,
27
  ChatAgent,
28
  ExecutorCompletedEvent,
@@ -327,6 +329,16 @@ The final output should be a structured research report."""
327
  async for event in workflow.run_stream(task):
328
  # 1. Handle Streaming (Source of Truth for Content)
329
  if isinstance(event, AgentRunUpdateEvent) and event.data:
 
 
 
 
 
 
 
 
 
 
330
  author = getattr(event.data, "author_name", None)
331
  # Detect agent switch to clear buffer
332
  if author != state.current_agent_id:
@@ -346,21 +358,12 @@ The final output should be a structured research report."""
346
 
347
  # 2. Handle Completion Signal
348
  if isinstance(event, ExecutorCompletedEvent):
349
- state.iteration += 1
350
-
351
  # P1 FIX: Track if ReportAgent produced output
352
- # Note: ExecutorCompletedEvent might not have agent_id directly accessible
353
- # The executor_id usually maps to the agent name
354
  agent_name = getattr(event, "executor_id", "") or "unknown"
355
  if REPORTER_AGENT_ID in agent_name.lower():
356
  state.reporter_ran = True
357
 
358
- comp_event, prog_event = self._handle_completion_event(
359
- event, state.current_message_buffer, state.iteration
360
- )
361
- yield comp_event
362
- yield prog_event
363
-
364
  # P2 BUG FIX: Save length before clearing
365
  state.last_streamed_length = len(state.current_message_buffer)
366
  # Clear buffer after consuming
@@ -434,40 +437,6 @@ The final output should be a structured research report."""
434
  iteration=state.iteration,
435
  )
436
 
437
- def _handle_completion_event(
438
- self,
439
- event: ExecutorCompletedEvent,
440
- buffer: str,
441
- iteration: int,
442
- ) -> tuple[AgentEvent, AgentEvent]:
443
- """Handle an agent completion event using the accumulated buffer."""
444
- # Use buffer if available, otherwise fall back cautiously
445
- # (Only fall back if buffer empty, which implies tool-only turn)
446
- text_content = buffer
447
- if not text_content:
448
- # ExecutorCompletedEvent doesn't carry the message directly in the same way
449
- # Try extraction but ignore repr strings AND empty strings
450
- # The result is often in event.result or similar, but buffering is safer
451
- text_content = "Action completed (Tool Call)"
452
-
453
- agent_id = getattr(event, "executor_id", "unknown") or "unknown"
454
- event_type = self._get_event_type_for_agent(agent_id)
455
- semantic_name = self._get_agent_semantic_name(agent_id)
456
-
457
- completion_event = AgentEvent(
458
- type=event_type,
459
- message=f"{semantic_name}: {self._smart_truncate(text_content)}",
460
- iteration=iteration,
461
- )
462
-
463
- progress_event = AgentEvent(
464
- type="progress",
465
- message=f"Step {iteration}: {semantic_name} task completed",
466
- iteration=iteration,
467
- )
468
-
469
- return completion_event, progress_event
470
-
471
  def _handle_final_event(
472
  self,
473
  event: WorkflowOutputEvent,
@@ -549,29 +518,6 @@ The final output should be a structured research report."""
549
  # The repr is useless for display purposes
550
  return ""
551
 
552
- def _get_event_type_for_agent(
553
- self,
554
- agent_name: str,
555
- ) -> Literal["search_complete", "judge_complete", "hypothesizing", "synthesizing", "judging"]:
556
- """Map agent name to appropriate event type.
557
-
558
- Args:
559
- agent_name: The agent ID from the workflow event
560
-
561
- Returns:
562
- Event type string matching AgentEvent.type Literal
563
- """
564
- agent_lower = agent_name.lower()
565
- if SEARCHER_AGENT_ID in agent_lower:
566
- return "search_complete"
567
- if JUDGE_AGENT_ID in agent_lower:
568
- return "judge_complete"
569
- if HYPOTHESIZER_AGENT_ID in agent_lower:
570
- return "hypothesizing"
571
- if REPORTER_AGENT_ID in agent_lower:
572
- return "synthesizing"
573
- return "judging" # Default for unknown agents
574
-
575
  def _smart_truncate(self, text: str, max_len: int = 200) -> str:
576
  """Truncate at sentence boundary to avoid cutting words."""
577
  if len(text) <= max_len:
 
18
  import asyncio
19
  from collections.abc import AsyncGenerator
20
  from dataclasses import dataclass
21
+ from typing import TYPE_CHECKING, Any
22
 
23
  import structlog
24
  from agent_framework import (
25
  MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
26
+ ORCH_MSG_KIND_INSTRUCTION,
27
+ ORCH_MSG_KIND_TASK_LEDGER,
28
  AgentRunUpdateEvent,
29
  ChatAgent,
30
  ExecutorCompletedEvent,
 
329
  async for event in workflow.run_stream(task):
330
  # 1. Handle Streaming (Source of Truth for Content)
331
  if isinstance(event, AgentRunUpdateEvent) and event.data:
332
+ # Check metadata to filter internal orchestrator messages
333
+ props = getattr(event.data, "additional_properties", None) or {}
334
+ event_type = props.get("magentic_event_type")
335
+ msg_kind = props.get("orchestrator_message_kind")
336
+
337
+ # Filter out internal orchestrator messages (task_ledger, instruction)
338
+ if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
339
+ if msg_kind in (ORCH_MSG_KIND_TASK_LEDGER, ORCH_MSG_KIND_INSTRUCTION):
340
+ continue # Skip internal coordination messages
341
+
342
  author = getattr(event.data, "author_name", None)
343
  # Detect agent switch to clear buffer
344
  if author != state.current_agent_id:
 
358
 
359
  # 2. Handle Completion Signal
360
  if isinstance(event, ExecutorCompletedEvent):
361
+ # Internal state tracking only - NO UI events
 
362
  # P1 FIX: Track if ReportAgent produced output
 
 
363
  agent_name = getattr(event, "executor_id", "") or "unknown"
364
  if REPORTER_AGENT_ID in agent_name.lower():
365
  state.reporter_ran = True
366
 
 
 
 
 
 
 
367
  # P2 BUG FIX: Save length before clearing
368
  state.last_streamed_length = len(state.current_message_buffer)
369
  # Clear buffer after consuming
 
437
  iteration=state.iteration,
438
  )
439
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
440
  def _handle_final_event(
441
  self,
442
  event: WorkflowOutputEvent,
 
518
  # The repr is useless for display purposes
519
  return ""
520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
521
  def _smart_truncate(self, text: str, max_len: int = 200) -> str:
522
  """Truncate at sentence boundary to avoid cutting words."""
523
  if len(text) <= max_len:
tests/unit/orchestrators/test_accumulator_pattern.py CHANGED
@@ -90,6 +90,9 @@ def mock_agent_framework():
90
  mock_af.MagenticOrchestratorMessageEvent = MockOrchestratorMessageEvent
91
  mock_af.AgentRunResponse = MagicMock
92
  mock_af.MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message"
 
 
 
93
 
94
  # Mock other classes
95
  mock_af.MagenticBuilder = MagicMock
@@ -170,9 +173,9 @@ def mock_orchestrator(mock_agent_framework):
170
  @pytest.mark.asyncio
171
  async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator):
172
  """
173
- Scenario A: Standard Text Message
174
  Input: Updates ("Hello", " World") -> Completed
175
- Expected: AgentEvent with "Hello World"
176
  """
177
  # Use "searcher" to map to "SearchAgent"
178
  events = [
@@ -193,27 +196,32 @@ async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator):
193
  async for event in mock_orchestrator.run("test query"):
194
  generated_events.append(event)
195
 
196
- # Find the completion event for SearchAgent (non-streaming)
197
- chat_events = [
198
- e for e in generated_events if "SearchAgent" in str(e.message) and e.type != "streaming"
199
- ]
200
-
201
- assert len(chat_events) >= 1, (
202
- f"Expected SearchAgent events, got: {[e.message for e in generated_events]}"
203
  )
204
- final_event = chat_events[0]
205
 
206
- # Must contain accumulated text
207
- assert "Hello World" in final_event.message or "Hello" in final_event.message
 
 
 
 
 
 
 
 
208
 
209
 
210
  @pytest.mark.unit
211
  @pytest.mark.asyncio
212
  async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator):
213
  """
214
- Scenario B: Tool Call (No Text Deltas)
215
  Input: No Deltas -> Completed
216
- Expected: AgentEvent with fallback text
217
  """
218
  # Use "searcher" to map to "SearchAgent"
219
  events = [
@@ -232,26 +240,27 @@ async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator):
232
  async for event in mock_orchestrator.run("test query"):
233
  generated_events.append(event)
234
 
235
- # Find completion events for SearchAgent
236
  search_events = [
237
- e for e in generated_events if "SearchAgent" in str(e.message) and e.type != "streaming"
 
 
 
238
  ]
239
 
240
- assert len(search_events) >= 1, (
241
- f"Expected SearchAgent events, got: {[e.message for e in generated_events]}"
 
242
  )
243
- final_event = search_events[0]
244
-
245
- # Should contain fallback or tool indicator
246
- assert "Action completed" in final_event.message or "Tool" in final_event.message
247
 
248
 
249
  @pytest.mark.unit
250
  @pytest.mark.asyncio
251
  async def test_accumulator_pattern_buffer_clearing(mock_orchestrator):
252
  """
253
- Verify buffer clears between agents.
254
- Agent B should NOT inherit Agent A's accumulated text.
 
255
  """
256
  # Use "searcher" (SearchAgent) and "judge" (JudgeAgent)
257
  events = [
@@ -273,24 +282,22 @@ async def test_accumulator_pattern_buffer_clearing(mock_orchestrator):
273
  async for event in mock_orchestrator.run("test query"):
274
  generated_events.append(event)
275
 
276
- # Find non-streaming events for each agent
277
- agent_a_events = [
278
- e for e in generated_events if "SearchAgent" in str(e.message) and e.type != "streaming"
279
- ]
280
- agent_b_events = [
281
- e for e in generated_events if "JudgeAgent" in str(e.message) and e.type != "streaming"
282
- ]
283
 
284
- # Both should have completion events
285
- assert len(agent_a_events) >= 1, (
286
- f"No SearchAgent events: {[e.message for e in generated_events]}"
287
- )
288
- assert len(agent_b_events) >= 1, (
289
- f"No JudgeAgent events: {[e.message for e in generated_events]}"
290
  )
291
 
292
- # Agent A should have its own text
293
- assert "Searcher" in agent_a_events[0].message
294
- # Agent B should have its own text, NOT Agent A's
295
- assert "Judge" in agent_b_events[0].message
296
- assert "Searcher" not in agent_b_events[0].message, "Buffer not cleared between agents!"
 
 
 
 
 
 
90
  mock_af.MagenticOrchestratorMessageEvent = MockOrchestratorMessageEvent
91
  mock_af.AgentRunResponse = MagicMock
92
  mock_af.MAGENTIC_EVENT_TYPE_ORCHESTRATOR = "orchestrator_message"
93
+ # P2 Fix: Add constants for metadata filtering
94
+ mock_af.ORCH_MSG_KIND_INSTRUCTION = "instruction"
95
+ mock_af.ORCH_MSG_KIND_TASK_LEDGER = "task_ledger"
96
 
97
  # Mock other classes
98
  mock_af.MagenticBuilder = MagicMock
 
173
  @pytest.mark.asyncio
174
  async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator):
175
  """
176
+ Scenario A: Standard Text Message (P2 Fix)
177
  Input: Updates ("Hello", " World") -> Completed
178
+ Expected: Streaming events for text, NO completion events (P2 fix silences them)
179
  """
180
  # Use "searcher" to map to "SearchAgent"
181
  events = [
 
196
  async for event in mock_orchestrator.run("test query"):
197
  generated_events.append(event)
198
 
199
+ # P2 FIX: ExecutorCompletedEvent is SILENCED - no non-streaming agent events
200
+ # We should have STREAMING events from AgentRunUpdateEvent
201
+ streaming_events = [e for e in generated_events if e.type == "streaming"]
202
+ assert len(streaming_events) >= 1, (
203
+ f"Expected streaming events, got: {[e.type for e in generated_events]}"
 
 
204
  )
 
205
 
206
+ # P2 FIX: No "SearchAgent" completion events should exist (silenced)
207
+ completion_events = [
208
+ e
209
+ for e in generated_events
210
+ if "SearchAgent" in str(e.message)
211
+ and e.type not in ("streaming", "started", "progress", "thinking")
212
+ ]
213
+ assert len(completion_events) == 0, (
214
+ f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in completion_events]}"
215
+ )
216
 
217
 
218
  @pytest.mark.unit
219
  @pytest.mark.asyncio
220
  async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator):
221
  """
222
+ Scenario B: Tool Call (No Text Deltas) - P2 Fix
223
  Input: No Deltas -> Completed
224
+ Expected: NO completion events (P2 fix silences ExecutorCompletedEvent)
225
  """
226
  # Use "searcher" to map to "SearchAgent"
227
  events = [
 
240
  async for event in mock_orchestrator.run("test query"):
241
  generated_events.append(event)
242
 
243
+ # P2 FIX: ExecutorCompletedEvent is SILENCED - no agent completion events
244
  search_events = [
245
+ e
246
+ for e in generated_events
247
+ if "SearchAgent" in str(e.message)
248
+ and e.type not in ("streaming", "started", "progress", "thinking")
249
  ]
250
 
251
+ # P2 Fix: Should have NO completion events (they are silenced)
252
+ assert len(search_events) == 0, (
253
+ f"P2 Fix: Should NOT emit completion events, got: {[e.message for e in search_events]}"
254
  )
 
 
 
 
255
 
256
 
257
  @pytest.mark.unit
258
  @pytest.mark.asyncio
259
  async def test_accumulator_pattern_buffer_clearing(mock_orchestrator):
260
  """
261
+ Verify buffer clears between agents (P2 Fix).
262
+ P2 Fix: ExecutorCompletedEvent is silenced, so we verify via streaming events.
263
+ Agent B's streaming should NOT contain Agent A's text.
264
  """
265
  # Use "searcher" (SearchAgent) and "judge" (JudgeAgent)
266
  events = [
 
282
  async for event in mock_orchestrator.run("test query"):
283
  generated_events.append(event)
284
 
285
+ # P2 FIX: ExecutorCompletedEvent is SILENCED
286
+ # Verify via STREAMING events - each agent's stream is separate
287
+ streaming_events = [e for e in generated_events if e.type == "streaming"]
 
 
 
 
288
 
289
+ # Should have streaming events from both agents
290
+ assert len(streaming_events) >= 2, (
291
+ f"Expected streaming events, got: {[e.type for e in generated_events]}"
 
 
 
292
  )
293
 
294
+ # Verify content separation - each streaming event has its own content
295
+ searcher_streams = [e for e in streaming_events if "Searcher" in e.message]
296
+ judge_streams = [e for e in streaming_events if "Judge" in e.message]
297
+
298
+ assert len(searcher_streams) >= 1, "Missing searcher streaming events"
299
+ assert len(judge_streams) >= 1, "Missing judge streaming events"
300
+
301
+ # Buffer isolation: Judge stream should NOT contain Searcher text
302
+ for judge_event in judge_streams:
303
+ assert "Searcher" not in judge_event.message, "Buffer not cleared between agents!"
tests/unit/test_orchestrator_noise.py ADDED
@@ -0,0 +1,113 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from unittest.mock import MagicMock
2
+
3
+ import pytest
4
+ from agent_framework import (
5
+ MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
6
+ ORCH_MSG_KIND_INSTRUCTION,
7
+ ORCH_MSG_KIND_TASK_LEDGER,
8
+ AgentRunUpdateEvent,
9
+ ExecutorCompletedEvent,
10
+ )
11
+
12
+ from src.orchestrators.advanced import REPORTER_AGENT_ID, AdvancedOrchestrator
13
+
14
+
15
+ @pytest.mark.asyncio
16
+ async def test_executor_completed_event_is_silenced():
17
+ """Verify ExecutorCompletedEvent produces NO UI events."""
18
+ orchestrator = AdvancedOrchestrator()
19
+
20
+ # Mock the workflow build to return our custom event stream
21
+ mock_workflow = MagicMock()
22
+
23
+ # Create a stream of events: Start -> ExecutorCompleted -> End
24
+ async def event_stream(task):
25
+ # 1. Completion event (should be ignored)
26
+ yield ExecutorCompletedEvent(executor_id="ManagerAgent", data=None)
27
+ # 2. Reporter completion (should set flag but yield nothing)
28
+ yield ExecutorCompletedEvent(executor_id=REPORTER_AGENT_ID, data=None)
29
+
30
+ mock_workflow.run_stream = event_stream
31
+ orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
32
+
33
+ # Mock init services to avoid side effects
34
+ async def mock_init_events(query):
35
+ if False:
36
+ yield
37
+
38
+ orchestrator._init_workflow_events = mock_init_events
39
+ orchestrator._init_embedding_service = MagicMock(return_value=None)
40
+ orchestrator._create_task_prompt = MagicMock(return_value="task")
41
+
42
+ events = []
43
+ async for event in orchestrator.run("query"):
44
+ events.append(event)
45
+
46
+ # Assertions
47
+ # We should have NO "progress" events with "task completed" message
48
+ for event in events:
49
+ if event.type == "progress":
50
+ assert "task completed" not in event.message
51
+ # We should have NO "judging" events from the manager completion
52
+ if event.type == "judging":
53
+ assert "ManagerAgent" not in event.message
54
+
55
+
56
+ @pytest.mark.asyncio
57
+ async def test_internal_messages_are_filtered():
58
+ """Verify internal task_ledger/instruction messages are filtered."""
59
+ orchestrator = AdvancedOrchestrator()
60
+ mock_workflow = MagicMock()
61
+
62
+ async def event_stream(task):
63
+ # 1. Task Ledger (Should be skipped)
64
+ ledger_update = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock())
65
+ ledger_update.data.text = '{"some": "json"}'
66
+ ledger_update.data.additional_properties = {
67
+ "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
68
+ "orchestrator_message_kind": ORCH_MSG_KIND_TASK_LEDGER,
69
+ }
70
+ yield ledger_update
71
+
72
+ # 2. Instruction (Should be skipped)
73
+ instruction = AgentRunUpdateEvent(executor_id="Manager", data=MagicMock())
74
+ instruction.data.text = "Internal instruction to agent"
75
+ instruction.data.additional_properties = {
76
+ "magentic_event_type": MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
77
+ "orchestrator_message_kind": ORCH_MSG_KIND_INSTRUCTION,
78
+ }
79
+ yield instruction
80
+
81
+ # 3. Normal agent message (SHOULD pass through)
82
+ # The streaming block filters task_ledger/instruction but passes agent content.
83
+ normal_msg = AgentRunUpdateEvent(executor_id="Searcher", data=MagicMock())
84
+ normal_msg.data.text = "I found something"
85
+ normal_msg.data.author_name = "Searcher"
86
+ normal_msg.data.additional_properties = {}
87
+ yield normal_msg
88
+
89
+ mock_workflow.run_stream = event_stream
90
+ orchestrator._build_workflow = MagicMock(return_value=mock_workflow)
91
+
92
+ async def mock_init_events(query):
93
+ if False:
94
+ yield
95
+
96
+ orchestrator._init_workflow_events = mock_init_events
97
+ orchestrator._init_embedding_service = MagicMock(return_value=None)
98
+
99
+ events = []
100
+ async for event in orchestrator.run("query"):
101
+ events.append(event)
102
+
103
+ # Assertions
104
+ # 1. Verify we got the normal message
105
+ streaming_messages = [e.message for e in events if e.type == "streaming"]
106
+ assert "I found something" in streaming_messages
107
+
108
+ # 2. Verify we did NOT get the internal messages
109
+ all_messages = [e.message for e in events]
110
+ # The JSON from task_ledger should be filtered
111
+ assert not any('{"some": "json"}' in msg for msg in all_messages)
112
+ # The instruction text should be filtered
113
+ assert not any("Internal instruction to agent" in msg for msg in all_messages)