Commit
·
40ca236
1
Parent(s):
c6474c2
refactor(orchestrator): implement semantic progress tracking
Browse files- Remove misleading 'Round X/Y' counter and time estimates
- Remove dead code (_get_progress_message, _EST_SECONDS_PER_ROUND)
- Implement semantic agent naming (e.g., 'reporter' -> 'ReportAgent')
- Update progress events to show 'Step N: AgentName task completed'
- Update tests to use valid domain agent IDs
- Fix P2_ROUND_COUNTER_SEMANTIC_MISMATCH
src/orchestrators/advanced.py
CHANGED
|
@@ -83,9 +83,6 @@ class AdvancedOrchestrator(OrchestratorProtocol):
|
|
| 83 |
- Configurable timeouts and round limits
|
| 84 |
"""
|
| 85 |
|
| 86 |
-
# Estimated seconds per coordination round (for progress UI)
|
| 87 |
-
_EST_SECONDS_PER_ROUND: int = 45
|
| 88 |
-
|
| 89 |
def __init__(
|
| 90 |
self,
|
| 91 |
max_rounds: int = 5,
|
|
@@ -193,16 +190,18 @@ Focus on:
|
|
| 193 |
|
| 194 |
The final output should be a structured research report."""
|
| 195 |
|
| 196 |
-
def
|
| 197 |
-
"""
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
|
|
|
|
|
|
| 206 |
|
| 207 |
async def _init_workflow_events(self, query: str) -> AsyncGenerator[AgentEvent, None]:
|
| 208 |
"""Yield initialization events."""
|
|
@@ -219,7 +218,9 @@ The final output should be a structured research report."""
|
|
| 219 |
)
|
| 220 |
|
| 221 |
async def _synthesize_fallback(
|
| 222 |
-
self,
|
|
|
|
|
|
|
| 223 |
) -> AsyncGenerator[AgentEvent, None]:
|
| 224 |
"""
|
| 225 |
Unified fallback synthesis for all termination scenarios.
|
|
@@ -272,7 +273,8 @@ The final output should be a structured research report."""
|
|
| 272 |
)
|
| 273 |
|
| 274 |
async def run( # noqa: PLR0915 - Complex but necessary for event stream handling
|
| 275 |
-
self,
|
|
|
|
| 276 |
) -> AsyncGenerator[AgentEvent, None]:
|
| 277 |
"""
|
| 278 |
Run the workflow.
|
|
@@ -312,9 +314,8 @@ The final output should be a structured research report."""
|
|
| 312 |
yield AgentEvent(
|
| 313 |
type="thinking",
|
| 314 |
message=(
|
| 315 |
-
f"Multi-agent reasoning in progress ({self._max_rounds} rounds
|
| 316 |
-
|
| 317 |
-
f"{self._max_rounds * 60 // 60} minutes."
|
| 318 |
),
|
| 319 |
iteration=0,
|
| 320 |
)
|
|
@@ -434,7 +435,10 @@ The final output should be a structured research report."""
|
|
| 434 |
)
|
| 435 |
|
| 436 |
def _handle_completion_event(
|
| 437 |
-
self,
|
|
|
|
|
|
|
|
|
|
| 438 |
) -> tuple[AgentEvent, AgentEvent]:
|
| 439 |
"""Handle an agent completion event using the accumulated buffer."""
|
| 440 |
# Use buffer if available, otherwise fall back cautiously
|
|
@@ -446,25 +450,19 @@ The final output should be a structured research report."""
|
|
| 446 |
# The result is often in event.result or similar, but buffering is safer
|
| 447 |
text_content = "Action completed (Tool Call)"
|
| 448 |
|
| 449 |
-
|
| 450 |
-
event_type = self._get_event_type_for_agent(
|
|
|
|
| 451 |
|
| 452 |
completion_event = AgentEvent(
|
| 453 |
type=event_type,
|
| 454 |
-
message=f"{
|
| 455 |
iteration=iteration,
|
| 456 |
)
|
| 457 |
|
| 458 |
-
# Progress update
|
| 459 |
-
rounds_remaining = max(self._max_rounds - iteration, 0)
|
| 460 |
-
est_seconds = rounds_remaining * 45
|
| 461 |
-
est_display = (
|
| 462 |
-
f"{est_seconds // 60}m {est_seconds % 60}s" if est_seconds >= 60 else f"{est_seconds}s"
|
| 463 |
-
)
|
| 464 |
-
|
| 465 |
progress_event = AgentEvent(
|
| 466 |
type="progress",
|
| 467 |
-
message=f"
|
| 468 |
iteration=iteration,
|
| 469 |
)
|
| 470 |
|
|
@@ -552,7 +550,8 @@ The final output should be a structured research report."""
|
|
| 552 |
return ""
|
| 553 |
|
| 554 |
def _get_event_type_for_agent(
|
| 555 |
-
self,
|
|
|
|
| 556 |
) -> Literal["search_complete", "judge_complete", "hypothesizing", "synthesizing", "judging"]:
|
| 557 |
"""Map agent name to appropriate event type.
|
| 558 |
|
|
|
|
| 83 |
- Configurable timeouts and round limits
|
| 84 |
"""
|
| 85 |
|
|
|
|
|
|
|
|
|
|
| 86 |
def __init__(
|
| 87 |
self,
|
| 88 |
max_rounds: int = 5,
|
|
|
|
| 190 |
|
| 191 |
The final output should be a structured research report."""
|
| 192 |
|
| 193 |
+
def _get_agent_semantic_name(self, agent_id: str) -> str:
|
| 194 |
+
"""Map internal agent ID to user-facing semantic name."""
|
| 195 |
+
name = agent_id.lower()
|
| 196 |
+
if SEARCHER_AGENT_ID in name:
|
| 197 |
+
return "SearchAgent"
|
| 198 |
+
if JUDGE_AGENT_ID in name:
|
| 199 |
+
return "JudgeAgent"
|
| 200 |
+
if HYPOTHESIZER_AGENT_ID in name:
|
| 201 |
+
return "HypothesisAgent"
|
| 202 |
+
if REPORTER_AGENT_ID in name:
|
| 203 |
+
return "ReportAgent"
|
| 204 |
+
return "ManagerAgent"
|
| 205 |
|
| 206 |
async def _init_workflow_events(self, query: str) -> AsyncGenerator[AgentEvent, None]:
|
| 207 |
"""Yield initialization events."""
|
|
|
|
| 218 |
)
|
| 219 |
|
| 220 |
async def _synthesize_fallback(
|
| 221 |
+
self,
|
| 222 |
+
iteration: int,
|
| 223 |
+
reason: str,
|
| 224 |
) -> AsyncGenerator[AgentEvent, None]:
|
| 225 |
"""
|
| 226 |
Unified fallback synthesis for all termination scenarios.
|
|
|
|
| 273 |
)
|
| 274 |
|
| 275 |
async def run( # noqa: PLR0915 - Complex but necessary for event stream handling
|
| 276 |
+
self,
|
| 277 |
+
query: str,
|
| 278 |
) -> AsyncGenerator[AgentEvent, None]:
|
| 279 |
"""
|
| 280 |
Run the workflow.
|
|
|
|
| 314 |
yield AgentEvent(
|
| 315 |
type="thinking",
|
| 316 |
message=(
|
| 317 |
+
f"Multi-agent reasoning in progress (Limit: {self._max_rounds} Manager rounds)... "
|
| 318 |
+
"Allocating time for deep research..."
|
|
|
|
| 319 |
),
|
| 320 |
iteration=0,
|
| 321 |
)
|
|
|
|
| 435 |
)
|
| 436 |
|
| 437 |
def _handle_completion_event(
|
| 438 |
+
self,
|
| 439 |
+
event: ExecutorCompletedEvent,
|
| 440 |
+
buffer: str,
|
| 441 |
+
iteration: int,
|
| 442 |
) -> tuple[AgentEvent, AgentEvent]:
|
| 443 |
"""Handle an agent completion event using the accumulated buffer."""
|
| 444 |
# Use buffer if available, otherwise fall back cautiously
|
|
|
|
| 450 |
# The result is often in event.result or similar, but buffering is safer
|
| 451 |
text_content = "Action completed (Tool Call)"
|
| 452 |
|
| 453 |
+
agent_id = getattr(event, "executor_id", "unknown") or "unknown"
|
| 454 |
+
event_type = self._get_event_type_for_agent(agent_id)
|
| 455 |
+
semantic_name = self._get_agent_semantic_name(agent_id)
|
| 456 |
|
| 457 |
completion_event = AgentEvent(
|
| 458 |
type=event_type,
|
| 459 |
+
message=f"{semantic_name}: {text_content[:200]}...",
|
| 460 |
iteration=iteration,
|
| 461 |
)
|
| 462 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 463 |
progress_event = AgentEvent(
|
| 464 |
type="progress",
|
| 465 |
+
message=f"Step {iteration}: {semantic_name} task completed",
|
| 466 |
iteration=iteration,
|
| 467 |
)
|
| 468 |
|
|
|
|
| 550 |
return ""
|
| 551 |
|
| 552 |
def _get_event_type_for_agent(
|
| 553 |
+
self,
|
| 554 |
+
agent_name: str,
|
| 555 |
) -> Literal["search_complete", "judge_complete", "hypothesizing", "synthesizing", "judging"]:
|
| 556 |
"""Map agent name to appropriate event type.
|
| 557 |
|
tests/unit/orchestrators/test_accumulator_pattern.py
CHANGED
|
@@ -174,10 +174,11 @@ async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator):
|
|
| 174 |
Input: Updates ("Hello", " World") -> Completed
|
| 175 |
Expected: AgentEvent with "Hello World"
|
| 176 |
"""
|
|
|
|
| 177 |
events = [
|
| 178 |
-
MockAgentRunUpdateEvent("Hello", author_name="
|
| 179 |
-
MockAgentRunUpdateEvent(" World", author_name="
|
| 180 |
-
MockExecutorCompletedEvent(executor_id="
|
| 181 |
]
|
| 182 |
|
| 183 |
async def mock_stream(*args, **kwargs):
|
|
@@ -192,13 +193,13 @@ async def test_accumulator_pattern_scenario_a_standard_text(mock_orchestrator):
|
|
| 192 |
async for event in mock_orchestrator.run("test query"):
|
| 193 |
generated_events.append(event)
|
| 194 |
|
| 195 |
-
# Find the completion event for
|
| 196 |
chat_events = [
|
| 197 |
-
e for e in generated_events if "
|
| 198 |
]
|
| 199 |
|
| 200 |
assert len(chat_events) >= 1, (
|
| 201 |
-
f"Expected
|
| 202 |
)
|
| 203 |
final_event = chat_events[0]
|
| 204 |
|
|
@@ -214,8 +215,9 @@ async def test_accumulator_pattern_scenario_b_tool_call(mock_orchestrator):
|
|
| 214 |
Input: No Deltas -> Completed
|
| 215 |
Expected: AgentEvent with fallback text
|
| 216 |
"""
|
|
|
|
| 217 |
events = [
|
| 218 |
-
MockExecutorCompletedEvent(executor_id="
|
| 219 |
]
|
| 220 |
|
| 221 |
async def mock_stream(*args, **kwargs):
|
|
@@ -251,11 +253,12 @@ async def test_accumulator_pattern_buffer_clearing(mock_orchestrator):
|
|
| 251 |
Verify buffer clears between agents.
|
| 252 |
Agent B should NOT inherit Agent A's accumulated text.
|
| 253 |
"""
|
|
|
|
| 254 |
events = [
|
| 255 |
-
MockAgentRunUpdateEvent("
|
| 256 |
-
MockExecutorCompletedEvent(executor_id="
|
| 257 |
-
MockAgentRunUpdateEvent("
|
| 258 |
-
MockExecutorCompletedEvent(executor_id="
|
| 259 |
]
|
| 260 |
|
| 261 |
async def mock_stream(*args, **kwargs):
|
|
@@ -272,18 +275,22 @@ async def test_accumulator_pattern_buffer_clearing(mock_orchestrator):
|
|
| 272 |
|
| 273 |
# Find non-streaming events for each agent
|
| 274 |
agent_a_events = [
|
| 275 |
-
e for e in generated_events if "
|
| 276 |
]
|
| 277 |
agent_b_events = [
|
| 278 |
-
e for e in generated_events if "
|
| 279 |
]
|
| 280 |
|
| 281 |
# Both should have completion events
|
| 282 |
-
assert len(agent_a_events) >= 1,
|
| 283 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 284 |
|
| 285 |
# Agent A should have its own text
|
| 286 |
-
assert "
|
| 287 |
# Agent B should have its own text, NOT Agent A's
|
| 288 |
-
assert "
|
| 289 |
-
assert "
|
|
|
|
| 174 |
Input: Updates ("Hello", " World") -> Completed
|
| 175 |
Expected: AgentEvent with "Hello World"
|
| 176 |
"""
|
| 177 |
+
# Use "searcher" to map to "SearchAgent"
|
| 178 |
events = [
|
| 179 |
+
MockAgentRunUpdateEvent("Hello", author_name="searcher"),
|
| 180 |
+
MockAgentRunUpdateEvent(" World", author_name="searcher"),
|
| 181 |
+
MockExecutorCompletedEvent(executor_id="searcher"),
|
| 182 |
]
|
| 183 |
|
| 184 |
async def mock_stream(*args, **kwargs):
|
|
|
|
| 193 |
async for event in mock_orchestrator.run("test query"):
|
| 194 |
generated_events.append(event)
|
| 195 |
|
| 196 |
+
# Find the completion event for SearchAgent (non-streaming)
|
| 197 |
chat_events = [
|
| 198 |
+
e for e in generated_events if "SearchAgent" in str(e.message) and e.type != "streaming"
|
| 199 |
]
|
| 200 |
|
| 201 |
assert len(chat_events) >= 1, (
|
| 202 |
+
f"Expected SearchAgent events, got: {[e.message for e in generated_events]}"
|
| 203 |
)
|
| 204 |
final_event = chat_events[0]
|
| 205 |
|
|
|
|
| 215 |
Input: No Deltas -> Completed
|
| 216 |
Expected: AgentEvent with fallback text
|
| 217 |
"""
|
| 218 |
+
# Use "searcher" to map to "SearchAgent"
|
| 219 |
events = [
|
| 220 |
+
MockExecutorCompletedEvent(executor_id="searcher"),
|
| 221 |
]
|
| 222 |
|
| 223 |
async def mock_stream(*args, **kwargs):
|
|
|
|
| 253 |
Verify buffer clears between agents.
|
| 254 |
Agent B should NOT inherit Agent A's accumulated text.
|
| 255 |
"""
|
| 256 |
+
# Use "searcher" (SearchAgent) and "judge" (JudgeAgent)
|
| 257 |
events = [
|
| 258 |
+
MockAgentRunUpdateEvent("Searcher says hi", author_name="searcher"),
|
| 259 |
+
MockExecutorCompletedEvent(executor_id="searcher"),
|
| 260 |
+
MockAgentRunUpdateEvent("Judge responds", author_name="judge"),
|
| 261 |
+
MockExecutorCompletedEvent(executor_id="judge"),
|
| 262 |
]
|
| 263 |
|
| 264 |
async def mock_stream(*args, **kwargs):
|
|
|
|
| 275 |
|
| 276 |
# Find non-streaming events for each agent
|
| 277 |
agent_a_events = [
|
| 278 |
+
e for e in generated_events if "SearchAgent" in str(e.message) and e.type != "streaming"
|
| 279 |
]
|
| 280 |
agent_b_events = [
|
| 281 |
+
e for e in generated_events if "JudgeAgent" in str(e.message) and e.type != "streaming"
|
| 282 |
]
|
| 283 |
|
| 284 |
# Both should have completion events
|
| 285 |
+
assert len(agent_a_events) >= 1, (
|
| 286 |
+
f"No SearchAgent events: {[e.message for e in generated_events]}"
|
| 287 |
+
)
|
| 288 |
+
assert len(agent_b_events) >= 1, (
|
| 289 |
+
f"No JudgeAgent events: {[e.message for e in generated_events]}"
|
| 290 |
+
)
|
| 291 |
|
| 292 |
# Agent A should have its own text
|
| 293 |
+
assert "Searcher" in agent_a_events[0].message
|
| 294 |
# Agent B should have its own text, NOT Agent A's
|
| 295 |
+
assert "Judge" in agent_b_events[0].message
|
| 296 |
+
assert "Searcher" not in agent_b_events[0].message, "Buffer not cleared between agents!"
|