import { spawn, ChildProcess } from 'child_process';
import path from 'path';
import { fileURLToPath } from 'url';
import { EmbeddingProvider } from './EmbeddingService.js';
import { logger } from '../../utils/logger.js';
| // ESM/CJS compatible directory logic | |
| const currentDir = typeof import.meta !== 'undefined' && import.meta.url | |
| ? path.dirname(fileURLToPath(import.meta.url)) | |
| : process.cwd(); // Fallback for CJS if specific __dirname is tricky, usually path.dirname(__filename) but __filename is also issue. | |
| // Better: assume ESM for this modern app or just use process.cwd() relative path if needed but finding built script is safer. | |
| export class LocalGPUEmbeddingsProvider implements EmbeddingProvider { | |
| name = 'local-gpu'; | |
| dimensions = 384; // Default for all-MiniLM-L6-v2 | |
| private process: ChildProcess | null = null; | |
| private isReady: boolean = false; | |
| private pendingRequests: Map<number, { resolve: (val: any) => void; reject: (err: any) => void }> = new Map(); | |
| private requestCounter: number = 0; | |
| private cleanup: () => void = () => { }; // Dynamic cleanup callback | |
| async initialize(): Promise<void> { | |
| if (this.isReady) return; | |
| return new Promise((resolve, reject) => { | |
| try { | |
| const scriptPath = path.join(currentDir, 'gpu_bridge.py'); | |
| logger.info(`๐ Starting GPU Bridge: python3 ${scriptPath}`); | |
| this.process = spawn('python3', [scriptPath]); | |
| // Handle stdout (Responses) | |
| this.process.stdout?.on('data', (data) => { | |
| const lines = data.toString().split('\n'); | |
| for (const line of lines) { | |
| if (!line.trim()) continue; | |
| try { | |
| const response = JSON.parse(line); | |
| // Initial Ready Signal | |
| if (response.status === 'ready') { | |
| logger.info(`๐ GPU Bridge Ready on device: ${response.device}`); | |
| this.isReady = true; | |
| resolve(); | |
| continue; | |
| } | |
| // We are using a simple FIFO queue for now since we write/read sequentially | |
| // For a more robust solution with concurrency, we'd need Request IDs | |
| // But for this implementation, we assume one request at a time via the Service lock | |
| // or we rely on the strictly ordered stdio. | |
| // NOTE: This simplified implementation assumes sequential processing (awaiting each call) | |
| // which matches the current usage in EmbeddingService. | |
| } catch (e) { | |
| logger.error(`GPU Bridge parse error: ${e} [Line: ${line}]`); | |
| } | |
| } | |
| }); | |
| // Handle stderr (Logs) | |
| this.process.stderr?.on('data', (data) => { | |
| logger.info(`[GPU-Bridge] ${data.toString().trim()}`); | |
| }); | |
| this.process.on('error', (err) => { | |
| logger.error('โ Failed to start GPU Bridge process:', err); | |
| reject(err); | |
| }); | |
| this.process.on('exit', (code) => { | |
| logger.warn(`โ ๏ธ GPU Bridge process exited with code ${code}`); | |
| this.isReady = false; | |
| }); | |
| // Timeout backup | |
| setTimeout(() => { | |
| if (!this.isReady) { | |
| // If it takes too long, we might just be downloading the model (can take time) | |
| // So we don't reject immediately, but warn. | |
| logger.warn('โณ GPU Bridge taking longer than expected (model download?)...'); | |
| } | |
| }, 10000); | |
| } catch (error) { | |
| reject(error); | |
| } | |
| }); | |
| } | |
| // Helper to execute a single request-response cycle | |
| // Since we are using stdio, we need to be careful about concurrency. | |
| // Ideally, we wrap this in a mutex, but Node.js is single threaded event loop. | |
| // We just need to ensure we don't interleave writes before reads are done. | |
| private async execute<T>(payload: any): Promise<T> { | |
| if (!this.process || !this.isReady) { | |
| await this.initialize(); | |
| } | |
| return new Promise((resolve, reject) => { | |
| // Simple one-off listener for the next line of output | |
| // This assumes strictly sequential execution provided by the await in the caller | |
| const listener = (data: Buffer) => { | |
| const lines = data.toString().split('\n').filter(l => l.trim()); | |
| for (const line of lines) { | |
| try { | |
| const response = JSON.parse(line); | |
| // Ignore status messages if they appear late | |
| if (response.status) continue; | |
| if (response.error) { | |
| this.cleanup(); | |
| reject(new Error(response.error)); | |
| } else { | |
| this.cleanup(); | |
| resolve(response); | |
| } | |
| return; // Handled | |
| } catch (e) { | |
| // partial line? wait for next chunk | |
| } | |
| } | |
| }; | |
| const cleanup = () => { | |
| this.process?.stdout?.off('data', listener); | |
| this.cleanup = () => { }; // prevent double call | |
| }; | |
| // Store cleanup on this scope so the listener callback can call it | |
| (this as any).cleanup = cleanup; | |
| this.process!.stdout!.on('data', listener); | |
| this.process!.stdin!.write(JSON.stringify(payload) + '\n'); | |
| }); | |
| } | |
| async generateEmbedding(text: string): Promise<number[]> { | |
| const response = await this.execute<{ embedding: number[] }>({ text }); | |
| return response.embedding; | |
| } | |
| async generateEmbeddings(texts: string[]): Promise<number[][]> { | |
| const response = await this.execute<{ embeddings: number[][] }>({ texts }); | |
| return response.embeddings; | |
| } | |
| } | |