/**
* ╔══════════════════════════════════════════════════════════════════════════════╗
* β•‘ FAILURE MEMORY SERVICE - ENTERPRISE A+ β•‘
* β•‘ β•‘
* β•‘ Records and learns from failures with persistent SQLite storage. β•‘
* β•‘ Enables: β•‘
* β•‘ β€’ Self-healing by remembering successful recovery paths β•‘
* β•‘ β€’ Avoiding known failure scenarios β•‘
* β•‘ β€’ Predicting failures before they happen β•‘
* β•‘ β€’ Cross-session failure memory persistence β•‘
* β•‘ β•‘
* β•‘ Schema: mcp_failure_memory table (defined in schema.sql) β•‘
* β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
*/
import { v4 as uuidv4 } from 'uuid';
import type { StorageAdapter } from './StorageAdapter.js';
import { logger } from '../../utils/logger.js';
import { eventBus, type EventType } from '../EventBus.js';
// ═══════════════════════════════════════════════════════════════════════════════
// INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════
/** A single recorded failure, optionally annotated with its recovery outcome. */
export interface Failure {
  /** UUID primary key (matches mcp_failure_memory.id). */
  id: string;
  /** Name of the data source / component that failed. */
  sourceName: string;
  /** Error class name (Error.name), e.g. "TypeError" or "TimeoutError". */
  errorType: string;
  /** Human-readable error message (Error.message). */
  errorMessage: string;
  /** Extracted error details (truncated stack, error codes, HTTP status, …); stored as JSON. */
  errorContext: any;
  /** Caller-supplied context about the query that triggered the failure; stored as JSON. */
  queryContext: any;
  /** Recovery strategy that was attempted, if any. */
  recoveryAction?: string;
  /** Whether the recovery attempt succeeded; undefined if no recovery was attempted. */
  recoverySuccess?: boolean;
  /** Time the recovery attempt took, in milliseconds. */
  recoveryTimeMs?: number;
  /** When the failure occurred. */
  occurredAt: Date;
}
/** Aggregated outcome statistics for one recovery action on a (source, errorType) pair. */
export interface RecoveryPath {
  /** The recovery action identifier. */
  action: string;
  /** Fraction of attempts that succeeded, in [0, 1]. */
  successRate: number;
  /** Mean recovery time in milliseconds over successful attempts. */
  averageRecoveryTime: number;
  /** Total number of times this action was attempted. */
  occurrences: number;
  /** Timestamp of the most recent successful recovery, if any. */
  lastSuccessAt?: Date;
}
/** System-wide failure/recovery metrics returned by FailureMemory.getStatistics(). */
export interface FailureStatistics {
  /** Total number of recorded failures. */
  totalFailures: number;
  /** Count of distinct error types seen. */
  uniqueErrorTypes: number;
  /** Count of distinct sources that have failed. */
  uniqueSources: number;
  /** Mean recovery success rate over failures where a recovery was attempted, in [0, 1]. */
  overallRecoveryRate: number;
  /** Mean recovery time in milliseconds over successful recoveries. */
  averageRecoveryTimeMs: number;
  /** Failures recorded within the last 24 hours. */
  failuresLast24h: number;
  /** Failures recorded within the last 7 days. */
  failuresLast7d: number;
  /** Up to 10 most frequent error types with their recovery rates. */
  topErrorTypes: { type: string; count: number; recoveryRate: number }[];
  /** Up to 10 sources with the most failures. */
  topFailingSources: { source: string; count: number }[];
  /** Up to 10 most recent recovery attempts. */
  recentRecoveries: { action: string; success: boolean; timeMs: number; at: Date }[];
}
/** Seven-day health rollup for one source, returned by getSourceHealthSummary(). */
export interface SourceHealthSummary {
  /** The source this summary describes. */
  sourceName: string;
  /** Failures in the last 7 days. */
  totalFailures: number;
  /** Distinct error types in the last 7 days. */
  uniqueErrorTypes: number;
  /** Fraction of attempted recoveries that succeeded, in [0, 1]. */
  recoverySuccessRate: number;
  /** Mean successful recovery time in milliseconds. */
  averageRecoveryTime: number;
  /** True when any error type recurred >= 3 times within the last hour. */
  isRecurring: boolean;
  /** Timestamp of the most recent failure, if any. */
  lastFailure?: Date;
  /** Best-known recovery action for the most common error, if one exists. */
  recommendedAction?: string;
}
// ═══════════════════════════════════════════════════════════════════════════════
// FAILURE MEMORY SERVICE - ENTERPRISE IMPLEMENTATION
// ═══════════════════════════════════════════════════════════════════════════════
export class FailureMemory {
  /** Persistence backend; null until provided via constructor or setStorage(). */
  private storage: StorageAdapter | null = null;
  private initialized: boolean = false;
  /** Failures waiting to be batch-inserted by the background flush. */
  private writeQueue: Failure[] = [];
  private flushInterval: ReturnType<typeof setInterval> | null = null;
  // In-memory cache for hot failures (most recent first, capped at CACHE_SIZE)
  private cache: Failure[] = [];
  private readonly CACHE_SIZE = 500;
  private readonly FLUSH_INTERVAL_MS = 3000; // Faster flush for failures
  private readonly MAX_DB_FAILURES = 50000;

