import { spawn, ChildProcess } from 'child_process';
import path from 'path';
import { EmbeddingProvider } from './EmbeddingService.js';
import { logger } from '../../utils/logger.js';
import { fileURLToPath } from 'url';

// ESM/CJS compatible directory logic: under ESM, resolve the directory of this
// module from import.meta.url; otherwise fall back to the current working directory.
const currentDir =
  typeof import.meta !== 'undefined' && import.meta.url
    ? path.dirname(fileURLToPath(import.meta.url))
    : process.cwd();

/**
 * Embedding provider that delegates to a `gpu_bridge.py` child process.
 *
 * Protocol, as used by this class: one JSON document per line in each
 * direction over the child's stdin/stdout. The bridge prints a line with
 * `status: 'ready'` once it can serve requests; every later line that has no
 * `status` field is treated as the answer to the oldest unanswered request.
 * Requests carry no IDs, so answers are matched strictly in FIFO order.
 */
export class LocalGPUEmbeddingsProvider implements EmbeddingProvider {
  name = 'local-gpu';
  dimensions = 384; // Default for all-MiniLM-L6-v2

  private process: ChildProcess | null = null;
  private isReady: boolean = false;

  /**
   * Requests written to the bridge that have not been answered yet.
   * A Map iterates in insertion order, so its first entry is the oldest request.
   */
  private pendingRequests: Map<
    number,
    { resolve: (val: any) => void; reject: (err: any) => void }
  > = new Map();
  private requestCounter: number = 0;

  /** Startup currently in flight (or completed); shared by concurrent initialize() calls. */
  private initPromise: Promise<void> | null = null;

  /** Trailing stdout text that has not yet been terminated by a newline. */
  private stdoutBuffer = '';

  /**
   * Starts the bridge process (if it is not already running) and waits for its
   * ready signal.
   *
   * Concurrent callers share a single startup attempt. A failed attempt is
   * forgotten so that a later call can retry.
   *
   * @throws If the process cannot be spawned or exits before signalling ready.
   */
  async initialize(): Promise<void> {
    if (this.isReady) return;

    if (!this.initPromise) {
      const startup = this.startBridge();
      this.initPromise = startup;
      // Forget a failed startup so the next call spawns a fresh process.
      startup.catch(() => {
        if (this.initPromise === startup) this.initPromise = null;
      });
    }

    return this.initPromise;
  }

