|
|
const axios = require('axios') |
|
|
const claudeConsoleAccountService = require('./claudeConsoleAccountService') |
|
|
const logger = require('../utils/logger') |
|
|
const config = require('../../config/config') |
|
|
const { |
|
|
sanitizeUpstreamError, |
|
|
sanitizeErrorMessage, |
|
|
isAccountDisabledError |
|
|
} = require('../utils/errorSanitizer') |
|
|
|
|
|
class ClaudeConsoleRelayService { |
|
|
constructor() { |
|
|
this.defaultUserAgent = 'claude-cli/1.0.69 (external, cli)' |
|
|
} |
|
|
|
|
|
|
|
|
async relayRequest( |
|
|
requestBody, |
|
|
apiKeyData, |
|
|
clientRequest, |
|
|
clientResponse, |
|
|
clientHeaders, |
|
|
accountId, |
|
|
options = {} |
|
|
) { |
|
|
let abortController = null |
|
|
let account = null |
|
|
|
|
|
try { |
|
|
|
|
|
account = await claudeConsoleAccountService.getAccount(accountId) |
|
|
if (!account) { |
|
|
throw new Error('Claude Console Claude account not found') |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
`📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` |
|
|
) |
|
|
logger.debug(`🌐 Account API URL: ${account.apiUrl}`) |
|
|
logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`) |
|
|
logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`) |
|
|
logger.debug(`📝 Request model: ${requestBody.model}`) |
|
|
|
|
|
|
|
|
let mappedModel = requestBody.model |
|
|
if ( |
|
|
account.supportedModels && |
|
|
typeof account.supportedModels === 'object' && |
|
|
!Array.isArray(account.supportedModels) |
|
|
) { |
|
|
const newModel = claudeConsoleAccountService.getMappedModel( |
|
|
account.supportedModels, |
|
|
requestBody.model |
|
|
) |
|
|
if (newModel !== requestBody.model) { |
|
|
logger.info(`🔄 Mapping model from ${requestBody.model} to ${newModel}`) |
|
|
mappedModel = newModel |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const modifiedRequestBody = { |
|
|
...requestBody, |
|
|
model: mappedModel |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy) |
|
|
|
|
|
|
|
|
abortController = new AbortController() |
|
|
|
|
|
|
|
|
const handleClientDisconnect = () => { |
|
|
logger.info('🔌 Client disconnected, aborting Claude Console Claude request') |
|
|
if (abortController && !abortController.signal.aborted) { |
|
|
abortController.abort() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (clientRequest) { |
|
|
clientRequest.once('close', handleClientDisconnect) |
|
|
} |
|
|
if (clientResponse) { |
|
|
clientResponse.once('close', handleClientDisconnect) |
|
|
} |
|
|
|
|
|
|
|
|
const cleanUrl = account.apiUrl.replace(/\/$/, '') |
|
|
let apiEndpoint |
|
|
|
|
|
if (options.customPath) { |
|
|
|
|
|
const baseUrl = cleanUrl.replace(/\/v1\/messages$/, '') |
|
|
apiEndpoint = `${baseUrl}${options.customPath}` |
|
|
} else { |
|
|
|
|
|
apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` |
|
|
} |
|
|
|
|
|
logger.debug(`🎯 Final API endpoint: ${apiEndpoint}`) |
|
|
logger.debug(`[DEBUG] Options passed to relayRequest: ${JSON.stringify(options)}`) |
|
|
logger.debug(`[DEBUG] Client headers received: ${JSON.stringify(clientHeaders)}`) |
|
|
|
|
|
|
|
|
const filteredHeaders = this._filterClientHeaders(clientHeaders) |
|
|
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) |
|
|
|
|
|
|
|
|
const userAgent = |
|
|
account.userAgent || |
|
|
clientHeaders?.['user-agent'] || |
|
|
clientHeaders?.['User-Agent'] || |
|
|
this.defaultUserAgent |
|
|
|
|
|
|
|
|
const requestConfig = { |
|
|
method: 'POST', |
|
|
url: apiEndpoint, |
|
|
data: modifiedRequestBody, |
|
|
headers: { |
|
|
'Content-Type': 'application/json', |
|
|
'anthropic-version': '2023-06-01', |
|
|
'User-Agent': userAgent, |
|
|
...filteredHeaders |
|
|
}, |
|
|
timeout: config.requestTimeout || 600000, |
|
|
signal: abortController.signal, |
|
|
validateStatus: () => true |
|
|
} |
|
|
|
|
|
if (proxyAgent) { |
|
|
requestConfig.httpAgent = proxyAgent |
|
|
requestConfig.httpsAgent = proxyAgent |
|
|
requestConfig.proxy = false |
|
|
} |
|
|
|
|
|
|
|
|
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { |
|
|
|
|
|
requestConfig.headers['x-api-key'] = account.apiKey |
|
|
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') |
|
|
} else { |
|
|
|
|
|
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` |
|
|
logger.debug('[DEBUG] Using Authorization Bearer authentication') |
|
|
} |
|
|
|
|
|
logger.debug( |
|
|
`[DEBUG] Initial headers before beta: ${JSON.stringify(requestConfig.headers, null, 2)}` |
|
|
) |
|
|
|
|
|
|
|
|
if (options.betaHeader) { |
|
|
logger.debug(`[DEBUG] Adding beta header: ${options.betaHeader}`) |
|
|
requestConfig.headers['anthropic-beta'] = options.betaHeader |
|
|
} else { |
|
|
logger.debug('[DEBUG] No beta header to add') |
|
|
} |
|
|
|
|
|
|
|
|
logger.debug( |
|
|
'📤 Sending request to Claude Console API with headers:', |
|
|
JSON.stringify(requestConfig.headers, null, 2) |
|
|
) |
|
|
const response = await axios(requestConfig) |
|
|
|
|
|
|
|
|
if (clientRequest) { |
|
|
clientRequest.removeListener('close', handleClientDisconnect) |
|
|
} |
|
|
if (clientResponse) { |
|
|
clientResponse.removeListener('close', handleClientDisconnect) |
|
|
} |
|
|
|
|
|
logger.debug(`🔗 Claude Console API response: ${response.status}`) |
|
|
logger.debug(`[DEBUG] Response headers: ${JSON.stringify(response.headers)}`) |
|
|
logger.debug(`[DEBUG] Response data type: ${typeof response.data}`) |
|
|
logger.debug( |
|
|
`[DEBUG] Response data length: ${response.data ? (typeof response.data === 'string' ? response.data.length : JSON.stringify(response.data).length) : 0}` |
|
|
) |
|
|
|
|
|
|
|
|
if (response.status < 200 || response.status >= 300) { |
|
|
|
|
|
const rawData = |
|
|
typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
|
|
logger.error( |
|
|
`📝 Upstream error response from ${account?.name || accountId}: ${rawData.substring(0, 500)}` |
|
|
) |
|
|
|
|
|
|
|
|
try { |
|
|
const responseData = |
|
|
typeof response.data === 'string' ? JSON.parse(response.data) : response.data |
|
|
const sanitizedData = sanitizeUpstreamError(responseData) |
|
|
logger.error(`🧹 [SANITIZED] Error response to client: ${JSON.stringify(sanitizedData)}`) |
|
|
} catch (e) { |
|
|
const rawText = |
|
|
typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
|
|
const sanitizedText = sanitizeErrorMessage(rawText) |
|
|
logger.error(`🧹 [SANITIZED] Error response to client: ${sanitizedText}`) |
|
|
} |
|
|
} else { |
|
|
logger.debug( |
|
|
`[DEBUG] Response data preview: ${typeof response.data === 'string' ? response.data.substring(0, 200) : JSON.stringify(response.data).substring(0, 200)}` |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
const accountDisabledError = isAccountDisabledError(response.status, response.data) |
|
|
|
|
|
|
|
|
if (response.status === 401) { |
|
|
logger.warn(`🚫 Unauthorized error detected for Claude Console account ${accountId}`) |
|
|
await claudeConsoleAccountService.markAccountUnauthorized(accountId) |
|
|
} else if (accountDisabledError) { |
|
|
logger.error( |
|
|
`🚫 Account disabled error (400) detected for Claude Console account ${accountId}, marking as blocked` |
|
|
) |
|
|
|
|
|
const errorDetails = |
|
|
typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
|
|
await claudeConsoleAccountService.markConsoleAccountBlocked(accountId, errorDetails) |
|
|
} else if (response.status === 429) { |
|
|
logger.warn(`🚫 Rate limit detected for Claude Console account ${accountId}`) |
|
|
|
|
|
await claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { |
|
|
logger.error('❌ Failed to check quota after 429 error:', err) |
|
|
}) |
|
|
|
|
|
await claudeConsoleAccountService.markAccountRateLimited(accountId) |
|
|
} else if (response.status === 529) { |
|
|
logger.warn(`🚫 Overload error detected for Claude Console account ${accountId}`) |
|
|
await claudeConsoleAccountService.markAccountOverloaded(accountId) |
|
|
} else if (response.status === 200 || response.status === 201) { |
|
|
|
|
|
const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited(accountId) |
|
|
if (isRateLimited) { |
|
|
await claudeConsoleAccountService.removeAccountRateLimit(accountId) |
|
|
} |
|
|
const isOverloaded = await claudeConsoleAccountService.isAccountOverloaded(accountId) |
|
|
if (isOverloaded) { |
|
|
await claudeConsoleAccountService.removeAccountOverload(accountId) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
await this._updateLastUsedTime(accountId) |
|
|
|
|
|
|
|
|
let responseBody |
|
|
if (response.status < 200 || response.status >= 300) { |
|
|
|
|
|
try { |
|
|
const responseData = |
|
|
typeof response.data === 'string' ? JSON.parse(response.data) : response.data |
|
|
const sanitizedData = sanitizeUpstreamError(responseData) |
|
|
responseBody = JSON.stringify(sanitizedData) |
|
|
logger.debug(`🧹 Sanitized error response`) |
|
|
} catch (parseError) { |
|
|
|
|
|
const rawText = |
|
|
typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
|
|
responseBody = sanitizeErrorMessage(rawText) |
|
|
logger.debug(`🧹 Sanitized error text`) |
|
|
} |
|
|
} else { |
|
|
|
|
|
responseBody = |
|
|
typeof response.data === 'string' ? response.data : JSON.stringify(response.data) |
|
|
} |
|
|
|
|
|
logger.debug(`[DEBUG] Final response body to return: ${responseBody.substring(0, 200)}...`) |
|
|
|
|
|
return { |
|
|
statusCode: response.status, |
|
|
headers: response.headers, |
|
|
body: responseBody, |
|
|
accountId |
|
|
} |
|
|
} catch (error) { |
|
|
|
|
|
if (error.name === 'AbortError' || error.code === 'ECONNABORTED') { |
|
|
logger.info('Request aborted due to client disconnect') |
|
|
throw new Error('Client disconnected') |
|
|
} |
|
|
|
|
|
logger.error( |
|
|
`❌ Claude Console relay request failed (Account: ${account?.name || accountId}):`, |
|
|
error.message |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async relayStreamRequestWithUsageCapture( |
|
|
requestBody, |
|
|
apiKeyData, |
|
|
responseStream, |
|
|
clientHeaders, |
|
|
usageCallback, |
|
|
accountId, |
|
|
streamTransformer = null, |
|
|
options = {} |
|
|
) { |
|
|
let account = null |
|
|
try { |
|
|
|
|
|
account = await claudeConsoleAccountService.getAccount(accountId) |
|
|
if (!account) { |
|
|
throw new Error('Claude Console Claude account not found') |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
`📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` |
|
|
) |
|
|
logger.debug(`🌐 Account API URL: ${account.apiUrl}`) |
|
|
|
|
|
|
|
|
let mappedModel = requestBody.model |
|
|
if ( |
|
|
account.supportedModels && |
|
|
typeof account.supportedModels === 'object' && |
|
|
!Array.isArray(account.supportedModels) |
|
|
) { |
|
|
const newModel = claudeConsoleAccountService.getMappedModel( |
|
|
account.supportedModels, |
|
|
requestBody.model |
|
|
) |
|
|
if (newModel !== requestBody.model) { |
|
|
logger.info(`🔄 [Stream] Mapping model from ${requestBody.model} to ${newModel}`) |
|
|
mappedModel = newModel |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const modifiedRequestBody = { |
|
|
...requestBody, |
|
|
model: mappedModel |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const proxyAgent = claudeConsoleAccountService._createProxyAgent(account.proxy) |
|
|
|
|
|
|
|
|
await this._makeClaudeConsoleStreamRequest( |
|
|
modifiedRequestBody, |
|
|
account, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
responseStream, |
|
|
accountId, |
|
|
usageCallback, |
|
|
streamTransformer, |
|
|
options |
|
|
) |
|
|
|
|
|
|
|
|
await this._updateLastUsedTime(accountId) |
|
|
} catch (error) { |
|
|
logger.error( |
|
|
`❌ Claude Console stream relay failed (Account: ${account?.name || accountId}):`, |
|
|
error |
|
|
) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async _makeClaudeConsoleStreamRequest( |
|
|
body, |
|
|
account, |
|
|
proxyAgent, |
|
|
clientHeaders, |
|
|
responseStream, |
|
|
accountId, |
|
|
usageCallback, |
|
|
streamTransformer = null, |
|
|
requestOptions = {} |
|
|
) { |
|
|
return new Promise((resolve, reject) => { |
|
|
let aborted = false |
|
|
|
|
|
|
|
|
const cleanUrl = account.apiUrl.replace(/\/$/, '') |
|
|
const apiEndpoint = cleanUrl.endsWith('/v1/messages') ? cleanUrl : `${cleanUrl}/v1/messages` |
|
|
|
|
|
logger.debug(`🎯 Final API endpoint for stream: ${apiEndpoint}`) |
|
|
|
|
|
|
|
|
const filteredHeaders = this._filterClientHeaders(clientHeaders) |
|
|
logger.debug(`[DEBUG] Filtered client headers: ${JSON.stringify(filteredHeaders)}`) |
|
|
|
|
|
|
|
|
const userAgent = |
|
|
account.userAgent || |
|
|
clientHeaders?.['user-agent'] || |
|
|
clientHeaders?.['User-Agent'] || |
|
|
this.defaultUserAgent |
|
|
|
|
|
|
|
|
const requestConfig = { |
|
|
method: 'POST', |
|
|
url: apiEndpoint, |
|
|
data: body, |
|
|
headers: { |
|
|
'Content-Type': 'application/json', |
|
|
'anthropic-version': '2023-06-01', |
|
|
'User-Agent': userAgent, |
|
|
...filteredHeaders |
|
|
}, |
|
|
timeout: config.requestTimeout || 600000, |
|
|
responseType: 'stream', |
|
|
validateStatus: () => true |
|
|
} |
|
|
|
|
|
if (proxyAgent) { |
|
|
requestConfig.httpAgent = proxyAgent |
|
|
requestConfig.httpsAgent = proxyAgent |
|
|
requestConfig.proxy = false |
|
|
} |
|
|
|
|
|
|
|
|
if (account.apiKey && account.apiKey.startsWith('sk-ant-')) { |
|
|
|
|
|
requestConfig.headers['x-api-key'] = account.apiKey |
|
|
logger.debug('[DEBUG] Using x-api-key authentication for sk-ant-* API key') |
|
|
} else { |
|
|
|
|
|
requestConfig.headers['Authorization'] = `Bearer ${account.apiKey}` |
|
|
logger.debug('[DEBUG] Using Authorization Bearer authentication') |
|
|
} |
|
|
|
|
|
|
|
|
if (requestOptions.betaHeader) { |
|
|
requestConfig.headers['anthropic-beta'] = requestOptions.betaHeader |
|
|
} |
|
|
|
|
|
|
|
|
const request = axios(requestConfig) |
|
|
|
|
|
request |
|
|
.then((response) => { |
|
|
logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`) |
|
|
|
|
|
|
|
|
if (response.status !== 200) { |
|
|
logger.error( |
|
|
`❌ Claude Console API returned error status: ${response.status} | Account: ${account?.name || accountId}` |
|
|
) |
|
|
|
|
|
|
|
|
let errorDataForCheck = '' |
|
|
const errorChunks = [] |
|
|
|
|
|
response.data.on('data', (chunk) => { |
|
|
errorChunks.push(chunk) |
|
|
errorDataForCheck += chunk.toString() |
|
|
}) |
|
|
|
|
|
response.data.on('end', async () => { |
|
|
|
|
|
logger.error( |
|
|
`📝 [Stream] Upstream error response from ${account?.name || accountId}: ${errorDataForCheck.substring(0, 500)}` |
|
|
) |
|
|
|
|
|
|
|
|
const accountDisabledError = isAccountDisabledError( |
|
|
response.status, |
|
|
errorDataForCheck |
|
|
) |
|
|
|
|
|
if (response.status === 401) { |
|
|
await claudeConsoleAccountService.markAccountUnauthorized(accountId) |
|
|
} else if (accountDisabledError) { |
|
|
logger.error( |
|
|
`🚫 [Stream] Account disabled error (400) detected for Claude Console account ${accountId}, marking as blocked` |
|
|
) |
|
|
|
|
|
await claudeConsoleAccountService.markConsoleAccountBlocked( |
|
|
accountId, |
|
|
errorDataForCheck |
|
|
) |
|
|
} else if (response.status === 429) { |
|
|
await claudeConsoleAccountService.markAccountRateLimited(accountId) |
|
|
|
|
|
claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { |
|
|
logger.error('❌ Failed to check quota after 429 error:', err) |
|
|
}) |
|
|
} else if (response.status === 529) { |
|
|
await claudeConsoleAccountService.markAccountOverloaded(accountId) |
|
|
} |
|
|
|
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(response.status, { |
|
|
'Content-Type': 'application/json', |
|
|
'Cache-Control': 'no-cache' |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
try { |
|
|
const fullErrorData = Buffer.concat(errorChunks).toString() |
|
|
const errorJson = JSON.parse(fullErrorData) |
|
|
const sanitizedError = sanitizeUpstreamError(errorJson) |
|
|
|
|
|
|
|
|
logger.error( |
|
|
`🧹 [Stream] [SANITIZED] Error response to client: ${JSON.stringify(sanitizedError)}` |
|
|
) |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write(JSON.stringify(sanitizedError)) |
|
|
responseStream.end() |
|
|
} |
|
|
} catch (parseError) { |
|
|
const sanitizedText = sanitizeErrorMessage(errorDataForCheck) |
|
|
logger.error(`🧹 [Stream] [SANITIZED] Error response to client: ${sanitizedText}`) |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write(sanitizedText) |
|
|
responseStream.end() |
|
|
} |
|
|
} |
|
|
resolve() |
|
|
}) |
|
|
|
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => { |
|
|
if (isRateLimited) { |
|
|
claudeConsoleAccountService.removeAccountRateLimit(accountId) |
|
|
} |
|
|
}) |
|
|
claudeConsoleAccountService.isAccountOverloaded(accountId).then((isOverloaded) => { |
|
|
if (isOverloaded) { |
|
|
claudeConsoleAccountService.removeAccountOverload(accountId) |
|
|
} |
|
|
}) |
|
|
|
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(200, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive', |
|
|
'X-Accel-Buffering': 'no' |
|
|
}) |
|
|
} |
|
|
|
|
|
let buffer = '' |
|
|
let finalUsageReported = false |
|
|
const collectedUsageData = { |
|
|
model: body.model || account?.defaultModel || null |
|
|
} |
|
|
|
|
|
|
|
|
response.data.on('data', (chunk) => { |
|
|
try { |
|
|
if (aborted) { |
|
|
return |
|
|
} |
|
|
|
|
|
const chunkStr = chunk.toString() |
|
|
buffer += chunkStr |
|
|
|
|
|
|
|
|
const lines = buffer.split('\n') |
|
|
buffer = lines.pop() || '' |
|
|
|
|
|
|
|
|
if (lines.length > 0 && !responseStream.destroyed) { |
|
|
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') |
|
|
|
|
|
|
|
|
if (streamTransformer) { |
|
|
const transformed = streamTransformer(linesToForward) |
|
|
if (transformed) { |
|
|
responseStream.write(transformed) |
|
|
} |
|
|
} else { |
|
|
responseStream.write(linesToForward) |
|
|
} |
|
|
|
|
|
|
|
|
for (const line of lines) { |
|
|
if (line.startsWith('data:')) { |
|
|
const jsonStr = line.slice(5).trimStart() |
|
|
if (!jsonStr || jsonStr === '[DONE]') { |
|
|
continue |
|
|
} |
|
|
try { |
|
|
const data = JSON.parse(jsonStr) |
|
|
|
|
|
|
|
|
if (data.type === 'message_start' && data.message && data.message.usage) { |
|
|
collectedUsageData.input_tokens = data.message.usage.input_tokens || 0 |
|
|
collectedUsageData.cache_creation_input_tokens = |
|
|
data.message.usage.cache_creation_input_tokens || 0 |
|
|
collectedUsageData.cache_read_input_tokens = |
|
|
data.message.usage.cache_read_input_tokens || 0 |
|
|
collectedUsageData.model = data.message.model |
|
|
|
|
|
|
|
|
if ( |
|
|
data.message.usage.cache_creation && |
|
|
typeof data.message.usage.cache_creation === 'object' |
|
|
) { |
|
|
collectedUsageData.cache_creation = { |
|
|
ephemeral_5m_input_tokens: |
|
|
data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0, |
|
|
ephemeral_1h_input_tokens: |
|
|
data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0 |
|
|
} |
|
|
logger.info( |
|
|
'📊 Collected detailed cache creation data:', |
|
|
JSON.stringify(collectedUsageData.cache_creation) |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
if (data.type === 'message_delta' && data.usage) { |
|
|
|
|
|
if (data.usage.output_tokens !== undefined) { |
|
|
collectedUsageData.output_tokens = data.usage.output_tokens || 0 |
|
|
} |
|
|
|
|
|
|
|
|
if (data.usage.input_tokens !== undefined) { |
|
|
collectedUsageData.input_tokens = data.usage.input_tokens || 0 |
|
|
} |
|
|
|
|
|
|
|
|
if (data.usage.cache_creation_input_tokens !== undefined) { |
|
|
collectedUsageData.cache_creation_input_tokens = |
|
|
data.usage.cache_creation_input_tokens || 0 |
|
|
} |
|
|
if (data.usage.cache_read_input_tokens !== undefined) { |
|
|
collectedUsageData.cache_read_input_tokens = |
|
|
data.usage.cache_read_input_tokens || 0 |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
data.usage.cache_creation && |
|
|
typeof data.usage.cache_creation === 'object' |
|
|
) { |
|
|
collectedUsageData.cache_creation = { |
|
|
ephemeral_5m_input_tokens: |
|
|
data.usage.cache_creation.ephemeral_5m_input_tokens || 0, |
|
|
ephemeral_1h_input_tokens: |
|
|
data.usage.cache_creation.ephemeral_1h_input_tokens || 0 |
|
|
} |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
'📊 [Console] Collected usage data from message_delta:', |
|
|
JSON.stringify(collectedUsageData) |
|
|
) |
|
|
|
|
|
|
|
|
if ( |
|
|
collectedUsageData.input_tokens !== undefined && |
|
|
collectedUsageData.output_tokens !== undefined && |
|
|
!finalUsageReported |
|
|
) { |
|
|
if (!collectedUsageData.model) { |
|
|
collectedUsageData.model = body.model || account?.defaultModel || null |
|
|
} |
|
|
logger.info( |
|
|
'🎯 [Console] Complete usage data collected:', |
|
|
JSON.stringify(collectedUsageData) |
|
|
) |
|
|
usageCallback({ ...collectedUsageData, accountId }) |
|
|
finalUsageReported = true |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} catch (e) { |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error( |
|
|
`❌ Error processing Claude Console stream data (Account: ${account?.name || accountId}):`, |
|
|
error |
|
|
) |
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Stream processing error', |
|
|
message: error.message, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
} |
|
|
} |
|
|
}) |
|
|
|
|
|
response.data.on('end', () => { |
|
|
try { |
|
|
|
|
|
if (buffer.trim() && !responseStream.destroyed) { |
|
|
if (streamTransformer) { |
|
|
const transformed = streamTransformer(buffer) |
|
|
if (transformed) { |
|
|
responseStream.write(transformed) |
|
|
} |
|
|
} else { |
|
|
responseStream.write(buffer) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!finalUsageReported) { |
|
|
if ( |
|
|
collectedUsageData.input_tokens !== undefined || |
|
|
collectedUsageData.output_tokens !== undefined |
|
|
) { |
|
|
|
|
|
if (collectedUsageData.input_tokens === undefined) { |
|
|
collectedUsageData.input_tokens = 0 |
|
|
logger.warn( |
|
|
'⚠️ [Console] message_delta missing input_tokens, setting to 0. This may indicate incomplete usage data.' |
|
|
) |
|
|
} |
|
|
if (collectedUsageData.output_tokens === undefined) { |
|
|
collectedUsageData.output_tokens = 0 |
|
|
logger.warn( |
|
|
'⚠️ [Console] message_delta missing output_tokens, setting to 0. This may indicate incomplete usage data.' |
|
|
) |
|
|
} |
|
|
|
|
|
if (!collectedUsageData.model) { |
|
|
collectedUsageData.model = body.model || account?.defaultModel || null |
|
|
} |
|
|
logger.info( |
|
|
`📊 [Console] Saving incomplete usage data via fallback: ${JSON.stringify(collectedUsageData)}` |
|
|
) |
|
|
usageCallback({ ...collectedUsageData, accountId }) |
|
|
finalUsageReported = true |
|
|
} else { |
|
|
logger.warn( |
|
|
'⚠️ [Console] Stream completed but no usage data was captured! This indicates a problem with SSE parsing or API response format.' |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.end() |
|
|
} |
|
|
|
|
|
logger.debug('🌊 Claude Console Claude stream response completed') |
|
|
resolve() |
|
|
} catch (error) { |
|
|
logger.error('❌ Error processing stream end:', error) |
|
|
reject(error) |
|
|
} |
|
|
}) |
|
|
|
|
|
response.data.on('error', (error) => { |
|
|
logger.error( |
|
|
`❌ Claude Console stream error (Account: ${account?.name || accountId}):`, |
|
|
error |
|
|
) |
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: 'Stream error', |
|
|
message: error.message, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
reject(error) |
|
|
}) |
|
|
}) |
|
|
.catch((error) => { |
|
|
if (aborted) { |
|
|
return |
|
|
} |
|
|
|
|
|
logger.error( |
|
|
`❌ Claude Console stream request error (Account: ${account?.name || accountId}):`, |
|
|
error.message |
|
|
) |
|
|
|
|
|
|
|
|
if (error.response) { |
|
|
if (error.response.status === 401) { |
|
|
claudeConsoleAccountService.markAccountUnauthorized(accountId) |
|
|
} else if (error.response.status === 429) { |
|
|
claudeConsoleAccountService.markAccountRateLimited(accountId) |
|
|
|
|
|
claudeConsoleAccountService.checkQuotaUsage(accountId).catch((err) => { |
|
|
logger.error('❌ Failed to check quota after 429 error:', err) |
|
|
}) |
|
|
} else if (error.response.status === 529) { |
|
|
claudeConsoleAccountService.markAccountOverloaded(accountId) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!responseStream.headersSent) { |
|
|
responseStream.writeHead(error.response?.status || 500, { |
|
|
'Content-Type': 'text/event-stream', |
|
|
'Cache-Control': 'no-cache', |
|
|
Connection: 'keep-alive' |
|
|
}) |
|
|
} |
|
|
|
|
|
if (!responseStream.destroyed) { |
|
|
responseStream.write('event: error\n') |
|
|
responseStream.write( |
|
|
`data: ${JSON.stringify({ |
|
|
error: error.message, |
|
|
code: error.code, |
|
|
timestamp: new Date().toISOString() |
|
|
})}\n\n` |
|
|
) |
|
|
responseStream.end() |
|
|
} |
|
|
|
|
|
reject(error) |
|
|
}) |
|
|
|
|
|
|
|
|
responseStream.on('close', () => { |
|
|
logger.debug('🔌 Client disconnected, cleaning up Claude Console stream') |
|
|
aborted = true |
|
|
}) |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
_filterClientHeaders(clientHeaders) { |
|
|
const sensitiveHeaders = [ |
|
|
'content-type', |
|
|
'user-agent', |
|
|
'authorization', |
|
|
'x-api-key', |
|
|
'host', |
|
|
'content-length', |
|
|
'connection', |
|
|
'proxy-authorization', |
|
|
'content-encoding', |
|
|
'transfer-encoding', |
|
|
'anthropic-version' |
|
|
] |
|
|
|
|
|
const filteredHeaders = {} |
|
|
|
|
|
Object.keys(clientHeaders || {}).forEach((key) => { |
|
|
const lowerKey = key.toLowerCase() |
|
|
if (!sensitiveHeaders.includes(lowerKey)) { |
|
|
filteredHeaders[key] = clientHeaders[key] |
|
|
} |
|
|
}) |
|
|
|
|
|
return filteredHeaders |
|
|
} |
|
|
|
|
|
|
|
|
async _updateLastUsedTime(accountId) { |
|
|
try { |
|
|
const client = require('../models/redis').getClientSafe() |
|
|
const accountKey = `claude_console_account:${accountId}` |
|
|
const exists = await client.exists(accountKey) |
|
|
|
|
|
if (!exists) { |
|
|
logger.debug(`🔎 跳过更新已删除的Claude Console账号最近使用时间: ${accountId}`) |
|
|
return |
|
|
} |
|
|
|
|
|
await client.hset(accountKey, 'lastUsedAt', new Date().toISOString()) |
|
|
} catch (error) { |
|
|
logger.warn( |
|
|
`⚠️ Failed to update last used time for Claude Console account ${accountId}:`, |
|
|
error.message |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async healthCheck() { |
|
|
try { |
|
|
const accounts = await claudeConsoleAccountService.getAllAccounts() |
|
|
const activeAccounts = accounts.filter((acc) => acc.isActive && acc.status === 'active') |
|
|
|
|
|
return { |
|
|
healthy: activeAccounts.length > 0, |
|
|
activeAccounts: activeAccounts.length, |
|
|
totalAccounts: accounts.length, |
|
|
timestamp: new Date().toISOString() |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('❌ Claude Console Claude health check failed:', error) |
|
|
return { |
|
|
healthy: false, |
|
|
error: error.message, |
|
|
timestamp: new Date().toISOString() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
module.exports = new ClaudeConsoleRelayService() |
|
|
|