|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import { randomUUID } from 'node:crypto';

import config from '../config/config.js';
import logger from '../utils/logger.js';
import memoryManager, { registerMemoryPoolCleanup } from '../utils/memoryManager.js';
import { DEFAULT_HEARTBEAT_INTERVAL } from '../constants/index.js';
|
|
|
|
|
|
|
|
// Heartbeat period in milliseconds. With `||`, any falsy configured value
// (undefined, null, 0) falls back to DEFAULT_HEARTBEAT_INTERVAL.
const HEARTBEAT_INTERVAL =
  config.server.heartbeatInterval || DEFAULT_HEARTBEAT_INTERVAL;

// Pre-encoded SSE comment frame (a line starting with ':') written on each
// heartbeat tick; encoded once so ticks do not re-encode the string.
const SSE_HEARTBEAT = Buffer.from(': heartbeat\n\n', 'utf8');
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Start writing an SSE heartbeat comment to `res` every HEARTBEAT_INTERVAL ms.
 *
 * The interval stops when the response emits 'close' or 'finish', or when a
 * tick observes that the response has ended or been destroyed.
 *
 * @param {object} res - Writable HTTP response (uses `writableEnded`,
 *   `destroyed`, `write`, and `once`).
 * @returns {ReturnType<typeof setInterval>} The interval handle, so callers
 *   may also stop it explicitly with `clearInterval`.
 */
export const createHeartbeat = (res) => {
  const timer = setInterval(() => {
    // A destroyed response (client disconnected) may never become
    // `writableEnded`, so check both before writing.
    if (res.writableEnded || res.destroyed) {
      clearInterval(timer);
      return;
    }
    res.write(SSE_HEARTBEAT);
  }, HEARTBEAT_INTERVAL);

  res.once('close', () => clearInterval(timer));
  res.once('finish', () => clearInterval(timer));

  // If the response was already closed before we attached, 'close' has
  // already fired and the listeners above will never run — stop right away
  // instead of leaking the interval.
  if (res.writableEnded || res.destroyed) {
    clearInterval(timer);
  }

  return timer;
};
|
|
|
|
|
|
|
|
const SSE_PREFIX = Buffer.from('data: '); |
|
|
const SSE_SUFFIX = Buffer.from('\n\n'); |
|
|
const SSE_DONE = Buffer.from('data: [DONE]\n\n'); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Build the per-response identity fields for a streamed completion.
 *
 * The id uses a random UUID rather than the millisecond clock, so two
 * responses created in the same millisecond no longer share an id. `created`
 * is derived from a single clock read (Unix seconds).
 *
 * @returns {{ id: string, created: number }} `id` is `chatcmpl-<uuid>`;
 *   `created` is an integer Unix timestamp in seconds.
 */
