/** * PluginWorkerManager — spawns and manages out-of-process plugin worker child * processes, routes JSON-RPC 2.0 calls over stdio, and handles lifecycle * management including crash recovery with exponential backoff. * * Each installed plugin gets one dedicated worker process. The host sends * JSON-RPC requests over the child's stdin and reads responses from stdout. * Worker stderr is captured and forwarded to the host logger. * * Process Model (from PLUGIN_SPEC.md §12): * - One worker process per installed plugin * - Failure isolation: plugin crashes do not affect the host * - Graceful shutdown: 10-second drain, then SIGTERM, then SIGKILL * - Automatic restart with exponential backoff on unexpected exits * * @see PLUGIN_SPEC.md §12 — Process Model * @see PLUGIN_SPEC.md §12.5 — Graceful Shutdown Policy * @see PLUGIN_SPEC.md §13 — Host-Worker Protocol */ import { fork, type ChildProcess } from "node:child_process"; import { EventEmitter } from "node:events"; import { createInterface, type Interface as ReadlineInterface } from "node:readline"; import type { PaperclipPluginManifestV1 } from "@paperclipai/shared"; import { JSONRPC_VERSION, JSONRPC_ERROR_CODES, PLUGIN_RPC_ERROR_CODES, createRequest, createErrorResponse, parseMessage, serializeMessage, isJsonRpcResponse, isJsonRpcRequest, isJsonRpcNotification, isJsonRpcSuccessResponse, JsonRpcParseError, JsonRpcCallError, } from "@paperclipai/plugin-sdk"; import type { JsonRpcId, JsonRpcResponse, JsonRpcRequest, JsonRpcNotification, HostToWorkerMethodName, HostToWorkerMethods, WorkerToHostMethodName, WorkerToHostMethods, InitializeParams, } from "@paperclipai/plugin-sdk"; import { logger } from "../middleware/logger.js"; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- /** Default timeout for RPC calls in milliseconds. 
*/ const DEFAULT_RPC_TIMEOUT_MS = 30_000; /** Hard upper bound for any RPC timeout (5 minutes). Prevents unbounded waits. */ const MAX_RPC_TIMEOUT_MS = 5 * 60 * 1_000; /** Timeout for the initialize RPC call. */ const INITIALIZE_TIMEOUT_MS = 15_000; /** Timeout for the shutdown RPC call before escalating to SIGTERM. */ const SHUTDOWN_DRAIN_MS = 10_000; /** Time to wait after SIGTERM before sending SIGKILL. */ const SIGTERM_GRACE_MS = 5_000; /** Minimum backoff delay for crash recovery (1 second). */ const MIN_BACKOFF_MS = 1_000; /** Maximum backoff delay for crash recovery (5 minutes). */ const MAX_BACKOFF_MS = 5 * 60 * 1_000; /** Backoff multiplier on each consecutive crash. */ const BACKOFF_MULTIPLIER = 2; /** Maximum number of consecutive crashes before giving up on auto-restart. */ const MAX_CONSECUTIVE_CRASHES = 10; /** Time window in which crashes are considered consecutive (10 minutes). */ const CRASH_WINDOW_MS = 10 * 60 * 1_000; /** Maximum number of stderr characters retained for worker failure context. */ const MAX_STDERR_EXCERPT_CHARS = 8_000; // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- /** * Status of a managed worker process. */ export type WorkerStatus = | "stopped" | "starting" | "running" | "stopping" | "crashed" | "backoff"; /** * Worker-to-host method handler. The host registers these to service calls * that the plugin worker makes back to the host (e.g. state.get, events.emit). */ export type WorkerToHostHandler = ( params: WorkerToHostMethods[M][0], ) => Promise; /** * A map of all worker-to-host method handlers provided by the host. */ export type WorkerToHostHandlers = { [M in WorkerToHostMethodName]?: WorkerToHostHandler; }; /** * Events emitted by a PluginWorkerHandle. */ export interface WorkerHandleEvents { /** Worker process started and is ready (initialize succeeded). 
*/ "ready": { pluginId: string }; /** Worker process exited. */ "exit": { pluginId: string; code: number | null; signal: NodeJS.Signals | null }; /** Worker process crashed unexpectedly. */ "crash": { pluginId: string; code: number | null; signal: NodeJS.Signals | null; willRestart: boolean }; /** Worker process errored (e.g. spawn failure). */ "error": { pluginId: string; error: Error }; /** Worker status changed. */ "status": { pluginId: string; status: WorkerStatus; previousStatus: WorkerStatus }; } type WorkerHandleEventName = keyof WorkerHandleEvents; export function appendStderrExcerpt(current: string, chunk: string): string { const next = current ? `${current}\n${chunk}` : chunk; return next.length <= MAX_STDERR_EXCERPT_CHARS ? next : next.slice(-MAX_STDERR_EXCERPT_CHARS); } export function formatWorkerFailureMessage(message: string, stderrExcerpt: string): string { const excerpt = stderrExcerpt.trim(); if (!excerpt) return message; if (message.includes(excerpt)) return message; return `${message}\n\nWorker stderr:\n${excerpt}`; } /** * Options for starting a worker process. */ export interface WorkerStartOptions { /** Absolute path to the plugin worker entrypoint (CJS bundle). */ entrypointPath: string; /** Plugin manifest. */ manifest: PaperclipPluginManifestV1; /** Resolved plugin configuration. */ config: Record; /** Host instance information for the initialize call. */ instanceInfo: { instanceId: string; hostVersion: string; }; /** Host API version. */ apiVersion: number; /** Handlers for worker→host RPC calls. */ hostHandlers: WorkerToHostHandlers; /** Default timeout for RPC calls (ms). Defaults to 30s. */ rpcTimeoutMs?: number; /** Whether to auto-restart on crash. Defaults to true. */ autoRestart?: boolean; /** Node.js execArgv passed to the child process. */ execArgv?: string[]; /** Environment variables passed to the child process. */ env?: Record; /** * Callback for stream notifications from the worker (streams.open/emit/close). 
* The host wires this to the PluginStreamBus to fan out events to SSE clients. */ onStreamNotification?: (method: string, params: Record) => void; } /** * A pending RPC call waiting for a response from the worker. */ interface PendingRequest { /** The request ID. */ id: JsonRpcId; /** Method name (for logging). */ method: string; /** Resolve the promise with the response. */ resolve: (response: JsonRpcResponse) => void; /** Timeout timer handle. */ timer: ReturnType; /** Timestamp when the request was sent. */ sentAt: number; } // --------------------------------------------------------------------------- // PluginWorkerHandle — manages a single worker process // --------------------------------------------------------------------------- /** * Handle for a single plugin worker process. * * Callers use `start()` to spawn the worker, `call()` to send RPC requests, * and `stop()` to gracefully shut down. The handle manages crash recovery * with exponential backoff automatically when `autoRestart` is enabled. */ export interface PluginWorkerHandle { /** The plugin ID this worker serves. */ readonly pluginId: string; /** Current worker status. */ readonly status: WorkerStatus; /** Start the worker process. Resolves when initialize completes. */ start(): Promise; /** * Stop the worker process gracefully. * * Sends a `shutdown` RPC call, waits up to 10 seconds for the worker to * exit, then escalates to SIGTERM, and finally SIGKILL if needed. */ stop(): Promise; /** * Restart the worker process (stop + start). */ restart(): Promise; /** * Send a typed host→worker RPC call. 
* * @param method - The RPC method name * @param params - Method parameters * @param timeoutMs - Optional per-call timeout override * @returns The method result * @throws {JsonRpcCallError} if the worker returns an error response * @throws {Error} if the worker is not running or the call times out */ call( method: M, params: HostToWorkerMethods[M][0], timeoutMs?: number, ): Promise; /** * Send a fire-and-forget notification to the worker (no response expected). */ notify(method: string, params: unknown): void; /** Subscribe to worker events. */ on( event: K, listener: (payload: WorkerHandleEvents[K]) => void, ): void; /** Unsubscribe from worker events. */ off( event: K, listener: (payload: WorkerHandleEvents[K]) => void, ): void; /** Optional methods the worker reported during initialization. */ readonly supportedMethods: string[]; /** Get diagnostic info about the worker. */ diagnostics(): WorkerDiagnostics; } /** * Diagnostic information about a worker process. */ export interface WorkerDiagnostics { pluginId: string; status: WorkerStatus; pid: number | null; uptime: number | null; consecutiveCrashes: number; totalCrashes: number; pendingRequests: number; lastCrashAt: number | null; nextRestartAt: number | null; } // --------------------------------------------------------------------------- // PluginWorkerManager — manages all plugin workers // --------------------------------------------------------------------------- /** * The top-level manager that holds all plugin worker handles. * * Provides a registry of workers keyed by plugin ID, with convenience methods * for starting/stopping all workers and routing RPC calls. */ export interface PluginWorkerManager { /** * Register and start a worker for a plugin. * * @returns The worker handle * @throws if a worker is already registered for this plugin */ startWorker(pluginId: string, options: WorkerStartOptions): Promise; /** * Stop and unregister a specific plugin worker. 
*/ stopWorker(pluginId: string): Promise; /** * Get the worker handle for a plugin. */ getWorker(pluginId: string): PluginWorkerHandle | undefined; /** * Check if a worker is registered and running for a plugin. */ isRunning(pluginId: string): boolean; /** * Stop all managed workers. Called during server shutdown. */ stopAll(): Promise; /** * Get diagnostic info for all workers. */ diagnostics(): WorkerDiagnostics[]; /** * Send an RPC call to a specific plugin worker. * * @throws if the worker is not running */ call( pluginId: string, method: M, params: HostToWorkerMethods[M][0], timeoutMs?: number, ): Promise; } // --------------------------------------------------------------------------- // Implementation: createPluginWorkerHandle // --------------------------------------------------------------------------- /** * Create a handle for a single plugin worker process. * * @internal Exported for testing; consumers should use `createPluginWorkerManager`. */ export function createPluginWorkerHandle( pluginId: string, options: WorkerStartOptions, ): PluginWorkerHandle { const log = logger.child({ service: "plugin-worker", pluginId }); const emitter = new EventEmitter(); /** * Higher than default (10) to accommodate multiple subscribers to * crash/ready/exit events during integration tests and runtime monitoring. 
*/ emitter.setMaxListeners(50); // Worker process state let childProcess: ChildProcess | null = null; let readline: ReadlineInterface | null = null; let stderrReadline: ReadlineInterface | null = null; let status: WorkerStatus = "stopped"; let startedAt: number | null = null; let stderrExcerpt = ""; // Pending RPC requests awaiting a response const pendingRequests = new Map(); let nextRequestId = 1; // Optional methods reported by the worker during initialization let supportedMethods: string[] = []; // Crash tracking for exponential backoff let consecutiveCrashes = 0; let totalCrashes = 0; let lastCrashAt: number | null = null; let backoffTimer: ReturnType | null = null; let nextRestartAt: number | null = null; // Track open stream channels so we can emit synthetic close on crash. // Maps channel → companyId. const openStreamChannels = new Map(); // Shutdown coordination let intentionalStop = false; const rpcTimeoutMs = options.rpcTimeoutMs ?? DEFAULT_RPC_TIMEOUT_MS; const autoRestart = options.autoRestart ?? 
true; // ----------------------------------------------------------------------- // Status management // ----------------------------------------------------------------------- function setStatus(newStatus: WorkerStatus): void { const prev = status; if (prev === newStatus) return; status = newStatus; log.debug({ from: prev, to: newStatus }, "worker status change"); emitter.emit("status", { pluginId, status: newStatus, previousStatus: prev }); } // ----------------------------------------------------------------------- // JSON-RPC message sending // ----------------------------------------------------------------------- function sendMessage(message: unknown): void { if (!childProcess?.stdin?.writable) { throw new Error(`Worker process for plugin "${pluginId}" is not writable`); } const serialized = serializeMessage(message as any); childProcess.stdin.write(serialized); } // ----------------------------------------------------------------------- // Incoming message handling // ----------------------------------------------------------------------- function handleLine(line: string): void { if (!line.trim()) return; let message: unknown; try { message = parseMessage(line); } catch (err) { if (err instanceof JsonRpcParseError) { log.warn({ rawLine: line.slice(0, 200) }, "unparseable message from worker"); } else { log.warn({ err }, "error parsing worker message"); } return; } if (isJsonRpcResponse(message)) { handleResponse(message); } else if (isJsonRpcRequest(message)) { handleWorkerRequest(message as JsonRpcRequest); } else if (isJsonRpcNotification(message)) { handleWorkerNotification(message as JsonRpcNotification); } else { log.warn("unknown message type from worker"); } } /** * Handle a JSON-RPC response from the worker (matching a pending request). 
*/ function handleResponse(response: JsonRpcResponse): void { const id = response.id; if (id === null || id === undefined) { log.warn("received response with null/undefined id"); return; } const pending = pendingRequests.get(id); if (!pending) { log.warn({ id }, "received response for unknown request id"); return; } clearTimeout(pending.timer); pendingRequests.delete(id); pending.resolve(response); } /** * Handle a JSON-RPC request from the worker (worker→host call). */ async function handleWorkerRequest(request: JsonRpcRequest): Promise { const method = request.method as WorkerToHostMethodName; const handler = options.hostHandlers[method] as | ((params: unknown) => Promise) | undefined; if (!handler) { log.warn({ method }, "worker called unregistered host method"); try { sendMessage( createErrorResponse( request.id, JSONRPC_ERROR_CODES.METHOD_NOT_FOUND, `Host does not handle method "${method}"`, ), ); } catch { // Worker may have exited, ignore send error } return; } try { const result = await handler(request.params); sendMessage({ jsonrpc: JSONRPC_VERSION, id: request.id, result: result ?? null, }); } catch (err) { const errorMessage = err instanceof Error ? err.message : String(err); log.error({ method, err: errorMessage }, "host handler error"); try { sendMessage( createErrorResponse( request.id, JSONRPC_ERROR_CODES.INTERNAL_ERROR, errorMessage, ), ); } catch { // Worker may have exited, ignore send error } } } /** * Handle a JSON-RPC notification from the worker (fire-and-forget). * * The `log` notification is the primary case — worker `ctx.logger` calls * arrive here. We append structured plugin context (pluginId, timestamp, * level) so that every log entry is queryable per the spec (§26.1). */ function handleWorkerNotification(notification: JsonRpcNotification): void { if (notification.method === "log") { const params = notification.params as { level?: string; message?: string; meta?: Record; } | null; const level = params?.level ?? 
"info"; const msg = params?.message ?? ""; const meta = params?.meta; // Build a structured log object that includes the plugin context fields // required by §26.1: pluginId, timestamp, level, message, and metadata. // The child logger already carries `pluginId` in its bindings, but we // add explicit `pluginLogLevel` and `pluginTimestamp` so downstream // consumers (log storage, UI queries) can filter without parsing. const logFields: Record = { ...meta, pluginLogLevel: level, pluginTimestamp: new Date().toISOString(), }; if (level === "error") { log.error(logFields, `[plugin] ${msg}`); } else if (level === "warn") { log.warn(logFields, `[plugin] ${msg}`); } else if (level === "debug") { log.debug(logFields, `[plugin] ${msg}`); } else { log.info(logFields, `[plugin] ${msg}`); } return; } // Stream notifications: forward to the stream bus via callback if ( notification.method === "streams.open" || notification.method === "streams.emit" || notification.method === "streams.close" ) { const params = (notification.params ?? {}) as Record; // Track open channels so we can emit synthetic close on crash if (notification.method === "streams.open") { const ch = String(params.channel ?? ""); const co = String(params.companyId ?? ""); if (ch) openStreamChannels.set(ch, co); } else if (notification.method === "streams.close") { openStreamChannels.delete(String(params.channel ?? "")); } if (options.onStreamNotification) { try { options.onStreamNotification(notification.method, params); } catch (err) { log.error( { method: notification.method, err: err instanceof Error ? 
err.message : String(err), }, "stream notification handler failed", ); } } return; } log.debug({ method: notification.method }, "received notification from worker"); } // ----------------------------------------------------------------------- // Process lifecycle // ----------------------------------------------------------------------- function spawnProcess(): ChildProcess { // Security: Do NOT spread process.env into the worker. Plugins should only // receive a minimal, controlled environment to prevent leaking host // secrets (like DATABASE_URL, internal API keys, etc.). const workerEnv: Record = { ...options.env, PATH: process.env.PATH ?? "", NODE_PATH: process.env.NODE_PATH ?? "", PAPERCLIP_PLUGIN_ID: pluginId, NODE_ENV: process.env.NODE_ENV ?? "production", TZ: process.env.TZ ?? "UTC", }; const child = fork(options.entrypointPath, [], { stdio: ["pipe", "pipe", "pipe", "ipc"], execArgv: options.execArgv ?? [], env: workerEnv, // Don't let the child keep the parent alive detached: false, }); return child; } function attachStdioHandlers(child: ChildProcess): void { // Read NDJSON from stdout if (child.stdout) { readline = createInterface({ input: child.stdout }); readline.on("line", handleLine); } // Capture stderr for logging if (child.stderr) { stderrReadline = createInterface({ input: child.stderr }); stderrReadline.on("line", (line: string) => { stderrExcerpt = appendStderrExcerpt(stderrExcerpt, line); log.warn({ stream: "stderr" }, `[plugin stderr] ${line}`); }); } // Handle process exit child.on("exit", (code, signal) => { handleProcessExit(code, signal); }); // Handle process errors (e.g. 
spawn failure) child.on("error", (err) => { log.error({ err: err.message }, "worker process error"); emitter.emit("error", { pluginId, error: err }); if (status === "starting") { setStatus("crashed"); rejectAllPending( new Error(formatWorkerFailureMessage( `Worker process failed to start: ${err.message}`, stderrExcerpt, )), ); } }); } function handleProcessExit( code: number | null, signal: NodeJS.Signals | null, ): void { const wasIntentional = intentionalStop; // Clean up readline interfaces if (readline) { readline.close(); readline = null; } if (stderrReadline) { stderrReadline.close(); stderrReadline = null; } childProcess = null; startedAt = null; // Reject all pending requests rejectAllPending( new Error(formatWorkerFailureMessage( `Worker process exited (code=${code}, signal=${signal})`, stderrExcerpt, )), ); // Emit synthetic close for any orphaned stream channels so SSE clients // are notified instead of hanging indefinitely. if (openStreamChannels.size > 0 && options.onStreamNotification) { for (const [channel, companyId] of openStreamChannels) { try { options.onStreamNotification("streams.close", { channel, companyId }); } catch { // Best-effort cleanup — don't let it interfere with exit handling } } openStreamChannels.clear(); } emitter.emit("exit", { pluginId, code, signal }); if (wasIntentional) { // Graceful stop — status is already "stopping" or will be set to "stopped" setStatus("stopped"); log.info({ code, signal }, "worker process stopped"); return; } // Unexpected exit — crash recovery totalCrashes++; const now = Date.now(); // Reset consecutive crash counter if enough time passed if (lastCrashAt !== null && now - lastCrashAt > CRASH_WINDOW_MS) { consecutiveCrashes = 0; } consecutiveCrashes++; lastCrashAt = now; log.error( { code, signal, consecutiveCrashes, totalCrashes }, "worker process crashed", ); const willRestart = autoRestart && consecutiveCrashes <= MAX_CONSECUTIVE_CRASHES; setStatus("crashed"); emitter.emit("crash", { pluginId, code, 
signal, willRestart }); if (willRestart) { scheduleRestart(); } else { log.error( { consecutiveCrashes, maxCrashes: MAX_CONSECUTIVE_CRASHES }, "max consecutive crashes reached, not restarting", ); } } function rejectAllPending(error: Error): void { for (const [id, pending] of pendingRequests) { clearTimeout(pending.timer); pending.resolve( createErrorResponse( pending.id, PLUGIN_RPC_ERROR_CODES.WORKER_UNAVAILABLE, error.message, ) as JsonRpcResponse, ); } pendingRequests.clear(); } // ----------------------------------------------------------------------- // Crash recovery with exponential backoff // ----------------------------------------------------------------------- function computeBackoffMs(): number { // Exponential backoff: MIN_BACKOFF * MULTIPLIER^(consecutiveCrashes - 1) const delay = MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, consecutiveCrashes - 1); // Add jitter: ±25% const jitter = delay * 0.25 * (Math.random() * 2 - 1); return Math.min(Math.round(delay + jitter), MAX_BACKOFF_MS); } function scheduleRestart(): void { const delay = computeBackoffMs(); nextRestartAt = Date.now() + delay; setStatus("backoff"); log.info( { delayMs: delay, consecutiveCrashes }, "scheduling restart with backoff", ); backoffTimer = setTimeout(async () => { backoffTimer = null; nextRestartAt = null; try { await startInternal(); } catch (err) { log.error( { err: err instanceof Error ? 
err.message : String(err) },
          "restart after backoff failed",
        );
      }
    }, delay);
  }

  /** Cancel a scheduled backoff restart, if one is pending. */
  function cancelPendingRestart(): void {
    if (backoffTimer !== null) {
      clearTimeout(backoffTimer);
      backoffTimer = null;
      nextRestartAt = null;
    }
  }

  // -----------------------------------------------------------------------
  // Start / Stop
  // -----------------------------------------------------------------------

  /** Spawn the worker, send the `initialize` RPC, and mark the handle running. */
  async function startInternal(): Promise<void> {
    if (status === "running" || status === "starting") {
      throw new Error(`Worker for plugin "${pluginId}" is already ${status}`);
    }

    intentionalStop = false;
    setStatus("starting");
    stderrExcerpt = "";

    const child = spawnProcess();
    childProcess = child;
    attachStdioHandlers(child);
    startedAt = Date.now();

    // Send the initialize RPC call
    const initParams: InitializeParams = {
      manifest: options.manifest,
      config: options.config,
      instanceInfo: options.instanceInfo,
      apiVersion: options.apiVersion,
    };

    try {
      const result = await callInternal(
        "initialize",
        initParams,
        INITIALIZE_TIMEOUT_MS,
      ) as { ok?: boolean; supportedMethods?: string[] } | undefined;

      if (!result || !result.ok) {
        throw new Error("Worker initialize returned ok=false");
      }
      supportedMethods = result.supportedMethods ?? [];
    } catch (err) {
      // Initialize failed — kill the process and propagate
      const msg = err instanceof Error ?
err.message : String(err); log.error({ err: msg }, "worker initialize failed"); await killProcess(); setStatus("crashed"); throw new Error(`Worker initialize failed for "${pluginId}": ${msg}`); } // Reset crash counter on successful start consecutiveCrashes = 0; setStatus("running"); emitter.emit("ready", { pluginId }); log.info({ pid: child.pid }, "worker process started and initialized"); } async function stopInternal(): Promise { cancelPendingRestart(); if (status === "stopped" || status === "stopping") { return; } intentionalStop = true; setStatus("stopping"); if (!childProcess) { setStatus("stopped"); return; } // Step 1: Send shutdown RPC and wait for the worker to exit gracefully. // We race the shutdown call against a timeout. The worker should process // the shutdown and exit on its own within the drain period. try { await Promise.race([ callInternal("shutdown", {} as Record, SHUTDOWN_DRAIN_MS), waitForExit(SHUTDOWN_DRAIN_MS), ]); } catch { // Shutdown call failed or timed out — proceed to kill log.warn("shutdown RPC failed or timed out, escalating to SIGTERM"); } // Give the process a brief moment to exit after the shutdown response if (childProcess) { await waitForExit(500); } // Check if process already exited if (!childProcess) { setStatus("stopped"); return; } // Step 2: Send SIGTERM and wait log.info("worker did not exit after shutdown RPC, sending SIGTERM"); await killWithSignal("SIGTERM", SIGTERM_GRACE_MS); if (!childProcess) { setStatus("stopped"); return; } // Step 3: Forcefully kill with SIGKILL log.warn("worker did not exit after SIGTERM, sending SIGKILL"); await killWithSignal("SIGKILL", 2_000); if (childProcess) { log.error("worker process still alive after SIGKILL — this should not happen"); } setStatus("stopped"); } /** * Wait for the child process to exit, up to `timeoutMs`. * Resolves immediately if the process is already gone. 
*/ function waitForExit(timeoutMs: number): Promise { return new Promise((resolve) => { if (!childProcess) { resolve(); return; } let settled = false; const timer = setTimeout(() => { if (settled) return; settled = true; resolve(); }, timeoutMs); childProcess.once("exit", () => { if (settled) return; settled = true; clearTimeout(timer); resolve(); }); }); } function killWithSignal( signal: NodeJS.Signals, waitMs: number, ): Promise { return new Promise((resolve) => { if (!childProcess) { resolve(); return; } const timer = setTimeout(() => { resolve(); }, waitMs); childProcess.once("exit", () => { clearTimeout(timer); resolve(); }); try { childProcess.kill(signal); } catch { clearTimeout(timer); resolve(); } }); } async function killProcess(): Promise { if (!childProcess) return; intentionalStop = true; try { childProcess.kill("SIGKILL"); } catch { // Process may already be dead } // Wait briefly for exit event await new Promise((resolve) => { if (!childProcess) { resolve(); return; } const timer = setTimeout(() => { resolve(); }, 1_000); childProcess.once("exit", () => { clearTimeout(timer); resolve(); }); }); } // ----------------------------------------------------------------------- // RPC call implementation // ----------------------------------------------------------------------- function callInternal( method: M, params: HostToWorkerMethods[M][0], timeoutMs?: number, ): Promise { return new Promise((resolve, reject) => { if (!childProcess?.stdin?.writable) { reject( new Error( `Cannot call "${method}" — worker for "${pluginId}" is not running`, ), ); return; } const id = nextRequestId++; const timeout = Math.min(timeoutMs ?? rpcTimeoutMs, MAX_RPC_TIMEOUT_MS); // Guard against double-settlement. When a process exits all pending // requests are rejected via rejectAllPending(), but the timeout timer // may still be running. Without this guard the timer's reject fires on // an already-settled promise, producing an unhandled rejection. 
let settled = false;
      // Settle exactly once: clear the timer, deregister, then resolve/reject.
      const settle = <T>(fn: (value: T) => void, value: T): void => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        pendingRequests.delete(id);
        fn(value);
      };

      const timer = setTimeout(() => {
        settle(
          reject,
          new JsonRpcCallError({
            code: PLUGIN_RPC_ERROR_CODES.TIMEOUT,
            message: `RPC call "${method}" timed out after ${timeout}ms`,
          }),
        );
      }, timeout);

      const pending: PendingRequest = {
        id,
        method,
        resolve: (response: JsonRpcResponse) => {
          if (isJsonRpcSuccessResponse(response)) {
            settle(resolve, response.result as HostToWorkerMethods[M][1]);
          } else if ("error" in response && response.error) {
            settle(reject, new JsonRpcCallError(response.error));
          } else {
            settle(reject, new Error(`Unexpected response format for "${method}"`));
          }
        },
        timer,
        sentAt: Date.now(),
      };
      pendingRequests.set(id, pending);

      try {
        const request = createRequest(method, params, id);
        sendMessage(request);
      } catch (err) {
        clearTimeout(timer);
        pendingRequests.delete(id);
        reject(
          new Error(
            `Failed to send "${method}" to worker: ${
              err instanceof Error ?
err.message : String(err)
            }`,
          ),
        );
      }
    });
  }

  // -----------------------------------------------------------------------
  // Public API
  // -----------------------------------------------------------------------

  const handle: PluginWorkerHandle = {
    get pluginId() {
      return pluginId;
    },
    get status() {
      return status;
    },
    get supportedMethods() {
      return supportedMethods;
    },

    async start() {
      await startInternal();
    },

    async stop() {
      await stopInternal();
    },

    async restart() {
      await stopInternal();
      await startInternal();
    },

    call<M extends HostToWorkerMethodName>(
      method: M,
      params: HostToWorkerMethods[M][0],
      timeoutMs?: number,
    ): Promise<HostToWorkerMethods[M][1]> {
      if (status !== "running" && status !== "starting") {
        return Promise.reject(
          new Error(
            `Cannot call "${method}" — worker for "${pluginId}" is ${status}`,
          ),
        );
      }
      return callInternal(method, params, timeoutMs);
    },

    notify(method: string, params: unknown) {
      if (status !== "running") return;
      try {
        sendMessage({
          jsonrpc: JSONRPC_VERSION,
          method,
          params,
        });
      } catch {
        log.warn({ method }, "failed to send notification to worker");
      }
    },

    on<K extends WorkerHandleEventName>(
      event: K,
      listener: (payload: WorkerHandleEvents[K]) => void,
    ) {
      emitter.on(event, listener);
    },

    off<K extends WorkerHandleEventName>(
      event: K,
      listener: (payload: WorkerHandleEvents[K]) => void,
    ) {
      emitter.off(event, listener);
    },

    diagnostics(): WorkerDiagnostics {
      return {
        pluginId,
        status,
        pid: childProcess?.pid ?? null,
        uptime:
          startedAt !== null && status === "running" ? Date.now() - startedAt : null,
        consecutiveCrashes,
        totalCrashes,
        pendingRequests: pendingRequests.size,
        lastCrashAt,
        nextRestartAt,
      };
    },
  };

  return handle;
}

