import { EventEmitter } from 'events';
import { persistentEventBus } from '../services/EventBus.js';

/**
 * Every event name that may be published on or subscribed to via `eventBus`.
 *
 * Names are grouped by the subsystem that appears to own them; the grouping is
 * purely organisational and has no effect on the resulting union type.
 */
export type EventType =
  | 'system.alert'
  | 'security.alert'
  | 'agent.decision'
  | 'agent.log'
  | 'mcp.tool.executed'
  | 'autonomous.task.executed'
  | 'taskrecorder.suggestion.created'
  | 'taskrecorder.suggestion.approved'
  | 'taskrecorder.execution.started'
  | 'data:ingested'
  | 'widget:invoke'
  | 'osint:investigation:start'
  | 'threat:hunt:start'
  | 'orchestrator:coordinate:start'
  | 'docgen:powerpoint:create'
  | 'docgen:word:create'
  | 'docgen:excel:create'
  | 'docgen:powerpoint:completed'
  | 'docgen:powerpoint:failed'
  | 'docgen:word:completed'
  | 'docgen:word:failed'
  | 'docgen:excel:completed'
  | 'docgen:excel:failed'
  | 'docgen:powerpoint:created'
  | 'docgen:powerpoint:error'
  | 'devtools:scan:started'
  | 'devtools:scan:completed'
  | 'devtools:scan:failed'
  // Data ingestion events
  | 'ingestion:emails'
  | 'ingestion:news'
  | 'ingestion:documents'
  | 'ingestion:assets'
  | 'email:new'
  | 'email:fetched'
  | 'threat:detected'
  | 'system:heartbeat'
  | 'system:force-refresh'
  // WebSocket events
  | 'ws:connected'
  | 'ws:disconnected'
  // HansPedder agent events
  | 'hanspedder:test-results'
  | 'hanspedder:nudge'
  | 'hanspedder:fix-reported'
  // System health events
  | 'system:backend-unhealthy'
  | 'system:health-check'
  | 'mcp:reconnect-requested'
  // Prototype generation events
  | 'prototype.generation.started'
  | 'prototype.generation.completed'
  | 'prototype.generation.error'
  | 'prototype.saved'
  | 'prototype.deleted'
  // MCP tool events
  | 'mcp.tool.call'
  | 'mcp.tool.result'
  | 'mcp.tool.error'
  // NudgeService events
  | 'nudge.system_metrics'
  | 'nudge.cycle_complete'
  | 'agent.ping'
  | 'system.activity'
  | 'data.push'
  // Autonomous memory events
  | 'pattern:recorded'
  | 'failure:recorded'
  | 'failure:recurring'
  | 'recovery:completed'
  | 'health:recorded'
  | 'source:health'
  | 'decision:made'
  // Email events ('email:new' is already declared with the ingestion events)
  | 'email:refresh';

/** Envelope accepted by `emitEvent`. */
export interface BaseEvent {
  /** Event name; also used as the topic when publishing. */
  type: EventType;
  timestamp: string;
  source: string;
  // Left as `any` so existing producers and consumers keep compiling.
  payload: any;
}

/**
 * Unified event bus.
 *
 * Every publish is handed to `persistentEventBus`. While that bus reports it
 * is not ready, events are additionally delivered synchronously through the
 * inherited in-process `EventEmitter`, so listeners still fire without the
 * persistent backend.
 */
class MCPEventBus extends EventEmitter {
  constructor() {
    super();
    // Fire-and-forget: a constructor cannot await. A failed init is only
    // logged, and delivery then relies on the local EventEmitter path.
    persistentEventBus.init().catch((err) => {
      console.warn('⚠️ Redis Streams not ready, using in-memory bus:', err?.message || err);
    });
  }

  /**
   * Awaits initialisation of the persistent bus.
   *
   * NOTE(review): the constructor has already called `init()` once, so this
   * is a second call — presumably `init()` is idempotent; confirm.
   */
  async initialize(): Promise<void> {
    await persistentEventBus.init();
  }

  /**
   * Publishes a structured event.
   *
   * When the persistent bus is not ready, the event is also delivered locally
   * to listeners of `event.type` and to `'*'` wildcard listeners.
   *
   * @param event - The event envelope to publish.
   */
  emitEvent(event: BaseEvent): void {
    persistentEventBus.publish(event.type, event);

    if (!persistentEventBus.isReady()) {
      // `super.emit` bypasses the override below, so nothing is re-published.
      super.emit(event.type, event);
      super.emit('*', event);
    }
  }

  /**
   * Convenience emit for payloads that are not wrapped in a `BaseEvent`.
   *
   * Only the first argument is forwarded to the persistent bus; all arguments
   * reach local listeners when the persistent bus is not ready.
   *
   * NOTE(review): unlike `emitEvent`, this does not fan out to `'*'`
   * listeners, and `emit('*', ...)` delivers nothing while the persistent bus
   * is ready — confirm both are intended.
   *
   * @param type - Event name, or `'*'` (never sent to the persistent bus).
   * @param args - Payload; `args[0]` is what gets published persistently.
   * @returns The `EventEmitter.emit` result on the local path, otherwise `true`.
   */
  emit(type: EventType | '*', ...args: any[]): boolean {
    if (type !== '*') {
      persistentEventBus.publish(type, args[0]);
    }

    if (!persistentEventBus.isReady()) {
      return super.emit(type, ...args);
    }

    return true;
  }

  /**
   * Registers a listener for an event type, or for `'*'`.
   *
   * The listener is attached to the persistent bus (except for `'*'`) and
   * always to the local emitter, so it fires on whichever path delivers.
   *
   * @param type - Event name or `'*'`.
   * @param listener - Callback invoked with the delivered event.
   */
  onEvent(type: EventType | '*', listener: (event: BaseEvent | any) => void): void {
    if (type !== '*') {
      persistentEventBus.subscribe(type, listener);
    }

    // Always listen locally for fallback
    this.on(type, listener);
  }

  /** Shuts down the persistent bus if it exposes a `shutdown` method. */
  async shutdown(): Promise<void> {
    await persistentEventBus.shutdown?.();
  }
}

/** Shared bus instance for the whole process. */
export const eventBus = new MCPEventBus();