|
|
const https = require('https') |
|
|
const zlib = require('zlib') |
|
|
const fs = require('fs') |
|
|
const path = require('path') |
|
|
const ProxyHelper = require('../utils/proxyHelper') |
|
|
const claudeAccountService = require('./claudeAccountService') |
|
|
const unifiedClaudeScheduler = require('./unifiedClaudeScheduler') |
|
|
const sessionHelper = require('../utils/sessionHelper') |
|
|
const logger = require('../utils/logger') |
|
|
const config = require('../../config/config') |
|
|
const claudeCodeHeadersService = require('./claudeCodeHeadersService') |
|
|
const redis = require('../models/redis') |
|
|
const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') |
|
|
const { formatDateWithTimezone } = require('../utils/dateHelper') |
|
|
|
|
|
class ClaudeRelayService { |
|
|
constructor() { |
|
|
this.claudeApiUrl = config.claude.apiUrl |
|
|
this.apiVersion = config.claude.apiVersion |
|
|
this.betaHeader = config.claude.betaHeader |
|
|
this.systemPrompt = config.claude.systemPrompt |
|
|
this.claudeCodeSystemPrompt = "You are Claude Code, Anthropic's official CLI for Claude." |
|
|
} |
|
|
|
|
|
_buildStandardRateLimitMessage(resetTime) { |
|
|
if (!resetTime) { |
|
|
return '此专属账号已触发 Anthropic 限流控制。' |
|
|
} |
|
|
const formattedReset = formatDateWithTimezone(resetTime) |
|
|
return `此专属账号已触发 Anthropic 限流控制,将于 ${formattedReset} 自动恢复。` |
|
|
} |
|
|
|
|
|
_buildOpusLimitMessage(resetTime) { |
|
|
if (!resetTime) { |
|
|
return '此专属账号的Opus模型已达到周使用限制,请尝试切换其他模型后再试。' |
|
|
} |
|
|
const formattedReset = formatDateWithTimezone(resetTime) |
|
|
return `此专属账号的Opus模型已达到周使用限制,将于 ${formattedReset} 自动恢复,请尝试切换其他模型后再试。` |
|
|
} |
|
|
|
|
|
|
|
|
_extractErrorMessage(body) { |
|
|
if (!body) { |
|
|
return '' |
|
|
} |
|
|
|
|
|
if (typeof body === 'string') { |
|
|
const trimmed = body.trim() |
|
|
if (!trimmed) { |
|
|
return '' |
|
|
} |
|
|
try { |
|
|
const parsed = JSON.parse(trimmed) |
|
|
return this._extractErrorMessage(parsed) |
|
|
} catch (error) { |
|
|
return trimmed |
|
|
} |
|
|
} |
|
|
|
|
|
if (typeof body === 'object') { |
|
|
if (typeof body.error === 'string') { |
|
|
return body.error |
|
|
} |
|
|
if (body.error && typeof body.error === 'object') { |
|
|
if (typeof body.error.message === 'string') { |
|
|
return body.error.message |
|
|
} |
|
|
if (typeof body.error.error === 'string') { |
|
|
return body.error.error |
|
|
} |
|
|
} |
|
|
if (typeof body.message === 'string') { |
|
|
return body.message |
|
|
} |
|
|
} |
|
|
|
|
|
return '' |
|
|
} |
|
|
|
|
|
|
|
|
_isOrganizationDisabledError(statusCode, body) { |
|
|
if (statusCode !== 400) { |
|
|
return false |
|
|
} |
|
|
const message = this._extractErrorMessage(body) |
|
|
if (!message) { |
|
|
return false |
|
|
} |
|
|
return message.toLowerCase().includes('this organization has been disabled') |
|
|
} |
|
|
|
|
|
|
|
|
isRealClaudeCodeRequest(requestBody) { |
|
|
return ClaudeCodeValidator.includesClaudeCodeSystemPrompt(requestBody, 1) |
|
|
} |
|
|
|
|
|
|
|
|
async relayRequest( |
|
|
requestBody, |
|
|
apiKeyData, |
|
|
clientRequest, |
|
|
clientResponse, |
|
|
clientHeaders, |
|
|
options = {} |
|
|
) { |
|
|
let upstreamRequest = null |
|
|
|
|
|
try { |
|
|
|
|
|
logger.info('🔍 API Key data received:', { |
|
|
apiKeyName: apiKeyData.name, |
|
|
enableModelRestriction: apiKeyData.enableModelRestriction, |
|
|
restrictedModels: apiKeyData.restrictedModels, |
|
|
requestedModel: requestBody.model |
|
|
}) |
|
|
|
|
|
const isOpusModelRequest = |
|
|
typeof requestBody?.model === 'string' && requestBody.model.toLowerCase().includes('opus') |
|
|
|
|
|
|
|
|
const sessionHash = sessionHelper.generateSessionHash(requestBody) |
|
|
|
|
|
|
|
|
let accountSelection |
|
|
try { |
|
|
accountSelection = await unifiedClaudeScheduler.selectAccountForApiKey( |
|
|
apiKeyData, |
|
|
sessionHash, |
|
|
requestBody.model |
|
|
) |
|
|
} catch (error) { |
|
|
if (error.code === 'CLAUDE_DEDICATED_RATE_LIMITED') { |
|
|
const limitMessage = this._buildStandardRateLimitMessage(error.rateLimitEndAt) |
|
|
logger.warn( |
|
|
`🚫 Dedicated account ${error.accountId} is rate limited for API key ${apiKeyData.name}, returning 403` |
|
|
) |
|
|
return { |
|
|
statusCode: 403, |
|
|
headers: { 'Content-Type': 'application/json' }, |
|
|
body: JSON.stringify({ |
|
|
error: 'upstream_rate_limited', |
|
|
message: limitMessage |
|
|
}), |
|
|
accountId: error.accountId |
|
|
} |
|
|
} |
|
|
throw error |
|
|
} |
|
|
const { accountId } = accountSelection |
|
|
const { accountType } = accountSelection |
|
|
|
|
|
logger.info( |
|
|
`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` |
|
|
) |
|
|
|
|
|
|
|
|
let account = await claudeAccountService.getAccount(accountId) |
|
|
|
|
|
if (isOpusModelRequest) { |
|
|
await claudeAccountService.clearExpiredOpusRateLimit(accountId) |
|
|
account = await claudeAccountService.getAccount(accountId) |
|
|
} |
|
|
|
|
|
const isDedicatedOfficialAccount = |
|
|
accountType === 'claude-official' && |
|
|
apiKeyData.claudeAccountId && |
|
|
!apiKeyData.claudeAccountId.startsWith('group:') && |
|
|
apiKeyData.claudeAccountId === accountId |
|
|
|
|
|
let opusRateLimitActive = false |
|
|
let opusRateLimitEndAt = null |
|
|
if (isOpusModelRequest) { |
|
|
opusRateLimitActive = await claudeAccountService.isAccountOpusRateLimited(accountId) |
|
|
opusRateLimitEndAt = account?.opusRateLimitEndAt || null |
|
|
} |
|
|
|
|
|
if (isOpusModelRequest && isDedicatedOfficialAccount && opusRateLimitActive) { |
|
|
const limitMessage = this._buildOpusLimitMessage(opusRateLimitEndAt) |
|
|
logger.warn( |
|
|
`🚫 Dedicated account ${account?.name || accountId} is under Opus weekly limit until ${opusRateLimitEndAt}` |
|
|
) |
|
|
return { |
|
|
statusCode: 403, |
|
|
headers: { 'Content-Type': 'application/json' }, |
|
|
body: JSON.stringify({ |
|
|
error: 'opus_weekly_limit', |
|
|
message: limitMessage |
|
|
}), |
|
|
accountId |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const accessToken = await claudeAccountService.getValidAccessToken(accountId) |
|
|
|
|
|
const processedBody = this._processRequestBody(requestBody, account) |
|
|
|
|
|
|
|
|
const proxyAgent = await this._getProxyAgent(accountId) |
|
|
|
|
|
|
|
|
const handleClientDisconnect = () => { |
|
|
logger.info('🔌 Client disconnected, aborting upstream request') |
|
|
if (upstreamRequest && !upstreamRequest.destroyed) { |
|
|
upstreamRequest.destroy() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (clientRequest) { |
|
|
clientRequest.once('close', handleClientDisconnect) |
|
|
} |
|
|
if (clientResponse) { |
|
|
clientResponse.once('close', handleClientDisconnect) |
|
|
} |
|
|
|
|
|
|
|
|
const response = await this._makeClaudeRequest( |
|
|
processedBody, |
|
|
accessToken, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
accountId, |
|
|
(req) => { |
|
|
upstreamRequest = req |
|
|
}, |
|
|
options |
|
|
) |
|
|
|
|
|
response.accountId = accountId |
|
|
response.accountType = accountType |
|
|
|
|
|
|
|
|
if (clientRequest) { |
|
|
clientRequest.removeListener('close', handleClientDisconnect) |
|
|
} |
|
|
if (clientResponse) { |
|
|
clientResponse.removeListener('close', handleClientDisconnect) |
|
|
} |
|
|
|
|
|
|
|
|
if (response.statusCode !== 200 && response.statusCode !== 201) { |
|
|
let isRateLimited = false |
|
|
let rateLimitResetTimestamp = null |
|
|
let dedicatedRateLimitMessage = null |
|
|
const organizationDisabledError = this._isOrganizationDisabledError( |
|
|
response.statusCode, |
|
|
response.body |
|
|
) |
|
|
|
|
|
|
|
|
if (response.statusCode === 401) { |
|
|
logger.warn(`🔐 Unauthorized error (401) detected for account ${accountId}`) |
|
|
|
|
|
|
|
|
await this.recordUnauthorizedError(accountId) |
|
|
|
|
|
|
|
|
const errorCount = await this.getUnauthorizedErrorCount(accountId) |
|
|
logger.info( |
|
|
`🔐 Account ${accountId} has ${errorCount} consecutive 401 errors in the last 5 minutes` |
|
|
) |
|
|
|
|
|
if (errorCount >= 1) { |
|
|
logger.error( |
|
|
`❌ Account ${accountId} encountered 401 error (${errorCount} errors), marking as unauthorized` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountUnauthorized( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
else if (response.statusCode === 403) { |
|
|
logger.error( |
|
|
`🚫 Forbidden error (403) detected for account ${accountId}, marking as blocked` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) |
|
|
} |
|
|
|
|
|
else if (organizationDisabledError) { |
|
|
logger.error( |
|
|
`🚫 Organization disabled error (400) detected for account ${accountId}, marking as blocked` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) |
|
|
} |
|
|
|
|
|
else if (response.statusCode === 529) { |
|
|
logger.warn(`🚫 Overload error (529) detected for account ${accountId}`) |
|
|
|
|
|
|
|
|
if (config.claude.overloadHandling.enabled > 0) { |
|
|
try { |
|
|
await claudeAccountService.markAccountOverloaded(accountId) |
|
|
logger.info( |
|
|
`🚫 Account ${accountId} marked as overloaded for ${config.claude.overloadHandling.enabled} minutes` |
|
|
) |
|
|
} catch (overloadError) { |
|
|
logger.error(`❌ Failed to mark account as overloaded: ${accountId}`, overloadError) |
|
|
} |
|
|
} else { |
|
|
logger.info(`🚫 529 error handling is disabled, skipping account overload marking`) |
|
|
} |
|
|
} |
|
|
|
|
|
else if (response.statusCode >= 500 && response.statusCode < 600) { |
|
|
logger.warn(`🔥 Server error (${response.statusCode}) detected for account ${accountId}`) |
|
|
await this._handleServerError(accountId, response.statusCode, sessionHash) |
|
|
} |
|
|
|
|
|
else if (response.statusCode === 429) { |
|
|
const resetHeader = response.headers |
|
|
? response.headers['anthropic-ratelimit-unified-reset'] |
|
|
: null |
|
|
const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN |
|
|
|
|
|
if (isOpusModelRequest && !Number.isNaN(parsedResetTimestamp)) { |
|
|
await claudeAccountService.markAccountOpusRateLimited(accountId, parsedResetTimestamp) |
|
|
logger.warn( |
|
|
`🚫 Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` |
|
|
) |
|
|
|
|
|
if (isDedicatedOfficialAccount) { |
|
|
const limitMessage = this._buildOpusLimitMessage(parsedResetTimestamp) |
|
|
return { |
|
|
statusCode: 403, |
|
|
headers: { 'Content-Type': 'application/json' }, |
|
|
body: JSON.stringify({ |
|
|
error: 'opus_weekly_limit', |
|
|
message: limitMessage |
|
|
}), |
|
|
accountId |
|
|
} |
|
|
} |
|
|
} else { |
|
|
isRateLimited = true |
|
|
if (!Number.isNaN(parsedResetTimestamp)) { |
|
|
rateLimitResetTimestamp = parsedResetTimestamp |
|
|
logger.info( |
|
|
`🕐 Extracted rate limit reset timestamp: ${rateLimitResetTimestamp} (${new Date(rateLimitResetTimestamp * 1000).toISOString()})` |
|
|
) |
|
|
} |
|
|
if (isDedicatedOfficialAccount) { |
|
|
dedicatedRateLimitMessage = this._buildStandardRateLimitMessage( |
|
|
rateLimitResetTimestamp || account?.rateLimitEndAt |
|
|
) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
|
|
|
try { |
|
|
const responseBody = |
|
|
typeof response.body === 'string' ? JSON.parse(response.body) : response.body |
|
|
if ( |
|
|
responseBody && |
|
|
responseBody.error && |
|
|
responseBody.error.message && |
|
|
responseBody.error.message.toLowerCase().includes("exceed your account's rate limit") |
|
|
) { |
|
|
isRateLimited = true |
|
|
} |
|
|
} catch (e) { |
|
|
|
|
|
if ( |
|
|
response.body && |
|
|
response.body.toLowerCase().includes("exceed your account's rate limit") |
|
|
) { |
|
|
isRateLimited = true |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
if (isRateLimited) { |
|
|
if (isDedicatedOfficialAccount && !dedicatedRateLimitMessage) { |
|
|
dedicatedRateLimitMessage = this._buildStandardRateLimitMessage( |
|
|
rateLimitResetTimestamp || account?.rateLimitEndAt |
|
|
) |
|
|
} |
|
|
logger.warn( |
|
|
`🚫 Rate limit detected for account ${accountId}, status: ${response.statusCode}` |
|
|
) |
|
|
|
|
|
await unifiedClaudeScheduler.markAccountRateLimited( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash, |
|
|
rateLimitResetTimestamp |
|
|
) |
|
|
|
|
|
if (dedicatedRateLimitMessage) { |
|
|
return { |
|
|
statusCode: 403, |
|
|
headers: { 'Content-Type': 'application/json' }, |
|
|
body: JSON.stringify({ |
|
|
error: 'upstream_rate_limited', |
|
|
message: dedicatedRateLimitMessage |
|
|
}), |
|
|
accountId |
|
|
} |
|
|
} |
|
|
} |
|
|
} else if (response.statusCode === 200 || response.statusCode === 201) { |
|
|
|
|
|
|
|
|
const get5hStatus = (headers) => { |
|
|
if (!headers) { |
|
|
return null |
|
|
} |
|
|
|
|
|
return ( |
|
|
headers['anthropic-ratelimit-unified-5h-status'] || |
|
|
headers['Anthropic-Ratelimit-Unified-5h-Status'] || |
|
|
headers['ANTHROPIC-RATELIMIT-UNIFIED-5H-STATUS'] |
|
|
) |
|
|
} |
|
|
|
|
|
const sessionWindowStatus = get5hStatus(response.headers) |
|
|
if (sessionWindowStatus) { |
|
|
logger.info(`📊 Session window status for account ${accountId}: ${sessionWindowStatus}`) |
|
|
|
|
|
await claudeAccountService.updateSessionWindowStatus(accountId, sessionWindowStatus) |
|
|
} |
|
|
|
|
|
|
|
|
await this.clearUnauthorizedErrors(accountId) |
|
|
await claudeAccountService.clearInternalErrors(accountId) |
|
|
|
|
|
const isRateLimited = await unifiedClaudeScheduler.isAccountRateLimited( |
|
|
accountId, |
|
|
accountType |
|
|
) |
|
|
if (isRateLimited) { |
|
|
await unifiedClaudeScheduler.removeAccountRateLimit(accountId, accountType) |
|
|
} |
|
|
|
|
|
|
|
|
try { |
|
|
const isOverloaded = await claudeAccountService.isAccountOverloaded(accountId) |
|
|
if (isOverloaded) { |
|
|
await claudeAccountService.removeAccountOverload(accountId) |
|
|
} |
|
|
} catch (overloadError) { |
|
|
logger.error( |
|
|
`❌ Failed to check/remove overload status for account ${accountId}:`, |
|
|
overloadError |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
clientHeaders && |
|
|
Object.keys(clientHeaders).length > 0 && |
|
|
this.isRealClaudeCodeRequest(requestBody) |
|
|
) { |
|
|
await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
let responseBody = null |
|
|
try { |
|
|
responseBody = typeof response.body === 'string' ? JSON.parse(response.body) : response.body |
|
|
} catch (e) { |
|
|
logger.debug('Failed to parse response body for usage logging') |
|
|
} |
|
|
|
|
|
if (responseBody && responseBody.usage) { |
|
|
const { usage } = responseBody |
|
|
|
|
|
logger.info( |
|
|
`📊 === Non-Stream Request Usage Summary === Model: ${requestBody.model}, Usage: ${JSON.stringify(usage)}` |
|
|
) |
|
|
} else { |
|
|
|
|
|
const inputTokens = requestBody.messages |
|
|
? requestBody.messages.reduce((sum, msg) => sum + (msg.content?.length || 0), 0) / 4 |
|
|
: 0 |
|
|
const outputTokens = response.content |
|
|
? response.content.reduce((sum, content) => sum + (content.text?.length || 0), 0) / 4 |
|
|
: 0 |
|
|
|
|
|
logger.info( |
|
|
`✅ API request completed - Key: ${apiKeyData.name}, Account: ${accountId}, Model: ${requestBody.model}, Input: ~${Math.round(inputTokens)} tokens (estimated), Output: ~${Math.round(outputTokens)} tokens (estimated)` |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
response.accountId = accountId |
|
|
return response |
|
|
} catch (error) { |
|
|
logger.error( |
|
|
`❌ Claude relay request failed for key: ${apiKeyData.name || apiKeyData.id}:`, |
|
|
error.message |
|
|
) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_processRequestBody(body, account = null) { |
|
|
if (!body) { |
|
|
return body |
|
|
} |
|
|
|
|
|
|
|
|
const processedBody = JSON.parse(JSON.stringify(body)) |
|
|
|
|
|
|
|
|
this._validateAndLimitMaxTokens(processedBody) |
|
|
|
|
|
|
|
|
this._stripTtlFromCacheControl(processedBody) |
|
|
|
|
|
|
|
|
const isRealClaudeCode = this.isRealClaudeCodeRequest(processedBody) |
|
|
|
|
|
|
|
|
if (!isRealClaudeCode) { |
|
|
const claudeCodePrompt = { |
|
|
type: 'text', |
|
|
text: this.claudeCodeSystemPrompt, |
|
|
cache_control: { |
|
|
type: 'ephemeral' |
|
|
} |
|
|
} |
|
|
|
|
|
if (processedBody.system) { |
|
|
if (typeof processedBody.system === 'string') { |
|
|
|
|
|
const userSystemPrompt = { |
|
|
type: 'text', |
|
|
text: processedBody.system |
|
|
} |
|
|
|
|
|
if (processedBody.system.trim() === this.claudeCodeSystemPrompt) { |
|
|
processedBody.system = [claudeCodePrompt] |
|
|
} else { |
|
|
processedBody.system = [claudeCodePrompt, userSystemPrompt] |
|
|
} |
|
|
} else if (Array.isArray(processedBody.system)) { |
|
|
|
|
|
const firstItem = processedBody.system[0] |
|
|
const isFirstItemClaudeCode = |
|
|
firstItem && firstItem.type === 'text' && firstItem.text === this.claudeCodeSystemPrompt |
|
|
|
|
|
if (!isFirstItemClaudeCode) { |
|
|
|
|
|
|
|
|
const filteredSystem = processedBody.system.filter( |
|
|
(item) => !(item && item.type === 'text' && item.text === this.claudeCodeSystemPrompt) |
|
|
) |
|
|
processedBody.system = [claudeCodePrompt, ...filteredSystem] |
|
|
} |
|
|
} else { |
|
|
|
|
|
logger.warn('⚠️ Unexpected system field type:', typeof processedBody.system) |
|
|
processedBody.system = [claudeCodePrompt] |
|
|
} |
|
|
} else { |
|
|
|
|
|
processedBody.system = [claudeCodePrompt] |
|
|
} |
|
|
} |
|
|
|
|
|
this._enforceCacheControlLimit(processedBody) |
|
|
|
|
|
|
|
|
if (this.systemPrompt && this.systemPrompt.trim()) { |
|
|
const systemPrompt = { |
|
|
type: 'text', |
|
|
text: this.systemPrompt |
|
|
} |
|
|
|
|
|
|
|
|
if (processedBody.system && Array.isArray(processedBody.system)) { |
|
|
|
|
|
const hasSystemPrompt = processedBody.system.some( |
|
|
(item) => item && item.text && item.text === this.systemPrompt |
|
|
) |
|
|
if (!hasSystemPrompt) { |
|
|
processedBody.system.push(systemPrompt) |
|
|
} |
|
|
} else { |
|
|
|
|
|
processedBody.system = [systemPrompt] |
|
|
} |
|
|
} else { |
|
|
|
|
|
if (processedBody.system && Array.isArray(processedBody.system)) { |
|
|
const hasValidContent = processedBody.system.some( |
|
|
(item) => item && item.text && item.text.trim() |
|
|
) |
|
|
if (!hasValidContent) { |
|
|
delete processedBody.system |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (processedBody.top_p !== undefined && processedBody.top_p !== null) { |
|
|
delete processedBody.top_p |
|
|
} |
|
|
|
|
|
|
|
|
if (account && account.useUnifiedClientId === 'true' && account.unifiedClientId) { |
|
|
this._replaceClientId(processedBody, account.unifiedClientId) |
|
|
} |
|
|
|
|
|
return processedBody |
|
|
} |
|
|
|
|
|
|
|
|
_replaceClientId(body, unifiedClientId) { |
|
|
if (!body || !body.metadata || !body.metadata.user_id || !unifiedClientId) { |
|
|
return |
|
|
} |
|
|
|
|
|
const userId = body.metadata.user_id |
|
|
|
|
|
|
|
|
const match = userId.match(/^user_[a-f0-9]{64}(_account__session_[a-f0-9-]{36})$/) |
|
|
if (match && match[1]) { |
|
|
|
|
|
body.metadata.user_id = `user_${unifiedClientId}${match[1]}` |
|
|
logger.info(`🔄 Replaced client ID with unified ID: ${body.metadata.user_id}`) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_validateAndLimitMaxTokens(body) { |
|
|
if (!body || !body.max_tokens) { |
|
|
return |
|
|
} |
|
|
|
|
|
try { |
|
|
|
|
|
const pricingFilePath = path.join(__dirname, '../../data/model_pricing.json') |
|
|
|
|
|
if (!fs.existsSync(pricingFilePath)) { |
|
|
logger.warn('⚠️ Model pricing file not found, skipping max_tokens validation') |
|
|
return |
|
|
} |
|
|
|
|
|
const pricingData = JSON.parse(fs.readFileSync(pricingFilePath, 'utf8')) |
|
|
const model = body.model || 'claude-sonnet-4-20250514' |
|
|
|
|
|
|
|
|
const modelConfig = pricingData[model] |
|
|
|
|
|
if (!modelConfig) { |
|
|
|
|
|
logger.info( |
|
|
`📝 Model ${model} not found in pricing file, passing through client parameters without modification` |
|
|
) |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
const maxLimit = modelConfig.max_tokens || modelConfig.max_output_tokens |
|
|
|
|
|
if (!maxLimit) { |
|
|
logger.debug(`🔍 No max_tokens limit found for model ${model}, skipping validation`) |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
if (body.max_tokens > maxLimit) { |
|
|
logger.warn( |
|
|
`⚠️ max_tokens ${body.max_tokens} exceeds limit ${maxLimit} for model ${model}, adjusting to ${maxLimit}` |
|
|
) |
|
|
body.max_tokens = maxLimit |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to validate max_tokens from pricing file:', error) |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_stripTtlFromCacheControl(body) { |
|
|
if (!body || typeof body !== 'object') { |
|
|
return |
|
|
} |
|
|
|
|
|
const processContentArray = (contentArray) => { |
|
|
if (!Array.isArray(contentArray)) { |
|
|
return |
|
|
} |
|
|
|
|
|
contentArray.forEach((item) => { |
|
|
if (item && typeof item === 'object' && item.cache_control) { |
|
|
if (item.cache_control.ttl) { |
|
|
delete item.cache_control.ttl |
|
|
logger.debug('🧹 Removed ttl from cache_control') |
|
|
} |
|
|
} |
|
|
}) |
|
|
} |
|
|
|
|
|
if (Array.isArray(body.system)) { |
|
|
processContentArray(body.system) |
|
|
} |
|
|
|
|
|
if (Array.isArray(body.messages)) { |
|
|
body.messages.forEach((message) => { |
|
|
if (message && Array.isArray(message.content)) { |
|
|
processContentArray(message.content) |
|
|
} |
|
|
}) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_enforceCacheControlLimit(body) { |
|
|
const MAX_CACHE_CONTROL_BLOCKS = 4 |
|
|
|
|
|
if (!body || typeof body !== 'object') { |
|
|
return |
|
|
} |
|
|
|
|
|
const countCacheControlBlocks = () => { |
|
|
let total = 0 |
|
|
|
|
|
if (Array.isArray(body.messages)) { |
|
|
body.messages.forEach((message) => { |
|
|
if (!message || !Array.isArray(message.content)) { |
|
|
return |
|
|
} |
|
|
message.content.forEach((item) => { |
|
|
if (item && item.cache_control) { |
|
|
total += 1 |
|
|
} |
|
|
}) |
|
|
}) |
|
|
} |
|
|
|
|
|
if (Array.isArray(body.system)) { |
|
|
body.system.forEach((item) => { |
|
|
if (item && item.cache_control) { |
|
|
total += 1 |
|
|
} |
|
|
}) |
|
|
} |
|
|
|
|
|
return total |
|
|
} |
|
|
|
|
|
const removeFromMessages = () => { |
|
|
if (!Array.isArray(body.messages)) { |
|
|
return false |
|
|
} |
|
|
|
|
|
for (let messageIndex = 0; messageIndex < body.messages.length; messageIndex += 1) { |
|
|
const message = body.messages[messageIndex] |
|
|
if (!message || !Array.isArray(message.content)) { |
|
|
continue |
|
|
} |
|
|
|
|
|
for (let contentIndex = 0; contentIndex < message.content.length; contentIndex += 1) { |
|
|
const contentItem = message.content[contentIndex] |
|
|
if (contentItem && contentItem.cache_control) { |
|
|
message.content.splice(contentIndex, 1) |
|
|
|
|
|
if (message.content.length === 0) { |
|
|
body.messages.splice(messageIndex, 1) |
|
|
} |
|
|
|
|
|
return true |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
return false |
|
|
} |
|
|
|
|
|
const removeFromSystem = () => { |
|
|
if (!Array.isArray(body.system)) { |
|
|
return false |
|
|
} |
|
|
|
|
|
for (let index = 0; index < body.system.length; index += 1) { |
|
|
const systemItem = body.system[index] |
|
|
if (systemItem && systemItem.cache_control) { |
|
|
body.system.splice(index, 1) |
|
|
|
|
|
if (body.system.length === 0) { |
|
|
delete body.system |
|
|
} |
|
|
|
|
|
return true |
|
|
} |
|
|
} |
|
|
|
|
|
return false |
|
|
} |
|
|
|
|
|
let total = countCacheControlBlocks() |
|
|
|
|
|
while (total > MAX_CACHE_CONTROL_BLOCKS) { |
|
|
if (removeFromMessages()) { |
|
|
total -= 1 |
|
|
continue |
|
|
} |
|
|
|
|
|
if (removeFromSystem()) { |
|
|
total -= 1 |
|
|
continue |
|
|
} |
|
|
|
|
|
break |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async _getProxyAgent(accountId) { |
|
|
try { |
|
|
const accountData = await claudeAccountService.getAllAccounts() |
|
|
const account = accountData.find((acc) => acc.id === accountId) |
|
|
|
|
|
if (!account || !account.proxy) { |
|
|
logger.debug('🌐 No proxy configured for Claude account') |
|
|
return null |
|
|
} |
|
|
|
|
|
const proxyAgent = ProxyHelper.createProxyAgent(account.proxy) |
|
|
if (proxyAgent) { |
|
|
logger.info( |
|
|
`🌐 Using proxy for Claude request: ${ProxyHelper.getProxyDescription(account.proxy)}` |
|
|
) |
|
|
} |
|
|
return proxyAgent |
|
|
} catch (error) { |
|
|
logger.warn('⚠️ Failed to create proxy agent:', error) |
|
|
return null |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_filterClientHeaders(clientHeaders) { |
|
|
|
|
|
const sensitiveHeaders = [ |
|
|
'content-type', |
|
|
'user-agent', |
|
|
'x-api-key', |
|
|
'authorization', |
|
|
'host', |
|
|
'content-length', |
|
|
'connection', |
|
|
'proxy-authorization', |
|
|
'content-encoding', |
|
|
'transfer-encoding' |
|
|
] |
|
|
|
|
|
|
|
|
const browserHeaders = [ |
|
|
'origin', |
|
|
'referer', |
|
|
'sec-fetch-mode', |
|
|
'sec-fetch-site', |
|
|
'sec-fetch-dest', |
|
|
'sec-ch-ua', |
|
|
'sec-ch-ua-mobile', |
|
|
'sec-ch-ua-platform', |
|
|
'accept-language', |
|
|
'accept-encoding', |
|
|
'accept', |
|
|
'cache-control', |
|
|
'pragma', |
|
|
'anthropic-dangerous-direct-browser-access' |
|
|
] |
|
|
|
|
|
|
|
|
const allowedHeaders = [ |
|
|
'x-request-id', |
|
|
'anthropic-version', |
|
|
'anthropic-beta' |
|
|
] |
|
|
|
|
|
const filteredHeaders = {} |
|
|
|
|
|
|
|
|
Object.keys(clientHeaders || {}).forEach((key) => { |
|
|
const lowerKey = key.toLowerCase() |
|
|
|
|
|
if (allowedHeaders.includes(lowerKey)) { |
|
|
filteredHeaders[key] = clientHeaders[key] |
|
|
} |
|
|
|
|
|
else if (!sensitiveHeaders.includes(lowerKey) && !browserHeaders.includes(lowerKey)) { |
|
|
filteredHeaders[key] = clientHeaders[key] |
|
|
} |
|
|
}) |
|
|
|
|
|
return filteredHeaders |
|
|
} |
|
|
|
|
|
|
|
|
async _makeClaudeRequest( |
|
|
body, |
|
|
accessToken, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
accountId, |
|
|
onRequest, |
|
|
requestOptions = {} |
|
|
) { |
|
|
const url = new URL(this.claudeApiUrl) |
|
|
|
|
|
|
|
|
const account = await claudeAccountService.getAccount(accountId) |
|
|
|
|
|
|
|
|
const unifiedUA = await this.captureAndGetUnifiedUserAgent(clientHeaders, account) |
|
|
|
|
|
|
|
|
const filteredHeaders = this._filterClientHeaders(clientHeaders) |
|
|
|
|
|
|
|
|
const isRealClaudeCode = this.isRealClaudeCodeRequest(body) |
|
|
|
|
|
|
|
|
const finalHeaders = { ...filteredHeaders } |
|
|
|
|
|
if (!isRealClaudeCode) { |
|
|
|
|
|
const claudeCodeHeaders = await claudeCodeHeadersService.getAccountHeaders(accountId) |
|
|
|
|
|
|
|
|
Object.keys(claudeCodeHeaders).forEach((key) => { |
|
|
const lowerKey = key.toLowerCase() |
|
|
if (!finalHeaders[key] && !finalHeaders[lowerKey]) { |
|
|
finalHeaders[key] = claudeCodeHeaders[key] |
|
|
} |
|
|
}) |
|
|
} |
|
|
|
|
|
return new Promise((resolve, reject) => { |
|
|
|
|
|
let requestPath = url.pathname |
|
|
if (requestOptions.customPath) { |
|
|
const baseUrl = new URL('https://api.anthropic.com') |
|
|
const customUrl = new URL(requestOptions.customPath, baseUrl) |
|
|
requestPath = customUrl.pathname |
|
|
} |
|
|
|
|
|
const options = { |
|
|
hostname: url.hostname, |
|
|
port: url.port || 443, |
|
|
path: requestPath, |
|
|
method: 'POST', |
|
|
headers: { |
|
|
'Content-Type': 'application/json', |
|
|
Authorization: `Bearer ${accessToken}`, |
|
|
'anthropic-version': this.apiVersion, |
|
|
...finalHeaders |
|
|
}, |
|
|
agent: proxyAgent, |
|
|
timeout: config.requestTimeout || 600000 |
|
|
} |
|
|
|
|
|
|
|
|
if (!options.headers['user-agent'] || unifiedUA !== null) { |
|
|
const userAgent = unifiedUA || 'claude-cli/1.0.119 (external, cli)' |
|
|
options.headers['user-agent'] = userAgent |
|
|
} |
|
|
|
|
|
logger.info(`🔗 指纹是这个: ${options.headers['user-agent']}`) |
|
|
|
|
|
|
|
|
const betaHeader = |
|
|
requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader |
|
|
if (betaHeader) { |
|
|
options.headers['anthropic-beta'] = betaHeader |
|
|
} |
|
|
|
|
|
const req = https.request(options, (res) => { |
|
|
let responseData = Buffer.alloc(0) |
|
|
|
|
|
res.on('data', (chunk) => { |
|
|
responseData = Buffer.concat([responseData, chunk]) |
|
|
}) |
|
|
|
|
|
res.on('end', () => { |
|
|
try { |
|
|
let bodyString = '' |
|
|
|
|
|
|
|
|
const contentEncoding = res.headers['content-encoding'] |
|
|
if (contentEncoding === 'gzip') { |
|
|
try { |
|
|
bodyString = zlib.gunzipSync(responseData).toString('utf8') |
|
|
} catch (unzipError) { |
|
|
logger.error('❌ Failed to decompress gzip response:', unzipError) |
|
|
bodyString = responseData.toString('utf8') |
|
|
} |
|
|
} else if (contentEncoding === 'deflate') { |
|
|
try { |
|
|
bodyString = zlib.inflateSync(responseData).toString('utf8') |
|
|
} catch (unzipError) { |
|
|
logger.error('❌ Failed to decompress deflate response:', unzipError) |
|
|
bodyString = responseData.toString('utf8') |
|
|
} |
|
|
} else { |
|
|
bodyString = responseData.toString('utf8') |
|
|
} |
|
|
|
|
|
const response = { |
|
|
statusCode: res.statusCode, |
|
|
headers: res.headers, |
|
|
body: bodyString |
|
|
} |
|
|
|
|
|
logger.debug(`🔗 Claude API response: ${res.statusCode}`) |
|
|
|
|
|
resolve(response) |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to parse Claude API response (Account: ${accountId}):`, error) |
|
|
reject(error) |
|
|
} |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
if (onRequest && typeof onRequest === 'function') { |
|
|
onRequest(req) |
|
|
} |
|
|
|
|
|
req.on('error', async (error) => { |
|
|
console.error(': ❌ ', error) |
|
|
logger.error(`❌ Claude API request error (Account: ${accountId}):`, error.message, { |
|
|
code: error.code, |
|
|
errno: error.errno, |
|
|
syscall: error.syscall, |
|
|
address: error.address, |
|
|
port: error.port |
|
|
}) |
|
|
|
|
|
|
|
|
let errorMessage = 'Upstream request failed' |
|
|
if (error.code === 'ECONNRESET') { |
|
|
errorMessage = 'Connection reset by Claude API server' |
|
|
} else if (error.code === 'ENOTFOUND') { |
|
|
errorMessage = 'Unable to resolve Claude API hostname' |
|
|
} else if (error.code === 'ECONNREFUSED') { |
|
|
errorMessage = 'Connection refused by Claude API server' |
|
|
} else if (error.code === 'ETIMEDOUT') { |
|
|
errorMessage = 'Connection timed out to Claude API server' |
|
|
|
|
|
await this._handleServerError(accountId, 504, null, 'Network') |
|
|
} |
|
|
|
|
|
reject(new Error(errorMessage)) |
|
|
}) |
|
|
|
|
|
req.on('timeout', async () => { |
|
|
req.destroy() |
|
|
logger.error(`❌ Claude API request timeout (Account: ${accountId})`) |
|
|
|
|
|
await this._handleServerError(accountId, 504, null, 'Request') |
|
|
|
|
|
reject(new Error('Request timeout')) |
|
|
}) |
|
|
|
|
|
|
|
|
req.write(JSON.stringify(body)) |
|
|
req.end() |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
async relayStreamRequestWithUsageCapture( |
|
|
requestBody, |
|
|
apiKeyData, |
|
|
responseStream, |
|
|
clientHeaders, |
|
|
usageCallback, |
|
|
streamTransformer = null, |
|
|
options = {} |
|
|
) { |
|
|
try { |
|
|
|
|
|
logger.info('🔍 [Stream] API Key data received:', { |
|
|
apiKeyName: apiKeyData.name, |
|
|
enableModelRestriction: apiKeyData.enableModelRestriction, |
|
|
restrictedModels: apiKeyData.restrictedModels, |
|
|
requestedModel: requestBody.model |
|
|
}) |
|
|
|
|
|
const isOpusModelRequest = |
|
|
typeof requestBody?.model === 'string' && requestBody.model.toLowerCase().includes('opus') |
|
|
|
|
|
|
|
|
const sessionHash = sessionHelper.generateSessionHash(requestBody) |
|
|
|
|
|
|
|
|
let accountSelection |
|
|
try { |
|
|
accountSelection = await unifiedClaudeScheduler.selectAccountForApiKey( |
|
|
apiKeyData, |
|
|
sessionHash, |
|
|
requestBody.model |
|
|
) |
|
|
} catch (error) { |
|
|
if (error.code === 'CLAUDE_DEDICATED_RATE_LIMITED') { |
|
|
const limitMessage = this._buildStandardRateLimitMessage(error.rateLimitEndAt) |
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.status(403) |
|
|
responseStream.setHeader('Content-Type', 'application/json') |
|
|
} |
|
|
responseStream.write( |
|
|
JSON.stringify({ |
|
|
error: 'upstream_rate_limited', |
|
|
message: limitMessage |
|
|
}) |
|
|
) |
|
|
responseStream.end() |
|
|
return |
|
|
} |
|
|
throw error |
|
|
} |
|
|
const { accountId } = accountSelection |
|
|
const { accountType } = accountSelection |
|
|
|
|
|
logger.info( |
|
|
`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` |
|
|
) |
|
|
|
|
|
|
|
|
let account = await claudeAccountService.getAccount(accountId) |
|
|
|
|
|
if (isOpusModelRequest) { |
|
|
await claudeAccountService.clearExpiredOpusRateLimit(accountId) |
|
|
account = await claudeAccountService.getAccount(accountId) |
|
|
} |
|
|
|
|
|
const isDedicatedOfficialAccount = |
|
|
accountType === 'claude-official' && |
|
|
apiKeyData.claudeAccountId && |
|
|
!apiKeyData.claudeAccountId.startsWith('group:') && |
|
|
apiKeyData.claudeAccountId === accountId |
|
|
|
|
|
let opusRateLimitActive = false |
|
|
if (isOpusModelRequest) { |
|
|
opusRateLimitActive = await claudeAccountService.isAccountOpusRateLimited(accountId) |
|
|
} |
|
|
|
|
|
if (isOpusModelRequest && isDedicatedOfficialAccount && opusRateLimitActive) { |
|
|
const limitMessage = this._buildOpusLimitMessage(account?.opusRateLimitEndAt) |
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.status(403) |
|
|
responseStream.setHeader('Content-Type', 'application/json') |
|
|
} |
|
|
responseStream.write( |
|
|
JSON.stringify({ |
|
|
error: 'opus_weekly_limit', |
|
|
message: limitMessage |
|
|
}) |
|
|
) |
|
|
responseStream.end() |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
const accessToken = await claudeAccountService.getValidAccessToken(accountId) |
|
|
|
|
|
const processedBody = this._processRequestBody(requestBody, account) |
|
|
|
|
|
|
|
|
const proxyAgent = await this._getProxyAgent(accountId) |
|
|
|
|
|
|
|
|
await this._makeClaudeStreamRequestWithUsageCapture( |
|
|
processedBody, |
|
|
accessToken, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
responseStream, |
|
|
(usageData) => { |
|
|
|
|
|
usageCallback({ ...usageData, accountId }) |
|
|
}, |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash, |
|
|
streamTransformer, |
|
|
options, |
|
|
isDedicatedOfficialAccount |
|
|
) |
|
|
} catch (error) { |
|
|
logger.error(`❌ Claude stream relay with usage capture failed:`, error) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async _makeClaudeStreamRequestWithUsageCapture( |
|
|
body, |
|
|
accessToken, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
responseStream, |
|
|
usageCallback, |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash, |
|
|
streamTransformer = null, |
|
|
requestOptions = {}, |
|
|
isDedicatedOfficialAccount = false |
|
|
) { |
|
|
|
|
|
const account = await claudeAccountService.getAccount(accountId) |
|
|
|
|
|
const isOpusModelRequest = |
|
|
typeof body?.model === 'string' && body.model.toLowerCase().includes('opus') |
|
|
|
|
|
|
|
|
const unifiedUA = await this.captureAndGetUnifiedUserAgent(clientHeaders, account) |
|
|
|
|
|
|
|
|
const filteredHeaders = this._filterClientHeaders(clientHeaders) |
|
|
|
|
|
|
|
|
const isRealClaudeCode = this.isRealClaudeCodeRequest(body) |
|
|
|
|
|
|
|
|
const finalHeaders = { ...filteredHeaders } |
|
|
|
|
|
if (!isRealClaudeCode) { |
|
|
|
|
|
const claudeCodeHeaders = await claudeCodeHeadersService.getAccountHeaders(accountId) |
|
|
|
|
|
|
|
|
Object.keys(claudeCodeHeaders).forEach((key) => { |
|
|
const lowerKey = key.toLowerCase() |
|
|
if (!finalHeaders[key] && !finalHeaders[lowerKey]) { |
|
|
finalHeaders[key] = claudeCodeHeaders[key] |
|
|
} |
|
|
}) |
|
|
} |
|
|
|
|
|
return new Promise((resolve, reject) => { |
|
|
const url = new URL(this.claudeApiUrl) |
|
|
|
|
|
const options = { |
|
|
hostname: url.hostname, |
|
|
port: url.port || 443, |
|
|
path: url.pathname, |
|
|
method: 'POST', |
|
|
headers: { |
|
|
'Content-Type': 'application/json', |
|
|
Authorization: `Bearer ${accessToken}`, |
|
|
'anthropic-version': this.apiVersion, |
|
|
...finalHeaders |
|
|
}, |
|
|
agent: proxyAgent, |
|
|
timeout: config.requestTimeout || 600000 |
|
|
} |
|
|
|
|
|
|
|
|
if (!options.headers['user-agent'] || unifiedUA !== null) { |
|
|
const userAgent = unifiedUA || 'claude-cli/1.0.119 (external, cli)' |
|
|
options.headers['user-agent'] = userAgent |
|
|
} |
|
|
|
|
|
logger.info(`🔗 指纹是这个: ${options.headers['user-agent']}`) |
|
|
|
|
|
const betaHeader = |
|
|
requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader |
|
|
if (betaHeader) { |
|
|
options.headers['anthropic-beta'] = betaHeader |
|
|
} |
|
|
|
|
|
const req = https.request(options, async (res) => { |
|
|
logger.debug(`🌊 Claude stream response status: ${res.statusCode}`) |
|
|
|
|
|
|
|
|
if (res.statusCode !== 200) { |
|
|
if (res.statusCode === 429) { |
|
|
const resetHeader = res.headers |
|
|
? res.headers['anthropic-ratelimit-unified-reset'] |
|
|
: null |
|
|
const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN |
|
|
|
|
|
if (isOpusModelRequest) { |
|
|
if (!Number.isNaN(parsedResetTimestamp)) { |
|
|
await claudeAccountService.markAccountOpusRateLimited( |
|
|
accountId, |
|
|
parsedResetTimestamp |
|
|
) |
|
|
logger.warn( |
|
|
`🚫 [Stream] Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` |
|
|
) |
|
|
} |
|
|
|
|
|
if (isDedicatedOfficialAccount) { |
|
|
const limitMessage = this._buildOpusLimitMessage(parsedResetTimestamp) |
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.status(403) |
|
|
responseStream.setHeader('Content-Type', 'application/json') |
|
|
} |
|
|
responseStream.write( |
|
|
JSON.stringify({ |
|
|
error: 'opus_weekly_limit', |
|
|
message: limitMessage |
|
|
}) |
|
|
) |
|
|
responseStream.end() |
|
|
res.resume() |
|
|
resolve() |
|
|
return |
|
|
} |
|
|
} else { |
|
|
const rateLimitResetTimestamp = Number.isNaN(parsedResetTimestamp) |
|
|
? null |
|
|
: parsedResetTimestamp |
|
|
await unifiedClaudeScheduler.markAccountRateLimited( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash, |
|
|
rateLimitResetTimestamp |
|
|
) |
|
|
logger.warn(`🚫 [Stream] Rate limit detected for account ${accountId}, status 429`) |
|
|
|
|
|
if (isDedicatedOfficialAccount) { |
|
|
const limitMessage = this._buildStandardRateLimitMessage( |
|
|
rateLimitResetTimestamp || account?.rateLimitEndAt |
|
|
) |
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.status(403) |
|
|
responseStream.setHeader('Content-Type', 'application/json') |
|
|
} |
|
|
responseStream.write( |
|
|
JSON.stringify({ |
|
|
error: 'upstream_rate_limited', |
|
|
message: limitMessage |
|
|
}) |
|
|
) |
|
|
responseStream.end() |
|
|
res.resume() |
|
|
resolve() |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const handleErrorResponse = async () => { |
|
|
if (res.statusCode === 401) { |
|
|
logger.warn(`🔐 [Stream] Unauthorized error (401) detected for account ${accountId}`) |
|
|
|
|
|
await this.recordUnauthorizedError(accountId) |
|
|
|
|
|
const errorCount = await this.getUnauthorizedErrorCount(accountId) |
|
|
logger.info( |
|
|
`🔐 [Stream] Account ${accountId} has ${errorCount} consecutive 401 errors in the last 5 minutes` |
|
|
) |
|
|
|
|
|
if (errorCount >= 1) { |
|
|
logger.error( |
|
|
`❌ [Stream] Account ${accountId} encountered 401 error (${errorCount} errors), marking as unauthorized` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountUnauthorized( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash |
|
|
) |
|
|
} |
|
|
} else if (res.statusCode === 403) { |
|
|
logger.error( |
|
|
`🚫 [Stream] Forbidden error (403) detected for account ${accountId}, marking as blocked` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) |
|
|
} else if (res.statusCode === 529) { |
|
|
logger.warn(`🚫 [Stream] Overload error (529) detected for account ${accountId}`) |
|
|
|
|
|
|
|
|
if (config.claude.overloadHandling.enabled > 0) { |
|
|
try { |
|
|
await claudeAccountService.markAccountOverloaded(accountId) |
|
|
logger.info( |
|
|
`🚫 [Stream] Account ${accountId} marked as overloaded for ${config.claude.overloadHandling.enabled} minutes` |
|
|
) |
|
|
} catch (overloadError) { |
|
|
logger.error( |
|
|
`❌ [Stream] Failed to mark account as overloaded: ${accountId}`, |
|
|
overloadError |
|
|
) |
|
|
} |
|
|
} else { |
|
|
logger.info( |
|
|
`🚫 [Stream] 529 error handling is disabled, skipping account overload marking` |
|
|
) |
|
|
} |
|
|
} else if (res.statusCode >= 500 && res.statusCode < 600) { |
|
|
logger.warn( |
|
|
`🔥 [Stream] Server error (${res.statusCode}) detected for account ${accountId}` |
|
|
) |
|
|
await this._handleServerError(accountId, res.statusCode, sessionHash, '[Stream]') |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
handleErrorResponse().catch((err) => { |
|
|
logger.error('❌ Error in stream error handler:', err) |
|
|
}) |
|
|
|
|
|
logger.error( |
|
|
`❌ Claude API returned error status: ${res.statusCode} | Account: ${account?.name || accountId}` |
|
|
) |
|
|
let errorData = '' |
|
|
|
|
|
res.on('data', (chunk) => { |
|
|
errorData += chunk.toString() |
|
|
}) |
|
|
|
|
|
res.on('end', () => { |
|
|
console.error(': ❌ ', errorData) |
|
|
logger.error( |
|
|
`❌ Claude API error response (Account: ${account?.name || accountId}):`, |
|
|
errorData |
|
|
) |
|
|
if (this._isOrganizationDisabledError(res.statusCode, errorData)) { |
|
|
;(async () => { |
|
|
try { |
|
|
logger.error( |
|
|
`🚫 [Stream] Organization disabled error (400) detected for account ${accountId}, marking as blocked` |
|
|
) |
|
|
await unifiedClaudeScheduler.markAccountBlocked( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash |
|
|
) |
|
|
} catch (markError) { |
|
|
logger.error( |
|
|
`❌ [Stream] Failed to mark account ${accountId} as blocked after organization disabled error:`, |
|
|
markError |
|
|
) |
|
|
} |
|
|
})() |
|
|
} |
|
|
if (!responseStream.destroyed) { |
|
|
|
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Claude API error', |
|
|
status: res.statusCode, |
|
|
details: errorData, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(new Error(`Claude API error: ${res.statusCode}`)) |
|
|
}) |
|
|
return |
|
|
} |
|
|
|
|
|
let buffer = '' |
|
|
const allUsageData = [] |
|
|
let currentUsageData = {} |
|
|
let rateLimitDetected = false |
|
|
|
|
|
|
|
|
res.on('data', (chunk) => { |
|
|
try { |
|
|
const chunkStr = chunk.toString() |
|
|
|
|
|
buffer += chunkStr |
|
|
|
|
|
|
|
|
const lines = buffer.split('\n') |
|
|
buffer = lines.pop() || '' |
|
|
|
|
|
|
|
|
if (lines.length > 0 && !responseStream.destroyed) { |
|
|
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') |
|
|
|
|
|
if (streamTransformer) { |
|
|
const transformed = streamTransformer(linesToForward) |
|
|
if (transformed) { |
|
|
responseStream.write(transformed) |
|
|
} |
|
|
} else { |
|
|
responseStream.write(linesToForward) |
|
|
} |
|
|
} |
|
|
|
|
|
for (const line of lines) { |
|
|
|
|
|
if (line.startsWith('data:')) { |
|
|
const jsonStr = line.slice(5).trimStart() |
|
|
if (!jsonStr || jsonStr === '[DONE]') { |
|
|
continue |
|
|
} |
|
|
try { |
|
|
const data = JSON.parse(jsonStr) |
|
|
|
|
|
|
|
|
if (data.type === 'message_start' && data.message && data.message.usage) { |
|
|
|
|
|
if ( |
|
|
currentUsageData.input_tokens !== undefined && |
|
|
currentUsageData.output_tokens !== undefined |
|
|
) { |
|
|
allUsageData.push({ ...currentUsageData }) |
|
|
currentUsageData = {} |
|
|
} |
|
|
|
|
|
|
|
|
currentUsageData.input_tokens = data.message.usage.input_tokens || 0 |
|
|
currentUsageData.cache_creation_input_tokens = |
|
|
data.message.usage.cache_creation_input_tokens || 0 |
|
|
currentUsageData.cache_read_input_tokens = |
|
|
data.message.usage.cache_read_input_tokens || 0 |
|
|
currentUsageData.model = data.message.model |
|
|
|
|
|
|
|
|
if ( |
|
|
data.message.usage.cache_creation && |
|
|
typeof data.message.usage.cache_creation === 'object' |
|
|
) { |
|
|
currentUsageData.cache_creation = { |
|
|
ephemeral_5m_input_tokens: |
|
|
data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0, |
|
|
ephemeral_1h_input_tokens: |
|
|
data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0 |
|
|
} |
|
|
logger.debug( |
|
|
'📊 Collected detailed cache creation data:', |
|
|
JSON.stringify(currentUsageData.cache_creation) |
|
|
) |
|
|
} |
|
|
|
|
|
logger.debug( |
|
|
'📊 Collected input/cache data from message_start:', |
|
|
JSON.stringify(currentUsageData) |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
data.type === 'message_delta' && |
|
|
data.usage && |
|
|
data.usage.output_tokens !== undefined |
|
|
) { |
|
|
currentUsageData.output_tokens = data.usage.output_tokens || 0 |
|
|
|
|
|
logger.debug( |
|
|
'📊 Collected output data from message_delta:', |
|
|
JSON.stringify(currentUsageData) |
|
|
) |
|
|
|
|
|
|
|
|
if (currentUsageData.input_tokens !== undefined) { |
|
|
logger.debug( |
|
|
'🎯 Complete usage data collected for model:', |
|
|
currentUsageData.model, |
|
|
'- Input:', |
|
|
currentUsageData.input_tokens, |
|
|
'Output:', |
|
|
currentUsageData.output_tokens |
|
|
) |
|
|
|
|
|
allUsageData.push({ ...currentUsageData }) |
|
|
|
|
|
currentUsageData = {} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
data.type === 'error' && |
|
|
data.error && |
|
|
data.error.message && |
|
|
data.error.message.toLowerCase().includes("exceed your account's rate limit") |
|
|
) { |
|
|
rateLimitDetected = true |
|
|
logger.warn(`🚫 Rate limit detected in stream for account ${accountId}`) |
|
|
} |
|
|
} catch (parseError) { |
|
|
|
|
|
logger.debug('🔍 SSE line not JSON or no usage data:', line.slice(0, 100)) |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('❌ Error processing stream data:', error) |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Stream processing error', |
|
|
message: error.message, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
} |
|
|
} |
|
|
}) |
|
|
|
|
|
res.on('end', async () => { |
|
|
try { |
|
|
|
|
|
if (buffer.trim() && !responseStream.destroyed) { |
|
|
if (streamTransformer) { |
|
|
const transformed = streamTransformer(buffer) |
|
|
if (transformed) { |
|
|
responseStream.write(transformed) |
|
|
} |
|
|
} else { |
|
|
responseStream.write(buffer) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.end() |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('❌ Error processing stream end:', error) |
|
|
} |
|
|
|
|
|
|
|
|
if (currentUsageData.input_tokens !== undefined) { |
|
|
if (currentUsageData.output_tokens === undefined) { |
|
|
currentUsageData.output_tokens = 0 |
|
|
} |
|
|
allUsageData.push(currentUsageData) |
|
|
} |
|
|
|
|
|
|
|
|
if (allUsageData.length === 0) { |
|
|
logger.warn( |
|
|
'⚠️ Stream completed but no usage data was captured! This indicates a problem with SSE parsing or Claude API response format.' |
|
|
) |
|
|
} else { |
|
|
|
|
|
const totalUsage = allUsageData.reduce( |
|
|
(acc, usage) => ({ |
|
|
input_tokens: (acc.input_tokens || 0) + (usage.input_tokens || 0), |
|
|
output_tokens: (acc.output_tokens || 0) + (usage.output_tokens || 0), |
|
|
cache_creation_input_tokens: |
|
|
(acc.cache_creation_input_tokens || 0) + (usage.cache_creation_input_tokens || 0), |
|
|
cache_read_input_tokens: |
|
|
(acc.cache_read_input_tokens || 0) + (usage.cache_read_input_tokens || 0), |
|
|
models: [...(acc.models || []), usage.model].filter(Boolean) |
|
|
}), |
|
|
{} |
|
|
) |
|
|
|
|
|
|
|
|
logger.info( |
|
|
`📊 === Stream Request Usage Summary === Model: ${body.model}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}` |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
const finalUsage = { |
|
|
input_tokens: totalUsage.input_tokens, |
|
|
output_tokens: totalUsage.output_tokens, |
|
|
cache_creation_input_tokens: totalUsage.cache_creation_input_tokens, |
|
|
cache_read_input_tokens: totalUsage.cache_read_input_tokens, |
|
|
model: allUsageData[allUsageData.length - 1].model || body.model |
|
|
} |
|
|
|
|
|
|
|
|
let totalEphemeral5m = 0 |
|
|
let totalEphemeral1h = 0 |
|
|
allUsageData.forEach((usage) => { |
|
|
if (usage.cache_creation && typeof usage.cache_creation === 'object') { |
|
|
totalEphemeral5m += usage.cache_creation.ephemeral_5m_input_tokens || 0 |
|
|
totalEphemeral1h += usage.cache_creation.ephemeral_1h_input_tokens || 0 |
|
|
} |
|
|
}) |
|
|
|
|
|
|
|
|
if (totalEphemeral5m > 0 || totalEphemeral1h > 0) { |
|
|
finalUsage.cache_creation = { |
|
|
ephemeral_5m_input_tokens: totalEphemeral5m, |
|
|
ephemeral_1h_input_tokens: totalEphemeral1h |
|
|
} |
|
|
logger.info( |
|
|
'📊 Detailed cache creation breakdown:', |
|
|
JSON.stringify(finalUsage.cache_creation) |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
usageCallback(finalUsage) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const get5hStatus = (headers) => { |
|
|
if (!headers) { |
|
|
return null |
|
|
} |
|
|
|
|
|
return ( |
|
|
headers['anthropic-ratelimit-unified-5h-status'] || |
|
|
headers['Anthropic-Ratelimit-Unified-5h-Status'] || |
|
|
headers['ANTHROPIC-RATELIMIT-UNIFIED-5H-STATUS'] |
|
|
) |
|
|
} |
|
|
|
|
|
const sessionWindowStatus = get5hStatus(res.headers) |
|
|
if (sessionWindowStatus) { |
|
|
logger.info(`📊 Session window status for account ${accountId}: ${sessionWindowStatus}`) |
|
|
|
|
|
await claudeAccountService.updateSessionWindowStatus(accountId, sessionWindowStatus) |
|
|
} |
|
|
|
|
|
|
|
|
if (rateLimitDetected || res.statusCode === 429) { |
|
|
const resetHeader = res.headers |
|
|
? res.headers['anthropic-ratelimit-unified-reset'] |
|
|
: null |
|
|
const parsedResetTimestamp = resetHeader ? parseInt(resetHeader, 10) : NaN |
|
|
|
|
|
if (isOpusModelRequest && !Number.isNaN(parsedResetTimestamp)) { |
|
|
await claudeAccountService.markAccountOpusRateLimited(accountId, parsedResetTimestamp) |
|
|
logger.warn( |
|
|
`🚫 [Stream] Account ${accountId} hit Opus limit, resets at ${new Date(parsedResetTimestamp * 1000).toISOString()}` |
|
|
) |
|
|
} else { |
|
|
const rateLimitResetTimestamp = Number.isNaN(parsedResetTimestamp) |
|
|
? null |
|
|
: parsedResetTimestamp |
|
|
|
|
|
if (!Number.isNaN(parsedResetTimestamp)) { |
|
|
logger.info( |
|
|
`🕐 Extracted rate limit reset timestamp from stream: ${parsedResetTimestamp} (${new Date(parsedResetTimestamp * 1000).toISOString()})` |
|
|
) |
|
|
} |
|
|
|
|
|
await unifiedClaudeScheduler.markAccountRateLimited( |
|
|
accountId, |
|
|
accountType, |
|
|
sessionHash, |
|
|
rateLimitResetTimestamp |
|
|
) |
|
|
} |
|
|
} else if (res.statusCode === 200) { |
|
|
|
|
|
await this.clearUnauthorizedErrors(accountId) |
|
|
await claudeAccountService.clearInternalErrors(accountId) |
|
|
|
|
|
const isRateLimited = await unifiedClaudeScheduler.isAccountRateLimited( |
|
|
accountId, |
|
|
accountType |
|
|
) |
|
|
if (isRateLimited) { |
|
|
await unifiedClaudeScheduler.removeAccountRateLimit(accountId, accountType) |
|
|
} |
|
|
|
|
|
|
|
|
try { |
|
|
const isOverloaded = await claudeAccountService.isAccountOverloaded(accountId) |
|
|
if (isOverloaded) { |
|
|
await claudeAccountService.removeAccountOverload(accountId) |
|
|
} |
|
|
} catch (overloadError) { |
|
|
logger.error( |
|
|
`❌ [Stream] Failed to check/remove overload status for account ${accountId}:`, |
|
|
overloadError |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
clientHeaders && |
|
|
Object.keys(clientHeaders).length > 0 && |
|
|
this.isRealClaudeCodeRequest(body) |
|
|
) { |
|
|
await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders) |
|
|
} |
|
|
} |
|
|
|
|
|
logger.debug('🌊 Claude stream response with usage capture completed') |
|
|
resolve() |
|
|
}) |
|
|
}) |
|
|
|
|
|
req.on('error', async (error) => { |
|
|
logger.error( |
|
|
`❌ Claude stream request error (Account: ${account?.name || accountId}):`, |
|
|
error.message, |
|
|
{ |
|
|
code: error.code, |
|
|
errno: error.errno, |
|
|
syscall: error.syscall |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
let errorMessage = 'Upstream request failed' |
|
|
let statusCode = 500 |
|
|
if (error.code === 'ECONNRESET') { |
|
|
errorMessage = 'Connection reset by Claude API server' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ENOTFOUND') { |
|
|
errorMessage = 'Unable to resolve Claude API hostname' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ECONNREFUSED') { |
|
|
errorMessage = 'Connection refused by Claude API server' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ETIMEDOUT') { |
|
|
errorMessage = 'Connection timed out to Claude API server' |
|
|
statusCode = 504 |
|
|
} |
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(statusCode, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive' |
|
|
}) |
|
|
} |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
|
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: errorMessage, |
|
|
code: error.code, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(error) |
|
|
}) |
|
|
|
|
|
req.on('timeout', async () => { |
|
|
req.destroy() |
|
|
logger.error(`❌ Claude stream request timeout | Account: ${account?.name || accountId}`) |
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(504, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive' |
|
|
}) |
|
|
} |
|
|
if (!responseStream.destroyed) { |
|
|
|
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Request timeout', |
|
|
code: 'TIMEOUT', |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(new Error('Request timeout')) |
|
|
}) |
|
|
|
|
|
|
|
|
responseStream.on('close', () => { |
|
|
logger.debug('🔌 Client disconnected, cleaning up stream') |
|
|
if (!req.destroyed) { |
|
|
req.destroy() |
|
|
} |
|
|
}) |
|
|
|
|
|
|
|
|
req.write(JSON.stringify(body)) |
|
|
req.end() |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
async _makeClaudeStreamRequest( |
|
|
body, |
|
|
accessToken, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
responseStream, |
|
|
requestOptions = {} |
|
|
) { |
|
|
return new Promise((resolve, reject) => { |
|
|
const url = new URL(this.claudeApiUrl) |
|
|
|
|
|
|
|
|
const filteredHeaders = this._filterClientHeaders(clientHeaders) |
|
|
|
|
|
const options = { |
|
|
hostname: url.hostname, |
|
|
port: url.port || 443, |
|
|
path: url.pathname, |
|
|
method: 'POST', |
|
|
headers: { |
|
|
'Content-Type': 'application/json', |
|
|
Authorization: `Bearer ${accessToken}`, |
|
|
'anthropic-version': this.apiVersion, |
|
|
...filteredHeaders |
|
|
}, |
|
|
agent: proxyAgent, |
|
|
timeout: config.requestTimeout || 600000 |
|
|
} |
|
|
|
|
|
|
|
|
if (!filteredHeaders['User-Agent'] && !filteredHeaders['user-agent']) { |
|
|
|
|
|
const userAgent = |
|
|
clientHeaders?.['user-agent'] || |
|
|
clientHeaders?.['User-Agent'] || |
|
|
'claude-cli/1.0.102 (external, cli)' |
|
|
options.headers['User-Agent'] = userAgent |
|
|
} |
|
|
|
|
|
|
|
|
const betaHeader = |
|
|
requestOptions?.betaHeader !== undefined ? requestOptions.betaHeader : this.betaHeader |
|
|
if (betaHeader) { |
|
|
options.headers['anthropic-beta'] = betaHeader |
|
|
} |
|
|
|
|
|
const req = https.request(options, (res) => { |
|
|
|
|
|
responseStream.statusCode = res.statusCode |
|
|
Object.keys(res.headers).forEach((key) => { |
|
|
responseStream.setHeader(key, res.headers[key]) |
|
|
}) |
|
|
|
|
|
|
|
|
res.pipe(responseStream) |
|
|
|
|
|
res.on('end', () => { |
|
|
logger.debug('🌊 Claude stream response completed') |
|
|
resolve() |
|
|
}) |
|
|
}) |
|
|
|
|
|
req.on('error', async (error) => { |
|
|
logger.error(`❌ Claude stream request error:`, error.message, { |
|
|
code: error.code, |
|
|
errno: error.errno, |
|
|
syscall: error.syscall |
|
|
}) |
|
|
|
|
|
|
|
|
let errorMessage = 'Upstream request failed' |
|
|
let statusCode = 500 |
|
|
if (error.code === 'ECONNRESET') { |
|
|
errorMessage = 'Connection reset by Claude API server' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ENOTFOUND') { |
|
|
errorMessage = 'Unable to resolve Claude API hostname' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ECONNREFUSED') { |
|
|
errorMessage = 'Connection refused by Claude API server' |
|
|
statusCode = 502 |
|
|
} else if (error.code === 'ETIMEDOUT') { |
|
|
errorMessage = 'Connection timed out to Claude API server' |
|
|
statusCode = 504 |
|
|
} |
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(statusCode, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive' |
|
|
}) |
|
|
} |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
|
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: errorMessage, |
|
|
code: error.code, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(error) |
|
|
}) |
|
|
|
|
|
req.on('timeout', async () => { |
|
|
req.destroy() |
|
|
logger.error(`❌ Claude stream request timeout`) |
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(504, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive' |
|
|
}) |
|
|
} |
|
|
if (!responseStream.destroyed) { |
|
|
|
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Request timeout', |
|
|
code: 'TIMEOUT', |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(new Error('Request timeout')) |
|
|
}) |
|
|
|
|
|
|
|
|
responseStream.on('close', () => { |
|
|
logger.debug('🔌 Client disconnected, cleaning up stream') |
|
|
if (!req.destroyed) { |
|
|
req.destroy() |
|
|
} |
|
|
}) |
|
|
|
|
|
|
|
|
req.write(JSON.stringify(body)) |
|
|
req.end() |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
async _handleServerError(accountId, statusCode, _sessionHash = null, context = '') { |
|
|
try { |
|
|
await claudeAccountService.recordServerError(accountId, statusCode) |
|
|
const errorCount = await claudeAccountService.getServerErrorCount(accountId) |
|
|
|
|
|
|
|
|
const isTimeout = statusCode === 504 |
|
|
const threshold = 3 |
|
|
const prefix = context ? `${context} ` : '' |
|
|
|
|
|
logger.warn( |
|
|
`⏱️ ${prefix}${isTimeout ? 'Timeout' : 'Server'} error for account ${accountId}, error count: ${errorCount}/${threshold}` |
|
|
) |
|
|
|
|
|
if (errorCount > threshold) { |
|
|
const errorTypeLabel = isTimeout ? 'timeout' : '5xx' |
|
|
|
|
|
logger.error( |
|
|
`❌ ${prefix}Account ${accountId} exceeded ${errorTypeLabel} error threshold (${errorCount} errors), please investigate upstream stability` |
|
|
) |
|
|
} |
|
|
} catch (handlingError) { |
|
|
logger.error(`❌ Failed to handle ${context} server error:`, handlingError) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async _retryRequest(requestFunc, maxRetries = 3) { |
|
|
let lastError |
|
|
|
|
|
for (let i = 0; i < maxRetries; i++) { |
|
|
try { |
|
|
return await requestFunc() |
|
|
} catch (error) { |
|
|
lastError = error |
|
|
|
|
|
if (i < maxRetries - 1) { |
|
|
const delay = Math.pow(2, i) * 1000 |
|
|
logger.warn(`⏳ Retry ${i + 1}/${maxRetries} in ${delay}ms: ${error.message}`) |
|
|
await new Promise((resolve) => setTimeout(resolve, delay)) |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
throw lastError |
|
|
} |
|
|
|
|
|
|
|
|
async recordUnauthorizedError(accountId) { |
|
|
try { |
|
|
const key = `claude_account:${accountId}:401_errors` |
|
|
|
|
|
|
|
|
await redis.client.incr(key) |
|
|
await redis.client.expire(key, 300) |
|
|
|
|
|
logger.info(`📝 Recorded 401 error for account ${accountId}`) |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to record 401 error for account ${accountId}:`, error) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getUnauthorizedErrorCount(accountId) { |
|
|
try { |
|
|
const key = `claude_account:${accountId}:401_errors` |
|
|
|
|
|
const count = await redis.client.get(key) |
|
|
return parseInt(count) || 0 |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to get 401 error count for account ${accountId}:`, error) |
|
|
return 0 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async clearUnauthorizedErrors(accountId) { |
|
|
try { |
|
|
const key = `claude_account:${accountId}:401_errors` |
|
|
|
|
|
await redis.client.del(key) |
|
|
logger.info(`✅ Cleared 401 error count for account ${accountId}`) |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to clear 401 errors for account ${accountId}:`, error) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async captureAndGetUnifiedUserAgent(clientHeaders, account) { |
|
|
if (account.useUnifiedUserAgent !== 'true') { |
|
|
return null |
|
|
} |
|
|
|
|
|
const CACHE_KEY = 'claude_code_user_agent:daily' |
|
|
const TTL = 90000 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const CLAUDE_CODE_UA_PATTERN = /^claude-cli\/[\d.]+\s+\(/i |
|
|
|
|
|
const clientUA = clientHeaders?.['user-agent'] || clientHeaders?.['User-Agent'] |
|
|
let cachedUA = await redis.client.get(CACHE_KEY) |
|
|
|
|
|
if (clientUA && CLAUDE_CODE_UA_PATTERN.test(clientUA)) { |
|
|
if (!cachedUA) { |
|
|
|
|
|
await redis.client.setex(CACHE_KEY, TTL, clientUA) |
|
|
logger.info(`📱 Captured unified Claude Code User-Agent: ${clientUA}`) |
|
|
cachedUA = clientUA |
|
|
} else { |
|
|
|
|
|
const shouldUpdate = this.compareClaudeCodeVersions(clientUA, cachedUA) |
|
|
if (shouldUpdate) { |
|
|
await redis.client.setex(CACHE_KEY, TTL, clientUA) |
|
|
logger.info(`🔄 Updated to newer Claude Code User-Agent: ${clientUA} (was: ${cachedUA})`) |
|
|
cachedUA = clientUA |
|
|
} else { |
|
|
|
|
|
await redis.client.expire(CACHE_KEY, TTL) |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
return cachedUA |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
compareClaudeCodeVersions(newUA, cachedUA) { |
|
|
try { |
|
|
|
|
|
|
|
|
const newVersionMatch = newUA.match(/claude-cli\/([\d.]+(?:[a-zA-Z0-9-]*)?)/i) |
|
|
const cachedVersionMatch = cachedUA.match(/claude-cli\/([\d.]+(?:[a-zA-Z0-9-]*)?)/i) |
|
|
|
|
|
if (!newVersionMatch || !cachedVersionMatch) { |
|
|
|
|
|
logger.warn(`⚠️ Unable to parse Claude Code versions: new=${newUA}, cached=${cachedUA}`) |
|
|
return true |
|
|
} |
|
|
|
|
|
const newVersion = newVersionMatch[1] |
|
|
const cachedVersion = cachedVersionMatch[1] |
|
|
|
|
|
|
|
|
const compareResult = this.compareSemanticVersions(newVersion, cachedVersion) |
|
|
|
|
|
logger.debug(`🔍 Version comparison: ${newVersion} vs ${cachedVersion} = ${compareResult}`) |
|
|
|
|
|
return compareResult > 0 |
|
|
} catch (error) { |
|
|
logger.warn(`⚠️ Error comparing Claude Code versions, defaulting to update: ${error.message}`) |
|
|
return true |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
compareSemanticVersions(version1, version2) { |
|
|
|
|
|
const arr1 = version1.split('.') |
|
|
const arr2 = version2.split('.') |
|
|
|
|
|
|
|
|
const maxLength = Math.max(arr1.length, arr2.length) |
|
|
|
|
|
|
|
|
for (let i = 0; i < maxLength; i++) { |
|
|
|
|
|
const num1 = parseInt(arr1[i] || 0, 10) |
|
|
const num2 = parseInt(arr2[i] || 0, 10) |
|
|
|
|
|
if (num1 > num2) { |
|
|
return 1 |
|
|
} |
|
|
if (num1 < num2) { |
|
|
return -1 |
|
|
} |
|
|
} |
|
|
|
|
|
return 0 |
|
|
} |
|
|
|
|
|
|
|
|
async healthCheck() { |
|
|
try { |
|
|
const accounts = await claudeAccountService.getAllAccounts() |
|
|
const activeAccounts = accounts.filter((acc) => acc.isActive && acc.status === 'active') |
|
|
|
|
|
return { |
|
|
healthy: activeAccounts.length > 0, |
|
|
activeAccounts: activeAccounts.length, |
|
|
totalAccounts: accounts.length, |
|
|
timestamp: new Date().toISOString() |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('❌ Health check failed:', error) |
|
|
return { |
|
|
healthy: false, |
|
|
error: error.message, |
|
|
timestamp: new Date().toISOString() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
// Export one ClaudeRelayService instance, created when this module is loaded.
module.exports = new ClaudeRelayService() |
|
|
|