// Kraft102
// fix: sql.js Docker/Alpine compatibility layer for PatternMemory and FailureMemory
// 5a81b95
/**
 * ╔══════════════════════════════════════════════════════════════════════════════╗
 * ║                     UNIFIED DATA SERVICE - ENTERPRISE A+                     ║
 * ║                                                                              ║
 * ║  Frontend abstraction layer for autonomous backend queries.                  ║
 * ║  Features:                                                                   ║
 * ║    • Intelligent query routing via autonomous MCP system                     ║
 * ║    • Real-time source health monitoring                                      ║
 * ║    • Automatic retries with exponential backoff                              ║
 * ║    • Request caching with smart invalidation                                 ║
 * ║    • WebSocket subscriptions for live updates                                ║
 * ║    • Comprehensive telemetry and observability                               ║
 * ║                                                                              ║
 * ║  Backend: /api/mcp/autonomous/*                                              ║
 * ╚══════════════════════════════════════════════════════════════════════════════╝
 */
// ═══════════════════════════════════════════════════════════════════════════════
// TYPES & INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════
/**
 * Optional per-call settings accepted by the query methods.
 * Every member may be omitted.
 */
export interface QueryOptions {
  /** ID of the requesting widget (used for pattern learning). */
  widgetId?: string;

  /** Priority of the query; influences source selection. */
  priority?: 'low' | 'normal' | 'high' | 'critical';

  /** How fresh the returned data has to be. */
  freshness?: 'stale' | 'normal' | 'realtime';

  /** Cache time-to-live in milliseconds; 0 means "do not cache". */
  cacheTtl?: number;

  /** Request timeout in milliseconds. */
  timeout?: number;

  /** When true, bypass the cache and force a fresh query. */
  skipCache?: boolean;

  /** Signal that lets the caller cancel the request. */
  signal?: AbortSignal;
}
/**
 * Envelope returned for every query: the payload plus routing metadata.
 *
 * @typeParam T - Shape of the payload carried in `data`.
 */
export interface QueryResult<T = any> {
  /** The payload itself. */
  data: T;

  /** Name of the source that produced the payload. */
  source: string;

  /** Elapsed time for this call, in milliseconds. */
  latencyMs: number;

  /** True when the payload was served from the local cache. */
  cached: boolean;

  /** Confidence value attached to the result. */
  confidence: number;

  /** Point in time associated with the payload. */
  timestamp: Date;
}
/**
 * Description of one backend data source as reported by the sources endpoint.
 */
export interface SourceInfo {
  /** Source name. */
  name: string;

  /** Source type label. */
  type: string;

  /** Whether the source is currently considered healthy. */
  healthy: boolean;

  /** Latency of the source in milliseconds. */
  latencyMs: number;

  /** Capability labels advertised by the source. */
  capabilities: Array<string>;

  /** Number of requests, when reported. */
  requestCount?: number;
}
/**
 * Aggregate statistics returned by the autonomous system's stats endpoint.
 */
export interface AutonomousStats {
  /** Total number of routing decisions. */
  totalDecisions: number;

  /** Mean confidence across decisions. */
  averageConfidence: number;

  /** Mean latency in milliseconds. */
  averageLatencyMs: number;

  /** Success rate of queries. */
  successRate: number;

  /** Most-used sources with their usage count and average latency. */
  topSources: Array<{ source: string; count: number; avgLatency: number }>;

  /** Most active widgets with their query count. */
  topWidgets: Array<{ widget: string; count: number }>;

  /** Number of queries in the last 24 hours. */
  queriesLast24h: number;
}
/**
 * Snapshot of overall system health together with the per-source details
 * it was derived from.
 */
export interface HealthStatus {
  /** Summary verdict. */
  overall: 'healthy' | 'degraded' | 'critical';

  /** The individual sources behind the verdict. */
  sources: Array<SourceInfo>;

  /** When this snapshot was produced. */
  lastCheck: Date;
}
/**
 * One record held in the in-memory query cache.
 *
 * @typeParam T - Type of the cached payload.
 */
interface CacheEntry<T> {
  /** The cached payload. */
  data: T;

  /** Name of the source the payload originally came from. */
  source: string;

  /** Epoch milliseconds after which the record is no longer valid. */
  expiresAt: number;

