File size: 12,335 Bytes
1dbc34b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/**
 * Subprocess management utilities for CLI providers
 */

import { spawn, type ChildProcess } from 'child_process';
import { StringDecoder } from 'string_decoder';

/**
 * Options accepted by `spawnJSONLProcess` and `spawnProcess`.
 */
export interface SubprocessOptions {
  /** Executable to run; passed to `spawn` as the command. */
  command: string;
  /** Arguments passed to `spawn` alongside `command`. */
  args: string[];
  /** Working directory for the child process. */
  cwd: string;
  /** Extra environment variables, merged over (and overriding) `process.env`. */
  env?: Record<string, string>;
  /**
   * When its signal aborts, the child is sent SIGTERM, followed by SIGKILL
   * after 3 seconds.
   */
  abortController?: AbortController;
  /**
   * Milliseconds without stdout output before the child is sent SIGTERM.
   * `spawnJSONLProcess` defaults this to 30000; `spawnProcess` does not read it.
   */
  timeout?: number;
  /**
   * Data to write to stdin after process spawns.
   * Use this for passing prompts/content that may contain shell metacharacters.
   * Avoids shell interpretation issues when passing data as CLI arguments.
   * When omitted (or empty), the child's stdin is set to 'ignore'.
   */
  stdinData?: string;
}

/**
 * Collected outcome of a subprocess run by `spawnProcess`.
 */
export interface SubprocessResult {
  /** Everything the child wrote to stdout, concatenated into one string. */
  stdout: string;
  /** Everything the child wrote to stderr, concatenated into one string. */
  stderr: string;
  /**
   * Exit code reported when the child process ended, or `null` when no code
   * was reported (for example, when it was terminated by a signal).
   */
  exitCode: number | null;
}

/**
 * Spawns a subprocess and streams its JSONL stdout line-by-line.
 *
 * Uses direct 'data' event handling with manual line buffering instead of
 * readline's async iterator. The readline async iterator (for await...of on
 * readline.Interface) has a known issue where events batch up rather than
 * being delivered immediately, because it layers events.on() Promises on top
 * of the readline 'line' event emitter. This causes visible delays (20-40s
 * between batches) in CLI providers like Gemini that produce frequent small
 * events. Direct data event handling delivers parsed events to the consumer
 * as soon as they arrive from the pipe.
 *
 * Behavior:
 * - Every non-empty stdout line is parsed with `JSON.parse` and yielded. A line
 *   that fails to parse is yielded as `{ type: 'error', error: string }`.
 * - If stdout is silent for `timeout` ms (default 30000) the child is sent
 *   SIGTERM, escalating to SIGKILL after 3 seconds.
 * - Aborting through `abortController` terminates the child the same way and
 *   ends the stream without waiting for stdout to close (already-queued events
 *   are still yielded).
 * - A child-process 'error' event (e.g. the command could not be spawned) ends
 *   the stream and is reported as a final `{ type: 'error', error }` event
 *   instead of surfacing as an unhandled 'error' event.
 * - A non-zero exit code yields a final `{ type: 'error', error }` event whose
 *   message is the collected stderr, or a generic message when stderr is empty.
 * - If the consumer stops iterating before the stream is drained, the child is
 *   terminated so it is not left running unsupervised.
 *
 * @param options - Command, arguments, working directory and optional
 *   environment, abort controller, inactivity timeout and stdin payload.
 * @returns An async generator of parsed JSONL values and error events.
 */