  /**
   * Spawns `python3 gpu_bridge.py` and wires up its stdio and lifecycle events.
   *
   * @returns A promise that resolves on the bridge's ready signal and rejects
   *          if the process fails to start or exits first.
   */
  private startBridge(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let child: ChildProcess;
      try {
        const scriptPath = path.join(currentDir, 'gpu_bridge.py');
        logger.info(`🔌 Starting GPU Bridge: python3 ${scriptPath}`);
        child = spawn('python3', [scriptPath]);
      } catch (error) {
        reject(error);
        return;
      }

      this.process = child;
      this.stdoutBuffer = '';

      // Startup may be slow (e.g. the model is being downloaded), so this only
      // warns; it never fails the startup.
      const slowStartTimer = setTimeout(() => {
        logger.warn('⏳ GPU Bridge taking longer than expected (model download?)...');
      }, 10000);

      const markReady = (device: unknown): void => {
        clearTimeout(slowStartTimer);
        logger.info(`🚀 GPU Bridge Ready on device: ${device}`);
        this.isReady = true;
        resolve();
      };

      // Drops all state tied to this child and fails everything waiting on it.
      const release = (reason: Error): void => {
        clearTimeout(slowStartTimer);
        if (this.process === child) {
          this.process = null;
          this.isReady = false;
          this.initPromise = null;
          this.stdoutBuffer = '';
          this.failPendingRequests(reason);
        }
        reject(reason); // No-op when startup has already resolved.
      };

      // Decode as UTF-8 in the stream so multi-byte characters split across
      // chunks are reassembled before we see them.
      child.stdout?.setEncoding('utf8');
      child.stdout?.on('data', (chunk: string) => {
        if (this.process !== child) return; // Output from a superseded process.
        for (const line of this.takeCompleteLines(chunk)) {
          this.handleLine(line, markReady);
        }
      });

      // Handle stderr (Logs)
      child.stderr?.on('data', (data) => {
        logger.info(`[GPU-Bridge] ${data.toString().trim()}`);
      });

      // Without a listener, a failed write to a dead bridge would surface as an
      // unhandled 'error' event on the stdin stream.
      child.stdin?.on('error', (err) => {
        logger.error(`GPU Bridge stdin error: ${err}`);
      });

      child.on('error', (err) => {
        logger.error('❌ Failed to start GPU Bridge process:', err);
        // No pid means the spawn itself failed, so no 'exit' event is guaranteed
        // to follow; release here. Otherwise the process is still alive and the
        // 'exit' handler covers its eventual death.
        if (child.pid === undefined) release(err);
      });

      child.on('exit', (code) => {
        logger.warn(`⚠️ GPU Bridge process exited with code ${code}`);
        release(new Error(`GPU Bridge process exited with code ${code}`));
      });
    });
  }

  /**
   * Appends a stdout chunk to the buffer and returns the complete, non-blank
   * lines now available. An unterminated tail stays buffered for the next chunk,
   * so a response split across chunks is parsed only once it is whole.
   */
  private takeCompleteLines(chunk: string): string[] {
    this.stdoutBuffer += chunk;
    const segments = this.stdoutBuffer.split('\n');
    this.stdoutBuffer = segments.pop() ?? '';
    return segments.map((segment) => segment.trim()).filter((segment) => segment.length > 0);
  }

  /**
   * Interprets one complete line of bridge output.
   *
   * @param line    A single trimmed, non-empty line from stdout.
   * @param onReady Invoked with the reported device when the line is the ready signal.
   */
  private handleLine(line: string, onReady: (device: unknown) => void): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch (e) {
      logger.error(`GPU Bridge parse error: ${e} [Line: ${line}]`);
      return;
    }

    if (typeof parsed !== 'object' || parsed === null) {
      logger.error(`GPU Bridge parse error: unexpected payload [Line: ${line}]`);
      return;
    }
    const response = parsed as Record<string, unknown>;

    // Initial Ready Signal
    if (response.status === 'ready') {
      onReady(response.device);
      return;
    }

    // Any other status message is informational and answers no request.
    if (response.status) return;

    // No request IDs on the wire: this line answers the oldest pending request.
    const oldest = this.pendingRequests.entries().next();
    if (oldest.done) {
      logger.warn('GPU Bridge sent a response with no pending request');
      return;
    }
    const [id, pending] = oldest.value;
    this.pendingRequests.delete(id);

    if (response.error) {
      pending.reject(new Error(String(response.error)));
    } else {
      pending.resolve(response);
    }
  }

  /** Rejects every unanswered request, e.g. because the bridge process died. */
  private failPendingRequests(reason: Error): void {
    const waiting = [...this.pendingRequests.values()];
    this.pendingRequests.clear();
    for (const request of waiting) {
      request.reject(reason);
    }
  }

  /**
   * Sends one JSON request line to the bridge and resolves with its response.
   *
   * Starts the bridge first if it is not running. Several calls may be in
   * flight at once; each is queued and answered in the order it was written.
   *
   * @param payload JSON-serialisable request body.
   * @returns The parsed response object.
   * @throws If the bridge is unavailable, the write fails, the bridge reports
   *         an `error`, or the process exits before answering.
   */
  private async execute<T>(payload: unknown): Promise<T> {
    if (!this.process || !this.isReady) {
      await this.initialize();
    }

    const stdin = this.process?.stdin;
    if (!stdin || !stdin.writable) {
      throw new Error('GPU Bridge process is not available');
    }

    return new Promise<T>((resolve, reject) => {
      const id = this.requestCounter++;
      this.pendingRequests.set(id, { resolve, reject });

      stdin.write(JSON.stringify(payload) + '\n', (err) => {
        // On a failed write, fail just this request (unless the exit handler
        // already rejected it).
        if (err && this.pendingRequests.delete(id)) {
          reject(err);
        }
      });
    });
  }

  /**
   * Requests an embedding for a single text.
   *
   * @param text Input text, sent to the bridge as `{ text }`.
   * @returns The `embedding` field of the bridge response.
   */
  async generateEmbedding(text: string): Promise<number[]> {
    const response = await this.execute<{ embedding: number[] }>({ text });
    return response.embedding;
  }

  /**
   * Requests embeddings for a batch of texts in one round trip.
   *
   * @param texts Input texts, sent to the bridge as `{ texts }`.
   * @returns The `embeddings` field of the bridge response.
   */
  async generateEmbeddings(texts: string[]): Promise<number[][]> {
    const response = await this.execute<{ embeddings: number[][] }>({ texts });
    return response.embeddings;
  }
}