  /** Epoch milliseconds at which the record was stored. */
  createdAt: number;
}
/** Signature of a listener registered for real-time events; its return value is ignored. */
type EventCallback = (payload: any) => void;
// ═══════════════════════════════════════════════════════════════════════════════
// UNIFIED DATA SERVICE
// ═══════════════════════════════════════════════════════════════════════════════
/**
 * Client-side gateway to the autonomous MCP query backend.
 *
 * What the class does:
 *  - POSTs queries to `${baseUrl}/query`, with a per-attempt timeout and
 *    retries using exponential backoff;
 *  - keeps an in-memory TTL cache and shares one promise between identical
 *    in-flight queries;
 *  - reads the monitoring endpoints (`/sources`, `/stats`, `/decisions`,
 *    `/patterns`), degrading to empty/default values on failure;
 *  - maintains a single WebSocket with bounded, backed-off reconnects and
 *    fans incoming messages out to subscribers.
 *
 * Lifecycle: the constructor opens the socket and starts a periodic cache
 * sweep. `disconnect()` stops both and cancels any pending reconnect.
 * A later `subscribe()` reopens the socket, and the next cache write restarts
 * the sweep, so a shared instance stays usable after `disconnect()`.
 */
class UnifiedDataService {
  private readonly baseUrl: string;
  private readonly wsUrl: string;

  // WebSocket state
  private ws: WebSocket | null = null;
  private wsReconnectAttempts = 0;
  private readonly maxWsReconnectAttempts = 10;
  private wsReconnectDelay = 1000;
  /** Handle of the pending reconnect timer, if one is scheduled. */
  private wsReconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** True between a `disconnect()` call and the next `subscribe()`. */
  private wsClosedByDisconnect = false;

  // Cache / request state
  private cache = new Map<string, CacheEntry<any>>();
  /** Handle of the periodic cache sweep, if it is running. */
  private cacheCleanupTimer: ReturnType<typeof setInterval> | null = null;
  private pendingRequests = new Map<string, Promise<any>>();
  private eventListeners = new Map<string, Set<EventCallback>>();
  private readonly defaultTimeout = 30000;
  private readonly defaultCacheTtl = 60000; // 1 minute

  // Telemetry
  private queryCount = 0;
  private cacheHits = 0;
  private errorCount = 0;

