|
|
#!/usr/bin/env node |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const path = require('path') |
|
|
const Redis = require('ioredis') |
|
|
|
|
|
|
|
|
require('dotenv').config({ path: path.join(__dirname, '../.env') }) |
|
|
|
|
|
const config = { |
|
|
host: process.env.REDIS_HOST || 'localhost', |
|
|
port: parseInt(process.env.REDIS_PORT) || 6379, |
|
|
password: process.env.REDIS_PASSWORD || '', |
|
|
db: parseInt(process.env.REDIS_DB) || 0 |
|
|
} |
|
|
|
|
|
const redis = new Redis(config) |
|
|
const STREAM_KEY = 'billing:events' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function publishTestEvent() { |
|
|
console.log('📤 Publishing test billing event...') |
|
|
|
|
|
const testEvent = { |
|
|
eventId: `test-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, |
|
|
eventType: 'usage.recorded', |
|
|
timestamp: new Date().toISOString(), |
|
|
version: '1.0', |
|
|
apiKey: { |
|
|
id: 'test-key-123', |
|
|
name: 'Test API Key', |
|
|
userId: 'test-user-456' |
|
|
}, |
|
|
usage: { |
|
|
model: 'claude-sonnet-4-20250514', |
|
|
inputTokens: 1500, |
|
|
outputTokens: 800, |
|
|
cacheCreateTokens: 200, |
|
|
cacheReadTokens: 100, |
|
|
ephemeral5mTokens: 150, |
|
|
ephemeral1hTokens: 50, |
|
|
totalTokens: 2600 |
|
|
}, |
|
|
cost: { |
|
|
total: 0.0156, |
|
|
currency: 'USD', |
|
|
breakdown: { |
|
|
input: 0.0045, |
|
|
output: 0.012, |
|
|
cacheCreate: 0.00075, |
|
|
cacheRead: 0.00003, |
|
|
ephemeral5m: 0.0005625, |
|
|
ephemeral1h: 0.0001875 |
|
|
} |
|
|
}, |
|
|
account: { |
|
|
id: 'test-account-789', |
|
|
type: 'claude-official' |
|
|
}, |
|
|
context: { |
|
|
isLongContext: false, |
|
|
requestTimestamp: new Date().toISOString() |
|
|
} |
|
|
} |
|
|
|
|
|
try { |
|
|
const messageId = await redis.xadd( |
|
|
STREAM_KEY, |
|
|
'MAXLEN', |
|
|
'~', |
|
|
100000, |
|
|
'*', |
|
|
'data', |
|
|
JSON.stringify(testEvent) |
|
|
) |
|
|
|
|
|
console.log('✅ Event published successfully!') |
|
|
console.log(` Message ID: ${messageId}`) |
|
|
console.log(` Event ID: ${testEvent.eventId}`) |
|
|
console.log(` Cost: $${testEvent.cost.total}`) |
|
|
} catch (error) { |
|
|
console.error('❌ Failed to publish event:', error.message) |
|
|
process.exit(1) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function consumeTestEvents() { |
|
|
console.log('📬 Consuming test events...') |
|
|
console.log(' Press Ctrl+C to stop\n') |
|
|
|
|
|
let isRunning = true |
|
|
|
|
|
process.on('SIGINT', () => { |
|
|
console.log('\n⏹️ Stopping consumer...') |
|
|
isRunning = false |
|
|
}) |
|
|
|
|
|
let lastId = '0' |
|
|
|
|
|
while (isRunning) { |
|
|
try { |
|
|
|
|
|
const messages = await redis.xread('BLOCK', 5000, 'COUNT', 10, 'STREAMS', STREAM_KEY, lastId) |
|
|
|
|
|
if (!messages || messages.length === 0) { |
|
|
continue |
|
|
} |
|
|
|
|
|
const [streamKey, entries] = messages[0] |
|
|
console.log(`📬 Received ${entries.length} messages from ${streamKey}\n`) |
|
|
|
|
|
for (const [messageId, fields] of entries) { |
|
|
try { |
|
|
const data = {} |
|
|
for (let i = 0; i < fields.length; i += 2) { |
|
|
data[fields[i]] = fields[i + 1] |
|
|
} |
|
|
|
|
|
const event = JSON.parse(data.data) |
|
|
|
|
|
console.log(`📊 Event: ${event.eventId}`) |
|
|
console.log(` API Key: ${event.apiKey.name} (${event.apiKey.id})`) |
|
|
console.log(` Model: ${event.usage.model}`) |
|
|
console.log(` Tokens: ${event.usage.totalTokens}`) |
|
|
console.log(` Cost: $${event.cost.total.toFixed(6)}`) |
|
|
console.log(` Timestamp: ${event.timestamp}`) |
|
|
console.log('') |
|
|
|
|
|
lastId = messageId |
|
|
} catch (parseError) { |
|
|
console.error(`❌ Failed to parse message ${messageId}:`, parseError.message) |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
if (isRunning) { |
|
|
console.error('❌ Error consuming messages:', error.message) |
|
|
await new Promise((resolve) => setTimeout(resolve, 5000)) |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
console.log('👋 Consumer stopped') |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function showQueueInfo() { |
|
|
console.log('📊 Queue Information\n') |
|
|
|
|
|
try { |
|
|
|
|
|
const length = await redis.xlen(STREAM_KEY) |
|
|
console.log(`Stream: ${STREAM_KEY}`) |
|
|
console.log(`Length: ${length} messages\n`) |
|
|
|
|
|
if (length === 0) { |
|
|
console.log('ℹ️ Queue is empty') |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
const info = await redis.xinfo('STREAM', STREAM_KEY) |
|
|
const infoObj = {} |
|
|
for (let i = 0; i < info.length; i += 2) { |
|
|
infoObj[info[i]] = info[i + 1] |
|
|
} |
|
|
|
|
|
console.log('Stream Details:') |
|
|
console.log(` First Entry ID: ${infoObj['first-entry'] ? infoObj['first-entry'][0] : 'N/A'}`) |
|
|
console.log(` Last Entry ID: ${infoObj['last-entry'] ? infoObj['last-entry'][0] : 'N/A'}`) |
|
|
console.log(` Consumer Groups: ${infoObj.groups || 0}\n`) |
|
|
|
|
|
|
|
|
if (infoObj.groups > 0) { |
|
|
console.log('Consumer Groups:') |
|
|
const groups = await redis.xinfo('GROUPS', STREAM_KEY) |
|
|
|
|
|
for (let i = 0; i < groups.length; i++) { |
|
|
const group = groups[i] |
|
|
const groupObj = {} |
|
|
for (let j = 0; j < group.length; j += 2) { |
|
|
groupObj[group[j]] = group[j + 1] |
|
|
} |
|
|
|
|
|
console.log(`\n Group: ${groupObj.name}`) |
|
|
console.log(` Consumers: ${groupObj.consumers}`) |
|
|
console.log(` Pending: ${groupObj.pending}`) |
|
|
console.log(` Last Delivered ID: ${groupObj['last-delivered-id']}`) |
|
|
|
|
|
|
|
|
if (groupObj.consumers > 0) { |
|
|
const consumers = await redis.xinfo('CONSUMERS', STREAM_KEY, groupObj.name) |
|
|
console.log(' Consumer Details:') |
|
|
|
|
|
for (let k = 0; k < consumers.length; k++) { |
|
|
const consumer = consumers[k] |
|
|
const consumerObj = {} |
|
|
for (let l = 0; l < consumer.length; l += 2) { |
|
|
consumerObj[consumer[l]] = consumer[l + 1] |
|
|
} |
|
|
|
|
|
console.log(` - ${consumerObj.name}`) |
|
|
console.log(` Pending: ${consumerObj.pending}`) |
|
|
console.log(` Idle: ${Math.round(consumerObj.idle / 1000)}s`) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
console.log('\n📬 Latest 5 Messages:') |
|
|
const latest = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 5) |
|
|
|
|
|
if (latest.length === 0) { |
|
|
console.log(' No messages') |
|
|
} else { |
|
|
for (const [messageId, fields] of latest) { |
|
|
const data = {} |
|
|
for (let i = 0; i < fields.length; i += 2) { |
|
|
data[fields[i]] = fields[i + 1] |
|
|
} |
|
|
|
|
|
try { |
|
|
const event = JSON.parse(data.data) |
|
|
console.log(`\n ${messageId}`) |
|
|
console.log(` Event ID: ${event.eventId}`) |
|
|
console.log(` Model: ${event.usage.model}`) |
|
|
console.log(` Cost: $${event.cost.total.toFixed(6)}`) |
|
|
console.log(` Time: ${event.timestamp}`) |
|
|
} catch (e) { |
|
|
console.log(`\n ${messageId} (Parse Error)`) |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
console.error('❌ Failed to get queue info:', error.message) |
|
|
process.exit(1) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function clearQueue() { |
|
|
console.log('⚠️ WARNING: This will delete all messages in the queue!') |
|
|
console.log(` Stream: ${STREAM_KEY}`) |
|
|
|
|
|
|
|
|
const readline = require('readline') |
|
|
const rl = readline.createInterface({ |
|
|
input: process.stdin, |
|
|
output: process.stdout |
|
|
}) |
|
|
|
|
|
rl.question('Type "yes" to confirm: ', async (answer) => { |
|
|
if (answer.toLowerCase() === 'yes') { |
|
|
try { |
|
|
await redis.del(STREAM_KEY) |
|
|
console.log('✅ Queue cleared successfully') |
|
|
} catch (error) { |
|
|
console.error('❌ Failed to clear queue:', error.message) |
|
|
} |
|
|
} else { |
|
|
console.log('❌ Operation cancelled') |
|
|
} |
|
|
rl.close() |
|
|
redis.quit() |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function main() { |
|
|
const command = process.argv[2] || 'info' |
|
|
|
|
|
console.log('🔧 Billing Events Test Tool\n') |
|
|
|
|
|
try { |
|
|
switch (command) { |
|
|
case 'publish': |
|
|
await publishTestEvent() |
|
|
break |
|
|
|
|
|
case 'consume': |
|
|
await consumeTestEvents() |
|
|
break |
|
|
|
|
|
case 'info': |
|
|
await showQueueInfo() |
|
|
break |
|
|
|
|
|
case 'clear': |
|
|
await clearQueue() |
|
|
return |
|
|
|
|
|
default: |
|
|
console.error(`❌ Unknown command: ${command}`) |
|
|
console.log('\nAvailable commands:') |
|
|
console.log(' publish - Publish a test event') |
|
|
console.log(' consume - Consume events (test mode)') |
|
|
console.log(' info - Show queue status') |
|
|
console.log(' clear - Clear the queue (dangerous)') |
|
|
process.exit(1) |
|
|
} |
|
|
|
|
|
await redis.quit() |
|
|
} catch (error) { |
|
|
console.error('💥 Fatal error:', error) |
|
|
await redis.quit() |
|
|
process.exit(1) |
|
|
} |
|
|
} |
|
|
|
|
|
main() |
|
|
|