/**
 * PATTERN MEMORY SERVICE - ENTERPRISE A+
 *
 * Records and analyzes query patterns with persistent storage (SQLite, or
 * Postgres when the storage adapter reports `postgres` mode).
 * Enables:
 *   • Learning which sources are best for which queries
 *   • Predictive pre-fetching based on historical patterns
 *   • Performance optimization recommendations
 *   • Cross-session memory persistence
 *
 * Schema: mcp_query_patterns table (defined in schema.sql)
 */

import { v4 as uuidv4 } from 'uuid';
import crypto from 'crypto';
import type { StorageAdapter } from './StorageAdapter.js';
import { logger } from '../../utils/logger.js';
import { eventBus, type EventType } from '../EventBus.js';

// ═══════════════════════════════════════════════════════════════════════════════
// INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════

/** One recorded query execution, as kept in the cache and the database. */
export interface QueryPattern {
  /** UUID v4 assigned when the query is recorded. */
  id: string;
  /** Widget the query is attributed to (caller-supplied). */
  widgetId: string;
  /** Caller-supplied query category; part of the signature hash. */
  queryType: string;
  /** First 16 hex characters of a SHA-256 over the query type and normalized params. */
  querySignature: string;
  /** Source the caller reports as having served the query. */
  sourceUsed: string;
  /** Latency in milliseconds, as reported by the caller. */
  latencyMs: number;
  /** Optional result size reported by the caller (unit is not defined in this file). */
  resultSize?: number;
  /** Whether the caller reported the query as successful. */
  success: boolean;
  /** Context captured at record time; persisted as a JSON string. */
  userContext?: {
    userId?: string;
    /** Hour of day from `Date#getHours()` (0–23, process-local time). */
    timeOfDay: number;
    /** Day of week from `Date#getDay()` (0 = Sunday). */
    dayOfWeek: number;
  };
  /** Instant at which the pattern was recorded. */
  timestamp: Date;
}

/** A stored pattern paired with its similarity score for a lookup. */
export interface SimilarQuery {
  pattern: QueryPattern;
  /** Match score; the lookup in this file only returns exact signature matches (1.0). */
  similarity: number;
}

/** Aggregated usage of a single widget over the analysed window. */
export interface UsagePattern {
  widgetId: string;
  /** Most frequently used sources, most frequent first (at most five). */
  commonSources: string[];
  /** Mean latency in milliseconds over the analysed patterns; 0 when there are none. */
  averageLatency: number;
  /** Fraction of analysed patterns that succeeded, in [0, 1]. */
  successRate: number;
  /** Number of patterns analysed. */
  queryCount: number;
  /** Query counts per hour of day, highest frequency first. */
  timePatterns: Array<{
    hour: number;
    frequency: number;
  }>;
  /** Timestamp of the most recent analysed pattern, or null when there are none. */
  lastActivity: Date | null;
}

/** Global statistics over all recorded patterns. */
export interface PatternStatistics {
  totalPatterns: number;
  uniqueWidgets: number;
  uniqueSources: number;
  avgLatencyMs: number;
  /** Fraction of patterns that succeeded, in [0, 1]. */
  successRate: number;
  queriesLast24h: number;
  queriesLast7d: number;
  /** Sources ordered by query count, highest first. */
  topSources: Array<{ source: string; count: number; avgLatency: number }>;
  /** Widgets ordered by query count, highest first. */
  topWidgets: Array<{ widget: string; count: number }>;
}

// ═══════════════════════════════════════════════════════════════════════════════
// PATTERN MEMORY
// SERVICE - ENTERPRISE IMPLEMENTATION
// ═══════════════════════════════════════════════════════════════════════════════

/**
 * Records query executions ("patterns") and answers analytical questions about
 * them (similar queries, per-widget usage, per-source latency / success rate,
 * global statistics).
 *
 * Data lives in two places:
 *  - an in-memory cache holding the most recent `CACHE_SIZE` patterns, newest first;
 *  - the `mcp_query_patterns` table behind the optional {@link StorageAdapter},
 *    written in batches every `FLUSH_INTERVAL_MS` milliseconds.
 *
 * Every read method prefers the database and falls back to the cache when no
 * storage is attached, the query fails, or the query yields no data.
 */
