/**
 * ════════════════════════════════════════════════════════════════════════════════
 * UNIFIED DATA SERVICE - ENTERPRISE A+
 * ════════════════════════════════════════════════════════════════════════════════
 *
 * Frontend abstraction layer for autonomous backend queries.
 *
 * Features:
 *   • Intelligent query routing via autonomous MCP system
 *   • Real-time source health monitoring
 *   • Automatic retries with exponential backoff
 *   • Request caching with smart invalidation
 *   • WebSocket subscriptions for live updates
 *   • Comprehensive telemetry and observability
 *
 * Backend: /api/mcp/autonomous/*
 */

// ═══════════════════════════════════════════════════════════════════════════════
// TYPES & INTERFACES
// ═══════════════════════════════════════════════════════════════════════════════

export interface QueryOptions {
  /** Widget ID for pattern learning */
  widgetId?: string;
  /** Query priority (affects source selection) */
  priority?: 'low' | 'normal' | 'high' | 'critical';
  /** Data freshness requirement */
  freshness?: 'stale' | 'normal' | 'realtime';
  /** Cache TTL in milliseconds (0 = no cache) */
  cacheTtl?: number;
  /** Timeout in milliseconds */
  timeout?: number;
  /** Skip cache and force fresh query */
  skipCache?: boolean;
  /** Abort signal for cancellation */
  signal?: AbortSignal;
}

/** Result envelope returned by every query, whether served from cache or network. */
export interface QueryResult<T = any> {
  data: T;
  source: string;
  latencyMs: number;
  cached: boolean;
  confidence: number;
  timestamp: Date;
}

export interface SourceInfo {
  name: string;
  type: string;
  healthy: boolean;
  latencyMs: number;
  capabilities: string[];
  requestCount?: number;
}

export interface AutonomousStats {
  totalDecisions: number;
  averageConfidence: number;
  averageLatencyMs: number;
  successRate: number;
  topSources: { source: string; count: number; avgLatency: number }[];
  topWidgets: { widget: string; count: number }[];
  queriesLast24h: number;
}

export interface HealthStatus {
  overall: 'healthy' | 'degraded' | 'critical';
  sources: SourceInfo[];
  lastCheck: Date;
}

/** One cached query payload together with its expiry bookkeeping (epoch ms). */
interface CacheEntry<T = any> {
  data: T;
  source: string;
  expiresAt: number;
  createdAt: number;
}

type EventCallback = (data: any) => void;

// ═══════════════════════════════════════════════════════════════════════════════
// UNIFIED DATA SERVICE
// ═══════════════════════════════════════════════════════════════════════════════

class UnifiedDataService {
  private baseUrl: string;
  private wsUrl: string;

  private ws: WebSocket | null = null;
  private wsReconnectAttempts = 0;
  private maxWsReconnectAttempts = 10;
  private wsReconnectDelay = 1000;
  /** Handle of the pending reconnect timer, so disconnect() can cancel it. */
  private wsReconnectTimer: ReturnType<typeof setTimeout> | null = null;

  private cache = new Map<string, CacheEntry<any>>();
  /** Handle of the periodic cache sweep, so disconnect() can stop it. */
  private cacheCleanupTimer: ReturnType<typeof setInterval> | null = null;
  private pendingRequests = new Map<string, Promise<QueryResult<any>>>();
  private eventListeners = new Map<string, Set<EventCallback>>();

  private defaultTimeout = 30000;
  private defaultCacheTtl = 60000; // 1 minute

  // Telemetry
  private queryCount = 0;
  private cacheHits = 0;
  private errorCount = 0;

