# a2p/api/converter/stream.py
# Uploaded by Spooker ("Upload 19 files", commit 28a52dc, verified).
"""Convert AI SDK v6 Data Stream SSE to OpenAI chat-completions SSE format."""
from __future__ import annotations
import json
import time
from collections.abc import AsyncIterator
def _make_chunk(
request_id: str,
model: str,
*,
delta: dict,
finish_reason: str | None = None,
usage: dict | None = None,
) -> str:
"""Format a single OpenAI SSE chunk."""
chunk: dict = {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": delta,
"finish_reason": finish_reason,
}
],
}
if usage is not None:
chunk["usage"] = usage
return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
def _extract_usage(event: dict) -> dict | None:
"""Extract usage from finish event (handles both /api/chat and /api/doc/chat)."""
meta = event.get("messageMetadata", {})
# /api/doc/chat nests under custom.usage
raw = meta.get("custom", {}).get("usage")
# /api/chat puts usage directly in messageMetadata
if not raw:
raw = meta.get("usage")
if not raw:
return None
return {
"prompt_tokens": raw.get("inputTokens", 0),
"completion_tokens": raw.get("outputTokens", 0),
"total_tokens": raw.get("totalTokens", 0),
}
# ---------------------------------------------------------------------------
# Streaming conversion
# ---------------------------------------------------------------------------
async def convert_stream(
    lines: AsyncIterator[str],
    model: str,
    request_id: str,
) -> AsyncIterator[str]:
    """Yield OpenAI-compatible SSE strings from an AI SDK data-stream.

    Each ``data: {...}`` line of *lines* is decoded as one AI SDK event and
    translated into zero or more ``chat.completion.chunk`` SSE strings.
    Blank lines, non-``data:`` lines, undecodable JSON and JSON values that
    are not objects are skipped. The output always ends with
    ``data: [DONE]``. Either the upstream terminator is forwarded, which
    stops iteration immediately, or the marker is appended once *lines* is
    exhausted.

    Tool calls are emitted in OpenAI's incremental ``delta.tool_calls``
    form. A call announced by ``tool-input-start`` streams its argument text
    through ``tool-input-delta``. A call whose input was not streamed (only
    ``tool-input-available`` arrives, or no non-empty fragments were sent)
    gets its arguments from the parsed ``input``, so it is never dropped.

    Args:
        lines: Raw SSE lines from the upstream AI SDK response.
        model: Model name echoed in every chunk.
        request_id: Completion id echoed in every chunk.

    Yields:
        SSE-formatted strings (``"data: ...\\n\\n"``).
    """
    role_sent = False
    # toolCallId -> OpenAI tool_calls[].index, assigned in order of first appearance.
    tool_calls_index: dict[str, int] = {}
    next_tool_index = 0
    # toolCallIds for which non-empty argument text has already been sent.
    args_sent: set[str] = set()

    async for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        if not line:
            continue
        if line == "data: [DONE]":
            yield "data: [DONE]\n\n"
            return
        if not line.startswith("data: "):
            continue
        try:
            event = json.loads(line[6:])
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            # Valid JSON but not an event object (e.g. a bare list): ignore it
            # instead of crashing on ``.get``.
            continue
        event_type = event.get("type")

        # --- Text events ---
        if event_type in ("text-start", "text-delta"):
            if not role_sent:
                yield _make_chunk(
                    request_id, model, delta={"role": "assistant", "content": ""}
                )
                role_sent = True
            if event_type == "text-delta":
                yield _make_chunk(
                    request_id, model, delta={"content": event.get("delta", "")}
                )

        # --- Tool call events ---
        elif event_type == "tool-input-start":
            tc_id = event.get("toolCallId", "")
            idx = next_tool_index
            tool_calls_index[tc_id] = idx
            next_tool_index += 1
            delta: dict = {"tool_calls": [{
                "index": idx,
                "id": tc_id,
                "type": "function",
                "function": {"name": event.get("toolName", ""), "arguments": ""},
            }]}
            if not role_sent:
                delta["role"] = "assistant"
                role_sent = True
            yield _make_chunk(request_id, model, delta=delta)

        elif event_type == "tool-input-delta":
            tc_id = event.get("toolCallId", "")
            idx = tool_calls_index.get(tc_id)
            if idx is None:
                # No tool-input-start announced this call, so the client has no
                # id/name for it. Routing the fragment to index 0 would corrupt
                # another call's arguments. A later tool-input-available still
                # delivers the complete call.
                continue
            fragment = event.get("inputTextDelta", "")
            if fragment:
                args_sent.add(tc_id)
            yield _make_chunk(
                request_id, model,
                delta={"tool_calls": [{
                    "index": idx,
                    "function": {"arguments": fragment},
                }]},
            )

        elif event_type == "tool-input-available":
            # Full parsed tool input. Nothing to add if the argument text was
            # already streamed. Otherwise send it now so the call is neither
            # lost nor left with empty arguments.
            tc_id = event.get("toolCallId", "")
            if tc_id in args_sent:
                continue
            arguments = json.dumps(event.get("input", {}), ensure_ascii=False)
            args_sent.add(tc_id)
            idx = tool_calls_index.get(tc_id)
            if idx is not None:
                # Call already announced by tool-input-start: send only arguments.
                delta = {"tool_calls": [{
                    "index": idx,
                    "function": {"arguments": arguments},
                }]}
            else:
                # Never announced: emit the whole call in a single chunk.
                idx = next_tool_index
                tool_calls_index[tc_id] = idx
                next_tool_index += 1
                delta = {"tool_calls": [{
                    "index": idx,
                    "id": tc_id,
                    "type": "function",
                    "function": {"name": event.get("toolName", ""), "arguments": arguments},
                }]}
                if not role_sent:
                    delta["role"] = "assistant"
                    role_sent = True
            yield _make_chunk(request_id, model, delta=delta)

        # --- Finish events ---
        elif event_type == "finish":
            reason = event.get("finishReason")
            # AI SDK reasons are kebab-case ("tool-calls", "content-filter");
            # OpenAI expects snake_case ("tool_calls", "content_filter").
            # A missing/null/non-string reason falls back to "stop".
            finish_reason = (
                reason.replace("-", "_") if isinstance(reason, str) and reason else "stop"
            )
            yield _make_chunk(
                request_id, model,
                delta={},
                finish_reason=finish_reason,
                usage=_extract_usage(event),
            )

    yield "data: [DONE]\n\n"
