/**
 * ═══════════════════════════════════════════════════════════════════════════════
 *  FAILURE MEMORY SERVICE - ENTERPRISE A+
 *
 *  Records and learns from failures with persistent SQL storage (SQLite or
 *  PostgreSQL, through a StorageAdapter). Enables:
 *   • Self-healing by remembering successful recovery paths
 *   • Avoiding known failure scenarios
 *   • Spotting recurring failures early
 *   • Cross-session failure memory persistence
 *
 *  Schema: mcp_failure_memory table (defined in schema.sql; also created on
 *  initialization if it does not exist yet).
 * ═══════════════════════════════════════════════════════════════════════════════
 */

import { v4 as uuidv4 } from 'uuid';
import type { StorageAdapter } from './StorageAdapter.js';
import { logger } from '../../utils/logger.js';
import { eventBus, type EventType } from '../EventBus.js';

// ═══════════════════════════════════════════════════════════════════════════════
// INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════

/** A single recorded failure, optionally annotated with the recovery that was attempted. */
export interface Failure {
  id: string;
  sourceName: string;
  errorType: string;
  errorMessage: string;
  /** Diagnostic details pulled from the error (truncated stack, codes, HTTP status, ...). */
  errorContext: any;
  /** Caller-supplied context describing what was being attempted. */
  queryContext: any;
  recoveryAction?: string;
  recoverySuccess?: boolean;
  recoveryTimeMs?: number;
  occurredAt: Date;
}

/** Aggregated outcome of one recovery action for a (source, error type) pair. */
export interface RecoveryPath {
  action: string;
  successRate: number;
  averageRecoveryTime: number;
  occurrences: number;
  lastSuccessAt?: Date;
}

export interface FailureStatistics {
  totalFailures: number;
  uniqueErrorTypes: number;
  uniqueSources: number;
  overallRecoveryRate: number;
  averageRecoveryTimeMs: number;
  failuresLast24h: number;
  failuresLast7d: number;
  topErrorTypes: { type: string; count: number; recoveryRate: number }[];
  topFailingSources: { source: string; count: number }[];
  recentRecoveries: { action: string; success: boolean; timeMs: number; at: Date }[];
}

export interface SourceHealthSummary {
  sourceName: string;
  totalFailures: number;
  uniqueErrorTypes: number;
  recoverySuccessRate: number;
  averageRecoveryTime: number;
  isRecurring: boolean;
  lastFailure?: Date;
  recommendedAction?: string;
}

// ═══════════════════════════════════════════════════════════════════════════════
// INTERNAL HELPERS
// ═══════════════════════════════════════════════════════════════════════════════

/** Column order used when batch-inserting failures; must match the row arrays built in persistBatch. */
const FAILURE_COLUMNS: string[] = [
  'id', 'source_name', 'error_type', 'error_message',
  'error_context', 'query_context', 'recovery_action',
  'recovery_success', 'recovery_time_ms', 'occurred_at'
];

/**
 * Coerce a driver value to a finite number (0 for null/undefined/NaN).
 * Needed because some drivers return COUNT/SUM/AVG results as strings.
 */
function toNumber(value: unknown): number {
  if (value == null) return 0;
  const n = typeof value === 'number' ? value : Number(value);
  return Number.isFinite(n) ? n : 0;
}

/** Interpret a stored boolean that may come back as boolean, 0/1, or a 't'/'true'/'1' string. */
function toBoolean(value: unknown): boolean {
  if (typeof value === 'string') {
    return ['1', 't', 'true'].includes(value.trim().toLowerCase());
  }
  return Boolean(value);
}

/**
 * JSON-encode a context object without ever throwing.
 * A circular structure or BigInt would otherwise make JSON.stringify throw
 * inside the flush and re-queue the same poisoned batch forever.
 */
function safeStringify(value: unknown): string {
  try {
    const json = JSON.stringify(value ?? {});
    return typeof json === 'string' ? json : '{}';
  } catch {
    const seen = new WeakSet<object>();
    try {
      const json = JSON.stringify(value, (_key, v: unknown) => {
        if (typeof v === 'bigint') return v.toString();
        if (typeof v === 'object' && v !== null) {
          if (seen.has(v)) return '[Circular]';
          seen.add(v);
        }
        return v;
      });
      return typeof json === 'string' ? json : '{}';
    } catch {
      return '{}';
    }
  }
}

