Kraft102's picture
Update backend source
34367da verified
/**
* ╔══════════════════════════════════════════════════════════════════════════════╗
* β•‘ COGNITIVE MEMORY - ENTERPRISE A+ EDITION β•‘
* β•‘ β•‘
* β•‘ Central Intelligence System with persistent SQLite storage. β•‘
* β•‘ Combines pattern learning and failure memory to provide β•‘
* β•‘ autonomous decision-making capabilities. β•‘
* β•‘ β•‘
* β•‘ Features: β•‘
* β•‘ β€’ Persistent pattern memory across restarts β•‘
* β•‘ β€’ Failure tracking with recovery path learning β•‘
* β•‘ β€’ Real-time health metrics and trend analysis β•‘
* β•‘ β€’ Context-aware decision support β•‘
* β•‘ β€’ Comprehensive source intelligence β•‘
* β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
*/
import type { StorageAdapter } from './StorageAdapter.js';
import { SqlJsStorageAdapter } from './StorageAdapter.js';
import type { Database } from 'sql.js';
import { PatternMemory, UsagePattern, PatternStatistics } from './PatternMemory.js';
import { FailureMemory, Failure, RecoveryPath, FailureStatistics, SourceHealthSummary } from './FailureMemory.js';
import { logger } from '../../utils/logger.js';
import { eventBus, type EventType } from '../EventBus.js';
// ═══════════════════════════════════════════════════════════════════════════════
// INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════
export interface HealthMetrics {
sourceName: string;
healthScore: number;
latency: {
p50: number;
p95: number;
p99: number;
};
successRate: number;
requestCount: number;
errorCount: number;
timestamp: Date;
}
export interface UserContext {
userId?: string;
timeOfDay: number;
dayOfWeek: number;
recentActivity?: string[];
}
export interface SourceIntelligence {
sourceName: string;
averageLatency: number;
overallSuccessRate: number;
recentFailures: number;
lastFailure?: Failure;
knownRecoveryPaths: Map<string, RecoveryPath[]>;
healthTrend: 'improving' | 'stable' | 'degrading';
recommendedAction?: string;
}
export interface CognitiveStats {
patterns: PatternStatistics;
failures: FailureStatistics;
healthRecords: number;
initialized: boolean;
persistenceMode: 'sqlite' | 'postgres' | 'memory';
}
// ═══════════════════════════════════════════════════════════════════════════════
// COGNITIVE MEMORY - ENTERPRISE IMPLEMENTATION
// ═══════════════════════════════════════════════════════════════════════════════
export class CognitiveMemory {
public readonly patternMemory: PatternMemory;
public readonly failureMemory: FailureMemory;
private storage: StorageAdapter | null = null;
private healthHistory: Map<string, HealthMetrics[]> = new Map();
private readonly MAX_HEALTH_RECORDS = 1000;
private initialized: boolean = false;
private persistenceMode: 'sqlite' | 'postgres' | 'memory' = 'memory';
constructor(storageOrDb?: StorageAdapter | Database) {
if (storageOrDb) {
if ('exec' in storageOrDb || 'run' in storageOrDb) {
// It's a Database
this.storage = new SqlJsStorageAdapter(storageOrDb as Database);
} else {
// It's a StorageAdapter
this.storage = storageOrDb as StorageAdapter;
}
this.persistenceMode = this.storage.mode;
}
this.patternMemory = new PatternMemory(this.storage || undefined);
this.failureMemory = new FailureMemory(this.storage || undefined);
if (this.storage) {
this.initializeHealthTable();
}
this.initialized = true;
logger.info(`🧠 Cognitive Memory initialized (${this.persistenceMode} mode)`);
}
/**
* Initialize health metrics table for persistence
*/
private async initializeHealthTable(): Promise<void> {
if (!this.storage) return;
try {
const isPostgres = this.storage.mode === 'postgres';
const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME';
await this.storage.execute(`
CREATE TABLE IF NOT EXISTS mcp_source_health (
id TEXT PRIMARY KEY,
source_name TEXT NOT NULL,
health_score REAL NOT NULL,
latency_p50 REAL,
latency_p95 REAL,
latency_p99 REAL,
success_rate REAL NOT NULL,
request_count INTEGER NOT NULL,
error_count INTEGER NOT NULL,
timestamp ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`);
await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_source_health_source
ON mcp_source_health(source_name, timestamp DESC)`);
logger.debug('πŸ“Š Health metrics table initialized');
} catch (error) {
logger.warn('⚠️ Failed to initialize health table:', error);
}
}
// ════════════════════════════════════════════════════════════════════════════
// PATTERN MEMORY INTERFACE
// ════════════════════════════════════════════════════════════════════════════
/**
* Record a successful query for pattern learning
*/
async recordSuccess(params: {
widgetId: string;
queryType: string;
queryParams: any;
sourceUsed: string;
latencyMs: number;
resultSize?: number;
userId?: string;
}): Promise<void> {
await this.patternMemory.recordQuery({
...params,
success: true
});
}
/**
* Record a query (success or failure)
*/
async recordQuery(params: {
widgetId: string;
queryType: string;
queryParams: any;
sourceUsed: string;
latencyMs: number;
resultSize?: number;
success: boolean;
userId?: string;
}): Promise<void> {
return this.patternMemory.recordQuery(params);
}
/**
* Get usage patterns for a widget
*/
async getWidgetPatterns(widgetId: string): Promise<UsagePattern> {
return this.patternMemory.getWidgetPatterns(widgetId);
}
/**
* Get average latency for a source
*/
async getAverageLatency(sourceName: string): Promise<number> {
return this.patternMemory.getAverageLatency(sourceName);
}
/**
* Get success rate for a specific source and query combination
*/
async getSuccessRate(sourceName: string, queryType: string): Promise<number> {
return this.patternMemory.getSuccessRate(sourceName, queryType);
}
/**
* Find similar historical queries
*/
async getSimilarQuerySuccess(queryType: string, queryParams: any): Promise<number> {
const similar = await this.patternMemory.findSimilarQueries(queryType, queryParams, 10);
if (similar.length === 0) return 0.5; // No data, neutral score
const successCount = similar.filter(q => q.pattern.success).length;
return successCount / similar.length;
}
/**
* Get pattern statistics
*/
async getPatternStatistics(): Promise<PatternStatistics> {
return this.patternMemory.getStatistics();
}
// ════════════════════════════════════════════════════════════════════════════
// FAILURE MEMORY INTERFACE
// ════════════════════════════════════════════════════════════════════════════
/**
* Record a failure with optional recovery information
*/
async recordFailure(params: {
sourceName: string;
error: Error;
queryContext?: any;
recoveryAction?: string;
recoverySuccess?: boolean;
recoveryTimeMs?: number;
}): Promise<string> {
return this.failureMemory.recordFailure(params);
}
/**
* Update a failure with recovery result
*/
async updateRecovery(
failureId: string,
recoveryAction: string,
success: boolean,
recoveryTimeMs: number
): Promise<void> {
return this.failureMemory.updateRecovery(failureId, recoveryAction, success, recoveryTimeMs);
}
/**
* Get known recovery paths for a specific error
*/
async getRecoveryPaths(
sourceName: string,
errorType: string
): Promise<RecoveryPath[]> {
return this.failureMemory.getRecoveryPaths(sourceName, errorType);
}
/**
* Get the best recovery action for an error
*/
async getBestRecoveryAction(
sourceName: string,
errorType: string
): Promise<RecoveryPath | null> {
return this.failureMemory.getBestRecoveryAction(sourceName, errorType);
}
/**
* Check if failures are recurring (indicates deeper issue)
*/
async isRecurringFailure(
sourceName: string,
errorType: string,
withinMinutes: number = 60
): Promise<boolean> {
return this.failureMemory.isRecurringFailure(sourceName, errorType, withinMinutes);
}
/**
* Get failure statistics
*/
async getFailureStatistics(): Promise<FailureStatistics> {
return this.failureMemory.getStatistics();
}
/**
* Get source health summary
*/
async getSourceHealthSummary(sourceName: string): Promise<SourceHealthSummary> {
return this.failureMemory.getSourceHealthSummary(sourceName);
}
// ════════════════════════════════════════════════════════════════════════════
// HEALTH TRACKING
// ════════════════════════════════════════════════════════════════════════════
/**
* Record health metrics for a source
*/
async recordHealthMetrics(metrics: HealthMetrics): Promise<void> {
const sourceName = metrics.sourceName;
// In-memory cache
if (!this.healthHistory.has(sourceName)) {
this.healthHistory.set(sourceName, []);
}
const history = this.healthHistory.get(sourceName)!;
history.unshift(metrics);
// Trim old records
if (history.length > this.MAX_HEALTH_RECORDS) {
this.healthHistory.set(sourceName, history.slice(0, this.MAX_HEALTH_RECORDS));
}
// Persist to database
if (this.storage) {
try {
const id = `${sourceName}-${Date.now()}`;
await this.storage.execute(`
INSERT INTO mcp_source_health
(id, source_name, health_score, latency_p50, latency_p95, latency_p99,
success_rate, request_count, error_count, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [
id,
sourceName,
metrics.healthScore,
metrics.latency.p50,
metrics.latency.p95,
metrics.latency.p99,
metrics.successRate,
metrics.requestCount,
metrics.errorCount,
metrics.timestamp.toISOString()
]);
} catch (error) {
logger.warn('⚠️ Failed to persist health metrics:', error);
}
}
// Emit event for monitoring
eventBus.emit('health:recorded' as EventType, metrics);
}
/**
* Get recent health history for trend analysis
*/
async getHealthHistory(
sourceName: string,
limit: number = 100
): Promise<HealthMetrics[]> {
// Try database first
if (this.storage) {
try {
const rows = await this.storage.queryAll<{
source_name: string;
health_score: number;
latency_p50: number;
latency_p95: number;
latency_p99: number;
success_rate: number;
request_count: number;
error_count: number;
timestamp: string;
}>(`
SELECT * FROM mcp_source_health
WHERE source_name = ?
ORDER BY timestamp DESC
LIMIT ?
`, [sourceName, limit]);
return rows.map(row => ({
sourceName: row.source_name,
healthScore: row.health_score,
latency: {
p50: row.latency_p50 || 0,
p95: row.latency_p95 || 0,
p99: row.latency_p99 || 0
},
successRate: row.success_rate,
requestCount: row.request_count,
errorCount: row.error_count,
timestamp: new Date(row.timestamp)
}));
} catch (error) {
logger.warn('⚠️ Failed to query health history:', error);
}
}
// Fall back to in-memory
const history = this.healthHistory.get(sourceName) || [];
return history.slice(0, limit);
}
/**
* Get source health metrics
*/
async getSourceHealth(sourceName: string): Promise<HealthMetrics | null> {
const history = await this.getHealthHistory(sourceName, 1);
return history.length > 0 ? history[0] : null;
}
/**
* Calculate health trend for a source
*/
async getHealthTrend(sourceName: string): Promise<'improving' | 'stable' | 'degrading'> {
const history = await this.getHealthHistory(sourceName, 20);
if (history.length < 5) return 'stable';
const recent = history.slice(0, 5);
const older = history.slice(5, 10);
const recentAvg = recent.reduce((sum, h) => sum + h.healthScore, 0) / recent.length;
const olderAvg = older.reduce((sum, h) => sum + h.healthScore, 0) / older.length;
const diff = recentAvg - olderAvg;
if (diff > 0.1) return 'improving';
if (diff < -0.1) return 'degrading';
return 'stable';
}
// ════════════════════════════════════════════════════════════════════════════
// CONTEXT AWARENESS
// ════════════════════════════════════════════════════════════════════════════
/**
* Get current user context for decision making
*/
getCurrentUserContext(): UserContext {
const now = new Date();
return {
timeOfDay: now.getHours(),
dayOfWeek: now.getDay()
};
}
/**
* Get comprehensive intelligence summary for a source
*/
async getSourceIntelligence(sourceName: string): Promise<SourceIntelligence> {
const [avgLatency, healthSummary, healthTrend] = await Promise.all([
this.getAverageLatency(sourceName),
this.getSourceHealthSummary(sourceName),
this.getHealthTrend(sourceName)
]);
// Get recovery paths for each unique error type
const failureHistory = await this.failureMemory.getFailureHistory(sourceName, 10);
const errorTypes = new Set(failureHistory.map(f => f.errorType));
const recoveryPaths = new Map<string, RecoveryPath[]>();
for (const errorType of errorTypes) {
const paths = await this.getRecoveryPaths(sourceName, errorType);
if (paths.length > 0) {
recoveryPaths.set(errorType, paths);
}
}
return {
sourceName,
averageLatency: avgLatency,
overallSuccessRate: healthSummary.recoverySuccessRate,
recentFailures: healthSummary.totalFailures,
lastFailure: failureHistory[0],
knownRecoveryPaths: recoveryPaths,
healthTrend,
recommendedAction: healthSummary.recommendedAction
};
}
// ════════════════════════════════════════════════════════════════════════════
// STATISTICS & MAINTENANCE
// ════════════════════════════════════════════════════════════════════════════
/**
* Get comprehensive cognitive memory statistics
*/
async getStatistics(): Promise<CognitiveStats> {
const [patterns, failures] = await Promise.all([
this.patternMemory.getStatistics(),
this.failureMemory.getStatistics()
]);
let healthRecords = 0;
for (const history of this.healthHistory.values()) {
healthRecords += history.length;
}
return {
patterns,
failures,
healthRecords,
initialized: this.initialized,
persistenceMode: this.persistenceMode
};
}
/**
* Flush all pending writes
*/
async flush(): Promise<void> {
await Promise.all([
this.patternMemory.flush(),
this.failureMemory.flush()
]);
logger.debug('🧠 Cognitive Memory flushed');
}
/**
* Graceful shutdown
*/
async shutdown(): Promise<void> {
await Promise.all([
this.patternMemory.shutdown(),
this.failureMemory.shutdown()
]);
logger.info('🧠 Cognitive Memory shut down gracefully');
}
/**
* Clean old data (maintenance)
*/
async cleanup(retentionDays: number = 30): Promise<void> {
if (!this.storage) {
logger.debug('🧹 Cognitive memory cleanup (in-memory - automatic via limits)');
return;
}
try {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
await this.storage.execute(`
DELETE FROM mcp_source_health
WHERE timestamp < ?
`, [cutoffDate.toISOString()]);
logger.info(`🧹 Cleaned health records older than ${retentionDays} days`);
} catch (error) {
logger.warn('⚠️ Health cleanup failed:', error);
}
}
}
// ═══════════════════════════════════════════════════════════════════════════════
// SINGLETON MANAGEMENT
// ═══════════════════════════════════════════════════════════════════════════════
let instance: CognitiveMemory | null = null;
export function initCognitiveMemory(storageOrDb?: StorageAdapter | Database): CognitiveMemory {
if (!instance) {
instance = new CognitiveMemory(storageOrDb);
} else if (storageOrDb && !instance['storage']) {
// Upgrade to persistent storage if provided later
instance = new CognitiveMemory(storageOrDb);
}
return instance;
}
export function getCognitiveMemory(): CognitiveMemory {
if (!instance) {
throw new Error('Cognitive Memory not initialized! Call initCognitiveMemory() first.');
}
return instance;
}
export function resetCognitiveMemory(): void {
if (instance) {
instance.shutdown().catch(console.error);
}
instance = null;
}