# ---------------------------------------------------------------------------
# Non-streaming helpers
# ---------------------------------------------------------------------------
def parse_full_response(lines: list[str]) -> tuple[str, list[dict], str, dict | None]:
"""Parse all SSE lines into (content, tool_calls, finish_reason, usage)."""
content_parts: list[str] = []
tool_calls: list[dict] = []
# Accumulate args per toolCallId
tool_args: dict[str, list[str]] = {}
tool_meta: dict[str, dict] = {} # toolCallId → {name, id}
finish_reason = "stop"
usage = None
for raw_line in lines:
line = raw_line.rstrip("\r\n")
if not line or not line.startswith("data: ") or line == "data: [DONE]":
continue
try:
event = json.loads(line[6:])
except json.JSONDecodeError:
continue
etype = event.get("type")
if etype == "text-delta":
content_parts.append(event.get("delta", ""))
elif etype == "tool-input-start":
tc_id = event.get("toolCallId", "")
tool_args[tc_id] = []
tool_meta[tc_id] = {
"name": event.get("toolName", ""),
"id": tc_id,
}
elif etype == "tool-input-delta":
tc_id = event.get("toolCallId", "")
tool_args.setdefault(tc_id, []).append(event.get("inputTextDelta", ""))
elif etype == "tool-input-available":
tc_id = event.get("toolCallId", "")
meta = tool_meta.get(tc_id, {"name": event.get("toolName", ""), "id": tc_id})
tool_calls.append({
"id": meta["id"],
"type": "function",
"function": {
"name": meta["name"],
"arguments": json.dumps(event.get("input", {}), ensure_ascii=False),
},
})
elif etype == "finish":
finish_reason = event.get("finishReason", "stop")
if finish_reason == "tool-calls":
finish_reason = "tool_calls"
usage = _extract_usage(event)
# If we got tool-input-start/delta but no tool-input-available, build from deltas
for tc_id, meta in tool_meta.items():
if not any(tc.get("id") == tc_id for tc in tool_calls):
tool_calls.append({
"id": meta["id"],
"type": "function",
"function": {
"name": meta["name"],
"arguments": "".join(tool_args.get(tc_id, [])),
},
})
return "".join(content_parts), tool_calls, finish_reason, usage
def build_non_stream_response(
request_id: str,
model: str,
content: str,
finish_reason: str = "stop",
usage: dict | None = None,
tool_calls: list[dict] | None = None,
) -> dict:
"""Build a non-streaming chat.completions response object."""
message: dict = {"role": "assistant", "content": content or None}
if tool_calls:
message["tool_calls"] = tool_calls
if not content:
message["content"] = None
resp: dict = {
"id": request_id,
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": message,
"finish_reason": finish_reason,
}
],
"usage": usage or {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
},
}
return resp