/** Parse a stored JSON column, returning {} for empty or corrupt values instead of throwing. */
function safeParseJson(raw: unknown): any {
  if (raw == null || raw === '') return {};
  if (typeof raw !== 'string') return raw; // already decoded by the driver
  try {
    return JSON.parse(raw);
  } catch {
    return {};
  }
}

/** Order recovery paths best-first: highest success rate, then most occurrences. */
function rankRecoveryPaths(paths: RecoveryPath[]): RecoveryPath[] {
  return paths.sort((a, b) => b.successRate - a.successRate || b.occurrences - a.occurrences);
}

// ═══════════════════════════════════════════════════════════════════════════════
// FAILURE MEMORY SERVICE - ENTERPRISE IMPLEMENTATION
// ═══════════════════════════════════════════════════════════════════════════════

export class FailureMemory {
  private storage: StorageAdapter | null = null;
  private initialized: boolean = false;
  /** Initialization still in progress; shared so concurrent callers don't run it twice. */
  private initPromise: Promise<void> | null = null;
  /** Failures recorded but not yet handed to the database, oldest first. */
  private writeQueue: Failure[] = [];
  private flushInterval: ReturnType<typeof setInterval> | null = null;
  /** Batch insert currently in flight; flushes are serialized through this. */
  private activeFlush: Promise<void> | null = null;

  // In-memory cache for hot failures (most recent 500, newest first)
  private cache: Failure[] = [];
  private readonly CACHE_SIZE = 500;
  private readonly FLUSH_INTERVAL_MS = 3000; // Faster flush for failures
  private readonly MAX_DB_FAILURES = 50000;
  /** Cap on unpersisted failures held in memory while the database is unavailable. */
  private readonly MAX_WRITE_QUEUE = 10000;

  constructor(storage?: StorageAdapter) {
    if (storage) {
      this.storage = storage;
      void this.initialize();
    }
  }

  /**
   * Initialize with storage adapter.
   * Initialization runs in the background; failures are logged, not thrown.
   */
  public setStorage(storage: StorageAdapter): void {
    this.storage = storage;
    void this.initialize();
  }

  /**
   * Initialize the service (idempotent).
   * Concurrent calls share one run. If a run fails, a later call may retry it.
   */
  private initialize(): Promise<void> {
    if (this.initialized || !this.storage) return Promise.resolve();
    if (!this.initPromise) {
      this.initPromise = this.runInitialization(this.storage).finally(() => {
        this.initPromise = null;
      });
    }
    return this.initPromise;
  }

