// Deploy from GitHub Actions 2025-12-15_17-15-49 (commit a9f0bb1, verified — Kraft102)
import { spawn, ChildProcess } from 'child_process';
import path from 'path';
import { EmbeddingProvider } from './EmbeddingService.js';
import { logger } from '../../utils/logger.js';
import { fileURLToPath } from 'url';
// ESM/CJS compatible directory logic.
// In an ESM context `import.meta.url` points at this module, so we derive the
// directory that contains the built bridge script from it. Under CJS neither
// import.meta nor a reliable __dirname/__filename is available here, so we
// fall back to the process working directory.
const isEsmContext = typeof import.meta !== 'undefined' && Boolean(import.meta.url);
const currentDir = isEsmContext
    ? path.dirname(fileURLToPath(import.meta.url))
    : process.cwd();
/**
 * EmbeddingProvider backed by a local Python GPU bridge (gpu_bridge.py).
 * Protocol: newline-delimited JSON over the child process's stdio. The bridge
 * announces `{status: "ready", device: ...}` once, then answers each request
 * line with exactly one response line, strictly in request order.
 */
export class LocalGPUEmbeddingsProvider implements EmbeddingProvider {
    name = 'local-gpu';
    dimensions = 384; // Default for all-MiniLM-L6-v2
    private process: ChildProcess | null = null;
    private isReady: boolean = false;
    // Shared in-flight initialize() promise so concurrent callers reuse one
    // spawn instead of each starting their own python3 process.
    private initPromise: Promise<void> | null = null;
    // Accumulates stdout chunks: a single JSON line may be split across
    // several 'data' events, and one event may carry several lines.
    private stdoutBuffer: string = '';
    // FIFO of callbacks awaiting a response. stdio ordering is the
    // correlation mechanism: the bridge answers strictly in request order.
    private pendingRequests: Array<{ resolve: (val: any) => void; reject: (err: any) => void }> = [];
    /**
     * Spawn the Python GPU bridge and wait for its initial ready signal.
     * Safe to call repeatedly or concurrently: callers share one in-flight
     * promise, and it resolves immediately once the bridge is ready.
     */
    async initialize(): Promise<void> {
        if (this.isReady) return;
        if (this.initPromise) return this.initPromise;
        this.initPromise = new Promise<void>((resolve, reject) => {
            try {
                const scriptPath = path.join(currentDir, 'gpu_bridge.py');
                logger.info(`🔌 Starting GPU Bridge: python3 ${scriptPath}`);
                this.stdoutBuffer = '';
                this.process = spawn('python3', [scriptPath]);
                // Handle stdout (Responses) — line-buffered JSON protocol.
                this.process.stdout?.on('data', (data: Buffer) => {
                    this.stdoutBuffer += data.toString();
                    // Only complete lines are parsed; a trailing partial line
                    // stays buffered until the next chunk arrives (fixes lost
                    // responses when a JSON payload spans 'data' events).
                    let newlineIndex: number;
                    while ((newlineIndex = this.stdoutBuffer.indexOf('\n')) !== -1) {
                        const line = this.stdoutBuffer.slice(0, newlineIndex).trim();
                        this.stdoutBuffer = this.stdoutBuffer.slice(newlineIndex + 1);
                        if (!line) continue;
                        let response: any;
                        try {
                            response = JSON.parse(line);
                        } catch (e) {
                            logger.error(`GPU Bridge parse error: ${e} [Line: ${line}]`);
                            continue;
                        }
                        // Initial Ready Signal
                        if (response.status === 'ready') {
                            logger.info(`🚀 GPU Bridge Ready on device: ${response.device}`);
                            this.isReady = true;
                            resolve();
                            continue;
                        }
                        if (response.status) continue; // ignore other late status lines
                        // Deliver to the oldest pending request (FIFO order).
                        const pending = this.pendingRequests.shift();
                        if (!pending) continue; // unsolicited line; nothing to deliver to
                        if (response.error) {
                            pending.reject(new Error(response.error));
                        } else {
                            pending.resolve(response);
                        }
                    }
                });
                // Handle stderr (Logs)
                this.process.stderr?.on('data', (data) => {
                    logger.info(`[GPU-Bridge] ${data.toString().trim()}`);
                });
                this.process.on('error', (err) => {
                    logger.error('❌ Failed to start GPU Bridge process:', err);
                    this.rejectAllPending(err);
                    reject(err);
                });
                this.process.on('exit', (code) => {
                    logger.warn(`⚠️ GPU Bridge process exited with code ${code}`);
                    this.isReady = false;
                    this.initPromise = null; // allow a later re-initialize to respawn
                    // Fail outstanding requests rather than leaving their
                    // promises hanging forever.
                    this.rejectAllPending(new Error(`GPU Bridge exited with code ${code}`));
                });
                // Timeout backup: the first run may be downloading the model,
                // which can take a while — warn, but do not fail.
                setTimeout(() => {
                    if (!this.isReady) {
                        logger.warn('⏳ GPU Bridge taking longer than expected (model download?)...');
                    }
                }, 10000);
            } catch (error) {
                reject(error);
            }
        });
        return this.initPromise;
    }
    /** Reject every queued request, e.g. when the bridge process dies. */
    private rejectAllPending(err: Error): void {
        for (const pending of this.pendingRequests.splice(0)) {
            pending.reject(err);
        }
    }
    /**
     * Write one JSON payload to stdin and await the corresponding JSON
     * response. Correlation relies on the bridge answering strictly in
     * request order (stdio FIFO), so overlapping calls are handled by the
     * pending-request queue instead of a one-off stdout listener.
     */
    private async execute<T>(payload: any): Promise<T> {
        if (!this.process || !this.isReady) {
            await this.initialize();
        }
        return new Promise<T>((resolve, reject) => {
            this.pendingRequests.push({ resolve, reject });
            this.process!.stdin!.write(JSON.stringify(payload) + '\n');
        });
    }
    /** Embed a single text; the bridge replies with `{embedding: number[]}`. */
    async generateEmbedding(text: string): Promise<number[]> {
        const response = await this.execute<{ embedding: number[] }>({ text });
        return response.embedding;
    }
    /** Embed a batch of texts; the bridge replies with `{embeddings: number[][]}`. */
    async generateEmbeddings(texts: string[]): Promise<number[][]> {
        const response = await this.execute<{ embeddings: number[][] }>({ texts });
        return response.embeddings;
    }
}