export class PatternMemory {
  /** Top-level param keys that are ignored when hashing a query signature. */
  private static readonly VOLATILE_PARAM_KEYS: ReadonlySet<string> = new Set([
    'timestamp',
    'requestId',
    '_t',
    '_ts'
  ]);

  private storage: StorageAdapter | null = null;
  private initialized = false;
  /** Patterns recorded but not yet persisted (oldest first). */
  private writeQueue: QueryPattern[] = [];
  private flushInterval: ReturnType<typeof setInterval> | null = null;

  // In-memory cache for hot data (last 1000 patterns, newest first)
  private cache: QueryPattern[] = [];
  private readonly CACHE_SIZE = 1000;
  private readonly FLUSH_INTERVAL_MS = 5000; // Batch writes every 5 seconds
  private readonly MAX_DB_PATTERNS = 100000;
  // Upper bound on pending writes so the queue cannot grow without limit when
  // no storage is attached or the database stays unavailable.
  private readonly MAX_QUEUE_SIZE = 10000;

  /**
   * @param storage Optional storage adapter. When given, initialization starts
   *   immediately in the background (it is not awaited by the constructor).
   */
  constructor(storage?: StorageAdapter) {
    if (storage) {
      this.storage = storage;
      // initialize() handles its own errors, so the promise is safe to drop.
      void this.initialize();
    }
  }

  /**
   * Attach a storage adapter and start background initialization.
   * Has no initialization effect once the service is already initialized.
   */
  public setStorage(storage: StorageAdapter): void {
    this.storage = storage;
    void this.initialize();
  }

