const { v4: uuidv4 } = require('uuid') const crypto = require('crypto') const ProxyHelper = require('../utils/proxyHelper') const axios = require('axios') const redis = require('../models/redis') const config = require('../../config/config') const logger = require('../utils/logger') const { maskToken } = require('../utils/tokenMask') const { logRefreshStart, logRefreshSuccess, logRefreshError, logTokenUsage, logRefreshSkipped } = require('../utils/tokenRefreshLogger') const tokenRefreshService = require('./tokenRefreshService') const LRUCache = require('../utils/lruCache') const { formatDateWithTimezone, getISOStringWithTimezone } = require('../utils/dateHelper') class ClaudeAccountService { constructor() { this.claudeApiUrl = 'https://console.anthropic.com/v1/oauth/token' this.claudeOauthClientId = '9d1c250a-e61b-44d9-88ed-5944d1962f5e' let maxWarnings = parseInt(process.env.CLAUDE_5H_WARNING_MAX_NOTIFICATIONS || '', 10) if (Number.isNaN(maxWarnings) && config.claude?.fiveHourWarning) { maxWarnings = parseInt(config.claude.fiveHourWarning.maxNotificationsPerWindow, 10) } if (Number.isNaN(maxWarnings) || maxWarnings < 1) { maxWarnings = 1 } this.maxFiveHourWarningsPerWindow = Math.min(maxWarnings, 10) // 加密相关常量 this.ENCRYPTION_ALGORITHM = 'aes-256-cbc' this.ENCRYPTION_SALT = 'salt' // 🚀 性能优化:缓存派生的加密密钥,避免每次重复计算 // scryptSync 是 CPU 密集型操作,缓存可以减少 95%+ 的 CPU 占用 this._encryptionKeyCache = null // 🔄 解密结果缓存,提高解密性能 this._decryptCache = new LRUCache(500) // 🧹 定期清理缓存(每10分钟) setInterval( () => { this._decryptCache.cleanup() logger.info('🧹 Claude decrypt cache cleanup completed', this._decryptCache.getStats()) }, 10 * 60 * 1000 ) } // 🏢 创建Claude账户 async createAccount(options = {}) { const { name = 'Unnamed Account', description = '', email = '', password = '', refreshToken = '', claudeAiOauth = null, // Claude标准格式的OAuth数据 proxy = null, // { type: 'socks5', host: 'localhost', port: 1080, username: '', password: '' } isActive = true, accountType = 'shared', // 'dedicated' or 'shared' platform = 'claude', priority = 50, // 调度优先级 (1-100,数字越小优先级越高) schedulable = true, // 是否可被调度 subscriptionInfo = null, // 手动设置的订阅信息 autoStopOnWarning = false, // 5小时使用量接近限制时自动停止调度 useUnifiedUserAgent = false, // 是否使用统一Claude Code版本的User-Agent useUnifiedClientId = false, // 是否使用统一的客户端标识 unifiedClientId = '', // 统一的客户端标识 expiresAt = null // 账户订阅到期时间 } = options const accountId = uuidv4() let accountData if (claudeAiOauth) { // 使用Claude标准格式的OAuth数据 accountData = { id: accountId, name, description, email: this._encryptSensitiveData(email), password: this._encryptSensitiveData(password), claudeAiOauth: this._encryptSensitiveData(JSON.stringify(claudeAiOauth)), accessToken: this._encryptSensitiveData(claudeAiOauth.accessToken), refreshToken: this._encryptSensitiveData(claudeAiOauth.refreshToken), expiresAt: claudeAiOauth.expiresAt.toString(), scopes: claudeAiOauth.scopes.join(' '), proxy: proxy ? JSON.stringify(proxy) : '', isActive: isActive.toString(), accountType, // 账号类型:'dedicated' 或 'shared' 或 'group' platform, priority: priority.toString(), // 调度优先级 createdAt: new Date().toISOString(), lastUsedAt: '', lastRefreshAt: '', status: 'active', // 有OAuth数据的账户直接设为active errorMessage: '', schedulable: schedulable.toString(), // 是否可被调度 autoStopOnWarning: autoStopOnWarning.toString(), // 5小时使用量接近限制时自动停止调度 useUnifiedUserAgent: useUnifiedUserAgent.toString(), // 是否使用统一Claude Code版本的User-Agent useUnifiedClientId: useUnifiedClientId.toString(), // 是否使用统一的客户端标识 unifiedClientId: unifiedClientId || '', // 统一的客户端标识 // 优先使用手动设置的订阅信息,否则使用OAuth数据中的,否则默认为空 subscriptionInfo: subscriptionInfo ? JSON.stringify(subscriptionInfo) : claudeAiOauth.subscriptionInfo ? JSON.stringify(claudeAiOauth.subscriptionInfo) : '', // 账户订阅到期时间 subscriptionExpiresAt: expiresAt || '' } } else { // 兼容旧格式 accountData = { id: accountId, name, description, email: this._encryptSensitiveData(email), password: this._encryptSensitiveData(password), refreshToken: this._encryptSensitiveData(refreshToken), accessToken: '', expiresAt: '', scopes: '', proxy: proxy ? JSON.stringify(proxy) : '', isActive: isActive.toString(), accountType, // 账号类型:'dedicated' 或 'shared' 或 'group' platform, priority: priority.toString(), // 调度优先级 createdAt: new Date().toISOString(), lastUsedAt: '', lastRefreshAt: '', status: 'created', // created, active, expired, error errorMessage: '', schedulable: schedulable.toString(), // 是否可被调度 autoStopOnWarning: autoStopOnWarning.toString(), // 5小时使用量接近限制时自动停止调度 useUnifiedUserAgent: useUnifiedUserAgent.toString(), // 是否使用统一Claude Code版本的User-Agent // 手动设置的订阅信息 subscriptionInfo: subscriptionInfo ? JSON.stringify(subscriptionInfo) : '', // 账户订阅到期时间 subscriptionExpiresAt: expiresAt || '' } } await redis.setClaudeAccount(accountId, accountData) logger.success(`🏢 Created Claude account: ${name} (${accountId})`) // 如果有 OAuth 数据和 accessToken,且包含 user:profile 权限,尝试获取 profile 信息 if (claudeAiOauth && claudeAiOauth.accessToken) { // 检查是否有 user:profile 权限(标准 OAuth 有,Setup Token 没有) const hasProfileScope = claudeAiOauth.scopes && claudeAiOauth.scopes.includes('user:profile') if (hasProfileScope) { try { const agent = this._createProxyAgent(proxy) await this.fetchAndUpdateAccountProfile(accountId, claudeAiOauth.accessToken, agent) logger.info(`📊 Successfully fetched profile info for new account: ${name}`) } catch (profileError) { logger.warn(`⚠️ Failed to fetch profile info for new account: ${profileError.message}`) } } else { logger.info(`⏩ Skipping profile fetch for account ${name} (no user:profile scope)`) } } return { id: accountId, name, description, email, isActive, proxy, accountType, platform, priority, status: accountData.status, createdAt: accountData.createdAt, expiresAt: accountData.expiresAt, subscriptionExpiresAt: accountData.subscriptionExpiresAt && accountData.subscriptionExpiresAt !== '' ? accountData.subscriptionExpiresAt : null, scopes: claudeAiOauth ? claudeAiOauth.scopes : [], autoStopOnWarning, useUnifiedUserAgent, useUnifiedClientId, unifiedClientId } } // 🔄 刷新Claude账户token async refreshAccountToken(accountId) { let lockAcquired = false try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } const refreshToken = this._decryptSensitiveData(accountData.refreshToken) if (!refreshToken) { throw new Error('No refresh token available - manual token update required') } // 尝试获取分布式锁 lockAcquired = await tokenRefreshService.acquireRefreshLock(accountId, 'claude') if (!lockAcquired) { // 如果无法获取锁,说明另一个进程正在刷新 logger.info( `🔒 Token refresh already in progress for account: ${accountData.name} (${accountId})` ) logRefreshSkipped(accountId, accountData.name, 'claude', 'already_locked') // 等待一段时间后返回,期望其他进程已完成刷新 await new Promise((resolve) => setTimeout(resolve, 2000)) // 重新获取账户数据(可能已被其他进程刷新) const updatedData = await redis.getClaudeAccount(accountId) if (updatedData && updatedData.accessToken) { const accessToken = this._decryptSensitiveData(updatedData.accessToken) return { success: true, accessToken, expiresAt: updatedData.expiresAt } } throw new Error('Token refresh in progress by another process') } // 记录开始刷新 logRefreshStart(accountId, accountData.name, 'claude', 'manual_refresh') logger.info(`🔄 Starting token refresh for account: ${accountData.name} (${accountId})`) // 创建代理agent const agent = this._createProxyAgent(accountData.proxy) const axiosConfig = { headers: { 'Content-Type': 'application/json', Accept: 'application/json, text/plain, */*', 'User-Agent': 'claude-cli/1.0.56 (external, cli)', 'Accept-Language': 'en-US,en;q=0.9', Referer: 'https://claude.ai/', Origin: 'https://claude.ai' }, timeout: 30000 } if (agent) { axiosConfig.httpAgent = agent axiosConfig.httpsAgent = agent axiosConfig.proxy = false } const response = await axios.post( this.claudeApiUrl, { grant_type: 'refresh_token', refresh_token: refreshToken, client_id: this.claudeOauthClientId }, axiosConfig ) if (response.status === 200) { // 记录完整的响应数据到专门的认证详细日志 logger.authDetail('Token refresh response', response.data) // 记录简化版本到主日志 logger.info('📊 Token refresh response (analyzing for subscription info):', { status: response.status, hasData: !!response.data, dataKeys: response.data ? Object.keys(response.data) : [] }) const { access_token, refresh_token, expires_in } = response.data // 检查是否有套餐信息 if ( response.data.subscription || response.data.plan || response.data.tier || response.data.account_type ) { const subscriptionInfo = { subscription: response.data.subscription, plan: response.data.plan, tier: response.data.tier, accountType: response.data.account_type, features: response.data.features, limits: response.data.limits } logger.info('🎯 Found subscription info in refresh response:', subscriptionInfo) // 将套餐信息存储在账户数据中 accountData.subscriptionInfo = JSON.stringify(subscriptionInfo) } // 更新账户数据 accountData.accessToken = this._encryptSensitiveData(access_token) accountData.refreshToken = this._encryptSensitiveData(refresh_token) accountData.expiresAt = (Date.now() + expires_in * 1000).toString() accountData.lastRefreshAt = new Date().toISOString() accountData.status = 'active' accountData.errorMessage = '' await redis.setClaudeAccount(accountId, accountData) // 刷新成功后,如果有 user:profile 权限,尝试获取账号 profile 信息 // 检查账户的 scopes 是否包含 user:profile(标准 OAuth 有,Setup Token 没有) const hasProfileScope = accountData.scopes && accountData.scopes.includes('user:profile') if (hasProfileScope) { try { await this.fetchAndUpdateAccountProfile(accountId, access_token, agent) } catch (profileError) { logger.warn(`⚠️ Failed to fetch profile info after refresh: ${profileError.message}`) } } else { logger.debug( `⏩ Skipping profile fetch after refresh for account ${accountId} (no user:profile scope)` ) } // 记录刷新成功 logRefreshSuccess(accountId, accountData.name, 'claude', { accessToken: access_token, refreshToken: refresh_token, expiresAt: accountData.expiresAt, scopes: accountData.scopes }) logger.success( `🔄 Refreshed token for account: ${accountData.name} (${accountId}) - Access Token: ${maskToken(access_token)}` ) return { success: true, accessToken: access_token, expiresAt: accountData.expiresAt } } else { throw new Error(`Token refresh failed with status: ${response.status}`) } } catch (error) { // 记录刷新失败 const accountData = await redis.getClaudeAccount(accountId) if (accountData) { logRefreshError(accountId, accountData.name, 'claude', error) accountData.status = 'error' accountData.errorMessage = error.message await redis.setClaudeAccount(accountId, accountData) // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name, platform: 'claude-oauth', status: 'error', errorCode: 'CLAUDE_OAUTH_ERROR', reason: `Token refresh failed: ${error.message}` }) } catch (webhookError) { logger.error('Failed to send webhook notification:', webhookError) } } logger.error(`❌ Failed to refresh token for account ${accountId}:`, error) throw error } finally { // 释放锁 if (lockAcquired) { await tokenRefreshService.releaseRefreshLock(accountId, 'claude') } } } // 🔍 获取账户信息 async getAccount(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return null } return accountData } catch (error) { logger.error('❌ Failed to get Claude account:', error) return null } } // 🎯 获取有效的访问token async getValidAccessToken(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } if (accountData.isActive !== 'true') { throw new Error('Account is disabled') } // 检查token是否过期 const expiresAt = parseInt(accountData.expiresAt) const now = Date.now() const isExpired = !expiresAt || now >= expiresAt - 60000 // 60秒提前刷新 // 记录token使用情况 logTokenUsage(accountId, accountData.name, 'claude', accountData.expiresAt, isExpired) if (isExpired) { logger.info(`🔄 Token expired/expiring for account ${accountId}, attempting refresh...`) try { const refreshResult = await this.refreshAccountToken(accountId) return refreshResult.accessToken } catch (refreshError) { logger.warn(`⚠️ Token refresh failed for account ${accountId}: ${refreshError.message}`) // 如果刷新失败,仍然尝试使用当前token(可能是手动添加的长期有效token) const currentToken = this._decryptSensitiveData(accountData.accessToken) if (currentToken) { logger.info(`🔄 Using current token for account ${accountId} (refresh failed)`) return currentToken } throw refreshError } } const accessToken = this._decryptSensitiveData(accountData.accessToken) if (!accessToken) { throw new Error('No access token available') } // 更新最后使用时间和会话窗口 accountData.lastUsedAt = new Date().toISOString() await this.updateSessionWindow(accountId, accountData) await redis.setClaudeAccount(accountId, accountData) return accessToken } catch (error) { logger.error(`❌ Failed to get valid access token for account ${accountId}:`, error) throw error } } // 📋 获取所有Claude账户 async getAllAccounts() { try { const accounts = await redis.getAllClaudeAccounts() // 处理返回数据,移除敏感信息并添加限流状态和会话窗口信息 const processedAccounts = await Promise.all( accounts.map(async (account) => { // 获取限流状态信息 const rateLimitInfo = await this.getAccountRateLimitInfo(account.id) // 获取会话窗口信息 const sessionWindowInfo = await this.getSessionWindowInfo(account.id) // 构建 Claude Usage 快照(从 Redis 读取) const claudeUsage = this.buildClaudeUsageSnapshot(account) // 判断授权类型:检查 scopes 是否包含 OAuth 相关权限 const scopes = account.scopes && account.scopes.trim() ? account.scopes.split(' ') : [] const isOAuth = scopes.includes('user:profile') && scopes.includes('user:inference') const authType = isOAuth ? 'oauth' : 'setup-token' return { id: account.id, name: account.name, description: account.description, email: account.email ? this._maskEmail(this._decryptSensitiveData(account.email)) : '', isActive: account.isActive === 'true', proxy: account.proxy ? JSON.parse(account.proxy) : null, status: account.status, errorMessage: account.errorMessage, accountType: account.accountType || 'shared', // 兼容旧数据,默认为共享 priority: parseInt(account.priority) || 50, // 兼容旧数据,默认优先级50 platform: account.platform || 'claude', // 添加平台标识,用于前端区分 authType, // OAuth 或 Setup Token createdAt: account.createdAt, lastUsedAt: account.lastUsedAt, lastRefreshAt: account.lastRefreshAt, expiresAt: account.expiresAt || null, subscriptionExpiresAt: account.subscriptionExpiresAt && account.subscriptionExpiresAt !== '' ? account.subscriptionExpiresAt : null, // 添加 scopes 字段用于判断认证方式 // 处理空字符串的情况,避免返回 [''] scopes: account.scopes && account.scopes.trim() ? account.scopes.split(' ') : [], // 添加 refreshToken 是否存在的标记(不返回实际值) hasRefreshToken: !!account.refreshToken, // 添加套餐信息(如果存在) subscriptionInfo: account.subscriptionInfo ? JSON.parse(account.subscriptionInfo) : null, // 添加限流状态信息 rateLimitStatus: rateLimitInfo ? { isRateLimited: rateLimitInfo.isRateLimited, rateLimitedAt: rateLimitInfo.rateLimitedAt, minutesRemaining: rateLimitInfo.minutesRemaining } : null, // 添加会话窗口信息 sessionWindow: sessionWindowInfo || { hasActiveWindow: false, windowStart: null, windowEnd: null, progress: 0, remainingTime: null, lastRequestTime: null }, // 添加 Claude Usage 信息(三窗口) claudeUsage: claudeUsage || null, // 添加调度状态 schedulable: account.schedulable !== 'false', // 默认为true,兼容历史数据 // 添加自动停止调度设置 autoStopOnWarning: account.autoStopOnWarning === 'true', // 默认为false // 添加5小时自动停止状态 fiveHourAutoStopped: account.fiveHourAutoStopped === 'true', fiveHourStoppedAt: account.fiveHourStoppedAt || null, // 添加统一User-Agent设置 useUnifiedUserAgent: account.useUnifiedUserAgent === 'true', // 默认为false // 添加统一客户端标识设置 useUnifiedClientId: account.useUnifiedClientId === 'true', // 默认为false unifiedClientId: account.unifiedClientId || '', // 统一的客户端标识 // 添加停止原因 stoppedReason: account.stoppedReason || null } }) ) return processedAccounts } catch (error) { logger.error('❌ Failed to get Claude accounts:', error) throw error } } // 📋 获取单个账号的概要信息(用于前端展示会话窗口等状态) async getAccountOverview(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return null } const [sessionWindowInfo, rateLimitInfo] = await Promise.all([ this.getSessionWindowInfo(accountId), this.getAccountRateLimitInfo(accountId) ]) const sessionWindow = sessionWindowInfo || { hasActiveWindow: false, windowStart: null, windowEnd: null, progress: 0, remainingTime: null, lastRequestTime: accountData.lastRequestTime || null, sessionWindowStatus: accountData.sessionWindowStatus || null } const rateLimitStatus = rateLimitInfo ? { isRateLimited: !!rateLimitInfo.isRateLimited, rateLimitedAt: rateLimitInfo.rateLimitedAt || null, minutesRemaining: rateLimitInfo.minutesRemaining || 0, rateLimitEndAt: rateLimitInfo.rateLimitEndAt || null } : { isRateLimited: false, rateLimitedAt: null, minutesRemaining: 0, rateLimitEndAt: null } return { id: accountData.id, accountType: accountData.accountType || 'shared', platform: accountData.platform || 'claude', isActive: accountData.isActive === 'true', schedulable: accountData.schedulable !== 'false', sessionWindow, rateLimitStatus } } catch (error) { logger.error(`❌ Failed to build Claude account overview for ${accountId}:`, error) return null } } // 📝 更新Claude账户 async updateAccount(accountId, updates) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } const allowedUpdates = [ 'name', 'description', 'email', 'password', 'refreshToken', 'proxy', 'isActive', 'claudeAiOauth', 'accountType', 'priority', 'schedulable', 'subscriptionInfo', 'autoStopOnWarning', 'useUnifiedUserAgent', 'useUnifiedClientId', 'unifiedClientId', 'subscriptionExpiresAt' ] const updatedData = { ...accountData } let shouldClearAutoStopFields = false // 检查是否新增了 refresh token const oldRefreshToken = this._decryptSensitiveData(accountData.refreshToken) for (const [field, value] of Object.entries(updates)) { if (allowedUpdates.includes(field)) { if (['email', 'password', 'refreshToken'].includes(field)) { updatedData[field] = this._encryptSensitiveData(value) } else if (field === 'proxy') { updatedData[field] = value ? JSON.stringify(value) : '' } else if (field === 'priority') { updatedData[field] = value.toString() } else if (field === 'subscriptionInfo') { // 处理订阅信息更新 updatedData[field] = typeof value === 'string' ? value : JSON.stringify(value) } else if (field === 'subscriptionExpiresAt') { // 处理订阅到期时间,允许 null 值(永不过期) updatedData[field] = value ? value.toString() : '' } else if (field === 'claudeAiOauth') { // 更新 Claude AI OAuth 数据 if (value) { updatedData.claudeAiOauth = this._encryptSensitiveData(JSON.stringify(value)) updatedData.accessToken = this._encryptSensitiveData(value.accessToken) updatedData.refreshToken = this._encryptSensitiveData(value.refreshToken) updatedData.expiresAt = value.expiresAt.toString() updatedData.scopes = value.scopes.join(' ') updatedData.status = 'active' updatedData.errorMessage = '' updatedData.lastRefreshAt = new Date().toISOString() } } else { updatedData[field] = value !== null && value !== undefined ? value.toString() : '' } } } // 如果新增了 refresh token(之前没有,现在有了),更新过期时间为10分钟 if (updates.refreshToken && !oldRefreshToken && updates.refreshToken.trim()) { const newExpiresAt = Date.now() + 10 * 60 * 1000 // 10分钟 updatedData.expiresAt = newExpiresAt.toString() logger.info( `🔄 New refresh token added for account ${accountId}, setting expiry to 10 minutes` ) } // 如果通过 claudeAiOauth 更新,也要检查是否新增了 refresh token if (updates.claudeAiOauth && updates.claudeAiOauth.refreshToken && !oldRefreshToken) { // 如果 expiresAt 设置的时间过长(超过1小时),调整为10分钟 const providedExpiry = parseInt(updates.claudeAiOauth.expiresAt) const now = Date.now() const oneHour = 60 * 60 * 1000 if (providedExpiry - now > oneHour) { const newExpiresAt = now + 10 * 60 * 1000 // 10分钟 updatedData.expiresAt = newExpiresAt.toString() logger.info( `🔄 Adjusted expiry time to 10 minutes for account ${accountId} with refresh token` ) } } updatedData.updatedAt = new Date().toISOString() // 如果是手动修改调度状态,清除所有自动停止相关的字段 if (Object.prototype.hasOwnProperty.call(updates, 'schedulable')) { // 清除所有自动停止的标记,防止自动恢复 delete updatedData.rateLimitAutoStopped delete updatedData.fiveHourAutoStopped delete updatedData.fiveHourStoppedAt delete updatedData.tempErrorAutoStopped // 兼容旧的标记(逐步迁移) delete updatedData.autoStoppedAt delete updatedData.stoppedReason shouldClearAutoStopFields = true await this._clearFiveHourWarningMetadata(accountId, updatedData) // 如果是手动启用调度,记录日志 if (updates.schedulable === true || updates.schedulable === 'true') { logger.info(`✅ Manually enabled scheduling for account ${accountId}`) } else { logger.info(`⛔ Manually disabled scheduling for account ${accountId}`) } } // 检查是否手动禁用了账号,如果是则发送webhook通知 if (updates.isActive === 'false' && accountData.isActive === 'true') { try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: updatedData.name || 'Unknown Account', platform: 'claude-oauth', status: 'disabled', errorCode: 'CLAUDE_OAUTH_MANUALLY_DISABLED', reason: 'Account manually disabled by administrator' }) } catch (webhookError) { logger.error( 'Failed to send webhook notification for manual account disable:', webhookError ) } } await redis.setClaudeAccount(accountId, updatedData) if (shouldClearAutoStopFields) { const fieldsToRemove = [ 'rateLimitAutoStopped', 'fiveHourAutoStopped', 'fiveHourStoppedAt', 'tempErrorAutoStopped', 'autoStoppedAt', 'stoppedReason' ] await this._removeAccountFields(accountId, fieldsToRemove, 'manual_schedule_update') } logger.success(`📝 Updated Claude account: ${accountId}`) return { success: true } } catch (error) { logger.error('❌ Failed to update Claude account:', error) throw error } } // 🗑️ 删除Claude账户 async deleteAccount(accountId) { try { // 首先从所有分组中移除此账户 const accountGroupService = require('./accountGroupService') await accountGroupService.removeAccountFromAllGroups(accountId) const result = await redis.deleteClaudeAccount(accountId) if (result === 0) { throw new Error('Account not found') } logger.success(`🗑️ Deleted Claude account: ${accountId}`) return { success: true } } catch (error) { logger.error('❌ Failed to delete Claude account:', error) throw error } } /** * 检查账户订阅是否过期 * @param {Object} account - 账户对象 * @returns {boolean} - true: 已过期, false: 未过期 */ isSubscriptionExpired(account) { if (!account.subscriptionExpiresAt) { return false // 未设置过期时间,视为永不过期 } const expiryDate = new Date(account.subscriptionExpiresAt) const now = new Date() if (expiryDate <= now) { logger.debug( `⏰ Account ${account.name} (${account.id}) expired at ${account.subscriptionExpiresAt}` ) return true } return false } // 🎯 智能选择可用账户(支持sticky会话和模型过滤) async selectAvailableAccount(sessionHash = null, modelName = null) { try { const accounts = await redis.getAllClaudeAccounts() let activeAccounts = accounts.filter( (account) => account.isActive === 'true' && account.status !== 'error' && account.schedulable !== 'false' && !this.isSubscriptionExpired(account) ) // 如果请求的是 Opus 模型,过滤掉 Pro 和 Free 账号 if (modelName && modelName.toLowerCase().includes('opus')) { activeAccounts = activeAccounts.filter((account) => { // 检查账号的订阅信息 if (account.subscriptionInfo) { try { const info = JSON.parse(account.subscriptionInfo) // Pro 和 Free 账号不支持 Opus if (info.hasClaudePro === true && info.hasClaudeMax !== true) { return false // Claude Pro 不支持 Opus } if (info.accountType === 'claude_pro' || info.accountType === 'claude_free') { return false // 明确标记为 Pro 或 Free 的账号不支持 } } catch (e) { // 解析失败,假设为旧数据,默认支持(兼容旧数据为 Max) return true } } // 没有订阅信息的账号,默认当作支持(兼容旧数据) return true }) if (activeAccounts.length === 0) { throw new Error('No Claude accounts available that support Opus model') } } if (activeAccounts.length === 0) { throw new Error('No active Claude accounts available') } // 如果有会话哈希,检查是否有已映射的账户 if (sessionHash) { const mappedAccountId = await redis.getSessionAccountMapping(sessionHash) if (mappedAccountId) { // 验证映射的账户是否仍然可用 const mappedAccount = activeAccounts.find((acc) => acc.id === mappedAccountId) if (mappedAccount) { // 🚀 智能会话续期:剩余时间少于14天时自动续期到15天 await redis.extendSessionAccountMappingTTL(sessionHash) logger.info( `🎯 Using sticky session account: ${mappedAccount.name} (${mappedAccountId}) for session ${sessionHash}` ) return mappedAccountId } else { logger.warn( `⚠️ Mapped account ${mappedAccountId} is no longer available, selecting new account` ) // 清理无效的映射 await redis.deleteSessionAccountMapping(sessionHash) } } } // 如果没有映射或映射无效,选择新账户 // 优先选择最久未使用的账户(负载均衡) const sortedAccounts = activeAccounts.sort((a, b) => { const aLastUsed = new Date(a.lastUsedAt || 0).getTime() const bLastUsed = new Date(b.lastUsedAt || 0).getTime() return aLastUsed - bLastUsed // 最久未使用的优先 }) const selectedAccountId = sortedAccounts[0].id // 如果有会话哈希,建立新的映射 if (sessionHash) { // 从配置获取TTL(小时),转换为秒 const ttlSeconds = (config.session?.stickyTtlHours || 1) * 60 * 60 await redis.setSessionAccountMapping(sessionHash, selectedAccountId, ttlSeconds) logger.info( `🎯 Created new sticky session mapping: ${sortedAccounts[0].name} (${selectedAccountId}) for session ${sessionHash}` ) } return selectedAccountId } catch (error) { logger.error('❌ Failed to select available account:', error) throw error } } // 🎯 基于API Key选择账户(支持专属绑定、共享池和模型过滤) async selectAccountForApiKey(apiKeyData, sessionHash = null, modelName = null) { try { // 如果API Key绑定了专属账户,优先使用 if (apiKeyData.claudeAccountId) { const boundAccount = await redis.getClaudeAccount(apiKeyData.claudeAccountId) if ( boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error' && boundAccount.schedulable !== 'false' && !this.isSubscriptionExpired(boundAccount) ) { logger.info( `🎯 Using bound dedicated account: ${boundAccount.name} (${apiKeyData.claudeAccountId}) for API key ${apiKeyData.name}` ) return apiKeyData.claudeAccountId } else { logger.warn( `⚠️ Bound account ${apiKeyData.claudeAccountId} is not available, falling back to shared pool` ) } } // 如果没有绑定账户或绑定账户不可用,从共享池选择 const accounts = await redis.getAllClaudeAccounts() let sharedAccounts = accounts.filter( (account) => account.isActive === 'true' && account.status !== 'error' && account.schedulable !== 'false' && (account.accountType === 'shared' || !account.accountType) && // 兼容旧数据 !this.isSubscriptionExpired(account) ) // 如果请求的是 Opus 模型,过滤掉 Pro 和 Free 账号 if (modelName && modelName.toLowerCase().includes('opus')) { sharedAccounts = sharedAccounts.filter((account) => { // 检查账号的订阅信息 if (account.subscriptionInfo) { try { const info = JSON.parse(account.subscriptionInfo) // Pro 和 Free 账号不支持 Opus if (info.hasClaudePro === true && info.hasClaudeMax !== true) { return false // Claude Pro 不支持 Opus } if (info.accountType === 'claude_pro' || info.accountType === 'claude_free') { return false // 明确标记为 Pro 或 Free 的账号不支持 } } catch (e) { // 解析失败,假设为旧数据,默认支持(兼容旧数据为 Max) return true } } // 没有订阅信息的账号,默认当作支持(兼容旧数据) return true }) if (sharedAccounts.length === 0) { throw new Error('No shared Claude accounts available that support Opus model') } } if (sharedAccounts.length === 0) { throw new Error('No active shared Claude accounts available') } // 如果有会话哈希,检查是否有已映射的账户 if (sessionHash) { const mappedAccountId = await redis.getSessionAccountMapping(sessionHash) if (mappedAccountId) { // 验证映射的账户是否仍然在共享池中且可用 const mappedAccount = sharedAccounts.find((acc) => acc.id === mappedAccountId) if (mappedAccount) { // 如果映射的账户被限流了,删除映射并重新选择 const isRateLimited = await this.isAccountRateLimited(mappedAccountId) if (isRateLimited) { logger.warn( `⚠️ Mapped account ${mappedAccountId} is rate limited, selecting new account` ) await redis.deleteSessionAccountMapping(sessionHash) } else { // 🚀 智能会话续期:剩余时间少于14天时自动续期到15天 await redis.extendSessionAccountMappingTTL(sessionHash) logger.info( `🎯 Using sticky session shared account: ${mappedAccount.name} (${mappedAccountId}) for session ${sessionHash}` ) return mappedAccountId } } else { logger.warn( `⚠️ Mapped shared account ${mappedAccountId} is no longer available, selecting new account` ) // 清理无效的映射 await redis.deleteSessionAccountMapping(sessionHash) } } } // 将账户分为限流和非限流两组 const nonRateLimitedAccounts = [] const rateLimitedAccounts = [] for (const account of sharedAccounts) { const isRateLimited = await this.isAccountRateLimited(account.id) if (isRateLimited) { const rateLimitInfo = await this.getAccountRateLimitInfo(account.id) account._rateLimitInfo = rateLimitInfo // 临时存储限流信息 rateLimitedAccounts.push(account) } else { nonRateLimitedAccounts.push(account) } } // 优先从非限流账户中选择 let candidateAccounts = nonRateLimitedAccounts // 如果没有非限流账户,则从限流账户中选择(按限流时间排序,最早限流的优先) if (candidateAccounts.length === 0) { logger.warn('⚠️ All shared accounts are rate limited, selecting from rate limited pool') candidateAccounts = rateLimitedAccounts.sort((a, b) => { const aRateLimitedAt = new Date(a._rateLimitInfo.rateLimitedAt).getTime() const bRateLimitedAt = new Date(b._rateLimitInfo.rateLimitedAt).getTime() return aRateLimitedAt - bRateLimitedAt // 最早限流的优先 }) } else { // 非限流账户按最后使用时间排序(最久未使用的优先) candidateAccounts = candidateAccounts.sort((a, b) => { const aLastUsed = new Date(a.lastUsedAt || 0).getTime() const bLastUsed = new Date(b.lastUsedAt || 0).getTime() return aLastUsed - bLastUsed // 最久未使用的优先 }) } if (candidateAccounts.length === 0) { throw new Error('No available shared Claude accounts') } const selectedAccountId = candidateAccounts[0].id // 如果有会话哈希,建立新的映射 if (sessionHash) { // 从配置获取TTL(小时),转换为秒 const ttlSeconds = (config.session?.stickyTtlHours || 1) * 60 * 60 await redis.setSessionAccountMapping(sessionHash, selectedAccountId, ttlSeconds) logger.info( `🎯 Created new sticky session mapping for shared account: ${candidateAccounts[0].name} (${selectedAccountId}) for session ${sessionHash}` ) } logger.info( `🎯 Selected shared account: ${candidateAccounts[0].name} (${selectedAccountId}) for API key ${apiKeyData.name}` ) return selectedAccountId } catch (error) { logger.error('❌ Failed to select account for API key:', error) throw error } } // 🌐 创建代理agent(使用统一的代理工具) _createProxyAgent(proxyConfig) { const proxyAgent = ProxyHelper.createProxyAgent(proxyConfig) if (proxyAgent) { logger.info( `🌐 Using proxy for Claude request: ${ProxyHelper.getProxyDescription(proxyConfig)}` ) } else if (proxyConfig) { logger.debug('🌐 Failed to create proxy agent for Claude') } else { logger.debug('🌐 No proxy configured for Claude request') } return proxyAgent } // 🔐 加密敏感数据 _encryptSensitiveData(data) { if (!data) { return '' } try { const key = this._generateEncryptionKey() const iv = crypto.randomBytes(16) const cipher = crypto.createCipheriv(this.ENCRYPTION_ALGORITHM, key, iv) let encrypted = cipher.update(data, 'utf8', 'hex') encrypted += cipher.final('hex') // 将IV和加密数据一起返回,用:分隔 return `${iv.toString('hex')}:${encrypted}` } catch (error) { logger.error('❌ Encryption error:', error) return data } } // 🔓 解密敏感数据 _decryptSensitiveData(encryptedData) { if (!encryptedData) { return '' } // 🎯 检查缓存 const cacheKey = crypto.createHash('sha256').update(encryptedData).digest('hex') const cached = this._decryptCache.get(cacheKey) if (cached !== undefined) { return cached } try { let decrypted = '' // 检查是否是新格式(包含IV) if (encryptedData.includes(':')) { // 新格式:iv:encryptedData const parts = encryptedData.split(':') if (parts.length === 2) { const key = this._generateEncryptionKey() const iv = Buffer.from(parts[0], 'hex') const encrypted = parts[1] const decipher = crypto.createDecipheriv(this.ENCRYPTION_ALGORITHM, key, iv) decrypted = decipher.update(encrypted, 'hex', 'utf8') decrypted += decipher.final('utf8') // 💾 存入缓存(5分钟过期) this._decryptCache.set(cacheKey, decrypted, 5 * 60 * 1000) // 📊 定期打印缓存统计 if ((this._decryptCache.hits + this._decryptCache.misses) % 1000 === 0) { this._decryptCache.printStats() } return decrypted } } // 旧格式或格式错误,尝试旧方式解密(向后兼容) // 注意:在新版本Node.js中这将失败,但我们会捕获错误 try { const decipher = crypto.createDecipher('aes-256-cbc', config.security.encryptionKey) decrypted = decipher.update(encryptedData, 'hex', 'utf8') decrypted += decipher.final('utf8') // 💾 旧格式也存入缓存 this._decryptCache.set(cacheKey, decrypted, 5 * 60 * 1000) return decrypted } catch (oldError) { // 如果旧方式也失败,返回原数据 logger.warn('⚠️ Could not decrypt data, returning as-is:', oldError.message) return encryptedData } } catch (error) { logger.error('❌ Decryption error:', error) return encryptedData } } // 🔑 生成加密密钥(辅助方法) _generateEncryptionKey() { // 性能优化:缓存密钥派生结果,避免重复的 CPU 密集计算 // scryptSync 是故意设计为慢速的密钥派生函数(防暴力破解) // 但在高并发场景下,每次都重新计算会导致 CPU 100% 占用 if (!this._encryptionKeyCache) { // 只在第一次调用时计算,后续使用缓存 // 由于输入参数固定,派生结果永远相同,不影响数据兼容性 this._encryptionKeyCache = crypto.scryptSync( config.security.encryptionKey, this.ENCRYPTION_SALT, 32 ) logger.info('🔑 Encryption key derived and cached for performance optimization') } return this._encryptionKeyCache } // 🎭 掩码邮箱地址 _maskEmail(email) { if (!email || !email.includes('@')) { return email } const [username, domain] = email.split('@') const maskedUsername = username.length > 2 ? `${username.slice(0, 2)}***${username.slice(-1)}` : `${username.slice(0, 1)}***` return `${maskedUsername}@${domain}` } // 🔢 安全转换为数字或null _toNumberOrNull(value) { if (value === undefined || value === null || value === '') { return null } const num = Number(value) return Number.isFinite(num) ? num : null } // 🧹 清理错误账户 async cleanupErrorAccounts() { try { const accounts = await redis.getAllClaudeAccounts() let cleanedCount = 0 for (const account of accounts) { if (account.status === 'error' && account.lastRefreshAt) { const lastRefresh = new Date(account.lastRefreshAt) const now = new Date() const hoursSinceLastRefresh = (now - lastRefresh) / (1000 * 60 * 60) // 如果错误状态超过24小时,尝试重新激活 if (hoursSinceLastRefresh > 24) { account.status = 'created' account.errorMessage = '' await redis.setClaudeAccount(account.id, account) cleanedCount++ } } } if (cleanedCount > 0) { logger.success(`🧹 Reset ${cleanedCount} error accounts`) } return cleanedCount } catch (error) { logger.error('❌ Failed to cleanup error accounts:', error) return 0 } } // 🚫 标记账号为限流状态 async markAccountRateLimited(accountId, sessionHash = null, rateLimitResetTimestamp = null) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 设置限流状态和时间 const updatedAccountData = { ...accountData } updatedAccountData.rateLimitedAt = new Date().toISOString() updatedAccountData.rateLimitStatus = 'limited' // 限流时停止调度,与 OpenAI 账号保持一致 updatedAccountData.schedulable = 'false' // 使用独立的限流自动停止标记,避免与其他自动停止冲突 updatedAccountData.rateLimitAutoStopped = 'true' // 如果提供了准确的限流重置时间戳(来自API响应头) if (rateLimitResetTimestamp) { // 将Unix时间戳(秒)转换为毫秒并创建Date对象 const resetTime = new Date(rateLimitResetTimestamp * 1000) updatedAccountData.rateLimitEndAt = resetTime.toISOString() // 计算当前会话窗口的开始时间(重置时间减去5小时) const windowStartTime = new Date(resetTime.getTime() - 5 * 60 * 60 * 1000) updatedAccountData.sessionWindowStart = windowStartTime.toISOString() updatedAccountData.sessionWindowEnd = resetTime.toISOString() const now = new Date() const minutesUntilEnd = Math.ceil((resetTime - now) / (1000 * 60)) logger.warn( `🚫 Account marked as rate limited with accurate reset time: ${accountData.name} (${accountId}) - ${minutesUntilEnd} minutes remaining until ${resetTime.toISOString()}` ) } else { // 获取或创建会话窗口(预估方式) const windowData = await this.updateSessionWindow(accountId, updatedAccountData) Object.assign(updatedAccountData, windowData) // 限流结束时间 = 会话窗口结束时间 if (updatedAccountData.sessionWindowEnd) { updatedAccountData.rateLimitEndAt = updatedAccountData.sessionWindowEnd const windowEnd = new Date(updatedAccountData.sessionWindowEnd) const now = new Date() const minutesUntilEnd = Math.ceil((windowEnd - now) / (1000 * 60)) logger.warn( `🚫 Account marked as rate limited until estimated session window ends: ${accountData.name} (${accountId}) - ${minutesUntilEnd} minutes remaining` ) } else { // 如果没有会话窗口,使用默认1小时(兼容旧逻辑) const oneHourLater = new Date(Date.now() + 60 * 60 * 1000) updatedAccountData.rateLimitEndAt = oneHourLater.toISOString() logger.warn( `🚫 Account marked as rate limited (1 hour default): ${accountData.name} (${accountId})` ) } } await redis.setClaudeAccount(accountId, updatedAccountData) // 如果有会话哈希,删除粘性会话映射 if (sessionHash) { await redis.deleteSessionAccountMapping(sessionHash) logger.info(`🗑️ Deleted sticky session mapping for rate limited account: ${accountId}`) } // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name || 'Claude Account', platform: 'claude-oauth', status: 'error', errorCode: 'CLAUDE_OAUTH_RATE_LIMITED', reason: `Account rate limited (429 error). ${rateLimitResetTimestamp ? `Reset at: ${formatDateWithTimezone(rateLimitResetTimestamp)}` : 'Estimated reset in 1-5 hours'}`, timestamp: getISOStringWithTimezone(new Date()) }) } catch (webhookError) { logger.error('Failed to send rate limit webhook notification:', webhookError) } return { success: true } } catch (error) { logger.error(`❌ Failed to mark account as rate limited: ${accountId}`, error) throw error } } // 🚫 标记账号的 Opus 限流状态(不影响其他模型调度) async markAccountOpusRateLimited(accountId, rateLimitResetTimestamp = null) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } const updatedAccountData = { ...accountData } const now = new Date() updatedAccountData.opusRateLimitedAt = now.toISOString() if (rateLimitResetTimestamp) { const resetTime = new Date(rateLimitResetTimestamp * 1000) updatedAccountData.opusRateLimitEndAt = resetTime.toISOString() logger.warn( `🚫 Account ${accountData.name} (${accountId}) reached Opus weekly cap, resets at ${resetTime.toISOString()}` ) } else { // 如果缺少准确时间戳,保留现有值但记录警告,便于后续人工干预 logger.warn( `⚠️ Account ${accountData.name} (${accountId}) reported Opus limit without reset timestamp` ) } await redis.setClaudeAccount(accountId, updatedAccountData) return { success: true } } catch (error) { logger.error(`❌ Failed to mark Opus rate limit for account: ${accountId}`, error) throw error } } // ✅ 清除账号的 Opus 限流状态 async clearAccountOpusRateLimit(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return { success: true } } const updatedAccountData = { ...accountData } delete updatedAccountData.opusRateLimitedAt delete updatedAccountData.opusRateLimitEndAt await redis.setClaudeAccount(accountId, updatedAccountData) const redisKey = `claude:account:${accountId}` if (redis.client && typeof redis.client.hdel === 'function') { await redis.client.hdel(redisKey, 'opusRateLimitedAt', 'opusRateLimitEndAt') } logger.info(`✅ Cleared Opus rate limit state for account ${accountId}`) return { success: true } } catch (error) { logger.error(`❌ Failed to clear Opus rate limit for account: ${accountId}`, error) throw error } } // 🔍 检查账号是否处于 Opus 限流状态(自动清理过期标记) async isAccountOpusRateLimited(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return false } if (!accountData.opusRateLimitEndAt) { return false } const resetTime = new Date(accountData.opusRateLimitEndAt) if (Number.isNaN(resetTime.getTime())) { await this.clearAccountOpusRateLimit(accountId) return false } const now = new Date() if (now >= resetTime) { await this.clearAccountOpusRateLimit(accountId) return false } return true } catch (error) { logger.error(`❌ Failed to check Opus rate limit status for account: ${accountId}`, error) return false } } // ♻️ 检查并清理已过期的 Opus 限流标记 async clearExpiredOpusRateLimit(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return { success: true } } if (!accountData.opusRateLimitEndAt) { return { success: true } } const resetTime = new Date(accountData.opusRateLimitEndAt) if (Number.isNaN(resetTime.getTime()) || new Date() >= resetTime) { await this.clearAccountOpusRateLimit(accountId) } return { success: true } } catch (error) { logger.error(`❌ Failed to clear expired Opus rate limit for account: ${accountId}`, error) throw error } } // ✅ 移除账号的限流状态 async removeAccountRateLimit(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } const accountKey = `claude:account:${accountId}` // 清除限流状态 const redisKey = `claude:account:${accountId}` await redis.client.hdel(redisKey, 'rateLimitedAt', 'rateLimitStatus', 'rateLimitEndAt') delete accountData.rateLimitedAt delete accountData.rateLimitStatus delete accountData.rateLimitEndAt // 清除限流结束时间 const hadAutoStop = accountData.rateLimitAutoStopped === 'true' // 只恢复因限流而自动停止的账户 if (hadAutoStop && accountData.schedulable === 'false') { accountData.schedulable = 'true' logger.info(`✅ Auto-resuming scheduling for account ${accountId} after rate limit cleared`) logger.info( `📊 Account ${accountId} state after recovery: schedulable=${accountData.schedulable}` ) } else { logger.info( `ℹ️ Account ${accountId} did not need auto-resume: autoStopped=${accountData.rateLimitAutoStopped}, schedulable=${accountData.schedulable}` ) } if (hadAutoStop) { await redis.client.hdel(redisKey, 'rateLimitAutoStopped') delete accountData.rateLimitAutoStopped } await redis.setClaudeAccount(accountId, accountData) // 显式删除Redis中的限流字段,避免旧标记阻止账号恢复调度 await redis.client.hdel( accountKey, 'rateLimitedAt', 'rateLimitStatus', 'rateLimitEndAt', 'rateLimitAutoStopped' ) logger.success(`✅ Rate limit removed for account: ${accountData.name} (${accountId})`) return { success: true } } catch (error) { logger.error(`❌ Failed to remove rate limit for account: ${accountId}`, error) throw error } } // 🔍 检查账号是否处于限流状态 async isAccountRateLimited(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return false } const now = new Date() // 检查是否有限流状态(包括字段缺失但有自动停止标记的情况) if ( (accountData.rateLimitStatus === 'limited' && accountData.rateLimitedAt) || (accountData.rateLimitAutoStopped === 'true' && accountData.rateLimitEndAt) ) { // 优先使用 rateLimitEndAt(基于会话窗口) if (accountData.rateLimitEndAt) { const rateLimitEndAt = new Date(accountData.rateLimitEndAt) // 如果当前时间超过限流结束时间,自动解除 if (now >= rateLimitEndAt) { await this.removeAccountRateLimit(accountId) return false } return true } else if (accountData.rateLimitedAt) { // 兼容旧数据:使用1小时限流 const rateLimitedAt = new Date(accountData.rateLimitedAt) const hoursSinceRateLimit = (now - rateLimitedAt) / (1000 * 60 * 60) // 如果限流超过1小时,自动解除 if (hoursSinceRateLimit >= 1) { await this.removeAccountRateLimit(accountId) return false } return true } } return false } catch (error) { logger.error(`❌ Failed to check rate limit status for account: ${accountId}`, error) return false } } // 📊 获取账号的限流信息 async getAccountRateLimitInfo(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return null } if (accountData.rateLimitStatus === 'limited' && accountData.rateLimitedAt) { const rateLimitedAt = new Date(accountData.rateLimitedAt) const now = new Date() const minutesSinceRateLimit = Math.floor((now - rateLimitedAt) / (1000 * 60)) let minutesRemaining let rateLimitEndAt // 优先使用 rateLimitEndAt(基于会话窗口) if (accountData.rateLimitEndAt) { ;({ rateLimitEndAt } = accountData) const endTime = new Date(accountData.rateLimitEndAt) minutesRemaining = Math.max(0, Math.ceil((endTime - now) / (1000 * 60))) } else { // 兼容旧数据:使用1小时限流 minutesRemaining = Math.max(0, 60 - minutesSinceRateLimit) // 计算预期的结束时间 const endTime = new Date(rateLimitedAt.getTime() + 60 * 60 * 1000) rateLimitEndAt = endTime.toISOString() } return { isRateLimited: minutesRemaining > 0, rateLimitedAt: accountData.rateLimitedAt, minutesSinceRateLimit, minutesRemaining, rateLimitEndAt // 新增:限流结束时间 } } return { isRateLimited: false, rateLimitedAt: null, minutesSinceRateLimit: 0, minutesRemaining: 0, rateLimitEndAt: null } } catch (error) { logger.error(`❌ Failed to get rate limit info for account: ${accountId}`, error) return null } } // 🕐 更新会话窗口 async updateSessionWindow(accountId, accountData = null) { try { // 如果没有传入accountData,从Redis获取 if (!accountData) { accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } } const now = new Date() const currentTime = now.getTime() let shouldClearSessionStatus = false let shouldClearFiveHourFlags = false // 检查当前是否有活跃的会话窗口 if (accountData.sessionWindowStart && accountData.sessionWindowEnd) { const windowEnd = new Date(accountData.sessionWindowEnd).getTime() // 如果当前时间在窗口内,只更新最后请求时间 if (currentTime < windowEnd) { accountData.lastRequestTime = now.toISOString() return accountData } // 窗口已过期,记录日志 const windowStart = new Date(accountData.sessionWindowStart) logger.info( `⏰ Session window expired for account ${accountData.name} (${accountId}): ${windowStart.toISOString()} - ${new Date(windowEnd).toISOString()}` ) } // 基于当前时间计算新的会话窗口 const windowStart = this._calculateSessionWindowStart(now) const windowEnd = this._calculateSessionWindowEnd(windowStart) // 更新会话窗口信息 accountData.sessionWindowStart = windowStart.toISOString() accountData.sessionWindowEnd = windowEnd.toISOString() accountData.lastRequestTime = now.toISOString() // 清除会话窗口状态,因为进入了新窗口 if (accountData.sessionWindowStatus) { delete accountData.sessionWindowStatus delete accountData.sessionWindowStatusUpdatedAt await this._clearFiveHourWarningMetadata(accountId, accountData) shouldClearSessionStatus = true } // 如果账户因为5小时限制被自动停止,现在恢复调度 if (accountData.fiveHourAutoStopped === 'true' && accountData.schedulable === 'false') { logger.info( `✅ Auto-resuming scheduling for account ${accountData.name} (${accountId}) - new session window started` ) accountData.schedulable = 'true' delete accountData.fiveHourAutoStopped delete accountData.fiveHourStoppedAt await this._clearFiveHourWarningMetadata(accountId, accountData) shouldClearFiveHourFlags = true // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name || 'Claude Account', platform: 'claude', status: 'resumed', errorCode: 'CLAUDE_5H_LIMIT_RESUMED', reason: '进入新的5小时窗口,已自动恢复调度', timestamp: getISOStringWithTimezone(new Date()) }) } catch (webhookError) { logger.error('Failed to send webhook notification:', webhookError) } } if (shouldClearSessionStatus || shouldClearFiveHourFlags) { const fieldsToRemove = [] if (shouldClearFiveHourFlags) { fieldsToRemove.push('fiveHourAutoStopped', 'fiveHourStoppedAt') } if (shouldClearSessionStatus) { fieldsToRemove.push('sessionWindowStatus', 'sessionWindowStatusUpdatedAt') } await this._removeAccountFields(accountId, fieldsToRemove, 'session_window_refresh') } logger.info( `🕐 Created new session window for account ${accountData.name} (${accountId}): ${windowStart.toISOString()} - ${windowEnd.toISOString()} (from current time)` ) return accountData } catch (error) { logger.error(`❌ Failed to update session window for account ${accountId}:`, error) throw error } } // 🕐 计算会话窗口开始时间 _calculateSessionWindowStart(requestTime) { // 从当前时间开始创建窗口,只将分钟取整到整点 const windowStart = new Date(requestTime) windowStart.setMinutes(0) windowStart.setSeconds(0) windowStart.setMilliseconds(0) return windowStart } // 🕐 计算会话窗口结束时间 _calculateSessionWindowEnd(startTime) { const endTime = new Date(startTime) endTime.setHours(endTime.getHours() + 5) // 加5小时 return endTime } async _clearFiveHourWarningMetadata(accountId, accountData = null) { if (accountData) { delete accountData.fiveHourWarningWindow delete accountData.fiveHourWarningCount delete accountData.fiveHourWarningLastSentAt } try { if (redis.client && typeof redis.client.hdel === 'function') { await redis.client.hdel( `claude:account:${accountId}`, 'fiveHourWarningWindow', 'fiveHourWarningCount', 'fiveHourWarningLastSentAt' ) } } catch (error) { logger.warn( `⚠️ Failed to clear five-hour warning metadata for account ${accountId}: ${error.message}` ) } } // 📊 获取会话窗口信息 async getSessionWindowInfo(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { return null } // 如果没有会话窗口信息,返回null if (!accountData.sessionWindowStart || !accountData.sessionWindowEnd) { return { hasActiveWindow: false, windowStart: null, windowEnd: null, progress: 0, remainingTime: null, lastRequestTime: accountData.lastRequestTime || null, sessionWindowStatus: accountData.sessionWindowStatus || null } } const now = new Date() const windowStart = new Date(accountData.sessionWindowStart) const windowEnd = new Date(accountData.sessionWindowEnd) const currentTime = now.getTime() // 检查窗口是否已过期 if (currentTime >= windowEnd.getTime()) { return { hasActiveWindow: false, windowStart: accountData.sessionWindowStart, windowEnd: accountData.sessionWindowEnd, progress: 100, remainingTime: 0, lastRequestTime: accountData.lastRequestTime || null, sessionWindowStatus: accountData.sessionWindowStatus || null } } // 计算进度百分比 const totalDuration = windowEnd.getTime() - windowStart.getTime() const elapsedTime = currentTime - windowStart.getTime() const progress = Math.round((elapsedTime / totalDuration) * 100) // 计算剩余时间(分钟) const remainingTime = Math.round((windowEnd.getTime() - currentTime) / (1000 * 60)) return { hasActiveWindow: true, windowStart: accountData.sessionWindowStart, windowEnd: accountData.sessionWindowEnd, progress, remainingTime, lastRequestTime: accountData.lastRequestTime || null, sessionWindowStatus: accountData.sessionWindowStatus || null } } catch (error) { logger.error(`❌ Failed to get session window info for account ${accountId}:`, error) return null } } // 📊 获取 OAuth Usage 数据 async fetchOAuthUsage(accountId, accessToken = null, agent = null) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 如果没有提供 accessToken,使用 getValidAccessToken 自动检查过期并刷新 if (!accessToken) { accessToken = await this.getValidAccessToken(accountId) } // 如果没有提供 agent,创建代理 if (!agent) { agent = this._createProxyAgent(accountData.proxy) } logger.debug(`📊 Fetching OAuth usage for account: ${accountData.name} (${accountId})`) // 请求 OAuth usage 接口 const axiosConfig = { headers: { Authorization: `Bearer ${accessToken}`, 'Content-Type': 'application/json', Accept: 'application/json', 'anthropic-beta': 'oauth-2025-04-20', 'User-Agent': 'claude-cli/1.0.56 (external, cli)', 'Accept-Language': 'en-US,en;q=0.9' }, timeout: 15000 } if (agent) { axiosConfig.httpAgent = agent axiosConfig.httpsAgent = agent axiosConfig.proxy = false } const response = await axios.get('https://api.anthropic.com/api/oauth/usage', axiosConfig) if (response.status === 200 && response.data) { logger.debug('✅ Successfully fetched OAuth usage data:', { accountId, fiveHour: response.data.five_hour?.utilization, sevenDay: response.data.seven_day?.utilization, sevenDayOpus: response.data.seven_day_opus?.utilization }) return response.data } logger.warn(`⚠️ Failed to fetch OAuth usage for account ${accountId}: ${response.status}`) return null } catch (error) { // 403 错误通常表示使用的是 Setup Token 而非 OAuth if (error.response?.status === 403) { logger.debug( `⚠️ OAuth usage API returned 403 for account ${accountId}. This account likely uses Setup Token instead of OAuth.` ) return null } // 其他错误正常记录 logger.error( `❌ Failed to fetch OAuth usage for account ${accountId}:`, error.response?.data || error.message ) return null } } // 📊 构建 Claude Usage 快照(从 Redis 数据) buildClaudeUsageSnapshot(accountData) { const updatedAt = accountData.claudeUsageUpdatedAt const fiveHourUtilization = this._toNumberOrNull(accountData.claudeFiveHourUtilization) const fiveHourResetsAt = accountData.claudeFiveHourResetsAt const sevenDayUtilization = this._toNumberOrNull(accountData.claudeSevenDayUtilization) const sevenDayResetsAt = accountData.claudeSevenDayResetsAt const sevenDayOpusUtilization = this._toNumberOrNull(accountData.claudeSevenDayOpusUtilization) const sevenDayOpusResetsAt = accountData.claudeSevenDayOpusResetsAt const hasFiveHourData = fiveHourUtilization !== null || fiveHourResetsAt const hasSevenDayData = sevenDayUtilization !== null || sevenDayResetsAt const hasSevenDayOpusData = sevenDayOpusUtilization !== null || sevenDayOpusResetsAt if (!updatedAt && !hasFiveHourData && !hasSevenDayData && !hasSevenDayOpusData) { return null } const now = Date.now() return { updatedAt, fiveHour: { utilization: fiveHourUtilization, resetsAt: fiveHourResetsAt, remainingSeconds: fiveHourResetsAt ? Math.max(0, Math.floor((new Date(fiveHourResetsAt).getTime() - now) / 1000)) : null }, sevenDay: { utilization: sevenDayUtilization, resetsAt: sevenDayResetsAt, remainingSeconds: sevenDayResetsAt ? Math.max(0, Math.floor((new Date(sevenDayResetsAt).getTime() - now) / 1000)) : null }, sevenDayOpus: { utilization: sevenDayOpusUtilization, resetsAt: sevenDayOpusResetsAt, remainingSeconds: sevenDayOpusResetsAt ? Math.max(0, Math.floor((new Date(sevenDayOpusResetsAt).getTime() - now) / 1000)) : null } } } // 📊 更新 Claude Usage 快照到 Redis async updateClaudeUsageSnapshot(accountId, usageData) { if (!usageData || typeof usageData !== 'object') { return } const updates = {} // 5小时窗口 if (usageData.five_hour) { if (usageData.five_hour.utilization !== undefined) { updates.claudeFiveHourUtilization = String(usageData.five_hour.utilization) } if (usageData.five_hour.resets_at) { updates.claudeFiveHourResetsAt = usageData.five_hour.resets_at } } // 7天窗口 if (usageData.seven_day) { if (usageData.seven_day.utilization !== undefined) { updates.claudeSevenDayUtilization = String(usageData.seven_day.utilization) } if (usageData.seven_day.resets_at) { updates.claudeSevenDayResetsAt = usageData.seven_day.resets_at } } // 7天Opus窗口 if (usageData.seven_day_opus) { if (usageData.seven_day_opus.utilization !== undefined) { updates.claudeSevenDayOpusUtilization = String(usageData.seven_day_opus.utilization) } if (usageData.seven_day_opus.resets_at) { updates.claudeSevenDayOpusResetsAt = usageData.seven_day_opus.resets_at } } if (Object.keys(updates).length === 0) { return } updates.claudeUsageUpdatedAt = new Date().toISOString() const accountData = await redis.getClaudeAccount(accountId) if (accountData && Object.keys(accountData).length > 0) { Object.assign(accountData, updates) await redis.setClaudeAccount(accountId, accountData) logger.debug( `📊 Updated Claude usage snapshot for account ${accountId}:`, Object.keys(updates) ) } } // 📊 获取账号 Profile 信息并更新账号类型 async fetchAndUpdateAccountProfile(accountId, accessToken = null, agent = null) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 检查账户是否有 user:profile 权限 const hasProfileScope = accountData.scopes && accountData.scopes.includes('user:profile') if (!hasProfileScope) { logger.warn( `⚠️ Account ${accountId} does not have user:profile scope, cannot fetch profile` ) throw new Error('Account does not have user:profile permission') } // 如果没有提供 accessToken,使用账号存储的 token if (!accessToken) { accessToken = this._decryptSensitiveData(accountData.accessToken) if (!accessToken) { throw new Error('No access token available') } } // 如果没有提供 agent,创建代理 if (!agent) { agent = this._createProxyAgent(accountData.proxy) } logger.info(`📊 Fetching profile info for account: ${accountData.name} (${accountId})`) // 请求 profile 接口 const axiosConfig = { headers: { Authorization: `Bearer ${accessToken}`, 'Content-Type': 'application/json', Accept: 'application/json', 'User-Agent': 'claude-cli/1.0.56 (external, cli)', 'Accept-Language': 'en-US,en;q=0.9' }, timeout: 15000 } if (agent) { axiosConfig.httpAgent = agent axiosConfig.httpsAgent = agent axiosConfig.proxy = false } const response = await axios.get('https://api.anthropic.com/api/oauth/profile', axiosConfig) if (response.status === 200 && response.data) { const profileData = response.data logger.info('✅ Successfully fetched profile data:', { email: profileData.account?.email, hasClaudeMax: profileData.account?.has_claude_max, hasClaudePro: profileData.account?.has_claude_pro, organizationType: profileData.organization?.organization_type }) // 构建订阅信息 const subscriptionInfo = { // 账号信息 email: profileData.account?.email, fullName: profileData.account?.full_name, displayName: profileData.account?.display_name, hasClaudeMax: profileData.account?.has_claude_max || false, hasClaudePro: profileData.account?.has_claude_pro || false, accountUuid: profileData.account?.uuid, // 组织信息 organizationName: profileData.organization?.name, organizationUuid: profileData.organization?.uuid, billingType: profileData.organization?.billing_type, rateLimitTier: profileData.organization?.rate_limit_tier, organizationType: profileData.organization?.organization_type, // 账号类型(基于 has_claude_max 和 has_claude_pro 判断) accountType: profileData.account?.has_claude_max === true ? 'claude_max' : profileData.account?.has_claude_pro === true ? 'claude_pro' : 'free', // 更新时间 profileFetchedAt: new Date().toISOString() } // 更新账户数据 accountData.subscriptionInfo = JSON.stringify(subscriptionInfo) accountData.profileUpdatedAt = new Date().toISOString() // 如果提供了邮箱,更新邮箱字段 if (profileData.account?.email) { accountData.email = this._encryptSensitiveData(profileData.account.email) } await redis.setClaudeAccount(accountId, accountData) logger.success( `✅ Updated account profile for ${accountData.name} (${accountId}) - Type: ${subscriptionInfo.accountType}` ) return subscriptionInfo } else { throw new Error(`Failed to fetch profile with status: ${response.status}`) } } catch (error) { if (error.response?.status === 401) { logger.warn(`⚠️ Profile API returned 401 for account ${accountId} - token may be invalid`) } else if (error.response?.status === 403) { logger.warn( `⚠️ Profile API returned 403 for account ${accountId} - insufficient permissions` ) } else { logger.error(`❌ Failed to fetch profile for account ${accountId}:`, error.message) } throw error } } // 🔄 手动更新所有账号的 Profile 信息 async updateAllAccountProfiles() { try { logger.info('🔄 Starting batch profile update for all accounts...') const accounts = await redis.getAllClaudeAccounts() let successCount = 0 let failureCount = 0 const results = [] for (const account of accounts) { // 跳过未激活或错误状态的账号 if (account.isActive !== 'true' || account.status === 'error') { logger.info(`⏩ Skipping inactive/error account: ${account.name} (${account.id})`) continue } // 跳过没有 user:profile 权限的账号(Setup Token 账号) const hasProfileScope = account.scopes && account.scopes.includes('user:profile') if (!hasProfileScope) { logger.info( `⏩ Skipping account without user:profile scope: ${account.name} (${account.id})` ) results.push({ accountId: account.id, accountName: account.name, success: false, error: 'No user:profile permission (Setup Token account)' }) continue } try { // 获取有效的 access token const accessToken = await this.getValidAccessToken(account.id) if (accessToken) { const profileInfo = await this.fetchAndUpdateAccountProfile(account.id, accessToken) successCount++ results.push({ accountId: account.id, accountName: account.name, success: true, accountType: profileInfo.accountType }) } } catch (error) { failureCount++ results.push({ accountId: account.id, accountName: account.name, success: false, error: error.message }) logger.warn( `⚠️ Failed to update profile for account ${account.name} (${account.id}): ${error.message}` ) } // 添加延迟以避免触发限流 await new Promise((resolve) => setTimeout(resolve, 1000)) } logger.success(`✅ Profile update completed: ${successCount} success, ${failureCount} failed`) return { totalAccounts: accounts.length, successCount, failureCount, results } } catch (error) { logger.error('❌ Failed to update account profiles:', error) throw error } } // 🔄 初始化所有账户的会话窗口(从历史数据恢复) async initializeSessionWindows(forceRecalculate = false) { try { logger.info('🔄 Initializing session windows for all Claude accounts...') const accounts = await redis.getAllClaudeAccounts() let validWindowCount = 0 let expiredWindowCount = 0 let noWindowCount = 0 const now = new Date() for (const account of accounts) { // 如果强制重算,清除现有窗口信息 if (forceRecalculate && (account.sessionWindowStart || account.sessionWindowEnd)) { logger.info(`🔄 Force recalculating window for account ${account.name} (${account.id})`) delete account.sessionWindowStart delete account.sessionWindowEnd delete account.lastRequestTime await redis.setClaudeAccount(account.id, account) } // 检查现有会话窗口 if (account.sessionWindowStart && account.sessionWindowEnd) { const windowEnd = new Date(account.sessionWindowEnd) const windowStart = new Date(account.sessionWindowStart) const timeUntilExpires = Math.round((windowEnd.getTime() - now.getTime()) / (1000 * 60)) if (now.getTime() < windowEnd.getTime()) { // 窗口仍然有效,保留它 validWindowCount++ logger.info( `✅ Account ${account.name} (${account.id}) has valid window: ${windowStart.toISOString()} - ${windowEnd.toISOString()} (${timeUntilExpires} minutes remaining)` ) } else { // 窗口已过期,清除它 expiredWindowCount++ logger.warn( `⏰ Account ${account.name} (${account.id}) window expired: ${windowStart.toISOString()} - ${windowEnd.toISOString()}` ) // 清除过期的窗口信息 delete account.sessionWindowStart delete account.sessionWindowEnd delete account.lastRequestTime await redis.setClaudeAccount(account.id, account) } } else { noWindowCount++ logger.info( `📭 Account ${account.name} (${account.id}) has no session window - will create on next request` ) } } logger.success('✅ Session window initialization completed:') logger.success(` 📊 Total accounts: ${accounts.length}`) logger.success(` ✅ Valid windows: ${validWindowCount}`) logger.success(` ⏰ Expired windows: ${expiredWindowCount}`) logger.success(` 📭 No windows: ${noWindowCount}`) return { total: accounts.length, validWindows: validWindowCount, expiredWindows: expiredWindowCount, noWindows: noWindowCount } } catch (error) { logger.error('❌ Failed to initialize session windows:', error) return { total: 0, validWindows: 0, expiredWindows: 0, noWindows: 0, error: error.message } } } // 🚫 通用的账户错误标记方法 async markAccountError(accountId, errorType, sessionHash = null) { const ERROR_CONFIG = { unauthorized: { status: 'unauthorized', errorMessage: 'Account unauthorized (401 errors detected)', timestampField: 'unauthorizedAt', errorCode: 'CLAUDE_OAUTH_UNAUTHORIZED', logMessage: 'unauthorized' }, blocked: { status: 'blocked', errorMessage: 'Account blocked (403 error detected - account may be suspended by Claude)', timestampField: 'blockedAt', errorCode: 'CLAUDE_OAUTH_BLOCKED', logMessage: 'blocked' } } try { const errorConfig = ERROR_CONFIG[errorType] if (!errorConfig) { throw new Error(`Unsupported error type: ${errorType}`) } const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 更新账户状态 const updatedAccountData = { ...accountData } updatedAccountData.status = errorConfig.status updatedAccountData.schedulable = 'false' // 设置为不可调度 updatedAccountData.errorMessage = errorConfig.errorMessage updatedAccountData[errorConfig.timestampField] = new Date().toISOString() // 保存更新后的账户数据 await redis.setClaudeAccount(accountId, updatedAccountData) // 如果有sessionHash,删除粘性会话映射 if (sessionHash) { await redis.client.del(`sticky_session:${sessionHash}`) logger.info(`🗑️ Deleted sticky session mapping for hash: ${sessionHash}`) } logger.warn( `⚠️ Account ${accountData.name} (${accountId}) marked as ${errorConfig.logMessage} and disabled for scheduling` ) // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name, platform: 'claude-oauth', status: errorConfig.status, errorCode: errorConfig.errorCode, reason: errorConfig.errorMessage, timestamp: getISOStringWithTimezone(new Date()) }) } catch (webhookError) { logger.error('Failed to send webhook notification:', webhookError) } return { success: true } } catch (error) { logger.error(`❌ Failed to mark account ${accountId} as ${errorType}:`, error) throw error } } // 🚫 标记账户为未授权状态(401错误) async markAccountUnauthorized(accountId, sessionHash = null) { return this.markAccountError(accountId, 'unauthorized', sessionHash) } // 🚫 标记账户为被封锁状态(403错误) async markAccountBlocked(accountId, sessionHash = null) { return this.markAccountError(accountId, 'blocked', sessionHash) } // 🔄 重置账户所有异常状态 async resetAccountStatus(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 重置账户状态 const updatedAccountData = { ...accountData } // 根据是否有有效的accessToken来设置status if (updatedAccountData.accessToken) { updatedAccountData.status = 'active' } else { updatedAccountData.status = 'created' } // 恢复可调度状态(管理员手动重置时恢复调度是合理的) updatedAccountData.schedulable = 'true' // 清除所有自动停止相关的标记 delete updatedAccountData.rateLimitAutoStopped delete updatedAccountData.fiveHourAutoStopped delete updatedAccountData.fiveHourStoppedAt delete updatedAccountData.tempErrorAutoStopped delete updatedAccountData.fiveHourWarningWindow delete updatedAccountData.fiveHourWarningCount delete updatedAccountData.fiveHourWarningLastSentAt // 兼容旧的标记 delete updatedAccountData.autoStoppedAt delete updatedAccountData.stoppedReason // 清除错误相关字段 delete updatedAccountData.errorMessage delete updatedAccountData.unauthorizedAt delete updatedAccountData.blockedAt delete updatedAccountData.rateLimitedAt delete updatedAccountData.rateLimitStatus delete updatedAccountData.rateLimitEndAt delete updatedAccountData.tempErrorAt delete updatedAccountData.sessionWindowStart delete updatedAccountData.sessionWindowEnd // 保存更新后的账户数据 await redis.setClaudeAccount(accountId, updatedAccountData) // 显式从 Redis 中删除这些字段(因为 HSET 不会删除现有字段) const fieldsToDelete = [ 'errorMessage', 'unauthorizedAt', 'blockedAt', 'rateLimitedAt', 'rateLimitStatus', 'rateLimitEndAt', 'tempErrorAt', 'sessionWindowStart', 'sessionWindowEnd', // 新的独立标记 'rateLimitAutoStopped', 'fiveHourAutoStopped', 'fiveHourStoppedAt', 'fiveHourWarningWindow', 'fiveHourWarningCount', 'fiveHourWarningLastSentAt', 'tempErrorAutoStopped', // 兼容旧的标记 'autoStoppedAt', 'stoppedReason' ] await redis.client.hdel(`claude:account:${accountId}`, ...fieldsToDelete) // 清除401错误计数 const errorKey = `claude_account:${accountId}:401_errors` await redis.client.del(errorKey) // 清除限流状态(如果存在) const rateLimitKey = `ratelimit:${accountId}` await redis.client.del(rateLimitKey) // 清除5xx错误计数 const serverErrorKey = `claude_account:${accountId}:5xx_errors` await redis.client.del(serverErrorKey) logger.info( `✅ Successfully reset all error states for account ${accountData.name} (${accountId})` ) return { success: true, account: { id: accountId, name: accountData.name, status: updatedAccountData.status, schedulable: updatedAccountData.schedulable === 'true' } } } catch (error) { logger.error(`❌ Failed to reset account status for ${accountId}:`, error) throw error } } // 🧹 清理临时错误账户 async cleanupTempErrorAccounts() { try { const accounts = await redis.getAllClaudeAccounts() let cleanedCount = 0 const TEMP_ERROR_RECOVERY_MINUTES = 5 // 临时错误状态恢复时间(分钟) for (const account of accounts) { if (account.status === 'temp_error' && account.tempErrorAt) { const tempErrorAt = new Date(account.tempErrorAt) const now = new Date() const minutesSinceTempError = (now - tempErrorAt) / (1000 * 60) // 如果临时错误状态超过指定时间,尝试重新激活 if (minutesSinceTempError > TEMP_ERROR_RECOVERY_MINUTES) { account.status = 'active' // 恢复为 active 状态 // 只恢复因临时错误而自动停止的账户 if (account.tempErrorAutoStopped === 'true') { account.schedulable = 'true' // 恢复为可调度 delete account.tempErrorAutoStopped } delete account.errorMessage delete account.tempErrorAt await redis.setClaudeAccount(account.id, account) // 显式从 Redis 中删除这些字段(因为 HSET 不会删除现有字段) await redis.client.hdel( `claude:account:${account.id}`, 'errorMessage', 'tempErrorAt', 'tempErrorAutoStopped' ) // 同时清除500错误计数 await this.clearInternalErrors(account.id) cleanedCount++ logger.success(`🧹 Reset temp_error status for account ${account.name} (${account.id})`) } } } if (cleanedCount > 0) { logger.success(`🧹 Reset ${cleanedCount} temp_error accounts`) } return cleanedCount } catch (error) { logger.error('❌ Failed to cleanup temp_error accounts:', error) return 0 } } // 记录5xx服务器错误 async recordServerError(accountId, statusCode) { try { const key = `claude_account:${accountId}:5xx_errors` // 增加错误计数,设置5分钟过期时间 await redis.client.incr(key) await redis.client.expire(key, 300) // 5分钟 logger.info(`📝 Recorded ${statusCode} error for account ${accountId}`) } catch (error) { logger.error(`❌ Failed to record ${statusCode} error for account ${accountId}:`, error) } } // 记录500内部错误(保留以便向后兼容) async recordInternalError(accountId) { return this.recordServerError(accountId, 500) } // 获取5xx错误计数 async getServerErrorCount(accountId) { try { const key = `claude_account:${accountId}:5xx_errors` const count = await redis.client.get(key) return parseInt(count) || 0 } catch (error) { logger.error(`❌ Failed to get 5xx error count for account ${accountId}:`, error) return 0 } } // 获取500错误计数(保留以便向后兼容) async getInternalErrorCount(accountId) { return this.getServerErrorCount(accountId) } // 清除500错误计数 async clearInternalErrors(accountId) { try { const key = `claude_account:${accountId}:5xx_errors` await redis.client.del(key) logger.info(`✅ Cleared 5xx error count for account ${accountId}`) } catch (error) { logger.error(`❌ Failed to clear 5xx errors for account ${accountId}:`, error) } } // 标记账号为临时错误状态 async markAccountTempError(accountId, sessionHash = null) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { throw new Error('Account not found') } // 更新账户状态 const updatedAccountData = { ...accountData } updatedAccountData.status = 'temp_error' // 新增的临时错误状态 updatedAccountData.schedulable = 'false' // 设置为不可调度 updatedAccountData.errorMessage = 'Account temporarily disabled due to consecutive 500 errors' updatedAccountData.tempErrorAt = new Date().toISOString() // 使用独立的临时错误自动停止标记 updatedAccountData.tempErrorAutoStopped = 'true' // 保存更新后的账户数据 await redis.setClaudeAccount(accountId, updatedAccountData) // 设置 5 分钟后自动恢复(一次性定时器) setTimeout( async () => { try { const account = await redis.getClaudeAccount(accountId) if (account && account.status === 'temp_error' && account.tempErrorAt) { // 验证是否确实过了 5 分钟(防止重复定时器) const tempErrorAt = new Date(account.tempErrorAt) const now = new Date() const minutesSince = (now - tempErrorAt) / (1000 * 60) if (minutesSince >= 5) { // 恢复账户 account.status = 'active' // 只恢复因临时错误而自动停止的账户 if (account.tempErrorAutoStopped === 'true') { account.schedulable = 'true' delete account.tempErrorAutoStopped } delete account.errorMessage delete account.tempErrorAt await redis.setClaudeAccount(accountId, account) // 显式删除 Redis 字段 await redis.client.hdel( `claude:account:${accountId}`, 'errorMessage', 'tempErrorAt', 'tempErrorAutoStopped' ) // 清除 500 错误计数 await this.clearInternalErrors(accountId) logger.success( `✅ Auto-recovered temp_error after 5 minutes: ${account.name} (${accountId})` ) } else { logger.debug( `⏰ Temp error timer triggered but only ${minutesSince.toFixed(1)} minutes passed for ${account.name} (${accountId})` ) } } } catch (error) { logger.error(`❌ Failed to auto-recover temp_error account ${accountId}:`, error) } }, 6 * 60 * 1000 ) // 6 分钟后执行,确保已过 5 分钟 // 如果有sessionHash,删除粘性会话映射 if (sessionHash) { await redis.client.del(`sticky_session:${sessionHash}`) logger.info(`🗑️ Deleted sticky session mapping for hash: ${sessionHash}`) } logger.warn( `⚠️ Account ${accountData.name} (${accountId}) marked as temp_error and disabled for scheduling` ) // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name, platform: 'claude-oauth', status: 'temp_error', errorCode: 'CLAUDE_OAUTH_TEMP_ERROR', reason: 'Account temporarily disabled due to consecutive 500 errors' }) } catch (webhookError) { logger.error('Failed to send webhook notification:', webhookError) } return { success: true } } catch (error) { logger.error(`❌ Failed to mark account ${accountId} as temp_error:`, error) throw error } } // 更新会话窗口状态(allowed, allowed_warning, rejected) async updateSessionWindowStatus(accountId, status) { try { // 参数验证 if (!accountId || !status) { logger.warn( `Invalid parameters for updateSessionWindowStatus: accountId=${accountId}, status=${status}` ) return } const accountData = await redis.getClaudeAccount(accountId) if (!accountData || Object.keys(accountData).length === 0) { logger.warn(`Account not found: ${accountId}`) return } // 验证状态值是否有效 const validStatuses = ['allowed', 'allowed_warning', 'rejected'] if (!validStatuses.includes(status)) { logger.warn(`Invalid session window status: ${status} for account ${accountId}`) return } const now = new Date() const nowIso = now.toISOString() // 更新会话窗口状态 accountData.sessionWindowStatus = status accountData.sessionWindowStatusUpdatedAt = nowIso // 如果状态是 allowed_warning 且账户设置了自动停止调度 if (status === 'allowed_warning' && accountData.autoStopOnWarning === 'true') { const alreadyAutoStopped = accountData.schedulable === 'false' && accountData.fiveHourAutoStopped === 'true' if (!alreadyAutoStopped) { const windowIdentifier = accountData.sessionWindowEnd || accountData.sessionWindowStart || 'unknown' let warningCount = 0 if (accountData.fiveHourWarningWindow === windowIdentifier) { const parsedCount = parseInt(accountData.fiveHourWarningCount || '0', 10) warningCount = Number.isNaN(parsedCount) ? 0 : parsedCount } const maxWarningsPerWindow = this.maxFiveHourWarningsPerWindow logger.warn( `⚠️ Account ${accountData.name} (${accountId}) approaching 5h limit, auto-stopping scheduling` ) accountData.schedulable = 'false' // 使用独立的5小时限制自动停止标记 accountData.fiveHourAutoStopped = 'true' accountData.fiveHourStoppedAt = nowIso // 设置停止原因,供前端显示 accountData.stoppedReason = '5小时使用量接近限制,已自动停止调度' const canSendWarning = warningCount < maxWarningsPerWindow let updatedWarningCount = warningCount accountData.fiveHourWarningWindow = windowIdentifier if (canSendWarning) { updatedWarningCount += 1 accountData.fiveHourWarningLastSentAt = nowIso } accountData.fiveHourWarningCount = updatedWarningCount.toString() if (canSendWarning) { // 发送Webhook通知 try { const webhookNotifier = require('../utils/webhookNotifier') await webhookNotifier.sendAccountAnomalyNotification({ accountId, accountName: accountData.name || 'Claude Account', platform: 'claude', status: 'warning', errorCode: 'CLAUDE_5H_LIMIT_WARNING', reason: '5小时使用量接近限制,已自动停止调度', timestamp: getISOStringWithTimezone(now) }) } catch (webhookError) { logger.error('Failed to send webhook notification:', webhookError) } } else { logger.debug( `⚠️ Account ${accountData.name} (${accountId}) reached max ${maxWarningsPerWindow} warning notifications for current 5h window, skipping webhook` ) } } else { logger.debug( `⚠️ Account ${accountData.name} (${accountId}) already auto-stopped for 5h limit, skipping duplicate warning` ) } } await redis.setClaudeAccount(accountId, accountData) logger.info( `📊 Updated session window status for account ${accountData.name} (${accountId}): ${status}` ) } catch (error) { logger.error(`❌ Failed to update session window status for account ${accountId}:`, error) } } // 🚫 标记账号为过载状态(529错误) async markAccountOverloaded(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData) { throw new Error('Account not found') } // 获取配置的过载处理时间(分钟) const overloadMinutes = config.overloadHandling?.enabled || 0 if (overloadMinutes === 0) { logger.info('⏭️ 529 error handling is disabled') return { success: false, error: '529 error handling is disabled' } } const overloadKey = `account:overload:${accountId}` const ttl = overloadMinutes * 60 // 转换为秒 await redis.setex( overloadKey, ttl, JSON.stringify({ accountId, accountName: accountData.name, markedAt: new Date().toISOString(), expiresAt: new Date(Date.now() + ttl * 1000).toISOString() }) ) logger.warn( `🚫 Account ${accountData.name} (${accountId}) marked as overloaded for ${overloadMinutes} minutes` ) // 在账号上记录最后一次529错误 const updates = { lastOverloadAt: new Date().toISOString(), errorMessage: `529错误 - 过载${overloadMinutes}分钟` } const updatedAccountData = { ...accountData, ...updates } await redis.setClaudeAccount(accountId, updatedAccountData) return { success: true, accountName: accountData.name, duration: overloadMinutes } } catch (error) { logger.error(`❌ Failed to mark account as overloaded: ${accountId}`, error) // 不抛出错误,避免影响主请求流程 return { success: false, error: error.message } } } // ✅ 检查账号是否过载 async isAccountOverloaded(accountId) { try { // 如果529处理未启用,直接返回false const overloadMinutes = config.overloadHandling?.enabled || 0 if (overloadMinutes === 0) { return false } const overloadKey = `account:overload:${accountId}` const overloadData = await redis.get(overloadKey) if (overloadData) { // 账号处于过载状态 return true } // 账号未过载 return false } catch (error) { logger.error(`❌ Failed to check if account is overloaded: ${accountId}`, error) return false } } // 🔄 移除账号的过载状态 async removeAccountOverload(accountId) { try { const accountData = await redis.getClaudeAccount(accountId) if (!accountData) { throw new Error('Account not found') } const overloadKey = `account:overload:${accountId}` await redis.del(overloadKey) logger.info(`✅ Account ${accountData.name} (${accountId}) overload status removed`) // 清理账号上的错误信息 if (accountData.errorMessage && accountData.errorMessage.includes('529错误')) { const updatedAccountData = { ...accountData } delete updatedAccountData.errorMessage delete updatedAccountData.lastOverloadAt await redis.setClaudeAccount(accountId, updatedAccountData) } } catch (error) { logger.error(`❌ Failed to remove overload status for account: ${accountId}`, error) // 不抛出错误,移除过载状态失败不应该影响主流程 } } /** * 检查并恢复因5小时限制被自动停止的账号 * 用于定时任务自动恢复 * @returns {Promise<{checked: number, recovered: number, accounts: Array}>} */ async checkAndRecoverFiveHourStoppedAccounts() { const result = { checked: 0, recovered: 0, accounts: [] } try { const accounts = await this.getAllAccounts() const now = new Date() for (const account of accounts) { // 只检查因5小时限制被自动停止的账号 // 重要:不恢复手动停止的账号(没有fiveHourAutoStopped标记的) if (account.fiveHourAutoStopped === true && account.schedulable === false) { result.checked++ // 使用分布式锁防止并发修改 const lockKey = `lock:account:${account.id}:recovery` const lockValue = `${Date.now()}_${Math.random()}` const lockTTL = 5000 // 5秒锁超时 try { // 尝试获取锁 const lockAcquired = await redis.setAccountLock(lockKey, lockValue, lockTTL) if (!lockAcquired) { logger.debug( `⏭️ Account ${account.name} (${account.id}) is being processed by another instance` ) continue } // 重新获取账号数据,确保是最新的 const latestAccount = await redis.getClaudeAccount(account.id) if ( !latestAccount || latestAccount.fiveHourAutoStopped !== 'true' || latestAccount.schedulable !== 'false' ) { // 账号状态已变化,跳过 await redis.releaseAccountLock(lockKey, lockValue) continue } // 检查当前时间是否已经进入新的5小时窗口 let shouldRecover = false let newWindowStart = null let newWindowEnd = null if (latestAccount.sessionWindowEnd) { const windowEnd = new Date(latestAccount.sessionWindowEnd) // 使用严格的时间比较,添加1分钟缓冲避免边界问题 if (now.getTime() > windowEnd.getTime() + 60000) { shouldRecover = true // 计算新的窗口时间(基于窗口结束时间,而不是当前时间) // 这样可以保证窗口时间的连续性 newWindowStart = new Date(windowEnd) newWindowStart.setMilliseconds(newWindowStart.getMilliseconds() + 1) newWindowEnd = new Date(newWindowStart) newWindowEnd.setHours(newWindowEnd.getHours() + 5) logger.info( `🔄 Account ${latestAccount.name} (${latestAccount.id}) has entered new session window. ` + `Old window: ${latestAccount.sessionWindowStart} - ${latestAccount.sessionWindowEnd}, ` + `New window: ${newWindowStart.toISOString()} - ${newWindowEnd.toISOString()}` ) } } else { // 如果没有窗口结束时间,但有停止时间,检查是否已经过了5小时 if (latestAccount.fiveHourStoppedAt) { const stoppedAt = new Date(latestAccount.fiveHourStoppedAt) const hoursSinceStopped = (now.getTime() - stoppedAt.getTime()) / (1000 * 60 * 60) // 使用严格的5小时判断,加上1分钟缓冲 if (hoursSinceStopped > 5.017) { // 5小时1分钟 shouldRecover = true newWindowStart = this._calculateSessionWindowStart(now) newWindowEnd = this._calculateSessionWindowEnd(newWindowStart) logger.info( `🔄 Account ${latestAccount.name} (${latestAccount.id}) stopped ${hoursSinceStopped.toFixed(2)} hours ago, recovering` ) } } } if (shouldRecover) { // 恢复账号调度 const updatedAccountData = { ...latestAccount } // 恢复调度状态 updatedAccountData.schedulable = 'true' delete updatedAccountData.fiveHourAutoStopped delete updatedAccountData.fiveHourStoppedAt await this._clearFiveHourWarningMetadata(account.id, updatedAccountData) delete updatedAccountData.stoppedReason // 更新会话窗口(如果有新窗口) if (newWindowStart && newWindowEnd) { updatedAccountData.sessionWindowStart = newWindowStart.toISOString() updatedAccountData.sessionWindowEnd = newWindowEnd.toISOString() // 清除会话窗口状态 delete updatedAccountData.sessionWindowStatus delete updatedAccountData.sessionWindowStatusUpdatedAt } // 保存更新 await redis.setClaudeAccount(account.id, updatedAccountData) const fieldsToRemove = ['fiveHourAutoStopped', 'fiveHourStoppedAt'] if (newWindowStart && newWindowEnd) { fieldsToRemove.push('sessionWindowStatus', 'sessionWindowStatusUpdatedAt') } await this._removeAccountFields(account.id, fieldsToRemove, 'five_hour_recovery_task') result.recovered++ result.accounts.push({ id: latestAccount.id, name: latestAccount.name, oldWindow: latestAccount.sessionWindowEnd ? { start: latestAccount.sessionWindowStart, end: latestAccount.sessionWindowEnd } : null, newWindow: newWindowStart && newWindowEnd ? { start: newWindowStart.toISOString(), end: newWindowEnd.toISOString() } : null }) logger.info( `✅ Auto-resumed scheduling for account ${latestAccount.name} (${latestAccount.id}) - 5-hour limit expired` ) } // 释放锁 await redis.releaseAccountLock(lockKey, lockValue) } catch (error) { // 确保释放锁 if (lockKey && lockValue) { try { await redis.releaseAccountLock(lockKey, lockValue) } catch (unlockError) { logger.error(`Failed to release lock for account ${account.id}:`, unlockError) } } logger.error( `❌ Failed to check/recover 5-hour stopped account ${account.name} (${account.id}):`, error ) } } } if (result.recovered > 0) { logger.info( `🔄 5-hour limit recovery completed: ${result.recovered}/${result.checked} accounts recovered` ) } return result } catch (error) { logger.error('❌ Failed to check and recover 5-hour stopped accounts:', error) throw error } } async _removeAccountFields(accountId, fields = [], context = 'general_cleanup') { if (!Array.isArray(fields) || fields.length === 0) { return } const filteredFields = fields.filter((field) => typeof field === 'string' && field.trim()) if (filteredFields.length === 0) { return } const accountKey = `claude:account:${accountId}` try { await redis.client.hdel(accountKey, ...filteredFields) logger.debug( `🧹 已在 ${context} 阶段为账号 ${accountId} 删除字段 [${filteredFields.join(', ')}]` ) } catch (error) { logger.error( `❌ 无法在 ${context} 阶段为账号 ${accountId} 删除字段 [${filteredFields.join(', ')}]:`, error ) } } } module.exports = new ClaudeAccountService()