export const createResponseMeta = () => {
  const nowMs = Date.now();
  return {
    id: `chatcmpl-${randomUUID()}`,
    created: Math.floor(nowMs / 1000),
  };
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Apply the response headers used for an SSE stream, in a fixed order.
 *
 * @param {object} res - HTTP response exposing `setHeader(name, value)`.
 */
export const setStreamHeaders = (res) => {
  const headerTable = [
    ['Content-Type', 'text/event-stream'],
    ['Cache-Control', 'no-cache'],
    ['Connection', 'keep-alive'],
    // Asks nginx-style reverse proxies not to buffer the stream.
    ['X-Accel-Buffering', 'no'],
  ];
  for (const [headerName, headerValue] of headerTable) {
    res.setHeader(headerName, headerValue);
  }
};
|
|
|
|
|
|
|
|
// Module-level stack of chunk objects available for reuse.
const chunkPool = [];

/**
 * Take a chunk object from the pool, or allocate a fresh one when the pool is
 * empty (or the popped entry is falsy).
 *
 * Recycled objects are returned as-is; callers are responsible for resetting
 * any fields they rely on.
 *
 * @returns {object} A chunk object; fresh ones have the shape
 *   `{ choices: [{ index: 0, delta: {}, finish_reason: null }] }`.
 */
export const getChunkObject = () => {
  const recycled = chunkPool.pop();
  if (recycled) {
    return recycled;
  }
  return { choices: [{ index: 0, delta: {}, finish_reason: null }] };
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Return a chunk object to the pool if the pool is below its current limit.
 *
 * The limit is read on every call from `memoryManager.getPoolSizes().chunk`;
 * if that value is not a number the `<` comparison is false and nothing is
 * pooled.
 *
 * @param {object} obj - Chunk object to make available for reuse.
 */
export const releaseChunkObject = (obj) => {
  const { chunk: capacity } = memoryManager.getPoolSizes();
  const hasRoom = chunkPool.length < capacity;
  if (hasRoom) {
    chunkPool.push(obj);
  }
};
|
|
|
|
|
|
|
|
// Register the chunk pool with the memory manager, along with a getter that
// reports the pool's current size limit (re-read on each call). What the
// manager does with these is defined in ../utils/memoryManager.js and is not
// visible here — presumably it trims `chunkPool` down to the returned limit
// when cleaning up; TODO confirm.
registerMemoryPoolCleanup(chunkPool, () => memoryManager.getPoolSizes().chunk);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Report how many chunk objects are currently waiting in the pool.
 *
 * @returns {number} Number of pooled chunk objects.
 */
export const getChunkPoolSize = () => {
  return chunkPool.length;
};

/**
 * Drop every pooled chunk object, emptying the shared pool array in place
 * (the array identity is preserved for anyone holding a reference to it).
 */
export const clearChunkPool = () => {
  chunkPool.splice(0, chunkPool.length);
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Serialize `data` as JSON and write it to `res` as one SSE `data:` frame.
 *
 * The whole frame (`data: <json>\n\n`) goes out in a single `res.write`, so a
 * failure can never leave a dangling `data: ` prefix on the wire. Writes are
 * skipped once the response has ended or been destroyed, matching the guard in
 * `endStream`. Writing after end would otherwise make Node emit an 'error'
 * event on the response.
 *
 * @param {object} res - Writable HTTP response (uses `writableEnded`,
 *   `destroyed`, `write`).
 * @param {*} data - JSON-serializable payload.
 * @returns {boolean} The `res.write` return value (false means the caller
 *   should wait for 'drain'), or false when the response is no longer writable.
 * @throws {TypeError} If `data` is not JSON-serializable (e.g. `undefined`, a
 *   function, or — from `JSON.stringify` itself — a cyclic structure or BigInt).
 */
export const writeStreamData = (res, data) => {
  const json = JSON.stringify(data);
  if (json === undefined) {
    throw new TypeError('writeStreamData: data is not JSON-serializable');
  }
  if (res.writableEnded || res.destroyed) {
    return false;
  }
  return res.write(Buffer.concat([SSE_PREFIX, Buffer.from(json), SSE_SUFFIX]));
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Finish an SSE response, optionally sending the terminal `[DONE]` frame first.
 * Does nothing if the response has already been ended.
 *
 * @param {object} res - Writable HTTP response (uses `writableEnded`,
 *   `write`, `end`).
 * @param {boolean} [isWriteDone=true] - Whether to write the `[DONE]` frame
 *   before ending.
 */
export const endStream = (res, isWriteDone = true) => {
  if (!res.writableEnded) {
    if (isWriteDone) {
      res.write(SSE_DONE);
    }
    res.end();
  }
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Run `fn`, retrying it when it fails with HTTP status 429.
 *
 * The status is read from `error.status`, `error.statusCode`, or
 * `error.response.status` (first truthy value). Any other failure, or a 429
 * once the retry budget is spent, is rethrown unchanged. Nullish thrown values
 * (e.g. `throw null`) are rethrown as-is instead of being masked by a
 * TypeError during status extraction.
 *
 * @param {(attempt: number) => Promise<*>} fn - Operation to run; receives the
 *   0-based attempt number.
 * @param {number} maxRetries - Maximum retries after the first attempt;
 *   non-finite or non-positive values mean no retries, fractions are floored.
 * @param {string} [loggerPrefix=''] - Prefix for the retry warning log line.
 * @param {object} [options]
 * @param {number} [options.retryDelayMs=0] - Base backoff before retry k
 *   (0-based) is `retryDelayMs * 2 ** k` ms. The default 0 retries
 *   immediately, as before.
 * @returns {Promise<*>} Whatever `fn` resolves with.
 * @throws Whatever `fn` last threw.
 */
export const with429Retry = async (fn, maxRetries, loggerPrefix = '', options = {}) => {
  const retries = Number.isFinite(maxRetries) && maxRetries > 0 ? Math.floor(maxRetries) : 0;
  const { retryDelayMs = 0 } = options ?? {};
  const baseDelayMs = Number.isFinite(retryDelayMs) && retryDelayMs > 0 ? retryDelayMs : 0;

  for (let attempt = 0; ; attempt += 1) {
    try {
      return await fn(attempt);
    } catch (error) {
      // Optional chaining: `fn` may reject with a nullish value.
      const status = Number(error?.status || error?.statusCode || error?.response?.status);
      if (status !== 429 || attempt >= retries) {
        throw error;
      }

      const nextAttempt = attempt + 1;
      logger.warn(`${loggerPrefix}收到 429,正在进行第 ${nextAttempt} 次重试(共 ${retries} 次)`);

      if (baseDelayMs > 0) {
        // Exponential backoff gives a rate-limited upstream time to recover.
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
};