Spaces:
Paused
Paused
| import { EventEmitter } from 'events'; | |
| import os from 'os'; | |
/**
 * Callback invoked with an event's payload. It may be synchronous or return a
 * promise.
 */
type Handler = (payload: any) => void | Promise<void>;
/**
 * Optional settings for the event bus. Every field may be omitted; the bus
 * constructor substitutes the default noted on each member.
 */
interface EventBusOptions {
  // --- connection ---

  /** Redis connection URL. Default: `process.env.REDIS_URL`, then `redis://localhost:6379`. */
  redisUrl?: string;

  // --- stream / consumer-group identity ---

  /** Key of the Redis stream events are appended to. Default: `widgetdc:events`. */
  streamKey?: string;
  /** Consumer group used when reading the stream. Default: `widgetdc-consumers`. */
  groupName?: string;
  /** Name of this consumer inside the group. Default: `<hostname>-<pid>`. */
  consumerName?: string;

  // --- read tuning ---

  /** Value passed as `COUNT` to each `XREADGROUP` call. Default: 20. */
  readCount?: number;
  /** Value passed as `BLOCK` (milliseconds) to each `XREADGROUP` call. Default: 1000. */
  blockMs?: number;
}
/**
 * Event bus backed by a Redis Stream and read through a consumer group. It
 * falls back to in-process `EventEmitter` delivery when Redis is not
 * available.
 *
 * Delivery paths:
 * - Redis mode (`init()` succeeded): `publish` appends to the stream with
 *   `XADD`; a background loop reads with `XREADGROUP`, runs the subscribed
 *   handlers, and then acknowledges the entry with `XACK`.
 * - Fallback mode (before `init()`, after a failed `init()`, after
 *   `shutdown()`, or when an individual `XADD` fails): events are emitted
 *   synchronously on this emitter under the event type and under `'*'`.
 *
 * Handler failures on the Redis path are logged and the entry is still
 * acknowledged; failed entries are not retried.
 */
class PersistentEventBus extends EventEmitter {
  /** Connection used for non-blocking commands (XGROUP, XADD, XACK). */
  private redis: any;
  /** Dedicated connection for the blocking XREADGROUP loop. */
  private reader: any;
  private ready = false;
  private readonly handlers = new Map<string, Set<Handler>>();
  private polling = false;
  private readonly streamKey: string;
  private readonly group: string;
  private readonly consumer: string;
  private readonly blockMs: number;
  private readonly readCount: number;
  private readonly redisUrl: string;

  /**
   * Stores configuration only; no connection is opened until `init()`.
   *
   * @param options Overrides for connection URL, stream/group/consumer names
   *   and read tuning. String options use `||`, so an empty string is treated
   *   as "not provided".
   */
  constructor(options: EventBusOptions = {}) {
    super();
    this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379';
    this.streamKey = options.streamKey || 'widgetdc:events';
    this.group = options.groupName || 'widgetdc-consumers';
    this.consumer = options.consumerName || `${os.hostname()}-${process.pid}`;
    this.blockMs = options.blockMs ?? 1000;
    this.readCount = options.readCount ?? 20;
  }

  /** @returns `true` once `init()` has connected and ensured the consumer group. */
  isReady(): boolean {
    return this.ready;
  }

  /**
   * Connects to Redis and makes sure the consumer group exists. Never
   * rejects: on any failure a warning is logged, the half-open client is torn
   * down, and the bus stays in in-memory fallback mode.
   *
   * If handlers were subscribed before this call completed, the poll loop is
   * started here so they receive stream events.
   */
  async init(): Promise<void> {
    if (this.ready) return;

    let client: any;
    try {
      const Redis = (await import('ioredis')).default;
      client = new Redis(this.redisUrl, { maxRetriesPerRequest: 3 });
      this.redis = client;
      await this.ensureGroup();
      this.ready = true;
      console.log('🔴 PersistentEventBus: Redis Streams ready');
    } catch (err: unknown) {
      console.warn(`⚠️ PersistentEventBus fallback to in-memory: ${PersistentEventBus.describeError(err)}`);
      this.ready = false;
      // Drop the client so it stops reconnecting in the background and so
      // later calls do not mistake it for a usable connection.
      this.redis = undefined;
      client?.disconnect();
      return;
    }

    // Subscriptions made before init() could not start polling (not ready yet).
    if (this.handlers.size > 0) {
      this.startPolling();
    }
  }

