File size: 4,328 Bytes
0c9be4a 82503b1 0c9be4a cd7c282 0c9be4a 9006d69 e82d9c9 9006d69 0c9be4a 82503b1 0c9be4a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
"""Test that streaming event handling is fixed (no token-by-token spam)."""
from unittest.mock import MagicMock
import pytest
from src.utils.models import AgentEvent
@pytest.mark.unit
@pytest.mark.asyncio
async def test_streaming_events_are_buffered_not_spammed():
    """Verify that streaming events are buffered, not appended one per token.

    Regression test for Bug 1 (Token-by-Token Streaming Spam). Before the
    fix, each streamed token produced a separate line in the output, so the
    output grew as O(N^2). After the fix, streamed tokens accumulate into a
    single streaming line that is updated in place.

    The orchestrator is replaced by a mock whose ``run`` yields a fixed list
    of events, and ``configure_orchestrator`` is patched on ``src.app`` only
    for the duration of the ``research_agent`` call.
    """
    # Imported here to avoid circular dependencies at module import time.
    from unittest.mock import patch

    import src.app as app_module
    from src.app import research_agent

    # U+1F4E1 (satellite antenna) spelled as an escape so the marker cannot be
    # corrupted by a mis-decoded source file.
    # NOTE(review): confirm this matches the prefix emitted by src.app.
    streaming_marker = "\U0001F4E1 **STREAMING**:"

    mock_orchestrator = MagicMock()

    # Simulated orchestrator output: one "started" event, four token-sized
    # "streaming" events (like LLM token-by-token output), then "complete".
    streaming_events = [
        AgentEvent(type="started", message="Starting research", iteration=0),
        AgentEvent(type="streaming", message="This", iteration=1),
        AgentEvent(type="streaming", message=" is", iteration=1),
        AgentEvent(type="streaming", message=" a", iteration=1),
        AgentEvent(type="streaming", message=" test", iteration=1),
        AgentEvent(type="complete", message="Final answer: This is a test", iteration=1),
    ]

    async def mock_run(query):
        """Async generator standing in for the orchestrator's ``run``."""
        for event in streaming_events:
            yield event

    mock_orchestrator.run = mock_run

    # patch.object restores the real configure_orchestrator on exit, even if
    # research_agent raises, so no manual try/finally is needed.
    configure_stub = MagicMock(return_value=(mock_orchestrator, "Test Backend"))
    with patch.object(app_module, "configure_orchestrator", configure_stub):
        # SPEC-16: mode parameter removed (unified architecture)
        results = [
            result async for result in research_agent("test query", [], api_key="")
        ]

    # We DO expect streaming updates (for UX responsiveness), but not O(N^2)
    # growth of the persisted output.
    assert results, "Should have yielded results"

    # The four streamed tokens must show up accumulated into one message.
    assert any(f"{streaming_marker} This is a test" in r for r in results), (
        "Buffer didn't accumulate correctly"
    )

    # The critical check for the "Spam" bug.
    # In the spam bug, the output grew like:
    #   "Stream: T"
    #   "Stream: T\nStream: h"
    #   "Stream: T\nStream: h\nStream: i"
    #
    # In the fixed version, it should look like:
    #   "Stream: T"
    #   "Stream: Th"
    #   "Stream: Thi"
    # (replacing the last line, not adding new lines)
    for res in results:
        # The marker should appear AT MOST once in a single result string
        # (unless we have multiple distinct streaming blocks).
        streaming_markers = res.count(streaming_marker)
        assert streaming_markers <= 1, (
            f"Found multiple streaming markers in single response: {res}\n"
            "This indicates we are appending new lines instead of updating in place."
        )

    # The final result should include the complete message.
    assert any("Final answer" in r for r in results), "Missing final complete message"
@pytest.mark.unit
@pytest.mark.asyncio
async def test_api_key_state_parameter_exists():
    """Check that ``research_agent`` accepts an ``api_key_state`` parameter.

    Regression test for Bug 2 (API Key Persistence): the parameter must be
    present in the signature and must be declared after ``api_key``.
    """
    import inspect

    from src.app import research_agent

    # Parameter names in declaration order.
    names = list(inspect.signature(research_agent).parameters)

    assert "api_key_state" in names, "api_key_state parameter missing from research_agent"

    # Compare declaration positions of the two parameters.
    key_pos = names.index("api_key")
    state_pos = names.index("api_key_state")
    assert key_pos < state_pos, "api_key_state should come after api_key"
|