/**
 * stream-transform.js - chataibot JSON stream → OpenAI/Anthropic SSE converter
 *
 * chataibot.pro streaming response format (a compact stream of JSON objects
 * with no newline separators):
 *   {"type":"botType","data":"gpt-4o"}{"type":"chunk","data":"Hello"}...
 *   {"type":"finalResult","data":{"mainText":"...","questions":[...]}}
 */

import { PassThrough } from 'stream';
import { StringDecoder } from 'string_decoder';
import { parseToolCalls, toOpenAIToolCalls, toAnthropicToolUse, detectToolCallStart } from './tool-prompt.js';

/** Response headers shared by every SSE transformer in this file. */
const SSE_HEADERS = Object.freeze({
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  'Connection': 'keep-alive',
  'Access-Control-Allow-Origin': '*',
});

/**
 * Streaming repeated-content detector.
 *
 * The upstream search models sometimes return the same answer two or three
 * times in a row. Strategy: take the trailing window of the accumulated text
 * and look for an earlier occurrence of it; when found, truncate the output.
 *
 * @param {number} minRepeatLen - Minimum length of a repeated fragment (default 150 chars).
 * @returns {{ feed(text: string): { emit: string, repeated: boolean }, getText(): string }}
 */
function createRepeatDetector(minRepeatLen = 150) {
  let fullText = '';

  return {
    /**
     * Feed a new chunk of text and get back the part that should be emitted.
     * @param {string} chunk
     * @returns {{ emit: string, repeated: boolean }}
     */
    feed(chunk) {
      if (!chunk) return { emit: '', repeated: false };

      const prevLen = fullText.length;
      fullText += chunk;

      // Too little text to make a meaningful comparison.
      if (fullText.length < minRepeatLen * 2) {
        return { emit: chunk, repeated: false };
      }

      // The needle is the last `windowSize` characters; it is searched for in
      // everything that precedes it (the needle itself is excluded).
      const windowSize = Math.min(minRepeatLen, Math.floor(fullText.length / 3));
      if (windowSize < 80) return { emit: chunk, repeated: false };

      const repeatStart = fullText.length - windowSize;
      const needle = fullText.substring(repeatStart);
      const searchArea = fullText.substring(0, repeatStart);
      if (!searchArea.includes(needle)) {
        return { emit: chunk, repeated: false };
      }

      // Repeat found: only the part of this chunk located before the repeated
      // window may still be emitted.
      const safeEnd = repeatStart - prevLen;
      const emit = safeEnd > 0 ? chunk.substring(0, safeEnd) : '';

      // Roll the accumulated text back to the truncation point.
      fullText = fullText.substring(0, repeatStart);
      console.log(`[RepeatDetect] 检测到重复 (${windowSize} chars),截断输出`);
      return { emit, repeated: true };
    },

    /** @returns {string} The text accumulated so far (after any truncation). */
    getText() {
      return fullText;
    },
  };
}

/**
 * Stream pre-check: buffers the first JSON objects of the stream to find out
 * whether it starts with a streamingError.
 *
 * chataibot sometimes reports errors such as "Insufficient credits" only after
 * the stream has started. This function reads the leading objects; on an error
 * it rejects (so the caller can retry), otherwise it resolves with a wrapped
 * stream that first replays the buffered raw data and then continues with the
 * rest of the upstream, so the transform functions need no changes.
 *
 * @param {import('stream').Readable} upstreamStream
 * @param {number} [timeoutMs=30000] - Maximum time to wait for the first meaningful object.
 * @returns {Promise<import('stream').PassThrough>}
 * @throws {Error} Rejects with `statusCode: 429` if a streamingError arrives
 *   before any content, with 'probeStream timeout' on timeout, or with the
 *   upstream error.
 */