  /**
   * Publishes an event. In Redis mode the event is appended to the stream as
   * fields `type` and `data` (JSON of `{ eventType, payload, ts }`); if that
   * fails, or in fallback mode, it is emitted in-process instead.
   *
   * @param eventType Event name used to route to subscribers.
   * @param payload Event data; must be JSON-serialisable in Redis mode.
   * @throws Whatever `JSON.stringify` throws for an unserialisable payload
   *   (Redis mode only).
   */
  async publish(eventType: string, payload: any): Promise<void> {
    if (!this.ready || !this.redis) {
      this.emitLocal(eventType, payload);
      return;
    }

    // Serialise only on the Redis path; in-memory delivery does not need it.
    const entry = JSON.stringify({ eventType, payload, ts: Date.now() });
    try {
      await this.redis.xadd(this.streamKey, '*', 'type', eventType, 'data', entry);
    } catch (err: unknown) {
      console.error('Failed to publish event, falling back to memory:', PersistentEventBus.describeError(err));
      this.emitLocal(eventType, payload);
    }
  }

  /**
   * Registers `handler` for `eventType`. Subscribing the same handler to the
   * same event type more than once has no additional effect. Handlers
   * subscribed to `'*'` receive `{ eventType, payload }` for every event.
   *
   * Starts the Redis poll loop if the bus is ready and not yet polling.
   */
  subscribe(eventType: string, handler: Handler): void {
    let registered = this.handlers.get(eventType);
    if (!registered) {
      registered = new Set();
      this.handlers.set(eventType, registered);
    }
    if (!registered.has(handler)) {
      registered.add(handler);
      // Listener for the in-memory fallback path; added once per Set entry so
      // a single remove() fully unregisters the handler.
      this.on(eventType, handler);
    }
    if (this.ready && !this.polling) {
      this.startPolling();
    }
  }

  /** Unregisters `handler` from `eventType` on both delivery paths. */
  remove(eventType: string, handler: Handler): void {
    this.off(eventType, handler);
    const registered = this.handlers.get(eventType);
    if (!registered) return;
    registered.delete(handler);
    if (registered.size === 0) {
      this.handlers.delete(eventType);
    }
  }

  /**
   * Closes the Redis connections and returns the bus to fallback mode so that
   * later `publish` calls deliver in-process and `init()` can reconnect.
   */
  async shutdown(): Promise<void> {
    this.polling = false;
    this.ready = false;
    const writer = this.redis;
    const reader = this.reader;
    this.redis = undefined;
    this.reader = undefined;
    // The reader may be parked in a blocking read; disconnect rather than
    // queueing QUIT behind it.
    reader?.disconnect();
    if (writer) {
      await writer.quit();
    }
  }

  /** Emits an event on this emitter under its own type and under `'*'`. */
  private emitLocal(eventType: string, payload: any): void {
    this.emit(eventType, payload);
    this.emit('*', { eventType, payload });
  }

  /**
   * Creates the consumer group (and the stream, via MKSTREAM) starting at id
   * `0`. A BUSYGROUP error (group already exists) is ignored; anything else
   * is rethrown.
   */
  private async ensureGroup(): Promise<void> {
    try {
      await this.redis.xgroup('CREATE', this.streamKey, this.group, '0', 'MKSTREAM');
    } catch (err: unknown) {
      if (!String(PersistentEventBus.describeError(err)).includes('BUSYGROUP')) {
        throw err;
      }
    }
  }

  /** Starts the background read loop once; no-op if already polling or not connected. */
  private startPolling(): void {
    if (this.polling || !this.redis) return;
    const writer = this.redis;
    // A blocked XREADGROUP stalls every command queued behind it on the same
    // connection, so reads get their own connection and XADD/XACK stay fast.
    const reader = writer.duplicate();
    this.reader = reader;
    this.polling = true;
    void this.pollLoop(reader, writer);
  }