  /**
   * Resolves the backend HTTP/WebSocket URLs from the Vite environment,
   * opens the WebSocket and starts the periodic cache sweep.
   */
  constructor() {
    // Auto-detect backend URL
    const isProduction = import.meta.env.PROD;
    const backendPort = import.meta.env.VITE_BACKEND_PORT || '3001';

    if (isProduction) {
      this.baseUrl = '/api/mcp/autonomous';
      this.wsUrl = `wss://${window.location.host}/mcp/ws`;
    } else {
      this.baseUrl = `http://localhost:${backendPort}/api/mcp/autonomous`;
      this.wsUrl = `ws://localhost:${backendPort}/mcp/ws`;
    }

    // Connect WebSocket on init
    this.connectWebSocket();

    // Cleanup stale cache entries periodically
    this.startCacheCleanup();
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CORE QUERY METHODS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Query data through the autonomous routing system.
   *
   * Order of resolution: cache (unless `skipCache` or `freshness: 'realtime'`),
   * then an identical in-flight request, then a new network request.
   *
   * NOTE: in-flight de-duplication is keyed on widget/type/params only, so a
   * caller that joins an existing request shares that request's timeout and
   * abort signal rather than its own.
   *
   * @param type    Query type sent to the backend (e.g. `"domain.list"`).
   * @param params  Query parameters; serialised with JSON.stringify for the
   *                request body and the cache key.
   * @param options Per-call behaviour overrides; see {@link QueryOptions}.
   * @returns The result envelope; `cached` tells whether it came from cache.
   * @throws Error when the request is cancelled, times out, or all attempts fail.
   */
  async query<T = any>(
    type: string,
    params: Record<string, any> = {},
    options: QueryOptions = {}
  ): Promise<QueryResult<T>> {
    const {
      widgetId = 'unknown',
      priority = 'normal',
      freshness = 'normal',
      cacheTtl = this.defaultCacheTtl,
      timeout = this.defaultTimeout,
      skipCache = false,
      signal
    } = options;

    this.queryCount++;
    const startTime = Date.now();

    // Generate cache key
    const cacheKey = this.getCacheKey(type, params, widgetId);

    // Check cache (unless skipped or realtime required)
    if (!skipCache && freshness !== 'realtime') {
      const cached = this.getFromCache<T>(cacheKey);
      if (cached) {
        this.cacheHits++;
        return {
          data: cached.data,
          source: cached.source + ' (cached)',
          latencyMs: Date.now() - startTime,
          cached: true,
          confidence: 1.0,
          timestamp: new Date(cached.createdAt)
        };
      }
    }

    // Deduplicate in-flight requests
    const pendingKey = cacheKey;
    const inFlight = this.pendingRequests.get(pendingKey);
    if (inFlight) {
      return inFlight as Promise<QueryResult<T>>;
    }

    // Make the request. The promise chain (rather than await) is deliberate:
    // the promise must be registered synchronously so concurrent callers see it.
    const requestPromise = this.executeQuery<T>(type, params, {
      widgetId,
      priority,
      freshness,
      timeout,
      signal
    })
      .then(result => {
        // Cache successful results
        if (cacheTtl > 0) {
          this.setCache(cacheKey, result.data, result.source, cacheTtl);
        }
        return result;
      })
      .finally(() => {
        this.pendingRequests.delete(pendingKey);
      });

    this.pendingRequests.set(pendingKey, requestPromise);
    return requestPromise;
  }

  /**
   * Execute the actual query with retries.
   *
   * `critical` priority gets up to 3 retries, everything else 1. Between
   * attempts the delay doubles starting at 500 ms. Aborts (caller signal or
   * timeout) are never retried.
   *
   * @throws Error `'Query cancelled or timed out'` on abort, otherwise the
   *         error from the last failed attempt.
   */
  private async executeQuery<T>(
    type: string,
    params: Record<string, any>,
    options: {
      widgetId: string;
      priority: string;
      freshness: string;
      timeout: number;
      signal?: AbortSignal;
    }
  ): Promise<QueryResult<T>> {
    const startTime = Date.now();
    const maxRetries = options.priority === 'critical' ? 3 : 1;
    let lastError: Error | null = null;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      const controller = new AbortController();
      const abortFromCaller = (): void => controller.abort();
      const timeoutId = setTimeout(() => controller.abort(), options.timeout);

      // Combine external signal with timeout. An already-aborted signal never
      // fires 'abort' again, so it has to be checked explicitly.
      if (options.signal) {
        if (options.signal.aborted) {
          controller.abort();
        } else {
          options.signal.addEventListener('abort', abortFromCaller, { once: true });
        }
      }

      try {
        const response = await fetch(`${this.baseUrl}/query`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            type,
            params,
            widgetId: options.widgetId,
            priority: options.priority,
            freshness: options.freshness
          }),
          signal: controller.signal
        });

        if (!response.ok) {
          // Error bodies are best-effort: fall back to the status code.
          const errorData = await response.json().catch(() => ({}));
          throw new Error(errorData.error || `HTTP ${response.status}`);
        }

        const result = await response.json();