  /** Create the table and indexes, warm the cache, and start the periodic flush. Never rejects. */
  private async runInitialization(storage: StorageAdapter): Promise<void> {
    try {
      const isPostgres = storage.mode === 'postgres';
      const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME';

      // Ensure table exists - use exec for DDL statements (more compatible)
      await storage.execute(`
        CREATE TABLE IF NOT EXISTS mcp_failure_memory (
          id TEXT PRIMARY KEY,
          source_name TEXT NOT NULL,
          error_type TEXT NOT NULL,
          error_message TEXT,
          error_context TEXT,
          query_context TEXT,
          recovery_action TEXT,
          recovery_success BOOLEAN,
          recovery_time_ms INTEGER,
          occurred_at ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
      `);

      // Create indexes
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_source ON mcp_failure_memory(source_name, occurred_at DESC)`);
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_error ON mcp_failure_memory(error_type, occurred_at DESC)`);
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_recovery ON mcp_failure_memory(recovery_action, recovery_success)`);

      // Load recent failures into cache
      await this.loadCacheFromDb();

      // Start background flush (only once, even if initialization is retried)
      if (!this.flushInterval) {
        this.flushInterval = setInterval(() => {
          void this.flushWriteQueue();
        }, this.FLUSH_INTERVAL_MS);
      }

      this.initialized = true;
      logger.info(`🛡️ FailureMemory initialized with ${storage.mode} persistence`);
    } catch (error) {
      // Leave `initialized` false so a later setStorage() can retry.
      logger.error('❌ FailureMemory initialization failed:', error);
    }
  }

  /**
   * Load recent failures from database into cache.
   * Entries recorded before initialization finished are newer than anything in
   * the database, so they stay in front; duplicates are skipped and the cache is
   * trimmed to CACHE_SIZE.
   */
  private async loadCacheFromDb(): Promise<void> {
    if (!this.storage) return;

    try {
      const rows = await this.storage.queryAll(`
        SELECT * FROM mcp_failure_memory
        ORDER BY occurred_at DESC
        LIMIT ?
      `, [this.CACHE_SIZE]);

      const known = new Set(this.cache.map(f => f.id));
      for (const row of rows) {
        const failure = this.rowToFailure(row);
        if (!known.has(failure.id)) {
          known.add(failure.id);
          this.cache.push(failure);
        }
      }
      if (this.cache.length > this.CACHE_SIZE) {
        this.cache.length = this.CACHE_SIZE;
      }

      logger.info(`📦 Loaded ${this.cache.length} failures into memory cache`);
    } catch (error) {
      logger.warn('⚠️ Failed to load failure cache:', error);
    }
  }

  /**
   * Convert database row to Failure.
   * Corrupt JSON columns decode to {} rather than failing the whole query.
   */
  private rowToFailure(row: any): Failure {
    return {
      id: row.id,
      sourceName: row.source_name,
      errorType: row.error_type,
      errorMessage: row.error_message || '',
      errorContext: safeParseJson(row.error_context),
      queryContext: safeParseJson(row.query_context),
      recoveryAction: row.recovery_action || undefined,
      recoverySuccess: row.recovery_success != null ? toBoolean(row.recovery_success) : undefined,
      // `!= null` keeps a legitimate 0 ms recovery time
      recoveryTimeMs: row.recovery_time_ms != null ? toNumber(row.recovery_time_ms) : undefined,
      occurredAt: new Date(row.occurred_at)
    };
  }

  /**
   * Record a failure for learning.
   *
   * The failure goes into the in-memory cache and the write queue; it is persisted by
   * the next flush. Emits `failure:recorded`. Emits `failure:recurring` as well when
   * the same source/error type has failed at least 3 times in the last 30 minutes.
   *
   * @returns The id of the new failure record (pass it to updateRecovery()).
   */
  async recordFailure(params: {
    sourceName: string;
    error: Error;
    queryContext?: any;
    recoveryAction?: string;
    recoverySuccess?: boolean;
    recoveryTimeMs?: number;
  }): Promise<string> {
    const id = uuidv4();
    const failure: Failure = {
      id,
      sourceName: params.sourceName,
      errorType: params.error.name || 'Error',
      errorMessage: params.error.message,
      errorContext: {
        stack: params.error.stack?.substring(0, 1000), // Limit stack size
        ...this.extractErrorContext(params.error)
      },
      queryContext: params.queryContext || {},
      recoveryAction: params.recoveryAction,
      recoverySuccess: params.recoverySuccess,
      recoveryTimeMs: params.recoveryTimeMs,
      occurredAt: new Date()
    };

    // Add to cache
    this.cache.unshift(failure);
    if (this.cache.length > this.CACHE_SIZE) {
      this.cache.pop();
    }

    // Queue for database write (bounded, in case the database is unreachable)
    this.writeQueue.push(failure);
    this.trimWriteQueue();

    // Emit event for monitoring
    eventBus.emit('failure:recorded' as EventType, {
      id,
      sourceName: params.sourceName,
      errorType: failure.errorType,
      recoveryAction: params.recoveryAction,
      recoverySuccess: params.recoverySuccess
    });

    // Check if this is a recurring failure
    const isRecurring = await this.isRecurringFailure(params.sourceName, failure.errorType, 30);
    if (isRecurring) {
      eventBus.emit('failure:recurring' as EventType, {
        sourceName: params.sourceName,
        errorType: failure.errorType,
        message: `Recurring failure detected for ${params.sourceName}: ${failure.errorType}`
      });
    }

    return id;
  }

  /**
   * Update a failure with recovery result.
   *
   * An unflushed failure is updated in memory and written with the new values by the
   * next flush. A persisted failure is updated in the database. Any in-flight batch
   * insert is awaited first, so it cannot overwrite this update with stale values.
   * Emits `recovery:completed`.
   */
  async updateRecovery(
    failureId: string,
    recoveryAction: string,
    success: boolean,
    recoveryTimeMs: number
  ): Promise<void> {
    const apply = (failure: Failure): void => {
      failure.recoveryAction = recoveryAction;
      failure.recoverySuccess = success;
      failure.recoveryTimeMs = recoveryTimeMs;
    };

    // Update cache (cached objects are shared with the write queue)
    const cachedFailure = this.cache.find(f => f.id === failureId);
    if (cachedFailure) {
      apply(cachedFailure);
    }

    // Wait until no batch is in flight, so the row is either queued or persisted.
    while (this.activeFlush) {
      await this.activeFlush;
    }

    const pending = this.writeQueue.find(f => f.id === failureId);
    if (pending) {
      // Not persisted yet: the next flush writes the updated values.
      apply(pending);
    } else if (this.storage) {
      try {
        await this.storage.execute(`
          UPDATE mcp_failure_memory
          SET recovery_action = ?, recovery_success = ?, recovery_time_ms = ?
          WHERE id = ?
        `, [recoveryAction, success ? 1 : 0, recoveryTimeMs, failureId]);
      } catch (error) {
        logger.warn('⚠️ Failed to update recovery:', error);
      }
    }

    eventBus.emit('recovery:completed' as EventType, {
      failureId,
      recoveryAction,
      success,
      recoveryTimeMs
    });
  }

  /**
   * Flush write queue to database.
   * Flushes are serialized: a caller first waits for any in-flight batch, then
   * writes whatever has accumulated since.
   */
  private async flushWriteQueue(): Promise<void> {
    while (this.activeFlush) {
      await this.activeFlush;
    }
    if (!this.storage || this.writeQueue.length === 0) return;

    const storage = this.storage;
    const batch = this.writeQueue;
    this.writeQueue = [];

    const run = this.persistBatch(storage, batch);
    this.activeFlush = run;
    try {
      await run;
    } finally {
      if (this.activeFlush === run) {
        this.activeFlush = null;
      }
    }
  }

  /** Insert one batch. On failure, re-queues it ahead of newer entries (bounded). Never rejects. */
  private async persistBatch(storage: StorageAdapter, failures: Failure[]): Promise<void> {
    try {
      const rows = failures.map(f => [
        f.id,
        f.sourceName,
        f.errorType,
        f.errorMessage,
        safeStringify(f.errorContext),
        safeStringify(f.queryContext),
        f.recoveryAction || null,
        f.recoverySuccess ?? null, // boolean passed through as-is
        f.recoveryTimeMs ?? null, // `??` keeps a 0 ms recovery time
        f.occurredAt.toISOString()
      ]);

      await storage.batchInsert('mcp_failure_memory', FAILURE_COLUMNS, rows);

      // Cleanup old failures periodically
      if (Math.random() < 0.02) { // 2% chance per flush
        void this.cleanupOldFailures(); // Background; handles its own errors
      }
    } catch (error) {
      logger.error('❌ Failed to flush failure write queue:', error);
      // Older entries go back in front to keep chronological order. concat avoids
      // the argument-count limit that `unshift(...failures)` hits on huge batches.
      this.writeQueue = failures.concat(this.writeQueue);
      this.trimWriteQueue();
    }
  }

  /** Drop the oldest unpersisted failures once the write queue exceeds MAX_WRITE_QUEUE. */
  private trimWriteQueue(): void {
    const overflow = this.writeQueue.length - this.MAX_WRITE_QUEUE;
    if (overflow > 0) {
      this.writeQueue.splice(0, overflow);
      logger.warn(`⚠️ Failure write queue full; dropped ${overflow} oldest unpersisted failures`);
    }
  }

  /**
   * Cleanup old failures.
   * Deletes rows older than 90 days, except those among the newest MAX_DB_FAILURES.
   */
  private async cleanupOldFailures(): Promise<void> {
    if (!this.storage) return;

    try {
      const ninetyDaysAgo = new Date();
      ninetyDaysAgo.setDate(ninetyDaysAgo.getDate() - 90);

      await this.storage.execute(`
        DELETE FROM mcp_failure_memory
        WHERE occurred_at < ?
        AND id NOT IN (
          SELECT id FROM mcp_failure_memory
          ORDER BY occurred_at DESC
          LIMIT ?
        )
      `, [ninetyDaysAgo.toISOString(), this.MAX_DB_FAILURES]);

      logger.debug('🧹 Cleaned up old failure records');
    } catch (error) {
      logger.warn('⚠️ Failure cleanup failed:', error);
    }
  }

  /**
   * Get failure history for a source, newest first.
   *
   * Database rows are merged with cached entries, so failures still waiting in the
   * write queue are included. If the database is unavailable or has no rows for the
   * source, the cache alone is used.
   */
  async getFailureHistory(
    sourceName: string,
    limit: number = 50
  ): Promise<Failure[]> {
    const fromCache = (): Failure[] => this.cache.filter(f => f.sourceName === sourceName);

    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_failure_memory
          WHERE source_name = ?
          ORDER BY occurred_at DESC
          LIMIT ?
        `, [sourceName, limit]);