  /**
   * Reads new entries for this consumer until polling is stopped or the
   * reader is replaced. Each entry is dispatched, awaited, and then
   * acknowledged. Read/ack errors are logged and retried after 500 ms.
   */
  private async pollLoop(reader: any, writer: any): Promise<void> {
    while (this.polling && this.reader === reader) {
      try {
        const streams = await reader.xreadgroup(
          'GROUP', this.group, this.consumer,
          'COUNT', this.readCount,
          'BLOCK', this.blockMs,
          'STREAMS', this.streamKey,
          '>'
        );
        if (!streams) continue; // BLOCK timed out with nothing new

        for (const [, messages] of streams) {
          for (const [id, fields] of messages) {
            await this.dispatch(this.parseFields(fields as any[]));
            await writer.xack(this.streamKey, this.group, id);
          }
        }
      } catch (err: unknown) {
        // Errors caused by shutdown() closing the connections are expected.
        if (!this.polling || this.reader !== reader) break;
        console.error('PersistentEventBus poll error:', PersistentEventBus.describeError(err));
        await new Promise((resolve) => setTimeout(resolve, 500));
      }
    }
  }

  /**
   * Converts a flat `[key, value, key, value, …]` stream entry into an event.
   * Uses the JSON envelope in `data` when it is an object with a string
   * `eventType`; otherwise falls back to the `type` field (or `'unknown'`)
   * with the raw `data` value as payload.
   */
  private parseFields(fields: any[]): { eventType: string; payload: any } {
    const byName = new Map<string, any>();
    for (let i = 0; i + 1 < fields.length; i += 2) {
      byName.set(String(fields[i]), fields[i + 1]);
    }

    const raw = byName.get('data');
    if (typeof raw === 'string') {
      try {
        const parsed: unknown = JSON.parse(raw);
        if (typeof parsed === 'object' && parsed !== null) {
          const envelope = parsed as { eventType?: unknown; payload?: unknown };
          if (typeof envelope.eventType === 'string') {
            return { eventType: envelope.eventType, payload: envelope.payload };
          }
        }
      } catch {
        // Not JSON: handled by the fallback below.
      }
    }
    return { eventType: byName.get('type') || 'unknown', payload: raw };
  }

  /**
   * Runs every handler subscribed to the entry's event type, plus `'*'`
   * handlers (which receive `{ eventType, payload }`, matching the in-memory
   * path). Resolves once all handlers have settled; handler errors are logged
   * and never propagate.
   */
  private async dispatch(entry: { eventType: string; payload: any }): Promise<void> {
    const pending: Promise<void>[] = [];
    for (const handler of this.handlers.get(entry.eventType) ?? []) {
      pending.push(this.invoke(handler, entry.payload, entry.eventType));
    }
    if (entry.eventType !== '*') {
      const wrapped = { eventType: entry.eventType, payload: entry.payload };
      for (const handler of this.handlers.get('*') ?? []) {
        pending.push(this.invoke(handler, wrapped, entry.eventType));
      }
    }
    await Promise.all(pending);
  }

  /** Calls one handler, logging (not rethrowing) a synchronous throw or rejection. */
  private async invoke(handler: Handler, argument: any, eventType: string): Promise<void> {
    try {
      await handler(argument);
    } catch (err: unknown) {
      console.error(`Handler error for ${eventType}:`, PersistentEventBus.describeError(err));
    }
  }

  /** Returns `err.message` when present and truthy, otherwise `err` itself. */
  private static describeError(err: unknown): unknown {
    const message = (err as { message?: unknown } | null | undefined)?.message;
    return message || err;
  }
}
/**
 * Shared module-level bus instance, constructed with default options.
 * Construction does not open a Redis connection; until `init()` succeeds the
 * instance delivers events in-process only.
 */
export const persistentEventBus: PersistentEventBus = new PersistentEventBus();