import { promises as fs } from 'fs'; import * as path from 'path'; import * as http from 'http'; // Add http for IncomingMessage and ServerResponse types import * as crypto from 'crypto'; // Import crypto for MD5 hashing import { convertData, getOpenAIStreamChunkStop } from '../convert/convert.js'; import { ProviderStrategyFactory } from './provider-strategies.js'; import { getPluginManager } from '../core/plugin-manager.js'; // ==================== 网络错误处理 ==================== /** * 可重试的网络错误标识列表 * 这些错误可能出现在 error.code 或 error.message 中 */ export const RETRYABLE_NETWORK_ERRORS = [ 'ECONNRESET', // 连接被重置 'ETIMEDOUT', // 连接超时 'ECONNREFUSED', // 连接被拒绝 'ENOTFOUND', // DNS 解析失败 'ENETUNREACH', // 网络不可达 'EHOSTUNREACH', // 主机不可达 'EPIPE', // 管道破裂 'EAI_AGAIN', // DNS 临时失败 'ECONNABORTED', // 连接中止 'ESOCKETTIMEDOUT', // Socket 超时 ]; /** * 检查是否为可重试的网络错误 * @param {Error} error - 错误对象 * @returns {boolean} - 是否为可重试的网络错误 */ export function isRetryableNetworkError(error) { if (!error) return false; const errorCode = error.code || ''; const errorMessage = error.message || ''; return RETRYABLE_NETWORK_ERRORS.some(errId => errorCode === errId || errorMessage.includes(errId) ); } // ==================== API 常量 ==================== export const API_ACTIONS = { GENERATE_CONTENT: 'generateContent', STREAM_GENERATE_CONTENT: 'streamGenerateContent', }; export const MODEL_PROTOCOL_PREFIX = { // Model provider constants GEMINI: 'gemini', OPENAI: 'openai', OPENAI_RESPONSES: 'openaiResponses', CLAUDE: 'claude', OLLAMA: 'ollama', CODEX: 'codex', FORWARD: 'forward', } export const MODEL_PROVIDER = { // Model provider constants GEMINI_CLI: 'gemini-cli-oauth', ANTIGRAVITY: 'gemini-antigravity', OPENAI_CUSTOM: 'openai-custom', OPENAI_CUSTOM_RESPONSES: 'openaiResponses-custom', CLAUDE_CUSTOM: 'claude-custom', KIRO_API: 'claude-kiro-oauth', QWEN_API: 'openai-qwen-oauth', IFLOW_API: 'openai-iflow', CODEX_API: 'openai-codex-oauth', FORWARD_API: 'forward-api', } /** * Extracts the protocol prefix from a given model provider string. * This is used to determine if two providers belong to the same underlying protocol (e.g., gemini, openai, claude). * @param {string} provider - The model provider string (e.g., 'gemini-cli', 'openai-custom'). * @returns {string} The protocol prefix (e.g., 'gemini', 'openai', 'claude'). */ export function getProtocolPrefix(provider) { // Special case for Codex - it needs its own protocol if (provider === 'openai-codex-oauth') { return 'codex'; } const hyphenIndex = provider.indexOf('-'); if (hyphenIndex !== -1) { return provider.substring(0, hyphenIndex); } return provider; // Return original if no hyphen is found } export const ENDPOINT_TYPE = { OPENAI_CHAT: 'openai_chat', OPENAI_RESPONSES: 'openai_responses', GEMINI_CONTENT: 'gemini_content', CLAUDE_MESSAGE: 'claude_message', OPENAI_MODEL_LIST: 'openai_model_list', GEMINI_MODEL_LIST: 'gemini_model_list', }; export const FETCH_SYSTEM_PROMPT_FILE = path.join(process.cwd(), 'configs', 'fetch_system_prompt.txt'); export const INPUT_SYSTEM_PROMPT_FILE = path.join(process.cwd(), 'configs', 'input_system_prompt.txt'); export function formatExpiryTime(expiryTimestamp) { if (!expiryTimestamp || typeof expiryTimestamp !== 'number') return "No expiry date available"; const diffMs = expiryTimestamp - Date.now(); if (diffMs <= 0) return "Token has expired"; let totalSeconds = Math.floor(diffMs / 1000); const hours = Math.floor(totalSeconds / 3600); totalSeconds %= 3600; const minutes = Math.floor(totalSeconds / 60); const seconds = totalSeconds % 60; const pad = (num) => String(num).padStart(2, '0'); return `${pad(hours)}h ${pad(minutes)}m ${pad(seconds)}s`; } /** * 格式化日志输出,统一日志格式 * @param {string} tag - 日志标签,如 'Qwen', 'Kiro' 等 * @param {string} message - 日志消息 * @param {Object} [data] - 可选的数据对象,将被格式化输出 * @returns {string} 格式化后的日志字符串 */ export function formatLog(tag, message, data = null) { let logMessage = `[${tag}] ${message}`; if (data !== null && data !== undefined) { if (typeof data === 'object') { const dataStr = Object.entries(data) .map(([key, value]) => `${key}: ${value}`) .join(', '); logMessage += ` | ${dataStr}`; } else { logMessage += ` | ${data}`; } } return logMessage; } /** * 格式化凭证过期时间日志 * @param {string} tag - 日志标签,如 'Qwen', 'Kiro' 等 * @param {number} expiryDate - 过期时间戳 * @param {number} nearMinutes - 临近过期的分钟数 * @returns {{message: string, isNearExpiry: boolean}} 格式化后的日志字符串和是否临近过期 */ export function formatExpiryLog(tag, expiryDate, nearMinutes) { const currentTime = Date.now(); const nearMinutesInMillis = nearMinutes * 60 * 1000; const thresholdTime = currentTime + nearMinutesInMillis; const isNearExpiry = expiryDate <= thresholdTime; const message = formatLog(tag, 'Checking expiry date', { 'Expiry date': expiryDate, 'Current time': currentTime, [`${nearMinutes} minutes from now`]: thresholdTime, 'Is near expiry': isNearExpiry }); return { message, isNearExpiry }; } /** * Reads the entire request body from an HTTP request. * @param {http.IncomingMessage} req - The HTTP request object. * @returns {Promise} A promise that resolves with the parsed JSON request body. * @throws {Error} If the request body is not valid JSON. */ export function getRequestBody(req) { return new Promise((resolve, reject) => { let body = ''; req.on('data', chunk => { body += chunk.toString(); }); req.on('end', () => { if (!body) { return resolve({}); } try { resolve(JSON.parse(body)); } catch (error) { reject(new Error("Invalid JSON in request body.")); } }); req.on('error', err => { reject(err); }); }); } export async function logConversation(type, content, logMode, logFilename) { if (logMode === 'none') return; if (!content) return; const timestamp = new Date().toLocaleString(); const logEntry = `${timestamp} [${type.toUpperCase()}]:\n${content}\n--------------------------------------\n`; if (logMode === 'console') { console.log(logEntry); } else if (logMode === 'file') { try { // Append to the file await fs.appendFile(logFilename, logEntry); } catch (err) { console.error(`[Error] Failed to write conversation log to ${logFilename}:`, err); } } } /** * Checks if the request is authorized based on API key. * @param {http.IncomingMessage} req - The HTTP request object. * @param {URL} requestUrl - The parsed URL object. * @param {string} REQUIRED_API_KEY - The API key required for authorization. * @returns {boolean} True if authorized, false otherwise. */ export function isAuthorized(req, requestUrl, REQUIRED_API_KEY) { const authHeader = req.headers['authorization']; const queryKey = requestUrl.searchParams.get('key'); const googApiKey = req.headers['x-goog-api-key']; const claudeApiKey = req.headers['x-api-key']; // Claude-specific header // Check for Bearer token in Authorization header (OpenAI style) if (authHeader && authHeader.startsWith('Bearer ')) { const token = authHeader.substring(7); if (token === REQUIRED_API_KEY) { return true; } } // Check for API key in URL query parameter (Gemini style) if (queryKey === REQUIRED_API_KEY) { return true; } // Check for API key in x-goog-api-key header (Gemini style) if (googApiKey === REQUIRED_API_KEY) { return true; } // Check for API key in x-api-key header (Claude style) if (claudeApiKey === REQUIRED_API_KEY) { return true; } console.log(`[Auth] Unauthorized request denied. Bearer: "${authHeader ? 'present' : 'N/A'}", Query Key: "${queryKey}", x-goog-api-key: "${googApiKey}", x-api-key: "${claudeApiKey}"`); return false; } /** * Handles the common logic for sending API responses (unary and stream). * This includes writing response headers, logging conversation, and logging auth token expiry. * @param {http.ServerResponse} res - The HTTP response object. * @param {Object} responsePayload - The actual response payload (string for unary, object for stream chunks). * @param {boolean} isStream - Whether the response is a stream. */ export async function handleUnifiedResponse(res, responsePayload, isStream) { if (isStream) { res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "Transfer-Encoding": "chunked" }); } else { res.writeHead(200, { 'Content-Type': 'application/json' }); } if (isStream) { // Stream chunks are handled by the calling function that iterates the stream } else { res.end(responsePayload); } } export async function handleStreamRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid, customName, retryContext = null) { let fullResponseText = ''; let fullResponseJson = ''; let fullOldResponseJson = ''; let responseClosed = false; // 重试上下文:包含 CONFIG 和重试计数 // maxRetries: 凭证切换最大次数(跨凭证),默认 5 次 const maxRetries = retryContext?.maxRetries ?? 5; const currentRetry = retryContext?.currentRetry ?? 0; const CONFIG = retryContext?.CONFIG; const isRetry = currentRetry > 0; // 只在首次请求时发送响应头,重试时跳过(响应头已发送) if (!isRetry) { await handleUnifiedResponse(res, '', true); } // fs.writeFile('request'+Date.now()+'.json', JSON.stringify(requestBody)); // The service returns a stream in its native format (toProvider). const needsConversion = getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider); requestBody.model = model; const nativeStream = await service.generateContentStream(model, requestBody); const addEvent = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.CLAUDE || getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES; const openStop = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI ; try { for await (const nativeChunk of nativeStream) { // Extract text for logging purposes const chunkText = extractResponseText(nativeChunk, toProvider); if (chunkText && !Array.isArray(chunkText)) { fullResponseText += chunkText; } // Convert the complete chunk object to the client's format (fromProvider), if necessary. const chunkToSend = needsConversion ? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model) : nativeChunk; if (!chunkToSend) { continue; } // 处理 chunkToSend 可能是数组或对象的情况 const chunksToSend = Array.isArray(chunkToSend) ? chunkToSend : [chunkToSend]; for (const chunk of chunksToSend) { if (addEvent) { // fullOldResponseJson += chunk.type+"\n"; // fullResponseJson += chunk.type+"\n"; res.write(`event: ${chunk.type}\n`); // console.log(`event: ${chunk.type}\n`); } // fullOldResponseJson += JSON.stringify(chunk)+"\n"; // fullResponseJson += JSON.stringify(chunk)+"\n\n"; res.write(`data: ${JSON.stringify(chunk)}\n\n`); // console.log(`data: ${JSON.stringify(chunk)}\n`); } } if (openStop && needsConversion) { res.write(`data: ${JSON.stringify(getOpenAIStreamChunkStop(model))}\n\n`); // console.log(`data: ${JSON.stringify(getOpenAIStreamChunkStop(model))}\n`); } // 流式请求成功完成,统计使用次数,错误次数重置为0 if (providerPoolManager && pooluuid) { const customNameDisplay = customName ? `, ${customName}` : ''; console.log(`[Provider Pool] Increasing usage count for ${toProvider} (${pooluuid}${customNameDisplay}) after successful stream request`); providerPoolManager.markProviderHealthy(toProvider, { uuid: pooluuid }); } } catch (error) { console.error('\n[Server] Error during stream processing:', error.stack); // 如果已经发送了内容,不进行重试(避免响应数据损坏) if (fullResponseText.length > 0) { console.log(`[Stream Retry] Cannot retry: ${fullResponseText.length} bytes already sent to client`); // 直接发送错误并结束 const errorPayload = createStreamErrorResponse(error, fromProvider); res.write(errorPayload); res.end(); responseClosed = true; return; } // 获取状态码(用于日志记录,不再用于判断是否重试) const status = error.response?.status; // 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况) const skipErrorCount = error.skipErrorCount === true; // 检查是否应该切换凭证(用于 429/5xx/402/403 等情况) const shouldSwitchCredential = error.shouldSwitchCredential === true; // 检查凭证是否已在底层被标记为不健康(避免重复标记) let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true; // 如果底层未标记,且不跳过错误计数,则在此处标记 if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) { // 400 报错码通常是请求参数问题,不记录为提供商错误 if (status === 400) { console.log(`[Provider Pool] Skipping unhealthy marking for ${toProvider} (${pooluuid}) due to status 400 (client error)`); } else { console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to stream error (status: ${status || 'unknown'})`); // 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康 providerPoolManager.markProviderUnhealthy(toProvider, { uuid: pooluuid }); credentialMarkedUnhealthy = true; } } else if (credentialMarkedUnhealthy) { console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`); } else if (skipErrorCount) { console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`); } // 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试 if (shouldSwitchCredential && !credentialMarkedUnhealthy) { credentialMarkedUnhealthy = true; // 触发下面的重试逻辑 } // 凭证已被标记为不健康后,尝试切换到新凭证重试 // 不再依赖状态码判断,只要凭证被标记不健康且可以重试,就尝试切换 if (credentialMarkedUnhealthy && currentRetry < maxRetries && providerPoolManager && CONFIG) { // 增加10秒内的随机等待时间,避免所有请求同时切换凭证 const randomDelay = Math.floor(Math.random() * 10000); // 0-10000毫秒 console.log(`[Stream Retry] Credential marked unhealthy. Waiting ${randomDelay}ms before retry ${currentRetry + 1}/${maxRetries} with different credential...`); await new Promise(resolve => setTimeout(resolve, randomDelay)); try { // 动态导入以避免循环依赖 const { getApiServiceWithFallback } = await import('../services/service-manager.js'); const result = await getApiServiceWithFallback(CONFIG, model); if (result && result.service && result.uuid !== pooluuid) { console.log(`[Stream Retry] Switched to new credential: ${result.uuid} (provider: ${result.actualProviderType})`); // 使用新服务重试 const newRetryContext = { ...retryContext, CONFIG, currentRetry: currentRetry + 1, maxRetries }; // 递归调用,使用新的服务 return await handleStreamRequest( res, result.service, result.actualModel || model, requestBody, fromProvider, result.actualProviderType || toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, result.uuid, result.serviceConfig?.customName || customName, newRetryContext ); } else if (result && result.uuid === pooluuid) { console.log(`[Stream Retry] No different healthy credential available. Same credential selected.`); } else { console.log(`[Stream Retry] No healthy credential available for retry.`); } } catch (retryError) { console.error(`[Stream Retry] Failed to get alternative service:`, retryError.message); } } // 使用新方法创建符合 fromProvider 格式的流式错误响应 const errorPayload = createStreamErrorResponse(error, fromProvider); res.write(errorPayload); res.end(); responseClosed = true; } finally { if (!responseClosed) { res.end(); } await logConversation('output', fullResponseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME); // fs.writeFile('oldResponseChunk'+Date.now()+'.json', fullOldResponseJson); // fs.writeFile('responseChunk'+Date.now()+'.json', fullResponseJson); } } export async function handleUnaryRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid, customName, retryContext = null) { // 重试上下文:包含 CONFIG 和重试计数 // maxRetries: 凭证切换最大次数(跨凭证),默认 5 次 const maxRetries = retryContext?.maxRetries ?? 5; const currentRetry = retryContext?.currentRetry ?? 0; const CONFIG = retryContext?.CONFIG; try{ // The service returns the response in its native format (toProvider). const needsConversion = getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider); requestBody.model = model; // fs.writeFile('oldRequest'+Date.now()+'.json', JSON.stringify(requestBody)); const nativeResponse = await service.generateContent(model, requestBody); const responseText = extractResponseText(nativeResponse, toProvider); // Convert the response back to the client's format (fromProvider), if necessary. let clientResponse = nativeResponse; if (needsConversion) { console.log(`[Response Convert] Converting response from ${toProvider} to ${fromProvider}`); clientResponse = convertData(nativeResponse, 'response', toProvider, fromProvider, model); } //console.log(`[Response] Sending response to client: ${JSON.stringify(clientResponse)}`); await handleUnifiedResponse(res, JSON.stringify(clientResponse), false); await logConversation('output', responseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME); // fs.writeFile('oldResponse'+Date.now()+'.json', JSON.stringify(clientResponse)); // 一元请求成功完成,统计使用次数,错误次数重置为0 if (providerPoolManager && pooluuid) { const customNameDisplay = customName ? `, ${customName}` : ''; console.log(`[Provider Pool] Increasing usage count for ${toProvider} (${pooluuid}${customNameDisplay}) after successful unary request`); providerPoolManager.markProviderHealthy(toProvider, { uuid: pooluuid }); } } catch (error) { console.error('\n[Server] Error during unary processing:', error.stack); // 获取状态码(用于日志记录,不再用于判断是否重试) const status = error.response?.status; // 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况) const skipErrorCount = error.skipErrorCount === true; // 检查是否应该切换凭证(用于 429/5xx/402/403 等情况) const shouldSwitchCredential = error.shouldSwitchCredential === true; // 检查凭证是否已在底层被标记为不健康(避免重复标记) let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true; // 如果底层未标记,且不跳过错误计数,则在此处标记 if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) { // 400 报错码通常是请求参数问题,不记录为提供商错误 if (status === 400) { console.log(`[Provider Pool] Skipping unhealthy marking for ${toProvider} (${pooluuid}) due to status 400 (client error)`); } else { console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to unary error (status: ${status || 'unknown'})`); // 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康 providerPoolManager.markProviderUnhealthy(toProvider, { uuid: pooluuid }); credentialMarkedUnhealthy = true; } } else if (credentialMarkedUnhealthy) { console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`); } else if (skipErrorCount) { console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`); } // 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试 if (shouldSwitchCredential && !credentialMarkedUnhealthy) { credentialMarkedUnhealthy = true; // 触发下面的重试逻辑 } // 凭证已被标记为不健康后,尝试切换到新凭证重试 // 不再依赖状态码判断,只要凭证被标记不健康且可以重试,就尝试切换 if (credentialMarkedUnhealthy && currentRetry < maxRetries && providerPoolManager && CONFIG) { // 增加10秒内的随机等待时间,避免所有请求同时切换凭证 const randomDelay = Math.floor(Math.random() * 10000); // 0-10000毫秒 console.log(`[Unary Retry] Credential marked unhealthy. Waiting ${randomDelay}ms before retry ${currentRetry + 1}/${maxRetries} with different credential...`); await new Promise(resolve => setTimeout(resolve, randomDelay)); try { // 动态导入以避免循环依赖 const { getApiServiceWithFallback } = await import('../services/service-manager.js'); const result = await getApiServiceWithFallback(CONFIG, model); if (result && result.service && result.uuid !== pooluuid) { console.log(`[Unary Retry] Switched to new credential: ${result.uuid} (provider: ${result.actualProviderType})`); // 使用新服务重试 const newRetryContext = { ...retryContext, CONFIG, currentRetry: currentRetry + 1, maxRetries }; // 递归调用,使用新的服务 return await handleUnaryRequest( res, result.service, result.actualModel || model, requestBody, fromProvider, result.actualProviderType || toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, result.uuid, result.serviceConfig?.customName || customName, newRetryContext ); } else if (result && result.uuid === pooluuid) { console.log(`[Unary Retry] No different healthy credential available. Same credential selected.`); } else { console.log(`[Unary Retry] No healthy credential available for retry.`); } } catch (retryError) { console.error(`[Unary Retry] Failed to get alternative service:`, retryError.message); } } // 使用新方法创建符合 fromProvider 格式的错误响应 const errorResponse = createErrorResponse(error, fromProvider); await handleUnifiedResponse(res, JSON.stringify(errorResponse), false); } } /** * Handles requests for listing available models. It fetches models from the * service, transforms them to the format expected by the client (OpenAI, Claude, etc.), * and sends the JSON response. * @param {http.IncomingMessage} req The HTTP request object. * @param {http.ServerResponse} res The HTTP response object. * @param {string} endpointType The type of endpoint being called (e.g., OPENAI_MODEL_LIST). * @param {Object} CONFIG - The server configuration object. */ export async function handleModelListRequest(req, res, service, endpointType, CONFIG, providerPoolManager, pooluuid) { try{ const clientProviderMap = { [ENDPOINT_TYPE.OPENAI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.OPENAI, [ENDPOINT_TYPE.GEMINI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.GEMINI, }; const fromProvider = clientProviderMap[endpointType]; const toProvider = CONFIG.MODEL_PROVIDER; if (!fromProvider) { throw new Error(`Unsupported endpoint type for model list: ${endpointType}`); } // 1. Get the model list in the backend's native format. const nativeModelList = await service.listModels(); // 2. Convert the model list to the client's expected format, if necessary. let clientModelList = nativeModelList; if (!getProtocolPrefix(toProvider).includes(getProtocolPrefix(fromProvider))) { console.log(`[ModelList Convert] Converting model list from ${toProvider} to ${fromProvider}`); clientModelList = convertData(nativeModelList, 'modelList', toProvider, fromProvider); } else { console.log(`[ModelList Convert] Model list format matches. No conversion needed.`); } console.log(`[ModelList Response] Sending model list to client: ${JSON.stringify(clientModelList)}`); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(clientModelList)); } catch (error) { console.error('\n[Server] Error during model list processing:', error.stack); if (providerPoolManager) { // 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康 providerPoolManager.markProviderUnhealthy(toProvider, { uuid: pooluuid }); } } } /** * Handles requests for content generation (both unary and streaming). This function * orchestrates request body parsing, conversion to the internal Gemini format, * logging, and dispatching to the appropriate stream or unary handler. * @param {http.IncomingMessage} req The HTTP request object. * @param {http.ServerResponse} res The HTTP response object. * @param {string} endpointType The type of endpoint being called (e.g., OPENAI_CHAT). * @param {Object} CONFIG - The server configuration object. * @param {string} PROMPT_LOG_FILENAME - The prompt log filename. */ export async function handleContentGenerationRequest(req, res, service, endpointType, CONFIG, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid, requestPath = null) { const originalRequestBody = await getRequestBody(req); if (!originalRequestBody) { throw new Error("Request body is missing for content generation."); } const clientProviderMap = { [ENDPOINT_TYPE.OPENAI_CHAT]: MODEL_PROTOCOL_PREFIX.OPENAI, [ENDPOINT_TYPE.OPENAI_RESPONSES]: MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES, [ENDPOINT_TYPE.CLAUDE_MESSAGE]: MODEL_PROTOCOL_PREFIX.CLAUDE, [ENDPOINT_TYPE.GEMINI_CONTENT]: MODEL_PROTOCOL_PREFIX.GEMINI, }; const fromProvider = clientProviderMap[endpointType]; // 使用实际的提供商类型(可能是 fallback 后的类型) let toProvider = CONFIG.actualProviderType || CONFIG.MODEL_PROVIDER; let actualUuid = pooluuid; if (!fromProvider) { throw new Error(`Unsupported endpoint type for content generation: ${endpointType}`); } // 2. Extract model and determine if the request is for streaming. let { model, isStream } = _extractModelAndStreamInfo(req, originalRequestBody, fromProvider); if (!model) { throw new Error("Could not determine the model from the request."); } console.log(`[Content Generation] Model: ${model}, Stream: ${isStream}`); let actualCustomName = CONFIG.customName; // 2.5. 如果使用了提供商池,根据模型重新选择提供商(支持 Fallback) // 注意:这里使用 skipUsageCount: true,因为初次选择时已经增加了 usageCount if (providerPoolManager && CONFIG.providerPools && CONFIG.providerPools[CONFIG.MODEL_PROVIDER]) { const { getApiServiceWithFallback } = await import('../services/service-manager.js'); const result = await getApiServiceWithFallback(CONFIG, model); service = result.service; toProvider = result.actualProviderType; actualUuid = result.uuid || pooluuid; actualCustomName = result.serviceConfig?.customName || CONFIG.customName; // 如果发生了模型级别的 fallback,需要更新请求使用的模型 if (result.actualModel && result.actualModel !== model) { console.log(`[Content Generation] Model Fallback: ${model} -> ${result.actualModel}`); model = result.actualModel; } if (result.isFallback) { console.log(`[Content Generation] Fallback activated: ${CONFIG.MODEL_PROVIDER} -> ${toProvider} (uuid: ${actualUuid})`); } else { console.log(`[Content Generation] Re-selected service adapter based on model: ${model}`); } } // 1. Convert request body from client format to backend format, if necessary. let processedRequestBody = originalRequestBody; // fs.writeFile('originalRequestBody'+Date.now()+'.json', JSON.stringify(originalRequestBody)); if (getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider)) { console.log(`[Request Convert] Converting request from ${fromProvider} to ${toProvider}`); processedRequestBody = convertData(originalRequestBody, 'request', fromProvider, toProvider); } else { console.log(`[Request Convert] Request format matches backend provider. No conversion needed.`); } // 为 forward provider 添加原始请求路径作为 endpoint if (requestPath && toProvider === MODEL_PROVIDER.FORWARD_API) { console.log(`[Forward API] Request path: ${requestPath}`); processedRequestBody.endpoint = requestPath; } // 3. Apply system prompt from file if configured. processedRequestBody = await _applySystemPromptFromFile(CONFIG, processedRequestBody, toProvider); await _manageSystemPrompt(processedRequestBody, toProvider); // 4. Log the incoming prompt (after potential conversion to the backend's format). const promptText = extractPromptText(processedRequestBody, toProvider); await logConversation('input', promptText, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME); // 5. Call the appropriate stream or unary handler, passing the provider info. // 创建重试上下文,包含 CONFIG 以便在认证错误时切换凭证重试 // 凭证切换重试次数(默认 5),可在配置中自定义更大的值 // 注意:这与底层的 429/5xx 重试(REQUEST_MAX_RETRIES)是不同层次的重试机制 // - 底层重试:同一凭证遇到 429/5xx 时的重试 // - 凭证切换重试:凭证被标记不健康后切换到其他凭证 // 当没有不同的健康凭证可用时,重试会自动停止 const credentialSwitchMaxRetries = CONFIG.CREDENTIAL_SWITCH_MAX_RETRIES || 5; const retryContext = providerPoolManager ? { CONFIG, currentRetry: 0, maxRetries: credentialSwitchMaxRetries } : null; if (isStream) { await handleStreamRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, actualUuid, actualCustomName, retryContext); } else { await handleUnaryRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, actualUuid, actualCustomName, retryContext); } // 执行插件钩子:内容生成后 try { const pluginManager = getPluginManager(); await pluginManager.executeHook('onContentGenerated', CONFIG); } catch (e) { /* 静默失败,不影响主流程 */ } } /** * Helper function to extract model and stream information from the request. * @param {http.IncomingMessage} req The HTTP request object. * @param {Object} requestBody The parsed request body. * @param {string} fromProvider The type of endpoint being called. * @returns {{model: string, isStream: boolean}} An object containing the model name and stream status. */ function _extractModelAndStreamInfo(req, requestBody, fromProvider) { const strategy = ProviderStrategyFactory.getStrategy(getProtocolPrefix(fromProvider)); return strategy.extractModelAndStreamInfo(req, requestBody); } async function _applySystemPromptFromFile(config, requestBody, toProvider) { const strategy = ProviderStrategyFactory.getStrategy(getProtocolPrefix(toProvider)); return strategy.applySystemPromptFromFile(config, requestBody); } export async function _manageSystemPrompt(requestBody, provider) { const strategy = ProviderStrategyFactory.getStrategy(getProtocolPrefix(provider)); await strategy.manageSystemPrompt(requestBody); } // Helper functions for content extraction and conversion (from convert.js, but needed here) export function extractResponseText(response, provider) { const strategy = ProviderStrategyFactory.getStrategy(getProtocolPrefix(provider)); return strategy.extractResponseText(response); } export function extractPromptText(requestBody, provider) { const strategy = ProviderStrategyFactory.getStrategy(getProtocolPrefix(provider)); return strategy.extractPromptText(requestBody); } export function handleError(res, error, provider = null) { const statusCode = error.response?.status || error.statusCode || error.status || error.code || 500; let errorMessage = error.message; let suggestions = []; // 仅在没有传入错误信息时,才使用默认消息;否则只添加建议 const hasOriginalMessage = error.message && error.message.trim() !== ''; // 根据提供商获取适配的错误信息和建议 const providerSuggestions = _getProviderSpecificSuggestions(statusCode, provider); // Provide detailed information and suggestions for different error types switch (statusCode) { case 401: errorMessage = 'Authentication failed. Please check your credentials.'; suggestions = providerSuggestions.auth; break; case 403: errorMessage = 'Access forbidden. Insufficient permissions.'; suggestions = providerSuggestions.permission; break; case 429: errorMessage = 'Too many requests. Rate limit exceeded.'; suggestions = providerSuggestions.rateLimit; break; case 500: case 502: case 503: case 504: errorMessage = 'Server error occurred. This is usually temporary.'; suggestions = providerSuggestions.serverError; break; default: if (statusCode >= 400 && statusCode < 500) { errorMessage = `Client error (${statusCode}): ${error.message}`; suggestions = providerSuggestions.clientError; } else if (statusCode >= 500) { errorMessage = `Server error (${statusCode}): ${error.message}`; suggestions = providerSuggestions.serverError; } } errorMessage = hasOriginalMessage ? error.message.trim() : errorMessage; console.error(`\n[Server] Request failed (${statusCode}): ${errorMessage}`); if (suggestions.length > 0) { console.error('[Server] Suggestions:'); suggestions.forEach((suggestion, index) => { console.error(` ${index + 1}. ${suggestion}`); }); } console.error('[Server] Full error details:', error.stack); if (!res.headersSent) { res.writeHead(statusCode, { 'Content-Type': 'application/json' }); } const errorPayload = { error: { message: errorMessage, code: statusCode, suggestions: suggestions, details: error.response?.data } }; res.end(JSON.stringify(errorPayload)); } /** * 根据提供商类型获取适配的错误建议 * @param {number} statusCode - HTTP 状态码 * @param {string|null} provider - 提供商类型 * @returns {Object} 包含各类错误建议的对象 */ function _getProviderSpecificSuggestions(statusCode, provider) { const protocolPrefix = provider ? getProtocolPrefix(provider) : null; // 默认/通用建议 const defaultSuggestions = { auth: [ 'Verify your API key or credentials are valid', 'Check if your credentials have expired', 'Ensure the API key has the necessary permissions' ], permission: [ 'Check if your account has the necessary permissions', 'Verify the API endpoint is accessible with your credentials', 'Contact your administrator if permissions are restricted' ], rateLimit: [ 'The request has been automatically retried with exponential backoff', 'If the issue persists, try reducing the request frequency', 'Consider upgrading your API quota if available' ], serverError: [ 'The request has been automatically retried', 'If the issue persists, try again in a few minutes', 'Check the service status page for outages' ], clientError: [ 'Check your request format and parameters', 'Verify the model name is correct', 'Ensure all required fields are provided' ] }; // 根据提供商返回特定建议 switch (protocolPrefix) { case MODEL_PROTOCOL_PREFIX.GEMINI: return { auth: [ 'Verify your OAuth credentials are valid', 'Try re-authenticating by deleting the credentials file', 'Check if your Google Cloud project has the necessary permissions' ], permission: [ 'Ensure your Google Cloud project has the Gemini API enabled', 'Check if your account has the necessary permissions', 'Verify the project ID is correct' ], rateLimit: [ 'The request has been automatically retried with exponential backoff', 'If the issue persists, try reducing the request frequency', 'Consider upgrading your Google Cloud API quota' ], serverError: [ 'The request has been automatically retried', 'If the issue persists, try again in a few minutes', 'Check Google Cloud status page for service outages' ], clientError: [ 'Check your request format and parameters', 'Verify the model name is a valid Gemini model', 'Ensure all required fields are provided' ] }; case MODEL_PROTOCOL_PREFIX.OPENAI: case MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES: return { auth: [ 'Verify your OpenAI API key is valid', 'Check if your API key has expired or been revoked', 'Ensure the API key is correctly formatted (starts with sk-)' ], permission: [ 'Check if your OpenAI account has access to the requested model', 'Verify your organization settings allow this operation', 'Ensure you have sufficient credits in your account' ], rateLimit: [ 'The request has been automatically retried with exponential backoff', 'If the issue persists, try reducing the request frequency', 'Consider upgrading your OpenAI usage tier for higher limits' ], serverError: [ 'The request has been automatically retried', 'If the issue persists, try again in a few minutes', 'Check OpenAI status page (status.openai.com) for outages' ], clientError: [ 'Check your request format and parameters', 'Verify the model name is a valid OpenAI model', 'Ensure the message format is correct (role and content fields)' ] }; case MODEL_PROTOCOL_PREFIX.CLAUDE: return { auth: [ 'Verify your Anthropic API key is valid', 'Check if your API key has expired or been revoked', 'Ensure the x-api-key header is correctly set' ], permission: [ 'Check if your Anthropic account has access to the requested model', 'Verify your account is in good standing', 'Ensure you have sufficient credits in your account' ], rateLimit: [ 'The request has been automatically retried with exponential backoff', 'If the issue persists, try reducing the request frequency', 'Consider upgrading your Anthropic usage tier for higher limits' ], serverError: [ 'The request has been automatically retried', 'If the issue persists, try again in a few minutes', 'Check Anthropic status page for service outages' ], clientError: [ 'Check your request format and parameters', 'Verify the model name is a valid Claude model', 'Ensure the message format follows Anthropic API specifications' ] }; case MODEL_PROTOCOL_PREFIX.OLLAMA: return { auth: [ 'Ollama typically does not require authentication', 'If using a custom setup, verify your credentials', 'Check if the Ollama server requires authentication' ], permission: [ 'Verify the Ollama server is accessible', 'Check if the requested model is available locally', 'Ensure the Ollama server allows the requested operation' ], rateLimit: [ 'The local Ollama server may be overloaded', 'Try reducing concurrent requests', 'Consider increasing server resources if running locally' ], serverError: [ 'Check if the Ollama server is running', 'Verify the server address and port are correct', 'Check Ollama server logs for detailed error information' ], clientError: [ 'Check your request format and parameters', 'Verify the model name is available in your Ollama installation', 'Try pulling the model first with: ollama pull ' ] }; default: return defaultSuggestions; } } /** * 从请求体中提取系统提示词。 * @param {Object} requestBody - 请求体对象。 * @param {string} provider - 提供商类型('openai', 'gemini', 'claude')。 * @returns {string} 提取到的系统提示词字符串。 */ export function extractSystemPromptFromRequestBody(requestBody, provider) { let incomingSystemText = ''; switch (provider) { case MODEL_PROTOCOL_PREFIX.OPENAI: const openaiSystemMessage = requestBody.messages?.find(m => m.role === 'system'); if (openaiSystemMessage?.content) { incomingSystemText = openaiSystemMessage.content; } else if (requestBody.messages?.length > 0) { // Fallback to first user message if no system message const userMessage = requestBody.messages.find(m => m.role === 'user'); if (userMessage) { incomingSystemText = userMessage.content; } } break; case MODEL_PROTOCOL_PREFIX.GEMINI: const geminiSystemInstruction = requestBody.system_instruction || requestBody.systemInstruction; if (geminiSystemInstruction?.parts) { incomingSystemText = geminiSystemInstruction.parts .filter(p => p?.text) .map(p => p.text) .join('\n'); } else if (requestBody.contents?.length > 0) { // Fallback to first user content if no system instruction const userContent = requestBody.contents[0]; if (userContent?.parts) { incomingSystemText = userContent.parts .filter(p => p?.text) .map(p => p.text) .join('\n'); } } break; case MODEL_PROTOCOL_PREFIX.CLAUDE: if (typeof requestBody.system === 'string') { incomingSystemText = requestBody.system; } else if (typeof requestBody.system === 'object') { incomingSystemText = JSON.stringify(requestBody.system); } else if (requestBody.messages?.length > 0) { // Fallback to first user message if no system property const userMessage = requestBody.messages.find(m => m.role === 'user'); if (userMessage) { if (Array.isArray(userMessage.content)) { incomingSystemText = userMessage.content.map(block => block.text).join(''); } else { incomingSystemText = userMessage.content; } } } break; default: console.warn(`[System Prompt] Unknown provider: ${provider}`); break; } return incomingSystemText; } /** * Generates an MD5 hash for a given object by first converting it to a JSON string. * @param {object} obj - The object to hash. * @returns {string} The MD5 hash of the object's JSON string representation. */ export function getMD5Hash(obj) { const jsonString = JSON.stringify(obj); return crypto.createHash('md5').update(jsonString).digest('hex'); } /** * 创建符合 fromProvider 格式的错误响应(非流式) * @param {Error} error - 错误对象 * @param {string} fromProvider - 客户端期望的提供商格式 * @returns {Object} 格式化的错误响应对象 */ function createErrorResponse(error, fromProvider) { const protocolPrefix = getProtocolPrefix(fromProvider); const statusCode = error.status || error.code || 500; const errorMessage = error.message || "An error occurred during processing."; // 根据 HTTP 状态码映射错误类型 const getErrorType = (code) => { if (code === 401) return 'authentication_error'; if (code === 403) return 'permission_error'; if (code === 429) return 'rate_limit_error'; if (code >= 500) return 'server_error'; return 'invalid_request_error'; }; // 根据 HTTP 状态码映射 Gemini 的 status const getGeminiStatus = (code) => { if (code === 400) return 'INVALID_ARGUMENT'; if (code === 401) return 'UNAUTHENTICATED'; if (code === 403) return 'PERMISSION_DENIED'; if (code === 404) return 'NOT_FOUND'; if (code === 429) return 'RESOURCE_EXHAUSTED'; if (code >= 500) return 'INTERNAL'; return 'UNKNOWN'; }; switch (protocolPrefix) { case MODEL_PROTOCOL_PREFIX.OPENAI: // OpenAI 非流式错误格式 return { error: { message: errorMessage, type: getErrorType(statusCode), code: getErrorType(statusCode) // OpenAI 使用 code 字段作为核心判断 } }; case MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES: // OpenAI Responses API 非流式错误格式 return { error: { type: getErrorType(statusCode), message: errorMessage, code: getErrorType(statusCode) } }; case MODEL_PROTOCOL_PREFIX.CLAUDE: // Claude 非流式错误格式(外层有 type 标记) return { type: "error", // 核心区分标记 error: { type: getErrorType(statusCode), // Claude 使用 error.type 作为核心判断 message: errorMessage } }; case MODEL_PROTOCOL_PREFIX.GEMINI: // Gemini 非流式错误格式(遵循 Google Cloud 标准) return { error: { code: statusCode, message: errorMessage, status: getGeminiStatus(statusCode) // Gemini 使用 status 作为核心判断 } }; default: // 默认使用 OpenAI 格式 return { error: { message: errorMessage, type: getErrorType(statusCode), code: getErrorType(statusCode) } }; } } /** * 创建符合 fromProvider 格式的流式错误响应 * @param {Error} error - 错误对象 * @param {string} fromProvider - 客户端期望的提供商格式 * @returns {string} 格式化的流式错误响应字符串 */ function createStreamErrorResponse(error, fromProvider) { const protocolPrefix = getProtocolPrefix(fromProvider); const statusCode = error.status || error.code || 500; const errorMessage = error.message || "An error occurred during streaming."; // 根据 HTTP 状态码映射错误类型 const getErrorType = (code) => { if (code === 401) return 'authentication_error'; if (code === 403) return 'permission_error'; if (code === 429) return 'rate_limit_error'; if (code >= 500) return 'server_error'; return 'invalid_request_error'; }; // 根据 HTTP 状态码映射 Gemini 的 status const getGeminiStatus = (code) => { if (code === 400) return 'INVALID_ARGUMENT'; if (code === 401) return 'UNAUTHENTICATED'; if (code === 403) return 'PERMISSION_DENIED'; if (code === 404) return 'NOT_FOUND'; if (code === 429) return 'RESOURCE_EXHAUSTED'; if (code >= 500) return 'INTERNAL'; return 'UNKNOWN'; }; switch (protocolPrefix) { case MODEL_PROTOCOL_PREFIX.OPENAI: // OpenAI 流式错误格式(SSE data 块) const openaiError = { error: { message: errorMessage, type: getErrorType(statusCode), code: null } }; return `data: ${JSON.stringify(openaiError)}\n\n`; case MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES: // OpenAI Responses API 流式错误格式(SSE event + data) const responsesError = { id: `resp_${Date.now()}`, object: "error", created: Math.floor(Date.now() / 1000), error: { type: getErrorType(statusCode), message: errorMessage, code: getErrorType(statusCode) } }; return `event: error\ndata: ${JSON.stringify(responsesError)}\n\n`; case MODEL_PROTOCOL_PREFIX.CLAUDE: // Claude 流式错误格式(SSE event + data) const claudeError = { type: "error", error: { type: getErrorType(statusCode), message: errorMessage } }; return `event: error\ndata: ${JSON.stringify(claudeError)}\n\n`; case MODEL_PROTOCOL_PREFIX.GEMINI: // Gemini 流式错误格式 // 注意:虽然 Gemini 原生使用 JSON 数组,但在我们的实现中已经转换为 SSE 格式 // 所以这里也需要使用 data: 前缀,保持与正常流式响应一致 const geminiError = { error: { code: statusCode, message: errorMessage, status: getGeminiStatus(statusCode) } }; return `data: ${JSON.stringify(geminiError)}\n\n`; default: // 默认使用 OpenAI SSE 格式 const defaultError = { error: { message: errorMessage, type: getErrorType(statusCode), code: null } }; return `data: ${JSON.stringify(defaultError)}\n\n`; } }