/**
 * PATTERN MEMORY SERVICE - ENTERPRISE A+
 *
 * Records and analyzes query patterns with persistent SQLite storage.
 * Enables:
 *   • Learning which sources are best for which queries
 *   • Predictive pre-fetching based on historical patterns
 *   • Performance optimization recommendations
 *   • Cross-session memory persistence
 *
 * Schema: mcp_query_patterns table (defined in schema.sql)
 */
| import { v4 as uuidv4 } from 'uuid'; | |
| import crypto from 'crypto'; | |
| import type { StorageAdapter } from './StorageAdapter.js'; | |
| import { logger } from '../../utils/logger.js'; | |
| import { eventBus, type EventType } from '../EventBus.js'; | |
// ─────────────────────────────────────────────────────────────────────────────
// INTERFACES
// ─────────────────────────────────────────────────────────────────────────────
/**
 * One recorded query execution: which widget asked, what was asked (as a
 * normalized signature), which source answered, and how it performed.
 */
export interface QueryPattern {
  /** Unique identifier (UUID v4, assigned at record time). */
  id: string;
  /** Widget that issued the query. */
  widgetId: string;
  /** Broad category of the query; feeds into the signature hash. */
  queryType: string;
  /** Stable hash of queryType + normalized params; equal signatures mean "same query". */
  querySignature: string;
  /** Data source that served the query. */
  sourceUsed: string;
  /** End-to-end latency in milliseconds. */
  latencyMs: number;
  /** Size of the result, when known. */
  resultSize?: number;
  /** Whether the query succeeded. */
  success: boolean;
  /** Who/when context captured at record time. */
  userContext?: {
    userId?: string;
    /** Hour of day (0-23) when the query ran. */
    timeOfDay: number;
    /** Day of week (0 = Sunday .. 6 = Saturday) when the query ran. */
    dayOfWeek: number;
  };
  /** When the pattern was recorded. */
  timestamp: Date;
}
/**
 * A historical pattern matched against an incoming query, paired with a
 * similarity score (1.0 = exact signature match).
 */
export interface SimilarQuery {
  pattern: QueryPattern;
  /** Similarity in [0, 1]; matching is currently exact-signature only, so 1.0. */
  similarity: number;
}
/**
 * Aggregated usage profile for one widget over the analysis window
 * (computed from the last 7 days of patterns).
 */
export interface UsagePattern {
  widgetId: string;
  /** Most-used sources, ordered by frequency (top 5). */
  commonSources: string[];
  /** Mean latency in milliseconds across all matched patterns. */
  averageLatency: number;
  /** Fraction of successful queries in [0, 1]. */
  successRate: number;
  /** Total patterns included in the aggregation. */
  queryCount: number;
  /** Query frequency per hour of day, ordered most-frequent first. */
  timePatterns: {
    hour: number;
    frequency: number;
  }[];
  /** Timestamp of the most recent query, or null when none matched. */
  lastActivity: Date | null;
}
/**
 * Service-wide statistics over all recorded patterns (database-backed when
 * available, otherwise computed from the in-memory cache).
 */
