Spaces:
Running
Running
File size: 6,881 Bytes
"""Neutral SSE parsing and Anthropic stream shape assertions.
Used by default CI contract tests and by opt-in live smoke scenarios.
"""
from __future__ import annotations
import json
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any
from .server_tool_sse import (
SERVER_TOOL_USE,
WEB_FETCH_TOOL_RESULT,
WEB_SEARCH_TOOL_RESULT,
)
# Kinds of open block for which a ``content_block_delta`` is a contract
# violation, i.e. blocks that are fully described by their
# ``content_block_start`` / ``content_block_stop`` pair: the Anthropic server
# tool blocks, ``redacted_thinking``, and ``"text_eager"``.  The last one is
# not a wire-level type; it is the kind recorded by the contract checker for a
# ``text`` block whose start event already carries non-blank text.
_NO_DELTA_BLOCK_KINDS = frozenset(
    (
        "redacted_thinking",
        "text_eager",
        SERVER_TOOL_USE,
        WEB_FETCH_TOOL_RESULT,
        WEB_SEARCH_TOOL_RESULT,
    )
)
# Every ``content_block.type`` value that the contract checker accepts on a
# ``content_block_start`` event; any other value fails the assertion.
_ALLOWED_BLOCK_START_TYPES = frozenset(
    (
        "text",
        "tool_use",
        "thinking",
        "redacted_thinking",
        SERVER_TOOL_USE,
        WEB_FETCH_TOOL_RESULT,
        WEB_SEARCH_TOOL_RESULT,
    )
)
@dataclass(frozen=True, slots=True)
class SSEEvent:
event: str
data: dict[str, Any]
raw: str
def parse_sse_lines(lines: Iterable[str]) -> list[SSEEvent]:
    """Group an iterable of SSE lines into events.

    A blank line terminates the event being accumulated.  Only ``event:`` and
    ``data:`` fields are interpreted; every other non-blank line is kept in
    the event's raw text but otherwise ignored.  Trailing ``\\r``/``\\n``
    characters are removed from each line before it is examined, and field
    values are whitespace-stripped.

    Args:
        lines: Lines of an SSE stream, with or without line terminators.

    Returns:
        The parsed events in stream order.  An event still being accumulated
        when the input ends is emitted as well.
    """
    parsed: list[SSEEvent] = []
    name = ""
    payload_lines: list[str] = []
    verbatim: list[str] = []

    for incoming in lines:
        text = incoming.rstrip("\r\n")
        if not text:
            # Blank line: dispatch whatever has been collected and start over
            # with fresh accumulators.
            _append_event(parsed, name, payload_lines, verbatim)
            name, payload_lines, verbatim = "", [], []
            continue

        verbatim.append(text)
        field, colon, value = text.partition(":")
        if colon and field == "event":
            name = value.strip()
        elif colon and field == "data":
            payload_lines.append(value.strip())

    # The stream may end without a trailing blank line.
    _append_event(parsed, name, payload_lines, verbatim)
    return parsed
def parse_sse_text(text: str) -> list[SSEEvent]:
    """Parse a complete SSE response body into events.

    Lines are split on CRLF, CR and LF only.  ``str.splitlines()`` is
    deliberately not used: it also breaks on characters such as U+0085,
    U+2028 and U+2029, which may appear unescaped inside a JSON string in a
    ``data:`` line and would cut that line (and its payload) in two.

    Args:
        text: The full text of an SSE stream.

    Returns:
        The parsed events in stream order.
    """
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    # A trailing newline yields one extra empty string here; the line parser
    # treats it as a blank line, so it cannot create a spurious event.
    return parse_sse_lines(normalized.split("\n"))
def _append_event(
    events: list[SSEEvent],
    current_event: str,
    data_parts: list[str],
    raw_parts: list[str],
) -> None:
    """Build one ``SSEEvent`` from accumulated fields and append it to *events*.

    Nothing is appended when there is neither an event name nor any data
    line.  The data lines are newline-joined and decoded as JSON:

    * no data text           -> ``{}``
    * a JSON object          -> that object
    * any other JSON value   -> ``{"value": <decoded value>}``
    * text that is not JSON  -> ``{"raw": <joined text>}``

    Args:
        events: Output list, mutated in place.
        current_event: Name from the ``event:`` field (may be empty).
        data_parts: Values of the event's ``data:`` lines, in order.
        raw_parts: The event's source lines, used for ``SSEEvent.raw``.
    """
    if not (current_event or data_parts):
        return

    joined = "\n".join(data_parts)
    payload: dict[str, Any]
    if not joined:
        payload = {}
    else:
        try:
            decoded = json.loads(joined)
        except json.JSONDecodeError:
            payload = {"raw": joined}
        else:
            payload = decoded if isinstance(decoded, dict) else {"value": decoded}

    events.append(
        SSEEvent(event=current_event, data=payload, raw="\n".join(raw_parts))
    )
