Spaces:
Running
Running
| """Shared native Anthropic SSE thinking policy, block remapping, and overlap repair. | |
| Used by :class:`OpenRouterProvider` and line-mode | |
| :class:`providers.anthropic_messages.AnthropicMessagesTransport` providers. | |
| """ | |
| from __future__ import annotations | |
| import copy | |
| import json | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
# Public API of this module. Underscore-prefixed helpers and
# ``_UpstreamBlockState`` are internal and intentionally not exported.
__all__ = [
    "NativeSseBlockPolicyState",
    "format_native_sse_event",
    "is_terminal_openrouter_done_event",
    "parse_native_sse_event",
    "transform_native_sse_block_event",
]
| class _UpstreamBlockState: | |
| """Per-upstream content block: segment index and liveness in the model stream.""" | |
| block_type: str | |
| down_index: int | |
| open: bool | |
| last_start_block: dict[str, Any] | None = None | |
@dataclass
class NativeSseBlockPolicyState:
    """Track per-upstream content blocks and remapped Anthropic ``index`` field.

    A fresh instance is intended per streamed message. The ``@dataclass``
    decorator is required so that each ``field(default_factory=...)`` becomes
    a distinct per-instance container instead of a raw ``Field`` object.
    """

    # Next downstream ``index`` to hand out; downstream indexes are dense.
    next_index: int = 0
    # Upstream ``index`` -> current downstream segment state.
    by_upstream: dict[int, _UpstreamBlockState] = field(default_factory=dict)
    # Upstream indexes whose ``content_block_start`` was dropped by the
    # thinking policy; their deltas and stops are dropped too.
    dropped_indexes: set[int] = field(default_factory=set)
    # Upstream indexes that were closed synthetically; the next real upstream
    # ``content_block_stop`` for them is swallowed to avoid a duplicate stop.
    pending_suppressed_stops: set[int] = field(default_factory=set)
    # NOTE(review): not read or written by the helpers in this module section;
    # presumably maintained by callers around ``message_stop`` — confirm.
    message_stopped: bool = False