// ---------------------------------------------------------------------------
// Implementation: createPluginWorkerManager
// ---------------------------------------------------------------------------

/**
 * Options for creating a PluginWorkerManager.
 */
export interface PluginWorkerManagerOptions {
  /**
   * Optional callback invoked when a worker emits a lifecycle event
   * (crash, restart). Used by the server to publish global live events.
*/ onWorkerEvent?: (event: { type: "plugin.worker.crashed" | "plugin.worker.restarted"; pluginId: string; code?: number | null; signal?: string | null; willRestart?: boolean; }) => void; } /** * Create a new PluginWorkerManager. * * The manager holds all plugin worker handles and provides a unified API for * starting, stopping, and communicating with plugin workers. * * @example * ```ts * const manager = createPluginWorkerManager(); * * const handle = await manager.startWorker("acme.linear", { * entrypointPath: "/path/to/worker.cjs", * manifest, * config: resolvedConfig, * instanceInfo: { instanceId: "inst-1", hostVersion: "1.0.0" }, * apiVersion: 1, * hostHandlers: { "config.get": async () => resolvedConfig, ... }, * }); * * // Send RPC call to the worker * const health = await manager.call("acme.linear", "health", {}); * * // Shutdown all workers on server exit * await manager.stopAll(); * ``` */ export function createPluginWorkerManager( managerOptions?: PluginWorkerManagerOptions, ): PluginWorkerManager { const log = logger.child({ service: "plugin-worker-manager" }); const workers = new Map(); /** Per-plugin startup locks to prevent concurrent spawn races. 
*/ const startupLocks = new Map>(); return { async startWorker( pluginId: string, options: WorkerStartOptions, ): Promise { // Mutex: if a start is already in-flight for this plugin, wait for it const inFlight = startupLocks.get(pluginId); if (inFlight) { log.warn({ pluginId }, "concurrent startWorker call — waiting for in-flight start"); return inFlight; } const existing = workers.get(pluginId); if (existing && existing.status !== "stopped") { throw new Error( `Worker already registered for plugin "${pluginId}" (status: ${existing.status})`, ); } const handle = createPluginWorkerHandle(pluginId, options); workers.set(pluginId, handle); // Subscribe to crash/ready events for live event forwarding if (managerOptions?.onWorkerEvent) { const notify = managerOptions.onWorkerEvent; handle.on("crash", (payload) => { notify({ type: "plugin.worker.crashed", pluginId: payload.pluginId, code: payload.code, signal: payload.signal, willRestart: payload.willRestart, }); }); handle.on("ready", (payload) => { // Only emit restarted if this was a crash recovery (totalCrashes > 0) const diag = handle.diagnostics(); if (diag.totalCrashes > 0) { notify({ type: "plugin.worker.restarted", pluginId: payload.pluginId, }); } }); } log.info({ pluginId }, "starting plugin worker"); // Set the lock before awaiting start() to prevent concurrent spawns const startPromise = handle.start().then(() => handle).finally(() => { startupLocks.delete(pluginId); }); startupLocks.set(pluginId, startPromise); return startPromise; }, async stopWorker(pluginId: string): Promise { const handle = workers.get(pluginId); if (!handle) { log.warn({ pluginId }, "no worker registered for plugin, nothing to stop"); return; } log.info({ pluginId }, "stopping plugin worker"); await handle.stop(); workers.delete(pluginId); }, getWorker(pluginId: string): PluginWorkerHandle | undefined { return workers.get(pluginId); }, isRunning(pluginId: string): boolean { const handle = workers.get(pluginId); return handle?.status 
=== "running"; }, async stopAll(): Promise { log.info({ count: workers.size }, "stopping all plugin workers"); const promises = Array.from(workers.values()).map(async (handle) => { try { await handle.stop(); } catch (err) { log.error( { pluginId: handle.pluginId, err: err instanceof Error ? err.message : String(err), }, "error stopping worker during shutdown", ); } }); await Promise.all(promises); workers.clear(); }, diagnostics(): WorkerDiagnostics[] { return Array.from(workers.values()).map((h) => h.diagnostics()); }, call( pluginId: string, method: M, params: HostToWorkerMethods[M][0], timeoutMs?: number, ): Promise { const handle = workers.get(pluginId); if (!handle) { return Promise.reject( new Error(`No worker registered for plugin "${pluginId}"`), ); } return handle.call(method, params, timeoutMs); }, }; }