File size: 5,578 Bytes
529090e
 
 
 
 
a9f0bb1
 
 
 
 
 
 
 
529090e
 
 
 
 
 
 
 
 
a9f0bb1
529090e
 
 
 
 
 
a9f0bb1
529090e
 
 
 
 
 
 
 
 
a9f0bb1
529090e
 
a9f0bb1
529090e
 
 
 
 
 
 
 
 
 
 
 
a9f0bb1
529090e
 
a9f0bb1
529090e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a9f0bb1
 
 
 
 
529090e
 
 
 
 
 
 
 
 
 
 
 
 
 
a9f0bb1
529090e
 
 
a9f0bb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
529090e
a9f0bb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
529090e
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import { spawn, ChildProcess } from 'child_process';
import path from 'path';
import { EmbeddingProvider } from './EmbeddingService.js';
import { logger } from '../../utils/logger.js';

import { fileURLToPath } from 'url';

// Resolve the directory containing this module so we can locate the sibling
// gpu_bridge.py script at runtime.
// NOTE(review): under a real CJS build, `import.meta` is a syntax error rather
// than `undefined`, so this runtime guard likely only helps builds where the
// expression is transformed away — confirm against the actual bundler output.
// The `process.cwd()` fallback is best-effort and may not point at the built
// script's directory.
const currentDir = typeof import.meta !== 'undefined' && import.meta.url
  ? path.dirname(fileURLToPath(import.meta.url))
  : process.cwd(); // Fallback for CJS if specific __dirname is tricky, usually path.dirname(__filename) but __filename is also issue. 


/**
 * EmbeddingProvider backed by a long-lived `python3 gpu_bridge.py` child
 * process. Requests and responses travel over the child's stdio as
 * newline-delimited JSON: we write one JSON object per line to stdin and the
 * bridge answers with one JSON object per line on stdout.
 *
 * The stdio protocol is strictly request/response, so all calls are
 * serialized through an internal promise queue — concurrent callers are safe
 * but their requests run one at a time.
 */
export class LocalGPUEmbeddingsProvider implements EmbeddingProvider {
  name = 'local-gpu';
  dimensions = 384; // Default for all-MiniLM-L6-v2

  private process: ChildProcess | null = null;
  private isReady = false;
  // Cached so concurrent initialize() calls share one spawn instead of
  // starting multiple python processes. Reset when the child exits.
  private initPromise: Promise<void> | null = null;
  // Serializes execute() calls: stdio has no request IDs, so responses are
  // matched to requests purely by ordering.
  private queue: Promise<unknown> = Promise.resolve();

  /**
   * Spawns the Python bridge (if not already running) and resolves once it
   * reports `{"status":"ready"}` on stdout. Safe to call repeatedly and
   * concurrently.
   */
  async initialize(): Promise<void> {
    if (this.isReady) return;
    if (this.initPromise) return this.initPromise;

    this.initPromise = new Promise<void>((resolve, reject) => {
      try {
        const scriptPath = path.join(currentDir, 'gpu_bridge.py');
        logger.info(`🔌 Starting GPU Bridge: python3 ${scriptPath}`);

        this.process = spawn('python3', [scriptPath]);

        // Wait for the initial ready signal. Buffer across chunks: a JSON
        // line may arrive split over multiple 'data' events.
        let pending = '';
        const readyListener = (data: Buffer) => {
          pending += data.toString();
          const lines = pending.split('\n');
          pending = lines.pop() ?? ''; // keep trailing partial line for next chunk
          for (const line of lines) {
            if (!line.trim()) continue;
            try {
              const response = JSON.parse(line);
              if (response.status === 'ready') {
                logger.info(`🚀 GPU Bridge Ready on device: ${response.device}`);
                this.isReady = true;
                // Done with startup; execute() attaches its own listeners.
                this.process?.stdout?.off('data', readyListener);
                resolve();
                return;
              }
            } catch (e) {
              logger.error(`GPU Bridge parse error: ${e} [Line: ${line}]`);
            }
          }
        };
        this.process.stdout?.on('data', readyListener);

        // Bridge logs (model loading, device selection) arrive on stderr.
        this.process.stderr?.on('data', (data) => {
          logger.info(`[GPU-Bridge] ${data.toString().trim()}`);
        });

        this.process.on('error', (err) => {
          logger.error('❌ Failed to start GPU Bridge process:', err);
          reject(err);
        });

        this.process.on('exit', (code) => {
          logger.warn(`⚠️ GPU Bridge process exited with code ${code}`);
          this.isReady = false;
          this.process = null;
          this.initPromise = null; // allow a later call to respawn the bridge
        });

        // Timeout backup: a slow start usually means the model is being
        // downloaded, so warn rather than reject.
        setTimeout(() => {
          if (!this.isReady) {
            logger.warn('⏳ GPU Bridge taking longer than expected (model download?)...');
          }
        }, 10000);
      } catch (error) {
        reject(error);
      }
    });
    return this.initPromise;
  }

  /**
   * Enqueues one request/response round-trip. The queue guarantees that
   * writes to stdin never interleave and that each response is matched to
   * the request that is currently at the head of the queue.
   */
  private execute<T>(payload: object): Promise<T> {
    const task = this.queue.then(() => this.roundTrip<T>(payload));
    // Keep the chain alive even if this request fails; the caller still
    // observes the rejection via `task`.
    this.queue = task.catch(() => undefined);
    return task;
  }

  /** Performs a single write-then-read cycle against the bridge's stdio. */
  private async roundTrip<T>(payload: object): Promise<T> {
    if (!this.process || !this.isReady) {
      await this.initialize();
    }

    return new Promise<T>((resolve, reject) => {
      // Accumulate stdout across chunks: one JSON response may span several
      // 'data' events, and one event may carry several lines.
      let pending = '';
      const listener = (data: Buffer) => {
        pending += data.toString();
        const lines = pending.split('\n');
        pending = lines.pop() ?? '';
        for (const line of lines) {
          if (!line.trim()) continue;
          let response: { status?: string; error?: string };
          try {
            response = JSON.parse(line);
          } catch {
            // Malformed line; skip it rather than wedging the request.
            continue;
          }
          // Ignore status messages if they appear late.
          if (response.status) continue;

          this.process?.stdout?.off('data', listener);
          if (response.error) {
            reject(new Error(response.error));
          } else {
            resolve(response as T);
          }
          return; // Handled
        }
      };

      this.process!.stdout!.on('data', listener);
      this.process!.stdin!.write(JSON.stringify(payload) + '\n');
    });
  }

  /** Embeds a single text; returns a 384-dim vector from the bridge. */
  async generateEmbedding(text: string): Promise<number[]> {
    const response = await this.execute<{ embedding: number[] }>({ text });
    return response.embedding;
  }

  /** Embeds a batch of texts in one bridge round-trip. */
  async generateEmbeddings(texts: string[]): Promise<number[][]> {
    const response = await this.execute<{ embeddings: number[][] }>({ texts });
    return response.embeddings;
  }
}