File size: 6,881 Bytes
0157ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""Neutral SSE parsing and Anthropic stream shape assertions.

Used by default CI contract tests and by opt-in live smoke scenarios.
"""

from __future__ import annotations

import json
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any

from .server_tool_sse import (
    SERVER_TOOL_USE,
    WEB_FETCH_TOOL_RESULT,
    WEB_SEARCH_TOOL_RESULT,
)

# Server-tool block types imported from ``server_tool_sse``; they appear in
# both sets below.
_SERVER_TOOL_BLOCK_TYPES = (
    SERVER_TOOL_USE,
    WEB_SEARCH_TOOL_RESULT,
    WEB_FETCH_TOOL_RESULT,
)

# Content blocks that only use content_block_start/stop (no deltas): the
# server-tool blocks, redacted thinking, and "text_eager" -- the synthetic kind
# recorded for a text block whose non-blank text arrives in its start event.
_NO_DELTA_BLOCK_KINDS = frozenset(
    (*_SERVER_TOOL_BLOCK_TYPES, "text_eager", "redacted_thinking")
)

# ``content_block.type`` values accepted on a content_block_start event.
_ALLOWED_BLOCK_START_TYPES = frozenset(
    (
        "text",
        "thinking",
        "tool_use",
        "redacted_thinking",
        *_SERVER_TOOL_BLOCK_TYPES,
    )
)


@dataclass(frozen=True, slots=True)
class SSEEvent:
    event: str
    data: dict[str, Any]
    raw: str


def parse_sse_lines(lines: Iterable[str]) -> list[SSEEvent]:
    events: list[SSEEvent] = []
    current_event = ""
    data_parts: list[str] = []
    raw_parts: list[str] = []

    for line in lines:
        stripped = line.rstrip("\r\n")
        if stripped == "":
            _append_event(events, current_event, data_parts, raw_parts)
            current_event = ""
            data_parts = []
            raw_parts = []
            continue
        raw_parts.append(stripped)
        if stripped.startswith("event:"):
            current_event = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("data:"):
            data_parts.append(stripped.split(":", 1)[1].strip())

    _append_event(events, current_event, data_parts, raw_parts)
    return events


def parse_sse_text(text: str) -> list[SSEEvent]:
    return parse_sse_lines(text.splitlines())


def _append_event(
    events: list[SSEEvent],
    current_event: str,
    data_parts: list[str],
    raw_parts: list[str],
) -> None:
    if not current_event and not data_parts:
        return
    data_text = "\n".join(data_parts)
    data: dict[str, Any]
    try:
        parsed = json.loads(data_text) if data_text else {}
        data = parsed if isinstance(parsed, dict) else {"value": parsed}
    except json.JSONDecodeError:
        data = {"raw": data_text}
    events.append(SSEEvent(current_event, data, "\n".join(raw_parts)))


def assert_anthropic_stream_contract(
    events: list[SSEEvent], *, allow_error: bool = False
) -> None:
    """Check minimal Anthropic-style SSE invariants: start/stop, block nesting.

    Checked invariants:

    * the stream is non-empty, contains ``message_start`` and ends with
      ``message_stop``;
    * no ``error`` event appears unless *allow_error* is true;
    * every ``content_block_start`` uses an allowed block type and an index
      that has never been started before;
    * deltas and stops only target currently open blocks, and each delta type
      matches its block (start/stop-only blocks get no deltas at all);
    * every block is closed and at least one block was emitted.

    Does *not* assert strict event ordering (e.g. ``message_delta`` events vs
    content blocks) beyond presence of a final ``message_stop``; stricter
    ordering can be tested in product or transport-specific suites.

    Raises:
        AssertionError: on the first violated invariant.
    """
    assert events, "stream produced no SSE events"

    if not allow_error:
        # Surface upstream errors first: a stream cut short by an ``error``
        # event may also lack the final ``message_stop``, and the error payload
        # is a more useful failure message than the list of event names.
        for event in events:
            if event.event == "error":
                raise AssertionError(f"unexpected SSE error event: {event.data}")

    # Not called ``event_names`` to avoid shadowing the module-level helper.
    names = [event.event for event in events]
    assert "message_start" in names, names
    assert names[-1] == "message_stop", names

    # index -> storage kind of each currently open block.
    open_blocks: dict[int, str] = {}
    # Every index ever started; an index must not be reused after its stop.
    seen_blocks: set[int] = set()
    for event in events:
        if event.event == "content_block_start":
            index = event_index(event)
            block = event.data.get("content_block", {})
            assert isinstance(block, dict), event.data
            block_type = str(block.get("type", ""))
            assert block_type in _ALLOWED_BLOCK_START_TYPES, event.data
            assert index not in open_blocks, f"block {index} started twice"
            assert index not in seen_blocks, f"block {index} reused after stop"
            # A text block whose start already carries non-blank text is
            # tracked as "text_eager", which must not receive deltas.
            if block_type == "text" and str(block.get("text", "")).strip():
                storage = "text_eager"
            else:
                storage = block_type
            open_blocks[index] = storage
            seen_blocks.add(index)
            continue

        if event.event == "content_block_delta":
            index = event_index(event)
            assert index in open_blocks, f"delta for unopened block {index}"
            kind = open_blocks[index]
            assert kind not in _NO_DELTA_BLOCK_KINDS, (
                f"unexpected delta for start/stop-only block {kind} at index {index}"
            )
            delta = event.data.get("delta", {})
            assert isinstance(delta, dict), event.data
            delta_type = str(delta.get("type", ""))
            if kind == "thinking":
                assert delta_type in (
                    "thinking_delta",
                    "signature_delta",
                ), f"block {index} is {kind}, got {delta_type}"
                continue
            # Only "text" and "tool_use" can reach this point: every other
            # allowed start type is thinking or listed in _NO_DELTA_BLOCK_KINDS.
            expected = {
                "text": "text_delta",
                "tool_use": "input_json_delta",
            }[kind]
            assert delta_type == expected, f"block {index} is {kind}, got {delta_type}"
            continue

        if event.event == "content_block_stop":
            index = event_index(event)
            assert index in open_blocks, f"stop for unopened block {index}"
            open_blocks.pop(index)

    assert not open_blocks, f"unclosed blocks: {open_blocks}"
    assert seen_blocks, "stream did not emit any content blocks"


def event_names(events: list[SSEEvent]) -> list[str]:
    """List the ``event`` field of every event, preserving stream order."""
    return [sse.event for sse in events]


def text_content(events: list[SSEEvent]) -> str:
    """Concatenate all text visible in the stream, in stream order.

    Two sources contribute: the non-empty ``text`` carried directly on a
    ``content_block_start`` for a ``text`` block, and the ``text`` of every
    ``text_delta`` found in any event's ``delta`` payload.
    """

    def _pieces():
        for sse in events:
            payload = sse.data
            if sse.event == "content_block_start":
                started = payload.get("content_block", {})
                if isinstance(started, dict) and started.get("type") == "text":
                    eager_text = str(started.get("text", ""))
                    if eager_text:
                        yield eager_text
            change = payload.get("delta", {})
            if isinstance(change, dict) and change.get("type") == "text_delta":
                yield str(change.get("text", ""))

    return "".join(_pieces())


def thinking_content(events: list[SSEEvent]) -> str:
    """Concatenate the ``thinking`` text of every ``thinking_delta``, in order.

    Other delta types (e.g. ``signature_delta``) and non-dict ``delta``
    payloads are ignored.
    """
    deltas = (sse.data.get("delta", {}) for sse in events)
    return "".join(
        str(change.get("thinking", ""))
        for change in deltas
        if isinstance(change, dict) and change.get("type") == "thinking_delta"
    )


def has_tool_use(events: list[SSEEvent]) -> bool:
    """Report whether any event carries a ``content_block`` of type ``tool_use``.

    Only the literal type ``"tool_use"`` matches; other block types, including
    server-tool blocks, do not count. Stops at the first match.
    """
    blocks = (sse.data.get("content_block", {}) for sse in events)
    return any(
        isinstance(block, dict) and block.get("type") == "tool_use"
        for block in blocks
    )


def event_index(event: SSEEvent) -> int:
    """Return the content block ``index`` field from an SSE payload (strict).

    Raises:
        AssertionError: if ``index`` is missing or is not a plain ``int``.
            ``bool`` is rejected explicitly: it subclasses ``int``, so a JSON
            ``true``/``false`` index would otherwise pass as 1/0.
    """
    value = event.data.get("index")
    assert isinstance(value, int) and not isinstance(value, bool), event.data
    return value