|
|
const Redis = require('ioredis') |
|
|
const config = require('../../config/config') |
|
|
const logger = require('../utils/logger') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
function getDateInTimezone(date = new Date()) { |
|
|
const offset = config.system.timezoneOffset || 8 |
|
|
|
|
|
|
|
|
|
|
|
const offsetMs = offset * 3600000 |
|
|
const adjustedTime = new Date(date.getTime() + offsetMs) |
|
|
|
|
|
return adjustedTime |
|
|
} |
|
|
|
|
|
|
|
|
function getDateStringInTimezone(date = new Date()) { |
|
|
const tzDate = getDateInTimezone(date) |
|
|
|
|
|
return `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart(2, '0')}-${String( |
|
|
tzDate.getUTCDate() |
|
|
).padStart(2, '0')}` |
|
|
} |
|
|
|
|
|
|
|
|
function getHourInTimezone(date = new Date()) { |
|
|
const tzDate = getDateInTimezone(date) |
|
|
return tzDate.getUTCHours() |
|
|
} |
|
|
|
|
|
|
|
|
function getWeekStringInTimezone(date = new Date()) { |
|
|
const tzDate = getDateInTimezone(date) |
|
|
|
|
|
|
|
|
const year = tzDate.getUTCFullYear() |
|
|
|
|
|
|
|
|
const dateObj = new Date(tzDate) |
|
|
const dayOfWeek = dateObj.getUTCDay() || 7 |
|
|
const firstThursday = new Date(dateObj) |
|
|
firstThursday.setUTCDate(dateObj.getUTCDate() + 4 - dayOfWeek) |
|
|
|
|
|
const yearStart = new Date(firstThursday.getUTCFullYear(), 0, 1) |
|
|
const weekNumber = Math.ceil(((firstThursday - yearStart) / 86400000 + 1) / 7) |
|
|
|
|
|
return `${year}-W${String(weekNumber).padStart(2, '0')}` |
|
|
} |
|
|
|
|
|
class RedisClient { |
|
|
constructor() { |
|
|
this.client = null |
|
|
this.isConnected = false |
|
|
} |
|
|
|
|
|
async connect() { |
|
|
try { |
|
|
this.client = new Redis({ |
|
|
host: config.redis.host, |
|
|
port: config.redis.port, |
|
|
password: config.redis.password, |
|
|
db: config.redis.db, |
|
|
retryDelayOnFailover: config.redis.retryDelayOnFailover, |
|
|
maxRetriesPerRequest: config.redis.maxRetriesPerRequest, |
|
|
lazyConnect: config.redis.lazyConnect, |
|
|
tls: config.redis.enableTLS ? {} : false |
|
|
}) |
|
|
|
|
|
this.client.on('connect', () => { |
|
|
this.isConnected = true |
|
|
logger.info('🔗 Redis connected successfully') |
|
|
}) |
|
|
|
|
|
this.client.on('error', (err) => { |
|
|
this.isConnected = false |
|
|
logger.error('❌ Redis connection error:', err) |
|
|
}) |
|
|
|
|
|
this.client.on('close', () => { |
|
|
this.isConnected = false |
|
|
logger.warn('⚠️ Redis connection closed') |
|
|
}) |
|
|
|
|
|
await this.client.connect() |
|
|
return this.client |
|
|
} catch (error) { |
|
|
logger.error('💥 Failed to connect to Redis:', error) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
async disconnect() { |
|
|
if (this.client) { |
|
|
await this.client.quit() |
|
|
this.isConnected = false |
|
|
logger.info('👋 Redis disconnected') |
|
|
} |
|
|
} |
|
|
|
|
|
getClient() { |
|
|
if (!this.client || !this.isConnected) { |
|
|
logger.warn('⚠️ Redis client is not connected') |
|
|
return null |
|
|
} |
|
|
return this.client |
|
|
} |
|
|
|
|
|
|
|
|
getClientSafe() { |
|
|
if (!this.client || !this.isConnected) { |
|
|
throw new Error('Redis client is not connected') |
|
|
} |
|
|
return this.client |
|
|
} |
|
|
|
|
|
|
|
|
async setApiKey(keyId, keyData, hashedKey = null) { |
|
|
const key = `apikey:${keyId}` |
|
|
const client = this.getClientSafe() |
|
|
|
|
|
|
|
|
|
|
|
if (hashedKey) { |
|
|
await client.hset('apikey:hash_map', hashedKey, keyId) |
|
|
} |
|
|
|
|
|
await client.hset(key, keyData) |
|
|
await client.expire(key, 86400 * 365) |
|
|
} |
|
|
|
|
|
async getApiKey(keyId) { |
|
|
const key = `apikey:${keyId}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
|
|
|
async deleteApiKey(keyId) { |
|
|
const key = `apikey:${keyId}` |
|
|
|
|
|
|
|
|
const keyData = await this.client.hgetall(key) |
|
|
if (keyData && keyData.apiKey) { |
|
|
|
|
|
await this.client.hdel('apikey:hash_map', keyData.apiKey) |
|
|
} |
|
|
|
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
async getAllApiKeys() { |
|
|
const keys = await this.client.keys('apikey:*') |
|
|
const apiKeys = [] |
|
|
for (const key of keys) { |
|
|
|
|
|
if (key === 'apikey:hash_map') { |
|
|
continue |
|
|
} |
|
|
|
|
|
const keyData = await this.client.hgetall(key) |
|
|
if (keyData && Object.keys(keyData).length > 0) { |
|
|
apiKeys.push({ id: key.replace('apikey:', ''), ...keyData }) |
|
|
} |
|
|
} |
|
|
return apiKeys |
|
|
} |
|
|
|
|
|
|
|
|
async findApiKeyByHash(hashedKey) { |
|
|
|
|
|
const keyId = await this.client.hget('apikey:hash_map', hashedKey) |
|
|
if (!keyId) { |
|
|
return null |
|
|
} |
|
|
|
|
|
const keyData = await this.client.hgetall(`apikey:${keyId}`) |
|
|
if (keyData && Object.keys(keyData).length > 0) { |
|
|
return { id: keyId, ...keyData } |
|
|
} |
|
|
|
|
|
|
|
|
await this.client.hdel('apikey:hash_map', hashedKey) |
|
|
return null |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_normalizeModelName(model) { |
|
|
if (!model || model === 'unknown') { |
|
|
return model |
|
|
} |
|
|
|
|
|
|
|
|
if (model.includes('.anthropic.') || model.includes('.claude')) { |
|
|
|
|
|
|
|
|
let normalized = model.replace(/^[a-z0-9-]+\./, '') |
|
|
normalized = normalized.replace('anthropic.', '') |
|
|
normalized = normalized.replace(/-v\d+:\d+$/, '') |
|
|
return normalized |
|
|
} |
|
|
|
|
|
|
|
|
return model.replace(/-v\d+:\d+$|:latest$/, '') |
|
|
} |
|
|
|
|
|
async incrementTokenUsage( |
|
|
keyId, |
|
|
tokens, |
|
|
inputTokens = 0, |
|
|
outputTokens = 0, |
|
|
cacheCreateTokens = 0, |
|
|
cacheReadTokens = 0, |
|
|
model = 'unknown', |
|
|
ephemeral5mTokens = 0, |
|
|
ephemeral1hTokens = 0, |
|
|
isLongContextRequest = false |
|
|
) { |
|
|
const key = `usage:${keyId}` |
|
|
const now = new Date() |
|
|
const today = getDateStringInTimezone(now) |
|
|
const tzDate = getDateInTimezone(now) |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}` |
|
|
|
|
|
const daily = `usage:daily:${keyId}:${today}` |
|
|
const monthly = `usage:monthly:${keyId}:${currentMonth}` |
|
|
const hourly = `usage:hourly:${keyId}:${currentHour}` |
|
|
|
|
|
|
|
|
const normalizedModel = this._normalizeModelName(model) |
|
|
|
|
|
|
|
|
const modelDaily = `usage:model:daily:${normalizedModel}:${today}` |
|
|
const modelMonthly = `usage:model:monthly:${normalizedModel}:${currentMonth}` |
|
|
const modelHourly = `usage:model:hourly:${normalizedModel}:${currentHour}` |
|
|
|
|
|
|
|
|
const keyModelDaily = `usage:${keyId}:model:daily:${normalizedModel}:${today}` |
|
|
const keyModelMonthly = `usage:${keyId}:model:monthly:${normalizedModel}:${currentMonth}` |
|
|
const keyModelHourly = `usage:${keyId}:model:hourly:${normalizedModel}:${currentHour}` |
|
|
|
|
|
|
|
|
const minuteTimestamp = Math.floor(now.getTime() / 60000) |
|
|
const systemMinuteKey = `system:metrics:minute:${minuteTimestamp}` |
|
|
|
|
|
|
|
|
const finalInputTokens = inputTokens || 0 |
|
|
const finalOutputTokens = outputTokens || (finalInputTokens > 0 ? 0 : tokens) |
|
|
const finalCacheCreateTokens = cacheCreateTokens || 0 |
|
|
const finalCacheReadTokens = cacheReadTokens || 0 |
|
|
|
|
|
|
|
|
const totalTokens = |
|
|
finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens |
|
|
|
|
|
const coreTokens = finalInputTokens + finalOutputTokens |
|
|
|
|
|
|
|
|
const pipeline = this.client.pipeline() |
|
|
|
|
|
|
|
|
|
|
|
pipeline.hincrby(key, 'totalTokens', coreTokens) |
|
|
pipeline.hincrby(key, 'totalInputTokens', finalInputTokens) |
|
|
pipeline.hincrby(key, 'totalOutputTokens', finalOutputTokens) |
|
|
|
|
|
pipeline.hincrby(key, 'totalCacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(key, 'totalCacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(key, 'totalAllTokens', totalTokens) |
|
|
|
|
|
pipeline.hincrby(key, 'totalEphemeral5mTokens', ephemeral5mTokens) |
|
|
pipeline.hincrby(key, 'totalEphemeral1hTokens', ephemeral1hTokens) |
|
|
|
|
|
if (isLongContextRequest) { |
|
|
pipeline.hincrby(key, 'totalLongContextInputTokens', finalInputTokens) |
|
|
pipeline.hincrby(key, 'totalLongContextOutputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(key, 'totalLongContextRequests', 1) |
|
|
} |
|
|
|
|
|
pipeline.hincrby(key, 'totalRequests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(daily, 'tokens', coreTokens) |
|
|
pipeline.hincrby(daily, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(daily, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(daily, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(daily, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(daily, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(daily, 'requests', 1) |
|
|
|
|
|
pipeline.hincrby(daily, 'ephemeral5mTokens', ephemeral5mTokens) |
|
|
pipeline.hincrby(daily, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
|
|
|
if (isLongContextRequest) { |
|
|
pipeline.hincrby(daily, 'longContextInputTokens', finalInputTokens) |
|
|
pipeline.hincrby(daily, 'longContextOutputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(daily, 'longContextRequests', 1) |
|
|
} |
|
|
|
|
|
|
|
|
pipeline.hincrby(monthly, 'tokens', coreTokens) |
|
|
pipeline.hincrby(monthly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(monthly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(monthly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(monthly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(monthly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(monthly, 'requests', 1) |
|
|
|
|
|
pipeline.hincrby(monthly, 'ephemeral5mTokens', ephemeral5mTokens) |
|
|
pipeline.hincrby(monthly, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
|
|
|
|
|
|
pipeline.hincrby(modelDaily, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(modelDaily, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(modelDaily, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(modelDaily, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(modelDaily, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(modelDaily, 'requests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(modelMonthly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(modelMonthly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(modelMonthly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(modelMonthly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(modelMonthly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(modelMonthly, 'requests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(keyModelDaily, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'requests', 1) |
|
|
|
|
|
pipeline.hincrby(keyModelDaily, 'ephemeral5mTokens', ephemeral5mTokens) |
|
|
pipeline.hincrby(keyModelDaily, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
|
|
|
|
|
|
pipeline.hincrby(keyModelMonthly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'requests', 1) |
|
|
|
|
|
pipeline.hincrby(keyModelMonthly, 'ephemeral5mTokens', ephemeral5mTokens) |
|
|
pipeline.hincrby(keyModelMonthly, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
|
|
|
|
|
|
pipeline.hincrby(hourly, 'tokens', coreTokens) |
|
|
pipeline.hincrby(hourly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(hourly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(hourly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(hourly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(hourly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(hourly, 'requests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(modelHourly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(modelHourly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(modelHourly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(modelHourly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(modelHourly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(modelHourly, 'requests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(keyModelHourly, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(keyModelHourly, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(keyModelHourly, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(keyModelHourly, 'cacheReadTokens', finalCacheReadTokens) |
|
|
pipeline.hincrby(keyModelHourly, 'allTokens', totalTokens) |
|
|
pipeline.hincrby(keyModelHourly, 'requests', 1) |
|
|
|
|
|
|
|
|
pipeline.hincrby(systemMinuteKey, 'requests', 1) |
|
|
pipeline.hincrby(systemMinuteKey, 'totalTokens', totalTokens) |
|
|
pipeline.hincrby(systemMinuteKey, 'inputTokens', finalInputTokens) |
|
|
pipeline.hincrby(systemMinuteKey, 'outputTokens', finalOutputTokens) |
|
|
pipeline.hincrby(systemMinuteKey, 'cacheCreateTokens', finalCacheCreateTokens) |
|
|
pipeline.hincrby(systemMinuteKey, 'cacheReadTokens', finalCacheReadTokens) |
|
|
|
|
|
|
|
|
pipeline.expire(daily, 86400 * 32) |
|
|
pipeline.expire(monthly, 86400 * 365) |
|
|
pipeline.expire(hourly, 86400 * 7) |
|
|
pipeline.expire(modelDaily, 86400 * 32) |
|
|
pipeline.expire(modelMonthly, 86400 * 365) |
|
|
pipeline.expire(modelHourly, 86400 * 7) |
|
|
pipeline.expire(keyModelDaily, 86400 * 32) |
|
|
pipeline.expire(keyModelMonthly, 86400 * 365) |
|
|
pipeline.expire(keyModelHourly, 86400 * 7) |
|
|
|
|
|
|
|
|
const configLocal = require('../../config/config') |
|
|
const { metricsWindow } = configLocal.system |
|
|
pipeline.expire(systemMinuteKey, metricsWindow * 60 * 2) |
|
|
|
|
|
|
|
|
await pipeline.exec() |
|
|
} |
|
|
|
|
|
|
|
|
async incrementAccountUsage( |
|
|
accountId, |
|
|
totalTokens, |
|
|
inputTokens = 0, |
|
|
outputTokens = 0, |
|
|
cacheCreateTokens = 0, |
|
|
cacheReadTokens = 0, |
|
|
model = 'unknown', |
|
|
isLongContextRequest = false |
|
|
) { |
|
|
const now = new Date() |
|
|
const today = getDateStringInTimezone(now) |
|
|
const tzDate = getDateInTimezone(now) |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}` |
|
|
|
|
|
|
|
|
const accountKey = `account_usage:${accountId}` |
|
|
const accountDaily = `account_usage:daily:${accountId}:${today}` |
|
|
const accountMonthly = `account_usage:monthly:${accountId}:${currentMonth}` |
|
|
const accountHourly = `account_usage:hourly:${accountId}:${currentHour}` |
|
|
|
|
|
|
|
|
const normalizedModel = this._normalizeModelName(model) |
|
|
|
|
|
|
|
|
const accountModelDaily = `account_usage:model:daily:${accountId}:${normalizedModel}:${today}` |
|
|
const accountModelMonthly = `account_usage:model:monthly:${accountId}:${normalizedModel}:${currentMonth}` |
|
|
const accountModelHourly = `account_usage:model:hourly:${accountId}:${normalizedModel}:${currentHour}` |
|
|
|
|
|
|
|
|
const finalInputTokens = inputTokens || 0 |
|
|
const finalOutputTokens = outputTokens || 0 |
|
|
const finalCacheCreateTokens = cacheCreateTokens || 0 |
|
|
const finalCacheReadTokens = cacheReadTokens || 0 |
|
|
const actualTotalTokens = |
|
|
finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens |
|
|
const coreTokens = finalInputTokens + finalOutputTokens |
|
|
|
|
|
|
|
|
const operations = [ |
|
|
|
|
|
this.client.hincrby(accountKey, 'totalTokens', coreTokens), |
|
|
this.client.hincrby(accountKey, 'totalInputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountKey, 'totalOutputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountKey, 'totalCacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountKey, 'totalCacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountKey, 'totalAllTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountKey, 'totalRequests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountDaily, 'tokens', coreTokens), |
|
|
this.client.hincrby(accountDaily, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountDaily, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountDaily, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountDaily, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountDaily, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountDaily, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountMonthly, 'tokens', coreTokens), |
|
|
this.client.hincrby(accountMonthly, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountMonthly, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountMonthly, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountMonthly, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountMonthly, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountMonthly, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountHourly, 'tokens', coreTokens), |
|
|
this.client.hincrby(accountHourly, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountHourly, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountHourly, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountHourly, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountHourly, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountHourly, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountHourly, `model:${normalizedModel}:inputTokens`, finalInputTokens), |
|
|
this.client.hincrby( |
|
|
accountHourly, |
|
|
`model:${normalizedModel}:outputTokens`, |
|
|
finalOutputTokens |
|
|
), |
|
|
this.client.hincrby( |
|
|
accountHourly, |
|
|
`model:${normalizedModel}:cacheCreateTokens`, |
|
|
finalCacheCreateTokens |
|
|
), |
|
|
this.client.hincrby( |
|
|
accountHourly, |
|
|
`model:${normalizedModel}:cacheReadTokens`, |
|
|
finalCacheReadTokens |
|
|
), |
|
|
this.client.hincrby(accountHourly, `model:${normalizedModel}:allTokens`, actualTotalTokens), |
|
|
this.client.hincrby(accountHourly, `model:${normalizedModel}:requests`, 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountModelDaily, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountModelDaily, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountModelDaily, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountModelDaily, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountModelDaily, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountModelDaily, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountModelMonthly, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountModelMonthly, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountModelMonthly, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountModelMonthly, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountModelMonthly, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.hincrby(accountModelHourly, 'inputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountModelHourly, 'outputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountModelHourly, 'cacheCreateTokens', finalCacheCreateTokens), |
|
|
this.client.hincrby(accountModelHourly, 'cacheReadTokens', finalCacheReadTokens), |
|
|
this.client.hincrby(accountModelHourly, 'allTokens', actualTotalTokens), |
|
|
this.client.hincrby(accountModelHourly, 'requests', 1), |
|
|
|
|
|
|
|
|
this.client.expire(accountDaily, 86400 * 32), |
|
|
this.client.expire(accountMonthly, 86400 * 365), |
|
|
this.client.expire(accountHourly, 86400 * 7), |
|
|
this.client.expire(accountModelDaily, 86400 * 32), |
|
|
this.client.expire(accountModelMonthly, 86400 * 365), |
|
|
this.client.expire(accountModelHourly, 86400 * 7) |
|
|
] |
|
|
|
|
|
|
|
|
if (isLongContextRequest) { |
|
|
operations.push( |
|
|
this.client.hincrby(accountKey, 'totalLongContextInputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountKey, 'totalLongContextOutputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountKey, 'totalLongContextRequests', 1), |
|
|
this.client.hincrby(accountDaily, 'longContextInputTokens', finalInputTokens), |
|
|
this.client.hincrby(accountDaily, 'longContextOutputTokens', finalOutputTokens), |
|
|
this.client.hincrby(accountDaily, 'longContextRequests', 1) |
|
|
) |
|
|
} |
|
|
|
|
|
await Promise.all(operations) |
|
|
} |
|
|
|
|
|
async getUsageStats(keyId) { |
|
|
const totalKey = `usage:${keyId}` |
|
|
const today = getDateStringInTimezone() |
|
|
const dailyKey = `usage:daily:${keyId}:${today}` |
|
|
const tzDate = getDateInTimezone() |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const monthlyKey = `usage:monthly:${keyId}:${currentMonth}` |
|
|
|
|
|
const [total, daily, monthly] = await Promise.all([ |
|
|
this.client.hgetall(totalKey), |
|
|
this.client.hgetall(dailyKey), |
|
|
this.client.hgetall(monthlyKey) |
|
|
]) |
|
|
|
|
|
|
|
|
const keyData = await this.client.hgetall(`apikey:${keyId}`) |
|
|
const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date() |
|
|
const now = new Date() |
|
|
const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24))) |
|
|
|
|
|
const totalTokens = parseInt(total.totalTokens) || 0 |
|
|
const totalRequests = parseInt(total.totalRequests) || 0 |
|
|
|
|
|
|
|
|
const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60) |
|
|
const avgRPM = totalRequests / totalMinutes |
|
|
const avgTPM = totalTokens / totalMinutes |
|
|
|
|
|
|
|
|
const handleLegacyData = (data) => { |
|
|
|
|
|
const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0 |
|
|
const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0 |
|
|
const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0 |
|
|
const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0 |
|
|
|
|
|
|
|
|
const cacheCreateTokens = |
|
|
parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0 |
|
|
const cacheReadTokens = |
|
|
parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0 |
|
|
const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0 |
|
|
|
|
|
const totalFromSeparate = inputTokens + outputTokens |
|
|
|
|
|
const actualAllTokens = |
|
|
allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens |
|
|
|
|
|
if (totalFromSeparate === 0 && tokens > 0) { |
|
|
|
|
|
return { |
|
|
tokens, |
|
|
inputTokens: Math.round(tokens * 0.3), |
|
|
outputTokens: Math.round(tokens * 0.7), |
|
|
cacheCreateTokens: 0, |
|
|
cacheReadTokens: 0, |
|
|
allTokens: tokens, |
|
|
requests |
|
|
} |
|
|
} else { |
|
|
|
|
|
return { |
|
|
tokens: actualAllTokens, |
|
|
inputTokens, |
|
|
outputTokens, |
|
|
cacheCreateTokens, |
|
|
cacheReadTokens, |
|
|
allTokens: actualAllTokens, |
|
|
requests |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
const totalData = handleLegacyData(total) |
|
|
const dailyData = handleLegacyData(daily) |
|
|
const monthlyData = handleLegacyData(monthly) |
|
|
|
|
|
return { |
|
|
total: totalData, |
|
|
daily: dailyData, |
|
|
monthly: monthlyData, |
|
|
averages: { |
|
|
rpm: Math.round(avgRPM * 100) / 100, |
|
|
tpm: Math.round(avgTPM * 100) / 100, |
|
|
dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100, |
|
|
dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100 |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
async addUsageRecord(keyId, record, maxRecords = 200) { |
|
|
const listKey = `usage:records:${keyId}` |
|
|
const client = this.getClientSafe() |
|
|
|
|
|
try { |
|
|
await client |
|
|
.multi() |
|
|
.lpush(listKey, JSON.stringify(record)) |
|
|
.ltrim(listKey, 0, Math.max(0, maxRecords - 1)) |
|
|
.expire(listKey, 86400 * 90) |
|
|
.exec() |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to append usage record for key ${keyId}:`, error) |
|
|
} |
|
|
} |
|
|
|
|
|
async getUsageRecords(keyId, limit = 50) { |
|
|
const listKey = `usage:records:${keyId}` |
|
|
const client = this.getClient() |
|
|
|
|
|
if (!client) { |
|
|
return [] |
|
|
} |
|
|
|
|
|
try { |
|
|
const rawRecords = await client.lrange(listKey, 0, Math.max(0, limit - 1)) |
|
|
return rawRecords |
|
|
.map((entry) => { |
|
|
try { |
|
|
return JSON.parse(entry) |
|
|
} catch (error) { |
|
|
logger.warn('⚠️ Failed to parse usage record entry:', error) |
|
|
return null |
|
|
} |
|
|
}) |
|
|
.filter(Boolean) |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to load usage records for key ${keyId}:`, error) |
|
|
return [] |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getDailyCost(keyId) { |
|
|
const today = getDateStringInTimezone() |
|
|
const costKey = `usage:cost:daily:${keyId}:${today}` |
|
|
const cost = await this.client.get(costKey) |
|
|
const result = parseFloat(cost || 0) |
|
|
logger.debug( |
|
|
`💰 Getting daily cost for ${keyId}, date: ${today}, key: ${costKey}, value: ${cost}, result: ${result}` |
|
|
) |
|
|
return result |
|
|
} |
|
|
|
|
|
|
|
|
async incrementDailyCost(keyId, amount) { |
|
|
const today = getDateStringInTimezone() |
|
|
const tzDate = getDateInTimezone() |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}` |
|
|
|
|
|
const dailyKey = `usage:cost:daily:${keyId}:${today}` |
|
|
const monthlyKey = `usage:cost:monthly:${keyId}:${currentMonth}` |
|
|
const hourlyKey = `usage:cost:hourly:${keyId}:${currentHour}` |
|
|
const totalKey = `usage:cost:total:${keyId}` |
|
|
|
|
|
logger.debug( |
|
|
`💰 Incrementing cost for ${keyId}, amount: $${amount}, date: ${today}, dailyKey: ${dailyKey}` |
|
|
) |
|
|
|
|
|
const results = await Promise.all([ |
|
|
this.client.incrbyfloat(dailyKey, amount), |
|
|
this.client.incrbyfloat(monthlyKey, amount), |
|
|
this.client.incrbyfloat(hourlyKey, amount), |
|
|
this.client.incrbyfloat(totalKey, amount), |
|
|
|
|
|
this.client.expire(dailyKey, 86400 * 30), |
|
|
this.client.expire(monthlyKey, 86400 * 90), |
|
|
this.client.expire(hourlyKey, 86400 * 7) |
|
|
]) |
|
|
|
|
|
logger.debug(`💰 Cost incremented successfully, new daily total: $${results[0]}`) |
|
|
} |
|
|
|
|
|
|
|
|
async getCostStats(keyId) { |
|
|
const today = getDateStringInTimezone() |
|
|
const tzDate = getDateInTimezone() |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}` |
|
|
|
|
|
const [daily, monthly, hourly, total] = await Promise.all([ |
|
|
this.client.get(`usage:cost:daily:${keyId}:${today}`), |
|
|
this.client.get(`usage:cost:monthly:${keyId}:${currentMonth}`), |
|
|
this.client.get(`usage:cost:hourly:${keyId}:${currentHour}`), |
|
|
this.client.get(`usage:cost:total:${keyId}`) |
|
|
]) |
|
|
|
|
|
return { |
|
|
daily: parseFloat(daily || 0), |
|
|
monthly: parseFloat(monthly || 0), |
|
|
hourly: parseFloat(hourly || 0), |
|
|
total: parseFloat(total || 0) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getWeeklyOpusCost(keyId) { |
|
|
const currentWeek = getWeekStringInTimezone() |
|
|
const costKey = `usage:opus:weekly:${keyId}:${currentWeek}` |
|
|
const cost = await this.client.get(costKey) |
|
|
const result = parseFloat(cost || 0) |
|
|
logger.debug( |
|
|
`💰 Getting weekly Opus cost for ${keyId}, week: ${currentWeek}, key: ${costKey}, value: ${cost}, result: ${result}` |
|
|
) |
|
|
return result |
|
|
} |
|
|
|
|
|
|
|
|
async incrementWeeklyOpusCost(keyId, amount) { |
|
|
const currentWeek = getWeekStringInTimezone() |
|
|
const weeklyKey = `usage:opus:weekly:${keyId}:${currentWeek}` |
|
|
const totalKey = `usage:opus:total:${keyId}` |
|
|
|
|
|
logger.debug( |
|
|
`💰 Incrementing weekly Opus cost for ${keyId}, week: ${currentWeek}, amount: $${amount}` |
|
|
) |
|
|
|
|
|
|
|
|
const pipeline = this.client.pipeline() |
|
|
pipeline.incrbyfloat(weeklyKey, amount) |
|
|
pipeline.incrbyfloat(totalKey, amount) |
|
|
|
|
|
pipeline.expire(weeklyKey, 14 * 24 * 3600) |
|
|
|
|
|
const results = await pipeline.exec() |
|
|
logger.debug(`💰 Opus cost incremented successfully, new weekly total: $${results[0][1]}`) |
|
|
} |
|
|
|
|
|
|
|
|
async getAccountDailyCost(accountId) { |
|
|
const CostCalculator = require('../utils/costCalculator') |
|
|
const today = getDateStringInTimezone() |
|
|
|
|
|
|
|
|
const pattern = `account_usage:model:daily:${accountId}:*:${today}` |
|
|
const modelKeys = await this.client.keys(pattern) |
|
|
|
|
|
if (!modelKeys || modelKeys.length === 0) { |
|
|
return 0 |
|
|
} |
|
|
|
|
|
let totalCost = 0 |
|
|
|
|
|
for (const key of modelKeys) { |
|
|
|
|
|
|
|
|
const parts = key.split(':') |
|
|
const model = parts[4] |
|
|
|
|
|
|
|
|
const modelUsage = await this.client.hgetall(key) |
|
|
|
|
|
if (modelUsage && (modelUsage.inputTokens || modelUsage.outputTokens)) { |
|
|
const usage = { |
|
|
input_tokens: parseInt(modelUsage.inputTokens || 0), |
|
|
output_tokens: parseInt(modelUsage.outputTokens || 0), |
|
|
cache_creation_input_tokens: parseInt(modelUsage.cacheCreateTokens || 0), |
|
|
cache_read_input_tokens: parseInt(modelUsage.cacheReadTokens || 0) |
|
|
} |
|
|
|
|
|
|
|
|
const costResult = CostCalculator.calculateCost(usage, model) |
|
|
totalCost += costResult.costs.total |
|
|
|
|
|
logger.debug( |
|
|
`💰 Account ${accountId} daily cost for model ${model}: $${costResult.costs.total}` |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
logger.debug(`💰 Account ${accountId} total daily cost: $${totalCost}`) |
|
|
return totalCost |
|
|
} |
|
|
|
|
|
|
|
|
async getAccountUsageStats(accountId, accountType = null) { |
|
|
const accountKey = `account_usage:${accountId}` |
|
|
const today = getDateStringInTimezone() |
|
|
const accountDailyKey = `account_usage:daily:${accountId}:${today}` |
|
|
const tzDate = getDateInTimezone() |
|
|
const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
|
|
2, |
|
|
'0' |
|
|
)}` |
|
|
const accountMonthlyKey = `account_usage:monthly:${accountId}:${currentMonth}` |
|
|
|
|
|
const [total, daily, monthly] = await Promise.all([ |
|
|
this.client.hgetall(accountKey), |
|
|
this.client.hgetall(accountDailyKey), |
|
|
this.client.hgetall(accountMonthlyKey) |
|
|
]) |
|
|
|
|
|
|
|
|
let accountData = {} |
|
|
if (accountType === 'droid') { |
|
|
accountData = await this.client.hgetall(`droid:account:${accountId}`) |
|
|
} else if (accountType === 'openai') { |
|
|
accountData = await this.client.hgetall(`openai:account:${accountId}`) |
|
|
} else if (accountType === 'openai-responses') { |
|
|
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`) |
|
|
} else { |
|
|
|
|
|
accountData = await this.client.hgetall(`claude_account:${accountId}`) |
|
|
if (!accountData.createdAt) { |
|
|
accountData = await this.client.hgetall(`openai:account:${accountId}`) |
|
|
} |
|
|
if (!accountData.createdAt) { |
|
|
accountData = await this.client.hgetall(`openai_responses_account:${accountId}`) |
|
|
} |
|
|
if (!accountData.createdAt) { |
|
|
accountData = await this.client.hgetall(`openai_account:${accountId}`) |
|
|
} |
|
|
if (!accountData.createdAt) { |
|
|
accountData = await this.client.hgetall(`droid:account:${accountId}`) |
|
|
} |
|
|
} |
|
|
const createdAt = accountData.createdAt ? new Date(accountData.createdAt) : new Date() |
|
|
const now = new Date() |
|
|
const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24))) |
|
|
|
|
|
const totalTokens = parseInt(total.totalTokens) || 0 |
|
|
const totalRequests = parseInt(total.totalRequests) || 0 |
|
|
|
|
|
|
|
|
const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60) |
|
|
const avgRPM = totalRequests / totalMinutes |
|
|
const avgTPM = totalTokens / totalMinutes |
|
|
|
|
|
|
|
|
const handleAccountData = (data) => { |
|
|
const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0 |
|
|
const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0 |
|
|
const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0 |
|
|
const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0 |
|
|
const cacheCreateTokens = |
|
|
parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0 |
|
|
const cacheReadTokens = |
|
|
parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0 |
|
|
const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0 |
|
|
|
|
|
const actualAllTokens = |
|
|
allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens |
|
|
|
|
|
return { |
|
|
tokens, |
|
|
inputTokens, |
|
|
outputTokens, |
|
|
cacheCreateTokens, |
|
|
cacheReadTokens, |
|
|
allTokens: actualAllTokens, |
|
|
requests |
|
|
} |
|
|
} |
|
|
|
|
|
const totalData = handleAccountData(total) |
|
|
const dailyData = handleAccountData(daily) |
|
|
const monthlyData = handleAccountData(monthly) |
|
|
|
|
|
|
|
|
const dailyCost = await this.getAccountDailyCost(accountId) |
|
|
|
|
|
return { |
|
|
accountId, |
|
|
total: totalData, |
|
|
daily: { |
|
|
...dailyData, |
|
|
cost: dailyCost |
|
|
}, |
|
|
monthly: monthlyData, |
|
|
averages: { |
|
|
rpm: Math.round(avgRPM * 100) / 100, |
|
|
tpm: Math.round(avgTPM * 100) / 100, |
|
|
dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100, |
|
|
dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100 |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getAllAccountsUsageStats() { |
|
|
try { |
|
|
|
|
|
const accountKeys = await this.client.keys('claude_account:*') |
|
|
const accountStats = [] |
|
|
|
|
|
for (const accountKey of accountKeys) { |
|
|
const accountId = accountKey.replace('claude_account:', '') |
|
|
const accountData = await this.client.hgetall(accountKey) |
|
|
|
|
|
if (accountData.name) { |
|
|
const stats = await this.getAccountUsageStats(accountId) |
|
|
accountStats.push({ |
|
|
id: accountId, |
|
|
name: accountData.name, |
|
|
email: accountData.email || '', |
|
|
status: accountData.status || 'unknown', |
|
|
isActive: accountData.isActive === 'true', |
|
|
...stats |
|
|
}) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
accountStats.sort((a, b) => (b.daily.allTokens || 0) - (a.daily.allTokens || 0)) |
|
|
|
|
|
return accountStats |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to get all accounts usage stats:', error) |
|
|
return [] |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async resetAllUsageStats() { |
|
|
const client = this.getClientSafe() |
|
|
const stats = { |
|
|
deletedKeys: 0, |
|
|
deletedDailyKeys: 0, |
|
|
deletedMonthlyKeys: 0, |
|
|
resetApiKeys: 0 |
|
|
} |
|
|
|
|
|
try { |
|
|
|
|
|
const apiKeyIds = [] |
|
|
const apiKeyKeys = await client.keys('apikey:*') |
|
|
|
|
|
for (const key of apiKeyKeys) { |
|
|
if (key === 'apikey:hash_map') { |
|
|
continue |
|
|
} |
|
|
const keyId = key.replace('apikey:', '') |
|
|
apiKeyIds.push(keyId) |
|
|
} |
|
|
|
|
|
|
|
|
for (const keyId of apiKeyIds) { |
|
|
|
|
|
const usageKey = `usage:${keyId}` |
|
|
const deleted = await client.del(usageKey) |
|
|
if (deleted > 0) { |
|
|
stats.deletedKeys++ |
|
|
} |
|
|
|
|
|
|
|
|
const dailyKeys = await client.keys(`usage:daily:${keyId}:*`) |
|
|
if (dailyKeys.length > 0) { |
|
|
await client.del(...dailyKeys) |
|
|
stats.deletedDailyKeys += dailyKeys.length |
|
|
} |
|
|
|
|
|
|
|
|
const monthlyKeys = await client.keys(`usage:monthly:${keyId}:*`) |
|
|
if (monthlyKeys.length > 0) { |
|
|
await client.del(...monthlyKeys) |
|
|
stats.deletedMonthlyKeys += monthlyKeys.length |
|
|
} |
|
|
|
|
|
|
|
|
const keyData = await client.hgetall(`apikey:${keyId}`) |
|
|
if (keyData && Object.keys(keyData).length > 0) { |
|
|
keyData.lastUsedAt = '' |
|
|
await client.hset(`apikey:${keyId}`, keyData) |
|
|
stats.resetApiKeys++ |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const allUsageKeys = await client.keys('usage:*') |
|
|
if (allUsageKeys.length > 0) { |
|
|
await client.del(...allUsageKeys) |
|
|
stats.deletedKeys += allUsageKeys.length |
|
|
} |
|
|
|
|
|
return stats |
|
|
} catch (error) { |
|
|
throw new Error(`Failed to reset usage stats: ${error.message}`) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async setClaudeAccount(accountId, accountData) { |
|
|
const key = `claude:account:${accountId}` |
|
|
await this.client.hset(key, accountData) |
|
|
} |
|
|
|
|
|
async getClaudeAccount(accountId) { |
|
|
const key = `claude:account:${accountId}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
|
|
|
async getAllClaudeAccounts() { |
|
|
const keys = await this.client.keys('claude:account:*') |
|
|
const accounts = [] |
|
|
for (const key of keys) { |
|
|
const accountData = await this.client.hgetall(key) |
|
|
if (accountData && Object.keys(accountData).length > 0) { |
|
|
accounts.push({ id: key.replace('claude:account:', ''), ...accountData }) |
|
|
} |
|
|
} |
|
|
return accounts |
|
|
} |
|
|
|
|
|
async deleteClaudeAccount(accountId) { |
|
|
const key = `claude:account:${accountId}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
|
|
|
async setDroidAccount(accountId, accountData) { |
|
|
const key = `droid:account:${accountId}` |
|
|
await this.client.hset(key, accountData) |
|
|
} |
|
|
|
|
|
async getDroidAccount(accountId) { |
|
|
const key = `droid:account:${accountId}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
|
|
|
async getAllDroidAccounts() { |
|
|
const keys = await this.client.keys('droid:account:*') |
|
|
const accounts = [] |
|
|
for (const key of keys) { |
|
|
const accountData = await this.client.hgetall(key) |
|
|
if (accountData && Object.keys(accountData).length > 0) { |
|
|
accounts.push({ id: key.replace('droid:account:', ''), ...accountData }) |
|
|
} |
|
|
} |
|
|
return accounts |
|
|
} |
|
|
|
|
|
async deleteDroidAccount(accountId) { |
|
|
const key = `droid:account:${accountId}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
async setOpenAiAccount(accountId, accountData) { |
|
|
const key = `openai:account:${accountId}` |
|
|
await this.client.hset(key, accountData) |
|
|
} |
|
|
async getOpenAiAccount(accountId) { |
|
|
const key = `openai:account:${accountId}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
async deleteOpenAiAccount(accountId) { |
|
|
const key = `openai:account:${accountId}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
async getAllOpenAIAccounts() { |
|
|
const keys = await this.client.keys('openai:account:*') |
|
|
const accounts = [] |
|
|
for (const key of keys) { |
|
|
const accountData = await this.client.hgetall(key) |
|
|
if (accountData && Object.keys(accountData).length > 0) { |
|
|
accounts.push({ id: key.replace('openai:account:', ''), ...accountData }) |
|
|
} |
|
|
} |
|
|
return accounts |
|
|
} |
|
|
|
|
|
|
|
|
async setSession(sessionId, sessionData, ttl = 86400) { |
|
|
const key = `session:${sessionId}` |
|
|
await this.client.hset(key, sessionData) |
|
|
await this.client.expire(key, ttl) |
|
|
} |
|
|
|
|
|
async getSession(sessionId) { |
|
|
const key = `session:${sessionId}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
|
|
|
async deleteSession(sessionId) { |
|
|
const key = `session:${sessionId}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
|
|
|
async setApiKeyHash(hashedKey, keyData, ttl = 0) { |
|
|
const key = `apikey_hash:${hashedKey}` |
|
|
await this.client.hset(key, keyData) |
|
|
if (ttl > 0) { |
|
|
await this.client.expire(key, ttl) |
|
|
} |
|
|
} |
|
|
|
|
|
async getApiKeyHash(hashedKey) { |
|
|
const key = `apikey_hash:${hashedKey}` |
|
|
return await this.client.hgetall(key) |
|
|
} |
|
|
|
|
|
async deleteApiKeyHash(hashedKey) { |
|
|
const key = `apikey_hash:${hashedKey}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
|
|
|
async setOAuthSession(sessionId, sessionData, ttl = 600) { |
|
|
|
|
|
const key = `oauth:${sessionId}` |
|
|
|
|
|
|
|
|
const serializedData = {} |
|
|
for (const [dataKey, value] of Object.entries(sessionData)) { |
|
|
if (typeof value === 'object' && value !== null) { |
|
|
serializedData[dataKey] = JSON.stringify(value) |
|
|
} else { |
|
|
serializedData[dataKey] = value |
|
|
} |
|
|
} |
|
|
|
|
|
await this.client.hset(key, serializedData) |
|
|
await this.client.expire(key, ttl) |
|
|
} |
|
|
|
|
|
async getOAuthSession(sessionId) { |
|
|
const key = `oauth:${sessionId}` |
|
|
const data = await this.client.hgetall(key) |
|
|
|
|
|
|
|
|
if (data.proxy) { |
|
|
try { |
|
|
data.proxy = JSON.parse(data.proxy) |
|
|
} catch (error) { |
|
|
|
|
|
data.proxy = null |
|
|
} |
|
|
} |
|
|
|
|
|
return data |
|
|
} |
|
|
|
|
|
async deleteOAuthSession(sessionId) { |
|
|
const key = `oauth:${sessionId}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
|
|
|
async getSystemStats() { |
|
|
const keys = await Promise.all([ |
|
|
this.client.keys('apikey:*'), |
|
|
this.client.keys('claude:account:*'), |
|
|
this.client.keys('usage:*') |
|
|
]) |
|
|
|
|
|
return { |
|
|
totalApiKeys: keys[0].length, |
|
|
totalClaudeAccounts: keys[1].length, |
|
|
totalUsageRecords: keys[2].length |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getTodayStats() { |
|
|
try { |
|
|
const today = getDateStringInTimezone() |
|
|
const dailyKeys = await this.client.keys(`usage:daily:*:${today}`) |
|
|
|
|
|
let totalRequestsToday = 0 |
|
|
let totalTokensToday = 0 |
|
|
let totalInputTokensToday = 0 |
|
|
let totalOutputTokensToday = 0 |
|
|
let totalCacheCreateTokensToday = 0 |
|
|
let totalCacheReadTokensToday = 0 |
|
|
|
|
|
|
|
|
if (dailyKeys.length > 0) { |
|
|
const pipeline = this.client.pipeline() |
|
|
dailyKeys.forEach((key) => pipeline.hgetall(key)) |
|
|
const results = await pipeline.exec() |
|
|
|
|
|
for (const [error, dailyData] of results) { |
|
|
if (error || !dailyData) { |
|
|
continue |
|
|
} |
|
|
|
|
|
totalRequestsToday += parseInt(dailyData.requests) || 0 |
|
|
const currentDayTokens = parseInt(dailyData.tokens) || 0 |
|
|
totalTokensToday += currentDayTokens |
|
|
|
|
|
|
|
|
const inputTokens = parseInt(dailyData.inputTokens) || 0 |
|
|
const outputTokens = parseInt(dailyData.outputTokens) || 0 |
|
|
const cacheCreateTokens = parseInt(dailyData.cacheCreateTokens) || 0 |
|
|
const cacheReadTokens = parseInt(dailyData.cacheReadTokens) || 0 |
|
|
const totalTokensFromSeparate = inputTokens + outputTokens |
|
|
|
|
|
if (totalTokensFromSeparate === 0 && currentDayTokens > 0) { |
|
|
|
|
|
totalOutputTokensToday += Math.round(currentDayTokens * 0.7) |
|
|
totalInputTokensToday += Math.round(currentDayTokens * 0.3) |
|
|
} else { |
|
|
|
|
|
totalInputTokensToday += inputTokens |
|
|
totalOutputTokensToday += outputTokens |
|
|
} |
|
|
|
|
|
|
|
|
totalCacheCreateTokensToday += cacheCreateTokens |
|
|
totalCacheReadTokensToday += cacheReadTokens |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
const allApiKeys = await this.client.keys('apikey:*') |
|
|
let apiKeysCreatedToday = 0 |
|
|
|
|
|
if (allApiKeys.length > 0) { |
|
|
const pipeline = this.client.pipeline() |
|
|
allApiKeys.forEach((key) => pipeline.hget(key, 'createdAt')) |
|
|
const results = await pipeline.exec() |
|
|
|
|
|
for (const [error, createdAt] of results) { |
|
|
if (!error && createdAt && createdAt.startsWith(today)) { |
|
|
apiKeysCreatedToday++ |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
return { |
|
|
requestsToday: totalRequestsToday, |
|
|
tokensToday: totalTokensToday, |
|
|
inputTokensToday: totalInputTokensToday, |
|
|
outputTokensToday: totalOutputTokensToday, |
|
|
cacheCreateTokensToday: totalCacheCreateTokensToday, |
|
|
cacheReadTokensToday: totalCacheReadTokensToday, |
|
|
apiKeysCreatedToday |
|
|
} |
|
|
} catch (error) { |
|
|
console.error('Error getting today stats:', error) |
|
|
return { |
|
|
requestsToday: 0, |
|
|
tokensToday: 0, |
|
|
inputTokensToday: 0, |
|
|
outputTokensToday: 0, |
|
|
cacheCreateTokensToday: 0, |
|
|
cacheReadTokensToday: 0, |
|
|
apiKeysCreatedToday: 0 |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getSystemAverages() { |
|
|
try { |
|
|
const allApiKeys = await this.client.keys('apikey:*') |
|
|
let totalRequests = 0 |
|
|
let totalTokens = 0 |
|
|
let totalInputTokens = 0 |
|
|
let totalOutputTokens = 0 |
|
|
let oldestCreatedAt = new Date() |
|
|
|
|
|
|
|
|
const usageKeys = allApiKeys.map((key) => `usage:${key.replace('apikey:', '')}`) |
|
|
const pipeline = this.client.pipeline() |
|
|
|
|
|
|
|
|
usageKeys.forEach((key) => pipeline.hgetall(key)) |
|
|
|
|
|
allApiKeys.forEach((key) => pipeline.hgetall(key)) |
|
|
|
|
|
const results = await pipeline.exec() |
|
|
const usageResults = results.slice(0, usageKeys.length) |
|
|
const keyResults = results.slice(usageKeys.length) |
|
|
|
|
|
for (let i = 0; i < allApiKeys.length; i++) { |
|
|
const totalData = usageResults[i][1] || {} |
|
|
const keyData = keyResults[i][1] || {} |
|
|
|
|
|
totalRequests += parseInt(totalData.totalRequests) || 0 |
|
|
totalTokens += parseInt(totalData.totalTokens) || 0 |
|
|
totalInputTokens += parseInt(totalData.totalInputTokens) || 0 |
|
|
totalOutputTokens += parseInt(totalData.totalOutputTokens) || 0 |
|
|
|
|
|
const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date() |
|
|
if (createdAt < oldestCreatedAt) { |
|
|
oldestCreatedAt = createdAt |
|
|
} |
|
|
} |
|
|
|
|
|
const now = new Date() |
|
|
|
|
|
const daysSinceOldest = Math.max( |
|
|
1, |
|
|
Math.ceil((now - oldestCreatedAt) / (1000 * 60 * 60 * 24)) |
|
|
) |
|
|
const totalMinutes = daysSinceOldest * 24 * 60 |
|
|
|
|
|
return { |
|
|
systemRPM: Math.round((totalRequests / totalMinutes) * 100) / 100, |
|
|
systemTPM: Math.round((totalTokens / totalMinutes) * 100) / 100, |
|
|
totalInputTokens, |
|
|
totalOutputTokens, |
|
|
totalTokens |
|
|
} |
|
|
} catch (error) { |
|
|
console.error('Error getting system averages:', error) |
|
|
return { |
|
|
systemRPM: 0, |
|
|
systemTPM: 0, |
|
|
totalInputTokens: 0, |
|
|
totalOutputTokens: 0, |
|
|
totalTokens: 0 |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getRealtimeSystemMetrics() { |
|
|
try { |
|
|
const configLocal = require('../../config/config') |
|
|
const windowMinutes = configLocal.system.metricsWindow || 5 |
|
|
|
|
|
const now = new Date() |
|
|
const currentMinute = Math.floor(now.getTime() / 60000) |
|
|
|
|
|
|
|
|
logger.debug( |
|
|
`🔍 Realtime metrics - Current time: ${now.toISOString()}, Minute timestamp: ${currentMinute}` |
|
|
) |
|
|
|
|
|
|
|
|
const pipeline = this.client.pipeline() |
|
|
const minuteKeys = [] |
|
|
for (let i = 0; i < windowMinutes; i++) { |
|
|
const minuteKey = `system:metrics:minute:${currentMinute - i}` |
|
|
minuteKeys.push(minuteKey) |
|
|
pipeline.hgetall(minuteKey) |
|
|
} |
|
|
|
|
|
logger.debug(`🔍 Realtime metrics - Checking keys: ${minuteKeys.join(', ')}`) |
|
|
|
|
|
const results = await pipeline.exec() |
|
|
|
|
|
|
|
|
let totalRequests = 0 |
|
|
let totalTokens = 0 |
|
|
let totalInputTokens = 0 |
|
|
let totalOutputTokens = 0 |
|
|
let totalCacheCreateTokens = 0 |
|
|
let totalCacheReadTokens = 0 |
|
|
let validDataCount = 0 |
|
|
|
|
|
results.forEach(([err, data], index) => { |
|
|
if (!err && data && Object.keys(data).length > 0) { |
|
|
validDataCount++ |
|
|
totalRequests += parseInt(data.requests || 0) |
|
|
totalTokens += parseInt(data.totalTokens || 0) |
|
|
totalInputTokens += parseInt(data.inputTokens || 0) |
|
|
totalOutputTokens += parseInt(data.outputTokens || 0) |
|
|
totalCacheCreateTokens += parseInt(data.cacheCreateTokens || 0) |
|
|
totalCacheReadTokens += parseInt(data.cacheReadTokens || 0) |
|
|
|
|
|
logger.debug(`🔍 Realtime metrics - Key ${minuteKeys[index]} data:`, { |
|
|
requests: data.requests, |
|
|
totalTokens: data.totalTokens |
|
|
}) |
|
|
} |
|
|
}) |
|
|
|
|
|
logger.debug( |
|
|
`🔍 Realtime metrics - Valid data count: ${validDataCount}/${windowMinutes}, Total requests: ${totalRequests}, Total tokens: ${totalTokens}` |
|
|
) |
|
|
|
|
|
|
|
|
const realtimeRPM = |
|
|
windowMinutes > 0 ? Math.round((totalRequests / windowMinutes) * 100) / 100 : 0 |
|
|
const realtimeTPM = |
|
|
windowMinutes > 0 ? Math.round((totalTokens / windowMinutes) * 100) / 100 : 0 |
|
|
|
|
|
const result = { |
|
|
realtimeRPM, |
|
|
realtimeTPM, |
|
|
windowMinutes, |
|
|
totalRequests, |
|
|
totalTokens, |
|
|
totalInputTokens, |
|
|
totalOutputTokens, |
|
|
totalCacheCreateTokens, |
|
|
totalCacheReadTokens |
|
|
} |
|
|
|
|
|
logger.debug('🔍 Realtime metrics - Final result:', result) |
|
|
|
|
|
return result |
|
|
} catch (error) { |
|
|
console.error('Error getting realtime system metrics:', error) |
|
|
|
|
|
const historicalMetrics = await this.getSystemAverages() |
|
|
return { |
|
|
realtimeRPM: historicalMetrics.systemRPM, |
|
|
realtimeTPM: historicalMetrics.systemTPM, |
|
|
windowMinutes: 0, |
|
|
totalRequests: 0, |
|
|
totalTokens: historicalMetrics.totalTokens, |
|
|
totalInputTokens: historicalMetrics.totalInputTokens, |
|
|
totalOutputTokens: historicalMetrics.totalOutputTokens, |
|
|
totalCacheCreateTokens: 0, |
|
|
totalCacheReadTokens: 0 |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async setSessionAccountMapping(sessionHash, accountId, ttl = null) { |
|
|
const appConfig = require('../../config/config') |
|
|
|
|
|
const defaultTTL = ttl !== null ? ttl : (appConfig.session?.stickyTtlHours || 1) * 60 * 60 |
|
|
const key = `sticky_session:${sessionHash}` |
|
|
await this.client.set(key, accountId, 'EX', defaultTTL) |
|
|
} |
|
|
|
|
|
async getSessionAccountMapping(sessionHash) { |
|
|
const key = `sticky_session:${sessionHash}` |
|
|
return await this.client.get(key) |
|
|
} |
|
|
|
|
|
|
|
|
async extendSessionAccountMappingTTL(sessionHash) { |
|
|
const appConfig = require('../../config/config') |
|
|
const key = `sticky_session:${sessionHash}` |
|
|
|
|
|
|
|
|
const ttlHours = appConfig.session?.stickyTtlHours || 1 |
|
|
const thresholdMinutes = appConfig.session?.renewalThresholdMinutes || 0 |
|
|
|
|
|
|
|
|
if (thresholdMinutes === 0) { |
|
|
return true |
|
|
} |
|
|
|
|
|
const fullTTL = ttlHours * 60 * 60 |
|
|
const renewalThreshold = thresholdMinutes * 60 |
|
|
|
|
|
try { |
|
|
|
|
|
const remainingTTL = await this.client.ttl(key) |
|
|
|
|
|
|
|
|
if (remainingTTL === -2) { |
|
|
return false |
|
|
} |
|
|
|
|
|
|
|
|
if (remainingTTL === -1) { |
|
|
return true |
|
|
} |
|
|
|
|
|
|
|
|
if (remainingTTL < renewalThreshold) { |
|
|
await this.client.expire(key, fullTTL) |
|
|
logger.debug( |
|
|
`🔄 Renewed sticky session TTL: ${sessionHash} (was ${Math.round( |
|
|
remainingTTL / 60 |
|
|
)}min, renewed to ${ttlHours}h)` |
|
|
) |
|
|
return true |
|
|
} |
|
|
|
|
|
|
|
|
logger.debug( |
|
|
`✅ Sticky session TTL sufficient: ${sessionHash} (remaining ${Math.round( |
|
|
remainingTTL / 60 |
|
|
)}min)` |
|
|
) |
|
|
return true |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to extend session TTL:', error) |
|
|
return false |
|
|
} |
|
|
} |
|
|
|
|
|
async deleteSessionAccountMapping(sessionHash) { |
|
|
const key = `sticky_session:${sessionHash}` |
|
|
return await this.client.del(key) |
|
|
} |
|
|
|
|
|
|
|
|
async cleanup() { |
|
|
try { |
|
|
const patterns = ['usage:daily:*', 'ratelimit:*', 'session:*', 'sticky_session:*', 'oauth:*'] |
|
|
|
|
|
for (const pattern of patterns) { |
|
|
const keys = await this.client.keys(pattern) |
|
|
const pipeline = this.client.pipeline() |
|
|
|
|
|
for (const key of keys) { |
|
|
const ttl = await this.client.ttl(key) |
|
|
if (ttl === -1) { |
|
|
|
|
|
if (key.startsWith('oauth:')) { |
|
|
pipeline.expire(key, 600) |
|
|
} else { |
|
|
pipeline.expire(key, 86400) |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
await pipeline.exec() |
|
|
} |
|
|
|
|
|
logger.info('🧹 Redis cleanup completed') |
|
|
} catch (error) { |
|
|
logger.error('❌ Redis cleanup failed:', error) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_getConcurrencyConfig() { |
|
|
const defaults = { |
|
|
leaseSeconds: 300, |
|
|
renewIntervalSeconds: 30, |
|
|
cleanupGraceSeconds: 30 |
|
|
} |
|
|
|
|
|
const configValues = { |
|
|
...defaults, |
|
|
...(config.concurrency || {}) |
|
|
} |
|
|
|
|
|
const normalizeNumber = (value, fallback, options = {}) => { |
|
|
const parsed = Number(value) |
|
|
if (!Number.isFinite(parsed)) { |
|
|
return fallback |
|
|
} |
|
|
|
|
|
if (options.allowZero && parsed === 0) { |
|
|
return 0 |
|
|
} |
|
|
|
|
|
if (options.min !== undefined && parsed < options.min) { |
|
|
return options.min |
|
|
} |
|
|
|
|
|
return parsed |
|
|
} |
|
|
|
|
|
return { |
|
|
leaseSeconds: normalizeNumber(configValues.leaseSeconds, defaults.leaseSeconds, { |
|
|
min: 30 |
|
|
}), |
|
|
renewIntervalSeconds: normalizeNumber( |
|
|
configValues.renewIntervalSeconds, |
|
|
defaults.renewIntervalSeconds, |
|
|
{ |
|
|
allowZero: true, |
|
|
min: 0 |
|
|
} |
|
|
), |
|
|
cleanupGraceSeconds: normalizeNumber( |
|
|
configValues.cleanupGraceSeconds, |
|
|
defaults.cleanupGraceSeconds, |
|
|
{ |
|
|
min: 0 |
|
|
} |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async incrConcurrency(apiKeyId, requestId, leaseSeconds = null) { |
|
|
if (!requestId) { |
|
|
throw new Error('Request ID is required for concurrency tracking') |
|
|
} |
|
|
|
|
|
try { |
|
|
const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } = |
|
|
this._getConcurrencyConfig() |
|
|
const lease = leaseSeconds || defaultLeaseSeconds |
|
|
const key = `concurrency:${apiKeyId}` |
|
|
const now = Date.now() |
|
|
const expireAt = now + lease * 1000 |
|
|
const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000) |
|
|
|
|
|
const luaScript = ` |
|
|
local key = KEYS[1] |
|
|
local member = ARGV[1] |
|
|
local expireAt = tonumber(ARGV[2]) |
|
|
local now = tonumber(ARGV[3]) |
|
|
local ttl = tonumber(ARGV[4]) |
|
|
|
|
|
redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
|
|
redis.call('ZADD', key, expireAt, member) |
|
|
|
|
|
if ttl > 0 then |
|
|
redis.call('PEXPIRE', key, ttl) |
|
|
end |
|
|
|
|
|
local count = redis.call('ZCARD', key) |
|
|
return count |
|
|
` |
|
|
|
|
|
const count = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl) |
|
|
logger.database( |
|
|
`🔢 Incremented concurrency for key ${apiKeyId}: ${count} (request ${requestId})` |
|
|
) |
|
|
return count |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to increment concurrency:', error) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async refreshConcurrencyLease(apiKeyId, requestId, leaseSeconds = null) { |
|
|
if (!requestId) { |
|
|
return 0 |
|
|
} |
|
|
|
|
|
try { |
|
|
const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } = |
|
|
this._getConcurrencyConfig() |
|
|
const lease = leaseSeconds || defaultLeaseSeconds |
|
|
const key = `concurrency:${apiKeyId}` |
|
|
const now = Date.now() |
|
|
const expireAt = now + lease * 1000 |
|
|
const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000) |
|
|
|
|
|
const luaScript = ` |
|
|
local key = KEYS[1] |
|
|
local member = ARGV[1] |
|
|
local expireAt = tonumber(ARGV[2]) |
|
|
local now = tonumber(ARGV[3]) |
|
|
local ttl = tonumber(ARGV[4]) |
|
|
|
|
|
redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
|
|
|
|
|
local exists = redis.call('ZSCORE', key, member) |
|
|
|
|
|
if exists then |
|
|
redis.call('ZADD', key, expireAt, member) |
|
|
if ttl > 0 then |
|
|
redis.call('PEXPIRE', key, ttl) |
|
|
end |
|
|
return 1 |
|
|
end |
|
|
|
|
|
return 0 |
|
|
` |
|
|
|
|
|
const refreshed = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl) |
|
|
if (refreshed === 1) { |
|
|
logger.debug(`🔄 Refreshed concurrency lease for key ${apiKeyId} (request ${requestId})`) |
|
|
} |
|
|
return refreshed |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to refresh concurrency lease:', error) |
|
|
return 0 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async decrConcurrency(apiKeyId, requestId) { |
|
|
try { |
|
|
const key = `concurrency:${apiKeyId}` |
|
|
const now = Date.now() |
|
|
|
|
|
const luaScript = ` |
|
|
local key = KEYS[1] |
|
|
local member = ARGV[1] |
|
|
local now = tonumber(ARGV[2]) |
|
|
|
|
|
if member then |
|
|
redis.call('ZREM', key, member) |
|
|
end |
|
|
|
|
|
redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
|
|
|
|
|
local count = redis.call('ZCARD', key) |
|
|
if count <= 0 then |
|
|
redis.call('DEL', key) |
|
|
return 0 |
|
|
end |
|
|
|
|
|
return count |
|
|
` |
|
|
|
|
|
const count = await this.client.eval(luaScript, 1, key, requestId || '', now) |
|
|
logger.database( |
|
|
`🔢 Decremented concurrency for key ${apiKeyId}: ${count} (request ${requestId || 'n/a'})` |
|
|
) |
|
|
return count |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to decrement concurrency:', error) |
|
|
throw error |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async getConcurrency(apiKeyId) { |
|
|
try { |
|
|
const key = `concurrency:${apiKeyId}` |
|
|
const now = Date.now() |
|
|
|
|
|
const luaScript = ` |
|
|
local key = KEYS[1] |
|
|
local now = tonumber(ARGV[1]) |
|
|
|
|
|
redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
|
|
return redis.call('ZCARD', key) |
|
|
` |
|
|
|
|
|
const count = await this.client.eval(luaScript, 1, key, now) |
|
|
return parseInt(count || 0) |
|
|
} catch (error) { |
|
|
logger.error('❌ Failed to get concurrency:', error) |
|
|
return 0 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
async get(key) { |
|
|
const client = this.getClientSafe() |
|
|
return await client.get(key) |
|
|
} |
|
|
|
|
|
async set(key, value, ...args) { |
|
|
const client = this.getClientSafe() |
|
|
return await client.set(key, value, ...args) |
|
|
} |
|
|
|
|
|
async setex(key, ttl, value) { |
|
|
const client = this.getClientSafe() |
|
|
return await client.setex(key, ttl, value) |
|
|
} |
|
|
|
|
|
async del(...keys) { |
|
|
const client = this.getClientSafe() |
|
|
return await client.del(...keys) |
|
|
} |
|
|
|
|
|
async keys(pattern) { |
|
|
const client = this.getClientSafe() |
|
|
return await client.keys(pattern) |
|
|
} |
|
|
|
|
|
|
|
|
async getAccountSessionWindowUsage(accountId, windowStart, windowEnd) { |
|
|
try { |
|
|
if (!windowStart || !windowEnd) { |
|
|
return { |
|
|
totalInputTokens: 0, |
|
|
totalOutputTokens: 0, |
|
|
totalCacheCreateTokens: 0, |
|
|
totalCacheReadTokens: 0, |
|
|
totalAllTokens: 0, |
|
|
totalRequests: 0, |
|
|
modelUsage: {} |
|
|
} |
|
|
} |
|
|
|
|
|
const startDate = new Date(windowStart) |
|
|
const endDate = new Date(windowEnd) |
|
|
|
|
|
|
|
|
logger.debug(`📊 Getting session window usage for account ${accountId}`) |
|
|
logger.debug(` Window: ${windowStart} to ${windowEnd}`) |
|
|
logger.debug(` Start UTC: ${startDate.toISOString()}, End UTC: ${endDate.toISOString()}`) |
|
|
|
|
|
|
|
|
|
|
|
const hourlyKeys = [] |
|
|
const currentHour = new Date(startDate) |
|
|
currentHour.setMinutes(0) |
|
|
currentHour.setSeconds(0) |
|
|
currentHour.setMilliseconds(0) |
|
|
|
|
|
while (currentHour <= endDate) { |
|
|
|
|
|
const tzDateStr = getDateStringInTimezone(currentHour) |
|
|
const tzHour = String(getHourInTimezone(currentHour)).padStart(2, '0') |
|
|
const key = `account_usage:hourly:${accountId}:${tzDateStr}:${tzHour}` |
|
|
|
|
|
logger.debug(` Adding hourly key: ${key}`) |
|
|
hourlyKeys.push(key) |
|
|
currentHour.setHours(currentHour.getHours() + 1) |
|
|
} |
|
|
|
|
|
|
|
|
const pipeline = this.client.pipeline() |
|
|
for (const key of hourlyKeys) { |
|
|
pipeline.hgetall(key) |
|
|
} |
|
|
const results = await pipeline.exec() |
|
|
|
|
|
|
|
|
let totalInputTokens = 0 |
|
|
let totalOutputTokens = 0 |
|
|
let totalCacheCreateTokens = 0 |
|
|
let totalCacheReadTokens = 0 |
|
|
let totalAllTokens = 0 |
|
|
let totalRequests = 0 |
|
|
const modelUsage = {} |
|
|
|
|
|
logger.debug(` Processing ${results.length} hourly results`) |
|
|
|
|
|
for (const [error, data] of results) { |
|
|
if (error || !data || Object.keys(data).length === 0) { |
|
|
continue |
|
|
} |
|
|
|
|
|
|
|
|
const hourInputTokens = parseInt(data.inputTokens || 0) |
|
|
const hourOutputTokens = parseInt(data.outputTokens || 0) |
|
|
const hourCacheCreateTokens = parseInt(data.cacheCreateTokens || 0) |
|
|
const hourCacheReadTokens = parseInt(data.cacheReadTokens || 0) |
|
|
const hourAllTokens = parseInt(data.allTokens || 0) |
|
|
const hourRequests = parseInt(data.requests || 0) |
|
|
|
|
|
totalInputTokens += hourInputTokens |
|
|
totalOutputTokens += hourOutputTokens |
|
|
totalCacheCreateTokens += hourCacheCreateTokens |
|
|
totalCacheReadTokens += hourCacheReadTokens |
|
|
totalAllTokens += hourAllTokens |
|
|
totalRequests += hourRequests |
|
|
|
|
|
if (hourAllTokens > 0) { |
|
|
logger.debug(` Hour data: allTokens=${hourAllTokens}, requests=${hourRequests}`) |
|
|
} |
|
|
|
|
|
|
|
|
for (const [key, value] of Object.entries(data)) { |
|
|
|
|
|
if (key.startsWith('model:')) { |
|
|
const parts = key.split(':') |
|
|
if (parts.length >= 3) { |
|
|
const modelName = parts[1] |
|
|
const metric = parts.slice(2).join(':') |
|
|
|
|
|
if (!modelUsage[modelName]) { |
|
|
modelUsage[modelName] = { |
|
|
inputTokens: 0, |
|
|
outputTokens: 0, |
|
|
cacheCreateTokens: 0, |
|
|
cacheReadTokens: 0, |
|
|
allTokens: 0, |
|
|
requests: 0 |
|
|
} |
|
|
} |
|
|
|
|
|
if (metric === 'inputTokens') { |
|
|
modelUsage[modelName].inputTokens += parseInt(value || 0) |
|
|
} else if (metric === 'outputTokens') { |
|
|
modelUsage[modelName].outputTokens += parseInt(value || 0) |
|
|
} else if (metric === 'cacheCreateTokens') { |
|
|
modelUsage[modelName].cacheCreateTokens += parseInt(value || 0) |
|
|
} else if (metric === 'cacheReadTokens') { |
|
|
modelUsage[modelName].cacheReadTokens += parseInt(value || 0) |
|
|
} else if (metric === 'allTokens') { |
|
|
modelUsage[modelName].allTokens += parseInt(value || 0) |
|
|
} else if (metric === 'requests') { |
|
|
modelUsage[modelName].requests += parseInt(value || 0) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
logger.debug(`📊 Session window usage summary:`) |
|
|
logger.debug(` Total allTokens: ${totalAllTokens}`) |
|
|
logger.debug(` Total requests: ${totalRequests}`) |
|
|
logger.debug(` Input: ${totalInputTokens}, Output: ${totalOutputTokens}`) |
|
|
logger.debug( |
|
|
` Cache Create: ${totalCacheCreateTokens}, Cache Read: ${totalCacheReadTokens}` |
|
|
) |
|
|
|
|
|
return { |
|
|
totalInputTokens, |
|
|
totalOutputTokens, |
|
|
totalCacheCreateTokens, |
|
|
totalCacheReadTokens, |
|
|
totalAllTokens, |
|
|
totalRequests, |
|
|
modelUsage |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error(`❌ Failed to get session window usage for account ${accountId}:`, error) |
|
|
return { |
|
|
totalInputTokens: 0, |
|
|
totalOutputTokens: 0, |
|
|
totalCacheCreateTokens: 0, |
|
|
totalCacheReadTokens: 0, |
|
|
totalAllTokens: 0, |
|
|
totalRequests: 0, |
|
|
modelUsage: {} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
const redisClient = new RedisClient() |
|
|
|
|
|
|
|
|
redisClient.setAccountLock = async function (lockKey, lockValue, ttlMs) { |
|
|
try { |
|
|
|
|
|
|
|
|
const result = await this.client.set(lockKey, lockValue, 'PX', ttlMs, 'NX') |
|
|
return result === 'OK' |
|
|
} catch (error) { |
|
|
logger.error(`Failed to acquire lock ${lockKey}:`, error) |
|
|
return false |
|
|
} |
|
|
} |
|
|
|
|
|
redisClient.releaseAccountLock = async function (lockKey, lockValue) { |
|
|
try { |
|
|
|
|
|
const script = ` |
|
|
if redis.call("get", KEYS[1]) == ARGV[1] then |
|
|
return redis.call("del", KEYS[1]) |
|
|
else |
|
|
return 0 |
|
|
end |
|
|
` |
|
|
|
|
|
const result = await this.client.eval(script, 1, lockKey, lockValue) |
|
|
return result === 1 |
|
|
} catch (error) { |
|
|
logger.error(`Failed to release lock ${lockKey}:`, error) |
|
|
return false |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
redisClient.getDateInTimezone = getDateInTimezone |
|
|
redisClient.getDateStringInTimezone = getDateStringInTimezone |
|
|
redisClient.getHourInTimezone = getHourInTimezone |
|
|
redisClient.getWeekStringInTimezone = getWeekStringInTimezone |
|
|
|
|
|
module.exports = redisClient |
|
|
|