// Kraft102's picture
// Initial deployment - WidgeTDC Cortex Backend v2.1.0
// 529090e
import { EventEmitter } from 'events';
import os from 'os';
/** Callback invoked with an event's payload; it may be synchronous or return a promise. */
type Handler = (payload: any) => Promise<void> | void;
/**
 * Optional settings accepted by the PersistentEventBus constructor.
 * Every field may be omitted; the defaults listed are the ones the constructor applies.
 */
interface EventBusOptions {
// Redis connection URL. Defaults to process.env.REDIS_URL, then 'redis://localhost:6379'.
redisUrl?: string;
// Key of the Redis stream that events are appended to and read from. Defaults to 'widgetdc:events'.
streamKey?: string;
// Consumer group name used for XGROUP CREATE / XREADGROUP / XACK. Defaults to 'widgetdc-consumers'.
groupName?: string;
// Consumer name passed to XREADGROUP. Defaults to `${os.hostname()}-${process.pid}`.
consumerName?: string;
// COUNT argument for each XREADGROUP call. Defaults to 20.
readCount?: number;
// BLOCK argument (milliseconds) for each XREADGROUP call. Defaults to 1000.
blockMs?: number;
}
/**
 * Event bus backed by a Redis Stream, with an in-process fallback.
 *
 * - While Redis is unavailable (before `init()` succeeds, after `shutdown()`,
 *   or when an XADD fails) events are delivered synchronously through the
 *   inherited `EventEmitter`: once under their own type with the raw payload
 *   and once under `'*'` with `{ eventType, payload }`.
 * - Once `init()` succeeds, `publish()` appends to the stream and a polling
 *   loop reads entries through a consumer group, runs the subscribed handlers
 *   (including `'*'` subscribers, mirroring the in-process behaviour) and only
 *   then acknowledges the entry.
 */
class PersistentEventBus extends EventEmitter {
  /** Main ioredis connection (XADD / XACK / XGROUP); undefined while in memory mode. */
  private redis: any;
  /** Dedicated connection for the blocking XREADGROUP so publishes are never stalled behind it. */
  private reader: any;
  private ready = false;
  private handlers: Map<string, Set<Handler>> = new Map();
  private polling = false;
  private readonly streamKey: string;
  private readonly group: string;
  private readonly consumer: string;
  private readonly blockMs: number;
  private readonly readCount: number;
  private readonly redisUrl: string;

  /**
   * Stores configuration only; no connection is opened until `init()` is called.
   *
   * @param options Optional overrides for the connection URL, stream key,
   *   consumer group/name and XREADGROUP tuning.
   */
  constructor(options: EventBusOptions = {}) {
    super();
    this.redisUrl = options.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379';
    this.streamKey = options.streamKey || 'widgetdc:events';
    this.group = options.groupName || 'widgetdc-consumers';
    this.consumer = options.consumerName || `${os.hostname()}-${process.pid}`;
    this.blockMs = options.blockMs ?? 1000;
    this.readCount = options.readCount ?? 20;
  }

  /** @returns `true` once `init()` has connected and ensured the consumer group. */
  isReady(): boolean {
    return this.ready;
  }

  /**
   * Connects to Redis and makes sure the consumer group exists.
   *
   * Never rejects: on any failure a warning is logged, the half-open client is
   * torn down so it does not keep reconnecting in the background, and the bus
   * stays in in-memory mode. On success, polling starts immediately when
   * handlers were already registered before `init()` was called.
   */
  async init(): Promise<void> {
    if (this.ready) return;
    let client: any;
    try {
      const Redis = (await import('ioredis')).default;
      client = new Redis(this.redisUrl, { maxRetriesPerRequest: 3 });
      this.redis = client;
      await this.ensureGroup();
      this.ready = true;
      console.log('🔴 PersistentEventBus: Redis Streams ready');
    } catch (err: any) {
      console.warn(`⚠️ PersistentEventBus fallback to in-memory: ${err?.message || err}`);
      this.ready = false;
      this.redis = undefined;
      try {
        client?.disconnect();
      } catch {
        // Best effort: the client is being discarded anyway.
      }
      return;
    }
    // Subscriptions made before init() could not start the poller themselves.
    if (this.hasHandlers()) {
      void this.startPolling();
    }
  }

  /**
   * Publishes an event.
   *
   * @param eventType Event name used to select handlers.
   * @param payload Data handed to handlers; it must be JSON-serialisable to be
   *   persisted. Serialisation is only attempted in Redis mode, and a failure
   *   there is treated like any other publish failure (logged, then delivered
   *   in-process).
   */
  async publish(eventType: string, payload: any): Promise<void> {
    if (!this.ready || !this.redis) {
      // in-memory fallback
      this.emitLocal(eventType, payload);
      return;
    }
    try {
      const entry = JSON.stringify({ eventType, payload, ts: Date.now() });
      await this.redis.xadd(this.streamKey, '*', 'type', eventType, 'data', entry);
    } catch (err: any) {
      console.error('Failed to publish event, falling back to memory:', err?.message || err);
      this.emitLocal(eventType, payload);
    }
  }

  /**
   * Registers `handler` for `eventType` (use `'*'` to receive every event as
   * `{ eventType, payload }`). Registering the same handler twice for the same
   * type is a no-op, so it is never invoked twice per event.
   */
  subscribe(eventType: string, handler: Handler): void {
    let registered = this.handlers.get(eventType);
    if (!registered) {
      registered = new Set();
      this.handlers.set(eventType, registered);
    }
    if (!registered.has(handler)) {
      registered.add(handler);
      // Local immediate delivery (used by the in-memory fallback path).
      this.on(eventType, handler);
    }
    if (this.ready && !this.polling) {
      void this.startPolling();
    }
  }

