File size: 4,328 Bytes
0c9be4a
 
 
 
 
 
 
 
 
82503b1
0c9be4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cd7c282
 
0c9be4a
 
9006d69
 
 
 
 
 
 
e82d9c9
 
 
9006d69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c9be4a
 
 
 
 
 
 
 
 
82503b1
0c9be4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Test that streaming event handling is fixed (no token-by-token spam)."""

from unittest.mock import MagicMock

import pytest

from src.utils.models import AgentEvent


@pytest.mark.unit
@pytest.mark.asyncio
async def test_streaming_events_are_buffered_not_spammed():
    """
    Check that streamed tokens update one line in place instead of piling up.

    Regression test for Bug 1 (token-by-token streaming spam, which made the
    output grow O(N^2)). A stubbed orchestrator replays one "started" event,
    four "streaming" token events and one "complete" event through
    ``research_agent``. The test then requires that:

    * at least one result is yielded,
    * some result contains the accumulated streaming text,
    * no single result contains the streaming marker more than once,
    * some result contains the final answer.
    """
    # Imported inside the test to avoid circular dependencies.
    import src.app as app_module
    from src.app import research_agent

    # Token-sized chunks, mimicking incremental LLM output.
    token_events = [
        AgentEvent(type="streaming", message=token, iteration=1)
        for token in ("This", " is", " a", " test")
    ]
    scripted_events = [
        AgentEvent(type="started", message="Starting research", iteration=0),
        *token_events,
        AgentEvent(type="complete", message="Final answer: This is a test", iteration=1),
    ]

    async def replay_events(query):
        # Async generator standing in for the orchestrator's ``run``.
        for scripted in scripted_events:
            yield scripted

    fake_orchestrator = MagicMock()
    fake_orchestrator.run = replay_events

    # Swap configure_orchestrator for a stub that hands back the fake
    # orchestrator; the real function is put back in ``finally``.
    saved_configure = app_module.configure_orchestrator
    app_module.configure_orchestrator = MagicMock(
        return_value=(fake_orchestrator, "Test Backend")
    )

    try:
        # SPEC-16: mode parameter removed (unified architecture)
        outputs = [chunk async for chunk in research_agent("test query", [], api_key="")]

        # Streaming updates are still expected (for UX responsiveness);
        # what must not happen is O(N^2) growth of the persisted output.
        assert outputs, "Should have yielded results"

        # NOTE(review): the marker literal used below looks mis-encoded
        # (possibly mojibake of an emoji) - confirm it matches what
        # research_agent actually emits.
        accumulated_seen = any(
            "πŸ“‘ **STREAMING**: This is a test" in chunk for chunk in outputs
        )
        assert accumulated_seen, "Buffer didn't accumulate correctly"

        # The core check for the spam bug. With the bug, each yield appended
        # a new line:
        #   "Stream: T"
        #   "Stream: T\nStream: h"
        #   "Stream: T\nStream: h\nStream: i"
        # With the fix, the last line is replaced instead:
        #   "Stream: T"
        #   "Stream: Th"
        #   "Stream: Thi"
        # So the marker may appear at most once per yielded result
        # (unless there are multiple distinct streaming blocks).
        for chunk in outputs:
            marker_count = chunk.count("πŸ“‘ **STREAMING**:")
            assert marker_count < 2, (
                f"Found multiple streaming markers in single response: {chunk}\n"
                "This indicates we are appending new lines instead of updating in place."
            )

        # The complete message must show up somewhere in the output.
        final_seen = any("Final answer" in chunk for chunk in outputs)
        assert final_seen, "Missing final complete message"

    finally:
        # Put the real configure_orchestrator back.
        app_module.configure_orchestrator = saved_configure


@pytest.mark.unit
@pytest.mark.asyncio
async def test_api_key_state_parameter_exists():
    """
    Check that ``research_agent`` declares an ``api_key_state`` parameter.

    Regression test for Bug 2 (API key persistence): the parameter must be
    present in the signature and must be declared after ``api_key``.
    """
    from inspect import signature

    from src.app import research_agent

    # Parameter names in declaration order.
    param_names = list(signature(research_agent).parameters)

    assert "api_key_state" in param_names, "api_key_state parameter missing from research_agent"

    # Ordering check: api_key must precede api_key_state.
    assert param_names.index("api_key") < param_names.index("api_key_state"), (
        "api_key_state should come after api_key"
    )