// ikun2 / stream-transform.js
// (upload metadata — "bingn", "Upload 19 files", commit f1357b6 verified — converted
// to a comment: as bare text these lines were not valid JavaScript)
/**
* stream-transform.js - chataibot JSON 流 → OpenAI/Anthropic SSE 转换器
*
* chataibot.pro 流式响应格式 (紧凑 JSON 对象流,无换行分隔):
* {"type":"botType","data":"gpt-4o"}{"type":"chunk","data":"Hello"}...
* {"type":"finalResult","data":{"mainText":"...","questions":[...]}}
*/
import { PassThrough } from 'stream';
import { parseToolCalls, toOpenAIToolCalls, toAnthropicToolUse, detectToolCallStart } from './tool-prompt.js';
/**
 * Streaming duplicate-content detector.
 *
 * chataibot.pro search models occasionally replay the same answer 2-3 times
 * within one stream. Strategy: sliding-window check — when the tail of the
 * accumulated text already appears earlier in it, cut the output off at the
 * point where the repetition begins.
 *
 * @param {number} minRepeatLen - minimum repeated-fragment length (default 150 chars)
 * @returns {{ feed(text): { emit: string, repeated: boolean }, getText(): string }}
 */
function createRepeatDetector(minRepeatLen = 150) {
  let accumulated = '';
  const passThrough = (piece) => ({ emit: piece, repeated: false });
  return {
    /**
     * Feed one incoming chunk; returns the portion that should be emitted.
     * @returns {{ emit: string, repeated: boolean }}
     */
    feed(piece) {
      if (!piece) return passThrough('');
      const lenBefore = accumulated.length;
      accumulated += piece;
      // Too little text to judge — pass everything through.
      if (accumulated.length < minRepeatLen * 2) return passThrough(piece);
      // The probe is the last `win` characters of the accumulated text.
      const win = Math.min(minRepeatLen, Math.floor(accumulated.length / 3));
      if (win < 80) return passThrough(piece);
      const tail = accumulated.slice(accumulated.length - win);
      const head = accumulated.slice(0, accumulated.length - win);
      const hit = head.indexOf(tail);
      if (hit < 0) return passThrough(piece);
      // The tail already occurred earlier → everything from its second
      // occurrence onward is a repeat. Emit only the part of this chunk that
      // precedes the repeat, and roll the accumulator back to that point.
      const cutAt = accumulated.length - win;
      const keepLen = cutAt - lenBefore;
      const emit = keepLen > 0 ? piece.slice(0, keepLen) : '';
      accumulated = accumulated.slice(0, cutAt);
      console.log(`[RepeatDetect] 检测到重复 (${win} chars),截断输出`);
      return { emit, repeated: true };
    },
    getText() {
      return accumulated;
    },
  };
}
/**
 * Stream pre-probe — buffer the first few JSON objects of the upstream
 * stream and detect whether it opens with a streamingError.
 *
 * chataibot sometimes reports errors such as "Insufficient credits" only
 * after the stream has started. This function reads the leading objects:
 * if the first substantive object is an error it rejects (allowing the
 * caller to switch accounts and retry); otherwise it resolves with a
 * wrapped stream that first replays the buffered raw bytes and then pipes
 * the remainder, so the transform functions need no changes.
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot response stream
 * @param {number} timeoutMs - max wait for the first substantive object (default 30s)
 * @returns {Promise<import('stream').Readable>} replay-wrapped stream
 * @throws {Error} (statusCode 429) if the first substantive object is a streamingError
 */