  constructor(storage?: StorageAdapter) {
    if (storage) {
      this.storage = storage;
      // Fire-and-forget: initialize() handles its own errors internally.
      void this.initialize();
    }
  }

  /**
   * Initialize with storage adapter
   */
  public setStorage(storage: StorageAdapter): void {
    this.storage = storage;
    void this.initialize();
  }

  /**
   * Initialize the service: create the table and indexes, warm the in-memory
   * cache, and start the periodic background flush. Idempotent.
   */
  private async initialize(): Promise<void> {
    if (this.initialized || !this.storage) return;
    try {
      const isPostgres = this.storage.mode === 'postgres';
      const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME';
      // Ensure table exists - use exec for DDL statements (more compatible)
      await this.storage.execute(`
        CREATE TABLE IF NOT EXISTS mcp_failure_memory (
          id TEXT PRIMARY KEY,
          source_name TEXT NOT NULL,
          error_type TEXT NOT NULL,
          error_message TEXT,
          error_context TEXT,
          query_context TEXT,
          recovery_action TEXT,
          recovery_success BOOLEAN,
          recovery_time_ms INTEGER,
          occurred_at ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
      `);
      // Create indexes
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_source
        ON mcp_failure_memory(source_name, occurred_at DESC)`);
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_error
        ON mcp_failure_memory(error_type, occurred_at DESC)`);
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_failure_memory_recovery
        ON mcp_failure_memory(recovery_action, recovery_success)`);
      // Load recent failures into cache
      await this.loadCacheFromDb();
      // Start background flush
      this.flushInterval = setInterval(() => this.flushWriteQueue(), this.FLUSH_INTERVAL_MS);
      this.initialized = true;
      logger.info(`🛡️ FailureMemory initialized with ${this.storage.mode} persistence`);
    } catch (error) {
      logger.error('❌ FailureMemory initialization failed:', error);
      // Mark initialized anyway so callers don't retry in a loop; the service
      // degrades to the in-memory cache only.
      this.initialized = true;
    }
  }

  /**
   * Load recent failures from database into cache
   */
  private async loadCacheFromDb(): Promise<void> {
    if (!this.storage) return;
    try {
      const rows = await this.storage.queryAll(`
        SELECT * FROM mcp_failure_memory
        ORDER BY occurred_at DESC
        LIMIT ?
      `, [this.CACHE_SIZE]);
      for (const row of rows) {
        this.cache.push(this.rowToFailure(row));
      }
      logger.info(`📦 Loaded ${this.cache.length} failures into memory cache`);
    } catch (error) {
      logger.warn('⚠️ Failed to load failure cache:', error);
    }
  }

  /**
   * Parse a JSON column defensively: a single corrupt row must not abort the
   * whole result set, so fall back to an empty object on parse failure.
   */
  private parseJsonColumn(text: string | null | undefined): any {
    if (!text) return {};
    try {
      return JSON.parse(text);
    } catch {
      return {};
    }
  }

  /**
   * Convert database row to Failure
   */
  private rowToFailure(row: any): Failure {
    return {
      id: row.id,
      sourceName: row.source_name,
      errorType: row.error_type,
      errorMessage: row.error_message || '',
      errorContext: this.parseJsonColumn(row.error_context),
      queryContext: this.parseJsonColumn(row.query_context),
      recoveryAction: row.recovery_action || undefined,
      recoverySuccess: row.recovery_success != null ? Boolean(row.recovery_success) : undefined,
      // ?? so a legitimate 0 ms recovery time is preserved
      recoveryTimeMs: row.recovery_time_ms ?? undefined,
      occurredAt: new Date(row.occurred_at)
    };
  }

  /**
   * Record a failure for learning.
   *
   * Adds the failure to the hot cache immediately, queues it for the batched
   * database write, and emits `failure:recorded` (plus `failure:recurring`
   * when the same source/errorType has hit >= 3 times in the last 30 min).
   *
   * @returns The new failure's id (UUID).
   */
  async recordFailure(params: {
    sourceName: string;
    error: Error;
    queryContext?: any;
    recoveryAction?: string;
    recoverySuccess?: boolean;
    recoveryTimeMs?: number;
  }): Promise<string> {
    const id = uuidv4();
    const failure: Failure = {
      id,
      sourceName: params.sourceName,
      errorType: params.error.name || 'Error',
      errorMessage: params.error.message,
      errorContext: {
        stack: params.error.stack?.substring(0, 1000), // Limit stack size
        ...this.extractErrorContext(params.error)
      },
      queryContext: params.queryContext || {},
      recoveryAction: params.recoveryAction,
      recoverySuccess: params.recoverySuccess,
      recoveryTimeMs: params.recoveryTimeMs,
      occurredAt: new Date()
    };
    // Add to cache (most recent first)
    this.cache.unshift(failure);
    if (this.cache.length > this.CACHE_SIZE) {
      this.cache.pop();
    }
    // Queue for database write
    this.writeQueue.push(failure);
    // Emit event for monitoring
    eventBus.emit('failure:recorded' as EventType, {
      id,
      sourceName: params.sourceName,
      errorType: failure.errorType,
      recoveryAction: params.recoveryAction,
      recoverySuccess: params.recoverySuccess
    });
    // Check if this is a recurring failure
    const isRecurring = await this.isRecurringFailure(params.sourceName, failure.errorType, 30);
    if (isRecurring) {
      eventBus.emit('failure:recurring' as EventType, {
        sourceName: params.sourceName,
        errorType: failure.errorType,
        message: `Recurring failure detected for ${params.sourceName}: ${failure.errorType}`
      });
    }
    return id;
  }

  /**
   * Update a failure with recovery result
   */
  async updateRecovery(
    failureId: string,
    recoveryAction: string,
    success: boolean,
    recoveryTimeMs: number
  ): Promise<void> {
    // Update cache
    const cachedFailure = this.cache.find(f => f.id === failureId);
    if (cachedFailure) {
      cachedFailure.recoveryAction = recoveryAction;
      cachedFailure.recoverySuccess = success;
      cachedFailure.recoveryTimeMs = recoveryTimeMs;
    }
    // Update database
    if (this.storage) {
      try {
        await this.storage.execute(`
          UPDATE mcp_failure_memory
          SET recovery_action = ?, recovery_success = ?, recovery_time_ms = ?
          WHERE id = ?
        `, [recoveryAction, success ? 1 : 0, recoveryTimeMs, failureId]);
      } catch (error) {
        logger.warn('⚠️ Failed to update recovery:', error);
      }
    }
    eventBus.emit('recovery:completed' as EventType, {
      failureId,
      recoveryAction,
      success,
      recoveryTimeMs
    });
  }

  /**
   * Flush write queue to database. On failure the batch is re-queued so no
   * failures are lost; a small random fraction of flushes also triggers a
   * background cleanup of old rows.
   */
  private async flushWriteQueue(): Promise<void> {
    if (!this.storage || this.writeQueue.length === 0) return;
    const failures = [...this.writeQueue];
    this.writeQueue = [];
    try {
      const columns = [
        'id', 'source_name', 'error_type', 'error_message', 'error_context',
        'query_context', 'recovery_action', 'recovery_success', 'recovery_time_ms', 'occurred_at'
      ];
      const rows = failures.map(f => [
        f.id,
        f.sourceName,
        f.errorType,
        f.errorMessage,
        JSON.stringify(f.errorContext),
        JSON.stringify(f.queryContext),
        f.recoveryAction || null,
        // Store 1/0, consistent with updateRecovery() and with the queries
        // that compare `recovery_success = 1` (raw booleans break SQLite bindings).
        f.recoverySuccess != null ? (f.recoverySuccess ? 1 : 0) : null,
        f.recoveryTimeMs ?? null, // ?? so a legitimate 0 ms is stored, not nulled
        f.occurredAt.toISOString()
      ]);
      await this.storage.batchInsert('mcp_failure_memory', columns, rows);
      // Cleanup old failures periodically
      if (Math.random() < 0.02) { // 2% chance per flush
        void this.cleanupOldFailures(); // Background; catches its own errors
      }
    } catch (error) {
      logger.error('❌ Failed to flush failure write queue:', error);
      // Re-queue ahead of anything recorded meanwhile to preserve order
      this.writeQueue.unshift(...failures);
    }
  }

  /**
   * Cleanup old failures: delete rows older than 90 days, but always keep
   * the newest MAX_DB_FAILURES rows regardless of age.
   */
  private async cleanupOldFailures(): Promise<void> {
    if (!this.storage) return;
    try {
      const ninetyDaysAgo = new Date();
      ninetyDaysAgo.setDate(ninetyDaysAgo.getDate() - 90);
      await this.storage.execute(`
        DELETE FROM mcp_failure_memory
        WHERE occurred_at < ?
        AND id NOT IN (
          SELECT id FROM mcp_failure_memory
          ORDER BY occurred_at DESC
          LIMIT ?
        )
      `, [ninetyDaysAgo.toISOString(), this.MAX_DB_FAILURES]);
      logger.debug('🧹 Cleaned up old failure records');
    } catch (error) {
      logger.warn('⚠️ Failure cleanup failed:', error);
    }
  }

  /**
   * Get failure history for a source, newest first. Falls back to the
   * in-memory cache when the database is unavailable or empty.
   */
  async getFailureHistory(
    sourceName: string,
    limit: number = 50
  ): Promise<Failure[]> {
    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_failure_memory
          WHERE source_name = ?
          ORDER BY occurred_at DESC
          LIMIT ?
        `, [sourceName, limit]);
        if (rows.length > 0) {
          return rows.map(row => this.rowToFailure(row));
        }
        // Fall back to cache if DB empty (e.g. write queue not flushed)
      } catch (error) {
        logger.warn('⚠️ Database query failed:', error);
      }
    }
    return this.cache
      .filter(f => f.sourceName === sourceName)
      .slice(0, limit);
  }

  /**
   * Get successful recovery paths for an error type, sorted by success rate
   * (SQL path is unsorted; the cache fallback sorts descending).
   */
  async getRecoveryPaths(
    sourceName: string,
    errorType: string
  ): Promise<RecoveryPath[]> {
    if (this.storage) {
      try {
        const rows = await this.storage.queryAll<{
          recovery_action: string;
          total: number;
          successes: number;
          avg_time: number;
          last_success: string | null;
        }>(`
          SELECT
            recovery_action,
            COUNT(*) as total,
            SUM(CASE WHEN recovery_success = 1 THEN 1 ELSE 0 END) as successes,
            AVG(CASE WHEN recovery_success = 1 THEN recovery_time_ms ELSE NULL END) as avg_time,
            MAX(CASE WHEN recovery_success = 1 THEN occurred_at ELSE NULL END) as last_success
          FROM mcp_failure_memory
          WHERE source_name = ? AND error_type = ?
          GROUP BY recovery_action
        `, [sourceName, errorType]);
        if (rows.length > 0) {
          return rows.map(row => ({
            action: row.recovery_action,
            successRate: row.total > 0 ? row.successes / row.total : 0,
            averageRecoveryTime: row.avg_time || 0,
            occurrences: row.total,
            lastSuccessAt: row.last_success ? new Date(row.last_success) : undefined
          }));
        }
        // Fall back to cache
      } catch (error) {
        logger.warn('⚠️ Database query failed:', error);
      }
    }
    // Fallback to in-memory calculation from cache
    const relevant = this.cache.filter(
      f => f.sourceName === sourceName && f.errorType === errorType && f.recoveryAction
    );
    const actionGroups = new Map<string, Failure[]>();
    for (const f of relevant) {
      const action = f.recoveryAction!;
      if (!actionGroups.has(action)) {
        actionGroups.set(action, []);
      }
      actionGroups.get(action)!.push(f);
    }
    return Array.from(actionGroups.entries()).map(([action, failures]) => {
      const successes = failures.filter(f => f.recoverySuccess).length;
      // Match the SQL path: average recovery time over SUCCESSFUL recoveries
      // only, and don't drop legitimate 0 ms values via truthiness.
      const withTime = failures.filter(f => f.recoverySuccess && f.recoveryTimeMs != null);
      const totalTime = withTime.reduce((sum, f) => sum + (f.recoveryTimeMs || 0), 0);
      const lastSuccess = failures
        .filter(f => f.recoverySuccess)
        .sort((a, b) => b.occurredAt.getTime() - a.occurredAt.getTime())[0];
      return {
        action,
        successRate: failures.length > 0 ? successes / failures.length : 0,
        averageRecoveryTime: withTime.length > 0 ? totalTime / withTime.length : 0,
        occurrences: failures.length,
        lastSuccessAt: lastSuccess?.occurredAt
      };
    }).sort((a, b) => b.successRate - a.successRate);
  }

  /**
   * Get best recovery action for an error
   */
  async getBestRecoveryAction(
    sourceName: string,
    errorType: string
  ): Promise<RecoveryPath | null> {
    const paths = await this.getRecoveryPaths(sourceName, errorType);
    // Find path with best success rate and at least 2 occurrences
    const reliable = paths.filter(p => p.occurrences >= 2 && p.successRate > 0.5);
    if (reliable.length > 0) {
      return reliable[0];
    }
    // Fall back to any successful recovery
    const anySuccess = paths.find(p => p.successRate > 0);
    return anySuccess || null;
  }

  /**
   * Check if a failure pattern is recurring: >= 3 occurrences of the same
   * (source, errorType) within the window, in either the DB or the cache
   * (the cache also covers failures not yet flushed).
   */
  async isRecurringFailure(
    sourceName: string,
    errorType: string,
    withinMinutes: number = 60
  ): Promise<boolean> {
    const cutoff = new Date();
    cutoff.setMinutes(cutoff.getMinutes() - withinMinutes);
    if (this.storage) {
      try {
        const count = await this.storage.queryScalar<number>(`
          SELECT COUNT(*)
          FROM mcp_failure_memory
          WHERE source_name = ? AND error_type = ? AND occurred_at > ?
        `, [sourceName, errorType, cutoff.toISOString()]);
        if ((count || 0) >= 3) return true;
      } catch (error) {
        logger.warn('⚠️ Recurring check failed:', error);
      }
    }
    const recentCount = this.cache.filter(
      f => f.sourceName === sourceName &&
        f.errorType === errorType &&
        f.occurredAt > cutoff
    ).length;
    return recentCount >= 3;
  }

  /**
   * Get source health summary over the last 7 days.
   */
  async getSourceHealthSummary(sourceName: string): Promise<SourceHealthSummary> {
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
    let failures: Failure[] = [];
    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_failure_memory
          WHERE source_name = ? AND occurred_at > ?
          ORDER BY occurred_at DESC
        `, [sourceName, sevenDaysAgo.toISOString()]);
        failures = rows.map(row => this.rowToFailure(row));
      } catch (error) {
        logger.warn('⚠️ Health summary query failed:', error);
      }
    }
    if (failures.length === 0) {
      failures = this.cache.filter(
        f => f.sourceName === sourceName && f.occurredAt > sevenDaysAgo
      );
    }
    const errorTypes = new Set(failures.map(f => f.errorType));
    const withRecovery = failures.filter(f => f.recoveryAction);
    const successfulRecoveries = withRecovery.filter(f => f.recoverySuccess);
    const recoveryTimes = successfulRecoveries
      .filter(f => f.recoveryTimeMs != null) // keep legitimate 0 ms times
      .map(f => f.recoveryTimeMs!);
    // Check for recurring failures (within the last hour)
    const recentErrorTypes = [...errorTypes];
    let isRecurring = false;
    for (const errorType of recentErrorTypes) {
      if (await this.isRecurringFailure(sourceName, errorType, 60)) {
        isRecurring = true;
        break;
      }
    }
    // Get recommended action for the most common error type
    let recommendedAction: string | undefined;
    if (failures.length > 0) {
      const mostCommonError = failures
        .map(f => f.errorType)
        .reduce((acc, type) => {
          acc[type] = (acc[type] || 0) + 1;
          return acc;
        }, {} as Record<string, number>);
      const topError = Object.entries(mostCommonError)
        .sort((a, b) => b[1] - a[1])[0];
      if (topError) {
        const bestRecovery = await this.getBestRecoveryAction(sourceName, topError[0]);
        recommendedAction = bestRecovery?.action;
      }
    }
    return {
      sourceName,
      totalFailures: failures.length,
      uniqueErrorTypes: errorTypes.size,
      recoverySuccessRate: withRecovery.length > 0
        ? successfulRecoveries.length / withRecovery.length
        : 0,
      averageRecoveryTime: recoveryTimes.length > 0
        ? recoveryTimes.reduce((a, b) => a + b, 0) / recoveryTimes.length
        : 0,
      isRecurring,
      lastFailure: failures[0]?.occurredAt,
      recommendedAction
    };
  }

  /**
   * Get comprehensive failure statistics. Falls back to a reduced cache-only
   * view when the database is unavailable.
   */
  async getStatistics(): Promise<FailureStatistics> {
    const now = new Date();
    const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
    const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
    if (this.storage) {
      try {
        const total = await this.storage.queryScalar<number>('SELECT COUNT(*) FROM mcp_failure_memory') || 0;
        const uniqueErrors = await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT error_type) FROM mcp_failure_memory') || 0;
        const uniqueSources = await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT source_name) FROM mcp_failure_memory') || 0;
        const recoveryRate = await this.storage.queryScalar<number>(`
          SELECT AVG(CAST(recovery_success AS FLOAT))
          FROM mcp_failure_memory
          WHERE recovery_action IS NOT NULL
        `) || 0;
        const avgRecoveryTime = await this.storage.queryScalar<number>(`
          SELECT AVG(recovery_time_ms)
          FROM mcp_failure_memory
          WHERE recovery_success = 1
        `) || 0;
        const last24h = await this.storage.queryScalar<number>(
          'SELECT COUNT(*) FROM mcp_failure_memory WHERE occurred_at > ?',
          [oneDayAgo.toISOString()]
        ) || 0;
        const last7d = await this.storage.queryScalar<number>(
          'SELECT COUNT(*) FROM mcp_failure_memory WHERE occurred_at > ?',
          [sevenDaysAgo.toISOString()]
        ) || 0;
        // Top error types
        const topErrorRows = await this.storage.queryAll<{ error_type: string; count: number; recovery_rate: number }>(`
          SELECT
            error_type,
            COUNT(*) as count,
            AVG(CASE WHEN recovery_success = 1 THEN 1.0 ELSE 0.0 END) as recovery_rate
          FROM mcp_failure_memory
          GROUP BY error_type
          ORDER BY count DESC
          LIMIT 10
        `);
        const topErrors = topErrorRows.map(row => ({
          type: row.error_type,
          count: row.count,
          recoveryRate: row.recovery_rate || 0
        }));
        // Top failing sources
        const topSourceRows = await this.storage.queryAll<{ source_name: string; count: number }>(`
          SELECT source_name, COUNT(*) as count
          FROM mcp_failure_memory
          GROUP BY source_name
          ORDER BY count DESC
          LIMIT 10
        `);
        const topSources = topSourceRows.map(row => ({
          source: row.source_name,
          count: row.count
        }));
        // Recent recoveries
        const recentRecoveryRows = await this.storage.queryAll<{
          recovery_action: string;
          recovery_success: number;
          recovery_time_ms: number;
          occurred_at: string;
        }>(`
          SELECT recovery_action, recovery_success, recovery_time_ms, occurred_at
          FROM mcp_failure_memory
          WHERE recovery_action IS NOT NULL
          ORDER BY occurred_at DESC
          LIMIT 10
        `);
        const recentRecoveries = recentRecoveryRows.map(row => ({
          action: row.recovery_action,
          success: Boolean(row.recovery_success),
          timeMs: row.recovery_time_ms || 0,
          at: new Date(row.occurred_at)
        }));
        return {
          totalFailures: total,
          uniqueErrorTypes: uniqueErrors,
          uniqueSources,
          overallRecoveryRate: recoveryRate,
          averageRecoveryTimeMs: avgRecoveryTime,
          failuresLast24h: last24h,
          failuresLast7d: last7d,
          topErrorTypes: topErrors,
          topFailingSources: topSources,
          recentRecoveries
        };
      } catch (error) {
        logger.warn('⚠️ Statistics query failed:', error);
      }
    }
    // Cache-based fallback (recovery metrics unavailable without the DB)
    return {
      totalFailures: this.cache.length,
      uniqueErrorTypes: new Set(this.cache.map(f => f.errorType)).size,
      uniqueSources: new Set(this.cache.map(f => f.sourceName)).size,
      overallRecoveryRate: 0,
      averageRecoveryTimeMs: 0,
      failuresLast24h: this.cache.filter(f => f.occurredAt > oneDayAgo).length,
      failuresLast7d: this.cache.filter(f => f.occurredAt > sevenDaysAgo).length,
      topErrorTypes: [],
      topFailingSources: [],
      recentRecoveries: []
    };
  }

  /**
   * Force flush all pending writes
   */
  async flush(): Promise<void> {
    await this.flushWriteQueue();
  }

  /**
   * Shutdown gracefully: stop the background timer and flush any queued
   * failures so nothing is lost on exit.
   */
  async shutdown(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
      this.flushInterval = null;
    }
    // Await the final flush — previously fire-and-forget, which could drop
    // queued failures when the process exited immediately after shutdown().
    await this.flushWriteQueue();
    logger.info('🛡️ FailureMemory shut down gracefully');
  }

  /**
   * Extract useful context from error
   */
  private extractErrorContext(error: any): any {
    const context: any = {};
    // Network errors
    if (error.code) context.code = error.code;
    if (error.errno) context.errno = error.errno;
    if (error.syscall) context.syscall = error.syscall;
    if (error.hostname) context.hostname = error.hostname;
    if (error.port) context.port = error.port;
    // HTTP errors
    if (error.statusCode) context.statusCode = error.statusCode;
    if (error.status) context.status = error.status;
    if (error.response?.status) context.responseStatus = error.response.status;
    if (error.response?.statusText) context.responseText = error.response.statusText;
    // Database errors
    if (error.sql) context.sql = error.sql.substring(0, 200);
    if (error.constraint) context.constraint = error.constraint;
    // Timeout errors
    if (error.timeout) context.timeout = error.timeout;
    return context;
  }
}