  /**
   * Create the table and indexes if missing, warm the cache from the database
   * and start the periodic flush timer. On failure the error is logged and the
   * service keeps running from memory (no flush timer is started).
   */
  private async initialize(): Promise<void> {
    if (this.initialized || !this.storage) return;
    const storage = this.storage;

    try {
      // Postgres uses different syntax for some things, but basic CREATE TABLE is mostly compatible
      // except for DATETIME DEFAULT CURRENT_TIMESTAMP vs TIMESTAMP WITH TIME ZONE
      const isPostgres = storage.mode === 'postgres';
      const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME';

      await storage.execute(`
        CREATE TABLE IF NOT EXISTS mcp_query_patterns (
          id TEXT PRIMARY KEY,
          widget_id TEXT NOT NULL,
          query_type TEXT NOT NULL,
          query_signature TEXT NOT NULL,
          source_used TEXT NOT NULL,
          latency_ms INTEGER NOT NULL,
          result_size INTEGER,
          success BOOLEAN NOT NULL,
          user_context TEXT,
          timestamp ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
      `);

      // Create indexes for fast queries
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_widget ON mcp_query_patterns(widget_id, timestamp DESC)`);
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_signature ON mcp_query_patterns(query_signature)`);
      await storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_source ON mcp_query_patterns(source_used, timestamp DESC)`);

      // Load recent patterns into cache
      await this.loadCacheFromDb();

      // Start background flush
      this.flushInterval = setInterval(() => {
        void this.flushWriteQueue();
      }, this.FLUSH_INTERVAL_MS);

      this.initialized = true;
      logger.info(`🧠 PatternMemory initialized with ${storage.mode} storage`);
    } catch (error) {
      logger.error('❌ PatternMemory initialization failed:', error);
      // Fall back to in-memory only
      this.initialized = true;
    }
  }

  /**
   * Load the most recent patterns from the database into the cache.
   * Failures are logged and leave the cache as it was.
   */
  private async loadCacheFromDb(): Promise<void> {
    if (!this.storage) return;

    try {
      const rows = await this.storage.queryAll(`
        SELECT * FROM mcp_query_patterns
        ORDER BY timestamp DESC
        LIMIT ?
      `, [this.CACHE_SIZE]);

      for (const row of rows) {
        this.cache.push(this.rowToPattern(row));
      }

      // Patterns recorded while initialization was in flight are already at
      // the front of the cache; drop the oldest entries beyond the limit.
      if (this.cache.length > this.CACHE_SIZE) {
        this.cache.length = this.CACHE_SIZE;
      }

      logger.info(`📦 Loaded ${this.cache.length} patterns into memory cache`);
    } catch (error) {
      logger.warn('⚠️ Failed to load pattern cache:', error);
    }
  }

  /**
   * Convert a database row (snake_case columns) to a {@link QueryPattern}.
   */
  private rowToPattern(row: any): QueryPattern {
    return {
      id: row.id,
      widgetId: row.widget_id,
      queryType: row.query_type,
      querySignature: row.query_signature,
      sourceUsed: row.source_used,
      latencyMs: row.latency_ms,
      // `??` keeps a legitimate size of 0; only NULL becomes undefined.
      resultSize: row.result_size ?? undefined,
      success: Boolean(row.success),
      userContext: this.parseUserContext(row.user_context),
      timestamp: new Date(row.timestamp)
    };
  }

  /**
   * Parse the JSON stored in `user_context`. A malformed value yields
   * `undefined` instead of throwing, so one bad row cannot fail a whole query.
   */
  private parseUserContext(raw: unknown): QueryPattern['userContext'] {
    if (!raw) return undefined;
    try {
      return JSON.parse(String(raw));
    } catch (error) {
      logger.debug('⚠️ Ignoring malformed user_context on pattern row:', error);
      return undefined;
    }
  }

  /**
   * Record a query pattern for learning (async, batched).
   *
   * The pattern is added to the cache at once, queued for the next batched
   * database write, and announced on the event bus as `pattern:recorded`.
   *
   * @param params.queryParams Query parameters; hashed (minus volatile keys)
   *   together with `queryType` into the pattern's signature.
   */
  async recordQuery(params: {
    widgetId: string;
    queryType: string;
    queryParams: any;
    sourceUsed: string;
    latencyMs: number;
    resultSize?: number;
    success: boolean;
    userId?: string;
  }): Promise<void> {
    const id = uuidv4();
    const signature = this.generateSignature(params.queryType, params.queryParams);
    // Single instant so hour, weekday and timestamp can never disagree.
    const now = new Date();

    const userContext = {
      userId: params.userId,
      timeOfDay: now.getHours(),
      dayOfWeek: now.getDay()
    };

    const pattern: QueryPattern = {
      id,
      widgetId: params.widgetId,
      queryType: params.queryType,
      querySignature: signature,
      sourceUsed: params.sourceUsed,
      latencyMs: params.latencyMs,
      resultSize: params.resultSize,
      success: params.success,
      userContext,
      timestamp: now
    };

    // Add to cache (hot data)
    this.cache.unshift(pattern);
    if (this.cache.length > this.CACHE_SIZE) {
      this.cache.pop();
    }

    // Queue for database write (bounded; the oldest pending entries are dropped first)
    this.writeQueue.push(pattern);
    this.trimWriteQueue();

    // Emit event for real-time monitoring
    eventBus.emit('pattern:recorded' as EventType, {
      widgetId: params.widgetId,
      sourceUsed: params.sourceUsed,
      latencyMs: params.latencyMs,
      success: params.success
    });
  }

  /**
   * Drop the oldest pending writes beyond `MAX_QUEUE_SIZE`.
   * @returns Number of patterns dropped.
   */
  private trimWriteQueue(): number {
    const overflow = this.writeQueue.length - this.MAX_QUEUE_SIZE;
    if (overflow <= 0) return 0;
    this.writeQueue.splice(0, overflow);
    return overflow;
  }

  /**
   * Flush write queue to database (batched writes).
   * On failure the batch is put back at the front of the queue for a later retry.
   */
  private async flushWriteQueue(): Promise<void> {
    if (!this.storage || this.writeQueue.length === 0) return;

    // Take ownership of the current batch; new records go to a fresh queue.
    const patterns = this.writeQueue;
    this.writeQueue = [];

    try {
      const columns = [
        'id', 'widget_id', 'query_type', 'query_signature', 'source_used',
        'latency_ms', 'result_size', 'success', 'user_context', 'timestamp'
      ];

      const rows = patterns.map(p => [
        p.id,
        p.widgetId,
        p.queryType,
        p.querySignature,
        p.sourceUsed,
        p.latencyMs,
        p.resultSize ?? null,
        p.success, // Use boolean directly for Postgres compatibility
        p.userContext ? JSON.stringify(p.userContext) : null,
        p.timestamp.toISOString()
      ]);

      await this.storage.batchInsert('mcp_query_patterns', columns, rows);

      // Cleanup old patterns periodically
      if (Math.random() < 0.01) { // 1% chance per flush
        void this.cleanupOldPatterns(); // Not awaited to keep it in the background
      }
    } catch (error) {
      logger.error('❌ Failed to flush pattern write queue:', error);
      // Put patterns back in front of anything recorded meanwhile. concat()
      // avoids the argument-count limit that unshift(...batch) can hit.
      this.writeQueue = patterns.concat(this.writeQueue);
      const dropped = this.trimWriteQueue();
      if (dropped > 0) {
        logger.warn(`⚠️ Pattern write queue full, dropped ${dropped} oldest pending pattern(s)`);
      }
    }
  }

  /**
   * Cleanup old patterns to prevent database bloat: deletes rows older than
   * 30 days that are not among the newest `MAX_DB_PATTERNS` rows.
   */
  private async cleanupOldPatterns(): Promise<void> {
    if (!this.storage) return;

    try {
      const thirtyDaysAgo = new Date();
      thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);

      // Need to fix this query for Postgres compatibility if nested subquery limit issues arise
      // But standard SQL roughly supports DELETE...
      // Postgres supports DELETE FROM ... WHERE id NOT IN (...)
      await this.storage.execute(`
        DELETE FROM mcp_query_patterns
        WHERE timestamp < ?
        AND id NOT IN (
          SELECT id FROM mcp_query_patterns
          ORDER BY timestamp DESC
          LIMIT ?
        )
      `, [thirtyDaysAgo.toISOString(), this.MAX_DB_PATTERNS]);

      logger.debug('🧹 Cleaned up old query patterns');
    } catch (error) {
      logger.warn('⚠️ Pattern cleanup failed:', error);
    }
  }

  /**
   * Find previously recorded queries with the same signature.
   *
   * Cache matches come first; if fewer than `limit` are found the database is
   * asked for the remainder, excluding the ids already returned.
   *
   * @param limit Maximum number of results (default 10); values <= 0 yield `[]`.
   * @returns Matches with `similarity` 1.0 (exact signature matches only).
   */
  async findSimilarQueries(
    queryType: string,
    queryParams: any,
    limit: number = 10
  ): Promise<SimilarQuery[]> {
    if (limit <= 0) return [];

    const signature = this.generateSignature(queryType, queryParams);

    // Check cache first
    let results: SimilarQuery[] = this.cache
      .filter(p => p.querySignature === signature)
      .slice(0, limit)
      .map(p => ({ pattern: p, similarity: 1.0 }));

    if (results.length >= limit || !this.storage) {
      return results;
    }

    // Query database for more
    try {
      const remaining = limit - results.length;
      const excludedIds = results.map(r => r.pattern.id);
      const exclusionClause = excludedIds.length > 0
        ? `AND id NOT IN (${excludedIds.map(() => '?').join(',')})`
        : '';

      // Bound values must follow placeholder order: signature, excluded ids, limit.
      const rows = await this.storage.queryAll(`
        SELECT * FROM mcp_query_patterns
        WHERE query_signature = ?
        ${exclusionClause}
        ORDER BY timestamp DESC
        LIMIT ?
      `, [signature, ...excludedIds, remaining]);

      const dbMatches: SimilarQuery[] = rows.map((row: any) => ({
        pattern: this.rowToPattern(row),
        similarity: 1.0
      }));

      results = [...results, ...dbMatches];
    } catch (error) {
      logger.warn('⚠️ Database query failed, using cache:', error);
    }

    return results;
  }

  /**
   * Get widget usage patterns (comprehensive analysis) over the last 7 days.
   * Uses the database when it returns rows, otherwise the cache.
   */
  async getWidgetPatterns(widgetId: string): Promise<UsagePattern> {
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);

    let patterns: QueryPattern[] = [];

    // Try database first
    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_query_patterns
          WHERE widget_id = ? AND timestamp > ?
          ORDER BY timestamp DESC
        `, [widgetId, sevenDaysAgo.toISOString()]);