export function probeStream(upstreamStream, timeoutMs = 30000) {
  return new Promise((resolve, reject) => {
    let resolved = false;
    const rawChunks = []; // raw bytes seen while probing, replayed on success
    function cleanup() {
      clearTimeout(timer);
      upstreamStream.removeListener('data', onData);
      upstreamStream.removeListener('end', onEnd);
      upstreamStream.removeListener('error', onError);
    }
    // After cleanup() an 'error' event on the upstream would have no listener
    // and crash the process — attach a silent sink on paths where nobody
    // consumes the stream any further.
    function sinkErrors() {
      upstreamStream.on('error', () => {});
    }
    // Timeout guard: an upstream that never responds must not hang the request forever.
    const timer = setTimeout(() => {
      if (resolved) return;
      resolved = true;
      cleanup();
      sinkErrors();
      upstreamStream.destroy();
      reject(new Error('probeStream timeout'));
    }, timeoutMs);
    const parser = createJsonStreamParser((obj) => {
      if (resolved) return;
      if (obj.type === 'streamingError') {
        resolved = true;
        cleanup();
        sinkErrors();
        // Drain the remainder so the connection can finish naturally.
        upstreamStream.resume();
        reject(Object.assign(
          new Error(obj.data || 'Streaming error'),
          { statusCode: 429 } // treated as retryable (account switch) by the caller
        ));
        return;
      }
      if (obj.type === 'chunk' || obj.type === 'reasoningContent') {
        // First substantive object is real content — hand back a stream that
        // replays the buffered bytes, then continues with the live stream.
        resolved = true;
        cleanup();
        upstreamStream.pause();
        const wrapped = new PassThrough();
        // pipe() does NOT forward 'error' events; without this listener an
        // upstream error after probing would be unhandled and crash.
        upstreamStream.on('error', (err) => wrapped.destroy(err));
        for (const chunk of rawChunks) wrapped.write(chunk);
        upstreamStream.pipe(wrapped);
        upstreamStream.resume();
        resolve(wrapped);
      }
    });
    function onData(chunk) {
      rawChunks.push(chunk);
      parser.feed(chunk);
    }
    function onEnd() {
      if (resolved) return;
      parser.flush(); // may synchronously trigger the parser callback above
      if (!resolved) {
        // Stream ended without content or error objects — replay what we saw.
        resolved = true;
        cleanup();
        const wrapped = new PassThrough();
        for (const chunk of rawChunks) wrapped.write(chunk);
        wrapped.end();
        resolve(wrapped);
      }
    }
    function onError(err) {
      if (resolved) return;
      resolved = true;
      cleanup();
      reject(err);
    }
    upstreamStream.on('data', onData);
    upstreamStream.on('end', onEnd);
    upstreamStream.on('error', onError);
  });
}
/**
 * Incremental JSON object-stream parser — extracts complete top-level JSON
 * objects from a compact, delimiter-free sequence by brace counting.
 * chataibot does not return NDJSON (no newlines); objects are simply
 * concatenated: {"a":1}{"b":2}...
 *
 * Fixes over the previous version:
 * - An object split across feed() calls now parses correctly. Previously
 *   the retained partial buffer was re-scanned from index 0 while
 *   `depth`/`inString` kept their old values, double-counting braces so
 *   the split object (and everything after it) was never emitted.
 * - A stray '}' outside any object can no longer drive `depth` negative
 *   and permanently desynchronize the brace counter.
 *
 * @param {(obj: any) => void} onObject - called once per complete parsed object
 * @returns {{ feed(chunk: string): void, flush(): void }}
 */
function createJsonStreamParser(onObject) {
  let buffer = '';      // unconsumed text (always starts at an object boundary)
  let depth = 0;        // current brace depth (0 = between objects)
  let inString = false; // inside a JSON string literal
  let escape = false;   // previous char was a backslash inside a string
  let objStart = -1;    // index in `buffer` where the current object began
  let scanPos = 0;      // first index of `buffer` not yet scanned
  return {
    feed(chunk) {
      buffer += chunk;
      // Resume scanning where the previous feed() stopped — never re-scan
      // retained text with already-advanced state.
      for (let i = scanPos; i < buffer.length; i++) {
        const ch = buffer[i];
        if (escape) { escape = false; continue; }
        if (ch === '\\' && inString) { escape = true; continue; }
        if (ch === '"') { inString = !inString; continue; }
        if (inString) continue;
        if (ch === '{') {
          if (depth === 0) objStart = i;
          depth++;
        } else if (ch === '}' && depth > 0) {
          depth--;
          if (depth === 0 && objStart >= 0) {
            const jsonStr = buffer.substring(objStart, i + 1);
            // Malformed objects are skipped silently (best-effort stream).
            try { onObject(JSON.parse(jsonStr)); } catch {}
            objStart = -1;
          }
        }
      }
      // Drop consumed text; keep only an in-progress partial object (if any).
      if (objStart >= 0) {
        buffer = buffer.substring(objStart);
        objStart = 0;
      } else {
        buffer = '';
      }
      scanPos = buffer.length; // everything retained has already been scanned
    },
    flush() {
      if (buffer.trim()) {
        try { onObject(JSON.parse(buffer.trim())); } catch {}
      }
      buffer = '';
      depth = 0;
      inString = false;
      escape = false;
      objStart = -1;
      scanPos = 0;
    },
  };
}
/**
 * chataibot JSON object stream → OpenAI-compatible SSE.
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot stream
 * @param {import('http').ServerResponse} res - response the SSE is written to
 * @param {string} model - model name echoed in each chunk
 * @param {string} requestId - id echoed in each chunk
 * @param {Function} [onStreamError] - called with the payload of a streamingError object
 */
