# Source: web2api/core/hub/openai_sse.py
# Commit 77169b4 by ohmyapi — "feat: align hosted Space deployment with latest upstream"
"""
把 OpenAIStreamEvent 编码为 OpenAI ChatCompletions SSE chunk。
这是 Hub 层的“协议输出工具”,用于把插件输出的结构化事件流转换为
OpenAI 兼容的 `data: {...}\\n\\n` 片段。
当前不替换既有渲染链路,仅提供给后续协议/插件扩展使用。
"""
from __future__ import annotations
import json
import time
import uuid as uuid_mod
from collections.abc import AsyncIterator, Iterator
from core.hub.schemas import OpenAIStreamEvent
def make_openai_stream_context(*, model: str) -> tuple[str, int]:
"""生成 OpenAI SSE 上下文:chat_id + created。"""
chat_id = f"chatcmpl-{uuid_mod.uuid4().hex[:24]}"
created = int(time.time())
# model 由上层写入 payload
del model
return chat_id, created
def _chunk(
*,
chat_id: str,
model: str,
created: int,
delta: dict,
finish_reason: str | None = None,
) -> str:
return (
"data: "
+ json.dumps(
{
"id": chat_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [
{
"index": 0,
"delta": delta,
"logprobs": None,
"finish_reason": finish_reason,
}
],
},
ensure_ascii=False,
)
+ "\n\n"
)
def encode_openai_sse_events(
    events: Iterator[OpenAIStreamEvent],
    *,
    chat_id: str,
    model: str,
    created: int,
) -> Iterator[str]:
    """Sync encoder: OpenAIStreamEvent -> OpenAI SSE strings.

    Emits a ``role: assistant`` + empty-content preamble frame first (for
    compatibility with mainstream OpenAI SSE clients), then one frame per
    meaningful event. A ``finish`` event produces the closing chunk plus
    ``data: [DONE]`` and stops iteration; an ``error`` event is wrapped in
    a ``data:``-framed error object (OpenAI SSE has no standard error
    event) and the stream continues best-effort.
    """

    def emit(delta: dict, finish: str | None = None) -> str:
        # Bind the stream context once; every frame shares it.
        return _chunk(
            chat_id=chat_id,
            model=model,
            created=created,
            delta=delta,
            finish_reason=finish,
        )

    yield emit({"role": "assistant", "content": ""})

    for ev in events:
        kind = ev.type
        if kind == "content_delta":
            # Empty deltas are dropped rather than forwarded.
            if ev.content:
                yield emit({"content": ev.content})
        elif kind == "tool_call_delta":
            if ev.tool_calls:
                yield emit(
                    {"tool_calls": [tc.model_dump() for tc in ev.tool_calls]}
                )
        elif kind == "finish":
            # OpenAI allows an empty delta object on the terminal chunk.
            yield emit({}, ev.finish_reason or "stop")
            yield "data: [DONE]\n\n"
            return
        elif kind == "error":
            # No standard SSE error event; wrap an error object in a data
            # frame (same style as the existing implementation).
            body = {"error": {"message": ev.error or "unknown error", "type": "server_error"}}
            yield "data: " + json.dumps(body, ensure_ascii=False) + "\n\n"
async def encode_openai_sse_events_async(
    events: AsyncIterator[OpenAIStreamEvent],
    *,
    chat_id: str,
    model: str,
    created: int,
) -> AsyncIterator[str]:
    """Async encoder: OpenAIStreamEvent -> OpenAI SSE strings.

    Delegates each event to the synchronous encoder. The sync encoder
    prepends a ``role: assistant`` preamble frame on *every* call, so when
    feeding it one event at a time we must drop that duplicate preamble
    for every event after the first — otherwise clients would receive one
    spurious empty assistant frame per event.
    """
    preamble_sent = False
    async for ev in events:
        chunks = encode_openai_sse_events(
            iter([ev]),
            chat_id=chat_id,
            model=model,
            created=created,
        )
        if preamble_sent:
            # Discard the duplicated role:assistant preamble frame.
            next(chunks, None)
        preamble_sent = True
        for out in chunks:
            yield out