Spaces:
Running
Running
| """ | |
| SSE streaming primitives eval — pure-function tests, no real LLM. | |
| Verifies: | |
| - sse_event format (event: <type>\\ndata: <json>\\n\\n) | |
| - chunk_text word-boundary splitting and reassembly | |
| - stream_agent_response event sequence: thinking → text* → cards? → done | |
| - error path emits a single error event and stops | |
| - cards event only present when there's structured data | |
| """ | |
| import json | |
| import pytest | |
| from app.ai.streaming import ( | |
| chunk_text, | |
| sse_event, | |
| stream_agent_response, | |
| ) | |
| # ============================================================ | |
| # sse_event | |
| # ============================================================ | |
| def test_sse_event_format(): | |
| msg = sse_event("hello", {"x": 1}) | |
| assert msg.startswith("event: hello\n") | |
| assert "data: " in msg | |
| assert msg.endswith("\n\n") | |
| def test_sse_event_unicode_passthrough(): | |
| msg = sse_event("text", {"content": "Bonjour مرحبا 你好"}) | |
| # ensure_ascii=False so non-ASCII goes on the wire as UTF-8 | |
| assert "Bonjour مرحبا 你好" in msg | |
| def test_sse_event_payload_is_single_line(): | |
| msg = sse_event("text", {"content": "line1\nline2"}) | |
| # The data: line itself should be one line — newline in payload becomes \n in JSON | |
| data_lines = [l for l in msg.split("\n") if l.startswith("data: ")] | |
| assert len(data_lines) == 1 | |
| def test_sse_event_no_html_escape_dance(): | |
| msg = sse_event("text", {"content": "<script>"}) | |
| # We're SSE not HTML — payload should round-trip via JSON unchanged | |
| payload = msg.split("data: ", 1)[1].split("\n\n")[0] | |
| assert json.loads(payload)["content"] == "<script>" | |
| # ============================================================ | |
| # chunk_text | |
| # ============================================================ | |
| def test_chunk_text_empty_string(): | |
| assert list(chunk_text("")) == [] | |
| def test_chunk_text_whitespace_only_yields_once(): | |
| assert list(chunk_text(" ")) == [" "] | |
| def test_chunk_text_short_string_one_chunk(): | |
| text = "hi" | |
| assert list(chunk_text(text, chunk_size=20)) == ["hi"] | |
| def test_chunk_text_round_trips_to_original(): | |
| text = "Hello world this is AIDA the helpful real-estate assistant" | |
| chunks = list(chunk_text(text, chunk_size=10)) | |
| assert "".join(chunks) == text | |
| assert len(chunks) > 1, "long string should split into multiple chunks" | |
| def test_chunk_text_does_not_split_words(): | |
| text = "antidisestablishmentarianism is a long word" | |
| chunks = list(chunk_text(text, chunk_size=5)) | |
| # Round-trip is preserved | |
| assert "".join(chunks) == text | |
| # The long word is intact in exactly one chunk (never split mid-word) | |
| long_word = "antidisestablishmentarianism" | |
| occurrences = sum(1 for c in chunks if long_word in c) | |
| assert occurrences == 1 | |
| # Every internal-chunk boundary is at a whitespace position — i.e. | |
| # except for the final chunk, every chunk ends with whitespace | |
| for chunk in chunks[:-1]: | |
| assert chunk[-1].isspace(), ( | |
| f"non-final chunk {chunk!r} doesn't end at a whitespace boundary" | |
| ) | |
| def test_chunk_text_multilingual(): | |
| text = "مرحبا بالعالم هذا اختبار طويل" | |
| chunks = list(chunk_text(text, chunk_size=8)) | |
| assert "".join(chunks) == text | |
| # ============================================================ | |
| # stream_agent_response — event sequencing | |
| # ============================================================ | |
| class _FakeResponse: | |
| """Mimics AgentResponse without the Pydantic dependency.""" | |
| def __init__(self, text="", search_results=None, my_listings=None, | |
| draft=None, draft_ui=None): | |
| self.text = text | |
| self.search_results = search_results | |
| self.my_listings = my_listings | |
| self.draft = draft | |
| self.draft_ui = draft_ui | |
| def model_dump(self): | |
| return {"text": self.text} | |
| async def _no_sleep(_): | |
| return None | |
| async def _resolved_to(value): | |
| return value | |
| def _parse_events(messages): | |
| """Extract (event_type, data_dict) from a list of SSE messages.""" | |
| events = [] | |
| for msg in messages: | |
| lines = msg.strip().split("\n") | |
| event_line = next(l for l in lines if l.startswith("event: ")) | |
| data_line = next(l for l in lines if l.startswith("data: ")) | |
| events.append(( | |
| event_line[len("event: "):].strip(), | |
| json.loads(data_line[len("data: "):]), | |
| )) | |
| return events | |
| async def test_stream_emits_thinking_first(): | |
| fake = _FakeResponse(text="hi") | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to(fake), inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| assert events[0][0] == "thinking" | |
| assert "ts" in events[0][1] | |
| async def test_stream_full_event_sequence_text_only(): | |
| fake = _FakeResponse(text="Hello world from AIDA today") | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to(fake), chunk_size=10, inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| types = [e[0] for e in events] | |
| assert types[0] == "thinking" | |
| assert "text" in types | |
| assert "cards" not in types # no structured data on this fake | |
| assert types[-1] == "done" | |
| async def test_stream_emits_cards_event_when_search_results_present(): | |
| fake = _FakeResponse( | |
| text="found 3", | |
| search_results=[{"title": "A"}, {"title": "B"}, {"title": "C"}], | |
| ) | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to(fake), inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| types = [e[0] for e in events] | |
| assert "cards" in types | |
| cards_event = next(e for e in events if e[0] == "cards") | |
| assert "search_results" in cards_event[1] | |
| assert len(cards_event[1]["search_results"]) == 3 | |
| async def test_stream_text_chunks_reassemble_to_original(): | |
| text = "Voici trois propriétés correspondantes à ta recherche" | |
| fake = _FakeResponse(text=text) | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to(fake), chunk_size=8, inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| text_chunks = [e[1]["content"] for e in events if e[0] == "text"] | |
| assert "".join(text_chunks) == text | |
| async def test_stream_error_path_emits_error_and_stops(): | |
| async def _raises(): | |
| raise RuntimeError("brain went boom") | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _raises(), inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| types = [e[0] for e in events] | |
| assert types == ["thinking", "error"] | |
| assert events[1][1]["type"] == "RuntimeError" | |
| assert "brain went boom" in events[1][1]["message"] | |
| async def test_stream_handles_dict_response(): | |
| """stream_agent_response accepts dicts as well as objects.""" | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to({"text": "hello", "search_results": [{"id": 1}]}), | |
| chunk_size=20, inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| types = [e[0] for e in events] | |
| assert "text" in types | |
| assert "cards" in types | |
| async def test_stream_handles_empty_text(): | |
| """If the agent returned no text, no text event should be emitted.""" | |
| fake = _FakeResponse(text="") | |
| messages = [] | |
| async for chunk in stream_agent_response( | |
| _resolved_to(fake), inter_chunk_delay_seconds=0, sleep=_no_sleep, | |
| ): | |
| messages.append(chunk) | |
| events = _parse_events(messages) | |
| types = [e[0] for e in events] | |
| assert "text" not in types | |
| assert types[0] == "thinking" | |
| assert types[-1] == "done" | |