export function transformToOpenAISSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });
  const created = Math.floor(Date.now() / 1000);
  const sendDelta = (delta, finishReason = null) => {
    res.write(`data: ${JSON.stringify({
      id: requestId,
      object: 'chat.completion.chunk',
      created,
      model,
      choices: [{ index: 0, delta, finish_reason: finishReason }],
    })}\n\n`);
  };
  // Leading chunk announcing the assistant role.
  sendDelta({ role: 'assistant', content: '' });
  const repeatGuard = createRepeatDetector();
  let done = false;
  // Terminate the SSE conversation (idempotent).
  const finishStream = () => {
    if (done || res.writableEnded) return;
    done = true;
    sendDelta({}, 'stop');
    res.write('data: [DONE]\n\n');
    res.end();
  };
  const handlers = {
    chunk(data) {
      const { emit, repeated } = repeatGuard.feed(data);
      if (emit) sendDelta({ content: emit });
      if (repeated) finishStream();
    },
    // Pass o-series reasoning tokens through on the reasoning channel.
    reasoningContent(data) {
      sendDelta({ reasoning_content: data });
    },
    finalResult() {
      finishStream();
    },
    streamingError(data) {
      if (onStreamError) onStreamError(data);
      finishStream();
    },
  };
  const parser = createJsonStreamParser((obj) => {
    if (done) return;
    if (Object.hasOwn(handlers, obj.type)) handlers[obj.type](obj.data);
  });
  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    finishStream();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}
/**
 * chataibot JSON object stream → Anthropic-compatible SSE.
 *
 * Emits the standard event sequence: message_start, content_block_start,
 * content_block_delta*, content_block_stop, message_delta, message_stop.
 * Header events are deferred until the first piece of content arrives.
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot stream
 * @param {import('http').ServerResponse} res - response the SSE is written to
 * @param {string} model - model name echoed in message_start
 * @param {string} requestId - message id echoed in message_start
 * @param {Function} [onStreamError] - called with the payload of a streamingError object
 */
export function transformToAnthropicSSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });
  const emitEvent = (event, payload) => {
    res.write(`event: ${event}\ndata: ${JSON.stringify(payload)}\n\n`);
  };
  let headerSent = false;
  let blockIndex = 0;
  // Send message_start + the opening text block exactly once, lazily.
  const ensureHeader = () => {
    if (headerSent) return;
    headerSent = true;
    emitEvent('message_start', {
      type: 'message_start',
      message: {
        id: requestId,
        type: 'message',
        role: 'assistant',
        model,
        content: [],
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    });
    emitEvent('content_block_start', {
      type: 'content_block_start',
      index: blockIndex,
      content_block: { type: 'text', text: '' },
    });
  };
  const emitTextDelta = (text) => {
    ensureHeader();
    emitEvent('content_block_delta', {
      type: 'content_block_delta',
      index: blockIndex,
      delta: { type: 'text_delta', text },
    });
  };
  const repeatGuard = createRepeatDetector();
  let finished = false;
  // Close out the SSE conversation (idempotent).
  const closeStream = () => {
    if (finished || res.writableEnded) return;
    finished = true;
    ensureHeader();
    emitEvent('content_block_stop', { type: 'content_block_stop', index: blockIndex });
    emitEvent('message_delta', {
      type: 'message_delta',
      delta: { stop_reason: 'end_turn' },
      usage: { output_tokens: 0 },
    });
    emitEvent('message_stop', { type: 'message_stop' });
    res.end();
  };
  const parser = createJsonStreamParser((obj) => {
    if (finished) return;
    if (obj.type === 'chunk') {
      const { emit, repeated } = repeatGuard.feed(obj.data);
      if (emit) emitTextDelta(emit);
      if (repeated) closeStream();
    } else if (obj.type === 'reasoningContent') {
      // Anthropic SSE has no separate reasoning channel here — pass through as text.
      emitTextDelta(obj.data);
    } else if (obj.type === 'finalResult') {
      closeStream();
    } else if (obj.type === 'streamingError') {
      if (onStreamError) onStreamError(obj.data);
      closeStream();
    }
  });
  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    closeStream();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) res.end();
  });
}
/**
 * chataibot → OpenAI SSE with tool-call detection.
 *
 * Strategy: stream text normally. Once the accumulated text looks like the
 * start of a ```tool_calls block (detectToolCallStart), stop streaming and
 * buffer the rest. When the stream ends, parse the full text: if it
 * contains tool calls, emit tool_calls chunks; otherwise flush whatever
 * text was withheld from the client.
 *
 * Fix: previously the pre-marker part of the chunk that triggered detection
 * was never streamed, and the fallback replayed only from the marker
 * onward — that gap of plain text was silently lost.
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot stream
 * @param {import('http').ServerResponse} res - response the SSE is written to
 * @param {string} model - model name echoed in each chunk
 * @param {string} requestId - id echoed in each chunk
 * @param {Function} [onStreamError] - called with the payload of a streamingError object
 */