        return {
          data: result.data as T,
          source: result.meta?.source || 'unknown',
          latencyMs: Date.now() - startTime,
          cached: false,
          // `??` rather than `||`: a reported confidence of 0 is meaningful.
          confidence: result.meta?.confidence ?? 1.0,
          timestamp: new Date()
        };
      } catch (error: unknown) {
        lastError = error instanceof Error ? error : new Error(String(error));
        this.errorCount++;

        // Don't retry on abort
        const aborted =
          typeof error === 'object' &&
          error !== null &&
          (error as { name?: unknown }).name === 'AbortError';
        if (aborted) {
          throw new Error('Query cancelled or timed out');
        }
      } finally {
        // Always release the per-attempt timer and listener, on success,
        // failure and abort alike, so retries do not accumulate them.
        clearTimeout(timeoutId);
        options.signal?.removeEventListener('abort', abortFromCaller);
      }

      // Exponential backoff before retry
      if (attempt < maxRetries) {
        await this.sleep(Math.pow(2, attempt) * 500);
      }
    }

    throw lastError || new Error('Query failed after retries');
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CONVENIENCE METHODS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Fetch a list of items (issues a `<domain>.list` query with no params).
   */
  async list<T = any>(
    domain: string,
    options: QueryOptions = {}
  ): Promise<T[]> {
    const result = await this.query<T[]>(`${domain}.list`, {}, options);
    return result.data;
  }

  /**
   * Fetch a single item by ID (issues a `<domain>.get` query).
   */
  async get<T = any>(
    domain: string,
    id: string,
    options: QueryOptions = {}
  ): Promise<T> {
    const result = await this.query<T>(`${domain}.get`, { id }, options);
    return result.data;
  }

  /**
   * Search with a query string (issues a `<domain>.search` query).
   * Priority is always forced to `'high'`, overriding `options.priority`.
   */
  async search<T = any>(
    domain: string,
    query: string,
    options: QueryOptions = {}
  ): Promise<T[]> {
    const result = await this.query<T[]>(
      `${domain}.search`,
      { query },
      { ...options, priority: 'high' }
    );
    return result.data;
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // MONITORING & STATUS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Get available data sources and their health.
   * Best-effort: logs and returns an empty array on any failure.
   */
  async getSources(): Promise<SourceInfo[]> {
    try {
      const response = await fetch(`${this.baseUrl}/sources`);
      if (!response.ok) throw new Error('Failed to fetch sources');
      const data = await response.json();
      return data.sources || [];
    } catch (error) {
      console.error('Failed to fetch sources:', error);
      return [];
    }
  }

  /**
   * Get autonomous system statistics.
   * Best-effort: logs and returns an all-zero stats object on any failure.
   */
  async getStats(): Promise<AutonomousStats> {
    try {
      const response = await fetch(`${this.baseUrl}/stats`);
      if (!response.ok) throw new Error('Failed to fetch stats');
      return await response.json();
    } catch (error) {
      console.error('Failed to fetch stats:', error);
      return {
        totalDecisions: 0,
        averageConfidence: 0,
        averageLatencyMs: 0,
        successRate: 0,
        topSources: [],
        topWidgets: [],
        queriesLast24h: 0
      };
    }
  }

  /**
   * Get overall system health, derived from the share of healthy sources:
   * below 50% is `critical`, below 80% is `degraded`, otherwise `healthy`.
   *
   * NOTE(review): when no sources are returned (including when the sources
   * request itself failed) the result is `healthy` — confirm that is intended.
   */
  async getHealth(): Promise<HealthStatus> {
    const sources = await this.getSources();
    const healthySources = sources.filter(s => s.healthy).length;
    const totalSources = sources.length;

    let overall: HealthStatus['overall'] = 'healthy';
    if (totalSources > 0) {
      const ratio = healthySources / totalSources;
      if (ratio < 0.5) overall = 'critical';
      else if (ratio < 0.8) overall = 'degraded';
    }

    return { overall, sources, lastCheck: new Date() };
  }

  /**
   * Get decision history.
   * Best-effort: logs and returns an empty array on any failure.
   *
   * @param limit Maximum number of decisions requested from the backend.
   */
  async getDecisions(limit: number = 50): Promise<any[]> {
    try {
      const response = await fetch(`${this.baseUrl}/decisions?limit=${limit}`);
      if (!response.ok) throw new Error('Failed to fetch decisions');
      const data = await response.json();
      return data.decisions || [];
    } catch (error) {
      console.error('Failed to fetch decisions:', error);
      return [];
    }
  }

  /**
   * Get learned patterns for a widget.
   * Best-effort: logs and returns `null` on any failure.
   */
  async getPatterns(widgetId: string): Promise<any> {
    try {
      // Encode the id so characters such as '&', '#' or spaces cannot break
      // or alter the query string.
      const response = await fetch(
        `${this.baseUrl}/patterns?widgetId=${encodeURIComponent(widgetId)}`
      );
      if (!response.ok) throw new Error('Failed to fetch patterns');
      return await response.json();
    } catch (error) {
      console.error('Failed to fetch patterns:', error);
      return null;
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // WEBSOCKET SUBSCRIPTIONS
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Subscribe to real-time events.
   *
   * Exact event types receive the message's `data`; a wildcard type of the
   * form `"<prefix>:*"` receives `{ type, data }` for every `<prefix>:…` event.
   *
   * @returns A function that removes this callback again.
   */
  subscribe(eventType: string, callback: EventCallback): () => void {
    let listeners = this.eventListeners.get(eventType);
    if (!listeners) {
      listeners = new Set();
      this.eventListeners.set(eventType, listeners);
    }
    listeners.add(callback);

    // Return unsubscribe function. The set is looked up again at call time
    // because disconnect() may have discarded the one captured above.
    return () => {
      this.eventListeners.get(eventType)?.delete(callback);
    };
  }

  /**
   * Subscribe to source health changes
   */
  onSourceHealthChange(callback: (sources: SourceInfo[]) => void): () => void {
    return this.subscribe('source:health', callback);
  }

  /**
   * Subscribe to pattern updates
   */
  onPatternRecorded(callback: (pattern: any) => void): () => void {
    return this.subscribe('pattern:recorded', callback);
  }

  /**
   * Subscribe to failure events
   */
  onFailure(callback: (failure: any) => void): () => void {
    return this.subscribe('failure:recorded', callback);
  }

  /**
   * Connect WebSocket. No-op when a socket is already open or connecting,
   * or when the runtime has no WebSocket implementation.
   */
  private connectWebSocket(): void {
    if (typeof WebSocket === 'undefined') {
      console.warn('WebSocket is not available in this environment');
      return;
    }

    const state = this.ws?.readyState;
    if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) return;

    try {
      const socket = new WebSocket(this.wsUrl);
      this.ws = socket;

      socket.onopen = () => {
        console.log('🔌 UnifiedDataService WebSocket connected');
        this.wsReconnectAttempts = 0;
        this.wsReconnectDelay = 1000;

        // Subscribe to autonomous events
        socket.send(JSON.stringify({
          type: 'subscribe',
          topics: ['pattern:*', 'failure:*', 'source:*', 'decision:*']
        }));
      };

      socket.onmessage = (event) => {
        try {
          const message = JSON.parse(event.data);
          this.handleWebSocketMessage(message);
        } catch (error) {
          console.warn('Failed to parse WebSocket message:', error);
        }
      };

      socket.onerror = (error) => {
        console.warn('WebSocket error:', error);
      };

      socket.onclose = () => {
        // Only the current socket may trigger a reconnect. After disconnect()
        // (or once replaced) `this.ws` no longer points at this socket.
        if (this.ws !== socket) return;
        console.log('WebSocket closed, attempting reconnect...');
        this.scheduleReconnect();
      };
    } catch (error) {
      console.warn('Failed to connect WebSocket:', error);
      this.scheduleReconnect();
    }
  }

  /**
   * Handle incoming WebSocket message: dispatch to exact-type listeners with
   * `data`, then to `"<prefix>:*"` listeners with `{ type, data }`.
   * Messages without a string `type` are logged and dropped.
   */
  private handleWebSocketMessage(message: unknown): void {
    const type = (message as { type?: unknown } | null | undefined)?.type;
    if (typeof type !== 'string') {
      console.warn('Ignoring WebSocket message without a string type:', message);
      return;
    }
    const data = (message as { data?: unknown }).data;

    // Emit to listeners
    const listeners = this.eventListeners.get(type);
    if (listeners) {
      listeners.forEach(callback => {
        try {
          callback(data);
        } catch (error) {
          console.warn('Event listener error:', error);
        }
      });
    }

    // Also emit to wildcard listeners
    const wildcardType = type.split(':')[0] + ':*';
    const wildcardListeners = this.eventListeners.get(wildcardType);
    if (wildcardListeners) {
      wildcardListeners.forEach(callback => {
        try {
          callback({ type, data });
        } catch (error) {
          console.warn('Wildcard listener error:', error);
        }
      });
    }
  }

  /**
   * Schedule WebSocket reconnection. The delay grows by 1.5x per attempt,
   * capped at 30 s; gives up after `maxWsReconnectAttempts`.
   */
  private scheduleReconnect(): void {
    // A reconnect is already queued; don't stack a second timer.
    if (this.wsReconnectTimer !== null) return;

    if (this.wsReconnectAttempts >= this.maxWsReconnectAttempts) {
      console.error('Max WebSocket reconnect attempts reached');
      return;
    }

    this.wsReconnectAttempts++;
    this.wsReconnectDelay = Math.min(this.wsReconnectDelay * 1.5, 30000);

    this.wsReconnectTimer = setTimeout(() => {
      this.wsReconnectTimer = null;
      this.connectWebSocket();
    }, this.wsReconnectDelay);
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // CACHE MANAGEMENT
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Generate cache key from widget, query type and JSON-serialised params.
   */
  private getCacheKey(type: string, params: any, widgetId: string): string {
    return `${widgetId}:${type}:${JSON.stringify(params)}`;
  }

  /**
   * Get item from cache; expired entries are evicted and reported as a miss.
   */
  private getFromCache<T>(key: string): CacheEntry<T> | null {
    const entry = this.cache.get(key);
    if (!entry) return null;

    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return null;
    }

    return entry as CacheEntry<T>;
  }

  /**
   * Set item in cache with an expiry `ttl` milliseconds from now.
   */
  private setCache(key: string, data: any, source: string, ttl: number): void {
    const now = Date.now();
    this.cache.set(key, {
      data,
      source,
      expiresAt: now + ttl,
      createdAt: now
    });
    // The sweep is stopped by disconnect(); resume it if the service is
    // still being used to cache results afterwards.
    this.startCacheCleanup();
  }

  /**
   * Invalidate cache entries whose key contains `pattern` (substring match);
   * with no pattern, the whole cache is cleared.
   */
  invalidateCache(pattern?: string): void {
    if (!pattern) {
      this.cache.clear();
      return;
    }

    for (const key of this.cache.keys()) {
      if (key.includes(pattern)) {
        this.cache.delete(key);
      }
    }
  }

  /**
   * Cleanup expired cache entries
   */
  private cleanupCache(): void {
    const now = Date.now();
    for (const [key, entry] of this.cache.entries()) {
      if (now > entry.expiresAt) {
        this.cache.delete(key);
      }
    }
  }

  /**
   * Start the 30 s periodic cache sweep if it is not already running.
   */
  private startCacheCleanup(): void {
    if (this.cacheCleanupTimer !== null) return;
    this.cacheCleanupTimer = setInterval(() => this.cleanupCache(), 30000);
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // TELEMETRY
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Get service telemetry: counters, cache size, hit rate as a percentage
   * string with one decimal, and whether the WebSocket is currently open.
   */
  getTelemetry() {
    return {
      queryCount: this.queryCount,
      cacheHits: this.cacheHits,
      cacheSize: this.cache.size,
      errorCount: this.errorCount,
      cacheHitRate: this.queryCount > 0
        ? (this.cacheHits / this.queryCount * 100).toFixed(1) + '%'
        : '0%',
      // Short-circuit on null so `WebSocket` is never dereferenced in
      // runtimes where it does not exist.
      wsConnected: this.ws !== null && this.ws.readyState === WebSocket.OPEN
    };
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // UTILITIES
  // ═══════════════════════════════════════════════════════════════════════════

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Disconnect and cleanup: cancels pending reconnects and the cache sweep,
   * closes the WebSocket without triggering a reconnect, and drops all cached
   * data and event listeners.
   */
  disconnect(): void {
    if (this.wsReconnectTimer !== null) {
      clearTimeout(this.wsReconnectTimer);
      this.wsReconnectTimer = null;
    }
    if (this.cacheCleanupTimer !== null) {
      clearInterval(this.cacheCleanupTimer);
      this.cacheCleanupTimer = null;
    }

    // Detach before closing so the socket's onclose handler sees it is no
    // longer current and does not schedule a reconnect.
    const socket = this.ws;
    this.ws = null;
    socket?.close();

    this.cache.clear();
    this.eventListeners.clear();
  }
}

// ═══════════════════════════════════════════════════════════════════════════════
// SINGLETON EXPORT
// ═══════════════════════════════════════════════════════════════════════════════

export const dataService = new UnifiedDataService();
export const unifiedDataService = dataService;

// Also export the class for testing
export { UnifiedDataService };