export function probeStream(upstreamStream, timeoutMs = 30000) {
  return new Promise((resolve, reject) => {
    let resolved = false;
    const rawChunks = [];

    function cleanup() {
      clearTimeout(timer);
      upstreamStream.removeListener('data', onData);
      upstreamStream.removeListener('end', onEnd);
      upstreamStream.removeListener('error', onError);
    }

    /** Creates the replacement stream pre-filled with everything read so far. */
    function createReplayStream() {
      const wrapped = new PassThrough();
      for (const chunk of rawChunks) wrapped.write(chunk);
      return wrapped;
    }

    // Timeout guard: an upstream that never answers must not hang the request.
    const timer = setTimeout(() => {
      if (resolved) return;
      resolved = true;
      cleanup();
      upstreamStream.destroy();
      reject(new Error('probeStream timeout'));
    }, timeoutMs);

    const parser = createJsonStreamParser((obj) => {
      if (resolved) return;

      if (obj.type === 'streamingError') {
        resolved = true;
        cleanup();
        // The stream is only drained from here on; without a listener a late
        // upstream error would surface as an unhandled 'error' event.
        upstreamStream.on('error', () => {});
        upstreamStream.resume();
        reject(Object.assign(
          new Error(obj.data || 'Streaming error'),
          { statusCode: 429 }
        ));
        return;
      }

      if (obj.type === 'chunk' || obj.type === 'reasoningContent') {
        resolved = true;
        cleanup();
        upstreamStream.pause();
        const wrapped = createReplayStream();
        // pipe() does not forward errors, so hand them to the wrapped stream
        // where the downstream consumer listens.
        upstreamStream.on('error', (err) => wrapped.destroy(err));
        upstreamStream.pipe(wrapped);
        upstreamStream.resume();
        resolve(wrapped);
      }
    });

    function onData(chunk) {
      rawChunks.push(chunk);
      parser.feed(chunk);
    }

    function onEnd() {
      if (resolved) return;
      parser.flush();
      if (!resolved) {
        // The stream ended without any content or error object: replay as-is.
        resolved = true;
        cleanup();
        const wrapped = createReplayStream();
        wrapped.end();
        resolve(wrapped);
      }
    }

    function onError(err) {
      if (resolved) return;
      resolved = true;
      cleanup();
      reject(err);
    }

    upstreamStream.on('data', onData);
    upstreamStream.on('end', onEnd);
    upstreamStream.on('error', onError);
  });
}

/**
 * JSON object stream parser: extracts complete JSON objects by counting braces.
 * The upstream does not send NDJSON (there are no newlines) but a compact
 * sequence of JSON objects, and an object may be split across several chunks.
 *
 * @param {(obj: any) => void} onObject - Called once for every complete object.
 * @returns {{ feed(chunk: string | Buffer): void, flush(): void }}
 */
function createJsonStreamParser(onObject) {
  // Decodes Buffers incrementally so that a multi-byte UTF-8 character split
  // across two chunks is not corrupted.
  const decoder = new StringDecoder('utf8');
  let buffer = '';
  let scanPos = 0; // index of the first character not yet examined
  let depth = 0;
  let inString = false;
  let escape = false;
  let objStart = -1;

  /** Parses one extracted object and hands it to the consumer. */
  function deliver(jsonStr) {
    let obj;
    try {
      obj = JSON.parse(jsonStr);
    } catch (err) {
      console.warn(`[StreamParser] skipping malformed JSON object: ${err.message}`);
      return;
    }
    try {
      onObject(obj);
    } catch (err) {
      // A failing handler must not abort parsing of the rest of the stream.
      console.error('[StreamParser] onObject handler failed:', err);
    }
  }

  /** Appends text and scans only the characters that were not examined yet. */
  function scan(text) {
    if (!text) return;
    buffer += text;

    // The scanner state (depth / inString / escape) persists across calls, so
    // scanning must resume at scanPos; restarting at 0 would count the braces
    // of a partially received object twice.
    for (; scanPos < buffer.length; scanPos++) {
      const ch = buffer[scanPos];

      if (depth === 0) {
        // Between objects only an opening brace matters; stray characters
        // (including an unmatched '}') are ignored.
        if (ch === '{') {
          objStart = scanPos;
          depth = 1;
        }
        continue;
      }

      if (escape) {
        escape = false;
        continue;
      }
      if (inString) {
        if (ch === '\\') escape = true;
        else if (ch === '"') inString = false;
        continue;
      }

      if (ch === '"') {
        inString = true;
      } else if (ch === '{') {
        depth++;
      } else if (ch === '}') {
        depth--;
        if (depth === 0) {
          const jsonStr = buffer.substring(objStart, scanPos + 1);
          objStart = -1;
          deliver(jsonStr);
        }
      }
    }

    // Drop the consumed part, keeping only an object that is still incomplete.
    if (objStart >= 0) {
      buffer = buffer.substring(objStart);
      scanPos -= objStart;
      objStart = 0;
    } else {
      buffer = '';
      scanPos = 0;
    }
  }

  return {
    feed(chunk) {
      scan(typeof chunk === 'string' ? chunk : decoder.write(chunk));
    },

    flush() {
      scan(decoder.end());
      const leftover = buffer.trim();
      if (leftover) deliver(leftover);
      buffer = '';
      scanPos = 0;
      depth = 0;
      inString = false;
      escape = false;
      objStart = -1;
    },
  };
}