export function transformToOpenAISSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });
  const ts = Math.floor(Date.now() / 1000);
  let fullText = '';
  let toolCallDetected = false;
  let streamError = false;
  function writeChunk(delta, finishReason = null) {
    const obj = {
      id: requestId,
      object: 'chat.completion.chunk',
      created: ts,
      model,
      choices: [{ index: 0, delta, finish_reason: finishReason }],
    };
    res.write(`data: ${JSON.stringify(obj)}\n\n`);
  }
  writeChunk({ role: 'assistant', content: '' });
  const detector = createRepeatDetector();
  // Emit the not-yet-streamed part of fullText up to `upTo`.
  // detector.getText() is exactly the text streamed so far (repeat
  // truncation keeps it in sync), so the unsent remainder is the suffix
  // past that prefix. Returns false when fullText no longer extends the
  // streamed prefix (e.g. finalResult replaced it) or nothing is pending.
  function flushUnsent(upTo) {
    const sent = detector.getText();
    if (fullText.startsWith(sent) && upTo > sent.length) {
      writeChunk({ content: fullText.substring(sent.length, upTo) });
      return true;
    }
    return false;
  }
  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk':
        fullText += obj.data;
        if (!toolCallDetected) {
          if (detectToolCallStart(fullText)) {
            // Stop streaming; withheld text is re-emitted on 'end' if no
            // tool calls actually parse.
            toolCallDetected = true;
          } else {
            const { emit, repeated } = detector.feed(obj.data);
            if (emit) writeChunk({ content: emit });
            // On `repeated` the detector suppressed duplicate output; keep
            // consuming so tool-call parsing still sees everything.
          }
        }
        break;
      case 'reasoningContent':
        if (!toolCallDetected) {
          writeChunk({ reasoning_content: obj.data });
        }
        break;
      case 'finalResult':
        // finalResult.mainText is authoritative for tool-call parsing.
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;
      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });
  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;
    if (streamError) {
      writeChunk({}, 'stop');
      res.write('data: [DONE]\n\n');
      res.end();
      return;
    }
    const { hasToolCalls, toolCalls } = parseToolCalls(fullText);
    const markerIdx = fullText.indexOf('```tool_calls');
    if (hasToolCalls) {
      // Flush plain text sitting between the last streamed position and the
      // tool_calls marker (previously this gap was dropped).
      if (markerIdx >= 0) flushUnsent(markerIdx);
      const openaiToolCalls = toOpenAIToolCalls(toolCalls);
      for (let i = 0; i < openaiToolCalls.length; i++) {
        const tc = openaiToolCalls[i];
        // One chunk per call: id, type, function name and full arguments.
        writeChunk({
          tool_calls: [{
            index: i,
            id: tc.id,
            type: 'function',
            function: { name: tc.function.name, arguments: tc.function.arguments },
          }],
        });
      }
      writeChunk({}, 'tool_calls');
    } else {
      // No tool calls after all — re-send everything withheld since
      // detection kicked in (not just from the marker onward, which lost
      // the pre-marker part of the triggering chunk).
      if (toolCallDetected) {
        if (!flushUnsent(fullText.length) && markerIdx >= 0) {
          // Fallback when fullText was replaced by finalResult.mainText.
          writeChunk({ content: fullText.substring(markerIdx) });
        }
      }
      writeChunk({}, 'stop');
    }
    res.write('data: [DONE]\n\n');
    res.end();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}
