const claudeAccountService = require('./claudeAccountService') const claudeConsoleAccountService = require('./claudeConsoleAccountService') const bedrockAccountService = require('./bedrockAccountService') const ccrAccountService = require('./ccrAccountService') const accountGroupService = require('./accountGroupService') const redis = require('../models/redis') const logger = require('../utils/logger') const { parseVendorPrefixedModel } = require('../utils/modelHelper') class UnifiedClaudeScheduler { constructor() { this.SESSION_MAPPING_PREFIX = 'unified_claude_session_mapping:' } // 🔧 辅助方法:检查账户是否可调度(兼容字符串和布尔值) _isSchedulable(schedulable) { // 如果是 undefined 或 null,默认为可调度 if (schedulable === undefined || schedulable === null) { return true } // 明确设置为 false(布尔值)或 'false'(字符串)时不可调度 return schedulable !== false && schedulable !== 'false' } // 🔍 检查账户是否支持请求的模型 _isModelSupportedByAccount(account, accountType, requestedModel, context = '') { if (!requestedModel) { return true // 没有指定模型时,默认支持 } // Claude OAuth 账户的模型检查 if (accountType === 'claude-official') { // 1. 首先检查是否为 Claude 官方支持的模型 // Claude Official API 只支持 Anthropic 自己的模型,不支持第三方模型(如 deepseek-chat) const isClaudeOfficialModel = requestedModel.startsWith('claude-') || requestedModel.includes('claude') || requestedModel.includes('sonnet') || requestedModel.includes('opus') || requestedModel.includes('haiku') if (!isClaudeOfficialModel) { logger.info( `🚫 Claude official account ${account.name} does not support non-Claude model ${requestedModel}${context ? ` ${context}` : ''}` ) return false } // 2. Opus 模型的订阅级别检查 if (requestedModel.toLowerCase().includes('opus')) { if (account.subscriptionInfo) { try { const info = typeof account.subscriptionInfo === 'string' ? JSON.parse(account.subscriptionInfo) : account.subscriptionInfo // Pro 和 Free 账号不支持 Opus if (info.hasClaudePro === true && info.hasClaudeMax !== true) { logger.info( `🚫 Claude account ${account.name} (Pro) does not support Opus model${context ? ` ${context}` : ''}` ) return false } if (info.accountType === 'claude_pro' || info.accountType === 'claude_free') { logger.info( `🚫 Claude account ${account.name} (${info.accountType}) does not support Opus model${context ? ` ${context}` : ''}` ) return false } } catch (e) { // 解析失败,假设为旧数据,默认支持(兼容旧数据为 Max) logger.debug( `Account ${account.name} has invalid subscriptionInfo${context ? ` ${context}` : ''}, assuming Max` ) } } // 没有订阅信息的账号,默认当作支持(兼容旧数据) } } // Claude Console 账户的模型支持检查 if (accountType === 'claude-console' && account.supportedModels) { // 兼容旧格式(数组)和新格式(对象) if (Array.isArray(account.supportedModels)) { // 旧格式:数组 if ( account.supportedModels.length > 0 && !account.supportedModels.includes(requestedModel) ) { logger.info( `🚫 Claude Console account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}` ) return false } } else if (typeof account.supportedModels === 'object') { // 新格式:映射表 if ( Object.keys(account.supportedModels).length > 0 && !claudeConsoleAccountService.isModelSupported(account.supportedModels, requestedModel) ) { logger.info( `🚫 Claude Console account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}` ) return false } } } // CCR 账户的模型支持检查 if (accountType === 'ccr' && account.supportedModels) { // 兼容旧格式(数组)和新格式(对象) if (Array.isArray(account.supportedModels)) { // 旧格式:数组 if ( account.supportedModels.length > 0 && !account.supportedModels.includes(requestedModel) ) { logger.info( `🚫 CCR account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}` ) return false } } else if (typeof account.supportedModels === 'object') { // 新格式:映射表 if ( Object.keys(account.supportedModels).length > 0 && !ccrAccountService.isModelSupported(account.supportedModels, requestedModel) ) { logger.info( `🚫 CCR account ${account.name} does not support model ${requestedModel}${context ? ` ${context}` : ''}` ) return false } } } return true } // 🎯 统一调度Claude账号(官方和Console) async selectAccountForApiKey(apiKeyData, sessionHash = null, requestedModel = null) { try { // 解析供应商前缀 const { vendor, baseModel } = parseVendorPrefixedModel(requestedModel) const effectiveModel = vendor === 'ccr' ? baseModel : requestedModel logger.debug( `🔍 Model parsing - Original: ${requestedModel}, Vendor: ${vendor}, Effective: ${effectiveModel}` ) const isOpusRequest = effectiveModel && typeof effectiveModel === 'string' ? effectiveModel.toLowerCase().includes('opus') : false // 如果是 CCR 前缀,只在 CCR 账户池中选择 if (vendor === 'ccr') { logger.info(`🎯 CCR vendor prefix detected, routing to CCR accounts only`) return await this._selectCcrAccount(apiKeyData, sessionHash, effectiveModel) } // 如果API Key绑定了专属账户或分组,优先使用 if (apiKeyData.claudeAccountId) { // 检查是否是分组 if (apiKeyData.claudeAccountId.startsWith('group:')) { const groupId = apiKeyData.claudeAccountId.replace('group:', '') logger.info( `🎯 API key ${apiKeyData.name} is bound to group ${groupId}, selecting from group` ) return await this.selectAccountFromGroup( groupId, sessionHash, effectiveModel, vendor === 'ccr' ) } // 普通专属账户 const boundAccount = await redis.getClaudeAccount(apiKeyData.claudeAccountId) if (boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error') { const isRateLimited = await claudeAccountService.isAccountRateLimited(boundAccount.id) if (isRateLimited) { const rateInfo = await claudeAccountService.getAccountRateLimitInfo(boundAccount.id) const error = new Error('Dedicated Claude account is rate limited') error.code = 'CLAUDE_DEDICATED_RATE_LIMITED' error.accountId = boundAccount.id error.rateLimitEndAt = rateInfo?.rateLimitEndAt || boundAccount.rateLimitEndAt || null throw error } if (!this._isSchedulable(boundAccount.schedulable)) { logger.warn( `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not schedulable (schedulable: ${boundAccount?.schedulable}), falling back to pool` ) } else { if (isOpusRequest) { await claudeAccountService.clearExpiredOpusRateLimit(boundAccount.id) } logger.info( `🎯 Using bound dedicated Claude OAuth account: ${boundAccount.name} (${apiKeyData.claudeAccountId}) for API key ${apiKeyData.name}` ) return { accountId: apiKeyData.claudeAccountId, accountType: 'claude-official' } } } else { logger.warn( `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not available (isActive: ${boundAccount?.isActive}, status: ${boundAccount?.status}), falling back to pool` ) } } // 2. 检查Claude Console账户绑定 if (apiKeyData.claudeConsoleAccountId) { const boundConsoleAccount = await claudeConsoleAccountService.getAccount( apiKeyData.claudeConsoleAccountId ) if ( boundConsoleAccount && boundConsoleAccount.isActive === true && boundConsoleAccount.status === 'active' && this._isSchedulable(boundConsoleAccount.schedulable) ) { logger.info( `🎯 Using bound dedicated Claude Console account: ${boundConsoleAccount.name} (${apiKeyData.claudeConsoleAccountId}) for API key ${apiKeyData.name}` ) return { accountId: apiKeyData.claudeConsoleAccountId, accountType: 'claude-console' } } else { logger.warn( `⚠️ Bound Claude Console account ${apiKeyData.claudeConsoleAccountId} is not available (isActive: ${boundConsoleAccount?.isActive}, status: ${boundConsoleAccount?.status}, schedulable: ${boundConsoleAccount?.schedulable}), falling back to pool` ) } } // 3. 检查Bedrock账户绑定 if (apiKeyData.bedrockAccountId) { const boundBedrockAccountResult = await bedrockAccountService.getAccount( apiKeyData.bedrockAccountId ) if ( boundBedrockAccountResult.success && boundBedrockAccountResult.data.isActive === true && this._isSchedulable(boundBedrockAccountResult.data.schedulable) ) { logger.info( `🎯 Using bound dedicated Bedrock account: ${boundBedrockAccountResult.data.name} (${apiKeyData.bedrockAccountId}) for API key ${apiKeyData.name}` ) return { accountId: apiKeyData.bedrockAccountId, accountType: 'bedrock' } } else { logger.warn( `⚠️ Bound Bedrock account ${apiKeyData.bedrockAccountId} is not available (isActive: ${boundBedrockAccountResult?.data?.isActive}, schedulable: ${boundBedrockAccountResult?.data?.schedulable}), falling back to pool` ) } } // CCR 账户不支持绑定(仅通过 ccr, 前缀进行 CCR 路由) // 如果有会话哈希,检查是否有已映射的账户 if (sessionHash) { const mappedAccount = await this._getSessionMapping(sessionHash) if (mappedAccount) { // 当本次请求不是 CCR 前缀时,不允许使用指向 CCR 的粘性会话映射 if (vendor !== 'ccr' && mappedAccount.accountType === 'ccr') { logger.info( `ℹ️ Skipping CCR sticky session mapping for non-CCR request; removing mapping for session ${sessionHash}` ) await this._deleteSessionMapping(sessionHash) } else { // 验证映射的账户是否仍然可用 const isAvailable = await this._isAccountAvailable( mappedAccount.accountId, mappedAccount.accountType, effectiveModel ) if (isAvailable) { // 🚀 智能会话续期:剩余时间少于14天时自动续期到15天(续期正确的 unified 映射键) await this._extendSessionMappingTTL(sessionHash) logger.info( `🎯 Using sticky session account: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}` ) return mappedAccount } else { logger.warn( `⚠️ Mapped account ${mappedAccount.accountId} is no longer available, selecting new account` ) await this._deleteSessionMapping(sessionHash) } } } } // 获取所有可用账户(传递请求的模型进行过滤) const availableAccounts = await this._getAllAvailableAccounts( apiKeyData, effectiveModel, false // 仅前缀才走 CCR:默认池不包含 CCR 账户 ) if (availableAccounts.length === 0) { // 提供更详细的错误信息 if (effectiveModel) { throw new Error( `No available Claude accounts support the requested model: ${effectiveModel}` ) } else { throw new Error('No available Claude accounts (neither official nor console)') } } // 按优先级和最后使用时间排序 const sortedAccounts = this._sortAccountsByPriority(availableAccounts) // 选择第一个账户 const selectedAccount = sortedAccounts[0] // 如果有会话哈希,建立新的映射 if (sessionHash) { await this._setSessionMapping( sessionHash, selectedAccount.accountId, selectedAccount.accountType ) logger.info( `🎯 Created new sticky session mapping: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) for session ${sessionHash}` ) } logger.info( `🎯 Selected account: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) with priority ${selectedAccount.priority} for API key ${apiKeyData.name}` ) return { accountId: selectedAccount.accountId, accountType: selectedAccount.accountType } } catch (error) { logger.error('❌ Failed to select account for API key:', error) throw error } } // 📋 获取所有可用账户(合并官方和Console) async _getAllAvailableAccounts(apiKeyData, requestedModel = null, includeCcr = false) { const availableAccounts = [] const isOpusRequest = requestedModel && typeof requestedModel === 'string' ? requestedModel.toLowerCase().includes('opus') : false // 如果API Key绑定了专属账户,优先返回 // 1. 检查Claude OAuth账户绑定 if (apiKeyData.claudeAccountId) { const boundAccount = await redis.getClaudeAccount(apiKeyData.claudeAccountId) if ( boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error' && boundAccount.status !== 'blocked' && boundAccount.status !== 'temp_error' ) { const isRateLimited = await claudeAccountService.isAccountRateLimited(boundAccount.id) if (isRateLimited) { const rateInfo = await claudeAccountService.getAccountRateLimitInfo(boundAccount.id) const error = new Error('Dedicated Claude account is rate limited') error.code = 'CLAUDE_DEDICATED_RATE_LIMITED' error.accountId = boundAccount.id error.rateLimitEndAt = rateInfo?.rateLimitEndAt || boundAccount.rateLimitEndAt || null throw error } if (!this._isSchedulable(boundAccount.schedulable)) { logger.warn( `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not schedulable (schedulable: ${boundAccount?.schedulable})` ) } else { logger.info( `🎯 Using bound dedicated Claude OAuth account: ${boundAccount.name} (${apiKeyData.claudeAccountId})` ) return [ { ...boundAccount, accountId: boundAccount.id, accountType: 'claude-official', priority: parseInt(boundAccount.priority) || 50, lastUsedAt: boundAccount.lastUsedAt || '0' } ] } } else { logger.warn( `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not available (isActive: ${boundAccount?.isActive}, status: ${boundAccount?.status})` ) } } // 2. 检查Claude Console账户绑定 if (apiKeyData.claudeConsoleAccountId) { const boundConsoleAccount = await claudeConsoleAccountService.getAccount( apiKeyData.claudeConsoleAccountId ) if ( boundConsoleAccount && boundConsoleAccount.isActive === true && boundConsoleAccount.status === 'active' && this._isSchedulable(boundConsoleAccount.schedulable) ) { // 主动触发一次额度检查 try { await claudeConsoleAccountService.checkQuotaUsage(boundConsoleAccount.id) } catch (e) { logger.warn( `Failed to check quota for bound Claude Console account ${boundConsoleAccount.name}: ${e.message}` ) // 继续使用该账号 } // 检查限流状态和额度状态 const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited( boundConsoleAccount.id ) const isQuotaExceeded = await claudeConsoleAccountService.isAccountQuotaExceeded( boundConsoleAccount.id ) if (!isRateLimited && !isQuotaExceeded) { logger.info( `🎯 Using bound dedicated Claude Console account: ${boundConsoleAccount.name} (${apiKeyData.claudeConsoleAccountId})` ) return [ { ...boundConsoleAccount, accountId: boundConsoleAccount.id, accountType: 'claude-console', priority: parseInt(boundConsoleAccount.priority) || 50, lastUsedAt: boundConsoleAccount.lastUsedAt || '0' } ] } } else { logger.warn( `⚠️ Bound Claude Console account ${apiKeyData.claudeConsoleAccountId} is not available (isActive: ${boundConsoleAccount?.isActive}, status: ${boundConsoleAccount?.status}, schedulable: ${boundConsoleAccount?.schedulable})` ) } } // 3. 检查Bedrock账户绑定 if (apiKeyData.bedrockAccountId) { const boundBedrockAccountResult = await bedrockAccountService.getAccount( apiKeyData.bedrockAccountId ) if ( boundBedrockAccountResult.success && boundBedrockAccountResult.data.isActive === true && this._isSchedulable(boundBedrockAccountResult.data.schedulable) ) { logger.info( `🎯 Using bound dedicated Bedrock account: ${boundBedrockAccountResult.data.name} (${apiKeyData.bedrockAccountId})` ) return [ { ...boundBedrockAccountResult.data, accountId: boundBedrockAccountResult.data.id, accountType: 'bedrock', priority: parseInt(boundBedrockAccountResult.data.priority) || 50, lastUsedAt: boundBedrockAccountResult.data.lastUsedAt || '0' } ] } else { logger.warn( `⚠️ Bound Bedrock account ${apiKeyData.bedrockAccountId} is not available (isActive: ${boundBedrockAccountResult?.data?.isActive}, schedulable: ${boundBedrockAccountResult?.data?.schedulable})` ) } } // 获取官方Claude账户(共享池) const claudeAccounts = await redis.getAllClaudeAccounts() for (const account of claudeAccounts) { if ( account.isActive === 'true' && account.status !== 'error' && account.status !== 'blocked' && account.status !== 'temp_error' && (account.accountType === 'shared' || !account.accountType) && // 兼容旧数据 this._isSchedulable(account.schedulable) ) { // 检查是否可调度 // 检查模型支持 if (!this._isModelSupportedByAccount(account, 'claude-official', requestedModel)) { continue } // 检查是否被限流 const isRateLimited = await claudeAccountService.isAccountRateLimited(account.id) if (isRateLimited) { continue } if (isOpusRequest) { const isOpusRateLimited = await claudeAccountService.isAccountOpusRateLimited(account.id) if (isOpusRateLimited) { logger.info( `🚫 Skipping account ${account.name} (${account.id}) due to active Opus limit` ) continue } } availableAccounts.push({ ...account, accountId: account.id, accountType: 'claude-official', priority: parseInt(account.priority) || 50, // 默认优先级50 lastUsedAt: account.lastUsedAt || '0' }) } } // 获取Claude Console账户 const consoleAccounts = await claudeConsoleAccountService.getAllAccounts() logger.info(`📋 Found ${consoleAccounts.length} total Claude Console accounts`) for (const account of consoleAccounts) { // 主动检查封禁状态并尝试恢复(在过滤之前执行,确保可以恢复被封禁的账户) const wasBlocked = await claudeConsoleAccountService.isAccountBlocked(account.id) // 如果账户之前被封禁但现在已恢复,重新获取最新状态 let currentAccount = account if (wasBlocked === false && account.status === 'account_blocked') { // 可能刚刚被恢复,重新获取账户状态 const freshAccount = await claudeConsoleAccountService.getAccount(account.id) if (freshAccount) { currentAccount = freshAccount logger.info(`🔄 Account ${account.name} was recovered from blocked status`) } } logger.info( `🔍 Checking Claude Console account: ${currentAccount.name} - isActive: ${currentAccount.isActive}, status: ${currentAccount.status}, accountType: ${currentAccount.accountType}, schedulable: ${currentAccount.schedulable}` ) // 注意:getAllAccounts返回的isActive是布尔值,getAccount返回的也是布尔值 if ( currentAccount.isActive === true && currentAccount.status === 'active' && currentAccount.accountType === 'shared' && this._isSchedulable(currentAccount.schedulable) ) { // 检查是否可调度 // 检查模型支持 if (!this._isModelSupportedByAccount(currentAccount, 'claude-console', requestedModel)) { continue } // 检查订阅是否过期 if (claudeConsoleAccountService.isSubscriptionExpired(currentAccount)) { logger.debug( `⏰ Claude Console account ${currentAccount.name} (${currentAccount.id}) expired at ${currentAccount.subscriptionExpiresAt}` ) continue } // 主动触发一次额度检查,确保状态即时生效 try { await claudeConsoleAccountService.checkQuotaUsage(currentAccount.id) } catch (e) { logger.warn( `Failed to check quota for Claude Console account ${currentAccount.name}: ${e.message}` ) // 继续处理该账号 } // 检查是否被限流 const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited( currentAccount.id ) const isQuotaExceeded = await claudeConsoleAccountService.isAccountQuotaExceeded( currentAccount.id ) if (!isRateLimited && !isQuotaExceeded) { availableAccounts.push({ ...currentAccount, accountId: currentAccount.id, accountType: 'claude-console', priority: parseInt(currentAccount.priority) || 50, lastUsedAt: currentAccount.lastUsedAt || '0' }) logger.info( `✅ Added Claude Console account to available pool: ${currentAccount.name} (priority: ${currentAccount.priority})` ) } else { if (isRateLimited) { logger.warn(`⚠️ Claude Console account ${currentAccount.name} is rate limited`) } if (isQuotaExceeded) { logger.warn(`💰 Claude Console account ${currentAccount.name} quota exceeded`) } } } else { logger.info( `❌ Claude Console account ${currentAccount.name} not eligible - isActive: ${currentAccount.isActive}, status: ${currentAccount.status}, accountType: ${currentAccount.accountType}, schedulable: ${currentAccount.schedulable}` ) } } // 获取Bedrock账户(共享池) const bedrockAccountsResult = await bedrockAccountService.getAllAccounts() if (bedrockAccountsResult.success) { const bedrockAccounts = bedrockAccountsResult.data logger.info(`📋 Found ${bedrockAccounts.length} total Bedrock accounts`) for (const account of bedrockAccounts) { logger.info( `🔍 Checking Bedrock account: ${account.name} - isActive: ${account.isActive}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) if ( account.isActive === true && account.accountType === 'shared' && this._isSchedulable(account.schedulable) ) { // 检查是否可调度 availableAccounts.push({ ...account, accountId: account.id, accountType: 'bedrock', priority: parseInt(account.priority) || 50, lastUsedAt: account.lastUsedAt || '0' }) logger.info( `✅ Added Bedrock account to available pool: ${account.name} (priority: ${account.priority})` ) } else { logger.info( `❌ Bedrock account ${account.name} not eligible - isActive: ${account.isActive}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) } } } // 获取CCR账户(共享池)- 仅当明确要求包含时 if (includeCcr) { const ccrAccounts = await ccrAccountService.getAllAccounts() logger.info(`📋 Found ${ccrAccounts.length} total CCR accounts`) for (const account of ccrAccounts) { logger.info( `🔍 Checking CCR account: ${account.name} - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) if ( account.isActive === true && account.status === 'active' && account.accountType === 'shared' && this._isSchedulable(account.schedulable) ) { // 检查模型支持 if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel)) { continue } // 检查订阅是否过期 if (ccrAccountService.isSubscriptionExpired(account)) { logger.debug( `⏰ CCR account ${account.name} (${account.id}) expired at ${account.subscriptionExpiresAt}` ) continue } // 检查是否被限流 const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id) const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id) if (!isRateLimited && !isQuotaExceeded) { availableAccounts.push({ ...account, accountId: account.id, accountType: 'ccr', priority: parseInt(account.priority) || 50, lastUsedAt: account.lastUsedAt || '0' }) logger.info( `✅ Added CCR account to available pool: ${account.name} (priority: ${account.priority})` ) } else { if (isRateLimited) { logger.warn(`⚠️ CCR account ${account.name} is rate limited`) } if (isQuotaExceeded) { logger.warn(`💰 CCR account ${account.name} quota exceeded`) } } } else { logger.info( `❌ CCR account ${account.name} not eligible - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) } } } logger.info( `📊 Total available accounts: ${availableAccounts.length} (Claude: ${availableAccounts.filter((a) => a.accountType === 'claude-official').length}, Console: ${availableAccounts.filter((a) => a.accountType === 'claude-console').length}, Bedrock: ${availableAccounts.filter((a) => a.accountType === 'bedrock').length}, CCR: ${availableAccounts.filter((a) => a.accountType === 'ccr').length})` ) return availableAccounts } // 🔢 按优先级和最后使用时间排序账户 _sortAccountsByPriority(accounts) { return accounts.sort((a, b) => { // 首先按优先级排序(数字越小优先级越高) if (a.priority !== b.priority) { return a.priority - b.priority } // 优先级相同时,按最后使用时间排序(最久未使用的优先) const aLastUsed = new Date(a.lastUsedAt || 0).getTime() const bLastUsed = new Date(b.lastUsedAt || 0).getTime() return aLastUsed - bLastUsed }) } // 🔍 检查账户是否可用 async _isAccountAvailable(accountId, accountType, requestedModel = null) { try { if (accountType === 'claude-official') { const account = await redis.getClaudeAccount(accountId) if ( !account || account.isActive !== 'true' || account.status === 'error' || account.status === 'temp_error' ) { return false } // 检查是否可调度 if (!this._isSchedulable(account.schedulable)) { logger.info(`🚫 Account ${accountId} is not schedulable`) return false } // 检查模型兼容性 if ( !this._isModelSupportedByAccount( account, 'claude-official', requestedModel, 'in session check' ) ) { return false } // 检查是否限流或过载 const isRateLimited = await claudeAccountService.isAccountRateLimited(accountId) const isOverloaded = await claudeAccountService.isAccountOverloaded(accountId) if (isRateLimited || isOverloaded) { return false } if ( requestedModel && typeof requestedModel === 'string' && requestedModel.toLowerCase().includes('opus') ) { const isOpusRateLimited = await claudeAccountService.isAccountOpusRateLimited(accountId) if (isOpusRateLimited) { logger.info(`🚫 Account ${accountId} skipped due to active Opus limit (session check)`) return false } } return true } else if (accountType === 'claude-console') { const account = await claudeConsoleAccountService.getAccount(accountId) if (!account || !account.isActive) { return false } // 检查账户状态 if ( account.status !== 'active' && account.status !== 'unauthorized' && account.status !== 'overloaded' ) { return false } // 检查是否可调度 if (!this._isSchedulable(account.schedulable)) { logger.info(`🚫 Claude Console account ${accountId} is not schedulable`) return false } // 检查模型支持 if ( !this._isModelSupportedByAccount( account, 'claude-console', requestedModel, 'in session check' ) ) { return false } // 检查订阅是否过期 if (claudeConsoleAccountService.isSubscriptionExpired(account)) { logger.debug( `⏰ Claude Console account ${account.name} (${accountId}) expired at ${account.subscriptionExpiresAt} (session check)` ) return false } // 检查是否超额 try { await claudeConsoleAccountService.checkQuotaUsage(accountId) } catch (e) { logger.warn(`Failed to check quota for Claude Console account ${accountId}: ${e.message}`) // 继续处理 } // 检查是否被限流 if (await claudeConsoleAccountService.isAccountRateLimited(accountId)) { return false } if (await claudeConsoleAccountService.isAccountQuotaExceeded(accountId)) { return false } // 检查是否未授权(401错误) if (account.status === 'unauthorized') { return false } // 检查是否过载(529错误) if (await claudeConsoleAccountService.isAccountOverloaded(accountId)) { return false } return true } else if (accountType === 'bedrock') { const accountResult = await bedrockAccountService.getAccount(accountId) if (!accountResult.success || !accountResult.data.isActive) { return false } // 检查是否可调度 if (!this._isSchedulable(accountResult.data.schedulable)) { logger.info(`🚫 Bedrock account ${accountId} is not schedulable`) return false } // Bedrock账户暂不需要限流检查,因为AWS管理限流 return true } else if (accountType === 'ccr') { const account = await ccrAccountService.getAccount(accountId) if (!account || !account.isActive) { return false } // 检查账户状态 if ( account.status !== 'active' && account.status !== 'unauthorized' && account.status !== 'overloaded' ) { return false } // 检查是否可调度 if (!this._isSchedulable(account.schedulable)) { logger.info(`🚫 CCR account ${accountId} is not schedulable`) return false } // 检查模型支持 if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel, 'in session check')) { return false } // 检查订阅是否过期 if (ccrAccountService.isSubscriptionExpired(account)) { logger.debug( `⏰ CCR account ${account.name} (${accountId}) expired at ${account.subscriptionExpiresAt} (session check)` ) return false } // 检查是否超额 try { await ccrAccountService.checkQuotaUsage(accountId) } catch (e) { logger.warn(`Failed to check quota for CCR account ${accountId}: ${e.message}`) // 继续处理 } // 检查是否被限流 if (await ccrAccountService.isAccountRateLimited(accountId)) { return false } if (await ccrAccountService.isAccountQuotaExceeded(accountId)) { return false } // 检查是否未授权(401错误) if (account.status === 'unauthorized') { return false } // 检查是否过载(529错误) if (await ccrAccountService.isAccountOverloaded(accountId)) { return false } return true } return false } catch (error) { logger.warn(`⚠️ Failed to check account availability: ${accountId}`, error) return false } } // 🔗 获取会话映射 async _getSessionMapping(sessionHash) { const client = redis.getClientSafe() const mappingData = await client.get(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`) if (mappingData) { try { return JSON.parse(mappingData) } catch (error) { logger.warn('⚠️ Failed to parse session mapping:', error) return null } } return null } // 💾 设置会话映射 async _setSessionMapping(sessionHash, accountId, accountType) { const client = redis.getClientSafe() const mappingData = JSON.stringify({ accountId, accountType }) // 依据配置设置TTL(小时) const appConfig = require('../../config/config') const ttlHours = appConfig.session?.stickyTtlHours || 1 const ttlSeconds = Math.max(1, Math.floor(ttlHours * 60 * 60)) await client.setex(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`, ttlSeconds, mappingData) } // 🗑️ 删除会话映射 async _deleteSessionMapping(sessionHash) { const client = redis.getClientSafe() await client.del(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`) } // 🔁 续期统一调度会话映射TTL(针对 unified_claude_session_mapping:* 键),遵循会话配置 async _extendSessionMappingTTL(sessionHash) { try { const client = redis.getClientSafe() const key = `${this.SESSION_MAPPING_PREFIX}${sessionHash}` const remainingTTL = await client.ttl(key) // -2: key 不存在;-1: 无过期时间 if (remainingTTL === -2) { return false } if (remainingTTL === -1) { return true } const appConfig = require('../../config/config') const ttlHours = appConfig.session?.stickyTtlHours || 1 const renewalThresholdMinutes = appConfig.session?.renewalThresholdMinutes || 0 // 阈值为0则不续期 if (!renewalThresholdMinutes) { return true } const fullTTL = Math.max(1, Math.floor(ttlHours * 60 * 60)) const threshold = Math.max(0, Math.floor(renewalThresholdMinutes * 60)) if (remainingTTL < threshold) { await client.expire(key, fullTTL) logger.debug( `🔄 Renewed unified session TTL: ${sessionHash} (was ${Math.round(remainingTTL / 60)}m, renewed to ${ttlHours}h)` ) } else { logger.debug( `✅ Unified session TTL sufficient: ${sessionHash} (remaining ${Math.round(remainingTTL / 60)}m)` ) } return true } catch (error) { logger.error('❌ Failed to extend unified session TTL:', error) return false } } // 🚫 标记账户为限流状态 async markAccountRateLimited( accountId, accountType, sessionHash = null, rateLimitResetTimestamp = null ) { try { if (accountType === 'claude-official') { await claudeAccountService.markAccountRateLimited( accountId, sessionHash, rateLimitResetTimestamp ) } else if (accountType === 'claude-console') { await claudeConsoleAccountService.markAccountRateLimited(accountId) } else if (accountType === 'ccr') { await ccrAccountService.markAccountRateLimited(accountId) } // 删除会话映射 if (sessionHash) { await this._deleteSessionMapping(sessionHash) } return { success: true } } catch (error) { logger.error( `❌ Failed to mark account as rate limited: ${accountId} (${accountType})`, error ) throw error } } // ✅ 移除账户的限流状态 async removeAccountRateLimit(accountId, accountType) { try { if (accountType === 'claude-official') { await claudeAccountService.removeAccountRateLimit(accountId) } else if (accountType === 'claude-console') { await claudeConsoleAccountService.removeAccountRateLimit(accountId) } else if (accountType === 'ccr') { await ccrAccountService.removeAccountRateLimit(accountId) } return { success: true } } catch (error) { logger.error( `❌ Failed to remove rate limit for account: ${accountId} (${accountType})`, error ) throw error } } // 🔍 检查账户是否处于限流状态 async isAccountRateLimited(accountId, accountType) { try { if (accountType === 'claude-official') { return await claudeAccountService.isAccountRateLimited(accountId) } else if (accountType === 'claude-console') { return await claudeConsoleAccountService.isAccountRateLimited(accountId) } else if (accountType === 'ccr') { return await ccrAccountService.isAccountRateLimited(accountId) } return false } catch (error) { logger.error(`❌ Failed to check rate limit status: ${accountId} (${accountType})`, error) return false } } // 🚫 标记账户为未授权状态(401错误) async markAccountUnauthorized(accountId, accountType, sessionHash = null) { try { // 只处理claude-official类型的账户,不处理claude-console和gemini if (accountType === 'claude-official') { await claudeAccountService.markAccountUnauthorized(accountId, sessionHash) // 删除会话映射 if (sessionHash) { await this._deleteSessionMapping(sessionHash) } logger.warn(`🚫 Account ${accountId} marked as unauthorized due to consecutive 401 errors`) } else { logger.info( `ℹ️ Skipping unauthorized marking for non-Claude OAuth account: ${accountId} (${accountType})` ) } return { success: true } } catch (error) { logger.error( `❌ Failed to mark account as unauthorized: ${accountId} (${accountType})`, error ) throw error } } // 🚫 标记账户为被封锁状态(403错误) async markAccountBlocked(accountId, accountType, sessionHash = null) { try { // 只处理claude-official类型的账户,不处理claude-console和gemini if (accountType === 'claude-official') { await claudeAccountService.markAccountBlocked(accountId, sessionHash) // 删除会话映射 if (sessionHash) { await this._deleteSessionMapping(sessionHash) } logger.warn(`🚫 Account ${accountId} marked as blocked due to 403 error`) } else { logger.info( `ℹ️ Skipping blocked marking for non-Claude OAuth account: ${accountId} (${accountType})` ) } return { success: true } } catch (error) { logger.error(`❌ Failed to mark account as blocked: ${accountId} (${accountType})`, error) throw error } } // 🚫 标记Claude Console账户为封锁状态(模型不支持) async blockConsoleAccount(accountId, reason) { try { await claudeConsoleAccountService.blockAccount(accountId, reason) return { success: true } } catch (error) { logger.error(`❌ Failed to block console account: ${accountId}`, error) throw error } } // 👥 从分组中选择账户 async selectAccountFromGroup( groupId, sessionHash = null, requestedModel = null, allowCcr = false ) { try { // 获取分组信息 const group = await accountGroupService.getGroup(groupId) if (!group) { throw new Error(`Group ${groupId} not found`) } logger.info(`👥 Selecting account from group: ${group.name} (${group.platform})`) // 如果有会话哈希,检查是否有已映射的账户 if (sessionHash) { const mappedAccount = await this._getSessionMapping(sessionHash) if (mappedAccount) { // 验证映射的账户是否属于这个分组 const memberIds = await accountGroupService.getGroupMembers(groupId) if (memberIds.includes(mappedAccount.accountId)) { // 非 CCR 请求时不允许 CCR 粘性映射 if (!allowCcr && mappedAccount.accountType === 'ccr') { await this._deleteSessionMapping(sessionHash) } else { const isAvailable = await this._isAccountAvailable( mappedAccount.accountId, mappedAccount.accountType, requestedModel ) if (isAvailable) { // 🚀 智能会话续期:续期 unified 映射键 await this._extendSessionMappingTTL(sessionHash) logger.info( `🎯 Using sticky session account from group: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}` ) return mappedAccount } } } // 如果映射的账户不可用或不在分组中,删除映射 await this._deleteSessionMapping(sessionHash) } } // 获取分组内的所有账户 const memberIds = await accountGroupService.getGroupMembers(groupId) if (memberIds.length === 0) { throw new Error(`Group ${group.name} has no members`) } const availableAccounts = [] const isOpusRequest = requestedModel && typeof requestedModel === 'string' ? requestedModel.toLowerCase().includes('opus') : false // 获取所有成员账户的详细信息 for (const memberId of memberIds) { let account = null let accountType = null // 根据平台类型获取账户 if (group.platform === 'claude') { // 先尝试官方账户 account = await redis.getClaudeAccount(memberId) if (account?.id) { accountType = 'claude-official' } else { // 尝试Console账户 account = await claudeConsoleAccountService.getAccount(memberId) if (account) { accountType = 'claude-console' } else { // 尝试CCR账户(仅允许在 allowCcr 为 true 时) if (allowCcr) { account = await ccrAccountService.getAccount(memberId) if (account) { accountType = 'ccr' } } } } } else if (group.platform === 'gemini') { // Gemini暂时不支持,预留接口 logger.warn('⚠️ Gemini group scheduling not yet implemented') continue } if (!account) { logger.warn(`⚠️ Account ${memberId} not found in group ${group.name}`) continue } // 检查账户是否可用 const isActive = accountType === 'claude-official' ? account.isActive === 'true' : account.isActive === true const status = accountType === 'claude-official' ? account.status !== 'error' && account.status !== 'blocked' : accountType === 'ccr' ? account.status === 'active' : account.status === 'active' if (isActive && status && this._isSchedulable(account.schedulable)) { // 检查模型支持 if (!this._isModelSupportedByAccount(account, accountType, requestedModel, 'in group')) { continue } // 检查是否被限流 const isRateLimited = await this.isAccountRateLimited(account.id, accountType) if (isRateLimited) { continue } if (accountType === 'claude-official' && isOpusRequest) { const isOpusRateLimited = await claudeAccountService.isAccountOpusRateLimited( account.id ) if (isOpusRateLimited) { logger.info( `🚫 Skipping group member ${account.name} (${account.id}) due to active Opus limit` ) continue } } availableAccounts.push({ ...account, accountId: account.id, accountType, priority: parseInt(account.priority) || 50, lastUsedAt: account.lastUsedAt || '0' }) } } if (availableAccounts.length === 0) { throw new Error(`No available accounts in group ${group.name}`) } // 使用现有的优先级排序逻辑 const sortedAccounts = this._sortAccountsByPriority(availableAccounts) // 选择第一个账户 const selectedAccount = sortedAccounts[0] // 如果有会话哈希,建立新的映射 if (sessionHash) { await this._setSessionMapping( sessionHash, selectedAccount.accountId, selectedAccount.accountType ) logger.info( `🎯 Created new sticky session mapping in group: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) for session ${sessionHash}` ) } logger.info( `🎯 Selected account from group ${group.name}: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) with priority ${selectedAccount.priority}` ) return { accountId: selectedAccount.accountId, accountType: selectedAccount.accountType } } catch (error) { logger.error(`❌ Failed to select account from group ${groupId}:`, error) throw error } } // 🎯 专门选择CCR账户(仅限CCR前缀路由使用) async _selectCcrAccount(apiKeyData, sessionHash = null, effectiveModel = null) { try { // 1. 检查会话粘性 if (sessionHash) { const mappedAccount = await this._getSessionMapping(sessionHash) if (mappedAccount && mappedAccount.accountType === 'ccr') { // 验证映射的CCR账户是否仍然可用 const isAvailable = await this._isAccountAvailable( mappedAccount.accountId, mappedAccount.accountType, effectiveModel ) if (isAvailable) { // 🚀 智能会话续期:续期 unified 映射键 await this._extendSessionMappingTTL(sessionHash) logger.info( `🎯 Using sticky CCR session account: ${mappedAccount.accountId} for session ${sessionHash}` ) return mappedAccount } else { logger.warn( `⚠️ Mapped CCR account ${mappedAccount.accountId} is no longer available, selecting new account` ) await this._deleteSessionMapping(sessionHash) } } } // 2. 获取所有可用的CCR账户 const availableCcrAccounts = await this._getAvailableCcrAccounts(effectiveModel) if (availableCcrAccounts.length === 0) { throw new Error( `No available CCR accounts support the requested model: ${effectiveModel || 'unspecified'}` ) } // 3. 按优先级和最后使用时间排序 const sortedAccounts = this._sortAccountsByPriority(availableCcrAccounts) const selectedAccount = sortedAccounts[0] // 4. 建立会话映射 if (sessionHash) { await this._setSessionMapping( sessionHash, selectedAccount.accountId, selectedAccount.accountType ) logger.info( `🎯 Created new sticky CCR session mapping: ${selectedAccount.name} (${selectedAccount.accountId}) for session ${sessionHash}` ) } logger.info( `🎯 Selected CCR account: ${selectedAccount.name} (${selectedAccount.accountId}) with priority ${selectedAccount.priority} for API key ${apiKeyData.name}` ) return { accountId: selectedAccount.accountId, accountType: selectedAccount.accountType } } catch (error) { logger.error('❌ Failed to select CCR account:', error) throw error } } // 📋 获取所有可用的CCR账户 async _getAvailableCcrAccounts(requestedModel = null) { const availableAccounts = [] try { const ccrAccounts = await ccrAccountService.getAllAccounts() logger.info(`📋 Found ${ccrAccounts.length} total CCR accounts for CCR-only selection`) for (const account of ccrAccounts) { logger.debug( `🔍 Checking CCR account: ${account.name} - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) if ( account.isActive === true && account.status === 'active' && account.accountType === 'shared' && this._isSchedulable(account.schedulable) ) { // 检查模型支持 if (!this._isModelSupportedByAccount(account, 'ccr', requestedModel)) { logger.debug(`CCR account ${account.name} does not support model ${requestedModel}`) continue } // 检查订阅是否过期 if (ccrAccountService.isSubscriptionExpired(account)) { logger.debug( `⏰ CCR account ${account.name} (${account.id}) expired at ${account.subscriptionExpiresAt}` ) continue } // 检查是否被限流或超额 const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id) const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id) const isOverloaded = await ccrAccountService.isAccountOverloaded(account.id) if (!isRateLimited && !isQuotaExceeded && !isOverloaded) { availableAccounts.push({ ...account, accountId: account.id, accountType: 'ccr', priority: parseInt(account.priority) || 50, lastUsedAt: account.lastUsedAt || '0' }) logger.debug(`✅ Added CCR account to available pool: ${account.name}`) } else { logger.debug( `❌ CCR account ${account.name} not available - rateLimited: ${isRateLimited}, quotaExceeded: ${isQuotaExceeded}, overloaded: ${isOverloaded}` ) } } else { logger.debug( `❌ CCR account ${account.name} not eligible - isActive: ${account.isActive}, status: ${account.status}, accountType: ${account.accountType}, schedulable: ${account.schedulable}` ) } } logger.info(`📊 Total available CCR accounts: ${availableAccounts.length}`) return availableAccounts } catch (error) { logger.error('❌ Failed to get available CCR accounts:', error) return [] } } } module.exports = new UnifiedClaudeScheduler()