Kraft102's picture
Deploy backend fix v2.1.0
1d28c11
import { EventEmitter } from 'events';
import { persistentEventBus } from '../services/EventBus.js';
/**
 * Names of every event that may travel over the event bus.
 *
 * Both dot-separated and colon-separated names appear in this list; each
 * literal is kept exactly as it is used by emitters and listeners.
 */
export type EventType =
  // Alerts, agent activity and task recording
  | 'system.alert'
  | 'security.alert'
  | 'agent.decision'
  | 'agent.log'
  | 'mcp.tool.executed'
  | 'autonomous.task.executed'
  | 'taskrecorder.suggestion.created'
  | 'taskrecorder.suggestion.approved'
  | 'taskrecorder.execution.started'
  | 'data:ingested'
  | 'widget:invoke'
  | 'osint:investigation:start'
  | 'threat:hunt:start'
  | 'orchestrator:coordinate:start'
  // docgen:* events
  | 'docgen:powerpoint:create'
  | 'docgen:word:create'
  | 'docgen:excel:create'
  | 'docgen:powerpoint:completed'
  | 'docgen:powerpoint:failed'
  | 'docgen:word:completed'
  | 'docgen:word:failed'
  | 'docgen:excel:completed'
  | 'docgen:excel:failed'
  | 'docgen:powerpoint:created'
  | 'docgen:powerpoint:error'
  // devtools:* events
  | 'devtools:scan:started'
  | 'devtools:scan:completed'
  | 'devtools:scan:failed'
  // Data ingestion events
  | 'ingestion:emails'
  | 'ingestion:news'
  | 'ingestion:documents'
  | 'ingestion:assets'
  | 'email:new'
  | 'email:fetched'
  | 'threat:detected'
  | 'system:heartbeat'
  | 'system:force-refresh'
  // WebSocket events
  | 'ws:connected'
  | 'ws:disconnected'
  // HansPedder agent events
  | 'hanspedder:test-results'
  | 'hanspedder:nudge'
  | 'hanspedder:fix-reported'
  // System health events
  | 'system:backend-unhealthy'
  | 'system:health-check'
  | 'mcp:reconnect-requested'
  // Prototype generation events
  | 'prototype.generation.started'
  | 'prototype.generation.completed'
  | 'prototype.generation.error'
  | 'prototype.saved'
  | 'prototype.deleted'
  // MCP tool events
  | 'mcp.tool.call'
  | 'mcp.tool.result'
  | 'mcp.tool.error'
  // NudgeService events
  | 'nudge.system_metrics'
  | 'nudge.cycle_complete'
  | 'agent.ping'
  | 'system.activity'
  | 'data.push'
  // Autonomous memory events
  | 'pattern:recorded'
  | 'failure:recorded'
  | 'failure:recurring'
  | 'recovery:completed'
  | 'health:recorded'
  | 'source:health'
  | 'decision:made'
  // Email events ('email:new' and 'email:fetched' are declared once, with
  // the data ingestion events above)
  | 'email:refresh';
/**
 * Common envelope for an event sent over the bus.
 */
export interface BaseEvent {
  /** Event name; must be one of the `EventType` literals. */
  type: EventType;

  /**
   * When the event happened, as a string. The format is not enforced by this
   * type (presumably ISO 8601 — confirm with the producers).
   */
  timestamp: string;

  /** Free-form name of whatever produced the event. */
  source: string;

  /** Event-specific data; typed as `any`, so consumers get no compile-time checking. */
  payload: any;
}
/**
 * Unified event bus facade.
 *
 * Every typed event is handed to `persistentEventBus` for publishing. While
 * that bus reports it is not ready, typed events are additionally delivered
 * through this in-process EventEmitter so local listeners keep working.
 *
 * The '*' wildcard channel lives only on the in-process emitter: it is never
 * published to, nor subscribed on, the persistent bus. It is therefore always
 * delivered locally, regardless of whether the persistent bus is ready.
 */
class MCPEventBus extends EventEmitter {
  constructor() {
    super();
    // Kick off persistent-bus initialisation without awaiting it; a failure
    // is logged instead of thrown so construction never fails.
    persistentEventBus.init().catch((err) => {
      console.warn('⚠️ Redis Streams not ready, using in-memory bus:', err?.message || err);
    });
  }

  /**
   * Awaits initialisation of the persistent bus.
   *
   * NOTE(review): the constructor has already called `init()` once, so this
   * is a second call — confirm `persistentEventBus.init()` is safe to repeat.
   *
   * @returns A promise that settles with the result of `persistentEventBus.init()`.
   */
  async initialize(): Promise<void> {
    await persistentEventBus.init();
  }

  /**
   * Publishes a full event envelope.
   *
   * The event is always passed to the persistent bus. It is delivered to
   * local listeners of `event.type` only while the persistent bus is not
   * ready, and to local '*' listeners in every case.
   *
   * @param event - The event envelope to publish.
   */
  emitEvent(event: BaseEvent) {
    persistentEventBus.publish(event.type, event);

    if (!persistentEventBus.isReady()) {
      // Immediate local delivery for dev/fallback
      super.emit(event.type, event);
    }

    // Wildcard listeners are only ever registered on the local emitter (see
    // onEvent), so they must be notified here even when the persistent bus
    // is ready; otherwise they would never receive anything.
    super.emit('*', event);
  }

  /**
   * Direct emit for convenience (for payloads that are not `BaseEvent` objects).
   *
   * For a typed event, the first argument is published to the persistent bus
   * and local listeners are invoked only while that bus is not ready. A '*'
   * emit is never published, so it is always delivered locally.
   *
   * @param type - Event name, or '*' for the wildcard channel.
   * @param args - Arguments for listeners; only `args[0]` is published.
   * @returns The local emitter's result when local delivery happened,
   *          otherwise `true`.
   */
  emit(type: EventType | '*', ...args: any[]): boolean {
    if (type === '*') {
      // Local-only channel: skipping delivery here would drop the event.
      return super.emit(type, ...args);
    }

    persistentEventBus.publish(type, args[0]);

    if (!persistentEventBus.isReady()) {
      return super.emit(type, ...args);
    }
    return true;
  }

  /**
   * Registers a listener for an event type or for the '*' wildcard.
   *
   * Typed listeners are subscribed on the persistent bus and also attached
   * locally (for the fallback path). Wildcard listeners are attached locally
   * only.
   *
   * @param type - Event name, or '*' to observe events sent via `emitEvent`.
   * @param listener - Callback invoked with the event (or raw payload).
   */
  onEvent(type: EventType | '*', listener: (event: BaseEvent | any) => void) {
    if (type !== '*') {
      persistentEventBus.subscribe(type, listener);
    }
    // Always listen locally for fallback
    this.on(type, listener);
  }

  /**
   * Shuts down the persistent bus if it exposes a `shutdown` method.
   */
  async shutdown(): Promise<void> {
    await persistentEventBus.shutdown?.();
  }
}

/** Shared event bus instance, created when this module is loaded. */
export const eventBus = new MCPEventBus();