/**
 * chataibot → Anthropic SSE with tool-call detection.
 *
 * Same approach as the OpenAI variant: stream text until the accumulated
 * output looks like the start of a ```tool_calls block, then buffer. On
 * stream end, parse the full text and emit tool_use content blocks, or
 * flush the withheld text if no tool calls actually parsed.
 *
 * Fix: previously the pre-marker part of the chunk that triggered detection
 * was never streamed, and the fallback replayed only from the marker
 * onward — that gap of plain text was silently lost.
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot stream
 * @param {import('http').ServerResponse} res - response the SSE is written to
 * @param {string} model - model name echoed in message_start
 * @param {string} requestId - message id echoed in message_start
 * @param {Function} [onStreamError] - called with the payload of a streamingError object
 */
export function transformToAnthropicSSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });
  function writeEvent(event, data) {
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  }
  let headerSent = false;
  let textBlockIndex = 0;
  let fullText = '';
  let toolCallDetected = false;
  let streamError = false;
  // Send message_start + the opening text block exactly once, lazily.
  function ensureHeader() {
    if (headerSent) return;
    headerSent = true;
    writeEvent('message_start', {
      type: 'message_start',
      message: {
        id: requestId,
        type: 'message',
        role: 'assistant',
        model,
        content: [],
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    });
    writeEvent('content_block_start', {
      type: 'content_block_start',
      index: textBlockIndex,
      content_block: { type: 'text', text: '' },
    });
  }
  function writeTextDelta(text) {
    ensureHeader();
    writeEvent('content_block_delta', {
      type: 'content_block_delta',
      index: textBlockIndex,
      delta: { type: 'text_delta', text },
    });
  }
  const detector = createRepeatDetector();
  // Emit the not-yet-streamed part of fullText up to `upTo`.
  // detector.getText() is exactly the text streamed so far (repeat
  // truncation keeps it in sync). Returns false when fullText no longer
  // extends the streamed prefix (e.g. finalResult replaced it) or nothing
  // is pending.
  function flushUnsent(upTo) {
    const sent = detector.getText();
    if (fullText.startsWith(sent) && upTo > sent.length) {
      writeTextDelta(fullText.substring(sent.length, upTo));
      return true;
    }
    return false;
  }
  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk':
        fullText += obj.data;
        if (!toolCallDetected) {
          if (detectToolCallStart(fullText)) {
            // Stop streaming; withheld text is re-emitted on 'end' if no
            // tool calls actually parse.
            toolCallDetected = true;
          } else {
            const { emit, repeated } = detector.feed(obj.data);
            if (emit) writeTextDelta(emit);
            // On `repeated` the detector suppressed duplicate output; keep
            // consuming so tool-call parsing still sees everything.
          }
        }
        break;
      case 'reasoningContent':
        if (!toolCallDetected) {
          writeTextDelta(obj.data);
        }
        break;
      case 'finalResult':
        // finalResult.mainText is authoritative for tool-call parsing.
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;
      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });
  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;
    ensureHeader();
    if (streamError) {
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'end_turn' },
        usage: { output_tokens: 0 },
      });
      writeEvent('message_stop', { type: 'message_stop' });
      res.end();
      return;
    }
    const { hasToolCalls, toolCalls } = parseToolCalls(fullText);
    const markerIdx = fullText.indexOf('```tool_calls');
    if (hasToolCalls) {
      // Flush plain text sitting between the last streamed position and the
      // tool_calls marker (previously this gap was dropped), then close the
      // text block.
      if (markerIdx >= 0) flushUnsent(markerIdx);
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      // Emit one tool_use block per parsed call.
      const toolUseBlocks = toAnthropicToolUse(toolCalls);
      for (let i = 0; i < toolUseBlocks.length; i++) {
        const blockIdx = textBlockIndex + 1 + i;
        const tu = toolUseBlocks[i];
        writeEvent('content_block_start', {
          type: 'content_block_start',
          index: blockIdx,
          content_block: { type: 'tool_use', id: tu.id, name: tu.name, input: {} },
        });
        writeEvent('content_block_delta', {
          type: 'content_block_delta',
          index: blockIdx,
          delta: { type: 'input_json_delta', partial_json: JSON.stringify(tu.input) },
        });
        writeEvent('content_block_stop', { type: 'content_block_stop', index: blockIdx });
      }
      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'tool_use' },
        usage: { output_tokens: 0 },
      });
    } else {
      // No tool calls after all — re-send everything withheld since
      // detection kicked in (not just from the marker onward, which lost
      // the pre-marker part of the triggering chunk).
      if (toolCallDetected) {
        if (!flushUnsent(fullText.length) && markerIdx >= 0) {
          // Fallback when fullText was replaced by finalResult.mainText.
          writeTextDelta(fullText.substring(markerIdx));
        }
      }
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'end_turn' },
        usage: { output_tokens: 0 },
      });
    }
    writeEvent('message_stop', { type: 'message_stop' });
    res.end();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) res.end();
  });
}
/**
 * Consume the JSON object stream and collect the complete response
 * (used for non-streaming requests).
 *
 * @param {import('stream').Readable} upstreamStream - raw chataibot stream
 * @returns {Promise<{ text: string, reasoning: string, model: string }>}
 */