/** Returns a function that writes one OpenAI `chat.completion.chunk` SSE event. */
function createOpenAIChunkWriter(res, requestId, model) {
  const ts = Math.floor(Date.now() / 1000);
  return function writeChunk(delta, finishReason = null) {
    const obj = {
      id: requestId,
      object: 'chat.completion.chunk',
      created: ts,
      model,
      choices: [{ index: 0, delta, finish_reason: finishReason }],
    };
    res.write(`data: ${JSON.stringify(obj)}\n\n`);
  };
}

/** Returns a function that writes one named Anthropic SSE event. */
function createAnthropicEventWriter(res) {
  return function writeEvent(event, data) {
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  };
}

/**
 * Returns an idempotent function that sends `message_start` followed by the
 * `content_block_start` of the text block the first time it is called.
 */
function createAnthropicHeaderSender(writeEvent, requestId, model, textBlockIndex) {
  let headerSent = false;
  return function ensureHeader() {
    if (headerSent) return;
    headerSent = true;
    writeEvent('message_start', {
      type: 'message_start',
      message: {
        id: requestId,
        type: 'message',
        role: 'assistant',
        model,
        content: [],
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    });
    writeEvent('content_block_start', {
      type: 'content_block_start',
      index: textBlockIndex,
      content_block: { type: 'text', text: '' },
    });
  };
}

/** Writes a `text_delta` for the given content block. */
function writeAnthropicTextDelta(writeEvent, index, text) {
  writeEvent('content_block_delta', {
    type: 'content_block_delta',
    index,
    delta: { type: 'text_delta', text },
  });
}

/** Writes the closing `message_delta` + `message_stop` pair. */
function writeAnthropicMessageEnd(writeEvent, stopReason) {
  writeEvent('message_delta', {
    type: 'message_delta',
    delta: { stop_reason: stopReason },
    usage: { output_tokens: 0 },
  });
  writeEvent('message_stop', { type: 'message_stop' });
}

/**
 * chataibot JSON object stream → OpenAI SSE format.
 *
 * @param {import('stream').Readable} upstreamStream
 * @param {import('http').ServerResponse} res
 * @param {string} model
 * @param {string} requestId
 * @param {Function} [onStreamError] - Optional callback invoked with the error
 *   payload when a streamingError object appears in the stream.
 */
export function transformToOpenAISSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, SSE_HEADERS);
  const writeChunk = createOpenAIChunkWriter(res, requestId, model);

  // Send the role chunk first.
  writeChunk({ role: 'assistant', content: '' });

  const detector = createRepeatDetector();
  let stopped = false;

  function finishStream() {
    if (res.writableEnded || stopped) return;
    stopped = true;
    writeChunk({}, 'stop');
    res.write('data: [DONE]\n\n');
    res.end();
  }

  const parser = createJsonStreamParser((obj) => {
    if (stopped) return;

    switch (obj.type) {
      case 'chunk': {
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) writeChunk({ content: emit });
        if (repeated) finishStream();
        break;
      }
      case 'reasoningContent':
        // Compatible with the reasoning output of OpenAI o-series models.
        writeChunk({ reasoning_content: obj.data });
        break;
      case 'finalResult':
        finishStream();
        break;
      case 'streamingError':
        if (onStreamError) onStreamError(obj.data);
        finishStream();
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    finishStream();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}

/**
 * chataibot JSON object stream → Anthropic SSE format.
 *
 * @param {import('stream').Readable} upstreamStream
 * @param {import('http').ServerResponse} res
 * @param {string} model
 * @param {string} requestId
 * @param {Function} [onStreamError] - Optional callback invoked with the error
 *   payload when a streamingError object appears in the stream.
 */
export function transformToAnthropicSSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, SSE_HEADERS);

  const blockIndex = 0;
  const writeEvent = createAnthropicEventWriter(res);
  const ensureHeader = createAnthropicHeaderSender(writeEvent, requestId, model, blockIndex);

  const detector = createRepeatDetector();
  let stopped = false;

  function finishAnthropicStream() {
    if (res.writableEnded || stopped) return;
    stopped = true;
    ensureHeader();
    writeEvent('content_block_stop', { type: 'content_block_stop', index: blockIndex });
    writeAnthropicMessageEnd(writeEvent, 'end_turn');
    res.end();
  }

  const parser = createJsonStreamParser((obj) => {
    if (stopped) return;

    switch (obj.type) {
      case 'chunk': {
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) {
          ensureHeader();
          writeAnthropicTextDelta(writeEvent, blockIndex, emit);
        }
        if (repeated) finishAnthropicStream();
        break;
      }
      case 'reasoningContent':
        // Reasoning output is forwarded as ordinary text in the text block.
        ensureHeader();
        writeAnthropicTextDelta(writeEvent, blockIndex, obj.data);
        break;
      case 'finalResult':
        finishAnthropicStream();
        break;
      case 'streamingError':
        if (onStreamError) onStreamError(obj.data);
        finishAnthropicStream();
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    finishAnthropicStream();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) res.end();
  });
}