  /**
   * Resolves the backend URLs from the Vite environment, opens the WebSocket
   * and starts the periodic cache sweep.
   */
  constructor() {
    // Auto-detect backend URL
    const isProduction = import.meta.env.PROD;
    const backendPort = import.meta.env.VITE_BACKEND_PORT || '3001';
    if (isProduction) {
      this.baseUrl = '/api/mcp/autonomous';
      this.wsUrl = `wss://${window.location.host}/mcp/ws`;
    } else {
      this.baseUrl = `http://localhost:${backendPort}/api/mcp/autonomous`;
      this.wsUrl = `ws://localhost:${backendPort}/mcp/ws`;
    }

    // Connect WebSocket on init
    this.connectWebSocket();

    // Cleanup stale cache entries periodically
    this.startCacheCleanup();
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CORE QUERY METHODS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Query data through the autonomous routing system.
   *
   * A cache hit is returned immediately unless `skipCache` is set or
   * `freshness` is `'realtime'`. Otherwise the call joins an identical
   * in-flight request if there is one, or starts a new request.
   *
   * @param type - Query type sent to the backend (e.g. `"<domain>.list"`).
   * @param params - Query parameters; they are part of the cache key.
   * @param options - Per-call settings; see {@link QueryOptions}.
   * @returns The payload with routing metadata.
   * @throws Error when the request is cancelled, times out, or all attempts fail.
   */
  async query<T = any>(
    type: string,
    params: Record<string, any> = {},
    options: QueryOptions = {}
  ): Promise<QueryResult<T>> {
    const {
      widgetId = 'unknown',
      priority = 'normal',
      freshness = 'normal',
      cacheTtl = this.defaultCacheTtl,
      timeout = this.defaultTimeout,
      skipCache = false,
      signal
    } = options;

    this.queryCount++;
    const startTime = Date.now();

    // Generate cache key
    const cacheKey = this.getCacheKey(type, params, widgetId);

    // Check cache (unless skipped or realtime required)
    if (!skipCache && freshness !== 'realtime') {
      const cached = this.getFromCache<T>(cacheKey);
      if (cached) {
        this.cacheHits++;
        return {
          data: cached.data,
          source: cached.source + ' (cached)',
          latencyMs: Date.now() - startTime,
          cached: true,
          confidence: 1.0,
          timestamp: new Date(cached.createdAt)
        };
      }
    }

    // Deduplicate in-flight requests: callers with the same key share one promise.
    const inFlight = this.pendingRequests.get(cacheKey);
    if (inFlight) {
      return inFlight;
    }

    // Make the request
    const requestPromise = this.executeQuery<T>(
      type,
      params,
      { widgetId, priority, freshness, timeout, signal }
    ).then(result => {
      // Cache successful results
      if (cacheTtl > 0) {
        this.setCache(cacheKey, result.data, result.source, cacheTtl);
      }
      return result;
    }).finally(() => {
      this.pendingRequests.delete(cacheKey);
    });

    this.pendingRequests.set(cacheKey, requestPromise);
    return requestPromise;
  }

  /**
   * Execute the actual query with retries.
   *
   * `'critical'` priority gets up to 3 retries, everything else 1. Each
   * attempt has its own timeout that covers both the request and reading the
   * response body. Between attempts the method sleeps `2^attempt * 500` ms.
   * Cancellation and timeouts are never retried.
   *
   * @throws Error `'Query cancelled or timed out'` on abort/timeout, otherwise
   *   the error of the last failed attempt.
   */
  private async executeQuery<T>(
    type: string,
    params: Record<string, any>,
    options: {
      widgetId: string;
      priority: string;
      freshness: string;
      timeout: number;
      signal?: AbortSignal;
    }
  ): Promise<QueryResult<T>> {
    const startTime = Date.now();
    const maxRetries = options.priority === 'critical' ? 3 : 1;
    let lastError: Error | null = null;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      const controller = new AbortController();
      const abortFromCaller = (): void => controller.abort();
      const timeoutId = setTimeout(() => controller.abort(), options.timeout);

      // Combine external signal with timeout. An already-aborted signal never
      // fires another 'abort' event, so it has to be checked explicitly.
      if (options.signal) {
        if (options.signal.aborted) {
          controller.abort();
        } else {
          options.signal.addEventListener('abort', abortFromCaller, { once: true });
        }
      }

      try {
        const response = await fetch(`${this.baseUrl}/query`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            type,
            params,
            widgetId: options.widgetId,
            priority: options.priority,
            freshness: options.freshness
          }),
          signal: controller.signal
        });

        if (!response.ok) {
          const errorData = await response.json().catch(() => ({}));
          throw new Error(errorData.error || `HTTP ${response.status}`);
        }

