# AIDA / evals / test_streaming.py
# destinyebuka's picture
# new setup
# 7cd10a9
"""
SSE streaming primitives eval — pure-function tests, no real LLM.
Verifies:
- sse_event format (event: <type>\\ndata: <json>\\n\\n)
- chunk_text word-boundary splitting and reassembly
- stream_agent_response event sequence: thinking → text* → cards? → done
- error path emits a single error event and stops
- cards event only present when there's structured data
"""
import json
import pytest
from app.ai.streaming import (
chunk_text,
sse_event,
stream_agent_response,
)
# ============================================================
# sse_event
# ============================================================
def test_sse_event_format():
    """A frame opens with the event line, carries a data field, and ends blank."""
    frame = sse_event("hello", {"x": 1})

    has_event_header = frame.startswith("event: hello\n")
    has_terminator = frame.endswith("\n\n")

    assert has_event_header
    assert "data: " in frame
    assert has_terminator
def test_sse_event_unicode_passthrough():
    """Non-ASCII content must appear verbatim inside the emitted frame."""
    greeting = "Bonjour مرحبا 你好"
    frame = sse_event("text", {"content": greeting})
    # Holds only if the serializer leaves non-ASCII characters unescaped
    # (e.g. json.dumps with ensure_ascii=False), so UTF-8 goes on the wire.
    assert greeting in frame
def test_sse_event_payload_is_single_line():
    """A newline inside the payload must not produce a second ``data:`` line."""
    frame = sse_event("text", {"content": "line1\nline2"})
    # JSON encoding is expected to turn the embedded newline into an escape
    # sequence, leaving exactly one physical line that carries the data field.
    data_lines = []
    for line in frame.split("\n"):
        if line.startswith("data: "):
            data_lines.append(line)
    assert len(data_lines) == 1
def test_sse_event_no_html_escape_dance():
    """HTML-looking content survives a JSON round-trip without being escaped."""
    frame = sse_event("text", {"content": "<script>"})
    # This is SSE, not HTML: isolate the JSON between the "data: " prefix and
    # the blank-line terminator, then decode it.
    after_prefix = frame.split("data: ", 1)[1]
    raw_json = after_prefix.split("\n\n")[0]
    decoded = json.loads(raw_json)
    assert decoded["content"] == "<script>"
# ============================================================
# chunk_text
# ============================================================
def test_chunk_text_empty_string():
    """Chunking the empty string yields no chunks at all."""
    produced = list(chunk_text(""))
    assert produced == []
def test_chunk_text_whitespace_only_yields_once():
    """Whitespace-only input comes back as exactly one unchanged chunk."""
    produced = list(chunk_text(" "))
    assert produced == [" "]
def test_chunk_text_short_string_one_chunk():
    """Input shorter than ``chunk_size`` is emitted as a single chunk."""
    short = "hi"
    pieces = list(chunk_text(short, chunk_size=20))
    assert pieces == ["hi"]
def test_chunk_text_round_trips_to_original():
    """Concatenating the chunks reproduces the input, and long input is split."""
    sentence = "Hello world this is AIDA the helpful real-estate assistant"
    pieces = list(chunk_text(sentence, chunk_size=10))

    rebuilt = "".join(pieces)
    assert rebuilt == sentence
    assert len(pieces) > 1, "long string should split into multiple chunks"
def test_chunk_text_does_not_split_words():
    """A word longer than ``chunk_size`` is never cut in the middle."""
    long_word = "antidisestablishmentarianism"
    text = "antidisestablishmentarianism is a long word"
    pieces = list(chunk_text(text, chunk_size=5))

    # Nothing is lost or reordered by chunking.
    assert "".join(pieces) == text

    # The over-long word lives in exactly one chunk, i.e. it was kept whole.
    holders = [piece for piece in pieces if long_word in piece]
    assert len(holders) == 1

    # Every chunk except the last must stop on whitespace, which means each
    # internal boundary sits between words rather than inside one.
    for piece in pieces[:-1]:
        assert piece[-1].isspace(), (
            f"non-final chunk {piece!r} doesn't end at a whitespace boundary"
        )
def test_chunk_text_multilingual():
    """Chunking non-Latin (Arabic) text still reassembles to the original."""
    arabic = "مرحبا بالعالم هذا اختبار طويل"
    rebuilt = "".join(chunk_text(arabic, chunk_size=8))
    assert rebuilt == arabic
