cc / src /services /droidRelayService.js
hequ's picture
Upload 224 files
6c6056a verified
const https = require('https')
const axios = require('axios')
const ProxyHelper = require('../utils/proxyHelper')
const droidScheduler = require('./droidScheduler')
const droidAccountService = require('./droidAccountService')
const apiKeyService = require('./apiKeyService')
const redis = require('../models/redis')
const { updateRateLimitCounters } = require('../utils/rateLimitHelper')
const logger = require('../utils/logger')
const runtimeAddon = require('../utils/runtimeAddon')
const SYSTEM_PROMPT = 'You are Droid, an AI software engineering agent built by Factory.'
const RUNTIME_EVENT_FMT_PAYLOAD = 'fmtPayload'
/**
* Droid API 转发服务
*/
class DroidRelayService {
constructor() {
this.factoryApiBaseUrl = 'https://app.factory.ai/api/llm'
this.endpoints = {
anthropic: '/a/v1/messages',
openai: '/o/v1/responses'
}
this.userAgent = 'factory-cli/0.19.12'
this.systemPrompt = SYSTEM_PROMPT
this.API_KEY_STICKY_PREFIX = 'droid_api_key'
}
_normalizeEndpointType(endpointType) {
if (!endpointType) {
return 'anthropic'
}
const normalized = String(endpointType).toLowerCase()
if (normalized === 'openai' || normalized === 'common') {
return 'openai'
}
if (normalized === 'anthropic') {
return 'anthropic'
}
return 'anthropic'
}
_normalizeRequestBody(requestBody, endpointType) {
if (!requestBody || typeof requestBody !== 'object') {
return requestBody
}
const normalizedBody = { ...requestBody }
if (endpointType === 'anthropic' && typeof normalizedBody.model === 'string') {
const originalModel = normalizedBody.model
const trimmedModel = originalModel.trim()
const lowerModel = trimmedModel.toLowerCase()
if (lowerModel.includes('haiku')) {
const mappedModel = 'claude-sonnet-4-20250514'
if (originalModel !== mappedModel) {
logger.info(`🔄 将请求模型从 ${originalModel} 映射为 ${mappedModel}`)
}
normalizedBody.model = mappedModel
}
}
if (endpointType === 'openai' && typeof normalizedBody.model === 'string') {
const originalModel = normalizedBody.model
const trimmedModel = originalModel.trim()
const lowerModel = trimmedModel.toLowerCase()
if (lowerModel === 'gpt-5') {
const mappedModel = 'gpt-5-2025-08-07'
if (originalModel !== mappedModel) {
logger.info(`🔄 将请求模型从 ${originalModel} 映射为 ${mappedModel}`)
}
normalizedBody.model = mappedModel
}
}
return normalizedBody
}
async _applyRateLimitTracking(rateLimitInfo, usageSummary, model, context = '') {
if (!rateLimitInfo) {
return
}
try {
const { totalTokens, totalCost } = await updateRateLimitCounters(
rateLimitInfo,
usageSummary,
model
)
if (totalTokens > 0) {
logger.api(`📊 Updated rate limit token count${context}: +${totalTokens}`)
}
if (typeof totalCost === 'number' && totalCost > 0) {
logger.api(`💰 Updated rate limit cost count${context}: +$${totalCost.toFixed(6)}`)
}
} catch (error) {
logger.error(`❌ Failed to update rate limit counters${context}:`, error)
}
}
_composeApiKeyStickyKey(accountId, endpointType, sessionHash) {
if (!accountId || !sessionHash) {
return null
}
const normalizedEndpoint = this._normalizeEndpointType(endpointType)
return `${this.API_KEY_STICKY_PREFIX}:${accountId}:${normalizedEndpoint}:${sessionHash}`
}
async _selectApiKey(account, endpointType, sessionHash) {
const entries = await droidAccountService.getDecryptedApiKeyEntries(account.id)
if (!entries || entries.length === 0) {
throw new Error(`Droid account ${account.id} 未配置任何 API Key`)
}
// 过滤掉异常状态的API Key
const activeEntries = entries.filter((entry) => entry.status !== 'error')
if (!activeEntries || activeEntries.length === 0) {
throw new Error(`Droid account ${account.id} 没有可用的 API Key(所有API Key均已异常)`)
}
const stickyKey = this._composeApiKeyStickyKey(account.id, endpointType, sessionHash)
if (stickyKey) {
const mappedKeyId = await redis.getSessionAccountMapping(stickyKey)
if (mappedKeyId) {
const mappedEntry = activeEntries.find((entry) => entry.id === mappedKeyId)
if (mappedEntry) {
await redis.extendSessionAccountMappingTTL(stickyKey)
await droidAccountService.touchApiKeyUsage(account.id, mappedEntry.id)
logger.info(`🔐 使用已绑定的 Droid API Key ${mappedEntry.id}(Account: ${account.id})`)
return mappedEntry
}
await redis.deleteSessionAccountMapping(stickyKey)
}
}
const selectedEntry = activeEntries[Math.floor(Math.random() * activeEntries.length)]
if (!selectedEntry) {
throw new Error(`Droid account ${account.id} 没有可用的 API Key`)
}
if (stickyKey) {
await redis.setSessionAccountMapping(stickyKey, selectedEntry.id)
}
await droidAccountService.touchApiKeyUsage(account.id, selectedEntry.id)
logger.info(
`🔐 随机选取 Droid API Key ${selectedEntry.id}(Account: ${account.id}, Active Keys: ${activeEntries.length}/${entries.length})`
)
return selectedEntry
}
async relayRequest(
requestBody,
apiKeyData,
clientRequest,
clientResponse,
clientHeaders,
options = {}
) {
const {
endpointType = 'anthropic',
sessionHash = null,
customPath = null,
skipUsageRecord = false,
disableStreaming = false
} = options
const keyInfo = apiKeyData || {}
const clientApiKeyId = keyInfo.id || null
const normalizedEndpoint = this._normalizeEndpointType(endpointType)
const normalizedRequestBody = this._normalizeRequestBody(requestBody, normalizedEndpoint)
let account = null
let selectedApiKey = null
let accessToken = null
try {
logger.info(
`📤 Processing Droid API request for key: ${
keyInfo.name || keyInfo.id || 'unknown'
}, endpoint: ${normalizedEndpoint}${sessionHash ? `, session: ${sessionHash}` : ''}`
)
// 选择一个可用的 Droid 账户(支持粘性会话和分组调度)
account = await droidScheduler.selectAccount(keyInfo, normalizedEndpoint, sessionHash)
if (!account) {
throw new Error(`No available Droid account for endpoint type: ${normalizedEndpoint}`)
}
// 获取认证凭据:支持 Access Token 和 API Key 两种模式
if (
typeof account.authenticationMethod === 'string' &&
account.authenticationMethod.toLowerCase().trim() === 'api_key'
) {
selectedApiKey = await this._selectApiKey(account, normalizedEndpoint, sessionHash)
accessToken = selectedApiKey.key
} else {
accessToken = await droidAccountService.getValidAccessToken(account.id)
}
// 获取 Factory.ai API URL
let endpointPath = this.endpoints[normalizedEndpoint]
if (typeof customPath === 'string' && customPath.trim()) {
endpointPath = customPath.startsWith('/') ? customPath : `/${customPath}`
}
const apiUrl = `${this.factoryApiBaseUrl}${endpointPath}`
logger.info(`🌐 Forwarding to Factory.ai: ${apiUrl}`)
// 获取代理配置
const proxyConfig = account.proxy ? JSON.parse(account.proxy) : null
const proxyAgent = proxyConfig ? ProxyHelper.createProxyAgent(proxyConfig) : null
if (proxyAgent) {
logger.info(`🌐 Using proxy: ${ProxyHelper.getProxyDescription(proxyConfig)}`)
}
// 构建请求头
const headers = this._buildHeaders(
accessToken,
normalizedRequestBody,
normalizedEndpoint,
clientHeaders
)
if (selectedApiKey) {
logger.info(
`🔑 Forwarding request with Droid API Key ${selectedApiKey.id} (Account: ${account.id})`
)
}
// 处理请求体(注入 system prompt 等)
const streamRequested = !disableStreaming && this._isStreamRequested(normalizedRequestBody)
let processedBody = this._processRequestBody(normalizedRequestBody, normalizedEndpoint, {
disableStreaming,
streamRequested
})
const extensionPayload = {
body: processedBody,
endpoint: normalizedEndpoint,
rawRequest: normalizedRequestBody,
originalRequest: requestBody
}
const extensionResult = runtimeAddon.emitSync(RUNTIME_EVENT_FMT_PAYLOAD, extensionPayload)
const resolvedPayload =
extensionResult && typeof extensionResult === 'object' ? extensionResult : extensionPayload
if (resolvedPayload && typeof resolvedPayload === 'object') {
if (resolvedPayload.abortResponse && typeof resolvedPayload.abortResponse === 'object') {
return resolvedPayload.abortResponse
}
if (resolvedPayload.body && typeof resolvedPayload.body === 'object') {
processedBody = resolvedPayload.body
} else if (resolvedPayload !== extensionPayload) {
processedBody = resolvedPayload
}
}
// 发送请求
const isStreaming = streamRequested
// 根据是否流式选择不同的处理方式
if (isStreaming) {
// 流式响应:使用原生 https 模块以更好地控制流
return await this._handleStreamRequest(
apiUrl,
headers,
processedBody,
proxyAgent,
clientRequest,
clientResponse,
account,
keyInfo,
normalizedRequestBody,
normalizedEndpoint,
skipUsageRecord,
selectedApiKey,
sessionHash,
clientApiKeyId
)
} else {
// 非流式响应:使用 axios
const requestOptions = {
method: 'POST',
url: apiUrl,
headers,
data: processedBody,
timeout: 600 * 1000, // 10分钟超时
responseType: 'json',
...(proxyAgent && {
httpAgent: proxyAgent,
httpsAgent: proxyAgent,
proxy: false
})
}
const response = await axios(requestOptions)
logger.info(`✅ Factory.ai response status: ${response.status}`)
// 处理非流式响应
return this._handleNonStreamResponse(
response,
account,
keyInfo,
normalizedRequestBody,
clientRequest,
normalizedEndpoint,
skipUsageRecord
)
}
} catch (error) {
logger.error(`❌ Droid relay error: ${error.message}`, error)
const status = error?.response?.status
if (status >= 400 && status < 500) {
try {
await this._handleUpstreamClientError(status, {
account,
selectedAccountApiKey: selectedApiKey,
endpointType: normalizedEndpoint,
sessionHash,
clientApiKeyId
})
} catch (handlingError) {
logger.error('❌ 处理 Droid 4xx 异常失败:', handlingError)
}
}
if (error.response) {
// HTTP 错误响应
return {
statusCode: error.response.status,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(
error.response.data || {
error: 'upstream_error',
message: error.message
}
)
}
}
// 网络错误或其他错误(统一返回 4xx)
const mappedStatus = this._mapNetworkErrorStatus(error)
return {
statusCode: mappedStatus,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(this._buildNetworkErrorBody(error))
}
}
}
/**
* 处理流式请求
*/
async _handleStreamRequest(
apiUrl,
headers,
processedBody,
proxyAgent,
clientRequest,
clientResponse,
account,
apiKeyData,
requestBody,
endpointType,
skipUsageRecord = false,
selectedAccountApiKey = null,
sessionHash = null,
clientApiKeyId = null
) {
return new Promise((resolve, reject) => {
const url = new URL(apiUrl)
const bodyString = JSON.stringify(processedBody)
const contentLength = Buffer.byteLength(bodyString)
const requestHeaders = {
...headers,
'content-length': contentLength.toString()
}
let responseStarted = false
let responseCompleted = false
let settled = false
let upstreamResponse = null
let completionWindow = ''
let hasForwardedData = false
const resolveOnce = (value) => {
if (settled) {
return
}
settled = true
resolve(value)
}
const rejectOnce = (error) => {
if (settled) {
return
}
settled = true
reject(error)
}
const handleStreamError = (error) => {
if (responseStarted) {
const isConnectionReset =
error && (error.code === 'ECONNRESET' || error.message === 'aborted')
const upstreamComplete =
responseCompleted || upstreamResponse?.complete || clientResponse.writableEnded
if (isConnectionReset && (upstreamComplete || hasForwardedData)) {
logger.debug('🔁 Droid stream连接在响应阶段被重置,视为正常结束:', {
message: error?.message,
code: error?.code
})
if (!clientResponse.destroyed && !clientResponse.writableEnded) {
clientResponse.end()
}
resolveOnce({ statusCode: 200, streaming: true })
return
}
logger.error('❌ Droid stream error:', error)
const mappedStatus = this._mapNetworkErrorStatus(error)
const errorBody = this._buildNetworkErrorBody(error)
if (!clientResponse.destroyed) {
if (!clientResponse.writableEnded) {
const canUseJson =
!hasForwardedData &&
typeof clientResponse.status === 'function' &&
typeof clientResponse.json === 'function'
if (canUseJson) {
clientResponse.status(mappedStatus).json(errorBody)
} else {
const errorPayload = JSON.stringify(errorBody)
if (!hasForwardedData) {
if (typeof clientResponse.setHeader === 'function') {
clientResponse.setHeader('Content-Type', 'application/json')
}
clientResponse.write(errorPayload)
clientResponse.end()
} else {
clientResponse.write(`event: error\ndata: ${errorPayload}\n\n`)
clientResponse.end()
}
}
}
}
resolveOnce({ statusCode: mappedStatus, streaming: true, error })
} else {
rejectOnce(error)
}
}
const options = {
hostname: url.hostname,
port: url.port || 443,
path: url.pathname,
method: 'POST',
headers: requestHeaders,
agent: proxyAgent,
timeout: 600 * 1000
}
const req = https.request(options, (res) => {
upstreamResponse = res
logger.info(`✅ Factory.ai stream response status: ${res.statusCode}`)
// 错误响应
if (res.statusCode !== 200) {
const chunks = []
res.on('data', (chunk) => {
chunks.push(chunk)
logger.info(`📦 got ${chunk.length} bytes of data`)
})
res.on('end', () => {
logger.info('✅ res.end() reached')
const body = Buffer.concat(chunks).toString()
logger.error(`❌ Factory.ai error response body: ${body || '(empty)'}`)
if (res.statusCode >= 400 && res.statusCode < 500) {
this._handleUpstreamClientError(res.statusCode, {
account,
selectedAccountApiKey,
endpointType,
sessionHash,
clientApiKeyId
}).catch((handlingError) => {
logger.error('❌ 处理 Droid 流式4xx 异常失败:', handlingError)
})
}
if (!clientResponse.headersSent) {
clientResponse.status(res.statusCode).json({
error: 'upstream_error',
details: body
})
}
resolveOnce({ statusCode: res.statusCode, streaming: true })
})
res.on('close', () => {
logger.warn('⚠️ response closed before end event')
})
res.on('error', handleStreamError)
return
}
responseStarted = true
// 设置流式响应头
clientResponse.setHeader('Content-Type', 'text/event-stream')
clientResponse.setHeader('Cache-Control', 'no-cache')
clientResponse.setHeader('Connection', 'keep-alive')
// Usage 数据收集
let buffer = ''
const currentUsageData = {}
const model = requestBody.model || 'unknown'
// 处理 SSE 流
res.on('data', (chunk) => {
const chunkStr = chunk.toString()
completionWindow = (completionWindow + chunkStr).slice(-1024)
hasForwardedData = true
// 转发数据到客户端
clientResponse.write(chunk)
hasForwardedData = true
// 解析 usage 数据(根据端点类型)
if (endpointType === 'anthropic') {
// Anthropic Messages API 格式
this._parseAnthropicUsageFromSSE(chunkStr, buffer, currentUsageData)
} else if (endpointType === 'openai') {
// OpenAI Chat Completions 格式
this._parseOpenAIUsageFromSSE(chunkStr, buffer, currentUsageData)
}
if (!responseCompleted && this._detectStreamCompletion(completionWindow, endpointType)) {
responseCompleted = true
}
buffer += chunkStr
})
res.on('end', async () => {
responseCompleted = true
clientResponse.end()
// 记录 usage 数据
if (!skipUsageRecord) {
const normalizedUsage = await this._recordUsageFromStreamData(
currentUsageData,
apiKeyData,
account,
model
)
const usageSummary = {
inputTokens: normalizedUsage.input_tokens || 0,
outputTokens: normalizedUsage.output_tokens || 0,
cacheCreateTokens: normalizedUsage.cache_creation_input_tokens || 0,
cacheReadTokens: normalizedUsage.cache_read_input_tokens || 0
}
await this._applyRateLimitTracking(
clientRequest?.rateLimitInfo,
usageSummary,
model,
' [stream]'
)
logger.success(`✅ Droid stream completed - Account: ${account.name}`)
} else {
logger.success(
`✅ Droid stream completed - Account: ${account.name}, usage recording skipped`
)
}
resolveOnce({ statusCode: 200, streaming: true })
})
res.on('error', handleStreamError)
res.on('close', () => {
if (settled) {
return
}
if (responseCompleted) {
if (!clientResponse.destroyed && !clientResponse.writableEnded) {
clientResponse.end()
}
resolveOnce({ statusCode: 200, streaming: true })
} else {
handleStreamError(new Error('Upstream stream closed unexpectedly'))
}
})
})
// 客户端断开连接时清理
clientResponse.on('close', () => {
if (req && !req.destroyed) {
req.destroy()
}
})
req.on('error', handleStreamError)
req.on('timeout', () => {
req.destroy()
logger.error('❌ Droid request timeout')
handleStreamError(new Error('Request timeout'))
})
// 写入请求体
req.end(bodyString)
})
}
/**
* 从 SSE 流中解析 Anthropic usage 数据
*/
_parseAnthropicUsageFromSSE(chunkStr, buffer, currentUsageData) {
try {
// 分割成行
const lines = (buffer + chunkStr).split('\n')
for (const line of lines) {
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
const data = JSON.parse(jsonStr)
// message_start 包含 input tokens 和 cache tokens
if (data.type === 'message_start' && data.message && data.message.usage) {
currentUsageData.input_tokens = data.message.usage.input_tokens || 0
currentUsageData.cache_creation_input_tokens =
data.message.usage.cache_creation_input_tokens || 0
currentUsageData.cache_read_input_tokens =
data.message.usage.cache_read_input_tokens || 0
// 详细的缓存类型
if (data.message.usage.cache_creation) {
currentUsageData.cache_creation = {
ephemeral_5m_input_tokens:
data.message.usage.cache_creation.ephemeral_5m_input_tokens || 0,
ephemeral_1h_input_tokens:
data.message.usage.cache_creation.ephemeral_1h_input_tokens || 0
}
}
logger.debug('📊 Droid Anthropic input usage:', currentUsageData)
}
// message_delta 包含 output tokens
if (data.type === 'message_delta' && data.usage) {
currentUsageData.output_tokens = data.usage.output_tokens || 0
logger.debug('📊 Droid Anthropic output usage:', currentUsageData.output_tokens)
}
} catch (parseError) {
// 忽略解析错误
}
}
}
} catch (error) {
logger.debug('Error parsing Anthropic usage:', error)
}
}
/**
* 从 SSE 流中解析 OpenAI usage 数据
*/
_parseOpenAIUsageFromSSE(chunkStr, buffer, currentUsageData) {
try {
// OpenAI Chat Completions 流式格式
const lines = (buffer + chunkStr).split('\n')
for (const line of lines) {
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
if (jsonStr === '[DONE]') {
continue
}
const data = JSON.parse(jsonStr)
// 兼容传统 Chat Completions usage 字段
if (data.usage) {
currentUsageData.input_tokens = data.usage.prompt_tokens || 0
currentUsageData.output_tokens = data.usage.completion_tokens || 0
currentUsageData.total_tokens = data.usage.total_tokens || 0
logger.debug('📊 Droid OpenAI usage:', currentUsageData)
}
// 新 Response API 在 response.usage 中返回统计
if (data.response && data.response.usage) {
const { usage } = data.response
currentUsageData.input_tokens =
usage.input_tokens || usage.prompt_tokens || usage.total_tokens || 0
currentUsageData.output_tokens = usage.output_tokens || usage.completion_tokens || 0
currentUsageData.total_tokens = usage.total_tokens || 0
logger.debug('📊 Droid OpenAI response usage:', currentUsageData)
}
} catch (parseError) {
// 忽略解析错误
}
}
}
} catch (error) {
logger.debug('Error parsing OpenAI usage:', error)
}
}
/**
* 检测流式响应是否已经包含终止标记
*/
_detectStreamCompletion(windowStr, endpointType) {
if (!windowStr) {
return false
}
const lower = windowStr.toLowerCase()
const compact = lower.replace(/\s+/g, '')
if (endpointType === 'anthropic') {
if (lower.includes('event: message_stop')) {
return true
}
if (compact.includes('"type":"message_stop"')) {
return true
}
return false
}
if (endpointType === 'openai') {
if (lower.includes('data: [done]')) {
return true
}
if (compact.includes('"finish_reason"')) {
return true
}
if (lower.includes('event: response.done') || lower.includes('event: response.completed')) {
return true
}
if (
compact.includes('"type":"response.done"') ||
compact.includes('"type":"response.completed"')
) {
return true
}
}
return false
}
/**
* 记录从流中解析的 usage 数据
*/
async _recordUsageFromStreamData(usageData, apiKeyData, account, model) {
const normalizedUsage = this._normalizeUsageSnapshot(usageData)
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
return normalizedUsage
}
/**
* 标准化 usage 数据,确保字段完整且为数字
*/
_normalizeUsageSnapshot(usageData = {}) {
const toNumber = (value) => {
if (value === undefined || value === null || value === '') {
return 0
}
const num = Number(value)
if (!Number.isFinite(num)) {
return 0
}
return Math.max(0, num)
}
const inputTokens = toNumber(
usageData.input_tokens ??
usageData.prompt_tokens ??
usageData.inputTokens ??
usageData.total_input_tokens
)
const outputTokens = toNumber(
usageData.output_tokens ?? usageData.completion_tokens ?? usageData.outputTokens
)
const cacheReadTokens = toNumber(
usageData.cache_read_input_tokens ??
usageData.cacheReadTokens ??
usageData.input_tokens_details?.cached_tokens
)
const rawCacheCreateTokens =
usageData.cache_creation_input_tokens ??
usageData.cacheCreateTokens ??
usageData.cache_tokens ??
0
let cacheCreateTokens = toNumber(rawCacheCreateTokens)
const ephemeral5m = toNumber(
usageData.cache_creation?.ephemeral_5m_input_tokens ?? usageData.ephemeral_5m_input_tokens
)
const ephemeral1h = toNumber(
usageData.cache_creation?.ephemeral_1h_input_tokens ?? usageData.ephemeral_1h_input_tokens
)
if (cacheCreateTokens === 0 && (ephemeral5m > 0 || ephemeral1h > 0)) {
cacheCreateTokens = ephemeral5m + ephemeral1h
}
const normalized = {
input_tokens: inputTokens,
output_tokens: outputTokens,
cache_creation_input_tokens: cacheCreateTokens,
cache_read_input_tokens: cacheReadTokens
}
if (ephemeral5m > 0 || ephemeral1h > 0) {
normalized.cache_creation = {
ephemeral_5m_input_tokens: ephemeral5m,
ephemeral_1h_input_tokens: ephemeral1h
}
}
return normalized
}
/**
* 计算 usage 对象的总 token 数
*/
_getTotalTokens(usageObject = {}) {
const toNumber = (value) => {
if (value === undefined || value === null || value === '') {
return 0
}
const num = Number(value)
if (!Number.isFinite(num)) {
return 0
}
return Math.max(0, num)
}
return (
toNumber(usageObject.input_tokens) +
toNumber(usageObject.output_tokens) +
toNumber(usageObject.cache_creation_input_tokens) +
toNumber(usageObject.cache_read_input_tokens)
)
}
/**
* 提取账户 ID
*/
_extractAccountId(account) {
if (!account || typeof account !== 'object') {
return null
}
return account.id || account.accountId || account.account_id || null
}
/**
* 构建请求头
*/
_buildHeaders(accessToken, requestBody, endpointType, clientHeaders = {}) {
const headers = {
'content-type': 'application/json',
authorization: `Bearer ${accessToken}`,
'user-agent': this.userAgent,
'x-factory-client': 'cli',
connection: 'keep-alive'
}
// Anthropic 特定头
if (endpointType === 'anthropic') {
headers['accept'] = 'application/json'
headers['anthropic-version'] = '2023-06-01'
headers['x-api-key'] = 'placeholder'
headers['x-api-provider'] = 'anthropic'
if (this._isThinkingRequested(requestBody)) {
headers['anthropic-beta'] = 'interleaved-thinking-2025-05-14'
}
}
// OpenAI 特定头
if (endpointType === 'openai') {
headers['x-api-provider'] = 'azure_openai'
}
// 生成会话 ID(如果客户端没有提供)
headers['x-session-id'] = clientHeaders['x-session-id'] || this._generateUUID()
return headers
}
/**
* 判断请求是否要求流式响应
*/
_isStreamRequested(requestBody) {
if (!requestBody || typeof requestBody !== 'object') {
return false
}
const value = requestBody.stream
if (value === true) {
return true
}
if (typeof value === 'string') {
return value.toLowerCase() === 'true'
}
return false
}
/**
* 判断请求是否启用 Anthropic 推理模式
*/
_isThinkingRequested(requestBody) {
const thinking = requestBody && typeof requestBody === 'object' ? requestBody.thinking : null
if (!thinking) {
return false
}
if (thinking === true) {
return true
}
if (typeof thinking === 'string') {
return thinking.trim().toLowerCase() === 'enabled'
}
if (typeof thinking === 'object') {
if (thinking.enabled === true) {
return true
}
if (typeof thinking.type === 'string') {
return thinking.type.trim().toLowerCase() === 'enabled'
}
}
return false
}
/**
* 处理请求体(注入 system prompt 等)
*/
_processRequestBody(requestBody, endpointType, options = {}) {
const { disableStreaming = false, streamRequested = false } = options
const processedBody = { ...requestBody }
const hasStreamField =
requestBody && Object.prototype.hasOwnProperty.call(requestBody, 'stream')
if (processedBody && Object.prototype.hasOwnProperty.call(processedBody, 'metadata')) {
delete processedBody.metadata
}
if (disableStreaming || !streamRequested) {
if (hasStreamField) {
processedBody.stream = false
} else if ('stream' in processedBody) {
delete processedBody.stream
}
} else {
processedBody.stream = true
}
// Anthropic 端点:仅注入系统提示
if (endpointType === 'anthropic') {
if (this.systemPrompt) {
const promptBlock = { type: 'text', text: this.systemPrompt }
if (Array.isArray(processedBody.system)) {
const hasPrompt = processedBody.system.some(
(item) => item && item.type === 'text' && item.text === this.systemPrompt
)
if (!hasPrompt) {
processedBody.system = [promptBlock, ...processedBody.system]
}
} else {
processedBody.system = [promptBlock]
}
}
}
// OpenAI 端点:仅前置系统提示
if (endpointType === 'openai') {
if (this.systemPrompt) {
if (processedBody.instructions) {
if (!processedBody.instructions.startsWith(this.systemPrompt)) {
processedBody.instructions = `${this.systemPrompt}${processedBody.instructions}`
}
} else {
processedBody.instructions = this.systemPrompt
}
}
}
// 处理 temperature 和 top_p 参数
const hasValidTemperature =
processedBody.temperature !== undefined && processedBody.temperature !== null
const hasValidTopP = processedBody.top_p !== undefined && processedBody.top_p !== null
if (hasValidTemperature && hasValidTopP) {
// 仅允许 temperature 或 top_p 其一,同时优先保留 temperature
delete processedBody.top_p
}
return processedBody
}
/**
* 处理非流式响应
*/
async _handleNonStreamResponse(
response,
account,
apiKeyData,
requestBody,
clientRequest,
endpointType,
skipUsageRecord = false
) {
const { data } = response
// 从响应中提取 usage 数据
const usage = data.usage || {}
const model = requestBody.model || 'unknown'
const normalizedUsage = this._normalizeUsageSnapshot(usage)
if (!skipUsageRecord) {
await this._recordUsage(apiKeyData, account, model, normalizedUsage)
const totalTokens = this._getTotalTokens(normalizedUsage)
const usageSummary = {
inputTokens: normalizedUsage.input_tokens || 0,
outputTokens: normalizedUsage.output_tokens || 0,
cacheCreateTokens: normalizedUsage.cache_creation_input_tokens || 0,
cacheReadTokens: normalizedUsage.cache_read_input_tokens || 0
}
await this._applyRateLimitTracking(
clientRequest?.rateLimitInfo,
usageSummary,
model,
endpointType === 'anthropic' ? ' [anthropic]' : ' [openai]'
)
logger.success(
`✅ Droid request completed - Account: ${account.name}, Tokens: ${totalTokens}`
)
} else {
logger.success(
`✅ Droid request completed - Account: ${account.name}, usage recording skipped`
)
}
return {
statusCode: 200,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
}
}
/**
* 记录使用统计
*/
async _recordUsage(apiKeyData, account, model, usageObject = {}) {
const totalTokens = this._getTotalTokens(usageObject)
if (totalTokens <= 0) {
logger.debug('🪙 Droid usage 数据为空,跳过记录')
return
}
try {
const keyId = apiKeyData?.id
const accountId = this._extractAccountId(account)
if (keyId) {
await apiKeyService.recordUsageWithDetails(keyId, usageObject, model, accountId, 'droid')
} else if (accountId) {
await redis.incrementAccountUsage(
accountId,
totalTokens,
usageObject.input_tokens || 0,
usageObject.output_tokens || 0,
usageObject.cache_creation_input_tokens || 0,
usageObject.cache_read_input_tokens || 0,
model,
false
)
} else {
logger.warn('⚠️ 无法记录 Droid usage:缺少 API Key 和账户标识')
return
}
logger.debug(
`📊 Droid usage recorded - Key: ${keyId || 'unknown'}, Account: ${accountId || 'unknown'}, Model: ${model}, Input: ${usageObject.input_tokens || 0}, Output: ${usageObject.output_tokens || 0}, Cache Create: ${usageObject.cache_creation_input_tokens || 0}, Cache Read: ${usageObject.cache_read_input_tokens || 0}, Total: ${totalTokens}`
)
} catch (error) {
logger.error('❌ Failed to record Droid usage:', error)
}
}
/**
* 处理上游 4xx 响应,移除问题 API Key 或停止账号调度
*/
async _handleUpstreamClientError(statusCode, context = {}) {
if (!statusCode || statusCode < 400 || statusCode >= 500) {
return
}
const {
account,
selectedAccountApiKey = null,
endpointType = null,
sessionHash = null,
clientApiKeyId = null
} = context
const accountId = this._extractAccountId(account)
if (!accountId) {
logger.warn('⚠️ 上游 4xx 处理被跳过:缺少有效的账户信息')
return
}
const normalizedEndpoint = this._normalizeEndpointType(
endpointType || account?.endpointType || 'anthropic'
)
const authMethod =
typeof account?.authenticationMethod === 'string'
? account.authenticationMethod.toLowerCase().trim()
: ''
if (authMethod === 'api_key') {
if (selectedAccountApiKey?.id) {
let markResult = null
const errorMessage = `${statusCode}`
try {
// 标记API Key为异常状态而不是删除
markResult = await droidAccountService.markApiKeyAsError(
accountId,
selectedAccountApiKey.id,
errorMessage
)
} catch (error) {
logger.error(
`❌ 标记 Droid API Key ${selectedAccountApiKey.id} 异常状态(Account: ${accountId})失败:`,
error
)
}
await this._clearApiKeyStickyMapping(accountId, normalizedEndpoint, sessionHash)
if (markResult?.marked) {
logger.warn(
`⚠️ 上游返回 ${statusCode},已标记 Droid API Key ${selectedAccountApiKey.id} 为异常状态(Account: ${accountId})`
)
} else {
logger.warn(
`⚠️ 上游返回 ${statusCode},但未能标记 Droid API Key ${selectedAccountApiKey.id} 异常状态(Account: ${accountId}):${markResult?.error || '未知错误'}`
)
}
// 检查是否还有可用的API Key
try {
const availableEntries = await droidAccountService.getDecryptedApiKeyEntries(accountId)
const activeEntries = availableEntries.filter((entry) => entry.status !== 'error')
if (activeEntries.length === 0) {
await this._stopDroidAccountScheduling(accountId, statusCode, '所有API Key均已异常')
await this._clearAccountStickyMapping(normalizedEndpoint, sessionHash, clientApiKeyId)
} else {
logger.info(`ℹ️ Droid 账号 ${accountId} 仍有 ${activeEntries.length} 个可用 API Key`)
}
} catch (error) {
logger.error(`❌ 检查可用API Key失败(Account: ${accountId}):`, error)
await this._stopDroidAccountScheduling(accountId, statusCode, 'API Key检查失败')
await this._clearAccountStickyMapping(normalizedEndpoint, sessionHash, clientApiKeyId)
}
return
}
logger.warn(
`⚠️ 上游返回 ${statusCode},但未获取到对应的 Droid API Key(Account: ${accountId})`
)
await this._stopDroidAccountScheduling(accountId, statusCode, '缺少可用 API Key')
await this._clearAccountStickyMapping(normalizedEndpoint, sessionHash, clientApiKeyId)
return
}
await this._stopDroidAccountScheduling(accountId, statusCode, '凭证不可用')
await this._clearAccountStickyMapping(normalizedEndpoint, sessionHash, clientApiKeyId)
}
/**
* 停止指定 Droid 账号的调度
*/
async _stopDroidAccountScheduling(accountId, statusCode, reason = '') {
if (!accountId) {
return
}
const message = reason ? `${reason}` : '上游返回 4xx 错误'
try {
await droidAccountService.updateAccount(accountId, {
schedulable: 'false',
status: 'error',
errorMessage: `上游返回 ${statusCode}${message}`
})
logger.warn(`🚫 已停止调度 Droid 账号 ${accountId}(状态码 ${statusCode},原因:${message})`)
} catch (error) {
logger.error(`❌ 停止调度 Droid 账号失败:${accountId}`, error)
}
}
/**
* 清理账号层面的粘性调度映射
*/
async _clearAccountStickyMapping(endpointType, sessionHash, clientApiKeyId) {
if (!sessionHash) {
return
}
const normalizedEndpoint = this._normalizeEndpointType(endpointType)
const apiKeyPart = clientApiKeyId || 'default'
const stickyKey = `droid:${normalizedEndpoint}:${apiKeyPart}:${sessionHash}`
try {
await redis.deleteSessionAccountMapping(stickyKey)
logger.debug(`🧹 已清理 Droid 粘性会话映射:${stickyKey}`)
} catch (error) {
logger.warn(`⚠️ 清理 Droid 粘性会话映射失败:${stickyKey}`, error)
}
}
/**
* 清理 API Key 级别的粘性映射
*/
async _clearApiKeyStickyMapping(accountId, endpointType, sessionHash) {
if (!accountId || !sessionHash) {
return
}
try {
const stickyKey = this._composeApiKeyStickyKey(accountId, endpointType, sessionHash)
if (stickyKey) {
await redis.deleteSessionAccountMapping(stickyKey)
logger.debug(`🧹 已清理 Droid API Key 粘性映射:${stickyKey}`)
}
} catch (error) {
logger.warn(
`⚠️ 清理 Droid API Key 粘性映射失败:${accountId}(endpoint: ${endpointType})`,
error
)
}
}
_mapNetworkErrorStatus(error) {
const code = (error && error.code ? String(error.code) : '').toUpperCase()
if (code === 'ECONNABORTED' || code === 'ETIMEDOUT') {
return 408
}
if (code === 'ECONNRESET' || code === 'EPIPE') {
return 424
}
if (code === 'ENOTFOUND' || code === 'EAI_AGAIN') {
return 424
}
if (typeof error === 'object' && error !== null) {
const message = (error.message || '').toLowerCase()
if (message.includes('timeout')) {
return 408
}
}
return 424
}
_buildNetworkErrorBody(error) {
const body = {
error: 'relay_upstream_failure',
message: error?.message || '上游请求失败'
}
if (error?.code) {
body.code = error.code
}
if (error?.config?.url) {
body.upstream = error.config.url
}
return body
}
/**
* 生成 UUID
*/
_generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
const r = (Math.random() * 16) | 0
const v = c === 'x' ? r : (r & 0x3) | 0x8
return v.toString(16)
})
}
}
// 导出单例
module.exports = new DroidRelayService()