export async function* spawnJSONLProcess(options: SubprocessOptions): AsyncGenerator<unknown> {
  const { command, args, cwd, env, abortController, timeout = 30000, stdinData } = options;

  const processEnv = {
    ...process.env,
    ...env,
  };

  // Log command without stdin data (which may be large/sensitive)
  console.log(`[SubprocessManager] Spawning: ${command} ${args.join(' ')}`);
  console.log(`[SubprocessManager] Working directory: ${cwd}`);
  if (stdinData) {
    // Report real byte size; string length counts UTF-16 code units, not bytes.
    console.log(
      `[SubprocessManager] Passing ${Buffer.byteLength(stdinData, 'utf8')} bytes via stdin`
    );
  }

  // On Windows, .cmd files must be run through shell (cmd.exe)
  const needsShell =
    process.platform === 'win32' &&
    (command.toLowerCase().endsWith('.cmd') || command === 'npx' || command === 'npm');

  const childProcess: ChildProcess = spawn(command, args, {
    cwd,
    env: processEnv,
    // Use 'pipe' for stdin when we need to write data, otherwise 'ignore'
    stdio: [stdinData ? 'pipe' : 'ignore', 'pipe', 'pipe'],
    shell: needsShell,
  });

  let stderrOutput = '';
  let timeoutHandle: NodeJS.Timeout | null = null;
  let processExited = false;
  // Message of the first child-process 'error' event ('' when none occurred).
  let processErrorMessage = '';

  // Stream consumer state - declared in outer scope so the abort handler can
  // force the consumer to exit immediately without waiting for stdout to close.
  // CLI tools (especially Gemini CLI) may take a long time to respond to SIGTERM,
  // leaving the feature stuck in 'in_progress' state on the UI.
  let streamEnded = false;
  let notifyConsumer: (() => void) | null = null;

  // Resolve the consumer's pending wait (if any) so it re-checks the queue/state.
  const wakeConsumer = () => {
    if (notifyConsumer) {
      notifyConsumer();
      notifyConsumer = null;
    }
  };

  const clearInactivityTimeout = () => {
    if (timeoutHandle) {
      clearTimeout(timeoutHandle);
      timeoutHandle = null;
    }
  };

  // Ask the child to stop (SIGTERM) and escalate to SIGKILL after 3 seconds if
  // it has not exited. SIGKILL cannot be caught or ignored.
  const terminate = () => {
    if (processExited) {
      return;
    }
    childProcess.kill('SIGTERM');
    const killTimer = setTimeout(() => {
      if (!processExited) {
        console.log('[SubprocessManager] Escalated to SIGKILL after SIGTERM timeout');
        try {
          childProcess.kill('SIGKILL');
        } catch {
          // Process may have already exited between the check and kill
        }
      }
    }, 3000);
    // Clean up the kill timer when process exits (don't leak timers)
    childProcess.once('exit', () => {
      clearTimeout(killTimer);
    });
  };

  // Track process exit early so we don't block on an already-exited process
  childProcess.on('exit', () => {
    processExited = true;
  });

  // Must be registered right after spawn(): spawn failures are emitted as an
  // 'error' event on the next tick, and an 'error' event without a listener
  // is thrown as an uncaught exception.
  childProcess.on('error', (error) => {
    console.error('[SubprocessManager] Process error:', error);
    if (!processErrorMessage) {
      processErrorMessage = error.message || String(error);
    }
    if (childProcess.pid === undefined) {
      // The process never started, so there is nothing to wait for or kill.
      processExited = true;
    }
    streamEnded = true;
    wakeConsumer();
  });

  // Write stdin data if provided
  if (stdinData && childProcess.stdin) {
    // The child may exit before reading its input (EPIPE); without a listener
    // that stream error would be thrown as an uncaught exception.
    childProcess.stdin.on('error', (error) => {
      console.warn('[SubprocessManager] stdin error:', error);
    });
    childProcess.stdin.write(stdinData);
    childProcess.stdin.end();
  }

  // Collect stderr for error reporting
  if (childProcess.stderr) {
    // Let the stream decode UTF-8 so multibyte characters split across chunks
    // are not corrupted.
    childProcess.stderr.setEncoding('utf8');
    childProcess.stderr.on('data', (text: string) => {
      stderrOutput += text;
      console.warn(`[SubprocessManager] stderr: ${text}`);
    });
  }

  // Inactivity watchdog: (re)armed on every stdout chunk, so when it fires
  // there has been no output for `timeout` ms.
  const resetTimeout = () => {
    clearInactivityTimeout();
    timeoutHandle = setTimeout(() => {
      timeoutHandle = null;
      console.error(`[SubprocessManager] Process timeout: no output for ${timeout}ms`);
      terminate();
    }, timeout);
  };

  resetTimeout();

  // Setup abort handling with cleanup
  let abortHandler: (() => void) | null = null;
  if (abortController) {
    abortHandler = () => {
      console.log('[SubprocessManager] Abort signal received, killing process');
      clearInactivityTimeout();
      terminate();

      // Force stream consumer to exit immediately instead of waiting for
      // the process to close stdout. CLI tools (especially Gemini CLI) may
      // take a long time to respond to SIGTERM while mid-API call.
      streamEnded = true;
      wakeConsumer();
    };
    // Check if already aborted, if so call handler immediately
    if (abortController.signal.aborted) {
      abortHandler();
    } else {
      abortController.signal.addEventListener('abort', abortHandler);
    }
  }

  // Helper to clean up abort listener
  const cleanupAbortListener = () => {
    if (abortController && abortHandler) {
      abortController.signal.removeEventListener('abort', abortHandler);
      abortHandler = null;
    }
  };

  // Parse stdout as JSONL using direct 'data' events with manual line buffering.
  // This avoids the readline async iterator which batches events due to its
  // internal events.on() Promise layering, causing significant delivery delays.
  if (childProcess.stdout) {
    // Queue of parsed events ready to be yielded
    const eventQueue: unknown[] = [];
    // Partial line buffer for incomplete lines across data chunks
    let lineBuffer = '';
    // StringDecoder handles multibyte UTF-8 sequences that may be split across chunks
    const decoder = new StringDecoder('utf8');

    childProcess.stdout.on('data', (chunk: Buffer) => {
      // After an abort/error/abandon nobody consumes further events; returning
      // here also keeps the inactivity timer from being re-armed after cleanup.
      if (streamEnded) {
        return;
      }
      resetTimeout();

      lineBuffer += decoder.write(chunk);
      const lines = lineBuffer.split('\n');
      // Last element is either empty (line ended with \n) or a partial line
      lineBuffer = lines.pop() ?? '';

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed) continue;

        try {
          eventQueue.push(JSON.parse(trimmed));
        } catch (parseError) {
          console.error(`[SubprocessManager] Failed to parse JSONL line: ${trimmed}`, parseError);
          eventQueue.push({
            type: 'error',
            error: `Failed to parse output: ${trimmed}`,
          });
        }
      }

      // Wake up the consumer if it's waiting for events
      if (eventQueue.length > 0) {
        wakeConsumer();
      }
    });

    childProcess.stdout.on('end', () => {
      // Flush any remaining bytes from the decoder
      lineBuffer += decoder.end();

      // Process any remaining partial line
      if (lineBuffer.trim()) {
        try {
          eventQueue.push(JSON.parse(lineBuffer.trim()));
        } catch (parseError) {
          console.error(
            `[SubprocessManager] Failed to parse final JSONL line: ${lineBuffer}`,
            parseError
          );
          eventQueue.push({
            type: 'error',
            error: `Failed to parse output: ${lineBuffer}`,
          });
        }
        lineBuffer = '';
      }

      streamEnded = true;
      // Wake up consumer so it can exit the loop
      wakeConsumer();
    });

    childProcess.stdout.on('error', (error) => {
      console.error('[SubprocessManager] stdout error:', error);
      streamEnded = true;
      wakeConsumer();
    });

    // Stays false when the consumer abandons the generator (return()/throw())
    // before the stream has been fully drained.
    let drained = false;
    try {
      // Yield events as they arrive, waiting only when the queue is empty
      while (!streamEnded || eventQueue.length > 0) {
        if (eventQueue.length > 0) {
          yield eventQueue.shift();
        } else {
          // Wait for the next data event to push events into the queue
          await new Promise<void>((resolve) => {
            notifyConsumer = resolve;
          });
        }
      }
      drained = true;
    } finally {
      clearInactivityTimeout();
      cleanupAbortListener();
      if (!drained) {
        // Nothing supervises the child any more (no timeout, no abort
        // listener), so stop it rather than leaving it running.
        streamEnded = true;
        terminate();
      }
    }
  } else {
    // No stdout - still need to cleanup abort listener when process exits
    cleanupAbortListener();
  }

  // Wait for process to exit.
  // If the process already exited (e.g., abort handler killed it while we were
  // draining the stream) or a process error was reported, resolve immediately
  // to avoid blocking forever: 'exit' is not guaranteed to fire after 'error'.
  const exitCode = await new Promise<number | null>((resolve) => {
    if (processExited || processErrorMessage) {
      resolve(childProcess.exitCode);
      return;
    }

    childProcess.once('exit', (code) => {
      console.log(`[SubprocessManager] Process exited with code: ${code}`);
      resolve(code);
    });

    // Logging is done by the 'error' listener registered right after spawn.
    childProcess.once('error', () => {
      resolve(null);
    });
  });

  if (processErrorMessage) {
    // Spawn failure or other child-process error: report it to the consumer
    // (a failed spawn has no meaningful exit code or stderr).
    console.error(`[SubprocessManager] Process failed: ${processErrorMessage}`);
    yield {
      type: 'error',
      error: processErrorMessage,
    };
  } else if (exitCode !== 0 && exitCode !== null) {
    // Handle non-zero exit codes
    const errorMessage = stderrOutput || `Process exited with code ${exitCode}`;
    console.error(`[SubprocessManager] Process failed: ${errorMessage}`);
    yield {
      type: 'error',
      error: errorMessage,
    };
  }

  // Process completed successfully
  if (exitCode === 0 && !stderrOutput) {
    console.log('[SubprocessManager] Process completed successfully');
  }
}

