import { EventEmitter } from 'events'; import os from 'os'; type Handler = (payload: any) => Promise | void; interface EventBusOptions { redisUrl?: string; streamKey?: string; groupName?: string; consumerName?: string; readCount?: number; blockMs?: number; } class PersistentEventBus extends EventEmitter { private redis: any; private ready = false; private handlers: Map> = new Map(); private polling = false; private readonly streamKey: string; private readonly group: string; private readonly consumer: string; private readonly blockMs: number; private readonly readCount: number; private readonly redisUrl: string; constructor(options: EventBusOptions = {}) { super(); this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379'; this.streamKey = options.streamKey || 'widgetdc:events'; this.group = options.groupName || 'widgetdc-consumers'; this.consumer = options.consumerName || `${os.hostname()}-${process.pid}`; this.blockMs = options.blockMs ?? 1000; this.readCount = options.readCount ?? 20; } isReady(): boolean { return this.ready; } async init(): Promise { if (this.ready) return; try { const Redis = (await import('ioredis')).default; this.redis = new Redis(this.redisUrl, { maxRetriesPerRequest: 3 }); await this.ensureGroup(); this.ready = true; console.log('🔴 PersistentEventBus: Redis Streams ready'); } catch (err: any) { console.warn(`⚠️ PersistentEventBus fallback to in-memory: ${err?.message || err}`); this.ready = false; } } async publish(eventType: string, payload: any): Promise { const entry = JSON.stringify({ eventType, payload, ts: Date.now() }); if (!this.ready || !this.redis) { // in-memory fallback this.emit(eventType, payload); this.emit('*', { eventType, payload }); return; } try { await this.redis.xadd(this.streamKey, '*', 'type', eventType, 'data', entry); } catch (err: any) { console.error('Failed to publish event, falling back to memory:', err?.message || err); this.emit(eventType, payload); this.emit('*', { eventType, payload }); } } subscribe(eventType: string, handler: Handler): void { if (!this.handlers.has(eventType)) this.handlers.set(eventType, new Set()); this.handlers.get(eventType)!.add(handler); // Local immediate delivery this.on(eventType, handler); if (this.ready && !this.polling) { this.startPolling(); } } remove(eventType: string, handler: Handler): void { this.off(eventType, handler); this.handlers.get(eventType)?.delete(handler); } private async ensureGroup(): Promise { try { await this.redis.xgroup('CREATE', this.streamKey, this.group, '0', 'MKSTREAM'); } catch (err: any) { // Ignore BUSYGROUP if (!String(err?.message).includes('BUSYGROUP')) { throw err; } } } private async startPolling(): Promise { if (this.polling || !this.redis) return; this.polling = true; const loop = async () => { while (this.polling && this.redis) { try { const entries = await this.redis.xreadgroup( 'GROUP', this.group, this.consumer, 'COUNT', this.readCount, 'BLOCK', this.blockMs, 'STREAMS', this.streamKey, '>' ); if (entries) { const [_, messages] = entries[0]; for (const [id, fields] of messages) { const payload = this.parseFields(fields as any[]); this.dispatch(payload); await this.redis.xack(this.streamKey, this.group, id); } } } catch (err: any) { console.error('PersistentEventBus poll error:', err?.message || err); await new Promise(r => setTimeout(r, 500)); } } }; loop(); } private parseFields(fields: any[]): { eventType: string; payload: any } { const obj: Record = {}; for (let i = 0; i < fields.length; i += 2) { obj[fields[i]] = fields[i + 1]; } try { const parsed = JSON.parse(obj['data']); return { eventType: parsed.eventType, payload: parsed.payload }; } catch { return { eventType: obj['type'] || 'unknown', payload: obj['data'] }; } } private dispatch(entry: { eventType: string; payload: any }) { const handlers = this.handlers.get(entry.eventType); if (handlers) { handlers.forEach(async (handler) => { try { await handler(entry.payload); } catch (err: any) { console.error(`Handler error for ${entry.eventType}:`, err?.message || err); } }); } } async shutdown(): Promise { this.polling = false; if (this.redis) { await this.redis.quit(); } } } export const persistentEventBus = new PersistentEventBus();