  /** Unregisters a handler previously added with `subscribe()`. */
  remove(eventType: string, handler: Handler): void {
    this.off(eventType, handler);
    const registered = this.handlers.get(eventType);
    if (!registered) return;
    registered.delete(handler);
    if (registered.size === 0) this.handlers.delete(eventType);
  }

  /** Emits an event in-process: once under its own type, once as a `'*'` envelope. */
  private emitLocal(eventType: string, payload: any): void {
    this.emit(eventType, payload);
    this.emit('*', { eventType, payload });
  }

  /** @returns `true` when at least one handler is currently subscribed. */
  private hasHandlers(): boolean {
    for (const registered of this.handlers.values()) {
      if (registered.size > 0) return true;
    }
    return false;
  }

  /** Creates the stream and consumer group, tolerating "group already exists". */
  private async ensureGroup(): Promise<void> {
    try {
      await this.redis.xgroup('CREATE', this.streamKey, this.group, '0', 'MKSTREAM');
    } catch (err: any) {
      // Ignore BUSYGROUP
      if (!String(err?.message).includes('BUSYGROUP')) {
        throw err;
      }
    }
  }

  /**
   * Starts the background read loop (idempotent). The returned promise
   * resolves as soon as the loop has been launched, not when it ends.
   */
  private async startPolling(): Promise<void> {
    if (this.polling || !this.redis) return;
    this.polling = true;
    const client = this.redis;
    // Blocking reads get their own connection; on a shared one every XADD
    // would queue behind XREADGROUP BLOCK for up to `blockMs`.
    const reader = client.duplicate();
    this.reader = reader;
    void this.pollLoop(client, reader);
  }

  /**
   * Reads new entries for this consumer until polling is stopped or the
   * reader connection is replaced. Each entry is acknowledged only after all
   * of its handlers have settled.
   */
  private async pollLoop(client: any, reader: any): Promise<void> {
    while (this.polling && this.reader === reader) {
      try {
        const reply = await reader.xreadgroup(
          'GROUP', this.group, this.consumer,
          'COUNT', this.readCount,
          'BLOCK', this.blockMs,
          'STREAMS', this.streamKey,
          '>'
        );
        if (!reply) continue; // BLOCK timed out with nothing to read
        for (const [, messages] of reply) {
          for (const [id, fields] of messages) {
            await this.dispatch(this.parseFields(fields as any[]));
            await client.xack(this.streamKey, this.group, id);
          }
        }
      } catch (err: any) {
        // Errors caused by shutdown() closing the connections are expected.
        if (!this.polling) break;
        console.error('PersistentEventBus poll error:', err?.message || err);
        await new Promise((r) => setTimeout(r, 500));
      }
    }
  }

  /**
   * Converts the flat `[field, value, field, value, …]` array of a stream
   * entry into `{ eventType, payload }`. Falls back to the raw `type` / `data`
   * fields when `data` is not the JSON envelope written by `publish()`.
   */
  private parseFields(fields: any[]): { eventType: string; payload: any } {
    const obj: Record<string, any> = {};
    for (let i = 0; i < fields.length; i += 2) {
      obj[fields[i]] = fields[i + 1];
    }
    try {
      const parsed = JSON.parse(obj['data']);
      if (parsed !== null && typeof parsed === 'object' && typeof parsed.eventType === 'string') {
        return { eventType: parsed.eventType, payload: parsed.payload };
      }
    } catch {
      // Not JSON: fall through to the raw fields.
    }
    return { eventType: obj['type'] || 'unknown', payload: obj['data'] };
  }

  /**
   * Runs every handler subscribed to the entry's type (with the payload) and
   * every `'*'` handler (with `{ eventType, payload }`) concurrently, and
   * resolves once all of them have settled. Handler failures are logged, never
   * rethrown.
   */
  private async dispatch(entry: { eventType: string; payload: any }): Promise<void> {
    const { eventType, payload } = entry;
    const typed = Array.from(this.handlers.get(eventType) ?? []);
    const wildcard = Array.from(this.handlers.get('*') ?? []);
    await Promise.all([
      ...typed.map((handler) => this.invoke(handler, payload, eventType)),
      ...wildcard.map((handler) => this.invoke(handler, { eventType, payload }, eventType)),
    ]);
  }

  /** Calls one handler and logs (instead of propagating) anything it throws or rejects with. */
  private async invoke(handler: Handler, arg: any, eventType: string): Promise<void> {
    try {
      await handler(arg);
    } catch (err: any) {
      console.error(`Handler error for ${eventType}:`, err?.message || err);
    }
  }

  /**
   * Stops polling and closes the Redis connections. Afterwards the bus is back
   * in in-memory mode; `init()` may be called again to reconnect.
   */
  async shutdown(): Promise<void> {
    this.polling = false;
    this.ready = false;
    const client = this.redis;
    const reader = this.reader;
    this.redis = undefined;
    this.reader = undefined;
    try {
      // Drop the reader immediately rather than waiting for a blocked XREADGROUP.
      reader?.disconnect();
    } catch {
      // Best effort: the reader is being discarded anyway.
    }
    if (client) {
      await client.quit();
    }
  }
}
/**
 * Module-level shared bus instance built with default options.
 * Constructing it opens no connection; until `init()` has succeeded,
 * `publish()` delivers events in-process only.
 */
export const persistentEventBus = new PersistentEventBus();