const axios = require('axios') const ProxyHelper = require('../utils/proxyHelper') const logger = require('../utils/logger') const openaiResponsesAccountService = require('./openaiResponsesAccountService') const apiKeyService = require('./apiKeyService') const unifiedOpenAIScheduler = require('./unifiedOpenAIScheduler') const config = require('../../config/config') const crypto = require('crypto') // 抽取缓存写入 token,兼容多种字段命名 function extractCacheCreationTokens(usageData) { if (!usageData || typeof usageData !== 'object') { return 0 } const details = usageData.input_tokens_details || usageData.prompt_tokens_details || {} const candidates = [ details.cache_creation_input_tokens, details.cache_creation_tokens, usageData.cache_creation_input_tokens, usageData.cache_creation_tokens ] for (const value of candidates) { if (value !== undefined && value !== null && value !== '') { const parsed = Number(value) if (!Number.isNaN(parsed)) { return parsed } } } return 0 } class OpenAIResponsesRelayService { constructor() { this.defaultTimeout = config.requestTimeout || 600000 } // 处理请求转发 async handleRequest(req, res, account, apiKeyData) { let abortController = null // 获取会话哈希(如果有的话) const sessionId = req.headers['session_id'] || req.body?.session_id const sessionHash = sessionId ? crypto.createHash('sha256').update(sessionId).digest('hex') : null try { // 获取完整的账户信息(包含解密的 API Key) const fullAccount = await openaiResponsesAccountService.getAccount(account.id) if (!fullAccount) { throw new Error('Account not found') } // 创建 AbortController 用于取消请求 abortController = new AbortController() // 设置客户端断开监听器 const handleClientDisconnect = () => { logger.info('🔌 Client disconnected, aborting OpenAI-Responses request') if (abortController && !abortController.signal.aborted) { abortController.abort() } } // 监听客户端断开事件 req.once('close', handleClientDisconnect) res.once('close', handleClientDisconnect) // 构建目标 URL const targetUrl = `${fullAccount.baseApi}${req.path}` logger.info(`🎯 Forwarding to: ${targetUrl}`) // 构建请求头 const headers = { ...this._filterRequestHeaders(req.headers), Authorization: `Bearer ${fullAccount.apiKey}`, 'Content-Type': 'application/json' } // 处理 User-Agent if (fullAccount.userAgent) { // 使用自定义 User-Agent headers['User-Agent'] = fullAccount.userAgent logger.debug(`📱 Using custom User-Agent: ${fullAccount.userAgent}`) } else if (req.headers['user-agent']) { // 透传原始 User-Agent headers['User-Agent'] = req.headers['user-agent'] logger.debug(`📱 Forwarding original User-Agent: ${req.headers['user-agent']}`) } // 配置请求选项 const requestOptions = { method: req.method, url: targetUrl, headers, data: req.body, timeout: this.defaultTimeout, responseType: req.body?.stream ? 'stream' : 'json', validateStatus: () => true, // 允许处理所有状态码 signal: abortController.signal } // 配置代理(如果有) if (fullAccount.proxy) { const proxyAgent = ProxyHelper.createProxyAgent(fullAccount.proxy) if (proxyAgent) { requestOptions.httpAgent = proxyAgent requestOptions.httpsAgent = proxyAgent requestOptions.proxy = false logger.info( `🌐 Using proxy for OpenAI-Responses: ${ProxyHelper.getProxyDescription(fullAccount.proxy)}` ) } } // 记录请求信息 logger.info('📤 OpenAI-Responses relay request', { accountId: account.id, accountName: account.name, targetUrl, method: req.method, stream: req.body?.stream || false, model: req.body?.model || 'unknown', userAgent: headers['User-Agent'] || 'not set' }) // 发送请求 const response = await axios(requestOptions) // 处理 429 限流错误 if (response.status === 429) { const { resetsInSeconds, errorData } = await this._handle429Error( account, response, req.body?.stream, sessionHash ) // 返回错误响应(使用处理后的数据,避免循环引用) const errorResponse = errorData || { error: { message: 'Rate limit exceeded', type: 'rate_limit_error', code: 'rate_limit_exceeded', resets_in_seconds: resetsInSeconds } } return res.status(429).json(errorResponse) } // 处理其他错误状态码 if (response.status >= 400) { // 处理流式错误响应 let errorData = response.data if (response.data && typeof response.data.pipe === 'function') { // 流式响应需要先读取内容 const chunks = [] await new Promise((resolve) => { response.data.on('data', (chunk) => chunks.push(chunk)) response.data.on('end', resolve) response.data.on('error', resolve) setTimeout(resolve, 5000) // 超时保护 }) const fullResponse = Buffer.concat(chunks).toString() // 尝试解析错误响应 try { if (fullResponse.includes('data: ')) { // SSE格式 const lines = fullResponse.split('\n') for (const line of lines) { if (line.startsWith('data: ')) { const jsonStr = line.slice(6).trim() if (jsonStr && jsonStr !== '[DONE]') { errorData = JSON.parse(jsonStr) break } } } } else { // 普通JSON errorData = JSON.parse(fullResponse) } } catch (e) { logger.error('Failed to parse error response:', e) errorData = { error: { message: fullResponse || 'Unknown error' } } } } logger.error('OpenAI-Responses API error', { status: response.status, statusText: response.statusText, errorData }) if (response.status === 401) { let reason = 'OpenAI Responses账号认证失败(401错误)' if (errorData) { if (typeof errorData === 'string' && errorData.trim()) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}` } else if ( errorData.error && typeof errorData.error.message === 'string' && errorData.error.message.trim() ) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}` } else if (typeof errorData.message === 'string' && errorData.message.trim()) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}` } } try { await unifiedOpenAIScheduler.markAccountUnauthorized( account.id, 'openai-responses', sessionHash, reason ) } catch (markError) { logger.error( '❌ Failed to mark OpenAI-Responses account unauthorized after 401:', markError ) } let unauthorizedResponse = errorData if ( !unauthorizedResponse || typeof unauthorizedResponse !== 'object' || unauthorizedResponse.pipe || Buffer.isBuffer(unauthorizedResponse) ) { const fallbackMessage = typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized' unauthorizedResponse = { error: { message: fallbackMessage, type: 'unauthorized', code: 'unauthorized' } } } // 清理监听器 req.removeListener('close', handleClientDisconnect) res.removeListener('close', handleClientDisconnect) return res.status(401).json(unauthorizedResponse) } // 清理监听器 req.removeListener('close', handleClientDisconnect) res.removeListener('close', handleClientDisconnect) return res.status(response.status).json(errorData) } // 更新最后使用时间 await openaiResponsesAccountService.updateAccount(account.id, { lastUsedAt: new Date().toISOString() }) // 处理流式响应 if (req.body?.stream && response.data && typeof response.data.pipe === 'function') { return this._handleStreamResponse( response, res, account, apiKeyData, req.body?.model, handleClientDisconnect, req ) } // 处理非流式响应 return this._handleNormalResponse(response, res, account, apiKeyData, req.body?.model) } catch (error) { // 清理 AbortController if (abortController && !abortController.signal.aborted) { abortController.abort() } // 安全地记录错误,避免循环引用 const errorInfo = { message: error.message, code: error.code, status: error.response?.status, statusText: error.response?.statusText } logger.error('OpenAI-Responses relay error:', errorInfo) // 检查是否是网络错误 if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') { await openaiResponsesAccountService.updateAccount(account.id, { status: 'error', errorMessage: `Connection error: ${error.code}` }) } // 如果已经发送了响应头,直接结束 if (res.headersSent) { return res.end() } // 检查是否是axios错误并包含响应 if (error.response) { // 处理axios错误响应 const status = error.response.status || 500 let errorData = { error: { message: error.response.statusText || 'Request failed', type: 'api_error', code: error.code || 'unknown' } } // 如果响应包含数据,尝试使用它 if (error.response.data) { // 检查是否是流 if (typeof error.response.data === 'object' && !error.response.data.pipe) { errorData = error.response.data } else if (typeof error.response.data === 'string') { try { errorData = JSON.parse(error.response.data) } catch (e) { errorData.error.message = error.response.data } } } if (status === 401) { let reason = 'OpenAI Responses账号认证失败(401错误)' if (errorData) { if (typeof errorData === 'string' && errorData.trim()) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}` } else if ( errorData.error && typeof errorData.error.message === 'string' && errorData.error.message.trim() ) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}` } else if (typeof errorData.message === 'string' && errorData.message.trim()) { reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}` } } try { await unifiedOpenAIScheduler.markAccountUnauthorized( account.id, 'openai-responses', sessionHash, reason ) } catch (markError) { logger.error( '❌ Failed to mark OpenAI-Responses account unauthorized in catch handler:', markError ) } let unauthorizedResponse = errorData if ( !unauthorizedResponse || typeof unauthorizedResponse !== 'object' || unauthorizedResponse.pipe || Buffer.isBuffer(unauthorizedResponse) ) { const fallbackMessage = typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized' unauthorizedResponse = { error: { message: fallbackMessage, type: 'unauthorized', code: 'unauthorized' } } } return res.status(401).json(unauthorizedResponse) } return res.status(status).json(errorData) } // 其他错误 return res.status(500).json({ error: { message: 'Internal server error', type: 'internal_error', details: error.message } }) } } // 处理流式响应 async _handleStreamResponse( response, res, account, apiKeyData, requestedModel, handleClientDisconnect, req ) { // 设置 SSE 响应头 res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Cache-Control', 'no-cache') res.setHeader('Connection', 'keep-alive') res.setHeader('X-Accel-Buffering', 'no') let usageData = null let actualModel = null let buffer = '' let rateLimitDetected = false let rateLimitResetsInSeconds = null let streamEnded = false // 解析 SSE 事件以捕获 usage 数据和 model const parseSSEForUsage = (data) => { const lines = data.split('\n') for (const line of lines) { if (line.startsWith('data: ')) { try { const jsonStr = line.slice(6) if (jsonStr === '[DONE]') { continue } const eventData = JSON.parse(jsonStr) // 检查是否是 response.completed 事件(OpenAI-Responses 格式) if (eventData.type === 'response.completed' && eventData.response) { // 从响应中获取真实的 model if (eventData.response.model) { actualModel = eventData.response.model logger.debug(`📊 Captured actual model from response.completed: ${actualModel}`) } // 获取 usage 数据 - OpenAI-Responses 格式在 response.usage 下 if (eventData.response.usage) { usageData = eventData.response.usage logger.info('📊 Successfully captured usage data from OpenAI-Responses:', { input_tokens: usageData.input_tokens, output_tokens: usageData.output_tokens, total_tokens: usageData.total_tokens }) } } // 检查是否有限流错误 if (eventData.error) { // 检查多种可能的限流错误类型 if ( eventData.error.type === 'rate_limit_error' || eventData.error.type === 'usage_limit_reached' || eventData.error.type === 'rate_limit_exceeded' ) { rateLimitDetected = true if (eventData.error.resets_in_seconds) { rateLimitResetsInSeconds = eventData.error.resets_in_seconds logger.warn( `🚫 Rate limit detected in stream, resets in ${rateLimitResetsInSeconds} seconds (${Math.ceil(rateLimitResetsInSeconds / 60)} minutes)` ) } } } } catch (e) { // 忽略解析错误 } } } } // 监听数据流 response.data.on('data', (chunk) => { try { const chunkStr = chunk.toString() // 转发数据给客户端 if (!res.destroyed && !streamEnded) { res.write(chunk) } // 同时解析数据以捕获 usage 信息 buffer += chunkStr // 处理完整的 SSE 事件 if (buffer.includes('\n\n')) { const events = buffer.split('\n\n') buffer = events.pop() || '' for (const event of events) { if (event.trim()) { parseSSEForUsage(event) } } } } catch (error) { logger.error('Error processing stream chunk:', error) } }) response.data.on('end', async () => { streamEnded = true // 处理剩余的 buffer if (buffer.trim()) { parseSSEForUsage(buffer) } // 记录使用统计 if (usageData) { try { // OpenAI-Responses 使用 input_tokens/output_tokens,标准 OpenAI 使用 prompt_tokens/completion_tokens const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0 const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0 // 提取缓存相关的 tokens(如果存在) const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0 const cacheCreateTokens = extractCacheCreationTokens(usageData) // 计算实际输入token(总输入减去缓存部分) const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens) const totalTokens = usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens const modelToRecord = actualModel || requestedModel || 'gpt-4' await apiKeyService.recordUsage( apiKeyData.id, actualInputTokens, // 传递实际输入(不含缓存) outputTokens, cacheCreateTokens, cacheReadTokens, modelToRecord, account.id ) logger.info( `📊 Recorded usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${modelToRecord}` ) // 更新账户的 token 使用统计 await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens) // 更新账户使用额度(如果设置了额度限制) if (parseFloat(account.dailyQuota) > 0) { // 使用CostCalculator正确计算费用(考虑缓存token的不同价格) const CostCalculator = require('../utils/costCalculator') const costInfo = CostCalculator.calculateCost( { input_tokens: actualInputTokens, // 实际输入(不含缓存) output_tokens: outputTokens, cache_creation_input_tokens: cacheCreateTokens, cache_read_input_tokens: cacheReadTokens }, modelToRecord ) await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total) } } catch (error) { logger.error('Failed to record usage:', error) } } // 如果在流式响应中检测到限流 if (rateLimitDetected) { // 使用统一调度器处理限流(与非流式响应保持一致) const sessionId = req.headers['session_id'] || req.body?.session_id const sessionHash = sessionId ? crypto.createHash('sha256').update(sessionId).digest('hex') : null await unifiedOpenAIScheduler.markAccountRateLimited( account.id, 'openai-responses', sessionHash, rateLimitResetsInSeconds ) logger.warn( `🚫 Processing rate limit for OpenAI-Responses account ${account.id} from stream` ) } // 清理监听器 req.removeListener('close', handleClientDisconnect) res.removeListener('close', handleClientDisconnect) if (!res.destroyed) { res.end() } logger.info('Stream response completed', { accountId: account.id, hasUsage: !!usageData, actualModel: actualModel || 'unknown' }) }) response.data.on('error', (error) => { streamEnded = true logger.error('Stream error:', error) // 清理监听器 req.removeListener('close', handleClientDisconnect) res.removeListener('close', handleClientDisconnect) if (!res.headersSent) { res.status(502).json({ error: { message: 'Upstream stream error' } }) } else if (!res.destroyed) { res.end() } }) // 处理客户端断开连接 const cleanup = () => { streamEnded = true try { response.data?.unpipe?.(res) response.data?.destroy?.() } catch (_) { // 忽略清理错误 } } req.on('close', cleanup) req.on('aborted', cleanup) } // 处理非流式响应 async _handleNormalResponse(response, res, account, apiKeyData, requestedModel) { const responseData = response.data // 提取 usage 数据和实际 model // 支持两种格式:直接的 usage 或嵌套在 response 中的 usage const usageData = responseData?.usage || responseData?.response?.usage const actualModel = responseData?.model || responseData?.response?.model || requestedModel || 'gpt-4' // 记录使用统计 if (usageData) { try { // OpenAI-Responses 使用 input_tokens/output_tokens,标准 OpenAI 使用 prompt_tokens/completion_tokens const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0 const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0 // 提取缓存相关的 tokens(如果存在) const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0 const cacheCreateTokens = extractCacheCreationTokens(usageData) // 计算实际输入token(总输入减去缓存部分) const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens) const totalTokens = usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens await apiKeyService.recordUsage( apiKeyData.id, actualInputTokens, // 传递实际输入(不含缓存) outputTokens, cacheCreateTokens, cacheReadTokens, actualModel, account.id ) logger.info( `📊 Recorded non-stream usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${actualModel}` ) // 更新账户的 token 使用统计 await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens) // 更新账户使用额度(如果设置了额度限制) if (parseFloat(account.dailyQuota) > 0) { // 使用CostCalculator正确计算费用(考虑缓存token的不同价格) const CostCalculator = require('../utils/costCalculator') const costInfo = CostCalculator.calculateCost( { input_tokens: actualInputTokens, // 实际输入(不含缓存) output_tokens: outputTokens, cache_creation_input_tokens: cacheCreateTokens, cache_read_input_tokens: cacheReadTokens }, actualModel ) await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total) } } catch (error) { logger.error('Failed to record usage:', error) } } // 返回响应 res.status(response.status).json(responseData) logger.info('Normal response completed', { accountId: account.id, status: response.status, hasUsage: !!usageData, model: actualModel }) } // 处理 429 限流错误 async _handle429Error(account, response, isStream = false, sessionHash = null) { let resetsInSeconds = null let errorData = null try { // 对于429错误,响应可能是JSON或SSE格式 if (isStream && response.data && typeof response.data.pipe === 'function') { // 流式响应需要先收集数据 const chunks = [] await new Promise((resolve, reject) => { response.data.on('data', (chunk) => chunks.push(chunk)) response.data.on('end', resolve) response.data.on('error', reject) // 设置超时防止无限等待 setTimeout(resolve, 5000) }) const fullResponse = Buffer.concat(chunks).toString() // 尝试解析SSE格式的错误响应 if (fullResponse.includes('data: ')) { const lines = fullResponse.split('\n') for (const line of lines) { if (line.startsWith('data: ')) { try { const jsonStr = line.slice(6).trim() if (jsonStr && jsonStr !== '[DONE]') { errorData = JSON.parse(jsonStr) break } } catch (e) { // 继续尝试下一行 } } } } // 如果SSE解析失败,尝试直接解析为JSON if (!errorData) { try { errorData = JSON.parse(fullResponse) } catch (e) { logger.error('Failed to parse 429 error response:', e) logger.debug('Raw response:', fullResponse) } } } else if (response.data && typeof response.data !== 'object') { // 如果response.data是字符串,尝试解析为JSON try { errorData = JSON.parse(response.data) } catch (e) { logger.error('Failed to parse 429 error response as JSON:', e) errorData = { error: { message: response.data } } } } else if (response.data && typeof response.data === 'object' && !response.data.pipe) { // 非流式响应,且是对象,直接使用 errorData = response.data } // 从响应体中提取重置时间(OpenAI 标准格式) if (errorData && errorData.error) { if (errorData.error.resets_in_seconds) { resetsInSeconds = errorData.error.resets_in_seconds logger.info( `🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)` ) } else if (errorData.error.resets_in) { // 某些 API 可能使用不同的字段名 resetsInSeconds = parseInt(errorData.error.resets_in) logger.info( `🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)` ) } } if (!resetsInSeconds) { logger.warn('⚠️ Could not extract reset time from 429 response, using default 60 minutes') } } catch (e) { logger.error('⚠️ Failed to parse rate limit error:', e) } // 使用统一调度器标记账户为限流状态(与普通OpenAI账号保持一致) await unifiedOpenAIScheduler.markAccountRateLimited( account.id, 'openai-responses', sessionHash, resetsInSeconds ) logger.warn('OpenAI-Responses account rate limited', { accountId: account.id, accountName: account.name, resetsInSeconds: resetsInSeconds || 'unknown', resetInMinutes: resetsInSeconds ? Math.ceil(resetsInSeconds / 60) : 60, resetInHours: resetsInSeconds ? Math.ceil(resetsInSeconds / 3600) : 1 }) // 返回处理后的数据,避免循环引用 return { resetsInSeconds, errorData } } // 过滤请求头 _filterRequestHeaders(headers) { const filtered = {} const skipHeaders = [ 'host', 'content-length', 'authorization', 'x-api-key', 'x-cr-api-key', 'connection', 'upgrade', 'sec-websocket-key', 'sec-websocket-version', 'sec-websocket-extensions' ] for (const [key, value] of Object.entries(headers)) { if (!skipHeaders.includes(key.toLowerCase())) { filtered[key] = value } } return filtered } // 估算费用(简化版本,实际应该根据不同的定价模型) _estimateCost(model, inputTokens, outputTokens) { // 这是一个简化的费用估算,实际应该根据不同的 API 提供商和模型定价 const rates = { 'gpt-4': { input: 0.03, output: 0.06 }, // per 1K tokens 'gpt-4-turbo': { input: 0.01, output: 0.03 }, 'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 }, 'claude-3-opus': { input: 0.015, output: 0.075 }, 'claude-3-sonnet': { input: 0.003, output: 0.015 }, 'claude-3-haiku': { input: 0.00025, output: 0.00125 } } // 查找匹配的模型定价 let rate = rates['gpt-3.5-turbo'] // 默认使用 GPT-3.5 的价格 for (const [modelKey, modelRate] of Object.entries(rates)) { if (model.toLowerCase().includes(modelKey.toLowerCase())) { rate = modelRate break } } const inputCost = (inputTokens / 1000) * rate.input const outputCost = (outputTokens / 1000) * rate.output return inputCost + outputCost } } module.exports = new OpenAIResponsesRelayService()