""" SSE streaming primitives eval — pure-function tests, no real LLM. Verifies: - sse_event format (event: \\ndata: \\n\\n) - chunk_text word-boundary splitting and reassembly - stream_agent_response event sequence: thinking → text* → cards? → done - error path emits a single error event and stops - cards event only present when there's structured data """ import json import pytest from app.ai.streaming import ( chunk_text, sse_event, stream_agent_response, ) # ============================================================ # sse_event # ============================================================ def test_sse_event_format(): msg = sse_event("hello", {"x": 1}) assert msg.startswith("event: hello\n") assert "data: " in msg assert msg.endswith("\n\n") def test_sse_event_unicode_passthrough(): msg = sse_event("text", {"content": "Bonjour مرحبا 你好"}) # ensure_ascii=False so non-ASCII goes on the wire as UTF-8 assert "Bonjour مرحبا 你好" in msg def test_sse_event_payload_is_single_line(): msg = sse_event("text", {"content": "line1\nline2"}) # The data: line itself should be one line — newline in payload becomes \n in JSON data_lines = [l for l in msg.split("\n") if l.startswith("data: ")] assert len(data_lines) == 1 def test_sse_event_no_html_escape_dance(): msg = sse_event("text", {"content": "