cc / src /models /redis.js
hequ's picture
Upload 221 files
69b897d verified
const Redis = require('ioredis')
const config = require('../../config/config')
const logger = require('../utils/logger')
// 时区辅助函数
// 注意:这个函数的目的是获取某个时间点在目标时区的"本地"表示
// 例如:UTC时间 2025-07-30 01:00:00 在 UTC+8 时区表示为 2025-07-30 09:00:00
function getDateInTimezone(date = new Date()) {
const offset = config.system.timezoneOffset || 8 // 默认UTC+8
// 方法:创建一个偏移后的Date对象,使其getUTCXXX方法返回目标时区的值
// 这样我们可以用getUTCFullYear()等方法获取目标时区的年月日时分秒
const offsetMs = offset * 3600000 // 时区偏移的毫秒数
const adjustedTime = new Date(date.getTime() + offsetMs)
return adjustedTime
}
// 获取配置时区的日期字符串 (YYYY-MM-DD)
function getDateStringInTimezone(date = new Date()) {
const tzDate = getDateInTimezone(date)
// 使用UTC方法获取偏移后的日期部分
return `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(2, '0')}-${String(
tzDate.getUTCDate()
).padStart(2, '0')}`
}
// 获取配置时区的小时 (0-23)
function getHourInTimezone(date = new Date()) {
const tzDate = getDateInTimezone(date)
return tzDate.getUTCHours()
}
// 获取配置时区的 ISO 周(YYYY-Wxx 格式,周一到周日)
function getWeekStringInTimezone(date = new Date()) {
const tzDate = getDateInTimezone(date)
// 获取年份
const year = tzDate.getUTCFullYear()
// 计算 ISO 周数(周一为第一天)
const dateObj = new Date(tzDate)
const dayOfWeek = dateObj.getUTCDay() || 7 // 将周日(0)转换为7
const firstThursday = new Date(dateObj)
firstThursday.setUTCDate(dateObj.getUTCDate() + 4 - dayOfWeek) // 找到这周的周四
const yearStart = new Date(firstThursday.getUTCFullYear(), 0, 1)
const weekNumber = Math.ceil(((firstThursday - yearStart) / 86400000 + 1) / 7)
return `${year}-W${String(weekNumber).padStart(2, '0')}`
}
class RedisClient {
constructor() {
this.client = null
this.isConnected = false
}
async connect() {
try {
this.client = new Redis({
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
db: config.redis.db,
retryDelayOnFailover: config.redis.retryDelayOnFailover,
maxRetriesPerRequest: config.redis.maxRetriesPerRequest,
lazyConnect: config.redis.lazyConnect,
tls: config.redis.enableTLS ? {} : false
})
this.client.on('connect', () => {
this.isConnected = true
logger.info('🔗 Redis connected successfully')
})
this.client.on('error', (err) => {
this.isConnected = false
logger.error('❌ Redis connection error:', err)
})
this.client.on('close', () => {
this.isConnected = false
logger.warn('⚠️ Redis connection closed')
})
await this.client.connect()
return this.client
} catch (error) {
logger.error('💥 Failed to connect to Redis:', error)
throw error
}
}
async disconnect() {
if (this.client) {
await this.client.quit()
this.isConnected = false
logger.info('👋 Redis disconnected')
}
}
getClient() {
if (!this.client || !this.isConnected) {
logger.warn('⚠️ Redis client is not connected')
return null
}
return this.client
}
// 安全获取客户端(用于关键操作)
getClientSafe() {
if (!this.client || !this.isConnected) {
throw new Error('Redis client is not connected')
}
return this.client
}
// 🔑 API Key 相关操作
async setApiKey(keyId, keyData, hashedKey = null) {
const key = `apikey:${keyId}`
const client = this.getClientSafe()
// 维护哈希映射表(用于快速查找)
// hashedKey参数是实际的哈希值,用于建立映射
if (hashedKey) {
await client.hset('apikey:hash_map', hashedKey, keyId)
}
await client.hset(key, keyData)
await client.expire(key, 86400 * 365) // 1年过期
}
async getApiKey(keyId) {
const key = `apikey:${keyId}`
return await this.client.hgetall(key)
}
async deleteApiKey(keyId) {
const key = `apikey:${keyId}`
// 获取要删除的API Key哈希值,以便从映射表中移除
const keyData = await this.client.hgetall(key)
if (keyData && keyData.apiKey) {
// keyData.apiKey现在存储的是哈希值,直接从映射表删除
await this.client.hdel('apikey:hash_map', keyData.apiKey)
}
return await this.client.del(key)
}
async getAllApiKeys() {
const keys = await this.client.keys('apikey:*')
const apiKeys = []
for (const key of keys) {
// 过滤掉hash_map,它不是真正的API Key
if (key === 'apikey:hash_map') {
continue
}
const keyData = await this.client.hgetall(key)
if (keyData && Object.keys(keyData).length > 0) {
apiKeys.push({ id: key.replace('apikey:', ''), ...keyData })
}
}
return apiKeys
}
// 🔍 通过哈希值查找API Key(性能优化)
async findApiKeyByHash(hashedKey) {
// 使用反向映射表:hash -> keyId
const keyId = await this.client.hget('apikey:hash_map', hashedKey)
if (!keyId) {
return null
}
const keyData = await this.client.hgetall(`apikey:${keyId}`)
if (keyData && Object.keys(keyData).length > 0) {
return { id: keyId, ...keyData }
}
// 如果数据不存在,清理映射表
await this.client.hdel('apikey:hash_map', hashedKey)
return null
}
// 📊 使用统计相关操作(支持缓存token统计和模型信息)
// 标准化模型名称,用于统计聚合
_normalizeModelName(model) {
if (!model || model === 'unknown') {
return model
}
// 对于Bedrock模型,去掉区域前缀进行统一
if (model.includes('.anthropic.') || model.includes('.claude')) {
// 匹配所有AWS区域格式:region.anthropic.model-name-v1:0 -> claude-model-name
// 支持所有AWS区域格式,如:us-east-1, eu-west-1, ap-southeast-1, ca-central-1等
let normalized = model.replace(/^[a-z0-9-]+\./, '') // 去掉任何区域前缀(更通用)
normalized = normalized.replace('anthropic.', '') // 去掉anthropic前缀
normalized = normalized.replace(/-v\d+:\d+$/, '') // 去掉版本后缀(如-v1:0, -v2:1等)
return normalized
}
// 对于其他模型,去掉常见的版本后缀
return model.replace(/-v\d+:\d+$|:latest$/, '')
}
async incrementTokenUsage(
keyId,
tokens,
inputTokens = 0,
outputTokens = 0,
cacheCreateTokens = 0,
cacheReadTokens = 0,
model = 'unknown',
ephemeral5mTokens = 0, // 新增:5分钟缓存 tokens
ephemeral1hTokens = 0, // 新增:1小时缓存 tokens
isLongContextRequest = false // 新增:是否为 1M 上下文请求(超过200k)
) {
const key = `usage:${keyId}`
const now = new Date()
const today = getDateStringInTimezone(now)
const tzDate = getDateInTimezone(now)
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}` // 新增小时级别
const daily = `usage:daily:${keyId}:${today}`
const monthly = `usage:monthly:${keyId}:${currentMonth}`
const hourly = `usage:hourly:${keyId}:${currentHour}` // 新增小时级别key
// 标准化模型名用于统计聚合
const normalizedModel = this._normalizeModelName(model)
// 按模型统计的键
const modelDaily = `usage:model:daily:${normalizedModel}:${today}`
const modelMonthly = `usage:model:monthly:${normalizedModel}:${currentMonth}`
const modelHourly = `usage:model:hourly:${normalizedModel}:${currentHour}` // 新增模型小时级别
// API Key级别的模型统计
const keyModelDaily = `usage:${keyId}:model:daily:${normalizedModel}:${today}`
const keyModelMonthly = `usage:${keyId}:model:monthly:${normalizedModel}:${currentMonth}`
const keyModelHourly = `usage:${keyId}:model:hourly:${normalizedModel}:${currentHour}` // 新增API Key模型小时级别
// 新增:系统级分钟统计
const minuteTimestamp = Math.floor(now.getTime() / 60000)
const systemMinuteKey = `system:metrics:minute:${minuteTimestamp}`
// 智能处理输入输出token分配
const finalInputTokens = inputTokens || 0
const finalOutputTokens = outputTokens || (finalInputTokens > 0 ? 0 : tokens)
const finalCacheCreateTokens = cacheCreateTokens || 0
const finalCacheReadTokens = cacheReadTokens || 0
// 重新计算真实的总token数(包括缓存token)
const totalTokens =
finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens
// 核心token(不包括缓存)- 用于与历史数据兼容
const coreTokens = finalInputTokens + finalOutputTokens
// 使用Pipeline优化性能
const pipeline = this.client.pipeline()
// 现有的统计保持不变
// 核心token统计(保持向后兼容)
pipeline.hincrby(key, 'totalTokens', coreTokens)
pipeline.hincrby(key, 'totalInputTokens', finalInputTokens)
pipeline.hincrby(key, 'totalOutputTokens', finalOutputTokens)
// 缓存token统计(新增)
pipeline.hincrby(key, 'totalCacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(key, 'totalCacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(key, 'totalAllTokens', totalTokens) // 包含所有类型的总token
// 详细缓存类型统计(新增)
pipeline.hincrby(key, 'totalEphemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(key, 'totalEphemeral1hTokens', ephemeral1hTokens)
// 1M 上下文请求统计(新增)
if (isLongContextRequest) {
pipeline.hincrby(key, 'totalLongContextInputTokens', finalInputTokens)
pipeline.hincrby(key, 'totalLongContextOutputTokens', finalOutputTokens)
pipeline.hincrby(key, 'totalLongContextRequests', 1)
}
// 请求计数
pipeline.hincrby(key, 'totalRequests', 1)
// 每日统计
pipeline.hincrby(daily, 'tokens', coreTokens)
pipeline.hincrby(daily, 'inputTokens', finalInputTokens)
pipeline.hincrby(daily, 'outputTokens', finalOutputTokens)
pipeline.hincrby(daily, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(daily, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(daily, 'allTokens', totalTokens)
pipeline.hincrby(daily, 'requests', 1)
// 详细缓存类型统计
pipeline.hincrby(daily, 'ephemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(daily, 'ephemeral1hTokens', ephemeral1hTokens)
// 1M 上下文请求统计
if (isLongContextRequest) {
pipeline.hincrby(daily, 'longContextInputTokens', finalInputTokens)
pipeline.hincrby(daily, 'longContextOutputTokens', finalOutputTokens)
pipeline.hincrby(daily, 'longContextRequests', 1)
}
// 每月统计
pipeline.hincrby(monthly, 'tokens', coreTokens)
pipeline.hincrby(monthly, 'inputTokens', finalInputTokens)
pipeline.hincrby(monthly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(monthly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(monthly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(monthly, 'allTokens', totalTokens)
pipeline.hincrby(monthly, 'requests', 1)
// 详细缓存类型统计
pipeline.hincrby(monthly, 'ephemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(monthly, 'ephemeral1hTokens', ephemeral1hTokens)
// 按模型统计 - 每日
pipeline.hincrby(modelDaily, 'inputTokens', finalInputTokens)
pipeline.hincrby(modelDaily, 'outputTokens', finalOutputTokens)
pipeline.hincrby(modelDaily, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(modelDaily, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(modelDaily, 'allTokens', totalTokens)
pipeline.hincrby(modelDaily, 'requests', 1)
// 按模型统计 - 每月
pipeline.hincrby(modelMonthly, 'inputTokens', finalInputTokens)
pipeline.hincrby(modelMonthly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(modelMonthly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(modelMonthly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(modelMonthly, 'allTokens', totalTokens)
pipeline.hincrby(modelMonthly, 'requests', 1)
// API Key级别的模型统计 - 每日
pipeline.hincrby(keyModelDaily, 'inputTokens', finalInputTokens)
pipeline.hincrby(keyModelDaily, 'outputTokens', finalOutputTokens)
pipeline.hincrby(keyModelDaily, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(keyModelDaily, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(keyModelDaily, 'allTokens', totalTokens)
pipeline.hincrby(keyModelDaily, 'requests', 1)
// 详细缓存类型统计
pipeline.hincrby(keyModelDaily, 'ephemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(keyModelDaily, 'ephemeral1hTokens', ephemeral1hTokens)
// API Key级别的模型统计 - 每月
pipeline.hincrby(keyModelMonthly, 'inputTokens', finalInputTokens)
pipeline.hincrby(keyModelMonthly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(keyModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(keyModelMonthly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(keyModelMonthly, 'allTokens', totalTokens)
pipeline.hincrby(keyModelMonthly, 'requests', 1)
// 详细缓存类型统计
pipeline.hincrby(keyModelMonthly, 'ephemeral5mTokens', ephemeral5mTokens)
pipeline.hincrby(keyModelMonthly, 'ephemeral1hTokens', ephemeral1hTokens)
// 小时级别统计
pipeline.hincrby(hourly, 'tokens', coreTokens)
pipeline.hincrby(hourly, 'inputTokens', finalInputTokens)
pipeline.hincrby(hourly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(hourly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(hourly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(hourly, 'allTokens', totalTokens)
pipeline.hincrby(hourly, 'requests', 1)
// 按模型统计 - 每小时
pipeline.hincrby(modelHourly, 'inputTokens', finalInputTokens)
pipeline.hincrby(modelHourly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(modelHourly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(modelHourly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(modelHourly, 'allTokens', totalTokens)
pipeline.hincrby(modelHourly, 'requests', 1)
// API Key级别的模型统计 - 每小时
pipeline.hincrby(keyModelHourly, 'inputTokens', finalInputTokens)
pipeline.hincrby(keyModelHourly, 'outputTokens', finalOutputTokens)
pipeline.hincrby(keyModelHourly, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(keyModelHourly, 'cacheReadTokens', finalCacheReadTokens)
pipeline.hincrby(keyModelHourly, 'allTokens', totalTokens)
pipeline.hincrby(keyModelHourly, 'requests', 1)
// 新增:系统级分钟统计
pipeline.hincrby(systemMinuteKey, 'requests', 1)
pipeline.hincrby(systemMinuteKey, 'totalTokens', totalTokens)
pipeline.hincrby(systemMinuteKey, 'inputTokens', finalInputTokens)
pipeline.hincrby(systemMinuteKey, 'outputTokens', finalOutputTokens)
pipeline.hincrby(systemMinuteKey, 'cacheCreateTokens', finalCacheCreateTokens)
pipeline.hincrby(systemMinuteKey, 'cacheReadTokens', finalCacheReadTokens)
// 设置过期时间
pipeline.expire(daily, 86400 * 32) // 32天过期
pipeline.expire(monthly, 86400 * 365) // 1年过期
pipeline.expire(hourly, 86400 * 7) // 小时统计7天过期
pipeline.expire(modelDaily, 86400 * 32) // 模型每日统计32天过期
pipeline.expire(modelMonthly, 86400 * 365) // 模型每月统计1年过期
pipeline.expire(modelHourly, 86400 * 7) // 模型小时统计7天过期
pipeline.expire(keyModelDaily, 86400 * 32) // API Key模型每日统计32天过期
pipeline.expire(keyModelMonthly, 86400 * 365) // API Key模型每月统计1年过期
pipeline.expire(keyModelHourly, 86400 * 7) // API Key模型小时统计7天过期
// 系统级分钟统计的过期时间(窗口时间的2倍)
const configLocal = require('../../config/config')
const { metricsWindow } = configLocal.system
pipeline.expire(systemMinuteKey, metricsWindow * 60 * 2)
// 执行Pipeline
await pipeline.exec()
}
// 📊 记录账户级别的使用统计
async incrementAccountUsage(
accountId,
totalTokens,
inputTokens = 0,
outputTokens = 0,
cacheCreateTokens = 0,
cacheReadTokens = 0,
model = 'unknown',
isLongContextRequest = false
) {
const now = new Date()
const today = getDateStringInTimezone(now)
const tzDate = getDateInTimezone(now)
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}`
// 账户级别统计的键
const accountKey = `account_usage:${accountId}`
const accountDaily = `account_usage:daily:${accountId}:${today}`
const accountMonthly = `account_usage:monthly:${accountId}:${currentMonth}`
const accountHourly = `account_usage:hourly:${accountId}:${currentHour}`
// 标准化模型名用于统计聚合
const normalizedModel = this._normalizeModelName(model)
// 账户按模型统计的键
const accountModelDaily = `account_usage:model:daily:${accountId}:${normalizedModel}:${today}`
const accountModelMonthly = `account_usage:model:monthly:${accountId}:${normalizedModel}:${currentMonth}`
const accountModelHourly = `account_usage:model:hourly:${accountId}:${normalizedModel}:${currentHour}`
// 处理token分配
const finalInputTokens = inputTokens || 0
const finalOutputTokens = outputTokens || 0
const finalCacheCreateTokens = cacheCreateTokens || 0
const finalCacheReadTokens = cacheReadTokens || 0
const actualTotalTokens =
finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens
const coreTokens = finalInputTokens + finalOutputTokens
// 构建统计操作数组
const operations = [
// 账户总体统计
this.client.hincrby(accountKey, 'totalTokens', coreTokens),
this.client.hincrby(accountKey, 'totalInputTokens', finalInputTokens),
this.client.hincrby(accountKey, 'totalOutputTokens', finalOutputTokens),
this.client.hincrby(accountKey, 'totalCacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountKey, 'totalCacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountKey, 'totalAllTokens', actualTotalTokens),
this.client.hincrby(accountKey, 'totalRequests', 1),
// 账户每日统计
this.client.hincrby(accountDaily, 'tokens', coreTokens),
this.client.hincrby(accountDaily, 'inputTokens', finalInputTokens),
this.client.hincrby(accountDaily, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountDaily, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountDaily, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountDaily, 'allTokens', actualTotalTokens),
this.client.hincrby(accountDaily, 'requests', 1),
// 账户每月统计
this.client.hincrby(accountMonthly, 'tokens', coreTokens),
this.client.hincrby(accountMonthly, 'inputTokens', finalInputTokens),
this.client.hincrby(accountMonthly, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountMonthly, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountMonthly, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountMonthly, 'allTokens', actualTotalTokens),
this.client.hincrby(accountMonthly, 'requests', 1),
// 账户每小时统计
this.client.hincrby(accountHourly, 'tokens', coreTokens),
this.client.hincrby(accountHourly, 'inputTokens', finalInputTokens),
this.client.hincrby(accountHourly, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountHourly, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountHourly, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountHourly, 'allTokens', actualTotalTokens),
this.client.hincrby(accountHourly, 'requests', 1),
// 添加模型级别的数据到hourly键中,以支持会话窗口的统计
this.client.hincrby(accountHourly, `model:${normalizedModel}:inputTokens`, finalInputTokens),
this.client.hincrby(
accountHourly,
`model:${normalizedModel}:outputTokens`,
finalOutputTokens
),
this.client.hincrby(
accountHourly,
`model:${normalizedModel}:cacheCreateTokens`,
finalCacheCreateTokens
),
this.client.hincrby(
accountHourly,
`model:${normalizedModel}:cacheReadTokens`,
finalCacheReadTokens
),
this.client.hincrby(accountHourly, `model:${normalizedModel}:allTokens`, actualTotalTokens),
this.client.hincrby(accountHourly, `model:${normalizedModel}:requests`, 1),
// 账户按模型统计 - 每日
this.client.hincrby(accountModelDaily, 'inputTokens', finalInputTokens),
this.client.hincrby(accountModelDaily, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountModelDaily, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountModelDaily, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountModelDaily, 'allTokens', actualTotalTokens),
this.client.hincrby(accountModelDaily, 'requests', 1),
// 账户按模型统计 - 每月
this.client.hincrby(accountModelMonthly, 'inputTokens', finalInputTokens),
this.client.hincrby(accountModelMonthly, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountModelMonthly, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountModelMonthly, 'allTokens', actualTotalTokens),
this.client.hincrby(accountModelMonthly, 'requests', 1),
// 账户按模型统计 - 每小时
this.client.hincrby(accountModelHourly, 'inputTokens', finalInputTokens),
this.client.hincrby(accountModelHourly, 'outputTokens', finalOutputTokens),
this.client.hincrby(accountModelHourly, 'cacheCreateTokens', finalCacheCreateTokens),
this.client.hincrby(accountModelHourly, 'cacheReadTokens', finalCacheReadTokens),
this.client.hincrby(accountModelHourly, 'allTokens', actualTotalTokens),
this.client.hincrby(accountModelHourly, 'requests', 1),
// 设置过期时间
this.client.expire(accountDaily, 86400 * 32), // 32天过期
this.client.expire(accountMonthly, 86400 * 365), // 1年过期
this.client.expire(accountHourly, 86400 * 7), // 7天过期
this.client.expire(accountModelDaily, 86400 * 32), // 32天过期
this.client.expire(accountModelMonthly, 86400 * 365), // 1年过期
this.client.expire(accountModelHourly, 86400 * 7) // 7天过期
]
// 如果是 1M 上下文请求,添加额外的统计
if (isLongContextRequest) {
operations.push(
this.client.hincrby(accountKey, 'totalLongContextInputTokens', finalInputTokens),
this.client.hincrby(accountKey, 'totalLongContextOutputTokens', finalOutputTokens),
this.client.hincrby(accountKey, 'totalLongContextRequests', 1),
this.client.hincrby(accountDaily, 'longContextInputTokens', finalInputTokens),
this.client.hincrby(accountDaily, 'longContextOutputTokens', finalOutputTokens),
this.client.hincrby(accountDaily, 'longContextRequests', 1)
)
}
await Promise.all(operations)
}
async getUsageStats(keyId) {
const totalKey = `usage:${keyId}`
const today = getDateStringInTimezone()
const dailyKey = `usage:daily:${keyId}:${today}`
const tzDate = getDateInTimezone()
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const monthlyKey = `usage:monthly:${keyId}:${currentMonth}`
const [total, daily, monthly] = await Promise.all([
this.client.hgetall(totalKey),
this.client.hgetall(dailyKey),
this.client.hgetall(monthlyKey)
])
// 获取API Key的创建时间来计算平均值
const keyData = await this.client.hgetall(`apikey:${keyId}`)
const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date()
const now = new Date()
const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24)))
const totalTokens = parseInt(total.totalTokens) || 0
const totalRequests = parseInt(total.totalRequests) || 0
// 计算平均RPM (requests per minute) 和 TPM (tokens per minute)
const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60)
const avgRPM = totalRequests / totalMinutes
const avgTPM = totalTokens / totalMinutes
// 处理旧数据兼容性(支持缓存token)
const handleLegacyData = (data) => {
// 优先使用total*字段(存储时使用的字段)
const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0
const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0
const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0
const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0
// 新增缓存token字段
const cacheCreateTokens =
parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0
const cacheReadTokens =
parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0
const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0
const totalFromSeparate = inputTokens + outputTokens
// 计算实际的总tokens(包含所有类型)
const actualAllTokens =
allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens
if (totalFromSeparate === 0 && tokens > 0) {
// 旧数据:没有输入输出分离
return {
tokens, // 保持兼容性,但统一使用allTokens
inputTokens: Math.round(tokens * 0.3), // 假设30%为输入
outputTokens: Math.round(tokens * 0.7), // 假设70%为输出
cacheCreateTokens: 0, // 旧数据没有缓存token
cacheReadTokens: 0,
allTokens: tokens, // 对于旧数据,allTokens等于tokens
requests
}
} else {
// 新数据或无数据 - 统一使用allTokens作为tokens的值
return {
tokens: actualAllTokens, // 统一使用allTokens作为总数
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
allTokens: actualAllTokens,
requests
}
}
}
const totalData = handleLegacyData(total)
const dailyData = handleLegacyData(daily)
const monthlyData = handleLegacyData(monthly)
return {
total: totalData,
daily: dailyData,
monthly: monthlyData,
averages: {
rpm: Math.round(avgRPM * 100) / 100, // 保留2位小数
tpm: Math.round(avgTPM * 100) / 100,
dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100,
dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100
}
}
}
async addUsageRecord(keyId, record, maxRecords = 200) {
const listKey = `usage:records:${keyId}`
const client = this.getClientSafe()
try {
await client
.multi()
.lpush(listKey, JSON.stringify(record))
.ltrim(listKey, 0, Math.max(0, maxRecords - 1))
.expire(listKey, 86400 * 90) // 默认保留90天
.exec()
} catch (error) {
logger.error(`❌ Failed to append usage record for key ${keyId}:`, error)
}
}
async getUsageRecords(keyId, limit = 50) {
const listKey = `usage:records:${keyId}`
const client = this.getClient()
if (!client) {
return []
}
try {
const rawRecords = await client.lrange(listKey, 0, Math.max(0, limit - 1))
return rawRecords
.map((entry) => {
try {
return JSON.parse(entry)
} catch (error) {
logger.warn('⚠️ Failed to parse usage record entry:', error)
return null
}
})
.filter(Boolean)
} catch (error) {
logger.error(`❌ Failed to load usage records for key ${keyId}:`, error)
return []
}
}
// 💰 获取当日费用
async getDailyCost(keyId) {
const today = getDateStringInTimezone()
const costKey = `usage:cost:daily:${keyId}:${today}`
const cost = await this.client.get(costKey)
const result = parseFloat(cost || 0)
logger.debug(
`💰 Getting daily cost for ${keyId}, date: ${today}, key: ${costKey}, value: ${cost}, result: ${result}`
)
return result
}
// 💰 增加当日费用
async incrementDailyCost(keyId, amount) {
const today = getDateStringInTimezone()
const tzDate = getDateInTimezone()
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}`
const dailyKey = `usage:cost:daily:${keyId}:${today}`
const monthlyKey = `usage:cost:monthly:${keyId}:${currentMonth}`
const hourlyKey = `usage:cost:hourly:${keyId}:${currentHour}`
const totalKey = `usage:cost:total:${keyId}`
logger.debug(
`💰 Incrementing cost for ${keyId}, amount: $${amount}, date: ${today}, dailyKey: ${dailyKey}`
)
const results = await Promise.all([
this.client.incrbyfloat(dailyKey, amount),
this.client.incrbyfloat(monthlyKey, amount),
this.client.incrbyfloat(hourlyKey, amount),
this.client.incrbyfloat(totalKey, amount),
// 设置过期时间
this.client.expire(dailyKey, 86400 * 30), // 30天
this.client.expire(monthlyKey, 86400 * 90), // 90天
this.client.expire(hourlyKey, 86400 * 7) // 7天
])
logger.debug(`💰 Cost incremented successfully, new daily total: $${results[0]}`)
}
// 💰 获取费用统计
async getCostStats(keyId) {
const today = getDateStringInTimezone()
const tzDate = getDateInTimezone()
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}`
const [daily, monthly, hourly, total] = await Promise.all([
this.client.get(`usage:cost:daily:${keyId}:${today}`),
this.client.get(`usage:cost:monthly:${keyId}:${currentMonth}`),
this.client.get(`usage:cost:hourly:${keyId}:${currentHour}`),
this.client.get(`usage:cost:total:${keyId}`)
])
return {
daily: parseFloat(daily || 0),
monthly: parseFloat(monthly || 0),
hourly: parseFloat(hourly || 0),
total: parseFloat(total || 0)
}
}
// 💰 获取本周 Opus 费用
async getWeeklyOpusCost(keyId) {
const currentWeek = getWeekStringInTimezone()
const costKey = `usage:opus:weekly:${keyId}:${currentWeek}`
const cost = await this.client.get(costKey)
const result = parseFloat(cost || 0)
logger.debug(
`💰 Getting weekly Opus cost for ${keyId}, week: ${currentWeek}, key: ${costKey}, value: ${cost}, result: ${result}`
)
return result
}
// 💰 增加本周 Opus 费用
async incrementWeeklyOpusCost(keyId, amount) {
const currentWeek = getWeekStringInTimezone()
const weeklyKey = `usage:opus:weekly:${keyId}:${currentWeek}`
const totalKey = `usage:opus:total:${keyId}`
logger.debug(
`💰 Incrementing weekly Opus cost for ${keyId}, week: ${currentWeek}, amount: $${amount}`
)
// 使用 pipeline 批量执行,提高性能
const pipeline = this.client.pipeline()
pipeline.incrbyfloat(weeklyKey, amount)
pipeline.incrbyfloat(totalKey, amount)
// 设置周费用键的过期时间为 2 周
pipeline.expire(weeklyKey, 14 * 24 * 3600)
const results = await pipeline.exec()
logger.debug(`💰 Opus cost incremented successfully, new weekly total: $${results[0][1]}`)
}
// 💰 计算账户的每日费用(基于模型使用)
async getAccountDailyCost(accountId) {
const CostCalculator = require('../utils/costCalculator')
const today = getDateStringInTimezone()
// 获取账户今日所有模型的使用数据
const pattern = `account_usage:model:daily:${accountId}:*:${today}`
const modelKeys = await this.client.keys(pattern)
if (!modelKeys || modelKeys.length === 0) {
return 0
}
let totalCost = 0
for (const key of modelKeys) {
// 从key中解析模型名称
// 格式:account_usage:model:daily:{accountId}:{model}:{date}
const parts = key.split(':')
const model = parts[4] // 模型名在第5个位置(索引4)
// 获取该模型的使用数据
const modelUsage = await this.client.hgetall(key)
if (modelUsage && (modelUsage.inputTokens || modelUsage.outputTokens)) {
const usage = {
input_tokens: parseInt(modelUsage.inputTokens || 0),
output_tokens: parseInt(modelUsage.outputTokens || 0),
cache_creation_input_tokens: parseInt(modelUsage.cacheCreateTokens || 0),
cache_read_input_tokens: parseInt(modelUsage.cacheReadTokens || 0)
}
// 使用CostCalculator计算费用
const costResult = CostCalculator.calculateCost(usage, model)
totalCost += costResult.costs.total
logger.debug(
`💰 Account ${accountId} daily cost for model ${model}: $${costResult.costs.total}`
)
}
}
logger.debug(`💰 Account ${accountId} total daily cost: $${totalCost}`)
return totalCost
}
// 📊 获取账户使用统计
async getAccountUsageStats(accountId, accountType = null) {
const accountKey = `account_usage:${accountId}`
const today = getDateStringInTimezone()
const accountDailyKey = `account_usage:daily:${accountId}:${today}`
const tzDate = getDateInTimezone()
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(
2,
'0'
)}`
const accountMonthlyKey = `account_usage:monthly:${accountId}:${currentMonth}`
const [total, daily, monthly] = await Promise.all([
this.client.hgetall(accountKey),
this.client.hgetall(accountDailyKey),
this.client.hgetall(accountMonthlyKey)
])
// 获取账户创建时间来计算平均值 - 支持不同类型的账号
let accountData = {}
if (accountType === 'droid') {
accountData = await this.client.hgetall(`droid:account:${accountId}`)
} else if (accountType === 'openai') {
accountData = await this.client.hgetall(`openai:account:${accountId}`)
} else if (accountType === 'openai-responses') {
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`)
} else {
// 尝试多个前缀
accountData = await this.client.hgetall(`claude_account:${accountId}`)
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai:account:${accountId}`)
}
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`)
}
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`openai_account:${accountId}`)
}
if (!accountData.createdAt) {
accountData = await this.client.hgetall(`droid:account:${accountId}`)
}
}
const createdAt = accountData.createdAt ? new Date(accountData.createdAt) : new Date()
const now = new Date()
const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24)))
const totalTokens = parseInt(total.totalTokens) || 0
const totalRequests = parseInt(total.totalRequests) || 0
// 计算平均RPM和TPM
const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60)
const avgRPM = totalRequests / totalMinutes
const avgTPM = totalTokens / totalMinutes
// 处理账户统计数据
const handleAccountData = (data) => {
const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0
const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0
const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0
const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0
const cacheCreateTokens =
parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0
const cacheReadTokens =
parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0
const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0
const actualAllTokens =
allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens
return {
tokens,
inputTokens,
outputTokens,
cacheCreateTokens,
cacheReadTokens,
allTokens: actualAllTokens,
requests
}
}
const totalData = handleAccountData(total)
const dailyData = handleAccountData(daily)
const monthlyData = handleAccountData(monthly)
// 获取每日费用(基于模型使用)
const dailyCost = await this.getAccountDailyCost(accountId)
return {
accountId,
total: totalData,
daily: {
...dailyData,
cost: dailyCost
},
monthly: monthlyData,
averages: {
rpm: Math.round(avgRPM * 100) / 100,
tpm: Math.round(avgTPM * 100) / 100,
dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100,
dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100
}
}
}
// 📈 获取所有账户的使用统计
async getAllAccountsUsageStats() {
try {
// 获取所有Claude账户
const accountKeys = await this.client.keys('claude_account:*')
const accountStats = []
for (const accountKey of accountKeys) {
const accountId = accountKey.replace('claude_account:', '')
const accountData = await this.client.hgetall(accountKey)
if (accountData.name) {
const stats = await this.getAccountUsageStats(accountId)
accountStats.push({
id: accountId,
name: accountData.name,
email: accountData.email || '',
status: accountData.status || 'unknown',
isActive: accountData.isActive === 'true',
...stats
})
}
}
// 按当日token使用量排序
accountStats.sort((a, b) => (b.daily.allTokens || 0) - (a.daily.allTokens || 0))
return accountStats
} catch (error) {
logger.error('❌ Failed to get all accounts usage stats:', error)
return []
}
}
// 🧹 清空所有API Key的使用统计数据
async resetAllUsageStats() {
const client = this.getClientSafe()
const stats = {
deletedKeys: 0,
deletedDailyKeys: 0,
deletedMonthlyKeys: 0,
resetApiKeys: 0
}
try {
// 获取所有API Key ID
const apiKeyIds = []
const apiKeyKeys = await client.keys('apikey:*')
for (const key of apiKeyKeys) {
if (key === 'apikey:hash_map') {
continue
} // 跳过哈希映射表
const keyId = key.replace('apikey:', '')
apiKeyIds.push(keyId)
}
// 清空每个API Key的使用统计
for (const keyId of apiKeyIds) {
// 删除总体使用统计
const usageKey = `usage:${keyId}`
const deleted = await client.del(usageKey)
if (deleted > 0) {
stats.deletedKeys++
}
// 删除该API Key的每日统计(使用精确的keyId匹配)
const dailyKeys = await client.keys(`usage:daily:${keyId}:*`)
if (dailyKeys.length > 0) {
await client.del(...dailyKeys)
stats.deletedDailyKeys += dailyKeys.length
}
// 删除该API Key的每月统计(使用精确的keyId匹配)
const monthlyKeys = await client.keys(`usage:monthly:${keyId}:*`)
if (monthlyKeys.length > 0) {
await client.del(...monthlyKeys)
stats.deletedMonthlyKeys += monthlyKeys.length
}
// 重置API Key的lastUsedAt字段
const keyData = await client.hgetall(`apikey:${keyId}`)
if (keyData && Object.keys(keyData).length > 0) {
keyData.lastUsedAt = ''
await client.hset(`apikey:${keyId}`, keyData)
stats.resetApiKeys++
}
}
// 额外清理:删除所有可能遗漏的usage相关键
const allUsageKeys = await client.keys('usage:*')
if (allUsageKeys.length > 0) {
await client.del(...allUsageKeys)
stats.deletedKeys += allUsageKeys.length
}
return stats
} catch (error) {
throw new Error(`Failed to reset usage stats: ${error.message}`)
}
}
// 🏢 Claude 账户管理
async setClaudeAccount(accountId, accountData) {
const key = `claude:account:${accountId}`
await this.client.hset(key, accountData)
}
async getClaudeAccount(accountId) {
const key = `claude:account:${accountId}`
return await this.client.hgetall(key)
}
async getAllClaudeAccounts() {
const keys = await this.client.keys('claude:account:*')
const accounts = []
for (const key of keys) {
const accountData = await this.client.hgetall(key)
if (accountData && Object.keys(accountData).length > 0) {
accounts.push({ id: key.replace('claude:account:', ''), ...accountData })
}
}
return accounts
}
async deleteClaudeAccount(accountId) {
const key = `claude:account:${accountId}`
return await this.client.del(key)
}
// 🤖 Droid 账户相关操作
async setDroidAccount(accountId, accountData) {
const key = `droid:account:${accountId}`
await this.client.hset(key, accountData)
}
async getDroidAccount(accountId) {
const key = `droid:account:${accountId}`
return await this.client.hgetall(key)
}
async getAllDroidAccounts() {
const keys = await this.client.keys('droid:account:*')
const accounts = []
for (const key of keys) {
const accountData = await this.client.hgetall(key)
if (accountData && Object.keys(accountData).length > 0) {
accounts.push({ id: key.replace('droid:account:', ''), ...accountData })
}
}
return accounts
}
async deleteDroidAccount(accountId) {
const key = `droid:account:${accountId}`
return await this.client.del(key)
}
async setOpenAiAccount(accountId, accountData) {
const key = `openai:account:${accountId}`
await this.client.hset(key, accountData)
}
async getOpenAiAccount(accountId) {
const key = `openai:account:${accountId}`
return await this.client.hgetall(key)
}
async deleteOpenAiAccount(accountId) {
const key = `openai:account:${accountId}`
return await this.client.del(key)
}
async getAllOpenAIAccounts() {
const keys = await this.client.keys('openai:account:*')
const accounts = []
for (const key of keys) {
const accountData = await this.client.hgetall(key)
if (accountData && Object.keys(accountData).length > 0) {
accounts.push({ id: key.replace('openai:account:', ''), ...accountData })
}
}
return accounts
}
// 🔐 会话管理(用于管理员登录等)
async setSession(sessionId, sessionData, ttl = 86400) {
const key = `session:${sessionId}`
await this.client.hset(key, sessionData)
await this.client.expire(key, ttl)
}
async getSession(sessionId) {
const key = `session:${sessionId}`
return await this.client.hgetall(key)
}
async deleteSession(sessionId) {
const key = `session:${sessionId}`
return await this.client.del(key)
}
// 🗝️ API Key哈希索引管理
async setApiKeyHash(hashedKey, keyData, ttl = 0) {
const key = `apikey_hash:${hashedKey}`
await this.client.hset(key, keyData)
if (ttl > 0) {
await this.client.expire(key, ttl)
}
}
async getApiKeyHash(hashedKey) {
const key = `apikey_hash:${hashedKey}`
return await this.client.hgetall(key)
}
async deleteApiKeyHash(hashedKey) {
const key = `apikey_hash:${hashedKey}`
return await this.client.del(key)
}
// 🔗 OAuth会话管理
async setOAuthSession(sessionId, sessionData, ttl = 600) {
// 10分钟过期
const key = `oauth:${sessionId}`
// 序列化复杂对象,特别是 proxy 配置
const serializedData = {}
for (const [dataKey, value] of Object.entries(sessionData)) {
if (typeof value === 'object' && value !== null) {
serializedData[dataKey] = JSON.stringify(value)
} else {
serializedData[dataKey] = value
}
}
await this.client.hset(key, serializedData)
await this.client.expire(key, ttl)
}
async getOAuthSession(sessionId) {
const key = `oauth:${sessionId}`
const data = await this.client.hgetall(key)
// 反序列化 proxy 字段
if (data.proxy) {
try {
data.proxy = JSON.parse(data.proxy)
} catch (error) {
// 如果解析失败,设置为 null
data.proxy = null
}
}
return data
}
async deleteOAuthSession(sessionId) {
const key = `oauth:${sessionId}`
return await this.client.del(key)
}
// 📈 系统统计
async getSystemStats() {
const keys = await Promise.all([
this.client.keys('apikey:*'),
this.client.keys('claude:account:*'),
this.client.keys('usage:*')
])
return {
totalApiKeys: keys[0].length,
totalClaudeAccounts: keys[1].length,
totalUsageRecords: keys[2].length
}
}
// 📊 获取今日系统统计
async getTodayStats() {
try {
const today = getDateStringInTimezone()
const dailyKeys = await this.client.keys(`usage:daily:*:${today}`)
let totalRequestsToday = 0
let totalTokensToday = 0
let totalInputTokensToday = 0
let totalOutputTokensToday = 0
let totalCacheCreateTokensToday = 0
let totalCacheReadTokensToday = 0
// 批量获取所有今日数据,提高性能
if (dailyKeys.length > 0) {
const pipeline = this.client.pipeline()
dailyKeys.forEach((key) => pipeline.hgetall(key))
const results = await pipeline.exec()
for (const [error, dailyData] of results) {
if (error || !dailyData) {
continue
}
totalRequestsToday += parseInt(dailyData.requests) || 0
const currentDayTokens = parseInt(dailyData.tokens) || 0
totalTokensToday += currentDayTokens
// 处理旧数据兼容性:如果有总token但没有输入输出分离,则使用总token作为输出token
const inputTokens = parseInt(dailyData.inputTokens) || 0
const outputTokens = parseInt(dailyData.outputTokens) || 0
const cacheCreateTokens = parseInt(dailyData.cacheCreateTokens) || 0
const cacheReadTokens = parseInt(dailyData.cacheReadTokens) || 0
const totalTokensFromSeparate = inputTokens + outputTokens
if (totalTokensFromSeparate === 0 && currentDayTokens > 0) {
// 旧数据:没有输入输出分离,假设70%为输出,30%为输入(基于一般对话比例)
totalOutputTokensToday += Math.round(currentDayTokens * 0.7)
totalInputTokensToday += Math.round(currentDayTokens * 0.3)
} else {
// 新数据:使用实际的输入输出分离
totalInputTokensToday += inputTokens
totalOutputTokensToday += outputTokens
}
// 添加cache token统计
totalCacheCreateTokensToday += cacheCreateTokens
totalCacheReadTokensToday += cacheReadTokens
}
}
// 获取今日创建的API Key数量(批量优化)
const allApiKeys = await this.client.keys('apikey:*')
let apiKeysCreatedToday = 0
if (allApiKeys.length > 0) {
const pipeline = this.client.pipeline()
allApiKeys.forEach((key) => pipeline.hget(key, 'createdAt'))
const results = await pipeline.exec()
for (const [error, createdAt] of results) {
if (!error && createdAt && createdAt.startsWith(today)) {
apiKeysCreatedToday++
}
}
}
return {
requestsToday: totalRequestsToday,
tokensToday: totalTokensToday,
inputTokensToday: totalInputTokensToday,
outputTokensToday: totalOutputTokensToday,
cacheCreateTokensToday: totalCacheCreateTokensToday,
cacheReadTokensToday: totalCacheReadTokensToday,
apiKeysCreatedToday
}
} catch (error) {
console.error('Error getting today stats:', error)
return {
requestsToday: 0,
tokensToday: 0,
inputTokensToday: 0,
outputTokensToday: 0,
cacheCreateTokensToday: 0,
cacheReadTokensToday: 0,
apiKeysCreatedToday: 0
}
}
}
// 📈 获取系统总的平均RPM和TPM
async getSystemAverages() {
try {
const allApiKeys = await this.client.keys('apikey:*')
let totalRequests = 0
let totalTokens = 0
let totalInputTokens = 0
let totalOutputTokens = 0
let oldestCreatedAt = new Date()
// 批量获取所有usage数据和key数据,提高性能
const usageKeys = allApiKeys.map((key) => `usage:${key.replace('apikey:', '')}`)
const pipeline = this.client.pipeline()
// 添加所有usage查询
usageKeys.forEach((key) => pipeline.hgetall(key))
// 添加所有key数据查询
allApiKeys.forEach((key) => pipeline.hgetall(key))
const results = await pipeline.exec()
const usageResults = results.slice(0, usageKeys.length)
const keyResults = results.slice(usageKeys.length)
for (let i = 0; i < allApiKeys.length; i++) {
const totalData = usageResults[i][1] || {}
const keyData = keyResults[i][1] || {}
totalRequests += parseInt(totalData.totalRequests) || 0
totalTokens += parseInt(totalData.totalTokens) || 0
totalInputTokens += parseInt(totalData.totalInputTokens) || 0
totalOutputTokens += parseInt(totalData.totalOutputTokens) || 0
const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date()
if (createdAt < oldestCreatedAt) {
oldestCreatedAt = createdAt
}
}
const now = new Date()
// 保持与个人API Key计算一致的算法:按天计算然后转换为分钟
const daysSinceOldest = Math.max(
1,
Math.ceil((now - oldestCreatedAt) / (1000 * 60 * 60 * 24))
)
const totalMinutes = daysSinceOldest * 24 * 60
return {
systemRPM: Math.round((totalRequests / totalMinutes) * 100) / 100,
systemTPM: Math.round((totalTokens / totalMinutes) * 100) / 100,
totalInputTokens,
totalOutputTokens,
totalTokens
}
} catch (error) {
console.error('Error getting system averages:', error)
return {
systemRPM: 0,
systemTPM: 0,
totalInputTokens: 0,
totalOutputTokens: 0,
totalTokens: 0
}
}
}
// 📊 获取实时系统指标(基于滑动窗口)
async getRealtimeSystemMetrics() {
try {
const configLocal = require('../../config/config')
const windowMinutes = configLocal.system.metricsWindow || 5
const now = new Date()
const currentMinute = Math.floor(now.getTime() / 60000)
// 调试:打印当前时间和分钟时间戳
logger.debug(
`🔍 Realtime metrics - Current time: ${now.toISOString()}, Minute timestamp: ${currentMinute}`
)
// 使用Pipeline批量获取窗口内的所有分钟数据
const pipeline = this.client.pipeline()
const minuteKeys = []
for (let i = 0; i < windowMinutes; i++) {
const minuteKey = `system:metrics:minute:${currentMinute - i}`
minuteKeys.push(minuteKey)
pipeline.hgetall(minuteKey)
}
logger.debug(`🔍 Realtime metrics - Checking keys: ${minuteKeys.join(', ')}`)
const results = await pipeline.exec()
// 聚合计算
let totalRequests = 0
let totalTokens = 0
let totalInputTokens = 0
let totalOutputTokens = 0
let totalCacheCreateTokens = 0
let totalCacheReadTokens = 0
let validDataCount = 0
results.forEach(([err, data], index) => {
if (!err && data && Object.keys(data).length > 0) {
validDataCount++
totalRequests += parseInt(data.requests || 0)
totalTokens += parseInt(data.totalTokens || 0)
totalInputTokens += parseInt(data.inputTokens || 0)
totalOutputTokens += parseInt(data.outputTokens || 0)
totalCacheCreateTokens += parseInt(data.cacheCreateTokens || 0)
totalCacheReadTokens += parseInt(data.cacheReadTokens || 0)
logger.debug(`🔍 Realtime metrics - Key ${minuteKeys[index]} data:`, {
requests: data.requests,
totalTokens: data.totalTokens
})
}
})
logger.debug(
`🔍 Realtime metrics - Valid data count: ${validDataCount}/${windowMinutes}, Total requests: ${totalRequests}, Total tokens: ${totalTokens}`
)
// 计算平均值(每分钟)
const realtimeRPM =
windowMinutes > 0 ? Math.round((totalRequests / windowMinutes) * 100) / 100 : 0
const realtimeTPM =
windowMinutes > 0 ? Math.round((totalTokens / windowMinutes) * 100) / 100 : 0
const result = {
realtimeRPM,
realtimeTPM,
windowMinutes,
totalRequests,
totalTokens,
totalInputTokens,
totalOutputTokens,
totalCacheCreateTokens,
totalCacheReadTokens
}
logger.debug('🔍 Realtime metrics - Final result:', result)
return result
} catch (error) {
console.error('Error getting realtime system metrics:', error)
// 如果出错,返回历史平均值作为降级方案
const historicalMetrics = await this.getSystemAverages()
return {
realtimeRPM: historicalMetrics.systemRPM,
realtimeTPM: historicalMetrics.systemTPM,
windowMinutes: 0, // 标识使用了历史数据
totalRequests: 0,
totalTokens: historicalMetrics.totalTokens,
totalInputTokens: historicalMetrics.totalInputTokens,
totalOutputTokens: historicalMetrics.totalOutputTokens,
totalCacheCreateTokens: 0,
totalCacheReadTokens: 0
}
}
}
// 🔗 会话sticky映射管理
async setSessionAccountMapping(sessionHash, accountId, ttl = null) {
const appConfig = require('../../config/config')
// 从配置读取TTL(小时),转换为秒,默认1小时
const defaultTTL = ttl !== null ? ttl : (appConfig.session?.stickyTtlHours || 1) * 60 * 60
const key = `sticky_session:${sessionHash}`
await this.client.set(key, accountId, 'EX', defaultTTL)
}
async getSessionAccountMapping(sessionHash) {
const key = `sticky_session:${sessionHash}`
return await this.client.get(key)
}
// 🚀 智能会话TTL续期:剩余时间少于阈值时自动续期
async extendSessionAccountMappingTTL(sessionHash) {
const appConfig = require('../../config/config')
const key = `sticky_session:${sessionHash}`
// 📊 从配置获取参数
const ttlHours = appConfig.session?.stickyTtlHours || 1 // 小时,默认1小时
const thresholdMinutes = appConfig.session?.renewalThresholdMinutes || 0 // 分钟,默认0(不续期)
// 如果阈值为0,不执行续期
if (thresholdMinutes === 0) {
return true
}
const fullTTL = ttlHours * 60 * 60 // 转换为秒
const renewalThreshold = thresholdMinutes * 60 // 转换为秒
try {
// 获取当前剩余TTL(秒)
const remainingTTL = await this.client.ttl(key)
// 键不存在或已过期
if (remainingTTL === -2) {
return false
}
// 键存在但没有TTL(永不过期,不需要处理)
if (remainingTTL === -1) {
return true
}
// 🎯 智能续期策略:仅在剩余时间少于阈值时才续期
if (remainingTTL < renewalThreshold) {
await this.client.expire(key, fullTTL)
logger.debug(
`🔄 Renewed sticky session TTL: ${sessionHash} (was ${Math.round(
remainingTTL / 60
)}min, renewed to ${ttlHours}h)`
)
return true
}
// 剩余时间充足,无需续期
logger.debug(
`✅ Sticky session TTL sufficient: ${sessionHash} (remaining ${Math.round(
remainingTTL / 60
)}min)`
)
return true
} catch (error) {
logger.error('❌ Failed to extend session TTL:', error)
return false
}
}
async deleteSessionAccountMapping(sessionHash) {
const key = `sticky_session:${sessionHash}`
return await this.client.del(key)
}
// 🧹 清理过期数据
async cleanup() {
try {
const patterns = ['usage:daily:*', 'ratelimit:*', 'session:*', 'sticky_session:*', 'oauth:*']
for (const pattern of patterns) {
const keys = await this.client.keys(pattern)
const pipeline = this.client.pipeline()
for (const key of keys) {
const ttl = await this.client.ttl(key)
if (ttl === -1) {
// 没有设置过期时间的键
if (key.startsWith('oauth:')) {
pipeline.expire(key, 600) // OAuth会话设置10分钟过期
} else {
pipeline.expire(key, 86400) // 其他设置1天过期
}
}
}
await pipeline.exec()
}
logger.info('🧹 Redis cleanup completed')
} catch (error) {
logger.error('❌ Redis cleanup failed:', error)
}
}
// 获取并发配置
_getConcurrencyConfig() {
const defaults = {
leaseSeconds: 300,
renewIntervalSeconds: 30,
cleanupGraceSeconds: 30
}
const configValues = {
...defaults,
...(config.concurrency || {})
}
const normalizeNumber = (value, fallback, options = {}) => {
const parsed = Number(value)
if (!Number.isFinite(parsed)) {
return fallback
}
if (options.allowZero && parsed === 0) {
return 0
}
if (options.min !== undefined && parsed < options.min) {
return options.min
}
return parsed
}
return {
leaseSeconds: normalizeNumber(configValues.leaseSeconds, defaults.leaseSeconds, {
min: 30
}),
renewIntervalSeconds: normalizeNumber(
configValues.renewIntervalSeconds,
defaults.renewIntervalSeconds,
{
allowZero: true,
min: 0
}
),
cleanupGraceSeconds: normalizeNumber(
configValues.cleanupGraceSeconds,
defaults.cleanupGraceSeconds,
{
min: 0
}
)
}
}
// 增加并发计数(基于租约的有序集合)
async incrConcurrency(apiKeyId, requestId, leaseSeconds = null) {
if (!requestId) {
throw new Error('Request ID is required for concurrency tracking')
}
try {
const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } =
this._getConcurrencyConfig()
const lease = leaseSeconds || defaultLeaseSeconds
const key = `concurrency:${apiKeyId}`
const now = Date.now()
const expireAt = now + lease * 1000
const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000)
const luaScript = `
local key = KEYS[1]
local member = ARGV[1]
local expireAt = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
redis.call('ZADD', key, expireAt, member)
if ttl > 0 then
redis.call('PEXPIRE', key, ttl)
end
local count = redis.call('ZCARD', key)
return count
`
const count = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl)
logger.database(
`🔢 Incremented concurrency for key ${apiKeyId}: ${count} (request ${requestId})`
)
return count
} catch (error) {
logger.error('❌ Failed to increment concurrency:', error)
throw error
}
}
// 刷新并发租约,防止长连接提前过期
async refreshConcurrencyLease(apiKeyId, requestId, leaseSeconds = null) {
if (!requestId) {
return 0
}
try {
const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } =
this._getConcurrencyConfig()
const lease = leaseSeconds || defaultLeaseSeconds
const key = `concurrency:${apiKeyId}`
const now = Date.now()
const expireAt = now + lease * 1000
const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000)
const luaScript = `
local key = KEYS[1]
local member = ARGV[1]
local expireAt = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local ttl = tonumber(ARGV[4])
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
local exists = redis.call('ZSCORE', key, member)
if exists then
redis.call('ZADD', key, expireAt, member)
if ttl > 0 then
redis.call('PEXPIRE', key, ttl)
end
return 1
end
return 0
`
const refreshed = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl)
if (refreshed === 1) {
logger.debug(`🔄 Refreshed concurrency lease for key ${apiKeyId} (request ${requestId})`)
}
return refreshed
} catch (error) {
logger.error('❌ Failed to refresh concurrency lease:', error)
return 0
}
}
// 减少并发计数
async decrConcurrency(apiKeyId, requestId) {
try {
const key = `concurrency:${apiKeyId}`
const now = Date.now()
const luaScript = `
local key = KEYS[1]
local member = ARGV[1]
local now = tonumber(ARGV[2])
if member then
redis.call('ZREM', key, member)
end
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
local count = redis.call('ZCARD', key)
if count <= 0 then
redis.call('DEL', key)
return 0
end
return count
`
const count = await this.client.eval(luaScript, 1, key, requestId || '', now)
logger.database(
`🔢 Decremented concurrency for key ${apiKeyId}: ${count} (request ${requestId || 'n/a'})`
)
return count
} catch (error) {
logger.error('❌ Failed to decrement concurrency:', error)
throw error
}
}
// 获取当前并发数
async getConcurrency(apiKeyId) {
try {
const key = `concurrency:${apiKeyId}`
const now = Date.now()
const luaScript = `
local key = KEYS[1]
local now = tonumber(ARGV[1])
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
return redis.call('ZCARD', key)
`
const count = await this.client.eval(luaScript, 1, key, now)
return parseInt(count || 0)
} catch (error) {
logger.error('❌ Failed to get concurrency:', error)
return 0
}
}
// 🔧 Basic Redis operations wrapper methods for convenience
async get(key) {
const client = this.getClientSafe()
return await client.get(key)
}
async set(key, value, ...args) {
const client = this.getClientSafe()
return await client.set(key, value, ...args)
}
async setex(key, ttl, value) {
const client = this.getClientSafe()
return await client.setex(key, ttl, value)
}
async del(...keys) {
const client = this.getClientSafe()
return await client.del(...keys)
}
async keys(pattern) {
const client = this.getClientSafe()
return await client.keys(pattern)
}
// 📊 获取账户会话窗口内的使用统计(包含模型细分)
async getAccountSessionWindowUsage(accountId, windowStart, windowEnd) {
try {
if (!windowStart || !windowEnd) {
return {
totalInputTokens: 0,
totalOutputTokens: 0,
totalCacheCreateTokens: 0,
totalCacheReadTokens: 0,
totalAllTokens: 0,
totalRequests: 0,
modelUsage: {}
}
}
const startDate = new Date(windowStart)
const endDate = new Date(windowEnd)
// 添加日志以调试时间窗口
logger.debug(`📊 Getting session window usage for account ${accountId}`)
logger.debug(` Window: ${windowStart} to ${windowEnd}`)
logger.debug(` Start UTC: ${startDate.toISOString()}, End UTC: ${endDate.toISOString()}`)
// 获取窗口内所有可能的小时键
// 重要:需要使用配置的时区来构建键名,因为数据存储时使用的是配置时区
const hourlyKeys = []
const currentHour = new Date(startDate)
currentHour.setMinutes(0)
currentHour.setSeconds(0)
currentHour.setMilliseconds(0)
while (currentHour <= endDate) {
// 使用时区转换函数来获取正确的日期和小时
const tzDateStr = getDateStringInTimezone(currentHour)
const tzHour = String(getHourInTimezone(currentHour)).padStart(2, '0')
const key = `account_usage:hourly:${accountId}:${tzDateStr}:${tzHour}`
logger.debug(` Adding hourly key: ${key}`)
hourlyKeys.push(key)
currentHour.setHours(currentHour.getHours() + 1)
}
// 批量获取所有小时的数据
const pipeline = this.client.pipeline()
for (const key of hourlyKeys) {
pipeline.hgetall(key)
}
const results = await pipeline.exec()
// 聚合所有数据
let totalInputTokens = 0
let totalOutputTokens = 0
let totalCacheCreateTokens = 0
let totalCacheReadTokens = 0
let totalAllTokens = 0
let totalRequests = 0
const modelUsage = {}
logger.debug(` Processing ${results.length} hourly results`)
for (const [error, data] of results) {
if (error || !data || Object.keys(data).length === 0) {
continue
}
// 处理总计数据
const hourInputTokens = parseInt(data.inputTokens || 0)
const hourOutputTokens = parseInt(data.outputTokens || 0)
const hourCacheCreateTokens = parseInt(data.cacheCreateTokens || 0)
const hourCacheReadTokens = parseInt(data.cacheReadTokens || 0)
const hourAllTokens = parseInt(data.allTokens || 0)
const hourRequests = parseInt(data.requests || 0)
totalInputTokens += hourInputTokens
totalOutputTokens += hourOutputTokens
totalCacheCreateTokens += hourCacheCreateTokens
totalCacheReadTokens += hourCacheReadTokens
totalAllTokens += hourAllTokens
totalRequests += hourRequests
if (hourAllTokens > 0) {
logger.debug(` Hour data: allTokens=${hourAllTokens}, requests=${hourRequests}`)
}
// 处理每个模型的数据
for (const [key, value] of Object.entries(data)) {
// 查找模型相关的键(格式: model:{modelName}:{metric})
if (key.startsWith('model:')) {
const parts = key.split(':')
if (parts.length >= 3) {
const modelName = parts[1]
const metric = parts.slice(2).join(':')
if (!modelUsage[modelName]) {
modelUsage[modelName] = {
inputTokens: 0,
outputTokens: 0,
cacheCreateTokens: 0,
cacheReadTokens: 0,
allTokens: 0,
requests: 0
}
}
if (metric === 'inputTokens') {
modelUsage[modelName].inputTokens += parseInt(value || 0)
} else if (metric === 'outputTokens') {
modelUsage[modelName].outputTokens += parseInt(value || 0)
} else if (metric === 'cacheCreateTokens') {
modelUsage[modelName].cacheCreateTokens += parseInt(value || 0)
} else if (metric === 'cacheReadTokens') {
modelUsage[modelName].cacheReadTokens += parseInt(value || 0)
} else if (metric === 'allTokens') {
modelUsage[modelName].allTokens += parseInt(value || 0)
} else if (metric === 'requests') {
modelUsage[modelName].requests += parseInt(value || 0)
}
}
}
}
}
logger.debug(`📊 Session window usage summary:`)
logger.debug(` Total allTokens: ${totalAllTokens}`)
logger.debug(` Total requests: ${totalRequests}`)
logger.debug(` Input: ${totalInputTokens}, Output: ${totalOutputTokens}`)
logger.debug(
` Cache Create: ${totalCacheCreateTokens}, Cache Read: ${totalCacheReadTokens}`
)
return {
totalInputTokens,
totalOutputTokens,
totalCacheCreateTokens,
totalCacheReadTokens,
totalAllTokens,
totalRequests,
modelUsage
}
} catch (error) {
logger.error(`❌ Failed to get session window usage for account ${accountId}:`, error)
return {
totalInputTokens: 0,
totalOutputTokens: 0,
totalCacheCreateTokens: 0,
totalCacheReadTokens: 0,
totalAllTokens: 0,
totalRequests: 0,
modelUsage: {}
}
}
}
}
const redisClient = new RedisClient()
// 分布式锁相关方法
redisClient.setAccountLock = async function (lockKey, lockValue, ttlMs) {
try {
// 使用SET NX PX实现原子性的锁获取
// ioredis语法: set(key, value, 'PX', milliseconds, 'NX')
const result = await this.client.set(lockKey, lockValue, 'PX', ttlMs, 'NX')
return result === 'OK'
} catch (error) {
logger.error(`Failed to acquire lock ${lockKey}:`, error)
return false
}
}
redisClient.releaseAccountLock = async function (lockKey, lockValue) {
try {
// 使用Lua脚本确保只有持有锁的进程才能释放锁
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
// ioredis语法: eval(script, numberOfKeys, key1, key2, ..., arg1, arg2, ...)
const result = await this.client.eval(script, 1, lockKey, lockValue)
return result === 1
} catch (error) {
logger.error(`Failed to release lock ${lockKey}:`, error)
return false
}
}
// 导出时区辅助函数
redisClient.getDateInTimezone = getDateInTimezone
redisClient.getDateStringInTimezone = getDateStringInTimezone
redisClient.getHourInTimezone = getHourInTimezone
redisClient.getWeekStringInTimezone = getWeekStringInTimezone
module.exports = redisClient