|
|
const redis = require('../models/redis') |
|
|
const logger = require('../utils/logger') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Publishes API-usage billing events to a Redis Stream.
 *
 * Each event is serialized to JSON and appended with XADD as a single
 * `data` field. Single and batch publishing share one event schema
 * (see `_buildEvent`), so consumers can parse every stream entry the same way.
 *
 * All public methods are best-effort: failures are logged through `logger`
 * and reported via the return value (`null`, `0` or `false`), never thrown.
 */
class BillingEventPublisher {
  constructor() {
    // Redis Stream key that receives every billing event.
    this.streamKey = 'billing:events'
    // Passed to XADD as `MAXLEN ~ <maxLength>` to cap the stream size.
    this.maxLength = 100000
    // Publishing is on unless the env var is exactly the string 'false'.
    this.enabled = process.env.BILLING_EVENTS_ENABLED !== 'false'
  }

  /**
   * Build an event id of the form `<epoch-ms>-<random base36 suffix>`.
   *
   * @returns {string} Event id.
   */
  _generateEventId() {
    // slice(2, 11) is the non-deprecated equivalent of substr(2, 9).
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
  }

  /**
   * Build the event payload for one usage record.
   *
   * Missing optional identifiers become `null`, missing numeric values become
   * `0`, and a missing `requestTimestamp` falls back to the event timestamp.
   *
   * @param {Object} eventData - Raw usage record.
   * @param {string} eventData.keyId - API key id.
   * @param {string} [eventData.keyName] - API key name.
   * @param {string} [eventData.userId] - Owner of the API key.
   * @param {string} eventData.model - Model name.
   * @param {number} [eventData.inputTokens]
   * @param {number} [eventData.outputTokens]
   * @param {number} [eventData.cacheCreateTokens]
   * @param {number} [eventData.cacheReadTokens]
   * @param {number} [eventData.ephemeral5mTokens]
   * @param {number} [eventData.ephemeral1hTokens]
   * @param {number} [eventData.totalTokens]
   * @param {number} [eventData.cost] - Total cost.
   * @param {Object} [eventData.costBreakdown] - Per-category cost values.
   * @param {string} [eventData.accountId]
   * @param {string} [eventData.accountType]
   * @param {boolean} [eventData.isLongContext]
   * @param {string} [eventData.requestTimestamp]
   * @returns {Object} Event object ready to be JSON-serialized.
   */
  _buildEvent(eventData) {
    const now = new Date().toISOString()
    const breakdown = eventData.costBreakdown

    return {
      eventId: this._generateEventId(),
      eventType: 'usage.recorded',
      timestamp: now,
      version: '1.0',

      apiKey: {
        id: eventData.keyId,
        name: eventData.keyName || null,
        userId: eventData.userId || null
      },

      usage: {
        model: eventData.model,
        inputTokens: eventData.inputTokens || 0,
        outputTokens: eventData.outputTokens || 0,
        cacheCreateTokens: eventData.cacheCreateTokens || 0,
        cacheReadTokens: eventData.cacheReadTokens || 0,
        ephemeral5mTokens: eventData.ephemeral5mTokens || 0,
        ephemeral1hTokens: eventData.ephemeral1hTokens || 0,
        totalTokens: eventData.totalTokens || 0
      },

      cost: {
        total: eventData.cost || 0,
        currency: 'USD',
        breakdown: {
          input: breakdown?.input || 0,
          output: breakdown?.output || 0,
          cacheCreate: breakdown?.cacheCreate || 0,
          cacheRead: breakdown?.cacheRead || 0,
          ephemeral5m: breakdown?.ephemeral5m || 0,
          ephemeral1h: breakdown?.ephemeral1h || 0
        }
      },

      account: {
        id: eventData.accountId || null,
        type: eventData.accountType || null
      },

      context: {
        isLongContext: eventData.isLongContext || false,
        requestTimestamp: eventData.requestTimestamp || now
      }
    }
  }

  /**
   * Argument list for XADD: `MAXLEN ~ <maxLength>`, id `*`, and the event
   * serialized to JSON under the single field `data`.
   *
   * @param {Object} event - Event built by `_buildEvent`.
   * @returns {Array} Arguments to spread into `xadd`.
   */
  _xaddArgs(event) {
    return [this.streamKey, 'MAXLEN', '~', this.maxLength, '*', 'data', JSON.stringify(event)]
  }

