const redis = require('../models/redis') const logger = require('../utils/logger') /** * 计费事件发布器 - 使用 Redis Stream 解耦计费系统 * * 设计原则: * 1. 异步非阻塞: 发布失败不影响主流程 * 2. 结构化数据: 使用标准化的事件格式 * 3. 可追溯性: 每个事件包含完整上下文 */ class BillingEventPublisher { constructor() { this.streamKey = 'billing:events' this.maxLength = 100000 // 保留最近 10 万条事件 this.enabled = process.env.BILLING_EVENTS_ENABLED !== 'false' // 默认开启 } /** * 发布计费事件 * @param {Object} eventData - 事件数据 * @returns {Promise} - 事件ID 或 null */ async publishBillingEvent(eventData) { if (!this.enabled) { logger.debug('📭 Billing events disabled, skipping publish') return null } try { const client = redis.getClientSafe() // 构建标准化事件 const event = { // 事件元数据 eventId: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, eventType: 'usage.recorded', timestamp: new Date().toISOString(), version: '1.0', // 核心计费数据 apiKey: { id: eventData.keyId, name: eventData.keyName || null, userId: eventData.userId || null }, // 使用量详情 usage: { model: eventData.model, inputTokens: eventData.inputTokens || 0, outputTokens: eventData.outputTokens || 0, cacheCreateTokens: eventData.cacheCreateTokens || 0, cacheReadTokens: eventData.cacheReadTokens || 0, ephemeral5mTokens: eventData.ephemeral5mTokens || 0, ephemeral1hTokens: eventData.ephemeral1hTokens || 0, totalTokens: eventData.totalTokens || 0 }, // 费用详情 cost: { total: eventData.cost || 0, currency: 'USD', breakdown: { input: eventData.costBreakdown?.input || 0, output: eventData.costBreakdown?.output || 0, cacheCreate: eventData.costBreakdown?.cacheCreate || 0, cacheRead: eventData.costBreakdown?.cacheRead || 0, ephemeral5m: eventData.costBreakdown?.ephemeral5m || 0, ephemeral1h: eventData.costBreakdown?.ephemeral1h || 0 } }, // 账户信息 account: { id: eventData.accountId || null, type: eventData.accountType || null }, // 请求上下文 context: { isLongContext: eventData.isLongContext || false, requestTimestamp: eventData.requestTimestamp || new Date().toISOString() } } // 使用 XADD 发布事件到 Stream // MAXLEN ~ 10000: 近似截断,保持性能 const messageId = await client.xadd( this.streamKey, 'MAXLEN', '~', this.maxLength, '*', // 自动生成消息ID 'data', JSON.stringify(event) ) logger.debug( `📤 Published billing event: ${messageId} | Key: ${eventData.keyId} | Cost: $${event.cost.total.toFixed(6)}` ) return messageId } catch (error) { // ⚠️ 发布失败不影响主流程,只记录错误 logger.error('❌ Failed to publish billing event:', error) return null } } /** * 批量发布计费事件(优化性能) * @param {Array} events - 事件数组 * @returns {Promise} - 成功发布的事件数 */ async publishBatchBillingEvents(events) { if (!this.enabled || !events || events.length === 0) { return 0 } try { const client = redis.getClientSafe() const pipeline = client.pipeline() events.forEach((eventData) => { const event = { eventId: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, eventType: 'usage.recorded', timestamp: new Date().toISOString(), version: '1.0', apiKey: { id: eventData.keyId, name: eventData.keyName || null }, usage: { model: eventData.model, inputTokens: eventData.inputTokens || 0, outputTokens: eventData.outputTokens || 0, totalTokens: eventData.totalTokens || 0 }, cost: { total: eventData.cost || 0, currency: 'USD' } } pipeline.xadd( this.streamKey, 'MAXLEN', '~', this.maxLength, '*', 'data', JSON.stringify(event) ) }) const results = await pipeline.exec() const successCount = results.filter((r) => r[0] === null).length logger.info(`📤 Batch published ${successCount}/${events.length} billing events`) return successCount } catch (error) { logger.error('❌ Failed to batch publish billing events:', error) return 0 } } /** * 获取 Stream 信息(用于监控) * @returns {Promise} */ async getStreamInfo() { try { const client = redis.getClientSafe() const info = await client.xinfo('STREAM', this.streamKey) // 解析 Redis XINFO 返回的数组格式 const result = {} for (let i = 0; i < info.length; i += 2) { result[info[i]] = info[i + 1] } return { length: result.length || 0, firstEntry: result['first-entry'] || null, lastEntry: result['last-entry'] || null, groups: result.groups || 0 } } catch (error) { if (error.message.includes('no such key')) { return { length: 0, groups: 0 } } logger.error('❌ Failed to get stream info:', error) return null } } /** * 创建消费者组(供外部计费系统使用) * @param {string} groupName - 消费者组名称 * @returns {Promise} */ async createConsumerGroup(groupName = 'billing-system') { try { const client = redis.getClientSafe() // MKSTREAM: 如果 stream 不存在则创建 await client.xgroup('CREATE', this.streamKey, groupName, '0', 'MKSTREAM') logger.success(`✅ Created consumer group: ${groupName}`) return true } catch (error) { if (error.message.includes('BUSYGROUP')) { logger.debug(`Consumer group ${groupName} already exists`) return true } logger.error(`❌ Failed to create consumer group ${groupName}:`, error) return false } } } module.exports = new BillingEventPublisher()