const https = require('https') const zlib = require('zlib') const fs = require('fs') const path = require('path') const ProxyHelper = require('../utils/proxyHelper') const claudeAccountService = require('./claudeAccountService') const unifiedClaudeScheduler = require('./unifiedClaudeScheduler') const sessionHelper = require('../utils/sessionHelper') const logger = require('../utils/logger') const config = require('../../config/config') const claudeCodeHeadersService = require('./claudeCodeHeadersService') const redis = require('../models/redis') const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') const { formatDateWithTimezone } = require('../utils/dateHelper') class ClaudeRelayService { constructor() { this.claudeApiUrl = config.claude.apiUrl this.apiVersion = config.claude.apiVersion this.betaHeader = config.claude.betaHeader this.systemPrompt = config.claude.systemPrompt this.claudeCodeSystemPrompt = "You are Claude Code, Anthropic's official CLI for Claude." } _buildStandardRateLimitMessage(resetTime) { if (!resetTime) { return '此专属账号已触发 Anthropic 限流控制。' } const formattedReset = formatDateWithTimezone(resetTime) return `此专属账号已触发 Anthropic 限流控制,将于 ${formattedReset} 自动恢复。` } _buildOpusLimitMessage(resetTime) { if (!resetTime) { return '此专属账号的Opus模型已达到周使用限制,请尝试切换其他模型后再试。' } const formattedReset = formatDateWithTimezone(resetTime) return `此专属账号的Opus模型已达到周使用限制,将于 ${formattedReset} 自动恢复,请尝试切换其他模型后再试。` } // 🧾 提取错误消息文本 _extractErrorMessage(body) { if (!body) { return '' } if (typeof body === 'string') { const trimmed = body.trim() if (!trimmed) { return '' } try { const parsed = JSON.parse(trimmed) return this._extractErrorMessage(parsed) } catch (error) { return trimmed } } if (typeof body === 'object') { if (typeof body.error === 'string') { return body.error } if (body.error && typeof body.error === 'object') { if (typeof body.error.message === 'string') { return body.error.message } if (typeof body.error.error === 'string') { return body.error.error } } if (typeof body.message === 'string') { return body.message } } return '' } // 🚫 检查是否为组织被禁用错误 _isOrganizationDisabledError(statusCode, body) { if (statusCode !== 400) { return false } const message = this._extractErrorMessage(body) if (!message) { return false } return message.toLowerCase().includes('this organization has been disabled') } // 🔍 判断是否是真实的 Claude Code 请求 isRealClaudeCodeRequest(requestBody) { return ClaudeCodeValidator.includesClaudeCodeSystemPrompt(requestBody, 1) } // 🚀 转发请求到Claude API async relayRequest( requestBody, apiKeyData, clientRequest, clientResponse, clientHeaders, options = {} ) { let upstreamRequest = null try { // 调试日志:查看API Key数据 logger.info('🔍 API Key data received:', { apiKeyName: apiKeyData.name, enableModelRestriction: apiKeyData.enableModelRestriction, restrictedModels: apiKeyData.restrictedModels, requestedModel: requestBody.model }) const isOpusModelRequest = typeof requestBody?.model === 'string' && requestBody.model.toLowerCase().includes('opus') // 生成会话哈希用于sticky会话 const sessionHash = sessionHelper.generateSessionHash(requestBody) // 选择可用的Claude账户(支持专属绑定和sticky会话) let accountSelection try { accountSelection = await unifiedClaudeScheduler.selectAccountForApiKey( apiKeyData, sessionHash, requestBody.model ) } catch (error) { if (error.code === 'CLAUDE_DEDICATED_RATE_LIMITED') { const limitMessage = this._buildStandardRateLimitMessage(error.rateLimitEndAt) logger.warn( `🚫 Dedicated account ${error.accountId} is rate limited for API key ${apiKeyData.name}, returning 403` ) return { statusCode: 403, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ error: 'upstream_rate_limited', message: limitMessage }), accountId: error.accountId } } throw error } const { accountId } = accountSelection const { accountType } = accountSelection logger.info( `📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` ) // 获取账户信息 let account = await claudeAccountService.getAccount(accountId) if (isOpusModelRequest) { await claudeAccountService.clearExpiredOpusRateLimit(accountId) account = await claudeAccountService.getAccount(accountId) } const isDedicatedOfficialAccount = accountType === 'claude-official' && apiKeyData.claudeAccountId && !apiKeyData.claudeAccountId.startsWith('group:') && apiKeyData.claudeAccountId === accountId let opusRateLimitActive = false let opusRateLimitEndAt = null if (isOpusModelRequest) { opusRateLimitActive = await claudeAccountService.isAccountOpusRateLimited(accountId) opusRateLimitEndAt = account?.opusRateLimitEndAt || null } if (isOpusModelRequest && isDedicatedOfficialAccount && opusRateLimitActive) { const limitMessage = this._buildOpusLimitMessage(opusRateLimitEndAt) logger.warn( `🚫 Dedicated account ${account?.name || accountId} is under Opus weekly limit until ${opusRateLimitEndAt}` ) return { statusCode: 403, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ error: 'opus_weekly_limit', message: limitMessage }), accountId } } // 获取有效的访问token const accessToken = await claudeAccountService.getValidAccessToken(accountId) const processedBody = this._processRequestBody(requestBody, account) // 获取代理配置 const proxyAgent = await this._getProxyAgent(accountId) // 设置客户端断开监听器 const handleClientDisconnect = () => { logger.info('🔌 Client disconnected, aborting upstream request') if (upstreamRequest && !upstreamRequest.destroyed) { upstreamRequest.destroy() } } // 监听客户端断开事件 if (clientRequest) { clientRequest.once('close', handleClientDisconnect) } if (clientResponse) { clientResponse.once('close', handleClientDisconnect) } // 发送请求到Claude API(传入回调以获取请求对象) const response = await this._makeClaudeRequest( processedBody, accessToken, proxyAgent, clientHeaders, accountId, (req) => { upstreamRequest = req }, options ) response.accountId = accountId response.accountType = accountType // 移除监听器(请求成功完成) if (clientRequest) { clientRequest.removeListener('close', handleClientDisconnect) } if (clientResponse) { clientResponse.removeListener('close', handleClientDisconnect) } // 检查响应是否为限流错误或认证错误 if (response.statusCode !== 200 && response.statusCode !== 201) { let isRateLimited = false let rateLimitResetTimestamp = null let dedicatedRateLimitMessage = null const organizationDisabledError = this._isOrganizationDisabledError( response.statusCode, response.body ) // 检查是否为401状态码(未授权) if (response.statusCode === 401) { logger.warn(`🔐 Unauthorized error (401) detected for account ${accountId}`) // 记录401错误 await this.recordUnauthorizedError(accountId) // 检查是否需要标记为异常(遇到1次401就停止调度) const errorCount = await this.getUnauthorizedErrorCount(accountId) logger.info( `🔐 Account ${accountId} has ${errorCount} consecutive 401 errors in the last 5 minutes` ) if (errorCount >= 1) { logger.error( `❌ Account ${accountId} encountered 401 error (${errorCount} errors), marking as unauthorized` ) await unifiedClaudeScheduler.markAccountUnauthorized( accountId, accountType, sessionHash ) } } // 检查是否为403状态码(禁止访问) else if (response.statusCode === 403) { logger.error( `🚫 Forbidden error (403) detected for account ${accountId}, marking as blocked` ) await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) } // 检查是否返回组织被禁用错误(400状态码) else if (organizationDisabledError) { logger.error( `🚫 Organization disabled error (400) detected for account ${accountId}, marking as blocked` ) await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) } // 检查是否为529状态码(服务过载) else if (response.statusCode === 529) { logger.warn(`🚫 Overload error (529) detected for account ${accountId}`) // 检查是否启用了529错误处理 if (config.claude.overloadHandling.enabled > 0) { try { await claudeAccountService.markAccountOverloaded(accountId) logger.info( `🚫 Account ${accountId} marked as overloaded for ${config.claude.overloadHandling.enabled} minutes` ) } catch (overloadError) { logger.error(`❌ Failed to mark account as overloaded: ${accountId}`, overloadError) } } else { logger.info(`🚫 529 error handling is disabled, skipping account overload marking`) } } // 检查是否为5xx状态码 else if (response.statusCode >= 500 && response.statusCode < 600) { logger.warn(`🔥 Server error (${response.statusCode}) detected for account ${accountId}`) await this._handleServerError(accountId, response.statusCode, sessionHash) } // 检查是否为429状态码 else if (response.statusCode === 429) { const resetHeader = response.headers ? response.headers['anthropic-ratelimit-unified-reset'] : null const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN if (isOpusModelRequest && !Number.isNaN(parsedResetTimestamp)) { await claudeAccountService.markAccountOpusRateLimited(accountId, parsedResetTimestamp) logger.warn( `🚫 Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` ) if (isDedicatedOfficialAccount) { const limitMessage = this._buildOpusLimitMessage(parsedResetTimestamp) return { statusCode: 403, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ error: 'opus_weekly_limit', message: limitMessage }), accountId } } } else { isRateLimited = true if (!Number.isNaN(parsedResetTimestamp)) { rateLimitResetTimestamp = parsedResetTimestamp logger.info( `🕐 Extracted rate limit reset timestamp: ${rateLimitResetTimestamp} (${new Date(rateLimitResetTimestamp * 1000).toISOString()})` ) } if (isDedicatedOfficialAccount) { dedicatedRateLimitMessage = this._buildStandardRateLimitMessage( rateLimitResetTimestamp || account?.rateLimitEndAt ) } } } else { // 检查响应体中的错误信息 try { const responseBody = typeof response.body === 'string' ? JSON.parse(response.body) : response.body if ( responseBody && responseBody.error && responseBody.error.message && responseBody.error.message.toLowerCase().includes("exceed your account's rate limit") ) { isRateLimited = true } } catch (e) { // 如果解析失败,检查原始字符串 if ( response.body && response.body.toLowerCase().includes("exceed your account's rate limit") ) { isRateLimited = true } } } if (isRateLimited) { if (isDedicatedOfficialAccount && !dedicatedRateLimitMessage) { dedicatedRateLimitMessage = this._buildStandardRateLimitMessage( rateLimitResetTimestamp || account?.rateLimitEndAt ) } logger.warn( `🚫 Rate limit detected for account ${accountId}, status: ${response.statusCode}` ) // 标记账号为限流状态并删除粘性会话映射,传递准确的重置时间戳 await unifiedClaudeScheduler.markAccountRateLimited( accountId, accountType, sessionHash, rateLimitResetTimestamp ) if (dedicatedRateLimitMessage) { return { statusCode: 403, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ error: 'upstream_rate_limited', message: dedicatedRateLimitMessage }), accountId } } } } else if (response.statusCode === 200 || response.statusCode === 201) { // 提取5小时会话窗口状态 // 使用大小写不敏感的方式获取响应头 const get5hStatus = (headers) => { if (!headers) { return null } // HTTP头部名称不区分大小写,需要处理不同情况 return ( headers['anthropic-ratelimit-unified-5h-status'] || headers['Anthropic-Ratelimit-Unified-5h-Status'] || headers['ANTHROPIC-RATELIMIT-UNIFIED-5H-STATUS'] ) } const sessionWindowStatus = get5hStatus(response.headers) if (sessionWindowStatus) { logger.info(`📊 Session window status for account ${accountId}: ${sessionWindowStatus}`) // 保存会话窗口状态到账户数据 await claudeAccountService.updateSessionWindowStatus(accountId, sessionWindowStatus) } // 请求成功,清除401和500错误计数 await this.clearUnauthorizedErrors(accountId) await claudeAccountService.clearInternalErrors(accountId) // 如果请求成功,检查并移除限流状态 const isRateLimited = await unifiedClaudeScheduler.isAccountRateLimited( accountId, accountType ) if (isRateLimited) { await unifiedClaudeScheduler.removeAccountRateLimit(accountId, accountType) } // 如果请求成功,检查并移除过载状态 try { const isOverloaded = await claudeAccountService.isAccountOverloaded(accountId) if (isOverloaded) { await claudeAccountService.removeAccountOverload(accountId) } } catch (overloadError) { logger.error( `❌ Failed to check/remove overload status for account ${accountId}:`, overloadError ) } // 只有真实的 Claude Code 请求才更新 headers if ( clientHeaders && Object.keys(clientHeaders).length > 0 && this.isRealClaudeCodeRequest(requestBody) ) { await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders) } } // 记录成功的API调用并打印详细的usage数据 let responseBody = null try { responseBody = typeof response.body === 'string' ? JSON.parse(response.body) : response.body } catch (e) { logger.debug('Failed to parse response body for usage logging') } if (responseBody && responseBody.usage) { const { usage } = responseBody // 打印原始usage数据为JSON字符串 logger.info( `📊 === Non-Stream Request Usage Summary === Model: ${requestBody.model}, Usage: ${JSON.stringify(usage)}` ) } else { // 如果没有usage数据,使用估算值 const inputTokens = requestBody.messages ? requestBody.messages.reduce((sum, msg) => sum + (msg.content?.length || 0), 0) / 4 : 0 const outputTokens = response.content ? response.content.reduce((sum, content) => sum + (content.text?.length || 0), 0) / 4 : 0 logger.info( `✅ API request completed - Key: ${apiKeyData.name}, Account: ${accountId}, Model: ${requestBody.model}, Input: ~${Math.round(inputTokens)} tokens (estimated), Output: ~${Math.round(outputTokens)} tokens (estimated)` ) } // 在响应中添加accountId,以便调用方记录账户级别统计 response.accountId = accountId return response } catch (error) { logger.error( `❌ Claude relay request failed for key: ${apiKeyData.name || apiKeyData.id}:`, error.message ) throw error } } // 🔄 处理请求体 _processRequestBody(body, account = null) { if (!body) { return body } // 深拷贝请求体 const processedBody = JSON.parse(JSON.stringify(body)) // 验证并限制max_tokens参数 this._validateAndLimitMaxTokens(processedBody) // 移除cache_control中的ttl字段 this._stripTtlFromCacheControl(processedBody) // 判断是否是真实的 Claude Code 请求 const isRealClaudeCode = this.isRealClaudeCodeRequest(processedBody) // 如果不是真实的 Claude Code 请求,需要设置 Claude Code 系统提示词 if (!isRealClaudeCode) { const claudeCodePrompt = { type: 'text', text: this.claudeCodeSystemPrompt, cache_control: { type: 'ephemeral' } } if (processedBody.system) { if (typeof processedBody.system === 'string') { // 字符串格式:转换为数组,Claude Code 提示词在第一位 const userSystemPrompt = { type: 'text', text: processedBody.system } // 如果用户的提示词与 Claude Code 提示词相同,只保留一个 if (processedBody.system.trim() === this.claudeCodeSystemPrompt) { processedBody.system = [claudeCodePrompt] } else { processedBody.system = [claudeCodePrompt, userSystemPrompt] } } else if (Array.isArray(processedBody.system)) { // 检查第一个元素是否是 Claude Code 系统提示词 const firstItem = processedBody.system[0] const isFirstItemClaudeCode = firstItem && firstItem.type === 'text' && firstItem.text === this.claudeCodeSystemPrompt if (!isFirstItemClaudeCode) { // 如果第一个不是 Claude Code 提示词,需要在开头插入 // 同时检查数组中是否有其他位置包含 Claude Code 提示词,如果有则移除 const filteredSystem = processedBody.system.filter( (item) => !(item && item.type === 'text' && item.text === this.claudeCodeSystemPrompt) ) processedBody.system = [claudeCodePrompt, ...filteredSystem] } } else { // 其他格式,记录警告但不抛出错误,尝试处理 logger.warn('⚠️ Unexpected system field type:', typeof processedBody.system) processedBody.system = [claudeCodePrompt] } } else { // 用户没有传递 system,需要添加 Claude Code 提示词 processedBody.system = [claudeCodePrompt] } } this._enforceCacheControlLimit(processedBody) // 处理原有的系统提示(如果配置了) if (this.systemPrompt && this.systemPrompt.trim()) { const systemPrompt = { type: 'text', text: this.systemPrompt } // 经过上面的处理,system 现在应该总是数组格式 if (processedBody.system && Array.isArray(processedBody.system)) { // 不要重复添加相同的系统提示 const hasSystemPrompt = processedBody.system.some( (item) => item && item.text && item.text === this.systemPrompt ) if (!hasSystemPrompt) { processedBody.system.push(systemPrompt) } } else { // 理论上不应该走到这里,但为了安全起见 processedBody.system = [systemPrompt] } } else { // 如果没有配置系统提示,且system字段为空,则删除它 if (processedBody.system && Array.isArray(processedBody.system)) { const hasValidContent = processedBody.system.some( (item) => item && item.text && item.text.trim() ) if (!hasValidContent) { delete processedBody.system } } } // Claude API只允许temperature或top_p其中之一,优先使用temperature if (processedBody.top_p !== undefined && processedBody.top_p !== null) { delete processedBody.top_p } // 处理统一的客户端标识 if (account && account.useUnifiedClientId === 'true' && account.unifiedClientId) { this._replaceClientId(processedBody, account.unifiedClientId) } return processedBody } // 🔄 替换请求中的客户端标识 _replaceClientId(body, unifiedClientId) { if (!body || !body.metadata || !body.metadata.user_id || !unifiedClientId) { return } const userId = body.metadata.user_id // user_id格式:user_{64位十六进制}_account__session_{uuid} // 只替换第一个下划线后到_account之前的部分(客户端标识) const match = userId.match(/^user_[a-f0-9]{64}(_account__session_[a-f0-9-]{36})$/) if (match && match[1]) { // 替换客户端标识部分 body.metadata.user_id = `user_${unifiedClientId}${match[1]}` logger.info(`🔄 Replaced client ID with unified ID: ${body.metadata.user_id}`) } } // 🔢 验证并限制max_tokens参数 _validateAndLimitMaxTokens(body) { if (!body || !body.max_tokens) { return } try { // 读取模型定价配置文件 const pricingFilePath = path.join(__dirname, '../../data/model_pricing.json') if (!fs.existsSync(pricingFilePath)) { logger.warn('⚠️ Model pricing file not found, skipping max_tokens validation') return } const pricingData = JSON.parse(fs.readFileSync(pricingFilePath, 'utf8')) const model = body.model || 'claude-sonnet-4-20250514' // 查找对应模型的配置 const modelConfig = pricingData[model] if (!modelConfig) { // 如果找不到模型配置,直接透传客户端参数,不进行任何干预 logger.info( `📝 Model ${model} not found in pricing file, passing through client parameters without modification` ) return } // 获取模型的最大token限制 const maxLimit = modelConfig.max_tokens || modelConfig.max_output_tokens if (!maxLimit) { logger.debug(`🔍 No max_tokens limit found for model ${model}, skipping validation`) return } // 检查并调整max_tokens if (body.max_tokens > maxLimit) { logger.warn( `⚠️ max_tokens ${body.max_tokens} exceeds limit ${maxLimit} for model ${model}, adjusting to ${maxLimit}` ) body.max_tokens = maxLimit } } catch (error) { logger.error('❌ Failed to validate max_tokens from pricing file:', error) // 如果文件读取失败,不进行校验,让请求继续处理 } } // 🧹 移除TTL字段 _stripTtlFromCacheControl(body) { if (!body || typeof body !== 'object') { return } const processContentArray = (contentArray) => { if (!Array.isArray(contentArray)) { return } contentArray.forEach((item) => { if (item && typeof item === 'object' && item.cache_control) { if (item.cache_control.ttl) { delete item.cache_control.ttl logger.debug('🧹 Removed ttl from cache_control') } } }) } if (Array.isArray(body.system)) { processContentArray(body.system) } if (Array.isArray(body.messages)) { body.messages.forEach((message) => { if (message && Array.isArray(message.content)) { processContentArray(message.content) } }) } } // ⚖️ 限制带缓存控制的内容数量 _enforceCacheControlLimit(body) { const MAX_CACHE_CONTROL_BLOCKS = 4 if (!body || typeof body !== 'object') { return } const countCacheControlBlocks = () => { let total = 0 if (Array.isArray(body.messages)) { body.messages.forEach((message) => { if (!message || !Array.isArray(message.content)) { return } message.content.forEach((item) => { if (item && item.cache_control) { total += 1 } }) }) } if (Array.isArray(body.system)) { body.system.forEach((item) => { if (item && item.cache_control) { total += 1 } }) } return total } const removeFromMessages = () => { if (!Array.isArray(body.messages)) { return false } for (let messageIndex = 0; messageIndex < body.messages.length; messageIndex += 1) { const message = body.messages[messageIndex] if (!message || !Array.isArray(message.content)) { continue } for (let contentIndex = 0; contentIndex < message.content.length; contentIndex += 1) { const contentItem = message.content[contentIndex] if (contentItem && contentItem.cache_control) { message.content.splice(contentIndex, 1) if (message.content.length === 0) { body.messages.splice(messageIndex, 1) } return true } } } return false } const removeFromSystem = () => { if (!Array.isArray(body.system)) { return false } for (let index = 0; index < body.system.length; index += 1) { const systemItem = body.system[index] if (systemItem && systemItem.cache_control) { body.system.splice(index, 1) if (body.system.length === 0) { delete body.system } return true } } return false } let total = countCacheControlBlocks() while (total > MAX_CACHE_CONTROL_BLOCKS) { if (removeFromMessages()) { total -= 1 continue } if (removeFromSystem()) { total -= 1 continue } break } } // 🌐 获取代理Agent(使用统一的代理工具) async _getProxyAgent(accountId) { try { const accountData = await claudeAccountService.getAllAccounts() const account = accountData.find((acc) => acc.id === accountId) if (!account || !account.proxy) { logger.debug('🌐 No proxy configured for Claude account') return null } const proxyAgent = ProxyHelper.createProxyAgent(account.proxy) if (proxyAgent) { logger.info( `🌐 Using proxy for Claude request: ${ProxyHelper.getProxyDescription(account.proxy)}` ) } return proxyAgent } catch (error) { logger.warn('⚠️ Failed to create proxy agent:', error) return null } } // 🔧 过滤客户端请求头 _filterClientHeaders(clientHeaders) { // 需要移除的敏感 headers const sensitiveHeaders = [ 'content-type', 'user-agent', 'x-api-key', 'authorization', 'host', 'content-length', 'connection', 'proxy-authorization', 'content-encoding', 'transfer-encoding' ] // 🆕 需要移除的浏览器相关 headers(避免CORS问题) const browserHeaders = [ 'origin', 'referer', 'sec-fetch-mode', 'sec-fetch-site', 'sec-fetch-dest', 'sec-ch-ua', 'sec-ch-ua-mobile', 'sec-ch-ua-platform', 'accept-language', 'accept-encoding', 'accept', 'cache-control', 'pragma', 'anthropic-dangerous-direct-browser-access' // 这个头可能触发CORS检查 ] // 应该保留的 headers(用于会话一致性和追踪) const allowedHeaders = [ 'x-request-id', 'anthropic-version', // 保留API版本 'anthropic-beta' // 保留beta功能 ] const filteredHeaders = {} // 转发客户端的非敏感 headers Object.keys(clientHeaders || {}).forEach((key) => { const lowerKey = key.toLowerCase() // 如果在允许列表中,直接保留 if (allowedHeaders.includes(lowerKey)) { filteredHeaders[key] = clientHeaders[key] } // 如果不在敏感列表和浏览器列表中,也保留 else if (!sensitiveHeaders.includes(lowerKey) && !browserHeaders.includes(lowerKey)) { filteredHeaders[key] = clientHeaders[key] } }) return filteredHeaders } // 🔗 发送请求到Claude API async _makeClaudeRequest( body, accessToken, proxyAgent, clientHeaders, accountId, onRequest, requestOptions = {} ) { const url = new URL(this.claudeApiUrl) // 获取账户信息用于统一 User-Agent const account = await claudeAccountService.getAccount(accountId) // 获取统一的 User-Agent const unifiedUA = await this.captureAndGetUnifiedUserAgent(clientHeaders, account) // 获取过滤后的客户端 headers const filteredHeaders = this._filterClientHeaders(clientHeaders) // 判断是否是真实的 Claude Code 请求 const isRealClaudeCode = this.isRealClaudeCodeRequest(body) // 如果不是真实的 Claude Code 请求,需要使用从账户获取的 Claude Code headers const finalHeaders = { ...filteredHeaders } if (!isRealClaudeCode) { // 获取该账号存储的 Claude Code headers const claudeCodeHeaders = await claudeCodeHeadersService.getAccountHeaders(accountId) // 只添加客户端没有提供的 headers Object.keys(claudeCodeHeaders).forEach((key) => { const lowerKey = key.toLowerCase() if (!finalHeaders[key] && !finalHeaders[lowerKey]) { finalHeaders[key] = claudeCodeHeaders[key] } }) } return new Promise((resolve, reject) => { // 支持自定义路径(如 count_tokens) let requestPath = url.pathname if (requestOptions.customPath) { const baseUrl = new URL('https://api.anthropic.com') const customUrl = new URL(requestOptions.customPath, baseUrl) requestPath = customUrl.pathname } const options = { hostname: url.hostname, port: url.port || 443, path: requestPath, method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${accessToken}`, 'anthropic-version': this.apiVersion, ...finalHeaders }, agent: proxyAgent, timeout: config.requestTimeout || 600000 } // 使用统一 User-Agent 或客户端提供的,最后使用默认值 if (!options.headers['user-agent'] || unifiedUA !== null) { const userAgent = unifiedUA || 'claude-cli/1.0.119 (external, cli)' options.headers['user-agent'] = userAgent } logger.info(`🔗 指纹是这个: ${options.headers['user-agent']}`) // 使用自定义的 betaHeader 或默认值 const betaHeader = requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader if (betaHeader) { options.headers['anthropic-beta'] = betaHeader } const req = https.request(options, (res) => { let responseData = Buffer.alloc(0) res.on('data', (chunk) => { responseData = Buffer.concat([responseData, chunk]) }) res.on('end', () => { try { let bodyString = '' // 根据Content-Encoding处理响应数据 const contentEncoding = res.headers['content-encoding'] if (contentEncoding === 'gzip') { try { bodyString = zlib.gunzipSync(responseData).toString('utf8') } catch (unzipError) { logger.error('❌ Failed to decompress gzip response:', unzipError) bodyString = responseData.toString('utf8') } } else if (contentEncoding === 'deflate') { try { bodyString = zlib.inflateSync(responseData).toString('utf8') } catch (unzipError) { logger.error('❌ Failed to decompress deflate response:', unzipError) bodyString = responseData.toString('utf8') } } else { bodyString = responseData.toString('utf8') } const response = { statusCode: res.statusCode, headers: res.headers, body: bodyString } logger.debug(`🔗 Claude API response: ${res.statusCode}`) resolve(response) } catch (error) { logger.error(`❌ Failed to parse Claude API response (Account: ${accountId}):`, error) reject(error) } }) }) // 如果提供了 onRequest 回调,传递请求对象 if (onRequest && typeof onRequest === 'function') { onRequest(req) } req.on('error', async (error) => { console.error(': ❌ ', error) logger.error(`❌ Claude API request error (Account: ${accountId}):`, error.message, { code: error.code, errno: error.errno, syscall: error.syscall, address: error.address, port: error.port }) // 根据错误类型提供更具体的错误信息 let errorMessage = 'Upstream request failed' if (error.code === 'ECONNRESET') { errorMessage = 'Connection reset by Claude API server' } else if (error.code === 'ENOTFOUND') { errorMessage = 'Unable to resolve Claude API hostname' } else if (error.code === 'ECONNREFUSED') { errorMessage = 'Connection refused by Claude API server' } else if (error.code === 'ETIMEDOUT') { errorMessage = 'Connection timed out to Claude API server' await this._handleServerError(accountId, 504, null, 'Network') } reject(new Error(errorMessage)) }) req.on('timeout', async () => { req.destroy() logger.error(`❌ Claude API request timeout (Account: ${accountId})`) await this._handleServerError(accountId, 504, null, 'Request') reject(new Error('Request timeout')) }) // 写入请求体 req.write(JSON.stringify(body)) req.end() }) } // 🌊 处理流式响应(带usage数据捕获) async relayStreamRequestWithUsageCapture( requestBody, apiKeyData, responseStream, clientHeaders, usageCallback, streamTransformer = null, options = {} ) { try { // 调试日志:查看API Key数据(流式请求) logger.info('🔍 [Stream] API Key data received:', { apiKeyName: apiKeyData.name, enableModelRestriction: apiKeyData.enableModelRestriction, restrictedModels: apiKeyData.restrictedModels, requestedModel: requestBody.model }) const isOpusModelRequest = typeof requestBody?.model === 'string' && requestBody.model.toLowerCase().includes('opus') // 生成会话哈希用于sticky会话 const sessionHash = sessionHelper.generateSessionHash(requestBody) // 选择可用的Claude账户(支持专属绑定和sticky会话) let accountSelection try { accountSelection = await unifiedClaudeScheduler.selectAccountForApiKey( apiKeyData, sessionHash, requestBody.model ) } catch (error) { if (error.code === 'CLAUDE_DEDICATED_RATE_LIMITED') { const limitMessage = this._buildStandardRateLimitMessage(error.rateLimitEndAt) if (!responseStream.headersSent) { responseStream.status(403) responseStream.setHeader('Content-Type', 'application/json') } responseStream.write( JSON.stringify({ error: 'upstream_rate_limited', message: limitMessage }) ) responseStream.end() return } throw error } const { accountId } = accountSelection const { accountType } = accountSelection logger.info( `📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` ) // 获取账户信息 let account = await claudeAccountService.getAccount(accountId) if (isOpusModelRequest) { await claudeAccountService.clearExpiredOpusRateLimit(accountId) account = await claudeAccountService.getAccount(accountId) } const isDedicatedOfficialAccount = accountType === 'claude-official' && apiKeyData.claudeAccountId && !apiKeyData.claudeAccountId.startsWith('group:') && apiKeyData.claudeAccountId === accountId let opusRateLimitActive = false if (isOpusModelRequest) { opusRateLimitActive = await claudeAccountService.isAccountOpusRateLimited(accountId) } if (isOpusModelRequest && isDedicatedOfficialAccount && opusRateLimitActive) { const limitMessage = this._buildOpusLimitMessage(account?.opusRateLimitEndAt) if (!responseStream.headersSent) { responseStream.status(403) responseStream.setHeader('Content-Type', 'application/json') } responseStream.write( JSON.stringify({ error: 'opus_weekly_limit', message: limitMessage }) ) responseStream.end() return } // 获取有效的访问token const accessToken = await claudeAccountService.getValidAccessToken(accountId) const processedBody = this._processRequestBody(requestBody, account) // 获取代理配置 const proxyAgent = await this._getProxyAgent(accountId) // 发送流式请求并捕获usage数据 await this._makeClaudeStreamRequestWithUsageCapture( processedBody, accessToken, proxyAgent, clientHeaders, responseStream, (usageData) => { // 在usageCallback中添加accountId usageCallback({ ...usageData, accountId }) }, accountId, accountType, sessionHash, streamTransformer, options, isDedicatedOfficialAccount ) } catch (error) { logger.error(`❌ Claude stream relay with usage capture failed:`, error) throw error } } // 🌊 发送流式请求到Claude API(带usage数据捕获) async _makeClaudeStreamRequestWithUsageCapture( body, accessToken, proxyAgent, clientHeaders, responseStream, usageCallback, accountId, accountType, sessionHash, streamTransformer = null, requestOptions = {}, isDedicatedOfficialAccount = false ) { // 获取账户信息用于统一 User-Agent const account = await claudeAccountService.getAccount(accountId) const isOpusModelRequest = typeof body?.model === 'string' && body.model.toLowerCase().includes('opus') // 获取统一的 User-Agent const unifiedUA = await this.captureAndGetUnifiedUserAgent(clientHeaders, account) // 获取过滤后的客户端 headers const filteredHeaders = this._filterClientHeaders(clientHeaders) // 判断是否是真实的 Claude Code 请求 const isRealClaudeCode = this.isRealClaudeCodeRequest(body) // 如果不是真实的 Claude Code 请求,需要使用从账户获取的 Claude Code headers const finalHeaders = { ...filteredHeaders } if (!isRealClaudeCode) { // 获取该账号存储的 Claude Code headers const claudeCodeHeaders = await claudeCodeHeadersService.getAccountHeaders(accountId) // 只添加客户端没有提供的 headers Object.keys(claudeCodeHeaders).forEach((key) => { const lowerKey = key.toLowerCase() if (!finalHeaders[key] && !finalHeaders[lowerKey]) { finalHeaders[key] = claudeCodeHeaders[key] } }) } return new Promise((resolve, reject) => { const url = new URL(this.claudeApiUrl) const options = { hostname: url.hostname, port: url.port || 443, path: url.pathname, method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${accessToken}`, 'anthropic-version': this.apiVersion, ...finalHeaders }, agent: proxyAgent, timeout: config.requestTimeout || 600000 } // 使用统一 User-Agent 或客户端提供的,最后使用默认值 if (!options.headers['user-agent'] || unifiedUA !== null) { const userAgent = unifiedUA || 'claude-cli/1.0.119 (external, cli)' options.headers['user-agent'] = userAgent } logger.info(`🔗 指纹是这个: ${options.headers['user-agent']}`) // 使用自定义的 betaHeader 或默认值 const betaHeader = requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader if (betaHeader) { options.headers['anthropic-beta'] = betaHeader } const req = https.request(options, async (res) => { logger.debug(`🌊 Claude stream response status: ${res.statusCode}`) // 错误响应处理 if (res.statusCode !== 200) { if (res.statusCode === 429) { const resetHeader = res.headers ? res.headers['anthropic-ratelimit-unified-reset'] : null const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN if (isOpusModelRequest) { if (!Number.isNaN(parsedResetTimestamp)) { await claudeAccountService.markAccountOpusRateLimited( accountId, parsedResetTimestamp ) logger.warn( `🚫 [Stream] Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` ) } if (isDedicatedOfficialAccount) { const limitMessage = this._buildOpusLimitMessage(parsedResetTimestamp) if (!responseStream.headersSent) { responseStream.status(403) responseStream.setHeader('Content-Type', 'application/json') } responseStream.write( JSON.stringify({ error: 'opus_weekly_limit', message: limitMessage }) ) responseStream.end() res.resume() resolve() return } } else { const rateLimitResetTimestamp = Number.isNaN(parsedResetTimestamp) ? null : parsedResetTimestamp await unifiedClaudeScheduler.markAccountRateLimited( accountId, accountType, sessionHash, rateLimitResetTimestamp ) logger.warn(`🚫 [Stream] Rate limit detected for account ${accountId}, status 429`) if (isDedicatedOfficialAccount) { const limitMessage = this._buildStandardRateLimitMessage( rateLimitResetTimestamp || account?.rateLimitEndAt ) if (!responseStream.headersSent) { responseStream.status(403) responseStream.setHeader('Content-Type', 'application/json') } responseStream.write( JSON.stringify({ error: 'upstream_rate_limited', message: limitMessage }) ) responseStream.end() res.resume() resolve() return } } } // 将错误处理逻辑封装在一个异步函数中 const handleErrorResponse = async () => { if (res.statusCode === 401) { logger.warn(`🔐 [Stream] Unauthorized error (401) detected for account ${accountId}`) await this.recordUnauthorizedError(accountId) const errorCount = await this.getUnauthorizedErrorCount(accountId) logger.info( `🔐 [Stream] Account ${accountId} has ${errorCount} consecutive 401 errors in the last 5 minutes` ) if (errorCount >= 1) { logger.error( `❌ [Stream] Account ${accountId} encountered 401 error (${errorCount} errors), marking as unauthorized` ) await unifiedClaudeScheduler.markAccountUnauthorized( accountId, accountType, sessionHash ) } } else if (res.statusCode === 403) { logger.error( `🚫 [Stream] Forbidden error (403) detected for account ${accountId}, marking as blocked` ) await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) } else if (res.statusCode === 529) { logger.warn(`🚫 [Stream] Overload error (529) detected for account ${accountId}`) // 检查是否启用了529错误处理 if (config.claude.overloadHandling.enabled > 0) { try { await claudeAccountService.markAccountOverloaded(accountId) logger.info( `🚫 [Stream] Account ${accountId} marked as overloaded for ${config.claude.overloadHandling.enabled} minutes` ) } catch (overloadError) { logger.error( `❌ [Stream] Failed to mark account as overloaded: ${accountId}`, overloadError ) } } else { logger.info( `🚫 [Stream] 529 error handling is disabled, skipping account overload marking` ) } } else if (res.statusCode >= 500 && res.statusCode < 600) { logger.warn( `🔥 [Stream] Server error (${res.statusCode}) detected for account ${accountId}` ) await this._handleServerError(accountId, res.statusCode, sessionHash, '[Stream]') } } // 调用异步错误处理函数 handleErrorResponse().catch((err) => { logger.error('❌ Error in stream error handler:', err) }) logger.error( `❌ Claude API returned error status: ${res.statusCode} | Account: ${account?.name || accountId}` ) let errorData = '' res.on('data', (chunk) => { errorData += chunk.toString() }) res.on('end', () => { console.error(': ❌ ', errorData) logger.error( `❌ Claude API error response (Account: ${account?.name || accountId}):`, errorData ) if (this._isOrganizationDisabledError(res.statusCode, errorData)) { ;(async () => { try { logger.error( `🚫 [Stream] Organization disabled error (400) detected for account ${accountId}, marking as blocked` ) await unifiedClaudeScheduler.markAccountBlocked( accountId, accountType, sessionHash ) } catch (markError) { logger.error( `❌ [Stream] Failed to mark account ${accountId} as blocked after organization disabled error:`, markError ) } })() } if (!responseStream.destroyed) { // 发送错误事件 responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Claude API error', status: res.statusCode, details: errorData, timestamp: new Date().toISOString() })}\n\n` ) responseStream.end() } reject(new Error(`Claude API error: ${res.statusCode}`)) }) return } let buffer = '' const allUsageData = [] // 收集所有的usage事件 let currentUsageData = {} // 当前正在收集的usage数据 let rateLimitDetected = false // 限流检测标志 // 监听数据块,解析SSE并寻找usage信息 res.on('data', (chunk) => { try { const chunkStr = chunk.toString() buffer += chunkStr // 处理完整的SSE行 const lines = buffer.split('\n') buffer = lines.pop() || '' // 保留最后的不完整行 // 转发已处理的完整行到客户端 if (lines.length > 0 && !responseStream.destroyed) { const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') // 如果有流转换器,应用转换 if (streamTransformer) { const transformed = streamTransformer(linesToForward) if (transformed) { responseStream.write(transformed) } } else { responseStream.write(linesToForward) } } for (const line of lines) { // 解析SSE数据寻找usage信息 if (line.startsWith('data:')) { const jsonStr = line.slice(5).trimStart() if (!jsonStr || jsonStr === '[DONE]') { continue } try { const data = JSON.parse(jsonStr) // 收集来自不同事件的usage数据 if (data.type === 'message_start' && data.message && data.message.usage) { // 新的消息开始,如果之前有数据,先保存 if ( currentUsageData.input_tokens !== undefined && currentUsageData.output_tokens !== undefined ) { allUsageData.push({ ...currentUsageData }) currentUsageData = {} } // message_start包含input tokens、cache tokens和模型信息 currentUsageData.input_tokens = data.message.usage.input_tokens || 0 currentUsageData.cache_creation_input_tokens = data.message.usage.cache_creation_input_tokens || 0 currentUsageData.cache_read_input_tokens = data.message.usage.cache_read_input_tokens || 0 currentUsageData.model = data.message.model // 检查是否有详细的 cache_creation 对象 if ( data.message.usage.cache_creation && typeof data.message.usage.cache_creation === 'object' ) { currentUsageData.cache_creation = { ephemeral_5m_input_tokens: data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0, ephemeral_1h_input_tokens: data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0 } logger.debug( '📊 Collected detailed cache creation data:', JSON.stringify(currentUsageData.cache_creation) ) } logger.debug( '📊 Collected input/cache data from message_start:', JSON.stringify(currentUsageData) ) } // message_delta包含最终的output tokens if ( data.type === 'message_delta' && data.usage && data.usage.output_tokens !== undefined ) { currentUsageData.output_tokens = data.usage.output_tokens || 0 logger.debug( '📊 Collected output data from message_delta:', JSON.stringify(currentUsageData) ) // 如果已经收集到了input数据和output数据,这是一个完整的usage if (currentUsageData.input_tokens !== undefined) { logger.debug( '🎯 Complete usage data collected for model:', currentUsageData.model, '- Input:', currentUsageData.input_tokens, 'Output:', currentUsageData.output_tokens ) // 保存到列表中,但不立即触发回调 allUsageData.push({ ...currentUsageData }) // 重置当前数据,准备接收下一个 currentUsageData = {} } } // 检查是否有限流错误 if ( data.type === 'error' && data.error && data.error.message && data.error.message.toLowerCase().includes("exceed your account's rate limit") ) { rateLimitDetected = true logger.warn(`🚫 Rate limit detected in stream for account ${accountId}`) } } catch (parseError) { // 忽略JSON解析错误,继续处理 logger.debug('🔍 SSE line not JSON or no usage data:', line.slice(0, 100)) } } } } catch (error) { logger.error('❌ Error processing stream data:', error) // 发送错误但不破坏流,让它自然结束 if (!responseStream.destroyed) { responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Stream processing error', message: error.message, timestamp: new Date().toISOString() })}\n\n` ) } } }) res.on('end', async () => { try { // 处理缓冲区中剩余的数据 if (buffer.trim() && !responseStream.destroyed) { if (streamTransformer) { const transformed = streamTransformer(buffer) if (transformed) { responseStream.write(transformed) } } else { responseStream.write(buffer) } } // 确保流正确结束 if (!responseStream.destroyed) { responseStream.end() } } catch (error) { logger.error('❌ Error processing stream end:', error) } // 如果还有未完成的usage数据,尝试保存 if (currentUsageData.input_tokens !== undefined) { if (currentUsageData.output_tokens === undefined) { currentUsageData.output_tokens = 0 // 如果没有output,设为0 } allUsageData.push(currentUsageData) } // 检查是否捕获到usage数据 if (allUsageData.length === 0) { logger.warn( '⚠️ Stream completed but no usage data was captured! This indicates a problem with SSE parsing or Claude API response format.' ) } else { // 打印此次请求的所有usage数据汇总 const totalUsage = allUsageData.reduce( (acc, usage) => ({ input_tokens: (acc.input_tokens || 0) + (usage.input_tokens || 0), output_tokens: (acc.output_tokens || 0) + (usage.output_tokens || 0), cache_creation_input_tokens: (acc.cache_creation_input_tokens || 0) + (usage.cache_creation_input_tokens || 0), cache_read_input_tokens: (acc.cache_read_input_tokens || 0) + (usage.cache_read_input_tokens || 0), models: [...(acc.models || []), usage.model].filter(Boolean) }), {} ) // 打印原始的usage数据为JSON字符串,避免嵌套问题 logger.info( `📊 === Stream Request Usage Summary === Model: ${body.model}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}` ) // 一般一个请求只会使用一个模型,即使有多个usage事件也应该合并 // 计算总的usage const finalUsage = { input_tokens: totalUsage.input_tokens, output_tokens: totalUsage.output_tokens, cache_creation_input_tokens: totalUsage.cache_creation_input_tokens, cache_read_input_tokens: totalUsage.cache_read_input_tokens, model: allUsageData[allUsageData.length - 1].model || body.model // 使用最后一个模型或请求模型 } // 如果有详细的cache_creation数据,合并它们 let totalEphemeral5m = 0 let totalEphemeral1h = 0 allUsageData.forEach((usage) => { if (usage.cache_creation && typeof usage.cache_creation === 'object') { totalEphemeral5m += usage.cache_creation.ephemeral_5m_input_tokens || 0 totalEphemeral1h += usage.cache_creation.ephemeral_1h_input_tokens || 0 } }) // 如果有详细的缓存数据,添加到finalUsage if (totalEphemeral5m > 0 || totalEphemeral1h > 0) { finalUsage.cache_creation = { ephemeral_5m_input_tokens: totalEphemeral5m, ephemeral_1h_input_tokens: totalEphemeral1h } logger.info( '📊 Detailed cache creation breakdown:', JSON.stringify(finalUsage.cache_creation) ) } // 调用一次usageCallback记录合并后的数据 usageCallback(finalUsage) } // 提取5小时会话窗口状态 // 使用大小写不敏感的方式获取响应头 const get5hStatus = (headers) => { if (!headers) { return null } // HTTP头部名称不区分大小写,需要处理不同情况 return ( headers['anthropic-ratelimit-unified-5h-status'] || headers['Anthropic-Ratelimit-Unified-5h-Status'] || headers['ANTHROPIC-RATELIMIT-UNIFIED-5H-STATUS'] ) } const sessionWindowStatus = get5hStatus(res.headers) if (sessionWindowStatus) { logger.info(`📊 Session window status for account ${accountId}: ${sessionWindowStatus}`) // 保存会话窗口状态到账户数据 await claudeAccountService.updateSessionWindowStatus(accountId, sessionWindowStatus) } // 处理限流状态 if (rateLimitDetected || res.statusCode === 429) { const resetHeader = res.headers ? res.headers['anthropic-ratelimit-unified-reset'] : null const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN if (isOpusModelRequest && !Number.isNaN(parsedResetTimestamp)) { await claudeAccountService.markAccountOpusRateLimited(accountId, parsedResetTimestamp) logger.warn( `🚫 [Stream] Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` ) } else { const rateLimitResetTimestamp = Number.isNaN(parsedResetTimestamp) ? null : parsedResetTimestamp if (!Number.isNaN(parsedResetTimestamp)) { logger.info( `🕐 Extracted rate limit reset timestamp from stream: ${parsedResetTimestamp} (${new Date(parsedResetTimestamp * 1000).toISOString()})` ) } await unifiedClaudeScheduler.markAccountRateLimited( accountId, accountType, sessionHash, rateLimitResetTimestamp ) } } else if (res.statusCode === 200) { // 请求成功,清除401和500错误计数 await this.clearUnauthorizedErrors(accountId) await claudeAccountService.clearInternalErrors(accountId) // 如果请求成功,检查并移除限流状态 const isRateLimited = await unifiedClaudeScheduler.isAccountRateLimited( accountId, accountType ) if (isRateLimited) { await unifiedClaudeScheduler.removeAccountRateLimit(accountId, accountType) } // 如果流式请求成功,检查并移除过载状态 try { const isOverloaded = await claudeAccountService.isAccountOverloaded(accountId) if (isOverloaded) { await claudeAccountService.removeAccountOverload(accountId) } } catch (overloadError) { logger.error( `❌ [Stream] Failed to check/remove overload status for account ${accountId}:`, overloadError ) } // 只有真实的 Claude Code 请求才更新 headers(流式请求) if ( clientHeaders && Object.keys(clientHeaders).length > 0 && this.isRealClaudeCodeRequest(body) ) { await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders) } } logger.debug('🌊 Claude stream response with usage capture completed') resolve() }) }) req.on('error', async (error) => { logger.error( `❌ Claude stream request error (Account: ${account?.name || accountId}):`, error.message, { code: error.code, errno: error.errno, syscall: error.syscall } ) // 根据错误类型提供更具体的错误信息 let errorMessage = 'Upstream request failed' let statusCode = 500 if (error.code === 'ECONNRESET') { errorMessage = 'Connection reset by Claude API server' statusCode = 502 } else if (error.code === 'ENOTFOUND') { errorMessage = 'Unable to resolve Claude API hostname' statusCode = 502 } else if (error.code === 'ECONNREFUSED') { errorMessage = 'Connection refused by Claude API server' statusCode = 502 } else if (error.code === 'ETIMEDOUT') { errorMessage = 'Connection timed out to Claude API server' statusCode = 504 } if (!responseStream.headersSent) { responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' }) } if (!responseStream.destroyed) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: errorMessage, code: error.code, timestamp: new Date().toISOString() })}\n\n` ) responseStream.end() } reject(error) }) req.on('timeout', async () => { req.destroy() logger.error(`❌ Claude stream request timeout | Account: ${account?.name || accountId}`) if (!responseStream.headersSent) { responseStream.writeHead(504, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' }) } if (!responseStream.destroyed) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Request timeout', code: 'TIMEOUT', timestamp: new Date().toISOString() })}\n\n` ) responseStream.end() } reject(new Error('Request timeout')) }) // 处理客户端断开连接 responseStream.on('close', () => { logger.debug('🔌 Client disconnected, cleaning up stream') if (!req.destroyed) { req.destroy() } }) // 写入请求体 req.write(JSON.stringify(body)) req.end() }) } // 🌊 发送流式请求到Claude API async _makeClaudeStreamRequest( body, accessToken, proxyAgent, clientHeaders, responseStream, requestOptions = {} ) { return new Promise((resolve, reject) => { const url = new URL(this.claudeApiUrl) // 获取过滤后的客户端 headers const filteredHeaders = this._filterClientHeaders(clientHeaders) const options = { hostname: url.hostname, port: url.port || 443, path: url.pathname, method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${accessToken}`, 'anthropic-version': this.apiVersion, ...filteredHeaders }, agent: proxyAgent, timeout: config.requestTimeout || 600000 } // 如果客户端没有提供 User-Agent,使用默认值 if (!filteredHeaders['User-Agent'] && !filteredHeaders['user-agent']) { // 第三个方法不支持统一 User-Agent,使用简化逻辑 const userAgent = clientHeaders?.['user-agent'] || clientHeaders?.['User-Agent'] || 'claude-cli/1.0.102 (external, cli)' options.headers['User-Agent'] = userAgent } // 使用自定义的 betaHeader 或默认值 const betaHeader = requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader if (betaHeader) { options.headers['anthropic-beta'] = betaHeader } const req = https.request(options, (res) => { // 设置响应头 responseStream.statusCode = res.statusCode Object.keys(res.headers).forEach((key) => { responseStream.setHeader(key, res.headers[key]) }) // 管道响应数据 res.pipe(responseStream) res.on('end', () => { logger.debug('🌊 Claude stream response completed') resolve() }) }) req.on('error', async (error) => { logger.error(`❌ Claude stream request error:`, error.message, { code: error.code, errno: error.errno, syscall: error.syscall }) // 根据错误类型提供更具体的错误信息 let errorMessage = 'Upstream request failed' let statusCode = 500 if (error.code === 'ECONNRESET') { errorMessage = 'Connection reset by Claude API server' statusCode = 502 } else if (error.code === 'ENOTFOUND') { errorMessage = 'Unable to resolve Claude API hostname' statusCode = 502 } else if (error.code === 'ECONNREFUSED') { errorMessage = 'Connection refused by Claude API server' statusCode = 502 } else if (error.code === 'ETIMEDOUT') { errorMessage = 'Connection timed out to Claude API server' statusCode = 504 } if (!responseStream.headersSent) { responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' }) } if (!responseStream.destroyed) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: errorMessage, code: error.code, timestamp: new Date().toISOString() })}\n\n` ) responseStream.end() } reject(error) }) req.on('timeout', async () => { req.destroy() logger.error(`❌ Claude stream request timeout`) if (!responseStream.headersSent) { responseStream.writeHead(504, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' }) } if (!responseStream.destroyed) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ error: 'Request timeout', code: 'TIMEOUT', timestamp: new Date().toISOString() })}\n\n` ) responseStream.end() } reject(new Error('Request timeout')) }) // 处理客户端断开连接 responseStream.on('close', () => { logger.debug('🔌 Client disconnected, cleaning up stream') if (!req.destroyed) { req.destroy() } }) // 写入请求体 req.write(JSON.stringify(body)) req.end() }) } // 🛠️ 统一的错误处理方法 async _handleServerError(accountId, statusCode, _sessionHash = null, context = '') { try { await claudeAccountService.recordServerError(accountId, statusCode) const errorCount = await claudeAccountService.getServerErrorCount(accountId) // 根据错误类型设置不同的阈值和日志前缀 const isTimeout = statusCode === 504 const threshold = 3 // 统一使用3次阈值 const prefix = context ? `${context} ` : '' logger.warn( `⏱️ ${prefix}${isTimeout ? 'Timeout' : 'Server'} error for account ${accountId}, error count: ${errorCount}/${threshold}` ) if (errorCount > threshold) { const errorTypeLabel = isTimeout ? 'timeout' : '5xx' // ⚠️ 只记录5xx/504告警,不再自动停止调度,避免上游抖动导致误停 logger.error( `❌ ${prefix}Account ${accountId} exceeded ${errorTypeLabel} error threshold (${errorCount} errors), please investigate upstream stability` ) } } catch (handlingError) { logger.error(`❌ Failed to handle ${context} server error:`, handlingError) } } // 🔄 重试逻辑 async _retryRequest(requestFunc, maxRetries = 3) { let lastError for (let i = 0; i < maxRetries; i++) { try { return await requestFunc() } catch (error) { lastError = error if (i < maxRetries - 1) { const delay = Math.pow(2, i) * 1000 // 指数退避 logger.warn(`⏳ Retry ${i + 1}/${maxRetries} in ${delay}ms: ${error.message}`) await new Promise((resolve) => setTimeout(resolve, delay)) } } } throw lastError } // 🔐 记录401未授权错误 async recordUnauthorizedError(accountId) { try { const key = `claude_account:${accountId}:401_errors` // 增加错误计数,设置5分钟过期时间 await redis.client.incr(key) await redis.client.expire(key, 300) // 5分钟 logger.info(`📝 Recorded 401 error for account ${accountId}`) } catch (error) { logger.error(`❌ Failed to record 401 error for account ${accountId}:`, error) } } // 🔍 获取401错误计数 async getUnauthorizedErrorCount(accountId) { try { const key = `claude_account:${accountId}:401_errors` const count = await redis.client.get(key) return parseInt(count) || 0 } catch (error) { logger.error(`❌ Failed to get 401 error count for account ${accountId}:`, error) return 0 } } // 🧹 清除401错误计数 async clearUnauthorizedErrors(accountId) { try { const key = `claude_account:${accountId}:401_errors` await redis.client.del(key) logger.info(`✅ Cleared 401 error count for account ${accountId}`) } catch (error) { logger.error(`❌ Failed to clear 401 errors for account ${accountId}:`, error) } } // 🔧 动态捕获并获取统一的 User-Agent async captureAndGetUnifiedUserAgent(clientHeaders, account) { if (account.useUnifiedUserAgent !== 'true') { return null } const CACHE_KEY = 'claude_code_user_agent:daily' const TTL = 90000 // 25小时 // ⚠️ 重要:这里通过正则表达式判断是否为 Claude Code 客户端 // 如果未来 Claude Code 的 User-Agent 格式发生变化,需要更新这个正则表达式 // 当前已知格式:claude-cli/1.0.102 (external, cli) const CLAUDE_CODE_UA_PATTERN = /^claude-cli\/[\d.]+\s+\(/i const clientUA = clientHeaders?.['user-agent'] || clientHeaders?.['User-Agent'] let cachedUA = await redis.client.get(CACHE_KEY) if (clientUA && CLAUDE_CODE_UA_PATTERN.test(clientUA)) { if (!cachedUA) { // 没有缓存,直接存储 await redis.client.setex(CACHE_KEY, TTL, clientUA) logger.info(`📱 Captured unified Claude Code User-Agent: ${clientUA}`) cachedUA = clientUA } else { // 有缓存,比较版本号,保存更新的版本 const shouldUpdate = this.compareClaudeCodeVersions(clientUA, cachedUA) if (shouldUpdate) { await redis.client.setex(CACHE_KEY, TTL, clientUA) logger.info(`🔄 Updated to newer Claude Code User-Agent: ${clientUA} (was: ${cachedUA})`) cachedUA = clientUA } else { // 当前版本不比缓存版本新,仅刷新TTL await redis.client.expire(CACHE_KEY, TTL) } } } return cachedUA // 没有缓存返回 null } // 🔄 比较Claude Code版本号,判断是否需要更新 // 返回 true 表示 newUA 版本更新,需要更新缓存 compareClaudeCodeVersions(newUA, cachedUA) { try { // 提取版本号:claude-cli/1.0.102 (external, cli) -> 1.0.102 // 支持多段版本号格式,如 1.0.102、2.1.0.beta1 等 const newVersionMatch = newUA.match(/claude-cli\/([\d.]+(?:[a-zA-Z0-9-]*)?)/i) const cachedVersionMatch = cachedUA.match(/claude-cli\/([\d.]+(?:[a-zA-Z0-9-]*)?)/i) if (!newVersionMatch || !cachedVersionMatch) { // 无法解析版本号,优先使用新的 logger.warn(`⚠️ Unable to parse Claude Code versions: new=${newUA}, cached=${cachedUA}`) return true } const newVersion = newVersionMatch[1] const cachedVersion = cachedVersionMatch[1] // 比较版本号 (semantic version) const compareResult = this.compareSemanticVersions(newVersion, cachedVersion) logger.debug(`🔍 Version comparison: ${newVersion} vs ${cachedVersion} = ${compareResult}`) return compareResult > 0 // 新版本更大则返回 true } catch (error) { logger.warn(`⚠️ Error comparing Claude Code versions, defaulting to update: ${error.message}`) return true // 出错时优先使用新的 } } // 🔢 比较版本号 // 返回:1 表示 v1 > v2,-1 表示 v1 < v2,0 表示相等 compareSemanticVersions(version1, version2) { // 将版本号字符串按"."分割成数字数组 const arr1 = version1.split('.') const arr2 = version2.split('.') // 获取两个版本号数组中的最大长度 const maxLength = Math.max(arr1.length, arr2.length) // 循环遍历,逐段比较版本号 for (let i = 0; i < maxLength; i++) { // 如果某个版本号的某一段不存在,则视为0 const num1 = parseInt(arr1[i] || 0, 10) const num2 = parseInt(arr2[i] || 0, 10) if (num1 > num2) { return 1 // version1 大于 version2 } if (num1 < num2) { return -1 // version1 小于 version2 } } return 0 // 两个版本号相等 } // 🎯 健康检查 async healthCheck() { try { const accounts = await claudeAccountService.getAllAccounts() const activeAccounts = accounts.filter((acc) => acc.isActive && acc.status === 'active') return { healthy: activeAccounts.length > 0, activeAccounts: activeAccounts.length, totalAccounts: accounts.length, timestamp: new Date().toISOString() } } catch (error) { logger.error('❌ Health check failed:', error) return { healthy: false, error: error.message, timestamp: new Date().toISOString() } } } } module.exports = new ClaudeRelayService()