// Scraped repository header (not part of the module source):
// ikun2 / openai-adapter.js — "Upload 19 files" by bingn, commit f1357b6 (verified)
/**
* openai-adapter.js - OpenAI 兼容 API 适配器
*
* 处理 POST /v1/chat/completions
* 将 OpenAI 格式请求转换为 chataibot.pro 协议
* 支持自动重试换号 (最多 3 次)
*/
import crypto from 'crypto';
import { createContext, sendMessageStreaming, sendMultiChunkStreaming } from './chat.js';
import { openaiToText, resolveModel, splitToChunks } from './message-convert.js';
import { transformToOpenAISSE, collectFullResponse, transformToOpenAISSEWithTools, probeStream } from './stream-transform.js';
import { parseToolCalls, toOpenAIToolCalls } from './tool-prompt.js';
import config from './config.js';
const MAX_RETRY = 3;
/**
* 判断错误是否可重试 (换号)
* 注意: 消息太长等请求层面的错误不应重试 (换号也没用)
*/
/**
 * Decide whether an error warrants retrying with a different account.
 * Request-level problems (e.g. the message exceeding the length limit)
 * are never retryable — switching accounts cannot fix the request itself.
 * @param {Error & {statusCode?: number}} e - error from the upstream call
 * @returns {boolean} true when a retry on another account may succeed
 */
function isRetryable(e) {
  const msg = (e.message || '').toLowerCase();
  // Length-limit violations are a property of the request, not the account.
  if (msg.includes('maximumlengthmessage') || msg.includes('message should be no more')) {
    return false;
  }
  // 401: session expired; 403: session/permission issue; 429: quota exhausted.
  switch (e.statusCode || 0) {
    case 401:
    case 403:
    case 429:
      return true;
    default:
      break;
  }
  const retryHints = [
    'limit', 'quota', 'exhaust', 'exceed',
    'too many', 'no available', 'insufficient', 'credits',
  ];
  return retryHints.some((hint) => msg.includes(hint));
}
/**
* 根据错误类型释放账号
*/
/**
 * Return an account to the pool after an error, classifying the outcome:
 * - length-limit errors: the account itself is fine — release as success;
 * - 401/403: treat the session as expired (re-auth needed);
 * - 429: quota exhausted;
 * - anything else: plain failure.
 * @param {object} pool - account pool exposing release(account, info)
 * @param {object} account - account obtained from pool.acquire()
 * @param {Error & {statusCode?: number}} e - the error that occurred
 */
function releaseOnError(pool, account, e) {
  const lowered = (e.message || '').toLowerCase();
  const isLengthLimit = lowered.includes('maximumlengthmessage')
    || lowered.includes('message should be no more');
  // Request-level error: nothing wrong with the account.
  if (isLengthLimit) {
    pool.release(account, { success: true });
    return;
  }
  switch (e.statusCode || 0) {
    case 401:
    case 403:
      // 403 may be a temporary restriction, but treat it like an expired
      // session so the pool refreshes the credentials.
      pool.release(account, { sessionExpired: true });
      return;
    case 429:
      pool.release(account, { quotaExhausted: true });
      return;
    default:
      pool.release(account, { success: false });
  }
}
/**
* 处理 OpenAI chat completions 请求
*/
// Substrings in a streaming-phase error message that indicate the account's
// quota/credits are exhausted (kept in sync with isRetryable's hint list —
// the original code checked slightly different subsets in each of its three
// inline copies, so 'exceed'/'too many'/'no available' were silently missed
// by the stream 'error' handler).
const QUOTA_ERROR_HINTS = [
  'limit', 'quota', 'exhaust', 'exceed',
  'too many', 'no available', 'insufficient', 'credits',
];

/**
 * Release `account` after a streaming-phase error, classifying quota
 * exhaustion vs. generic failure from the error message text.
 * @param {object} pool - account pool exposing release(account, info)
 * @param {object} account - account to release
 * @param {string|undefined} errMsg - error message from the stream
 */
function releaseOnStreamError(pool, account, errMsg) {
  const msg = (errMsg || '').toLowerCase();
  if (QUOTA_ERROR_HINTS.some((hint) => msg.includes(hint))) {
    pool.release(account, { quotaExhausted: true });
  } else {
    pool.release(account, { success: false });
  }
}

/**
 * Handle an OpenAI-compatible POST /v1/chat/completions request: convert
 * the OpenAI payload to the chataibot.pro protocol, run it (streaming or
 * not), and retry with a different account (up to MAX_RETRY attempts) when
 * the failure looks account-related.
 *
 * @param {object} body - parsed OpenAI request body (messages, model, stream, tools, tool_choice)
 * @param {import('http').ServerResponse} res - response to write JSON or SSE to
 * @param {object} pool - account pool exposing acquire() and release(account, info)
 */
