import { spawn, ChildProcess } from 'child_process';
import path from 'path';
import { EmbeddingProvider } from './EmbeddingService.js';
import { logger } from '../../utils/logger.js';
import { fileURLToPath } from 'url';
// Resolve the directory containing this module so gpu_bridge.py can be
// located next to the built output (see initialize() below).
// `import.meta.url` only exists under an ESM build; under CJS we fall back
// to process.cwd().
// NOTE(review): the CJS fallback assumes the process is launched from the
// package root — verify against the deployment layout; a wrong cwd means
// the bridge script will not be found.
const currentDir = typeof import.meta !== 'undefined' && import.meta.url
? path.dirname(fileURLToPath(import.meta.url))
: process.cwd();
/**
 * Embedding provider backed by a long-lived Python child process
 * ("gpu_bridge.py") speaking newline-delimited JSON over stdio.
 *
 * Protocol: we write one JSON request per line to stdin; the bridge writes
 * one JSON response per line to stdout. Exactly one request may be in
 * flight at a time, so execute() serializes callers on an internal queue.
 */
export class LocalGPUEmbeddingsProvider implements EmbeddingProvider {
  name = 'local-gpu';
  // Default embedding width for all-MiniLM-L6-v2.
  dimensions = 384;

  private process: ChildProcess | null = null;
  private isReady: boolean = false;
  // Accumulates raw stdout text so a JSON line split across 'data' chunks
  // is reassembled instead of being dropped (stream chunks are NOT
  // guaranteed to align with line boundaries).
  private stdoutBuffer: string = '';
  // The caller currently awaiting a response line; null when idle.
  private pending: { resolve: (val: unknown) => void; reject: (err: Error) => void } | null = null;
  // Promise chain serializing execute() calls over the shared stdio pipe.
  private queue: Promise<unknown> = Promise.resolve();
  // Shared by concurrent initialize() calls so only one bridge is spawned
  // and listeners are attached exactly once.
  private initPromise: Promise<void> | null = null;

  /**
   * Spawns the Python GPU bridge and resolves once it reports
   * `{"status":"ready"}` on stdout.
   *
   * Idempotent and concurrency-safe: repeated or overlapping calls share
   * one spawn attempt. Rejects if the process fails to start; a slow
   * startup (model download) only logs a warning and keeps waiting.
   */
  async initialize(): Promise<void> {
    if (this.isReady) return;
    if (this.initPromise) return this.initPromise;

    this.initPromise = new Promise<void>((resolve, reject) => {
      try {
        const scriptPath = path.join(currentDir, 'gpu_bridge.py');
        logger.info(`🔌 Starting GPU Bridge: python3 ${scriptPath}`);
        this.process = spawn('python3', [scriptPath]);

        // Responses arrive on stdout, one JSON object per line.
        this.process.stdout?.on('data', (data: Buffer) => {
          this.handleStdout(data, resolve);
        });

        // The bridge logs to stderr; forward as app-level info logs.
        this.process.stderr?.on('data', (data: Buffer) => {
          logger.info(`[GPU-Bridge] ${data.toString().trim()}`);
        });

        this.process.on('error', (err) => {
          logger.error('❌ Failed to start GPU Bridge process:', err);
          this.initPromise = null; // allow a retry
          reject(err);
        });

        this.process.on('exit', (code) => {
          logger.warn(`⚠️ GPU Bridge process exited with code ${code}`);
          this.isReady = false;
          this.initPromise = null;
          // Fail any caller still waiting, instead of hanging forever.
          this.pending?.reject(new Error(`GPU Bridge exited with code ${code}`));
          this.pending = null;
        });

        // Slow startup usually means the model is being downloaded, so we
        // warn rather than reject. unref() so this timer alone does not
        // keep the event loop alive.
        const timer = setTimeout(() => {
          if (!this.isReady) {
            logger.warn('⏳ GPU Bridge taking longer than expected (model download?)...');
          }
        }, 10000);
        timer.unref();
      } catch (error) {
        this.initPromise = null;
        reject(error);
      }
    });
    return this.initPromise;
  }

  /**
   * Resident stdout dispatcher: buffers partial lines across chunks,
   * parses complete lines as JSON, routes the 'ready' signal to
   * initialize() and every other message to the pending request.
   */
  private handleStdout(data: Buffer, onReady: () => void): void {
    this.stdoutBuffer += data.toString();
    const lines = this.stdoutBuffer.split('\n');
    // The last element is an incomplete line (or '') — keep it for the
    // next chunk instead of attempting to parse a truncated JSON string.
    this.stdoutBuffer = lines.pop() ?? '';

    for (const line of lines) {
      if (!line.trim()) continue;

      let response: any;
      try {
        response = JSON.parse(line);
      } catch (e) {
        logger.error(`GPU Bridge parse error: ${e} [Line: ${line}]`);
        continue;
      }

      if (response.status === 'ready') {
        logger.info(`🚀 GPU Bridge Ready on device: ${response.device}`);
        this.isReady = true;
        onReady();
        continue;
      }
      // Ignore any other status/housekeeping messages.
      if (response.status) continue;

      const pending = this.pending;
      this.pending = null;
      if (!pending) {
        logger.warn(`GPU Bridge sent unexpected response: ${line}`);
        continue;
      }
      if (response.error) {
        pending.reject(new Error(response.error));
      } else {
        pending.resolve(response);
      }
    }
  }

  /**
   * Sends one request and awaits its single response line.
   *
   * Calls are chained on `this.queue` so at most one request is ever in
   * flight — the stdio protocol has no request IDs, so interleaved writes
   * would mismatch responses to callers.
   */
  private execute<T>(payload: unknown): Promise<T> {
    const run = async (): Promise<T> => {
      if (!this.process || !this.isReady) {
        await this.initialize();
      }
      return new Promise<T>((resolve, reject) => {
        this.pending = { resolve: resolve as (val: unknown) => void, reject };
        this.process!.stdin!.write(JSON.stringify(payload) + '\n');
      });
    };
    // Run after the previous request settles (success OR failure), so one
    // failed request does not poison every subsequent one.
    const result = this.queue.then(run, run);
    this.queue = result.catch(() => undefined);
    return result;
  }

  /** Embeds a single text; returns the raw vector from the bridge. */
  async generateEmbedding(text: string): Promise<number[]> {
    const response = await this.execute<{ embedding: number[] }>({ text });
    return response.embedding;
  }

  /** Embeds a batch of texts in one round-trip to the bridge. */
  async generateEmbeddings(texts: string[]): Promise<number[][]> {
    const response = await this.execute<{ embeddings: number[][] }>({ texts });
    return response.embeddings;
  }
}