| def format_native_sse_event(event_name: str | None, data_text: str) -> str: | |
| """Format an SSE event from its event name and data payload.""" | |
| lines: list[str] = [] | |
| if event_name: | |
| lines.append(f"event: {event_name}") | |
| lines.extend(f"data: {line}" for line in data_text.splitlines()) | |
| return "\n".join(lines) + "\n\n" | |
| def parse_native_sse_event(event: str) -> tuple[str | None, str]: | |
| """Extract the event name and raw data payload from an SSE event.""" | |
| event_name = None | |
| data_lines: list[str] = [] | |
| for line in event.strip().splitlines(): | |
| if line.startswith("event:"): | |
| event_name = line[6:].strip() | |
| elif line.startswith("data:"): | |
| data_lines.append(line[5:].lstrip()) | |
| return event_name, "\n".join(data_lines) | |
| def is_terminal_openrouter_done_event(event_name: str | None, data_text: str) -> bool: | |
| """Return whether an event is OpenAI-style terminal noise (``[DONE]``).""" | |
| return (event_name is None or event_name in {"data", "done"}) and ( | |
| data_text.strip().upper() == "[DONE]" | |
| ) | |
| def _delta_type_to_block_kind(delta_type: Any) -> str | None: | |
| """Map a content_block_delta type to a content block kind (text/thinking/tool_use).""" | |
| if not isinstance(delta_type, str): | |
| return None | |
| if delta_type in {"thinking_delta", "signature_delta"}: | |
| return "thinking" | |
| if delta_type == "text_delta": | |
| return "text" | |
| if delta_type == "input_json_delta": | |
| return "tool_use" | |
| return None | |
| def _synthetic_start_content_block( | |
| block_kind: str, | |
| *, | |
| upstream_index: int, | |
| stored_tool_block: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| """Build a `content_block` for a `content_block_start` with empty streaming fields.""" | |
| if block_kind == "tool_use": | |
| if ( | |
| isinstance(stored_tool_block, dict) | |
| and stored_tool_block.get("type") == "tool_use" | |
| ): | |
| tool_id = stored_tool_block.get("id") | |
| name = stored_tool_block.get("name") | |
| inp = stored_tool_block.get("input") | |
| return { | |
| "type": "tool_use", | |
| "id": tool_id | |
| if isinstance(tool_id, str) and tool_id | |
| else f"toolu_or_{upstream_index}", | |
| "name": name if isinstance(name, str) else "", | |
| "input": inp if isinstance(inp, dict) else {}, | |
| } | |
| return { | |
| "type": "tool_use", | |
| "id": f"toolu_or_{upstream_index}", | |
| "name": "", | |
| "input": {}, | |
| } | |
| if block_kind == "thinking": | |
| return {"type": "thinking", "thinking": ""} | |
| if block_kind == "text": | |
| return {"type": "text", "text": ""} | |
| return {"type": "text", "text": ""} | |
| def _should_drop_block_type(block_type: Any, *, thinking_enabled: bool) -> bool: | |
| if not isinstance(block_type, str): | |
| return False | |
| if block_type.startswith("redacted_thinking"): | |
| return not thinking_enabled | |
| return not thinking_enabled and "thinking" in block_type | |
def _synthetic_close_other_open_blocks(
    state: NativeSseBlockPolicyState, current_upstream: int
) -> str:
    """Close every open block except `current_upstream` and track duplicate upstream stops.

    Returns the concatenated synthetic ``content_block_stop`` events (``""``
    when nothing else is open). Each closed segment is marked not-open, and
    its upstream index is added to ``state.pending_suppressed_stops`` so the
    real upstream stop that arrives later is swallowed.
    """
    to_close = [
        (upstream, seg)
        for upstream, seg in state.by_upstream.items()
        if upstream != current_upstream and seg.open
    ]
    stop_events: list[str] = []
    for upstream, seg in to_close:
        stop_payload = {"type": "content_block_stop", "index": seg.down_index}
        stop_events.append(
            format_native_sse_event("content_block_stop", json.dumps(stop_payload))
        )
        seg.open = False
        state.pending_suppressed_stops.add(upstream)
    return "".join(stop_events)
def _allocate_new_segment(
    state: NativeSseBlockPolicyState,
    upstream_index: int,
    block_type: str,
    *,
    last_start_block: dict[str, Any] | None = None,
) -> int:
    """Assign a new downstream `index` for a segment and record upstream state.

    Takes the next dense downstream index from ``state``, replaces any
    existing entry for ``upstream_index`` with a new open segment, and
    returns the assigned downstream index.
    """
    down_index = state.next_index
    state.next_index = down_index + 1
    state.by_upstream[upstream_index] = _UpstreamBlockState(
        block_type=block_type,
        down_index=down_index,
        open=True,
        last_start_block=last_start_block,
    )
    return down_index
def transform_native_sse_block_event(
    event: str,
    state: NativeSseBlockPolicyState,
    *,
    thinking_enabled: bool,
) -> str | None:
    """Normalize native Anthropic SSE events and enforce local thinking policy.

    ``content_block_start`` / ``content_block_delta`` / ``content_block_stop``
    events have their upstream ``index`` remapped onto a dense downstream
    sequence tracked in ``state``:

    * a start closes every other still-open block first (synthetic stops);
    * a delta for a closed block reopens it under a fresh downstream index,
      preceded by a synthetic ``content_block_start``;
    * a delta with no prior start for a text/tool block gets a synthetic start;
    * duplicate or suppressed stops are dropped.

    With ``thinking_enabled`` false, thinking-type blocks and all thinking
    deltas (including ``signature_delta``) are dropped.

    Args:
        event: One raw SSE event (``event:``/``data:`` lines).
        state: Per-stream remapping state; mutated in place.
        thinking_enabled: Whether thinking content may be forwarded.

    Returns:
        SSE text to forward (possibly several concatenated events, or the
        original ``event`` unchanged), or ``None`` to drop the event.
    """
    event_name, data_text = parse_native_sse_event(event)
    if not event_name or not data_text:
        return event
    try:
        payload = json.loads(data_text)
    except json.JSONDecodeError:
        return event
    # Valid JSON that is not an object (e.g. a bare string or list) has no
    # ``index``/``delta`` fields to rewrite; previously ``payload.get`` raised
    # AttributeError here. Pass such events through untouched.
    if not isinstance(payload, dict):
        return event

    if event_name == "content_block_start":
        return _transform_block_start(
            event, payload, state, thinking_enabled=thinking_enabled
        )
    if event_name == "content_block_delta":
        return _transform_block_delta(
            event, payload, state, thinking_enabled=thinking_enabled
        )
    if event_name == "content_block_stop":
        return _transform_block_stop(
            event, payload, state, thinking_enabled=thinking_enabled
        )
    return event


def _transform_block_start(
    event: str,
    payload: dict[str, Any],
    state: NativeSseBlockPolicyState,
    *,
    thinking_enabled: bool,
) -> str | None:
    """Remap a ``content_block_start``, closing any other still-open blocks first."""
    block = payload.get("content_block")
    if not isinstance(block, dict):
        return event
    block_type = block.get("type")
    upstream_index = payload.get("index")
    if not isinstance(upstream_index, int):
        return event
    if _should_drop_block_type(block_type, thinking_enabled=thinking_enabled):
        # Remember the index so the block's deltas and stop are dropped too.
        state.dropped_indexes.add(upstream_index)
        return None
    if not isinstance(block_type, str):
        return event
    prefix = _synthetic_close_other_open_blocks(state, upstream_index)
    new_idx = _allocate_new_segment(
        state,
        upstream_index,
        block_type=block_type,
        # Deep copy so later payload mutation cannot alter the stored start.
        last_start_block=copy.deepcopy(block),
    )
    payload["index"] = new_idx
    return prefix + format_native_sse_event(
        "content_block_start", json.dumps(payload)
    )


def _transform_block_delta(
    event: str,
    payload: dict[str, Any],
    state: NativeSseBlockPolicyState,
    *,
    thinking_enabled: bool,
) -> str | None:
    """Remap a ``content_block_delta``, synthesizing a start when none is open."""
    delta = payload.get("delta")
    if not isinstance(delta, dict):
        return event
    delta_type = delta.get("type")
    upstream_index = payload.get("index")
    if not isinstance(upstream_index, int):
        return event
    if upstream_index in state.dropped_indexes:
        return None
    if _should_drop_block_type(delta_type, thinking_enabled=thinking_enabled):
        return None
    block_kind = _delta_type_to_block_kind(delta_type)
    if block_kind is None:
        return event
    if block_kind == "thinking" and not thinking_enabled:
        # ``signature_delta`` belongs to thinking blocks but its type name does
        # not contain "thinking", so the check above lets it through.
        return None

    seg = state.by_upstream.get(upstream_index)
    if seg is not None and seg.open:
        payload["index"] = seg.down_index
        return format_native_sse_event("content_block_delta", json.dumps(payload))

    if seg is not None:
        # More deltas for an upstream block after a synthetic (or other) close:
        # reopen with a new downstream `index` and emit a synthetic start first.
        # The upstream stop for this block must now close the reopened segment.
        state.pending_suppressed_stops.discard(upstream_index)
        carry = seg.last_start_block
        new_idx = _allocate_new_segment(
            state,
            upstream_index,
            block_type=block_kind,
            last_start_block=carry,
        )
        stored_tool = (
            carry
            if isinstance(carry, dict) and carry.get("type") == "tool_use"
            else None
        )
        content_block = _synthetic_start_content_block(
            block_kind,
            upstream_index=upstream_index,
            stored_tool_block=stored_tool,
        )
        return _emit_with_synthetic_start(new_idx, content_block, payload)

    # Delta with no prior `content_block_start` in this stream.
    if block_kind == "thinking":
        # Unusual upstream shape: pass through raw.
        return event
    content_block = _synthetic_start_content_block(
        block_kind, upstream_index=upstream_index
    )
    new_idx = _allocate_new_segment(
        state,
        upstream_index,
        block_type=block_kind,
        last_start_block=copy.deepcopy(content_block),
    )
    return _emit_with_synthetic_start(new_idx, content_block, payload)


def _emit_with_synthetic_start(
    new_idx: int, content_block: dict[str, Any], delta_payload: dict[str, Any]
) -> str:
    """Emit a synthetic start at ``new_idx`` followed by the re-indexed delta."""
    start_payload = {
        "type": "content_block_start",
        "index": new_idx,
        "content_block": content_block,
    }
    delta_payload["index"] = new_idx
    return format_native_sse_event(
        "content_block_start", json.dumps(start_payload)
    ) + format_native_sse_event("content_block_delta", json.dumps(delta_payload))


def _transform_block_stop(
    event: str,
    payload: dict[str, Any],
    state: NativeSseBlockPolicyState,
    *,
    thinking_enabled: bool,
) -> str | None:
    """Remap a ``content_block_stop``; drop suppressed, duplicate, or dropped-block stops."""
    upstream_index = payload.get("index")
    if not isinstance(upstream_index, int):
        return event
    if upstream_index in state.dropped_indexes:
        return None
    if upstream_index in state.pending_suppressed_stops:
        # Already closed synthetically; swallow the real upstream stop once.
        state.pending_suppressed_stops.discard(upstream_index)
        return None
    seg = state.by_upstream.get(upstream_index)
    if seg is not None and seg.open:
        payload["index"] = seg.down_index
        seg.open = False
        return format_native_sse_event("content_block_stop", json.dumps(payload))
    if seg is not None:
        # Spurious or duplicate `content_block_stop` for a closed block.
        return None
    if not thinking_enabled:
        # Stop for a block never seen; with thinking off, drop it rather than
        # emit a stop that downstream never saw a start for.
        return None
    return event