        const result = await response.json();
        return {
          data: result.data as T,
          source: result.meta?.source || 'unknown',
          latencyMs: Date.now() - startTime,
          cached: false,
          // `??` rather than `||`: a reported confidence of 0 must not become 1.0.
          confidence: result.meta?.confidence ?? 1.0,
          timestamp: new Date()
        };
      } catch (error: unknown) {
        this.errorCount++;

        // Don't retry on abort
        const aborted =
          typeof error === 'object' &&
          error !== null &&
          (error as { name?: unknown }).name === 'AbortError';
        if (aborted) {
          throw new Error('Query cancelled or timed out');
        }

        lastError = error instanceof Error ? error : new Error(String(error));
      } finally {
        // Always release per-attempt resources, on success and on failure alike.
        clearTimeout(timeoutId);
        options.signal?.removeEventListener('abort', abortFromCaller);
      }

      // Exponential backoff before retry
      if (attempt < maxRetries) {
        await this.sleep(Math.pow(2, attempt) * 500);
      }
    }

    throw lastError ?? new Error('Query failed after retries');
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CONVENIENCE METHODS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Fetch a list of items by issuing a `"<domain>.list"` query.
   */
  async list<T = any>(
    domain: string,
    options: QueryOptions = {}
  ): Promise<T[]> {
    const result = await this.query<T[]>(`${domain}.list`, {}, options);
    return result.data;
  }

  /**
   * Fetch a single item by ID by issuing a `"<domain>.get"` query.
   */
  async get<T = any>(
    domain: string,
    id: string,
    options: QueryOptions = {}
  ): Promise<T> {
    const result = await this.query<T>(`${domain}.get`, { id }, options);
    return result.data;
  }

  /**
   * Search with a query string by issuing a `"<domain>.search"` query.
   * The priority is always forced to `'high'`, overriding `options.priority`.
   */
  async search<T = any>(
    domain: string,
    query: string,
    options: QueryOptions = {}
  ): Promise<T[]> {
    const result = await this.query<T[]>(
      `${domain}.search`,
      { query },
      { ...options, priority: 'high' }
    );
    return result.data;
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // MONITORING & STATUS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Get available data sources and their health.
   * Never rejects: failures are logged and an empty list is returned.
   */
  async getSources(): Promise<SourceInfo[]> {
    try {
      const response = await fetch(`${this.baseUrl}/sources`);
      if (!response.ok) throw new Error('Failed to fetch sources');
      const data = await response.json();
      return data.sources || [];
    } catch (error) {
      console.error('Failed to fetch sources:', error);
      return [];
    }
  }

  /**
   * Get autonomous system statistics.
   * Never rejects: failures are logged and an all-zero record is returned.
   */
  async getStats(): Promise<AutonomousStats> {
    try {
      const response = await fetch(`${this.baseUrl}/stats`);
      if (!response.ok) throw new Error('Failed to fetch stats');
      return await response.json();
    } catch (error) {
      console.error('Failed to fetch stats:', error);
      return {
        totalDecisions: 0,
        averageConfidence: 0,
        averageLatencyMs: 0,
        successRate: 0,
        topSources: [],
        topWidgets: [],
        queriesLast24h: 0
      };
    }
  }

  /**
   * Get overall system health, derived from the share of healthy sources:
   * below 50% is `'critical'`, below 80% is `'degraded'`, otherwise `'healthy'`.
   * With no sources at all the verdict stays `'healthy'`.
   */
  async getHealth(): Promise<HealthStatus> {
    const sources = await this.getSources();
    const healthySources = sources.filter(s => s.healthy).length;
    const totalSources = sources.length;

    let overall: HealthStatus['overall'] = 'healthy';
    if (totalSources > 0) {
      const ratio = healthySources / totalSources;
      if (ratio < 0.5) overall = 'critical';
      else if (ratio < 0.8) overall = 'degraded';
    }

    return {
      overall,
      sources,
      lastCheck: new Date()
    };
  }

  /**
   * Get decision history.
   * Never rejects: failures are logged and an empty list is returned.
   *
   * @param limit - Maximum number of decisions requested from the backend.
   */
  async getDecisions(limit: number = 50): Promise<any[]> {
    try {
      const response = await fetch(
        `${this.baseUrl}/decisions?limit=${encodeURIComponent(String(limit))}`
      );
      if (!response.ok) throw new Error('Failed to fetch decisions');
      const data = await response.json();
      return data.decisions || [];
    } catch (error) {
      console.error('Failed to fetch decisions:', error);
      return [];
    }
  }

  /**
   * Get learned patterns for a widget.
   * Never rejects: failures are logged and `null` is returned.
   *
   * @param widgetId - Widget identifier; percent-encoded before it is placed
   *   in the query string so characters such as `&` or `=` cannot alter it.
   */
  async getPatterns(widgetId: string): Promise<any> {
    try {
      const response = await fetch(
        `${this.baseUrl}/patterns?widgetId=${encodeURIComponent(widgetId)}`
      );
      if (!response.ok) throw new Error('Failed to fetch patterns');
      return await response.json();
    } catch (error) {
      console.error('Failed to fetch patterns:', error);
      return null;
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // WEBSOCKET SUBSCRIPTIONS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Subscribe to real-time events.
   *
   * Listeners registered for an exact type receive the message's `data`.
   * Listeners registered for `"<prefix>:*"` receive `{ type, data }`.
   * If the socket was shut down by `disconnect()`, subscribing reopens it.
   *
   * @returns A function that removes this listener again.
   */
  subscribe(eventType: string, callback: EventCallback): () => void {
    let listeners = this.eventListeners.get(eventType);
    if (!listeners) {
      listeners = new Set();
      this.eventListeners.set(eventType, listeners);
    }
    listeners.add(callback);

    // A new subscriber after disconnect() needs a live socket again.
    if (this.wsClosedByDisconnect) {
      this.wsClosedByDisconnect = false;
      this.wsReconnectAttempts = 0;
      this.wsReconnectDelay = 1000;
      this.connectWebSocket();
    }

    // Return unsubscribe function
    return () => {
      this.eventListeners.get(eventType)?.delete(callback);
    };
  }

  /**
   * Subscribe to source health changes (`'source:health'` events).
   */
  onSourceHealthChange(callback: (sources: SourceInfo[]) => void): () => void {
    return this.subscribe('source:health', callback);
  }

  /**
   * Subscribe to pattern updates (`'pattern:recorded'` events).
   */
  onPatternRecorded(callback: (pattern: any) => void): () => void {
    return this.subscribe('pattern:recorded', callback);
  }

  /**
   * Subscribe to failure events (`'failure:recorded'` events).
   */
  onFailure(callback: (failure: any) => void): () => void {
    return this.subscribe('failure:recorded', callback);
  }

  /**
   * Connect WebSocket. Does nothing when a socket is already open or still
   * connecting, or when the runtime has no `WebSocket` implementation.
   */
  private connectWebSocket(): void {
    if (typeof WebSocket === 'undefined') {
      console.warn('Failed to connect WebSocket:', new Error('WebSocket is not available in this environment'));
      return;
    }

    const state = this.ws?.readyState;
    if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) return;

    let socket: WebSocket;
    try {
      socket = new WebSocket(this.wsUrl);
    } catch (error) {
      console.warn('Failed to connect WebSocket:', error);
      this.scheduleReconnect();
      return;
    }
    this.ws = socket;

    socket.onopen = () => {
      console.log('🔌 UnifiedDataService WebSocket connected');
      this.wsReconnectAttempts = 0;
      this.wsReconnectDelay = 1000;

      // Subscribe to autonomous events
      socket.send(JSON.stringify({
        type: 'subscribe',
        topics: ['pattern:*', 'failure:*', 'source:*', 'decision:*']
      }));
    };

    socket.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);
        this.handleWebSocketMessage(message);
      } catch (error) {
        console.warn('Failed to parse WebSocket message:', error);
      }
    };

    socket.onerror = (error) => {
      console.warn('WebSocket error:', error);
    };

    socket.onclose = () => {
      // Only the current socket may trigger a reconnect; a superseded one must not.
      if (this.ws !== socket) return;
      console.log('WebSocket closed, attempting reconnect...');
      this.scheduleReconnect();
    };
  }

  /**
   * Handle incoming WebSocket message: deliver `data` to listeners of the
   * exact type, then `{ type, data }` to listeners of `"<prefix>:*"`.
   * Messages without a string `type` are ignored.
   */
  private handleWebSocketMessage(message: any): void {
    if (message == null || typeof message.type !== 'string') return;
    const { type, data } = message;

    // Emit to listeners
    const listeners = this.eventListeners.get(type);
    if (listeners) {
      listeners.forEach(callback => {
        try {
          callback(data);
        } catch (error) {
          console.warn('Event listener error:', error);
        }
      });
    }

    // Also emit to wildcard listeners
    const wildcardType = type.split(':')[0] + ':*';
    const wildcardListeners = this.eventListeners.get(wildcardType);
    if (wildcardListeners) {
      wildcardListeners.forEach(callback => {
        try {
          callback({ type, data });
        } catch (error) {
          console.warn('Wildcard listener error:', error);
        }
      });
    }
  }

  /**
   * Schedule WebSocket reconnection. Gives up after `maxWsReconnectAttempts`.
   * The delay grows by 1.5x per attempt, capped at 30 seconds. Does nothing
   * once `disconnect()` has been called.
   */
  private scheduleReconnect(): void {
    if (this.wsClosedByDisconnect) return;

    if (this.wsReconnectAttempts >= this.maxWsReconnectAttempts) {
      console.error('Max WebSocket reconnect attempts reached');
      return;
    }

    this.wsReconnectAttempts++;
    this.wsReconnectDelay = Math.min(this.wsReconnectDelay * 1.5, 30000);

    // Keep at most one reconnect timer alive so disconnect() can cancel it.
    if (this.wsReconnectTimer !== null) {
      clearTimeout(this.wsReconnectTimer);
    }
    this.wsReconnectTimer = setTimeout(() => {
      this.wsReconnectTimer = null;
      this.connectWebSocket();
    }, this.wsReconnectDelay);
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CACHE MANAGEMENT
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Generate cache key of the form `"<widgetId>:<type>:<params as JSON>"`.
   * Keys of plain objects are serialized in sorted order, so parameter
   * objects that differ only in property order map to the same key.
   */
  private getCacheKey(type: string, params: any, widgetId: string): string {
    const canonical = JSON.stringify(params, (_key, value) => {
      if (value === null || typeof value !== 'object' || Array.isArray(value)) {
        return value;
      }
      const proto = Object.getPrototypeOf(value);
      if (proto !== Object.prototype && proto !== null) {
        return value; // leave class instances to JSON.stringify's default handling
      }
      // Null-prototype target so a "__proto__" key is stored as ordinary data.
      const sorted: Record<string, unknown> = Object.create(null);
      for (const key of Object.keys(value).sort()) {
        sorted[key] = value[key];
      }
      return sorted;
    });
    return `${widgetId}:${type}:${canonical}`;
  }

  /**
   * Get item from cache. Expired entries are evicted and reported as a miss.
   */
  private getFromCache<T>(key: string): CacheEntry<T> | null {
    const entry = this.cache.get(key);
    if (!entry) return null;

    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return null;
    }

    return entry as CacheEntry<T>;
  }

  /**
   * Set item in cache with a time-to-live of `ttl` milliseconds.
   * Also makes sure the periodic sweep is running.
   */
  private setCache(key: string, data: any, source: string, ttl: number): void {
    const now = Date.now();
    this.cache.set(key, {
      data,
      source,
      expiresAt: now + ttl,
      createdAt: now
    });
    this.startCacheCleanup();
  }

  /**
   * Invalidate cache entries whose key contains `pattern`.
   * Without a pattern the whole cache is cleared.
   */
  invalidateCache(pattern?: string): void {
    if (!pattern) {
      this.cache.clear();
      return;
    }

    for (const key of this.cache.keys()) {
      if (key.includes(pattern)) {
        this.cache.delete(key);
      }
    }
  }

  /**
   * Cleanup expired cache entries.
   */
  private cleanupCache(): void {
    const now = Date.now();
    for (const [key, entry] of this.cache.entries()) {
      if (now > entry.expiresAt) {
        this.cache.delete(key);
      }
    }
  }

  /**
   * Start the 30-second cache sweep unless it is already running.
   */
  private startCacheCleanup(): void {
    if (this.cacheCleanupTimer !== null) return;
    this.cacheCleanupTimer = setInterval(() => this.cleanupCache(), 30000);
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // TELEMETRY
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Get service telemetry: counters, cache size, hit rate as a percentage
   * string with one decimal, and whether the WebSocket is currently open.
   */
  getTelemetry() {
    return {
      queryCount: this.queryCount,
      cacheHits: this.cacheHits,
      cacheSize: this.cache.size,
      errorCount: this.errorCount,
      cacheHitRate: this.queryCount > 0
        ? (this.cacheHits / this.queryCount * 100).toFixed(1) + '%'
        : '0%',
      // Short-circuits before touching the WebSocket global when there is no socket.
      wsConnected: this.ws !== null && this.ws.readyState === WebSocket.OPEN
    };
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // UTILITIES
  // ═══════════════════════════════════════════════════════════════════════════

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Disconnect and cleanup: cancel the pending reconnect and the cache sweep,
   * close the socket without triggering a reconnect, and drop all cached
   * entries and listeners.
   */
  disconnect(): void {
    this.wsClosedByDisconnect = true;

    if (this.wsReconnectTimer !== null) {
      clearTimeout(this.wsReconnectTimer);
      this.wsReconnectTimer = null;
    }

    if (this.cacheCleanupTimer !== null) {
      clearInterval(this.cacheCleanupTimer);
      this.cacheCleanupTimer = null;
    }

    if (this.ws) {
      const socket = this.ws;
      this.ws = null;
      // Detach handlers first so the close event cannot schedule a reconnect.
      socket.onopen = null;
      socket.onmessage = null;
      socket.onerror = null;
      socket.onclose = null;
      socket.close();
    }

    this.cache.clear();
    this.eventListeners.clear();
  }
}
// ═══════════════════════════════════════════════════════════════════════════════
// SINGLETON EXPORT
// ═══════════════════════════════════════════════════════════════════════════════
// Shared instance, created as soon as this module is evaluated.
const dataService = new UnifiedDataService();
// Second name bound to the very same instance.
const unifiedDataService = dataService;

// The class is exported too, so tests can build their own instance.
export { dataService, unifiedDataService, UnifiedDataService };