export interface PatternStatistics {
  totalPatterns: number;
  uniqueWidgets: number;
  uniqueSources: number;
  /** Average latency of successful queries, in milliseconds. */
  avgLatencyMs: number;
  /** Overall success fraction in [0, 1]. */
  successRate: number;
  queriesLast24h: number;
  queriesLast7d: number;
  /** Top 10 sources by query count, with their average latency. */
  topSources: { source: string; count: number; avgLatency: number }[];
  /** Top 10 widgets by query count. */
  topWidgets: { widget: string; count: number }[];
}
// ─────────────────────────────────────────────────────────────────────────────
// PATTERN MEMORY SERVICE - ENTERPRISE IMPLEMENTATION
// ─────────────────────────────────────────────────────────────────────────────
| export class PatternMemory { | |
| private storage: StorageAdapter | null = null; | |
| private initialized: boolean = false; | |
| private writeQueue: QueryPattern[] = []; | |
| private flushInterval: ReturnType<typeof setInterval> | null = null; | |
| // In-memory cache for hot data (last 1000 patterns) | |
| private cache: QueryPattern[] = []; | |
| private readonly CACHE_SIZE = 1000; | |
| private readonly FLUSH_INTERVAL_MS = 5000; // Batch writes every 5 seconds | |
| private readonly MAX_DB_PATTERNS = 100000; | |
| constructor(storage?: StorageAdapter) { | |
| if (storage) { | |
| this.storage = storage; | |
| this.initialize(); | |
| } | |
| } | |
| /** | |
| * Initialize with storage adapter | |
| */ | |
| public setStorage(storage: StorageAdapter): void { | |
| this.storage = storage; | |
| this.initialize(); | |
| } | |
| /** | |
| * Initialize the service | |
| */ | |
| private async initialize(): Promise<void> { | |
| if (this.initialized || !this.storage) return; | |
| try { | |
| // Postgres uses different syntax for some things, but basic CREATE TABLE is mostly compatible | |
| // except for DATETIME DEFAULT CURRENT_TIMESTAMP vs TIMESTAMP WITH TIME ZONE | |
| const isPostgres = this.storage.mode === 'postgres'; | |
| const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME'; | |
| await this.storage.execute(` | |
| CREATE TABLE IF NOT EXISTS mcp_query_patterns ( | |
| id TEXT PRIMARY KEY, | |
| widget_id TEXT NOT NULL, | |
| query_type TEXT NOT NULL, | |
| query_signature TEXT NOT NULL, | |
| source_used TEXT NOT NULL, | |
| latency_ms INTEGER NOT NULL, | |
| result_size INTEGER, | |
| success BOOLEAN NOT NULL, | |
| user_context TEXT, | |
| timestamp ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| `); | |
| // Create indexes for fast queries | |
| await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_widget | |
| ON mcp_query_patterns(widget_id, timestamp DESC)`); | |
| await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_signature | |
| ON mcp_query_patterns(query_signature)`); | |
| await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_source | |
| ON mcp_query_patterns(source_used, timestamp DESC)`); | |
| // Load recent patterns into cache | |
| await this.loadCacheFromDb(); | |
| // Start background flush | |
| this.flushInterval = setInterval(() => this.flushWriteQueue(), this.FLUSH_INTERVAL_MS); | |
| this.initialized = true; | |
| logger.info(`π§ PatternMemory initialized with ${this.storage.mode} storage`); | |
| } catch (error) { | |
| logger.error('β PatternMemory initialization failed:', error); | |
| // Fall back to in-memory only | |
| this.initialized = true; | |
| } | |
| } | |
| /** | |
| * Load recent patterns from database into cache | |
| */ | |
| private async loadCacheFromDb(): Promise<void> { | |
| if (!this.storage) return; | |
| try { | |
| const rows = await this.storage.queryAll(` | |
| SELECT * FROM mcp_query_patterns | |
| ORDER BY timestamp DESC | |
| LIMIT ? | |
| `, [this.CACHE_SIZE]); | |
| for (const row of rows) { | |
| this.cache.push(this.rowToPattern(row)); | |
| } | |
| logger.info(`π¦ Loaded ${this.cache.length} patterns into memory cache`); | |
| } catch (error) { | |
| logger.warn('β οΈ Failed to load pattern cache:', error); | |
| } | |
| } | |
| /** | |
| * Convert database row to QueryPattern | |
| */ | |
| private rowToPattern(row: any): QueryPattern { | |
| return { | |
| id: row.id, | |
| widgetId: row.widget_id, | |
| queryType: row.query_type, | |
| querySignature: row.query_signature, | |
| sourceUsed: row.source_used, | |
| latencyMs: row.latency_ms, | |
| resultSize: row.result_size || undefined, | |
| success: Boolean(row.success), | |
| userContext: row.user_context ? JSON.parse(row.user_context) : undefined, | |
| timestamp: new Date(row.timestamp) | |
| }; | |
| } | |
| /** | |
| * Record a query pattern for learning (async, batched) | |
| */ | |
| async recordQuery(params: { | |
| widgetId: string; | |
| queryType: string; | |
| queryParams: any; | |
| sourceUsed: string; | |
| latencyMs: number; | |
| resultSize?: number; | |
| success: boolean; | |
| userId?: string; | |
| }): Promise<void> { | |
| const id = uuidv4(); | |
| const signature = this.generateSignature(params.queryType, params.queryParams); | |
| const userContext = { | |
| userId: params.userId, | |
| timeOfDay: new Date().getHours(), | |
| dayOfWeek: new Date().getDay() | |
| }; | |
| const pattern: QueryPattern = { | |
| id, | |
| widgetId: params.widgetId, | |
| queryType: params.queryType, | |
| querySignature: signature, | |
| sourceUsed: params.sourceUsed, | |
| latencyMs: params.latencyMs, | |
| resultSize: params.resultSize, | |
| success: params.success, | |
| userContext, | |
| timestamp: new Date() | |
| }; | |
| // Add to cache (hot data) | |
| this.cache.unshift(pattern); | |
| if (this.cache.length > this.CACHE_SIZE) { | |
| this.cache.pop(); | |
| } | |
| // Queue for database write | |
| this.writeQueue.push(pattern); | |
| // Emit event for real-time monitoring | |
| eventBus.emit('pattern:recorded' as EventType, { | |
| widgetId: params.widgetId, | |
| sourceUsed: params.sourceUsed, | |
| latencyMs: params.latencyMs, | |
| success: params.success | |
| }); | |
| } | |
| /** | |
| * Flush write queue to database (batched writes) | |
| */ | |
| private async flushWriteQueue(): Promise<void> { | |
| if (!this.storage || this.writeQueue.length === 0) return; | |
| const patterns = [...this.writeQueue]; | |
| this.writeQueue = []; | |
| try { | |
| const columns = [ | |
| 'id', 'widget_id', 'query_type', 'query_signature', 'source_used', | |
| 'latency_ms', 'result_size', 'success', 'user_context', 'timestamp' | |
| ]; | |
| const rows = patterns.map(p => [ | |
| p.id, | |
| p.widgetId, | |
| p.queryType, | |
| p.querySignature, | |
| p.sourceUsed, | |
| p.latencyMs, | |
| p.resultSize ?? null, | |
| p.success, // Use boolean directly for Postgres compatibility | |
| p.userContext ? JSON.stringify(p.userContext) : null, | |
| p.timestamp.toISOString() | |
| ]); | |
| await this.storage.batchInsert('mcp_query_patterns', columns, rows); | |
| // Cleanup old patterns periodically | |
| if (Math.random() < 0.01) { // 1% chance per flush | |
| this.cleanupOldPatterns(); // No await to keep it background | |
| } | |
| } catch (error) { | |
| logger.error('β Failed to flush pattern write queue:', error); | |
| // Put patterns back in queue | |
| this.writeQueue.unshift(...patterns); | |
| } | |
| } | |
| /** | |
| * Cleanup old patterns to prevent database bloat | |
| */ | |
| private async cleanupOldPatterns(): Promise<void> { | |
| if (!this.storage) return; | |
| try { | |
| const thirtyDaysAgo = new Date(); | |
| thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30); | |
| // Need to fix this query for Postgres compatibility if nested subquery limit issues arise | |
| // But standard SQL roughly supports DELETE... | |
| // Postgres supports DELETE FROM ... WHERE id NOT IN (...) | |
| await this.storage.execute(` | |
| DELETE FROM mcp_query_patterns | |
| WHERE timestamp < ? | |
| AND id NOT IN ( | |
| SELECT id FROM mcp_query_patterns | |
| ORDER BY timestamp DESC | |
| LIMIT ? | |
| ) | |
| `, [thirtyDaysAgo.toISOString(), this.MAX_DB_PATTERNS]); | |
| logger.debug('π§Ή Cleaned up old query patterns'); | |
| } catch (error) { | |
| logger.warn('β οΈ Pattern cleanup failed:', error); | |
| } | |
| } | |
| /** | |
| * Find similar queries based on signature | |
| */ | |
| async findSimilarQueries( | |
| queryType: string, | |
| queryParams: any, | |
| limit: number = 10 | |
| ): Promise<SimilarQuery[]> { | |
| const signature = this.generateSignature(queryType, queryParams); | |
| // Check cache first | |
| const cacheMatches = this.cache | |
| .filter(p => p.querySignature === signature) | |
| .slice(0, limit); | |
| let results: SimilarQuery[] = cacheMatches.map(p => ({ pattern: p, similarity: 1.0 })); | |
| if (results.length >= limit) { | |
| return results; | |
| } | |
| // Query database for more | |
| if (this.storage) { | |
| try { | |
| const remaining = limit - results.length; | |
| const rows = await this.storage.queryAll(` | |
| SELECT * FROM mcp_query_patterns | |
| WHERE query_signature = ? | |
| AND id NOT IN (${results.map(() => '?').join(',') || '\'\''}) | |
| ORDER BY timestamp DESC | |
| LIMIT ? | |
| `, [...results.map(r => r.pattern.id), signature, remaining]); | |
| const dbMatches = rows.map(row => ({ | |
| pattern: this.rowToPattern(row), | |
| similarity: 1.0 | |
| })); | |
| results = [...results, ...dbMatches]; | |
| } catch (error) { | |
| logger.warn('β οΈ Database query failed, using cache:', error); | |
| } | |
| } | |
| return results; | |
| } | |
| /** | |
| * Get widget usage patterns (comprehensive analysis) | |
| */ | |
| async getWidgetPatterns(widgetId: string): Promise<UsagePattern> { | |
| const sevenDaysAgo = new Date(); | |
| sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); | |
| let patterns: QueryPattern[] = []; | |
| // Try database first | |
| if (this.storage) { | |
| try { | |
| const rows = await this.storage.queryAll(` | |
| SELECT * FROM mcp_query_patterns | |
| WHERE widget_id = ? AND timestamp > ? | |
| ORDER BY timestamp DESC | |
| `, [widgetId, sevenDaysAgo.toISOString()]); | |
| patterns = rows.map(row => this.rowToPattern(row)); | |
| } catch (error) { | |
| logger.warn('β οΈ Database query failed, using cache'); | |
| } | |
| } | |
| // Fall back to cache | |
| if (patterns.length === 0) { | |
| patterns = this.cache.filter( | |
| p => p.widgetId === widgetId && p.timestamp > sevenDaysAgo | |
| ); | |
| } | |
| // Aggregate statistics | |
| const sourceCounts = new Map<string, number>(); | |
| const timePatterns = new Map<number, number>(); | |
| let totalLatency = 0; | |
| let successCount = 0; | |
| let lastActivity: Date | null = null; | |
| for (const p of patterns) { | |
| sourceCounts.set( | |
| p.sourceUsed, | |
| (sourceCounts.get(p.sourceUsed) || 0) + 1 | |
| ); | |
| const hour = p.userContext?.timeOfDay || 0; | |
| timePatterns.set(hour, (timePatterns.get(hour) || 0) + 1); | |
| totalLatency += p.latencyMs; | |
| if (p.success) successCount++; | |
| if (!lastActivity || p.timestamp > lastActivity) { | |
| lastActivity = p.timestamp; | |
| } | |
| } | |
| const commonSources = Array.from(sourceCounts.entries()) | |
| .sort((a, b) => b[1] - a[1]) | |
| .slice(0, 5) | |
| .map(([source]) => source); | |
| const timePatternArray = Array.from(timePatterns.entries()) | |
| .map(([hour, frequency]) => ({ hour, frequency })) | |
| .sort((a, b) => b.frequency - a.frequency); | |
| return { | |
| widgetId, | |
| commonSources, | |
| averageLatency: patterns.length > 0 ? totalLatency / patterns.length : 0, | |
| successRate: patterns.length > 0 ? successCount / patterns.length : 0, | |
| queryCount: patterns.length, | |
| timePatterns: timePatternArray, | |
| lastActivity | |
| }; | |
| } | |
| /** | |
| * Get average latency for a source | |
| */ | |
| async getAverageLatency(sourceName: string): Promise<number> { | |
| if (this.storage) { | |
| try { | |
| const oneDayAgo = new Date(); | |
| oneDayAgo.setDate(oneDayAgo.getDate() - 1); | |
| const result = await this.storage.queryOne<{ avg_latency: number }>(` | |
| SELECT AVG(latency_ms) as avg_latency | |
| FROM mcp_query_patterns | |
| WHERE source_used = ? AND success = 1 AND timestamp > ? | |
| `, [sourceName, oneDayAgo.toISOString()]); | |
| if (result && result.avg_latency !== null) { | |
| return result.avg_latency; | |
| } | |
| // If DB has no data (result is null or avg_latency is null), fall back to cache | |
| } catch (error) { | |
| logger.warn('β οΈ Latency query failed:', error); | |
| } | |
| } | |
| // Fall back to cache | |
| const oneDayAgo = new Date(); | |
| oneDayAgo.setDate(oneDayAgo.getDate() - 1); | |
| const sourcePatterns = this.cache.filter( | |
| p => p.sourceUsed === sourceName && p.success && p.timestamp > oneDayAgo | |
| ); | |
| if (sourcePatterns.length === 0) return 0; | |
| return sourcePatterns.reduce((sum, p) => sum + p.latencyMs, 0) / sourcePatterns.length; | |
| } | |
| /** | |
| * Get success rate for a source and query type | |
| */ | |
| async getSuccessRate(sourceName: string, queryType: string): Promise<number> { | |
| if (this.storage) { | |
| try { | |
| const sevenDaysAgo = new Date(); | |
| sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); | |
| const result = await this.storage.queryOne<{ total: number; successes: number }>(` | |
| SELECT | |
| COUNT(*) as total, | |
| SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successes | |
| FROM mcp_query_patterns | |
| WHERE source_used = ? AND query_type = ? AND timestamp > ? | |
| `, [sourceName, queryType, sevenDaysAgo.toISOString()]); | |
| if (result && result.total > 0) { | |
| return result.successes / result.total; | |
| } | |
| // Fall back to cache if DB yields no matching rows | |
| } catch (error) { | |
| logger.warn('β οΈ Success rate query failed:', error); | |
| } | |
| } | |
| // Fall back to cache | |
| const sevenDaysAgo = new Date(); | |
| sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); | |
| const matching = this.cache.filter( | |
| p => p.sourceUsed === sourceName && | |
| p.queryType === queryType && | |
| p.timestamp > sevenDaysAgo | |
| ); | |
| if (matching.length === 0) return 0; | |
| return matching.filter(p => p.success).length / matching.length; | |
| } | |
| /** | |
| * Get comprehensive statistics | |
| */ | |
| async getStatistics(): Promise<PatternStatistics> { | |
| const now = new Date(); | |
| const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000); | |
| const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); | |
| if (this.storage) { | |
| try { | |
| // Total patterns | |
| const total = await this.storage.queryScalar<number>('SELECT COUNT(*) FROM mcp_query_patterns') || 0; | |
| // Unique widgets | |
| const uniqueWidgets = await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT widget_id) FROM mcp_query_patterns') || 0; | |
| // Unique sources | |
| const uniqueSources = await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT source_used) FROM mcp_query_patterns') || 0; | |
| // Average latency | |
| const avgLatency = await this.storage.queryScalar<number>('SELECT AVG(latency_ms) FROM mcp_query_patterns WHERE success = 1') || 0; | |
| // Success rate | |
| const successRate = await this.storage.queryScalar<number>('SELECT AVG(CAST(success AS FLOAT)) FROM mcp_query_patterns') || 0; | |
| // Queries last 24h | |
| const last24h = await this.storage.queryScalar<number>( | |
| 'SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?', | |
| [oneDayAgo.toISOString()] | |
| ) || 0; | |
| // Queries last 7d | |
| const last7d = await this.storage.queryScalar<number>( | |
| 'SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?', | |
| [sevenDaysAgo.toISOString()] | |
| ) || 0; | |
| // Top sources | |
| const topSourcesRows = await this.storage.queryAll<{ source_used: string; count: number; avg_latency: number }>(` | |
| SELECT source_used, COUNT(*) as count, AVG(latency_ms) as avg_latency | |
| FROM mcp_query_patterns | |
| GROUP BY source_used | |
| ORDER BY count DESC | |
| LIMIT 10 | |
| `); | |
| const topSources = topSourcesRows.map(row => ({ | |
| source: row.source_used, | |
| count: row.count, | |
| avgLatency: row.avg_latency || 0 | |
| })); | |
| // Top widgets | |
| const topWidgetsRows = await this.storage.queryAll<{ widget_id: string; count: number }>(` | |
| SELECT widget_id, COUNT(*) as count | |
| FROM mcp_query_patterns | |
| GROUP BY widget_id | |
| ORDER BY count DESC | |
| LIMIT 10 | |
| `); | |
| const topWidgets = topWidgetsRows.map(row => ({ | |
| widget: row.widget_id, | |
| count: row.count | |
| })); | |
| return { | |
| totalPatterns: total, | |
| uniqueWidgets, | |
| uniqueSources, | |
| avgLatencyMs: avgLatency, | |
| successRate, | |
| queriesLast24h: last24h, | |
| queriesLast7d: last7d, | |
| topSources, | |
| topWidgets | |
| }; | |
| } catch (error) { | |
| logger.warn('β οΈ Statistics query failed:', error); | |
| } | |
| } | |
| // Fallback to cache-based stats | |
| const widgets = new Set(this.cache.map(p => p.widgetId)); | |
| const sources = new Set(this.cache.map(p => p.sourceUsed)); | |
| return { | |
| totalPatterns: this.cache.length, | |
| uniqueWidgets: widgets.size, | |
| uniqueSources: sources.size, | |
| avgLatencyMs: this.cache.length > 0 | |
| ? this.cache.reduce((sum, p) => sum + p.latencyMs, 0) / this.cache.length | |
| : 0, | |
| successRate: this.cache.length > 0 | |
| ? this.cache.filter(p => p.success).length / this.cache.length | |
| : 0, | |
| queriesLast24h: this.cache.filter(p => p.timestamp > oneDayAgo).length, | |
| queriesLast7d: this.cache.filter(p => p.timestamp > sevenDaysAgo).length, | |
| topSources: [], | |
| topWidgets: [] | |
| }; | |
| } | |
| /** | |
| * Force flush all pending writes | |
| */ | |
| async flush(): Promise<void> { | |
| this.flushWriteQueue(); | |
| } | |
| /** | |
| * Shutdown gracefully | |
| */ | |
| async shutdown(): Promise<void> { | |
| if (this.flushInterval) { | |
| clearInterval(this.flushInterval); | |
| this.flushInterval = null; | |
| } | |
| this.flushWriteQueue(); | |
| logger.info('π§ PatternMemory shut down gracefully'); | |
| } | |
| /** | |
| * Generate query signature for pattern matching | |
| */ | |
| private generateSignature(queryType: string, queryParams: any): string { | |
| const normalized = JSON.stringify({ | |
| type: queryType, | |
| params: this.normalizeParams(queryParams) | |
| }); | |
| return crypto.createHash('sha256') | |
| .update(normalized) | |
| .digest('hex') | |
| .substring(0, 16); | |
| } | |
| /** | |
| * Normalize query params for signature generation | |
| */ | |
| private normalizeParams(params: any): any { | |
| if (!params) return {}; | |
| // Remove volatile params (timestamps, random IDs, etc.) | |
| const { timestamp, requestId, _t, _ts, ...stable } = params; | |
| // Sort keys for consistent hashing | |
| const sorted: any = {}; | |
| Object.keys(stable).sort().forEach(key => { | |
| sorted[key] = stable[key]; | |
| }); | |
| return sorted; | |
| } | |
| } | |