export async function handleChatCompletions(body, res, pool) {
  const requestId = 'chatcmpl-' + crypto.randomBytes(12).toString('hex');
  // Validate the request shape before touching the account pool.
  if (!body.messages || !Array.isArray(body.messages) || body.messages.length === 0) {
    sendError(res, 400, 'messages is required and must be a non-empty array');
    return;
  }
  const text = openaiToText(body.messages, body.tools, body.tool_choice);
  // Reject empty conversions up front (was previously checked only after
  // chunking/model resolution had already run).
  if (!text) {
    sendError(res, 400, 'No valid message content found');
    return;
  }
  const chunks = splitToChunks(text);
  const model = resolveModel(body.model);
  const clientModel = body.model || 'gpt-4o';
  const stream = body.stream === true;
  const hasTools = body.tools && body.tools.length > 0;
  const isMultiChunk = chunks.length > 1;
  // Per-model billing cost; falls back to the configured default, then 2.
  const cost = config.modelCost?.[model] ?? config.defaultModelCost ?? 2;
  // Streaming: acquire the stream first; errors during acquisition/probing
  // can still be retried on another account. Once SSE bytes are written,
  // retrying is no longer possible.
  if (stream) {
    let lastError;
    for (let attempt = 1; attempt <= MAX_RETRY; attempt++) {
      let account;
      try {
        account = await pool.acquire();
      } catch (e) {
        sendError(res, 503, 'No available account: ' + e.message);
        return;
      }
      try {
        const title = text.substring(0, 100);
        const ctx = await createContext(account.cookies, model, title);
        if (ctx.cookies) account.cookies = ctx.cookies;
        const result = isMultiChunk
          ? await sendMultiChunkStreaming(account.cookies, ctx.chatId, chunks, model)
          : await sendMessageStreaming(account.cookies, ctx.chatId, chunks[0], model);
        if (result.cookies) account.cookies = result.cookies;
        // Stream pre-check: buffer the first few objects to detect a
        // streamingError (e.g. "Insufficient credits"). On error
        // probeStream throws and we fall into the retry catch below.
        const probed = await probeStream(result.stream);
        // Single release path for every stream-error source. Guarding on
        // `account` and nulling it prevents the double-release the original
        // allowed when both the transform callback and a stream event fired.
        const onStreamError = (errMsg) => {
          if (!account) return;
          releaseOnStreamError(pool, account, errMsg);
          account = null;
        };
        // Stream confirmed usable — start writing SSE (no retry after this).
        if (hasTools) {
          transformToOpenAISSEWithTools(probed, res, clientModel, requestId, onStreamError);
        } else {
          transformToOpenAISSE(probed, res, clientModel, requestId, onStreamError);
        }
        probed.on('end', () => {
          if (!account) return;
          pool.release(account, { success: true, cost });
          account = null;
        });
        probed.on('error', (err) => onStreamError(err?.message));
        return; // Stream started successfully.
      } catch (e) {
        releaseOnError(pool, account, e);
        lastError = e;
        if (isRetryable(e) && attempt < MAX_RETRY) {
          console.log(`[OpenAI] 请求失败 (${e.statusCode || e.message}), 换号重试 ${attempt + 1}/${MAX_RETRY}`);
          continue;
        }
      }
    }
    // All retries failed.
    if (!res.headersSent) {
      sendError(res, 502, `Upstream error after ${MAX_RETRY} retries: ${lastError?.message}`);
    }
    return;
  }
  // Non-streaming: the whole round trip (including stream collection) can
  // be retried, since nothing has been written to the client yet.
  let lastError;
  for (let attempt = 1; attempt <= MAX_RETRY; attempt++) {
    let account;
    try {
      account = await pool.acquire();
    } catch (e) {
      sendError(res, 503, 'No available account: ' + e.message);
      return;
    }
    try {
      const title = text.substring(0, 100);
      const ctx = await createContext(account.cookies, model, title);
      if (ctx.cookies) account.cookies = ctx.cookies;
      const result = isMultiChunk
        ? await sendMultiChunkStreaming(account.cookies, ctx.chatId, chunks, model)
        : await sendMessageStreaming(account.cookies, ctx.chatId, chunks[0], model);
      if (result.cookies) account.cookies = result.cookies;
      const full = await collectFullResponse(result.stream);
      pool.release(account, { success: true, cost });
      // Detect tool calls embedded in the model's text response.
      const { hasToolCalls, toolCalls, textContent } = hasTools
        ? parseToolCalls(full.text)
        : { hasToolCalls: false, toolCalls: [], textContent: full.text };
      const message = { role: 'assistant' };
      if (hasToolCalls) {
        message.content = textContent || null;
        message.tool_calls = toOpenAIToolCalls(toolCalls);
      } else {
        message.content = full.text;
      }
      res.writeHead(200, {
        'Content-Type': 'application/json',
        'Access-Control-Allow-Origin': '*',
      });
      res.end(JSON.stringify({
        id: requestId,
        object: 'chat.completion',
        created: Math.floor(Date.now() / 1000),
        model: clientModel,
        choices: [{
          index: 0,
          message,
          finish_reason: hasToolCalls ? 'tool_calls' : 'stop',
        }],
        // Upstream does not report token counts; zeros keep the schema valid.
        usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
      }));
      return; // Success.
    } catch (e) {
      releaseOnError(pool, account, e);
      lastError = e;
      if (isRetryable(e) && attempt < MAX_RETRY) {
        console.log(`[OpenAI] 请求失败 (${e.statusCode || e.message}), 换号重试 ${attempt + 1}/${MAX_RETRY}`);
        continue;
      }
    }
  }
  if (!res.headersSent) {
    sendError(res, 502, `Upstream error after ${MAX_RETRY} retries: ${lastError?.message}`);
  }
}
/**
 * Write an OpenAI-style JSON error response and finish the request.
 * @param {import('http').ServerResponse} res - response to write to
 * @param {number} status - HTTP status code to send
 * @param {string} message - human-readable error description
 */
function sendError(res, status, message) {
  const payload = {
    error: { message, type: 'invalid_request_error', code: status },
  };
  res.writeHead(status, {
    'Content-Type': 'application/json',
    'Access-Control-Allow-Origin': '*',
  });
  res.end(JSON.stringify(payload));
}