# ============================================================
# stream_agent_response — event sequencing
# ============================================================
class _FakeResponse:
"""Mimics AgentResponse without the Pydantic dependency."""
def __init__(self, text="", search_results=None, my_listings=None,
draft=None, draft_ui=None):
self.text = text
self.search_results = search_results
self.my_listings = my_listings
self.draft = draft
self.draft_ui = draft_ui
def model_dump(self):
return {"text": self.text}
async def _no_sleep(_):
return None
async def _resolved_to(value):
return value
def _parse_events(messages):
"""Extract (event_type, data_dict) from a list of SSE messages."""
events = []
for msg in messages:
lines = msg.strip().split("\n")
event_line = next(l for l in lines if l.startswith("event: "))
data_line = next(l for l in lines if l.startswith("data: "))
events.append((
event_line[len("event: "):].strip(),
json.loads(data_line[len("data: "):]),
))
return events
@pytest.mark.asyncio
async def test_stream_emits_thinking_first():
    """The first frame of a stream is a ``thinking`` event carrying ``ts``."""
    response = _FakeResponse(text="hi")
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(response),
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    first_type, first_data = _parse_events(frames)[0]
    assert first_type == "thinking"
    assert "ts" in first_data
@pytest.mark.asyncio
async def test_stream_full_event_sequence_text_only():
    """Text-only response: thinking first, some text, no cards, done last."""
    response = _FakeResponse(text="Hello world from AIDA today")
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(response),
            chunk_size=10,
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    kinds = [kind for kind, _ in _parse_events(frames)]
    assert kinds[0] == "thinking"
    assert "text" in kinds
    # The fake carries no structured data, so no cards event is expected.
    assert "cards" not in kinds
    assert kinds[-1] == "done"
@pytest.mark.asyncio
async def test_stream_emits_cards_event_when_search_results_present():
    """Search results on the response show up in a ``cards`` event."""
    listings = [{"title": "A"}, {"title": "B"}, {"title": "C"}]
    response = _FakeResponse(text="found 3", search_results=listings)
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(response),
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    events = _parse_events(frames)
    kinds = [kind for kind, _ in events]
    assert "cards" in kinds

    # Inspect the payload of the first cards event.
    cards_data = events[kinds.index("cards")][1]
    assert "search_results" in cards_data
    assert len(cards_data["search_results"]) == 3
@pytest.mark.asyncio
async def test_stream_text_chunks_reassemble_to_original():
    """Joining the ``content`` of every text event rebuilds the reply text."""
    text = "Voici trois propriétés correspondantes à ta recherche"
    response = _FakeResponse(text=text)
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(response),
            chunk_size=8,
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    rebuilt = "".join(
        data["content"]
        for kind, data in _parse_events(frames)
        if kind == "text"
    )
    assert rebuilt == text
@pytest.mark.asyncio
async def test_stream_error_path_emits_error_and_stops():
    """A failing agent coroutine yields thinking, then one error, then stops."""

    async def _explode():
        raise RuntimeError("brain went boom")

    frames = [
        frame
        async for frame in stream_agent_response(
            _explode(),
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    events = _parse_events(frames)
    # Exactly two events: nothing may follow the error.
    assert [kind for kind, _ in events] == ["thinking", "error"]

    error_data = events[1][1]
    assert error_data["type"] == "RuntimeError"
    assert "brain went boom" in error_data["message"]
@pytest.mark.asyncio
async def test_stream_handles_dict_response():
    """stream_agent_response accepts dicts as well as objects."""
    payload = {"text": "hello", "search_results": [{"id": 1}]}
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(payload),
            chunk_size=20,
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    kinds = [kind for kind, _ in _parse_events(frames)]
    assert "text" in kinds
    assert "cards" in kinds
@pytest.mark.asyncio
async def test_stream_handles_empty_text():
    """If the agent returned no text, no text event should be emitted."""
    response = _FakeResponse(text="")
    frames = [
        frame
        async for frame in stream_agent_response(
            _resolved_to(response),
            inter_chunk_delay_seconds=0,
            sleep=_no_sleep,
        )
    ]

    kinds = [kind for kind, _ in _parse_events(frames)]
    assert "text" not in kinds
    # The stream is still bracketed by thinking and done.
    assert kinds[0] == "thinking"
    assert kinds[-1] == "done"