/**
 * chataibot → OpenAI SSE (with tool-call detection).
 *
 * Strategy: stream text normally at first. Once the start of a tool-call block
 * is detected, stop streaming text and withhold the remaining content. When
 * the stream ends, parse the complete text: if it contains tool calls, send
 * tool_calls chunks; otherwise send the withheld text after all.
 *
 * @param {import('stream').Readable} upstreamStream
 * @param {import('http').ServerResponse} res
 * @param {string} model
 * @param {string} requestId
 * @param {Function} [onStreamError] - Optional callback invoked with the error
 *   payload when a streamingError object appears in the stream.
 */
export function transformToOpenAISSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, SSE_HEADERS);
  const writeChunk = createOpenAIChunkWriter(res, requestId, model);

  let fullText = '';
  let withheldText = ''; // chunk text received since tool-call detection, not yet sent
  let toolCallDetected = false;
  let repeatStopped = false;
  let streamError = false;

  writeChunk({ role: 'assistant', content: '' });

  const detector = createRepeatDetector();

  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk': {
        fullText += obj.data;
        // After a detected repeat nothing more is streamed, but the text keeps
        // accumulating so tool-call parsing at the end is unaffected.
        if (repeatStopped) break;
        if (!toolCallDetected && detectToolCallStart(fullText)) {
          toolCallDetected = true;
        }
        if (toolCallDetected) {
          // Withhold the whole chunk (including the one that triggered the
          // detection) so it can be sent later if no tool call materializes.
          withheldText += obj.data;
          break;
        }
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) writeChunk({ content: emit });
        if (repeated) repeatStopped = true;
        break;
      }
      case 'reasoningContent':
        if (!toolCallDetected) {
          writeChunk({ reasoning_content: obj.data });
        }
        break;
      case 'finalResult':
        // Final handling happens in the 'end' event.
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;
      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;

    if (streamError) {
      writeChunk({}, 'stop');
      res.write('data: [DONE]\n\n');
      res.end();
      return;
    }

    // Parse the tool calls out of the complete text.
    const { hasToolCalls, toolCalls } = parseToolCalls(fullText);

    if (hasToolCalls) {
      // Text in front of the tool-call block has already been streamed.
      const openaiToolCalls = toOpenAIToolCalls(toolCalls);
      openaiToolCalls.forEach((tc, i) => {
        // Each tool call is sent in a single chunk: id, type, name and the
        // complete arguments string.
        writeChunk({
          tool_calls: [{
            index: i,
            id: tc.id,
            type: 'function',
            function: { name: tc.function.name, arguments: tc.function.arguments },
          }],
        });
      });
      writeChunk({}, 'tool_calls');
    } else {
      // No tool call after all: send the text that was withheld.
      if (toolCallDetected && withheldText) {
        writeChunk({ content: withheldText });
      }
      writeChunk({}, 'stop');
    }

    res.write('data: [DONE]\n\n');
    res.end();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}

/**
 * chataibot → Anthropic SSE (with tool-call detection).
 *
 * Same approach as the OpenAI variant: stream text first, withhold content once
 * a tool call is detected, then parse at the end and send tool_use content
 * blocks.
 *
 * @param {import('stream').Readable} upstreamStream
 * @param {import('http').ServerResponse} res
 * @param {string} model
 * @param {string} requestId
 * @param {Function} [onStreamError] - Optional callback invoked with the error
 *   payload when a streamingError object appears in the stream.
 */
