// Imported from commit 34367da ("Update backend source")
/**
* ╔══════════════════════════════════════════════════════════════════════════════╗
* β•‘ PATTERN MEMORY SERVICE - ENTERPRISE A+ β•‘
* β•‘ β•‘
* β•‘ Records and analyzes query patterns with persistent SQLite storage. β•‘
* β•‘ Enables: β•‘
* β•‘ β€’ Learning which sources are best for which queries β•‘
* β•‘ β€’ Predictive pre-fetching based on historical patterns β•‘
* β•‘ β€’ Performance optimization recommendations β•‘
* β•‘ β€’ Cross-session memory persistence β•‘
* β•‘ β•‘
* β•‘ Schema: mcp_query_patterns table (defined in schema.sql) β•‘
* β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
*/
import { v4 as uuidv4 } from 'uuid';
import crypto from 'crypto';
import type { StorageAdapter } from './StorageAdapter.js';
import { logger } from '../../utils/logger.js';
import { eventBus, type EventType } from '../EventBus.js';
// ═══════════════════════════════════════════════════════════════════════════════
// INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════
/**
 * One recorded query execution: which widget asked, what it asked for
 * (as a normalized signature), which source answered, and how it went.
 */
export interface QueryPattern {
  /** Unique identifier (UUID v4, assigned when the query is recorded). */
  id: string;
  /** Widget that issued the query. */
  widgetId: string;
  /** Caller-supplied query category. */
  queryType: string;
  /** 16-hex-char SHA-256 prefix of the normalized query type + params. */
  querySignature: string;
  /** Data source that served the query. */
  sourceUsed: string;
  /** End-to-end query latency in milliseconds. */
  latencyMs: number;
  /** Size of the result payload, when known. */
  resultSize?: number;
  /** Whether the query succeeded. */
  success: boolean;
  /** Optional context captured at record time for time-of-day analysis. */
  userContext?: {
    userId?: string;
    /** Local hour of day (0-23) when the query ran. */
    timeOfDay: number;
    /** Local day of week (0 = Sunday) when the query ran. */
    dayOfWeek: number;
  };
  /** When the query was recorded. */
  timestamp: Date;
}
/** A stored pattern matched against an incoming query, with a match score. */
export interface SimilarQuery {
  /** The historical pattern that matched. */
  pattern: QueryPattern;
  /** Match score in [0, 1]; exact signature matches score 1.0. */
  similarity: number;
}
/** Aggregated usage profile for a single widget over the last seven days. */
export interface UsagePattern {
  widgetId: string;
  /** Up to five most frequently used sources, most used first. */
  commonSources: string[];
  /** Mean latency in ms over the analyzed patterns; 0 when there are none. */
  averageLatency: number;
  /** Fraction of successful queries in [0, 1]; 0 when there are none. */
  successRate: number;
  /** Number of patterns included in the aggregation. */
  queryCount: number;
  /** Query frequency per hour of day, most frequent hour first. */
  timePatterns: {
    hour: number;
    frequency: number;
  }[];
  /** Timestamp of the most recent query, or null when no activity was seen. */
  lastActivity: Date | null;
}
/** Service-wide statistics over all recorded query patterns. */
export interface PatternStatistics {
  totalPatterns: number;
  uniqueWidgets: number;
  uniqueSources: number;
  /** Average latency in ms (database path averages successful queries only). */
  avgLatencyMs: number;
  /** Overall success fraction in [0, 1]. */
  successRate: number;
  queriesLast24h: number;
  queriesLast7d: number;
  /** Top 10 sources by query count, with per-source average latency. */
  topSources: { source: string; count: number; avgLatency: number }[];
  /** Top 10 widgets by query count. */
  topWidgets: { widget: string; count: number }[];
}
// ═══════════════════════════════════════════════════════════════════════════════
// PATTERN MEMORY SERVICE - ENTERPRISE IMPLEMENTATION
// ═══════════════════════════════════════════════════════════════════════════════
export class PatternMemory {
  /** Persistent backend; null means in-memory-only operation. */
  private storage: StorageAdapter | null = null;
  private initialized: boolean = false;
  /** Patterns waiting for the next batched database write. */
  private writeQueue: QueryPattern[] = [];
  private flushInterval: ReturnType<typeof setInterval> | null = null;
  /** In-memory cache of the most recent patterns, newest first. */
  private cache: QueryPattern[] = [];
  private readonly CACHE_SIZE = 1000;
  private readonly FLUSH_INTERVAL_MS = 5000; // Batch writes every 5 seconds
  private readonly MAX_DB_PATTERNS = 100000;