/**
 * Spawns a subprocess and collects all of its output.
 *
 * stdout and stderr are decoded as UTF-8 by the streams themselves, so
 * multibyte characters that straddle chunk boundaries are preserved. The
 * promise settles on the child's 'close' event, i.e. after the process has
 * ended and its stdio streams have been closed, so the collected output is
 * complete.
 *
 * @param options - Command, arguments, working directory and optional
 *   environment, abort controller and stdin payload. `timeout` is not used.
 * @returns The collected stdout, stderr and exit code (the code is `null` when
 *   the child was terminated by a signal).
 * @throws Rejects with `Error('Process aborted')` when `abortController` is
 *   already aborted (the child is then never spawned) or aborts while the
 *   child is running; rejects with the child-process error when the process
 *   cannot be spawned.
 */
export async function spawnProcess(options: SubprocessOptions): Promise<SubprocessResult> {
  const { command, args, cwd, env, abortController, stdinData } = options;

  const processEnv = {
    ...process.env,
    ...env,
  };

  return new Promise((resolve, reject) => {
    // An 'abort' listener never fires for a signal that is already aborted,
    // so honor that state up front instead of running the process.
    if (abortController?.signal.aborted) {
      reject(new Error('Process aborted'));
      return;
    }

    // On Windows, .cmd files must be run through shell (cmd.exe)
    const needsShell =
      process.platform === 'win32' &&
      (command.toLowerCase().endsWith('.cmd') || command === 'npx' || command === 'npm');

    const childProcess = spawn(command, args, {
      cwd,
      env: processEnv,
      stdio: [stdinData ? 'pipe' : 'ignore', 'pipe', 'pipe'],
      shell: needsShell,
    });

    if (stdinData && childProcess.stdin) {
      // The child may exit before reading its input (EPIPE); without a
      // listener that stream error would be thrown as an uncaught exception.
      childProcess.stdin.on('error', (error) => {
        console.warn('[SubprocessManager] stdin error:', error);
      });
      childProcess.stdin.write(stdinData);
      childProcess.stdin.end();
    }

    let stdout = '';
    let stderr = '';

    if (childProcess.stdout) {
      childProcess.stdout.setEncoding('utf8');
      childProcess.stdout.on('data', (text: string) => {
        stdout += text;
      });
    }

    if (childProcess.stderr) {
      childProcess.stderr.setEncoding('utf8');
      childProcess.stderr.on('data', (text: string) => {
        stderr += text;
      });
    }

    // Setup abort handling with cleanup
    let abortHandler: (() => void) | null = null;
    const cleanupAbortListener = () => {
      if (abortController && abortHandler) {
        abortController.signal.removeEventListener('abort', abortHandler);
        abortHandler = null;
      }
    };

    if (abortController) {
      abortHandler = () => {
        cleanupAbortListener();
        childProcess.kill('SIGTERM');

        // Escalate to SIGKILL after 3 seconds if process hasn't exited
        const killTimer = setTimeout(() => {
          try {
            childProcess.kill('SIGKILL');
          } catch {
            // Process may have already exited
          }
        }, 3000);
        childProcess.once('exit', () => clearTimeout(killTimer));

        reject(new Error('Process aborted'));
      };
      abortController.signal.addEventListener('abort', abortHandler);
    }

    // Once the process has ended an abort has nothing left to cancel; drop the
    // listener so a late abort cannot reject while the pipes are draining.
    childProcess.once('exit', () => {
      cleanupAbortListener();
    });

    // 'close' fires after the process ended AND its stdio streams closed;
    // resolving on 'exit' could miss output still buffered in the pipes.
    childProcess.on('close', (code) => {
      cleanupAbortListener();
      resolve({ stdout, stderr, exitCode: code });
    });

    childProcess.on('error', (error) => {
      cleanupAbortListener();
      reject(error);
    });
  });
}