export function transformToAnthropicSSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, SSE_HEADERS);

  const textBlockIndex = 0;
  const writeEvent = createAnthropicEventWriter(res);
  const ensureHeader = createAnthropicHeaderSender(writeEvent, requestId, model, textBlockIndex);

  let fullText = '';
  let withheldText = ''; // chunk text received since tool-call detection, not yet sent
  let toolCallDetected = false;
  let repeatStopped = false;
  let streamError = false;

  const detector = createRepeatDetector();

  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk': {
        fullText += obj.data;
        // After a detected repeat nothing more is streamed, but the text keeps
        // accumulating so tool-call parsing at the end is unaffected.
        if (repeatStopped) break;
        if (!toolCallDetected && detectToolCallStart(fullText)) {
          toolCallDetected = true;
        }
        if (toolCallDetected) {
          withheldText += obj.data;
          break;
        }
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) {
          ensureHeader();
          writeAnthropicTextDelta(writeEvent, textBlockIndex, emit);
        }
        if (repeated) repeatStopped = true;
        break;
      }
      case 'reasoningContent':
        if (!toolCallDetected) {
          ensureHeader();
          writeAnthropicTextDelta(writeEvent, textBlockIndex, obj.data);
        }
        break;
      case 'finalResult':
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;
      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;

    ensureHeader();

    if (streamError) {
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeAnthropicMessageEnd(writeEvent, 'end_turn');
      res.end();
      return;
    }

    const { hasToolCalls, toolCalls } = parseToolCalls(fullText);

    if (hasToolCalls) {
      // Close the text block.
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });

      // Send one tool_use block per tool call, right after the text block.
      const toolUseBlocks = toAnthropicToolUse(toolCalls);
      toolUseBlocks.forEach((tu, i) => {
        const blockIdx = textBlockIndex + 1 + i;
        writeEvent('content_block_start', {
          type: 'content_block_start',
          index: blockIdx,
          content_block: { type: 'tool_use', id: tu.id, name: tu.name, input: {} },
        });
        writeEvent('content_block_delta', {
          type: 'content_block_delta',
          index: blockIdx,
          delta: { type: 'input_json_delta', partial_json: JSON.stringify(tu.input) },
        });
        writeEvent('content_block_stop', { type: 'content_block_stop', index: blockIdx });
      });

      writeAnthropicMessageEnd(writeEvent, 'tool_use');
    } else {
      // No tool call after all: send the text that was withheld.
      if (toolCallDetected && withheldText) {
        writeAnthropicTextDelta(writeEvent, textBlockIndex, withheldText);
      }
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeAnthropicMessageEnd(writeEvent, 'end_turn');
    }

    res.end();
  });
  upstreamStream.on('error', () => {
    if (!res.writableEnded) res.end();
  });
}

/**
 * Consumes the JSON object stream and collects the complete response (used for
 * non-streaming requests).
 *
 * @param {import('stream').Readable} upstreamStream
 * @returns {Promise<{ text: string, reasoning: string, model: string }>}
 *   Rejects on a streamingError object or on an upstream 'error' event.
 */
export function collectFullResponse(upstreamStream) {
  return new Promise((resolve, reject) => {
    let text = '';
    let reasoning = '';
    let actualModel = '';

    const parser = createJsonStreamParser((obj) => {
      switch (obj.type) {
        case 'botType':
          actualModel = obj.data;
          break;
        case 'chunk':
          text += obj.data;
          break;
        case 'reasoningContent':
          reasoning += obj.data;
          break;
        case 'finalResult':
          // mainText, when present, replaces the text assembled from chunks.
          if (obj.data?.mainText) text = obj.data.mainText;
          break;
        case 'streamingError':
          reject(new Error(obj.data || 'Streaming error'));
          break;
      }
    });

    upstreamStream.on('data', (chunk) => parser.feed(chunk));
    upstreamStream.on('end', () => {
      parser.flush();
      // Dedup: detect and remove large repeated passages.
      text = deduplicateText(text);
      resolve({ text, reasoning, model: actualModel });
    });
    upstreamStream.on('error', reject);
  });
}

/**
 * Post-processes a complete text to remove duplication: if a large passage
 * occurs again later in the text, everything from the repetition onwards is
 * dropped and the first occurrence is kept.
 *
 * @param {string} text
 * @returns {string}
 */
function deduplicateText(text) {
  if (!text || text.length < 300) return text;

  // Strategy: try several fragment lengths, longest first (500, 300, 200).
  // The needle is taken one third into the text and searched for after itself.
  for (const windowSize of [500, 300, 200]) {
    if (text.length < windowSize * 2) continue;

    const start = Math.floor(text.length / 3);
    const needle = text.substring(start, start + windowSize);

    // Only look for the repetition after the needle itself.
    const searchFrom = start + windowSize;
    const repeatIdx = text.indexOf(needle, searchFrom);

    if (repeatIdx >= 0) {
      // Repeat found: cut the text where the repetition begins.
      console.log(`[Dedup] 非流式去重: ${text.length} → ${repeatIdx} (window=${windowSize})`);
      return text.substring(0, repeatIdx).trimEnd();
    }
  }

  return text;
}