        if (rows.length > 0) {
          // Persisted rows win over cached copies of the same id.
          const byId = new Map<string, Failure>();
          for (const row of rows) {
            const failure = this.rowToFailure(row);
            byId.set(failure.id, failure);
          }
          for (const failure of fromCache()) {
            if (!byId.has(failure.id)) byId.set(failure.id, failure);
          }
          return Array.from(byId.values())
            .sort((a, b) => b.occurredAt.getTime() - a.occurredAt.getTime())
            .slice(0, limit);
        }
        // Fall back to cache if DB empty (e.g. write queue not flushed)
      } catch (error) {
        logger.warn('⚠️ Database query failed:', error);
      }
    }

    return fromCache().slice(0, limit);
  }

  /**
   * Get recovery paths for an error type, best first.
   * Paths are ranked by success rate, then by occurrences. Failures with no recovery
   * action are ignored. Average time covers successful recoveries only.
   */
  async getRecoveryPaths(
    sourceName: string,
    errorType: string
  ): Promise<RecoveryPath[]> {
    if (this.storage) {
      try {
        // `CASE WHEN recovery_success` works with SQLite 0/1 and PostgreSQL BOOLEAN alike.
        const rows = await this.storage.queryAll<{
          recovery_action: string;
          total: number;
          successes: number;
          avg_time: number;
          last_success: string | null;
        }>(`
          SELECT
            recovery_action,
            COUNT(*) as total,
            SUM(CASE WHEN recovery_success THEN 1 ELSE 0 END) as successes,
            AVG(CASE WHEN recovery_success THEN recovery_time_ms ELSE NULL END) as avg_time,
            MAX(CASE WHEN recovery_success THEN occurred_at ELSE NULL END) as last_success
          FROM mcp_failure_memory
          WHERE source_name = ? AND error_type = ? AND recovery_action IS NOT NULL
          GROUP BY recovery_action
        `, [sourceName, errorType]);

        if (rows.length > 0) {
          return rankRecoveryPaths(rows.map(row => {
            const total = toNumber(row.total);
            return {
              action: row.recovery_action,
              successRate: total > 0 ? toNumber(row.successes) / total : 0,
              averageRecoveryTime: toNumber(row.avg_time),
              occurrences: total,
              lastSuccessAt: row.last_success ? new Date(row.last_success) : undefined
            };
          }));
        }
        // Fall back to cache
      } catch (error) {
        logger.warn('⚠️ Database query failed:', error);
      }
    }

    // Fallback to in-memory calculation from cache (same semantics as the SQL above)
    const actionGroups = new Map<string, Failure[]>();
    for (const f of this.cache) {
      if (f.sourceName !== sourceName || f.errorType !== errorType || !f.recoveryAction) continue;
      const group = actionGroups.get(f.recoveryAction);
      if (group) {
        group.push(f);
      } else {
        actionGroups.set(f.recoveryAction, [f]);
      }
    }

    const paths = Array.from(actionGroups, ([action, failures]): RecoveryPath => {
      const succeeded = failures.filter(f => f.recoverySuccess);
      const timed = succeeded.filter(f => f.recoveryTimeMs != null);
      const totalTime = timed.reduce((sum, f) => sum + (f.recoveryTimeMs ?? 0), 0);
      const lastSuccess = succeeded.reduce<Failure | undefined>(
        (latest, f) => (!latest || f.occurredAt > latest.occurredAt ? f : latest),
        undefined
      );

      return {
        action,
        successRate: failures.length > 0 ? succeeded.length / failures.length : 0,
        averageRecoveryTime: timed.length > 0 ? totalTime / timed.length : 0,
        occurrences: failures.length,
        lastSuccessAt: lastSuccess?.occurredAt
      };
    });

    return rankRecoveryPaths(paths);
  }

  /**
   * Get best recovery action for an error.
   * Prefers a path seen at least twice with a success rate above 50%. Otherwise
   * returns the best path with any success at all, or null.
   */
  async getBestRecoveryAction(
    sourceName: string,
    errorType: string
  ): Promise<RecoveryPath | null> {
    const paths = await this.getRecoveryPaths(sourceName, errorType);

    // Paths are ranked best-first, so the first reliable one is the best.
    const reliable = paths.filter(p => p.occurrences >= 2 && p.successRate > 0.5);
    if (reliable.length > 0) {
      return reliable[0] ?? null;
    }

    // Fall back to any successful recovery
    const anySuccess = paths.find(p => p.successRate > 0);
    return anySuccess || null;
  }

  /**
   * Check if a failure pattern is recurring.
   *
   * @param withinMinutes Look-back window.
   * @param threshold Failures in the window needed to count as recurring (default 3).
   * @returns true if either the database or the in-memory cache has at least `threshold`
   *          matching failures in the window. The cache also sees failures not yet flushed.
   */
  async isRecurringFailure(
    sourceName: string,
    errorType: string,
    withinMinutes: number = 60,
    threshold: number = 3
  ): Promise<boolean> {
    const cutoff = new Date();
    cutoff.setMinutes(cutoff.getMinutes() - withinMinutes);

    if (this.storage) {
      try {
        const count = toNumber(await this.storage.queryScalar(`
          SELECT COUNT(*) FROM mcp_failure_memory
          WHERE source_name = ? AND error_type = ? AND occurred_at > ?
        `, [sourceName, errorType, cutoff.toISOString()]));

        if (count >= threshold) return true;
      } catch (error) {
        logger.warn('⚠️ Recurring check failed:', error);
      }
    }

    const recentCount = this.cache.filter(
      f => f.sourceName === sourceName &&
           f.errorType === errorType &&
           f.occurredAt > cutoff
    ).length;

    return recentCount >= threshold;
  }

  /**
   * Get a health summary of one source over the last 7 days.
   * Uses database rows when available, otherwise the cache. Includes the best known
   * recovery for the source's most common error type.
   */
  async getSourceHealthSummary(sourceName: string): Promise<SourceHealthSummary> {
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);

    let failures: Failure[] = [];

    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_failure_memory
          WHERE source_name = ? AND occurred_at > ?
          ORDER BY occurred_at DESC
        `, [sourceName, sevenDaysAgo.toISOString()]);

        failures = rows.map(row => this.rowToFailure(row));
      } catch (error) {
        logger.warn('⚠️ Health summary query failed:', error);
      }
    }

    if (failures.length === 0) {
      failures = this.cache.filter(
        f => f.sourceName === sourceName && f.occurredAt > sevenDaysAgo
      );
    }

    const errorTypes = new Set(failures.map(f => f.errorType));
    const withRecovery = failures.filter(f => f.recoveryAction);
    const successfulRecoveries = withRecovery.filter(f => f.recoverySuccess);
    const recoveryTimes = successfulRecoveries.flatMap(f =>
      f.recoveryTimeMs != null ? [f.recoveryTimeMs] : []
    );

    // Check for recurring failures (stop at the first recurring error type)
    let isRecurring = false;
    for (const errorType of errorTypes) {
      if (await this.isRecurringFailure(sourceName, errorType, 60)) {
        isRecurring = true;
        break;
      }
    }

    // Recommend the best recovery for the most common error type (first seen wins ties)
    let recommendedAction: string | undefined;
    const errorCounts = new Map<string, number>();
    for (const f of failures) {
      errorCounts.set(f.errorType, (errorCounts.get(f.errorType) ?? 0) + 1);
    }
    let topError: string | undefined;
    let topCount = 0;
    for (const [type, count] of errorCounts) {
      if (count > topCount) {
        topError = type;
        topCount = count;
      }
    }
    if (topError !== undefined) {
      const bestRecovery = await this.getBestRecoveryAction(sourceName, topError);
      recommendedAction = bestRecovery?.action;
    }

    return {
      sourceName,
      totalFailures: failures.length,
      uniqueErrorTypes: errorTypes.size,
      recoverySuccessRate: withRecovery.length > 0
        ? successfulRecoveries.length / withRecovery.length
        : 0,
      averageRecoveryTime: recoveryTimes.length > 0
        ? recoveryTimes.reduce((a, b) => a + b, 0) / recoveryTimes.length
        : 0,
      isRecurring,
      lastFailure: failures[0]?.occurredAt,
      recommendedAction
    };
  }

  /**
   * Get comprehensive failure statistics.
   * Queries the database when available. If there is no storage or any query fails,
   * the same figures are computed from the in-memory cache, which only covers the
   * most recent CACHE_SIZE failures.
   */
  async getStatistics(): Promise<FailureStatistics> {
    const now = new Date();
    const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
    const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);

    if (this.storage) {
      try {
        const total = toNumber(await this.storage.queryScalar('SELECT COUNT(*) FROM mcp_failure_memory'));
        const uniqueErrors = toNumber(await this.storage.queryScalar('SELECT COUNT(DISTINCT error_type) FROM mcp_failure_memory'));
        const uniqueSources = toNumber(await this.storage.queryScalar('SELECT COUNT(DISTINCT source_name) FROM mcp_failure_memory'));

        // Dialect-neutral boolean handling (PostgreSQL rejects `boolean = 1` and CAST(boolean AS FLOAT)).
        // Rows with an unknown outcome (NULL) are excluded from the average.
        const recoveryRate = toNumber(await this.storage.queryScalar(`
          SELECT AVG(CASE WHEN recovery_success IS NULL THEN NULL WHEN recovery_success THEN 1.0 ELSE 0.0 END)
          FROM mcp_failure_memory
          WHERE recovery_action IS NOT NULL
        `));

        const avgRecoveryTime = toNumber(await this.storage.queryScalar(`
          SELECT AVG(recovery_time_ms) FROM mcp_failure_memory WHERE recovery_success
        `));

        const last24h = toNumber(await this.storage.queryScalar(
          'SELECT COUNT(*) FROM mcp_failure_memory WHERE occurred_at > ?',
          [oneDayAgo.toISOString()]
        ));

        const last7d = toNumber(await this.storage.queryScalar(
          'SELECT COUNT(*) FROM mcp_failure_memory WHERE occurred_at > ?',
          [sevenDaysAgo.toISOString()]
        ));

        // Top error types
        const topErrorRows = await this.storage.queryAll<{ error_type: string; count: number; recovery_rate: number }>(`
          SELECT
            error_type,
            COUNT(*) as count,
            AVG(CASE WHEN recovery_success THEN 1.0 ELSE 0.0 END) as recovery_rate
          FROM mcp_failure_memory
          GROUP BY error_type
          ORDER BY count DESC
          LIMIT 10
        `);

        const topErrors = topErrorRows.map(row => ({
          type: row.error_type,
          count: toNumber(row.count),
          recoveryRate: toNumber(row.recovery_rate)
        }));

        // Top failing sources
        const topSourceRows = await this.storage.queryAll<{ source_name: string; count: number }>(`
          SELECT source_name, COUNT(*) as count
          FROM mcp_failure_memory
          GROUP BY source_name
          ORDER BY count DESC
          LIMIT 10
        `);

        const topSources = topSourceRows.map(row => ({
          source: row.source_name,
          count: toNumber(row.count)
        }));

        // Recent recoveries
        const recentRecoveryRows = await this.storage.queryAll<{
          recovery_action: string;
          recovery_success: number;
          recovery_time_ms: number;
          occurred_at: string;
        }>(`
          SELECT recovery_action, recovery_success, recovery_time_ms, occurred_at
          FROM mcp_failure_memory
          WHERE recovery_action IS NOT NULL
          ORDER BY occurred_at DESC
          LIMIT 10
        `);

        const recentRecoveries = recentRecoveryRows.map(row => ({
          action: row.recovery_action,
          success: toBoolean(row.recovery_success),
          timeMs: toNumber(row.recovery_time_ms),
          at: new Date(row.occurred_at)
        }));

        return {
          totalFailures: total,
          uniqueErrorTypes: uniqueErrors,
          uniqueSources,
          overallRecoveryRate: recoveryRate,
          averageRecoveryTimeMs: avgRecoveryTime,
          failuresLast24h: last24h,
          failuresLast7d: last7d,
          topErrorTypes: topErrors,
          topFailingSources: topSources,
          recentRecoveries
        };
      } catch (error) {
        logger.warn('⚠️ Statistics query failed:', error);
      }
    }

    // Cache-based fallback
    return this.statisticsFromCache(oneDayAgo, sevenDaysAgo);
  }

  /** Compute FailureStatistics from the in-memory cache, mirroring the SQL in getStatistics(). */
  private statisticsFromCache(oneDayAgo: Date, sevenDaysAgo: Date): FailureStatistics {
    const failures = this.cache;

    // Rate over recovery attempts with a known outcome; time over successful recoveries.
    const attempts = failures.filter(f => f.recoveryAction && f.recoverySuccess != null);
    const successfulAttempts = attempts.filter(f => f.recoverySuccess).length;
    const successTimes = failures.flatMap(f =>
      f.recoverySuccess === true && f.recoveryTimeMs != null ? [f.recoveryTimeMs] : []
    );

    const byType = new Map<string, { count: number; recovered: number }>();
    const bySource = new Map<string, number>();
    for (const f of failures) {
      const stats = byType.get(f.errorType) ?? { count: 0, recovered: 0 };
      stats.count += 1;
      if (f.recoverySuccess) stats.recovered += 1;
      byType.set(f.errorType, stats);
      bySource.set(f.sourceName, (bySource.get(f.sourceName) ?? 0) + 1);
    }

    const topErrorTypes = Array.from(byType, ([type, stats]) => ({
      type,
      count: stats.count,
      recoveryRate: stats.recovered / stats.count
    }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 10);

    const topFailingSources = Array.from(bySource, ([source, count]) => ({ source, count }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 10);

    // filter() copies, so sorting never reorders the cache itself
    const recentRecoveries = failures
      .filter(f => f.recoveryAction)
      .sort((a, b) => b.occurredAt.getTime() - a.occurredAt.getTime())
      .slice(0, 10)
      .map(f => ({
        action: f.recoveryAction ?? '',
        success: Boolean(f.recoverySuccess),
        timeMs: f.recoveryTimeMs ?? 0,
        at: f.occurredAt
      }));

    return {
      totalFailures: failures.length,
      uniqueErrorTypes: byType.size,
      uniqueSources: bySource.size,
      overallRecoveryRate: attempts.length > 0 ? successfulAttempts / attempts.length : 0,
      averageRecoveryTimeMs: successTimes.length > 0
        ? successTimes.reduce((a, b) => a + b, 0) / successTimes.length
        : 0,
      failuresLast24h: failures.filter(f => f.occurredAt > oneDayAgo).length,
      failuresLast7d: failures.filter(f => f.occurredAt > sevenDaysAgo).length,
      topErrorTypes,
      topFailingSources,
      recentRecoveries
    };
  }

  /**
   * Force flush all pending writes (waits for any in-flight batch first).
   */
  async flush(): Promise<void> {
    await this.flushWriteQueue();
  }

  /**
   * Shutdown gracefully: stop the periodic flush and persist everything still queued.
   */
  async shutdown(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
      this.flushInterval = null;
    }
    // Must be awaited, or pending failures may be lost when the process exits.
    await this.flushWriteQueue();
    logger.info('🛡️ FailureMemory shut down gracefully');
  }

  /**
   * Extract useful context from error.
   * Picks network, HTTP, database and timeout fields when present; unknown shapes yield {}.
   */
  private extractErrorContext(error: any): any {
    const context: any = {};
    if (error == null || typeof error !== 'object') return context;

    // Network errors
    if (error.code) context.code = error.code;
    if (error.errno) context.errno = error.errno;
    if (error.syscall) context.syscall = error.syscall;
    if (error.hostname) context.hostname = error.hostname;
    if (error.port) context.port = error.port;

    // HTTP errors
    if (error.statusCode) context.statusCode = error.statusCode;
    if (error.status) context.status = error.status;
    if (error.response?.status) context.responseStatus = error.response.status;
    if (error.response?.statusText) context.responseText = error.response.statusText;

    // Database errors
    if (typeof error.sql === 'string') context.sql = error.sql.substring(0, 200);
    if (error.constraint) context.constraint = error.constraint;

    // Timeout errors
    if (error.timeout) context.timeout = error.timeout;

    return context;
  }
}