  constructor(storage?: StorageAdapter) {
    if (storage) {
      this.storage = storage;
      // Fire-and-forget: initialize() is idempotent and logs its own failures.
      void this.initialize();
    }
  }

  /**
   * Attach a storage adapter after construction and initialize the service.
   */
  public setStorage(storage: StorageAdapter): void {
    this.storage = storage;
    void this.initialize();
  }

  /**
   * Create the pattern table and indexes, warm the cache, and start the
   * background flush timer. Safe to call more than once; on failure the
   * service falls back to in-memory-only operation.
   */
  private async initialize(): Promise<void> {
    if (this.initialized || !this.storage) return;
    try {
      // SQLite and Postgres share most DDL, but differ on timestamp types
      // (DATETIME vs TIMESTAMP WITH TIME ZONE).
      const isPostgres = this.storage.mode === 'postgres';
      const timestampType = isPostgres ? 'TIMESTAMP WITH TIME ZONE' : 'DATETIME';
      await this.storage.execute(`
        CREATE TABLE IF NOT EXISTS mcp_query_patterns (
          id TEXT PRIMARY KEY,
          widget_id TEXT NOT NULL,
          query_type TEXT NOT NULL,
          query_signature TEXT NOT NULL,
          source_used TEXT NOT NULL,
          latency_ms INTEGER NOT NULL,
          result_size INTEGER,
          success BOOLEAN NOT NULL,
          user_context TEXT,
          timestamp ${timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
      `);
      // Indexes for the hot query paths: per-widget, per-signature, per-source.
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_widget
        ON mcp_query_patterns(widget_id, timestamp DESC)`);
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_signature
        ON mcp_query_patterns(query_signature)`);
      await this.storage.execute(`CREATE INDEX IF NOT EXISTS idx_query_patterns_source
        ON mcp_query_patterns(source_used, timestamp DESC)`);
      // Load recent patterns into cache
      await this.loadCacheFromDb();
      // Start background flush (errors are handled inside flushWriteQueue)
      this.flushInterval = setInterval(() => void this.flushWriteQueue(), this.FLUSH_INTERVAL_MS);
      this.initialized = true;
      logger.info(`🧠 PatternMemory initialized with ${this.storage.mode} storage`);
    } catch (error) {
      logger.error('❌ PatternMemory initialization failed:', error);
      // Fall back to in-memory only
      this.initialized = true;
    }
  }

  /**
   * Load the most recent patterns from the database into the memory cache.
   * Rows arrive newest-first, matching the cache ordering.
   */
  private async loadCacheFromDb(): Promise<void> {
    if (!this.storage) return;
    try {
      const rows = await this.storage.queryAll(`
        SELECT * FROM mcp_query_patterns
        ORDER BY timestamp DESC
        LIMIT ?
      `, [this.CACHE_SIZE]);
      for (const row of rows) {
        this.cache.push(this.rowToPattern(row));
      }
      logger.info(`📦 Loaded ${this.cache.length} patterns into memory cache`);
    } catch (error) {
      logger.warn('⚠️ Failed to load pattern cache:', error);
    }
  }

  /**
   * Convert a raw database row (snake_case columns) into a QueryPattern.
   */
  private rowToPattern(row: any): QueryPattern {
    return {
      id: row.id,
      widgetId: row.widget_id,
      queryType: row.query_type,
      querySignature: row.query_signature,
      sourceUsed: row.source_used,
      // ?? (not ||) so a legitimate result_size of 0 is preserved
      resultSize: row.result_size ?? undefined,
      latencyMs: row.latency_ms,
      success: Boolean(row.success),
      userContext: row.user_context ? JSON.parse(row.user_context) : undefined,
      timestamp: new Date(row.timestamp)
    };
  }

  /**
   * Record a query pattern for learning. The pattern is cached immediately
   * and persisted asynchronously by the batched background flush.
   */
  async recordQuery(params: {
    widgetId: string;
    queryType: string;
    queryParams: any;
    sourceUsed: string;
    latencyMs: number;
    resultSize?: number;
    success: boolean;
    userId?: string;
  }): Promise<void> {
    const id = uuidv4();
    const signature = this.generateSignature(params.queryType, params.queryParams);
    // Capture "now" once so timeOfDay/dayOfWeek/timestamp are consistent
    const now = new Date();
    const userContext = {
      userId: params.userId,
      timeOfDay: now.getHours(),
      dayOfWeek: now.getDay()
    };
    const pattern: QueryPattern = {
      id,
      widgetId: params.widgetId,
      queryType: params.queryType,
      querySignature: signature,
      sourceUsed: params.sourceUsed,
      latencyMs: params.latencyMs,
      resultSize: params.resultSize,
      success: params.success,
      userContext,
      timestamp: now
    };
    // Add to cache (hot data), evicting the oldest entry if full
    this.cache.unshift(pattern);
    if (this.cache.length > this.CACHE_SIZE) {
      this.cache.pop();
    }
    // Queue for database write
    this.writeQueue.push(pattern);
    // Emit event for real-time monitoring
    eventBus.emit('pattern:recorded' as EventType, {
      widgetId: params.widgetId,
      sourceUsed: params.sourceUsed,
      latencyMs: params.latencyMs,
      success: params.success
    });
  }

  /**
   * Flush the write queue to the database in a single batched insert.
   * On failure the patterns are re-queued for the next flush attempt.
   */
  private async flushWriteQueue(): Promise<void> {
    if (!this.storage || this.writeQueue.length === 0) return;
    const patterns = [...this.writeQueue];
    this.writeQueue = [];
    try {
      const columns = [
        'id', 'widget_id', 'query_type', 'query_signature', 'source_used',
        'latency_ms', 'result_size', 'success', 'user_context', 'timestamp'
      ];
      const rows = patterns.map(p => [
        p.id,
        p.widgetId,
        p.queryType,
        p.querySignature,
        p.sourceUsed,
        p.latencyMs,
        p.resultSize ?? null,
        p.success, // Use boolean directly for Postgres compatibility
        p.userContext ? JSON.stringify(p.userContext) : null,
        p.timestamp.toISOString()
      ]);
      await this.storage.batchInsert('mcp_query_patterns', columns, rows);
      // Cleanup old patterns periodically (1% chance per flush, best-effort)
      if (Math.random() < 0.01) {
        void this.cleanupOldPatterns(); // No await to keep it background
      }
    } catch (error) {
      logger.error('❌ Failed to flush pattern write queue:', error);
      // Put patterns back at the front of the queue to preserve insert order
      this.writeQueue = [...patterns, ...this.writeQueue];
    }
  }

  /**
   * Delete patterns older than 30 days, while always keeping the newest
   * MAX_DB_PATTERNS rows, to prevent database bloat.
   */
  private async cleanupOldPatterns(): Promise<void> {
    if (!this.storage) return;
    try {
      const thirtyDaysAgo = new Date();
      thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
      // SQLite and Postgres both support DELETE ... WHERE id NOT IN (subquery with LIMIT)
      await this.storage.execute(`
        DELETE FROM mcp_query_patterns
        WHERE timestamp < ?
        AND id NOT IN (
          SELECT id FROM mcp_query_patterns
          ORDER BY timestamp DESC
          LIMIT ?
        )
      `, [thirtyDaysAgo.toISOString(), this.MAX_DB_PATTERNS]);
      logger.debug('🧹 Cleaned up old query patterns');
    } catch (error) {
      logger.warn('⚠️ Pattern cleanup failed:', error);
    }
  }

  /**
   * Find previously recorded queries with the same normalized signature,
   * checking the memory cache first and topping up from the database.
   */
  async findSimilarQueries(
    queryType: string,
    queryParams: any,
    limit: number = 10
  ): Promise<SimilarQuery[]> {
    const signature = this.generateSignature(queryType, queryParams);
    // Check cache first
    const cacheMatches = this.cache
      .filter(p => p.querySignature === signature)
      .slice(0, limit);
    let results: SimilarQuery[] = cacheMatches.map(p => ({ pattern: p, similarity: 1.0 }));
    if (results.length >= limit) {
      return results;
    }
    // Query database for more, excluding ids already served from the cache.
    if (this.storage) {
      try {
        const remaining = limit - results.length;
        const excludeIds = results.map(r => r.pattern.id);
        // Only emit the NOT IN clause when there is something to exclude,
        // and bind parameters in the same order as the placeholders:
        // signature first, then the excluded ids, then the LIMIT.
        const exclusion = excludeIds.length > 0
          ? `AND id NOT IN (${excludeIds.map(() => '?').join(',')})`
          : '';
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_query_patterns
          WHERE query_signature = ?
          ${exclusion}
          ORDER BY timestamp DESC
          LIMIT ?
        `, [signature, ...excludeIds, remaining]);
        const dbMatches = rows.map(row => ({
          pattern: this.rowToPattern(row),
          similarity: 1.0
        }));
        results = [...results, ...dbMatches];
      } catch (error) {
        logger.warn('⚠️ Database query failed, using cache:', error);
      }
    }
    return results;
  }

  /**
   * Aggregate a widget's usage over the last seven days: preferred sources,
   * latency, success rate, and hour-of-day activity distribution.
   */
  async getWidgetPatterns(widgetId: string): Promise<UsagePattern> {
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
    let patterns: QueryPattern[] = [];
    // Try database first
    if (this.storage) {
      try {
        const rows = await this.storage.queryAll(`
          SELECT * FROM mcp_query_patterns
          WHERE widget_id = ? AND timestamp > ?
          ORDER BY timestamp DESC
        `, [widgetId, sevenDaysAgo.toISOString()]);
        patterns = rows.map(row => this.rowToPattern(row));
      } catch (error) {
        logger.warn('⚠️ Database query failed, using cache');
      }
    }
    // Fall back to cache (also covers patterns not yet flushed)
    if (patterns.length === 0) {
      patterns = this.cache.filter(
        p => p.widgetId === widgetId && p.timestamp > sevenDaysAgo
      );
    }
    // Aggregate statistics in one pass
    const sourceCounts = new Map<string, number>();
    const timePatterns = new Map<number, number>();
    let totalLatency = 0;
    let successCount = 0;
    let lastActivity: Date | null = null;
    for (const p of patterns) {
      sourceCounts.set(
        p.sourceUsed,
        (sourceCounts.get(p.sourceUsed) || 0) + 1
      );
      const hour = p.userContext?.timeOfDay ?? 0;
      timePatterns.set(hour, (timePatterns.get(hour) || 0) + 1);
      totalLatency += p.latencyMs;
      if (p.success) successCount++;
      if (!lastActivity || p.timestamp > lastActivity) {
        lastActivity = p.timestamp;
      }
    }
    const commonSources = Array.from(sourceCounts.entries())
      .sort((a, b) => b[1] - a[1])
      .slice(0, 5)
      .map(([source]) => source);
    const timePatternArray = Array.from(timePatterns.entries())
      .map(([hour, frequency]) => ({ hour, frequency }))
      .sort((a, b) => b.frequency - a.frequency);
    return {
      widgetId,
      commonSources,
      averageLatency: patterns.length > 0 ? totalLatency / patterns.length : 0,
      successRate: patterns.length > 0 ? successCount / patterns.length : 0,
      queryCount: patterns.length,
      timePatterns: timePatternArray,
      lastActivity
    };
  }

  /**
   * Average latency (ms) of successful queries against a source over the
   * last 24h. Falls back to the memory cache; returns 0 when no data exists.
   */
  async getAverageLatency(sourceName: string): Promise<number> {
    if (this.storage) {
      try {
        const oneDayAgo = new Date();
        oneDayAgo.setDate(oneDayAgo.getDate() - 1);
        // "WHERE success" (bare boolean) is valid on both SQLite (1/0) and
        // Postgres (boolean); "success = 1" fails on Postgres boolean columns.
        const result = await this.storage.queryOne<{ avg_latency: number }>(`
          SELECT AVG(latency_ms) as avg_latency
          FROM mcp_query_patterns
          WHERE source_used = ? AND success AND timestamp > ?
        `, [sourceName, oneDayAgo.toISOString()]);
        if (result && result.avg_latency !== null) {
          // Number() guards against pg drivers returning numerics as strings
          return Number(result.avg_latency);
        }
        // If DB has no data (result is null or avg_latency is null), fall back to cache
      } catch (error) {
        logger.warn('⚠️ Latency query failed:', error);
      }
    }
    // Fall back to cache
    const oneDayAgo = new Date();
    oneDayAgo.setDate(oneDayAgo.getDate() - 1);
    const sourcePatterns = this.cache.filter(
      p => p.sourceUsed === sourceName && p.success && p.timestamp > oneDayAgo
    );
    if (sourcePatterns.length === 0) return 0;
    return sourcePatterns.reduce((sum, p) => sum + p.latencyMs, 0) / sourcePatterns.length;
  }

  /**
   * Success rate in [0, 1] for a source/query-type pair over the last 7 days.
   * Falls back to the memory cache; returns 0 when no data exists.
   */
  async getSuccessRate(sourceName: string, queryType: string): Promise<number> {
    if (this.storage) {
      try {
        const sevenDaysAgo = new Date();
        sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
        // CASE WHEN success THEN ... works for SQLite integers and pg booleans
        const result = await this.storage.queryOne<{ total: number; successes: number }>(`
          SELECT
            COUNT(*) as total,
            SUM(CASE WHEN success THEN 1 ELSE 0 END) as successes
          FROM mcp_query_patterns
          WHERE source_used = ? AND query_type = ? AND timestamp > ?
        `, [sourceName, queryType, sevenDaysAgo.toISOString()]);
        // Number() guards against pg drivers returning COUNT/SUM as strings
        const total = Number(result?.total ?? 0);
        if (total > 0) {
          return Number(result!.successes) / total;
        }
        // Fall back to cache if DB yields no matching rows
      } catch (error) {
        logger.warn('⚠️ Success rate query failed:', error);
      }
    }
    // Fall back to cache
    const sevenDaysAgo = new Date();
    sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
    const matching = this.cache.filter(
      p => p.sourceUsed === sourceName &&
        p.queryType === queryType &&
        p.timestamp > sevenDaysAgo
    );
    if (matching.length === 0) return 0;
    return matching.filter(p => p.success).length / matching.length;
  }

  /**
   * Comprehensive statistics across all stored patterns. Uses the database
   * when available; otherwise computes a reduced set from the memory cache.
   */
  async getStatistics(): Promise<PatternStatistics> {
    const now = new Date();
    const oneDayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
    const sevenDaysAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
    if (this.storage) {
      try {
        // Number() throughout: pg drivers may return COUNT/AVG as strings.
        const total = Number(await this.storage.queryScalar<number>('SELECT COUNT(*) FROM mcp_query_patterns')) || 0;
        const uniqueWidgets = Number(await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT widget_id) FROM mcp_query_patterns')) || 0;
        const uniqueSources = Number(await this.storage.queryScalar<number>('SELECT COUNT(DISTINCT source_used) FROM mcp_query_patterns')) || 0;
        // Average latency over successful queries ("WHERE success" is portable)
        const avgLatency = Number(await this.storage.queryScalar<number>('SELECT AVG(latency_ms) FROM mcp_query_patterns WHERE success')) || 0;
        // Success rate via CASE, portable across SQLite integers and pg booleans
        const successRate = Number(await this.storage.queryScalar<number>('SELECT AVG(CASE WHEN success THEN 1.0 ELSE 0.0 END) FROM mcp_query_patterns')) || 0;
        const last24h = Number(await this.storage.queryScalar<number>(
          'SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?',
          [oneDayAgo.toISOString()]
        )) || 0;
        const last7d = Number(await this.storage.queryScalar<number>(
          'SELECT COUNT(*) FROM mcp_query_patterns WHERE timestamp > ?',
          [sevenDaysAgo.toISOString()]
        )) || 0;
        // Top sources by query count
        const topSourcesRows = await this.storage.queryAll<{ source_used: string; count: number; avg_latency: number }>(`
          SELECT source_used, COUNT(*) as count, AVG(latency_ms) as avg_latency
          FROM mcp_query_patterns
          GROUP BY source_used
          ORDER BY count DESC
          LIMIT 10
        `);
        const topSources = topSourcesRows.map(row => ({
          source: row.source_used,
          count: Number(row.count),
          avgLatency: Number(row.avg_latency) || 0
        }));
        // Top widgets by query count
        const topWidgetsRows = await this.storage.queryAll<{ widget_id: string; count: number }>(`
          SELECT widget_id, COUNT(*) as count
          FROM mcp_query_patterns
          GROUP BY widget_id
          ORDER BY count DESC
          LIMIT 10
        `);
        const topWidgets = topWidgetsRows.map(row => ({
          widget: row.widget_id,
          count: Number(row.count)
        }));
        return {
          totalPatterns: total,
          uniqueWidgets,
          uniqueSources,
          avgLatencyMs: avgLatency,
          successRate,
          queriesLast24h: last24h,
          queriesLast7d: last7d,
          topSources,
          topWidgets
        };
      } catch (error) {
        logger.warn('⚠️ Statistics query failed:', error);
      }
    }
    // Fallback to cache-based stats (no top-N breakdowns available)
    const widgets = new Set(this.cache.map(p => p.widgetId));
    const sources = new Set(this.cache.map(p => p.sourceUsed));
    return {
      totalPatterns: this.cache.length,
      uniqueWidgets: widgets.size,
      uniqueSources: sources.size,
      avgLatencyMs: this.cache.length > 0
        ? this.cache.reduce((sum, p) => sum + p.latencyMs, 0) / this.cache.length
        : 0,
      successRate: this.cache.length > 0
        ? this.cache.filter(p => p.success).length / this.cache.length
        : 0,
      queriesLast24h: this.cache.filter(p => p.timestamp > oneDayAgo).length,
      queriesLast7d: this.cache.filter(p => p.timestamp > sevenDaysAgo).length,
      topSources: [],
      topWidgets: []
    };
  }

  /**
   * Force flush all pending writes. Resolves only after the write completes
   * (or has been re-queued on failure).
   */
  async flush(): Promise<void> {
    await this.flushWriteQueue();
  }

  /**
   * Shutdown gracefully: stop the background timer and persist any
   * remaining queued patterns before returning.
   */
  async shutdown(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
      this.flushInterval = null;
    }
    await this.flushWriteQueue();
    logger.info('🧠 PatternMemory shut down gracefully');
  }

  /**
   * Generate a stable 16-hex-char signature for pattern matching:
   * SHA-256 over the query type plus normalized (volatile-free, key-sorted)
   * parameters, truncated for compactness.
   */
  private generateSignature(queryType: string, queryParams: any): string {
    const normalized = JSON.stringify({
      type: queryType,
      params: this.normalizeParams(queryParams)
    });
    return crypto.createHash('sha256')
      .update(normalized)
      .digest('hex')
      .substring(0, 16);
  }

  /**
   * Normalize query params for signature generation: strip volatile fields
   * (timestamps, request ids) and sort keys so equivalent queries hash equal.
   */
  private normalizeParams(params: any): any {
    if (!params) return {};
    // Remove volatile params (timestamps, random IDs, etc.)
    const { timestamp, requestId, _t, _ts, ...stable } = params;
    // Sort keys for consistent hashing
    const sorted: any = {};
    Object.keys(stable).sort().forEach(key => {
      sorted[key] = stable[key];
    });
    return sorted;
  }
}