| """Test that streaming event handling is fixed (no token-by-token spam).""" |
|
|
| from unittest.mock import MagicMock |
|
|
| import pytest |
|
|
| from src.utils.models import AgentEvent |
|
|
|
|
@pytest.mark.unit
@pytest.mark.asyncio
async def test_streaming_events_are_buffered_not_spammed():
    """
    Verify that streaming events are buffered, not yielded individually.

    This test validates the fix for Bug 1: Token-by-Token Streaming Spam.

    A mock orchestrator replays one "started" event, four "streaming" token
    events and one "complete" event. The test then checks that:

    * ``research_agent`` yields at least one result,
    * some result shows the fully accumulated streaming text,
    * no single result contains more than one streaming marker,
    * some result contains the final "complete" message.
    """
    # Local imports, consistent with how this file imports the app module.
    from unittest.mock import patch

    import src.app as app_module
    from src.app import research_agent

    # Events the fake orchestrator will emit, in order.
    streaming_events = [
        AgentEvent(type="started", message="Starting research", iteration=0),
        AgentEvent(type="streaming", message="This", iteration=1),
        AgentEvent(type="streaming", message=" is", iteration=1),
        AgentEvent(type="streaming", message=" a", iteration=1),
        AgentEvent(type="streaming", message=" test", iteration=1),
        AgentEvent(type="complete", message="Final answer: This is a test", iteration=1),
    ]

    async def mock_run(query):
        # Async generator standing in for the orchestrator's run();
        # the query is ignored and the canned events are replayed.
        for event in streaming_events:
            yield event

    mock_orchestrator = MagicMock()
    mock_orchestrator.run = mock_run

    fake_configure = MagicMock(return_value=(mock_orchestrator, "Test Backend"))

    # patch.object restores the original configure_orchestrator on exit,
    # including when research_agent raises, so no manual try/finally is needed.
    with patch.object(app_module, "configure_orchestrator", fake_configure):
        results = []
        async for result in research_agent("test query", [], mode="simple", api_key=""):
            results.append(result)

    assert len(results) > 0, "Should have yielded results"

    # The four tokens must have been concatenated into a single streaming line.
    assert any("📡 **STREAMING**: This is a test" in r for r in results), (
        "Buffer didn't accumulate correctly"
    )

    # Each yielded response may show the streaming line at most once.
    for res in results:
        streaming_markers = res.count("📡 **STREAMING**:")
        assert streaming_markers <= 1, (
            f"Found multiple streaming markers in single response: {res}\n"
            "This indicates we are appending new lines instead of updating in place."
        )

    assert any("Final answer" in r for r in results), "Missing final complete message"
|
|
|
|
@pytest.mark.unit
@pytest.mark.asyncio
async def test_api_key_state_parameter_exists():
    """
    Check that research_agent declares an api_key_state parameter.

    This validates the fix for Bug 2: API Key Persistence. The signature of
    ``research_agent`` is inspected; the parameter must be present and must
    be declared after ``api_key``.
    """
    import inspect

    from src.app import research_agent

    # Parameter names in declaration order.
    param_names = list(inspect.signature(research_agent).parameters)

    assert "api_key_state" in param_names, "api_key_state parameter missing from research_agent"

    # Position check: api_key must precede api_key_state.
    key_pos = param_names.index("api_key")
    state_pos = param_names.index("api_key_state")
    assert key_pos < state_pos, "api_key_state should come after api_key"
|
|