        patterns = rows.map((row: any) => this.rowToPattern(row));
      } catch (error) {
        logger.warn('⚠️ Database query failed, using cache', error);
      }
    }

    // Fall back to cache
    if (patterns.length === 0) {
      patterns = this.cache.filter(
        p => p.widgetId === widgetId && p.timestamp > sevenDaysAgo
      );
    }

    // Aggregate statistics
    const sourceCounts = new Map<string, number>();
    const timePatterns = new Map<number, number>();
    let totalLatency = 0;
    let successCount = 0;
    let lastActivity: Date | null = null;

    for (const p of patterns) {
      sourceCounts.set(p.sourceUsed, (sourceCounts.get(p.sourceUsed) ?? 0) + 1);

      // Patterns without user context are counted under hour 0.
      const hour = p.userContext?.timeOfDay ?? 0;
      timePatterns.set(hour, (timePatterns.get(hour) ?? 0) + 1);

      totalLatency += p.latencyMs;
      if (p.success) successCount++;

      if (!lastActivity || p.timestamp > lastActivity) {
        lastActivity = p.timestamp;
      }
    }

    const commonSources = Array.from(sourceCounts.entries())
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5)
      .map(([source]) => source);

    const timePatternArray = Array.from(timePatterns.entries())
      .map(([hour, frequency]) => ({ hour, frequency }))
      .sort((a, b) => b.frequency - a.frequency);

    return {
      widgetId,
      commonSources,
      averageLatency: patterns.length > 0 ? totalLatency / patterns.length : 0,
      successRate: patterns.length > 0 ? successCount / patterns.length : 0,
      queryCount: patterns.length,
      timePatterns: timePatternArray,
      lastActivity
    };
  }

  /**
   * Average latency (ms) of successful queries against a source over the last
   * day. Returns 0 when neither the database nor the cache has matching data.
   */
  async getAverageLatency(sourceName: string): Promise<number> {
    const oneDayAgo = new Date();
    oneDayAgo.setDate(oneDayAgo.getDate() - 1);

    if (this.storage) {
      try {
        // Bare `success` works for both SQLite (0/1) and Postgres (boolean);
        // `success = 1` is rejected by Postgres for a BOOLEAN column.
        const result = await this.storage.queryOne<{ avg_latency: number | string | null }>(`
          SELECT AVG(latency_ms) as avg_latency
          FROM mcp_query_patterns
          WHERE source_used = ? AND success AND timestamp > ?
        `, [sourceName, oneDayAgo.toISOString()]);

        if (result && result.avg_latency != null) {
          // Some drivers (e.g. node-postgres) return NUMERIC aggregates as strings.
          return Number(result.avg_latency);
        }
        // If DB has no data (result is null or avg_latency is null), fall back to cache
      } catch (error) {
        logger.warn('⚠️ Latency query failed:', error);
      }
    }

    // Fall back to cache
    const sourcePatterns = this.cache.filter(
      p => p.sourceUsed === sourceName && p.success && p.timestamp > oneDayAgo
    );

    if (sourcePatterns.length === 0) return 0;
    return sourcePatterns.reduce((sum, p) => sum + p.latencyMs, 0) / sourcePatterns.length;
  }

  /**
   * Success rate (0..1) for a source and query type over the last 7 days.
   * Returns 0 when neither the database nor the cache has matching data.
   */
  async getSuccessRate(sourceName: string, queryType: string): Promise<number> {
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);

    if (this.storage) {
      try {
        const result = await this.storage.queryOne<{ total: number | string; successes: number | string | null }>(`
          SELECT
            COUNT(*) as total,
            SUM(CASE WHEN success THEN 1 ELSE 0 END) as successes
          FROM mcp_query_patterns
          WHERE source_used = ? AND query_type = ? AND timestamp > ?
        `, [sourceName, queryType, sevenDaysAgo.toISOString()]);

        // Counts may arrive as strings from some drivers; coerce before dividing.
        const total = result ? Number(result.total) : 0;
        if (result && total > 0) {
          return Number(result.successes ?? 0) / total;
        }
        // Fall back to cache if DB yields no matching rows
      } catch (error) {
        logger.warn('⚠️ Success rate query failed:', error);
      }
    }

    // Fall back to cache
    const matching = this.cache.filter(
      p => p.sourceUsed === sourceName &&
           p.queryType === queryType &&
           p.timestamp > sevenDaysAgo
    );

    if (matching.length === 0) return 0;
    return matching.filter(p => p.success).length / matching.length;
  }

  /**
   * Get comprehensive statistics over all stored patterns. Falls back to
   * cache-only statistics when no storage is attached or a query fails.
   */
  async getStatistics(): Promise<PatternStatistics> {
    const now = new Date();
    const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
    const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);

    if (this.storage) {
      const storage = this.storage;
      // Runs a single-value query and coerces the result to a number (0 when NULL/NaN).
      const scalar = async (sql: string, params?: string[]): Promise<number> => {
        const value = params ? await storage.queryScalar(sql, params) : await storage.queryScalar(sql);
        return Number(value) || 0;
      };

      try {
        // The queries are independent, so run them concurrently.
        const [
          total,
          uniqueWidgets,
          uniqueSources,
          avgLatency,
          successRate,
          last24h,
          last7d,
          topSourcesRows,
          topWidgetsRows
        ] = await Promise.all([
          scalar('SELECT COUNT(*) FROM mcp_query_patterns'),
          scalar('SELECT COUNT(DISTINCT widget_id) FROM mcp_query_patterns'),
          scalar('SELECT COUNT(DISTINCT source_used) FROM mcp_query_patterns'),
          scalar('SELECT AVG(latency_ms) FROM mcp_query_patterns WHERE success'),
          // CASE instead of CAST(success AS FLOAT): Postgres cannot cast boolean to float.
          scalar('SELECT AVG(CASE WHEN success THEN 1.0 ELSE 0.0 END) FROM mcp_query_patterns'),
          scalar('SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?', [oneDayAgo.toISOString()]),
          scalar('SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?', [sevenDaysAgo.toISOString()]),
          storage.queryAll<{ source_used: string; count: number; avg_latency: number }>(`
            SELECT
              source_used,
              COUNT(*) as count,
              AVG(latency_ms) as avg_latency
            FROM mcp_query_patterns
            GROUP BY source_used
            ORDER BY count DESC
            LIMIT 10
          `),
          storage.queryAll<{ widget_id: string; count: number }>(`
            SELECT widget_id, COUNT(*) as count
            FROM mcp_query_patterns
            GROUP BY widget_id
            ORDER BY count DESC
            LIMIT 10
          `)
        ]);

        const topSources = topSourcesRows.map((row: { source_used: string; count: number; avg_latency: number }) => ({
          source: row.source_used,
          count: Number(row.count),
          avgLatency: Number(row.avg_latency) || 0
        }));

        const topWidgets = topWidgetsRows.map((row: { widget_id: string; count: number }) => ({
          widget: row.widget_id,
          count: Number(row.count)
        }));

        return {
          totalPatterns: total,
          uniqueWidgets,
          uniqueSources,
          avgLatencyMs: avgLatency,
          successRate,
          queriesLast24h: last24h,
          queriesLast7d: last7d,
          topSources,
          topWidgets
        };
      } catch (error) {
        logger.warn('⚠️ Statistics query failed:', error);
      }
    }

    // Fallback to cache-based stats
    return this.computeCacheStatistics(oneDayAgo, sevenDaysAgo);
  }

  /**
   * Compute statistics from the in-memory cache only, mirroring the database
   * path: average latency covers successful patterns, per-source latency
   * covers all patterns, and the top lists hold at most 10 entries.
   */
  private computeCacheStatistics(oneDayAgo: Date, sevenDaysAgo: Date): PatternStatistics {
    const bySource = new Map<string, { count: number; totalLatency: number }>();
    const byWidget = new Map<string, number>();
    let successCount = 0;
    let successLatency = 0;
    let last24h = 0;
    let last7d = 0;

    for (const p of this.cache) {
      const sourceStats = bySource.get(p.sourceUsed) ?? { count: 0, totalLatency: 0 };
      sourceStats.count++;
      sourceStats.totalLatency += p.latencyMs;
      bySource.set(p.sourceUsed, sourceStats);

      byWidget.set(p.widgetId, (byWidget.get(p.widgetId) ?? 0) + 1);

      if (p.success) {
        successCount++;
        successLatency += p.latencyMs;
      }
      if (p.timestamp > oneDayAgo) last24h++;
      if (p.timestamp > sevenDaysAgo) last7d++;
    }

    const topSources = Array.from(bySource.entries())
      .map(([source, s]) => ({ source, count: s.count, avgLatency: s.totalLatency / s.count }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 10);

    const topWidgets = Array.from(byWidget.entries())
      .map(([widget, count]) => ({ widget, count }))
      .sort((a, b) => b.count - a.count)
      .slice(0, 10);

    return {
      totalPatterns: this.cache.length,
      uniqueWidgets: byWidget.size,
      uniqueSources: bySource.size,
      avgLatencyMs: successCount > 0 ? successLatency / successCount : 0,
      successRate: this.cache.length > 0 ? successCount / this.cache.length : 0,
      queriesLast24h: last24h,
      queriesLast7d: last7d,
      topSources,
      topWidgets
    };
  }

  /**
   * Force flush all pending writes. Resolves once the batch insert has
   * completed (or failed and been re-queued).
   */
  async flush(): Promise<void> {
    await this.flushWriteQueue();
  }

  /**
   * Shutdown gracefully: stop the flush timer, then persist pending writes.
   */
  async shutdown(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
      this.flushInterval = null;
    }
    await this.flushWriteQueue();
    logger.info('🧠 PatternMemory shut down gracefully');
  }

  /**
   * Generate query signature for pattern matching: the first 16 hex characters
   * of SHA-256 over `{ type, params }` with params normalized.
   */
  private generateSignature(queryType: string, queryParams: any): string {
    const normalized = JSON.stringify({
      type: queryType,
      params: this.normalizeParams(queryParams)
    });

    return crypto.createHash('sha256')
      .update(normalized)
      .digest('hex')
      .substring(0, 16);
  }

  /**
   * Normalize query params for signature generation: drops volatile top-level
   * keys (timestamps, request ids) and orders the remaining top-level keys.
   * Nested objects are left as-is, so signatures stay compatible with rows
   * that were persisted earlier.
   */
  private normalizeParams(params: any): any {
    if (!params) return {};

    const sorted: any = {};
    const stableKeys = Object.keys(params)
      .filter(key => !PatternMemory.VOLATILE_PARAM_KEYS.has(key))
      .sort();

    for (const key of stableKeys) {
      sorted[key] = params[key];
    }
    return sorted;
  }
}