export function collectFullResponse(upstreamStream) {
  return new Promise((resolve, reject) => {
    const acc = { text: '', reasoning: '', model: '' };
    const parser = createJsonStreamParser((obj) => {
      if (obj.type === 'botType') {
        acc.model = obj.data;
      } else if (obj.type === 'chunk') {
        acc.text += obj.data;
      } else if (obj.type === 'reasoningContent') {
        acc.reasoning += obj.data;
      } else if (obj.type === 'finalResult') {
        // finalResult.mainText, when present, supersedes the streamed chunks.
        if (obj.data?.mainText) acc.text = obj.data.mainText;
      } else if (obj.type === 'streamingError') {
        reject(new Error(obj.data || 'Streaming error'));
      }
    });
    upstreamStream.on('data', (chunk) => parser.feed(chunk));
    upstreamStream.on('end', () => {
      parser.flush();
      // Strip large duplicated spans before returning (search models
      // sometimes repeat the whole answer).
      resolve({
        text: deduplicateText(acc.text),
        reasoning: acc.reasoning,
        model: acc.model,
      });
    });
    upstreamStream.on('error', reject);
  });
}
/**
 * Post-process deduplication of a complete response text.
 * Detects a large span that occurs twice and keeps only the first copy.
 *
 * Fix: the cut now extends the match backwards to the true beginning of the
 * duplicated copy. Previously the text was truncated at the probe's second
 * occurrence, which left the part of the duplicate that precedes the probe
 * in the output (e.g. "AA" for a 600-char A returned A plus 400 duplicated
 * characters). The garbled log message separator is also repaired.
 *
 * @param {string} text - complete collected response text
 * @returns {string} text with the trailing duplicate removed (right-trimmed),
 *   or the input unchanged when no duplication is found
 */
function deduplicateText(text) {
  if (!text || text.length < 300) return text;
  // Try several probe sizes, largest first to minimize false positives.
  for (const windowSize of [500, 300, 200]) {
    if (text.length < windowSize * 2) continue;
    // Probe: a fragment taken 1/3 of the way into the text.
    const start = Math.floor(text.length / 3);
    const needle = text.substring(start, start + windowSize);
    // Look for a second occurrence strictly after the probe.
    const searchFrom = start + windowSize;
    const repeatIdx = text.indexOf(needle, searchFrom);
    if (repeatIdx >= 0) {
      // Walk both occurrences backwards in lockstep to find where the
      // duplicated copy actually starts, then cut there.
      let mirror = start;
      let cutAt = repeatIdx;
      while (mirror > 0 && text[mirror - 1] === text[cutAt - 1]) {
        mirror--;
        cutAt--;
      }
      console.log(`[Dedup] 非流式去重: ${text.length} -> ${cutAt} (window=${windowSize})`);
      return text.substring(0, cutAt).trimEnd();
    }
  }
  return text;
}