  /**
   * Whether a caught value carries a message containing `needle`.
   * Safe for rejections that are not Error instances or have no message.
   *
   * @param {*} error - Value received by a catch block.
   * @param {string} needle - Substring to look for.
   * @returns {boolean}
   */
  _errorMessageIncludes(error, needle) {
    return String(error?.message ?? '').includes(needle)
  }

  /**
   * Publish a single billing event to the stream.
   *
   * @param {Object} eventData - Usage record; see `_buildEvent` for the fields read.
   * @returns {Promise<string|null>} The id returned by XADD, or `null` when
   *   publishing is disabled or the publish failed (failure is logged).
   */
  async publishBillingEvent(eventData) {
    if (!this.enabled) {
      logger.debug('📭 Billing events disabled, skipping publish')
      return null
    }

    try {
      const client = redis.getClientSafe()
      const event = this._buildEvent(eventData)
      const messageId = await client.xadd(...this._xaddArgs(event))

      // Coerce before formatting: a non-numeric cost (e.g. a numeric string)
      // must not throw here and make an already-published event look failed.
      const formattedCost = Number(event.cost.total).toFixed(6)
      logger.debug(
        `📤 Published billing event: ${messageId} | Key: ${eventData.keyId} | Cost: $${formattedCost}`
      )

      return messageId
    } catch (error) {
      // Best-effort: report the failure to the caller via null instead of throwing.
      logger.error('❌ Failed to publish billing event:', error)
      return null
    }
  }

  /**
   * Publish several billing events through one Redis pipeline.
   *
   * @param {Object[]} events - Usage records; see `_buildEvent` for the fields read.
   * @returns {Promise<number>} Number of pipeline commands that completed
   *   without error; `0` when disabled, given no events, or on failure.
   */
  async publishBatchBillingEvents(events) {
    if (!this.enabled || !events || events.length === 0) {
      return 0
    }

    try {
      const client = redis.getClientSafe()
      const pipeline = client.pipeline()

      events.forEach((eventData) => {
        pipeline.xadd(...this._xaddArgs(this._buildEvent(eventData)))
      })

      // Each result is an [error, reply] pair; a null error means success.
      const results = (await pipeline.exec()) || []
      const successCount = results.filter((r) => r[0] === null).length

      logger.info(`📤 Batch published ${successCount}/${events.length} billing events`)
      return successCount
    } catch (error) {
      logger.error('❌ Failed to batch publish billing events:', error)
      return 0
    }
  }

  /**
   * Read summary information about the billing stream via `XINFO STREAM`.
   *
   * @returns {Promise<{length: number, firstEntry: *, lastEntry: *, groups: number}|null>}
   *   Stream summary; zeroed/null fields when the stream does not exist;
   *   `null` on any other failure (failure is logged).
   */
  async getStreamInfo() {
    try {
      const client = redis.getClientSafe()
      const info = await client.xinfo('STREAM', this.streamKey)

      // The reply is a flat [key, value, key, value, ...] array; fold it into an object.
      const result = {}
      for (let i = 0; i < info.length; i += 2) {
        result[info[i]] = info[i + 1]
      }

      return {
        length: result.length || 0,
        firstEntry: result['first-entry'] || null,
        lastEntry: result['last-entry'] || null,
        groups: result.groups || 0
      }
    } catch (error) {
      // A missing stream is not an error: report it as empty, same shape as above.
      if (this._errorMessageIncludes(error, 'no such key')) {
        return { length: 0, firstEntry: null, lastEntry: null, groups: 0 }
      }
      logger.error('❌ Failed to get stream info:', error)
      return null
    }
  }

  /**
   * Create a consumer group on the billing stream, starting from id `0` and
   * creating the stream if needed (`MKSTREAM`).
   *
   * @param {string} [groupName='billing-system'] - Consumer group name.
   * @returns {Promise<boolean>} `true` when the group was created or already
   *   exists; `false` on any other failure (failure is logged).
   */
  async createConsumerGroup(groupName = 'billing-system') {
    try {
      const client = redis.getClientSafe()

      await client.xgroup('CREATE', this.streamKey, groupName, '0', 'MKSTREAM')

      logger.success(`✅ Created consumer group: ${groupName}`)
      return true
    } catch (error) {
      // BUSYGROUP means the group already exists, which is the desired end state.
      if (this._errorMessageIncludes(error, 'BUSYGROUP')) {
        logger.debug(`Consumer group ${groupName} already exists`)
        return true
      }
      logger.error(`❌ Failed to create consumer group ${groupName}:`, error)
      return false
    }
  }
}
|
|
|
|
|
// Export one publisher instance (not the class) for importers to use.
module.exports = new BillingEventPublisher()
|
|
|