def assert_anthropic_stream_contract(
    events: list[SSEEvent], *, allow_error: bool = False
) -> None:
    """Assert the minimal shape of an Anthropic-style SSE stream.

    Checked invariants:

    * the stream is non-empty, contains ``message_start`` and ends with
      ``message_stop``;
    * no ``error`` event occurs unless *allow_error* is true;
    * every ``content_block_start`` has an allowed block type and an index
      that is neither currently open nor previously used;
    * every ``content_block_delta`` targets an open block that accepts
      deltas, with a delta type matching the block kind;
    * every ``content_block_stop`` targets an open block;
    * all blocks are closed by the end and at least one block was emitted.

    Relative ordering of other events (for example ``message_delta`` versus
    content blocks) is intentionally not checked; stricter ordering belongs
    in product- or transport-specific suites.

    Args:
        events: Parsed events of one stream, in order.
        allow_error: When true, ``error`` events are tolerated.

    Raises:
        AssertionError: if any invariant above is violated.
    """
    assert events, "stream produced no SSE events"
    names = [sse.event for sse in events]
    assert "message_start" in names, names
    assert names[-1] == "message_stop", names

    # The one delta type accepted by each block kind that has exactly one.
    single_delta_type = {
        "text": "text_delta",
        "tool_use": "input_json_delta",
    }
    active: dict[int, str] = {}  # index -> kind, for started-but-not-stopped blocks
    started: set[int] = set()  # every index that has ever been started

    for sse in events:
        name = sse.event
        payload = sse.data
        if name == "error" and not allow_error:
            raise AssertionError(f"unexpected SSE error event: {payload}")

        if name == "content_block_start":
            idx = event_index(sse)
            block = payload.get("content_block", {})
            assert isinstance(block, dict), payload
            declared = str(block.get("type", ""))
            assert declared in _ALLOWED_BLOCK_START_TYPES, payload
            assert idx not in active, f"block {idx} started twice"
            assert idx not in started, f"block {idx} reused after stop"
            # A text block whose start event already holds non-blank text is
            # tracked under a synthetic kind so later deltas for it are rejected.
            carries_text = declared == "text" and bool(
                str(block.get("text", "")).strip()
            )
            active[idx] = "text_eager" if carries_text else declared
            started.add(idx)
        elif name == "content_block_delta":
            idx = event_index(sse)
            assert idx in active, f"delta for unopened block {idx}"
            kind = active[idx]
            assert kind not in _NO_DELTA_BLOCK_KINDS, (
                f"unexpected delta for start/stop-only block {kind} at index {idx}"
            )
            delta = payload.get("delta", {})
            assert isinstance(delta, dict), payload
            got = str(delta.get("type", ""))
            if kind == "thinking":
                # Thinking blocks accept two delta types.
                assert got in (
                    "thinking_delta",
                    "signature_delta",
                ), f"block {idx} is {kind}, got {got}"
            else:
                wanted = single_delta_type[kind]
                assert got == wanted, f"block {idx} is {kind}, got {got}"
        elif name == "content_block_stop":
            idx = event_index(sse)
            assert idx in active, f"stop for unopened block {idx}"
            del active[idx]

    assert not active, f"unclosed blocks: {active}"
    assert started, "stream did not emit any content blocks"
def event_names(events: list[SSEEvent]) -> list[str]:
    """Return the ``event`` name of every item in *events*, in order."""
    return [sse.event for sse in events]
def text_content(events: list[SSEEvent]) -> str:
    """Concatenate all text carried by the stream, in event order.

    Two sources contribute: the ``text`` already present in the
    ``content_block`` of a ``content_block_start`` event for a ``text``
    block, and the ``text`` of every ``text_delta`` found under an event's
    ``delta`` key (whatever the event's name is).

    Args:
        events: Parsed events of one stream, in order.

    Returns:
        The joined text, or ``""`` when the stream carries none.
    """
    chunks: list[str] = []
    for sse in events:
        payload = sse.data

        if sse.event == "content_block_start":
            opened = payload.get("content_block", {})
            is_text_block = isinstance(opened, dict) and opened.get("type") == "text"
            initial = str(opened.get("text", "")) if is_text_block else ""
            if initial:
                chunks.append(initial)

        piece = payload.get("delta", {})
        if isinstance(piece, dict) and piece.get("type") == "text_delta":
            chunks.append(str(piece.get("text", "")))

    return "".join(chunks)
def thinking_content(events: list[SSEEvent]) -> str:
    """Concatenate the ``thinking`` text of every ``thinking_delta``.

    Each event's ``delta`` entry is inspected regardless of the event name;
    entries that are not dicts or have another ``type`` are skipped.

    Args:
        events: Parsed events of one stream, in order.

    Returns:
        The joined thinking text, or ``""`` when there is none.
    """
    deltas = (sse.data.get("delta", {}) for sse in events)
    return "".join(
        str(delta.get("thinking", ""))
        for delta in deltas
        if isinstance(delta, dict) and delta.get("type") == "thinking_delta"
    )
def has_tool_use(events: list[SSEEvent]) -> bool:
    """Report whether any event carries a ``content_block`` of type ``tool_use``.

    Args:
        events: Parsed events of one stream, in order.

    Returns:
        ``True`` as soon as one such block is found, otherwise ``False``.
    """
    blocks = (sse.data.get("content_block", {}) for sse in events)
    return any(
        isinstance(block, dict) and block.get("type") == "tool_use"
        for block in blocks
    )
def event_index(event: SSEEvent) -> int:
    """Return the content block ``index`` field from an SSE payload (strict).

    Args:
        event: The event whose ``data`` dict is read.

    Returns:
        The value stored under ``"index"``.

    Raises:
        AssertionError: if ``index`` is missing or is not a genuine integer.
            Booleans are rejected explicitly: ``bool`` is a subclass of
            ``int``, so a JSON ``true``/``false`` would otherwise slip
            through as index 1/0.
    """
    value = event.data.get("index")
    assert isinstance(value, int) and not isinstance(value, bool), event.data
    return value
|