Spaces:
Build error
Build error
| /** | |
| * PluginWorkerManager — spawns and manages out-of-process plugin worker child | |
| * processes, routes JSON-RPC 2.0 calls over stdio, and handles lifecycle | |
| * management including crash recovery with exponential backoff. | |
| * | |
| * Each installed plugin gets one dedicated worker process. The host sends | |
| * JSON-RPC requests over the child's stdin and reads responses from stdout. | |
| * Worker stderr is captured and forwarded to the host logger. | |
| * | |
| * Process Model (from PLUGIN_SPEC.md §12): | |
| * - One worker process per installed plugin | |
| * - Failure isolation: plugin crashes do not affect the host | |
| * - Graceful shutdown: 10-second drain, then SIGTERM, then SIGKILL | |
| * - Automatic restart with exponential backoff on unexpected exits | |
| * | |
| * @see PLUGIN_SPEC.md §12 — Process Model | |
| * @see PLUGIN_SPEC.md §12.5 — Graceful Shutdown Policy | |
| * @see PLUGIN_SPEC.md §13 — Host-Worker Protocol | |
| */ | |
| import { fork, type ChildProcess } from "node:child_process"; | |
| import { EventEmitter } from "node:events"; | |
| import { createInterface, type Interface as ReadlineInterface } from "node:readline"; | |
| import type { PaperclipPluginManifestV1 } from "@paperclipai/shared"; | |
| import { | |
| JSONRPC_VERSION, | |
| JSONRPC_ERROR_CODES, | |
| PLUGIN_RPC_ERROR_CODES, | |
| createRequest, | |
| createErrorResponse, | |
| parseMessage, | |
| serializeMessage, | |
| isJsonRpcResponse, | |
| isJsonRpcRequest, | |
| isJsonRpcNotification, | |
| isJsonRpcSuccessResponse, | |
| JsonRpcParseError, | |
| JsonRpcCallError, | |
| } from "@paperclipai/plugin-sdk"; | |
| import type { | |
| JsonRpcId, | |
| JsonRpcResponse, | |
| JsonRpcRequest, | |
| JsonRpcNotification, | |
| HostToWorkerMethodName, | |
| HostToWorkerMethods, | |
| WorkerToHostMethodName, | |
| WorkerToHostMethods, | |
| InitializeParams, | |
| } from "@paperclipai/plugin-sdk"; | |
| import { logger } from "../middleware/logger.js"; | |
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

// RPC timeouts

/** Timeout applied to an RPC call when the caller supplies none (30 s). */
const DEFAULT_RPC_TIMEOUT_MS = 30 * 1_000;
/** Hard ceiling on any RPC timeout, including per-call overrides (5 min). */
const MAX_RPC_TIMEOUT_MS = 300_000;
/** Timeout for the `initialize` RPC sent right after spawning (15 s). */
const INITIALIZE_TIMEOUT_MS = 15 * 1_000;

// Shutdown escalation

/** How long the `shutdown` RPC may take before escalating to SIGTERM (10 s). */
const SHUTDOWN_DRAIN_MS = 10 * 1_000;
/** Grace period between SIGTERM and SIGKILL (5 s). */
const SIGTERM_GRACE_MS = 5 * 1_000;

// Crash recovery

/** Minimum backoff delay before a restart (1 s); also the base of the exponential schedule. */
const MIN_BACKOFF_MS = 1_000;
/** Upper bound on the backoff delay (5 min). */
const MAX_BACKOFF_MS = 300_000;
/** Factor the backoff grows by with each consecutive crash. */
const BACKOFF_MULTIPLIER = 2;
/** Consecutive crashes after which automatic restarts stop. */
const MAX_CONSECUTIVE_CRASHES = 10;
/** A crash more than this long (10 min) after the previous one restarts the consecutive count. */
const CRASH_WINDOW_MS = 600_000;

// Diagnostics

/** Number of trailing stderr characters retained to enrich failure messages. */
const MAX_STDERR_EXCERPT_CHARS = 8_000;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
 * Lifecycle state of a managed worker process.
 *
 * - `stopped`  — no process; reached after an intentional stop
 * - `starting` — process spawned, `initialize` handshake in progress
 * - `running`  — `initialize` succeeded
 * - `stopping` — graceful shutdown in progress
 * - `crashed`  — unexpected exit or failed start
 * - `backoff`  — waiting for a scheduled automatic restart
 */
export type WorkerStatus = "stopped" | "starting" | "running" | "stopping" | "crashed" | "backoff";
/**
 * Host-side implementation of a single worker→host RPC method (for example
 * `state.get` or `events.emit`): receives the method's params and resolves
 * with its result.
 */
export type WorkerToHostHandler<M extends WorkerToHostMethodName> = (
  params: WorkerToHostMethods[M][0],
) => Promise<WorkerToHostMethods[M][1]>;
/**
 * Table of worker→host handlers supplied by the host. Every entry is
 * optional; a worker call to a method without a handler is answered with a
 * METHOD_NOT_FOUND error.
 */
export type WorkerToHostHandlers = Partial<{
  [Method in WorkerToHostMethodName]: WorkerToHostHandler<Method>;
}>;
/**
 * Payload carried by each event a PluginWorkerHandle emits, keyed by event name.
 */
export interface WorkerHandleEvents {
  /** The `initialize` handshake succeeded and the worker is running. */
  ready: { pluginId: string };
  /** The worker process exited (intentionally or not). */
  exit: { pluginId: string; code: number | null; signal: NodeJS.Signals | null };
  /** The worker process exited unexpectedly; `willRestart` tells whether a backoff restart follows. */
  crash: { pluginId: string; code: number | null; signal: NodeJS.Signals | null; willRestart: boolean };
  /** The child process reported an error (e.g. a spawn failure). */
  error: { pluginId: string; error: Error };
  /** The worker moved from `previousStatus` to `status`. */
  status: { pluginId: string; status: WorkerStatus; previousStatus: WorkerStatus };
}
/** Names of the events described by WorkerHandleEvents. */
type WorkerHandleEventName = keyof WorkerHandleEvents;
/**
 * Append a chunk of worker stderr to the rolling excerpt used to enrich
 * failure messages.
 *
 * Chunks are joined with "\n". When the result exceeds `maxChars`, only the
 * most recent `maxChars` UTF-16 code units are kept. If that cut lands in the
 * middle of a surrogate pair, the orphaned low surrogate at the start is
 * dropped, so the excerpt never begins with a broken character.
 *
 * @param current - Excerpt accumulated so far ("" when empty).
 * @param chunk - New stderr text to append.
 * @param maxChars - Maximum excerpt length; defaults to MAX_STDERR_EXCERPT_CHARS.
 *   Values <= 0 yield "". This guard is needed because `slice(-0)` would
 *   return the whole string.
 * @returns The updated excerpt.
 */
export function appendStderrExcerpt(
  current: string,
  chunk: string,
  maxChars = MAX_STDERR_EXCERPT_CHARS,
): string {
  const next = current ? `${current}\n${chunk}` : chunk;
  if (next.length <= maxChars) return next;
  if (maxChars <= 0) return "";
  const tail = next.slice(-maxChars);
  const firstUnit = tail.charCodeAt(0);
  // 0xDC00–0xDFFF is a low (trailing) surrogate whose lead half was cut off.
  return firstUnit >= 0xdc00 && firstUnit <= 0xdfff ? tail.slice(1) : tail;
}
/**
 * Attach captured worker stderr to a failure message so callers can see why
 * the worker failed.
 *
 * @param message - Base failure message.
 * @param stderrExcerpt - Recently captured stderr; may be empty.
 * @returns `message` unchanged when the trimmed excerpt is empty or already
 *   contained in `message`; otherwise `message` followed by a
 *   "Worker stderr:" section holding the trimmed excerpt.
 */
export function formatWorkerFailureMessage(message: string, stderrExcerpt: string): string {
  const trimmed = stderrExcerpt.trim();
  const nothingToAdd = trimmed.length === 0 || message.includes(trimmed);
  return nothingToAdd ? message : `${message}\n\nWorker stderr:\n${trimmed}`;
}
/**
 * Everything needed to spawn and initialize one plugin worker.
 */
export interface WorkerStartOptions {
  // -- What to run ----------------------------------------------------------
  /** Absolute path to the plugin worker entrypoint (CJS bundle) that is forked. */
  entrypointPath: string;
  /**
   * Node.js flags for the child process. Defaults to `[]`, so the child does
   * not inherit the host's own execArgv.
   */
  execArgv?: string[];
  /**
   * Extra environment variables for the child process. The worker never sees
   * the host's `process.env`. The host always sets PATH, NODE_PATH,
   * PAPERCLIP_PLUGIN_ID, NODE_ENV and TZ itself, overriding same-named
   * entries here.
   */
  env?: Record<string, string>;

  // -- `initialize` payload -------------------------------------------------
  /** Plugin manifest sent in `initialize`. */
  manifest: PaperclipPluginManifestV1;
  /** Resolved plugin configuration sent in `initialize`. */
  config: Record<string, unknown>;
  /** Host instance details sent in `initialize`. */
  instanceInfo: {
    instanceId: string;
    hostVersion: string;
  };
  /** Host API version sent in `initialize`. */
  apiVersion: number;

  // -- Runtime behaviour ----------------------------------------------------
  /** Handlers that service worker→host RPC calls. */
  hostHandlers: WorkerToHostHandlers;
  /** Default per-call RPC timeout in ms (30 s when omitted; capped at 5 min). */
  rpcTimeoutMs?: number;
  /** Restart automatically after unexpected exits (default `true`). */
  autoRestart?: boolean;
  /**
   * Receives the worker's stream notifications (`streams.open` / `streams.emit`
   * / `streams.close`). The host wires this to the PluginStreamBus to fan
   * events out to SSE clients.
   */
  onStreamNotification?: (method: string, params: Record<string, unknown>) => void;
}
/**
 * Book-keeping for one host→worker RPC call that is still awaiting its response.
 */
interface PendingRequest {
  /** JSON-RPC id that the matching response will carry. */
  id: JsonRpcId;
  /** Method name, retained for logging. */
  method: string;
  /** Delivers the worker's response, or an error response synthesized by the host. */
  resolve: (response: JsonRpcResponse) => void;
  /** Handle of the per-call timeout timer. */
  timer: ReturnType<typeof setTimeout>;
  /** Timestamp when the request was sent. */
  sentAt: number;
}
// ---------------------------------------------------------------------------
// PluginWorkerHandle — manages a single worker process
// ---------------------------------------------------------------------------
/**
 * Controller for a single plugin worker process.
 *
 * `start()` spawns the worker and completes the `initialize` handshake.
 * `call()` and `notify()` talk to it over JSON-RPC, and `stop()` shuts it
 * down gracefully. With `autoRestart` enabled, unexpected exits are followed
 * by restarts with exponential backoff.
 */
export interface PluginWorkerHandle {
  /** ID of the plugin this worker serves. */
  readonly pluginId: string;
  /** Current lifecycle status. */
  readonly status: WorkerStatus;
  /** Optional methods the worker reported in its `initialize` result. */
  readonly supportedMethods: string[];

  /** Spawn the worker; resolves once `initialize` has completed. */
  start(): Promise<void>;
  /**
   * Shut the worker down gracefully. Sends a `shutdown` RPC and gives the
   * worker up to 10 seconds to exit. If it is still running, escalates to
   * SIGTERM and finally to SIGKILL.
   */
  stop(): Promise<void>;
  /** Restart the worker (stop, then start). */
  restart(): Promise<void>;

  /**
   * Send a typed host→worker RPC request and await its result.
   *
   * @param method - RPC method name
   * @param params - Method parameters
   * @param timeoutMs - Optional per-call timeout override
   * @returns The method's result
   * @throws {JsonRpcCallError} if the worker answers with an error response
   * @throws {Error} if the worker is not running or the call times out
   */
  call<M extends HostToWorkerMethodName>(
    method: M,
    params: HostToWorkerMethods[M][0],
    timeoutMs?: number,
  ): Promise<HostToWorkerMethods[M][1]>;
  /** Send a one-way notification to the worker; no response is expected. */
  notify(method: string, params: unknown): void;

  /** Subscribe to a worker event. */
  on<K extends WorkerHandleEventName>(
    event: K,
    listener: (payload: WorkerHandleEvents[K]) => void,
  ): void;
  /** Remove a previously registered event listener. */
  off<K extends WorkerHandleEventName>(
    event: K,
    listener: (payload: WorkerHandleEvents[K]) => void,
  ): void;

  /** Snapshot of diagnostic information about the worker. */
  diagnostics(): WorkerDiagnostics;
}
/**
 * Point-in-time diagnostic snapshot of one worker.
 */
export interface WorkerDiagnostics {
  /** Plugin served by the worker. */
  pluginId: string;
  /** Lifecycle status at snapshot time. */
  status: WorkerStatus;
  /** Process id, or null. */
  pid: number | null;
  /** Uptime of the current process, or null. */
  uptime: number | null;
  /** Number of host→worker RPC calls still awaiting a response. */
  pendingRequests: number;
  /** Crashes counted toward the backoff schedule and restart limit. */
  consecutiveCrashes: number;
  /** All unexpected exits since the handle was created. */
  totalCrashes: number;
  /** Epoch ms of the most recent crash, or null if there has been none. */
  lastCrashAt: number | null;
  /** Epoch ms at which a scheduled backoff restart fires, or null. */
  nextRestartAt: number | null;
}
// ---------------------------------------------------------------------------
// PluginWorkerManager — manages all plugin workers
// ---------------------------------------------------------------------------
/**
 * Registry of plugin worker handles keyed by plugin ID. Adds bulk lifecycle
 * operations and routes RPC calls by plugin ID.
 */
export interface PluginWorkerManager {
  /**
   * Register a worker for `pluginId` and start it.
   *
   * @returns The new worker handle
   * @throws if a worker is already registered for this plugin
   */
  startWorker(pluginId: string, options: WorkerStartOptions): Promise<PluginWorkerHandle>;
  /** Stop the worker for `pluginId` and remove it from the registry. */
  stopWorker(pluginId: string): Promise<void>;
  /** Stop every managed worker; invoked during server shutdown. */
  stopAll(): Promise<void>;

  /** Look up the handle registered for `pluginId`, if any. */
  getWorker(pluginId: string): PluginWorkerHandle | undefined;
  /** Whether a worker is registered for `pluginId` and currently running. */
  isRunning(pluginId: string): boolean;

  /**
   * Send an RPC call to the worker of a specific plugin.
   *
   * @throws if the worker is not running
   */
  call<M extends HostToWorkerMethodName>(
    pluginId: string,
    method: M,
    params: HostToWorkerMethods[M][0],
    timeoutMs?: number,
  ): Promise<HostToWorkerMethods[M][1]>;

  /** Diagnostic snapshots of all managed workers. */
  diagnostics(): WorkerDiagnostics[];
}
| // --------------------------------------------------------------------------- | |
| // Implementation: createPluginWorkerHandle | |
| // --------------------------------------------------------------------------- | |
| /** | |
| * Create a handle for a single plugin worker process. | |
| * | |
| * @internal Exported for testing; consumers should use `createPluginWorkerManager`. | |
| */ | |
| export function createPluginWorkerHandle( | |
| pluginId: string, | |
| options: WorkerStartOptions, | |
| ): PluginWorkerHandle { | |
| const log = logger.child({ service: "plugin-worker", pluginId }); | |
| const emitter = new EventEmitter(); | |
| /** | |
| * Higher than default (10) to accommodate multiple subscribers to | |
| * crash/ready/exit events during integration tests and runtime monitoring. | |
| */ | |
| emitter.setMaxListeners(50); | |
| // Worker process state | |
| let childProcess: ChildProcess | null = null; | |
| let readline: ReadlineInterface | null = null; | |
| let stderrReadline: ReadlineInterface | null = null; | |
| let status: WorkerStatus = "stopped"; | |
| let startedAt: number | null = null; | |
| let stderrExcerpt = ""; | |
| // Pending RPC requests awaiting a response | |
| const pendingRequests = new Map<string | number, PendingRequest>(); | |
| let nextRequestId = 1; | |
| // Optional methods reported by the worker during initialization | |
| let supportedMethods: string[] = []; | |
| // Crash tracking for exponential backoff | |
| let consecutiveCrashes = 0; | |
| let totalCrashes = 0; | |
| let lastCrashAt: number | null = null; | |
| let backoffTimer: ReturnType<typeof setTimeout> | null = null; | |
| let nextRestartAt: number | null = null; | |
| // Track open stream channels so we can emit synthetic close on crash. | |
| // Maps channel → companyId. | |
| const openStreamChannels = new Map<string, string>(); | |
| // Shutdown coordination | |
| let intentionalStop = false; | |
| const rpcTimeoutMs = options.rpcTimeoutMs ?? DEFAULT_RPC_TIMEOUT_MS; | |
| const autoRestart = options.autoRestart ?? true; | |
| // ----------------------------------------------------------------------- | |
| // Status management | |
| // ----------------------------------------------------------------------- | |
/**
 * Move the worker to `newStatus`. When the status actually changes, log the
 * transition at debug level and emit a "status" event; otherwise do nothing.
 */
function setStatus(newStatus: WorkerStatus): void {
  const previousStatus = status;
  if (newStatus === previousStatus) {
    return;
  }
  status = newStatus;
  log.debug({ from: previousStatus, to: newStatus }, "worker status change");
  emitter.emit("status", { pluginId, status: newStatus, previousStatus });
}
| // ----------------------------------------------------------------------- | |
| // JSON-RPC message sending | |
| // ----------------------------------------------------------------------- | |
/**
 * Serialize a JSON-RPC message and write it to the worker's stdin.
 *
 * @throws {Error} when there is no child process or its stdin is not writable
 */
function sendMessage(message: unknown): void {
  const stdin = childProcess?.stdin;
  if (!stdin?.writable) {
    throw new Error(`Worker process for plugin "${pluginId}" is not writable`);
  }
  stdin.write(serializeMessage(message as any));
}
| // ----------------------------------------------------------------------- | |
| // Incoming message handling | |
| // ----------------------------------------------------------------------- | |
/**
 * Process one NDJSON line from the worker's stdout.
 *
 * Blank lines are ignored. Unparseable lines are logged (truncated to 200
 * characters) and dropped. Parsed messages are dispatched by kind: responses
 * settle pending host calls, requests are worker→host calls, and
 * notifications are fire-and-forget.
 */
function handleLine(line: string): void {
  if (line.trim() === "") return;

  let message: unknown;
  try {
    message = parseMessage(line);
  } catch (err) {
    if (err instanceof JsonRpcParseError) {
      log.warn({ rawLine: line.slice(0, 200) }, "unparseable message from worker");
    } else {
      log.warn({ err }, "error parsing worker message");
    }
    return;
  }

  if (isJsonRpcResponse(message)) {
    handleResponse(message);
    return;
  }
  if (isJsonRpcRequest(message)) {
    // Not awaited: host handlers run concurrently, and handleWorkerRequest
    // catches handler and send errors itself.
    void handleWorkerRequest(message as JsonRpcRequest);
    return;
  }
  if (isJsonRpcNotification(message)) {
    handleWorkerNotification(message as JsonRpcNotification);
    return;
  }
  log.warn("unknown message type from worker");
}
/**
 * Route a worker response to the pending host→worker call with the same id.
 *
 * Responses with a null/undefined id are logged and dropped. So are responses
 * whose id is not pending, e.g. one arriving after its call already timed
 * out and was removed from the table.
 */
function handleResponse(response: JsonRpcResponse): void {
  const { id } = response;
  if (id == null) {
    log.warn("received response with null/undefined id");
    return;
  }

  const pending = pendingRequests.get(id);
  if (pending === undefined) {
    log.warn({ id }, "received response for unknown request id");
    return;
  }

  clearTimeout(pending.timer);
  pendingRequests.delete(id);
  pending.resolve(response);
}
/**
 * Service a JSON-RPC request from the worker (a worker→host call) and write
 * the response back to the worker.
 *
 * - Only *own* properties of `options.hostHandlers` count as handlers. The
 *   method name comes from plugin code, and plain property access would let
 *   a worker reach inherited `Object.prototype` members ("toString",
 *   "constructor", …). Unknown methods get METHOD_NOT_FOUND.
 * - A handler that throws or rejects produces an INTERNAL_ERROR response
 *   carrying the error message.
 * - Failing to *deliver* a response (e.g. the worker has already exited) is
 *   logged at debug level. It is not reported as a handler error.
 *
 * `request.method` is assumed to be a string (presumably guaranteed by
 * isJsonRpcRequest — confirm against plugin-sdk).
 */
async function handleWorkerRequest(request: JsonRpcRequest): Promise<void> {
  const method = request.method as WorkerToHostMethodName;

  // Best-effort write: the worker may have exited while the handler ran.
  const reply = (message: unknown): void => {
    try {
      sendMessage(message);
    } catch (err) {
      log.debug(
        { method, err: err instanceof Error ? err.message : String(err) },
        "could not deliver response to worker",
      );
    }
  };

  const handler = Object.prototype.hasOwnProperty.call(options.hostHandlers, method)
    ? (options.hostHandlers[method] as ((params: unknown) => Promise<unknown>) | undefined)
    : undefined;

  if (typeof handler !== "function") {
    log.warn({ method }, "worker called unregistered host method");
    reply(
      createErrorResponse(
        request.id,
        JSONRPC_ERROR_CODES.METHOD_NOT_FOUND,
        `Host does not handle method "${method}"`,
      ),
    );
    return;
  }

  let result: unknown;
  try {
    result = await handler(request.params);
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : String(err);
    log.error({ method, err: errorMessage }, "host handler error");
    reply(
      createErrorResponse(
        request.id,
        JSONRPC_ERROR_CODES.INTERNAL_ERROR,
        errorMessage,
      ),
    );
    return;
  }

  reply({
    jsonrpc: JSONRPC_VERSION,
    id: request.id,
    result: result ?? null,
  });
}
/**
 * Handle a JSON-RPC notification (fire-and-forget) from the worker.
 *
 * - `log`: worker `ctx.logger` output. It is re-logged through the host
 *   logger with explicit `pluginLogLevel` and `pluginTimestamp` fields
 *   (§26.1), so entries can be filtered per plugin without parsing the
 *   message. The child logger already carries `pluginId`.
 * - `streams.open` / `streams.emit` / `streams.close`: tracks which channels
 *   are open, so orphans can be closed if the worker dies. Then forwards the
 *   notification to `options.onStreamNotification`.
 * - anything else: logged at debug level and dropped.
 *
 * Notification params come from plugin code and are treated as untrusted:
 * - `meta` is merged into the log record only when it is a plain
 *   (non-array) object. A string or array would otherwise be spread into one
 *   field per character/element.
 * - Non-string values are turned into text with JSON.stringify instead of
 *   String()/template literals. A payload such as {"toString":1} makes
 *   String() throw a TypeError inside the stdout line listener.
 */
function handleWorkerNotification(notification: JsonRpcNotification): void {
  const asText = (value: unknown): string => {
    if (typeof value === "string") return value;
    if (value === null || value === undefined) return "";
    return JSON.stringify(value) ?? "";
  };

  if (notification.method === "log") {
    const params = notification.params as {
      level?: unknown;
      message?: unknown;
      meta?: unknown;
    } | null;
    const level = typeof params?.level === "string" ? params.level : "info";
    const msg = asText(params?.message);
    const rawMeta = params?.meta;
    const meta =
      typeof rawMeta === "object" && rawMeta !== null && !Array.isArray(rawMeta)
        ? (rawMeta as Record<string, unknown>)
        : undefined;

    const logFields: Record<string, unknown> = {
      ...meta,
      // Written after the spread so plugin metadata cannot override them.
      pluginLogLevel: level,
      pluginTimestamp: new Date().toISOString(),
    };
    const text = `[plugin] ${msg}`;
    switch (level) {
      case "error":
        log.error(logFields, text);
        break;
      case "warn":
        log.warn(logFields, text);
        break;
      case "debug":
        log.debug(logFields, text);
        break;
      default:
        log.info(logFields, text);
    }
    return;
  }

  if (
    notification.method === "streams.open" ||
    notification.method === "streams.emit" ||
    notification.method === "streams.close"
  ) {
    const params = (notification.params ?? {}) as Record<string, unknown>;

    // Remember open channels so a synthetic close can be sent on exit.
    if (notification.method === "streams.open") {
      const channel = asText(params.channel);
      if (channel) openStreamChannels.set(channel, asText(params.companyId));
    } else if (notification.method === "streams.close") {
      openStreamChannels.delete(asText(params.channel));
    }

    if (options.onStreamNotification) {
      try {
        options.onStreamNotification(notification.method, params);
      } catch (err) {
        log.error(
          {
            method: notification.method,
            err: err instanceof Error ? err.message : String(err),
          },
          "stream notification handler failed",
        );
      }
    }
    return;
  }

  log.debug({ method: notification.method }, "received notification from worker");
}
| // ----------------------------------------------------------------------- | |
| // Process lifecycle | |
| // ----------------------------------------------------------------------- | |
/**
 * Fork the plugin entrypoint as a Node child process.
 *
 * stdin/stdout/stderr are piped (JSON-RPC channel and log capture), and an
 * IPC channel is opened as the fourth stdio slot. The environment is built
 * from scratch: `options.env` first, then a fixed set of host-controlled
 * variables that override same-named entries. `execArgv` defaults to `[]`
 * rather than Node's default of inheriting `process.execArgv`.
 */
function spawnProcess(): ChildProcess {
  // Security: never spread process.env into the worker. Plugins get only a
  // minimal, controlled environment so host secrets (DATABASE_URL, internal
  // API keys, …) cannot leak into plugin code.
  const hostControlledEnv = {
    PATH: process.env.PATH ?? "",
    NODE_PATH: process.env.NODE_PATH ?? "",
    PAPERCLIP_PLUGIN_ID: pluginId,
    NODE_ENV: process.env.NODE_ENV ?? "production",
    TZ: process.env.TZ ?? "UTC",
  };
  const workerEnv: Record<string, string> = { ...options.env, ...hostControlledEnv };

  return fork(options.entrypointPath, [], {
    stdio: ["pipe", "pipe", "pipe", "ipc"],
    execArgv: options.execArgv ?? [],
    env: workerEnv,
    // Node's default: the child is not prepared to run independently of the
    // host (on POSIX it is not made a new process-group leader).
    detached: false,
  });
}
/**
 * Attach I/O and lifecycle listeners to a freshly forked child.
 *
 * - stdout is read line by line (NDJSON); each line goes to handleLine().
 * - stderr lines are appended to `stderrExcerpt` (used to enrich failure
 *   messages) and logged as warnings.
 * - "exit" is routed to handleProcessExit().
 * - "error" is logged and re-emitted on the handle. While starting, it also
 *   marks the worker crashed and fails all pending calls, including
 *   `initialize`.
 *
 * Only events from the *current* child affect handle state. A previous
 * child can outlive killProcess()'s bounded wait and emit "exit"/"error"
 * after a new child was spawned. Handling those would null out the new
 * child, reject its pending calls and count a phantom crash.
 */
function attachStdioHandlers(child: ChildProcess): void {
  const isCurrentChild = (): boolean => child === childProcess;

  if (child.stdout) {
    readline = createInterface({ input: child.stdout });
    readline.on("line", handleLine);
  }

  if (child.stderr) {
    stderrReadline = createInterface({ input: child.stderr });
    stderrReadline.on("line", (line: string) => {
      stderrExcerpt = appendStderrExcerpt(stderrExcerpt, line);
      log.warn({ stream: "stderr" }, `[plugin stderr] ${line}`);
    });
  }

  child.on("exit", (code, signal) => {
    if (!isCurrentChild()) {
      log.warn({ pid: child.pid, code, signal }, "ignoring exit of superseded worker process");
      return;
    }
    handleProcessExit(code, signal);
  });

  child.on("error", (err) => {
    log.error({ err: err.message }, "worker process error");
    if (!isCurrentChild()) return;

    // EventEmitter throws when "error" is emitted without a listener. Here
    // that would throw out of the child's own "error" listener and could take
    // down the host, so only re-emit when someone is subscribed.
    if (emitter.listenerCount("error") > 0) {
      emitter.emit("error", { pluginId, error: err });
    }

    if (status === "starting") {
      setStatus("crashed");
      rejectAllPending(
        new Error(formatWorkerFailureMessage(
          `Worker process failed to start: ${err.message}`,
          stderrExcerpt,
        )),
      );
    }
  });
}
/**
 * React to the child's "exit" event.
 *
 * Always does the following, in order:
 * - closes the stdio line readers and clears the process handle;
 * - fails every pending call with the exit code/signal plus any captured stderr;
 * - sends a synthetic `streams.close` for each stream channel still open
 *   (when a stream callback is configured);
 * - emits "exit".
 *
 * Then branches. After an intentional stop the status becomes "stopped".
 * Otherwise the exit is counted as a crash, "crash" is emitted, and a
 * backoff restart is scheduled unless auto-restart is off or the
 * consecutive-crash limit has been exceeded.
 */
function handleProcessExit(
  code: number | null,
  signal: NodeJS.Signals | null,
): void {
  // Captured first: decides stop-vs-crash handling below.
  const wasIntentional = intentionalStop;

  readline?.close();
  readline = null;
  stderrReadline?.close();
  stderrReadline = null;

  childProcess = null;
  startedAt = null;

  const exitSummary = `Worker process exited (code=${code}, signal=${signal})`;
  rejectAllPending(new Error(formatWorkerFailureMessage(exitSummary, stderrExcerpt)));

  // SSE clients on channels the worker never closed would otherwise wait forever.
  if (options.onStreamNotification && openStreamChannels.size > 0) {
    for (const [channel, companyId] of openStreamChannels) {
      try {
        options.onStreamNotification("streams.close", { channel, companyId });
      } catch {
        // Best effort only; must not disturb exit handling.
      }
    }
    openStreamChannels.clear();
  }

  emitter.emit("exit", { pluginId, code, signal });

  if (wasIntentional) {
    setStatus("stopped");
    log.info({ code, signal }, "worker process stopped");
    return;
  }

  // Unexpected exit: crash accounting.
  const now = Date.now();
  totalCrashes += 1;
  const windowElapsed = lastCrashAt !== null && now - lastCrashAt > CRASH_WINDOW_MS;
  consecutiveCrashes = windowElapsed ? 1 : consecutiveCrashes + 1;
  lastCrashAt = now;

  log.error(
    { code, signal, consecutiveCrashes, totalCrashes },
    "worker process crashed",
  );

  const willRestart = autoRestart && consecutiveCrashes <= MAX_CONSECUTIVE_CRASHES;
  setStatus("crashed");
  emitter.emit("crash", { pluginId, code, signal, willRestart });

  if (!willRestart) {
    log.error(
      { consecutiveCrashes, maxCrashes: MAX_CONSECUTIVE_CRASHES },
      "max consecutive crashes reached, not restarting",
    );
    return;
  }
  scheduleRestart();
}
/**
 * Settle every in-flight host→worker call with a WORKER_UNAVAILABLE error
 * response carrying `error.message`, cancel their timers, and empty the
 * pending table.
 */
function rejectAllPending(error: Error): void {
  const { message } = error;
  pendingRequests.forEach((pending) => {
    clearTimeout(pending.timer);
    const failure = createErrorResponse(
      pending.id,
      PLUGIN_RPC_ERROR_CODES.WORKER_UNAVAILABLE,
      message,
    ) as JsonRpcResponse;
    pending.resolve(failure);
  });
  pendingRequests.clear();
}
| // ----------------------------------------------------------------------- | |
| // Crash recovery with exponential backoff | |
| // ----------------------------------------------------------------------- | |
/**
 * Delay in ms before the next automatic restart.
 *
 * Grows exponentially with the number of consecutive crashes:
 * `MIN_BACKOFF_MS * BACKOFF_MULTIPLIER^(consecutiveCrashes - 1)`. ±25%
 * random jitter spreads restarts out. The result is clamped to
 * [MIN_BACKOFF_MS, MAX_BACKOFF_MS]. Without the lower clamp, negative jitter
 * could undercut the 1 s minimum (down to 750 ms).
 */
function computeBackoffMs(): number {
  const base = MIN_BACKOFF_MS * BACKOFF_MULTIPLIER ** (consecutiveCrashes - 1);
  const jitter = base * 0.25 * (Math.random() * 2 - 1);
  const jittered = Math.round(base + jitter);
  return Math.min(Math.max(jittered, MIN_BACKOFF_MS), MAX_BACKOFF_MS);
}
/**
 * Arm a one-shot timer that restarts the worker after computeBackoffMs().
 * Until the timer fires, the status is "backoff" and `nextRestartAt` holds
 * the planned time. A failing restart is logged, not rethrown.
 */
function scheduleRestart(): void {
  const delayMs = computeBackoffMs();
  nextRestartAt = Date.now() + delayMs;
  setStatus("backoff");
  log.info({ delayMs, consecutiveCrashes }, "scheduling restart with backoff");

  const restartAfterBackoff = async (): Promise<void> => {
    backoffTimer = null;
    nextRestartAt = null;
    try {
      await startInternal();
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      log.error({ err: reason }, "restart after backoff failed");
    }
  };
  backoffTimer = setTimeout(restartAfterBackoff, delayMs);
}
/**
 * If a backoff restart is scheduled, disarm its timer and clear
 * `nextRestartAt`; otherwise do nothing.
 */
function cancelPendingRestart(): void {
  if (backoffTimer === null) return;
  clearTimeout(backoffTimer);
  backoffTimer = null;
  nextRestartAt = null;
}
| // ----------------------------------------------------------------------- | |
| // Start / Stop | |
| // ----------------------------------------------------------------------- | |
/**
 * Spawn the worker process and perform the `initialize` handshake.
 *
 * On success the following happens:
 * - the status becomes "running";
 * - `supportedMethods` is replaced with the list the worker reported;
 * - the consecutive-crash counter is reset;
 * - "ready" is emitted.
 *
 * @throws {Error} if the worker is already running or starting.
 * @throws {Error} if the process cannot be forked.
 * @throws {Error} if `initialize` fails (error response, timeout, or a
 *   falsy `ok`).
 * In the fork and `initialize` failure cases the status is set to "crashed".
 */
async function startInternal(): Promise<void> {
  if (status === "running" || status === "starting") {
    throw new Error(`Worker for plugin "${pluginId}" is already ${status}`);
  }
  // An explicit start during backoff supersedes the scheduled restart.
  // Otherwise that timer would later fire against this worker and fail with
  // "already running". This is a no-op when called from the backoff timer
  // itself, which clears backoffTimer first.
  cancelPendingRestart();

  intentionalStop = false;
  setStatus("starting");
  stderrExcerpt = "";

  let child: ChildProcess;
  try {
    child = spawnProcess();
  } catch (err) {
    // fork() validates its options synchronously and can throw. Without this
    // the status would stay "starting" forever, and every later start would
    // be refused as "already starting".
    const msg = err instanceof Error ? err.message : String(err);
    log.error({ err: msg }, "worker process spawn failed");
    setStatus("crashed");
    throw new Error(`Worker process failed to spawn for "${pluginId}": ${msg}`);
  }
  childProcess = child;
  attachStdioHandlers(child);
  startedAt = Date.now();

  const initParams: InitializeParams = {
    manifest: options.manifest,
    config: options.config,
    instanceInfo: options.instanceInfo,
    apiVersion: options.apiVersion,
  };

  try {
    const result = await callInternal(
      "initialize",
      initParams,
      INITIALIZE_TIMEOUT_MS,
    ) as { ok?: boolean; supportedMethods?: string[] } | undefined;
    if (!result || !result.ok) {
      throw new Error("Worker initialize returned ok=false");
    }
    supportedMethods = result.supportedMethods ?? [];
  } catch (err) {
    // Initialize failed: kill the process (flagged intentional, so not a crash) and propagate.
    const msg = err instanceof Error ? err.message : String(err);
    log.error({ err: msg }, "worker initialize failed");
    await killProcess();
    setStatus("crashed");
    throw new Error(`Worker initialize failed for "${pluginId}": ${msg}`);
  }

  // NOTE(review): resetting here means a worker that crashes shortly after
  // every successful initialize is restarted at the minimum delay
  // indefinitely. MAX_CONSECUTIVE_CRASHES and the exponential growth only
  // apply to crashes that happen before initialize completes. Confirm this
  // is intended; CRASH_WINDOW_MS in handleProcessExit already resets the
  // count after a quiet period.
  consecutiveCrashes = 0;
  setStatus("running");
  emitter.emit("ready", { pluginId });
  log.info({ pid: child.pid }, "worker process started and initialized");
}
/**
 * Stop the worker, escalating until the process is gone.
 *
 * 1. Cancel any scheduled backoff restart. Return right away if the worker
 *    is already "stopped" or "stopping".
 * 2. Flag the stop as intentional (so the exit is not counted as a crash)
 *    and move to "stopping".
 * 3. Send `shutdown` and wait up to SHUTDOWN_DRAIN_MS for a reply or an
 *    exit, then allow 500 ms more for the process to exit.
 * 4. SIGTERM, then wait up to SIGTERM_GRACE_MS.
 * 5. SIGKILL, then wait up to 2 s.
 *
 * Finishes with status "stopped" as soon as the process is gone. If it
 * survives SIGKILL, an error is logged and the status is still "stopped".
 */
async function stopInternal(): Promise<void> {
  cancelPendingRestart();
  if (status === "stopped" || status === "stopping") {
    return;
  }

  intentionalStop = true;
  setStatus("stopping");

  // Re-read on every check: handleProcessExit() clears childProcess on exit.
  const processGone = (): boolean => !childProcess;

  if (processGone()) {
    setStatus("stopped");
    return;
  }

  // Step 1 (cooperative): ask the worker to shut down, racing the reply
  // against the process exiting on its own.
  try {
    await Promise.race([
      callInternal("shutdown", {} as Record<string, never>, SHUTDOWN_DRAIN_MS),
      waitForExit(SHUTDOWN_DRAIN_MS),
    ]);
  } catch {
    log.warn("shutdown RPC failed or timed out, escalating to SIGTERM");
  }

  // A worker that answered `shutdown` may still need a moment to exit.
  if (!processGone()) {
    await waitForExit(500);
  }
  if (processGone()) {
    setStatus("stopped");
    return;
  }

  // Step 2: SIGTERM.
  log.info("worker did not exit after shutdown RPC, sending SIGTERM");
  await killWithSignal("SIGTERM", SIGTERM_GRACE_MS);
  if (processGone()) {
    setStatus("stopped");
    return;
  }

  // Step 3: SIGKILL.
  log.warn("worker did not exit after SIGTERM, sending SIGKILL");
  await killWithSignal("SIGKILL", 2_000);
  if (!processGone()) {
    log.error("worker process still alive after SIGKILL — this should not happen");
  }
  setStatus("stopped");
}
/**
 * Resolve when the current child process emits "exit" or after `timeoutMs`,
 * whichever comes first. Resolves immediately when there is no child. Never
 * rejects.
 */
function waitForExit(timeoutMs: number): Promise<void> {
  const proc = childProcess;
  if (!proc) return Promise.resolve();

  return new Promise<void>((resolve) => {
    let done = false;
    const finish = (): void => {
      if (done) return;
      done = true;
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(finish, timeoutMs);
    proc.once("exit", finish);
  });
}
/**
 * Send `signal` to the current child and wait up to `waitMs` for it to exit.
 *
 * Never rejects. Resolves when the child exits or the wait times out. It
 * also resolves immediately when there is no child or when `kill()` throws.
 */
function killWithSignal(signal: NodeJS.Signals, waitMs: number): Promise<void> {
  return new Promise<void>((resolve) => {
    const proc = childProcess;
    if (proc === null) {
      resolve();
      return;
    }

    const timer = setTimeout(resolve, waitMs);
    proc.once("exit", () => {
      clearTimeout(timer);
      resolve();
    });

    try {
      proc.kill(signal);
    } catch {
      clearTimeout(timer);
      resolve();
    }
  });
}
/**
 * Forcefully kill the current child with SIGKILL, then wait up to 1 s for
 * its "exit" event.
 *
 * The stop is flagged as intentional, so handleProcessExit() does not count
 * it as a crash. No-op when there is no child. startInternal() uses this
 * after a failed `initialize`.
 */
async function killProcess(): Promise<void> {
  const proc = childProcess;
  if (!proc) return;

  intentionalStop = true;
  try {
    proc.kill("SIGKILL");
  } catch {
    // The process may already be gone.
  }

  await waitForExit(1_000);
}
| // ----------------------------------------------------------------------- | |
| // RPC call implementation | |
| // ----------------------------------------------------------------------- | |
/**
 * Send a JSON-RPC request to the worker and resolve with its result.
 *
 * The effective timeout is `timeoutMs` (or the handle's `rpcTimeoutMs`
 * default), capped at `MAX_RPC_TIMEOUT_MS`.
 *
 * Rejects with:
 * - a `JsonRpcCallError` on timeout or on an error response;
 * - a plain `Error` when the worker's stdin is not writable, the response has
 *   an unexpected shape, or the request cannot be sent.
 *
 * @param method Host-to-worker method name.
 * @param params Parameters typed for `method`.
 * @param timeoutMs Optional per-call timeout override in milliseconds.
 */
function callInternal<M extends HostToWorkerMethodName>(
  method: M,
  params: HostToWorkerMethods[M][0],
  timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]> {
  return new Promise<HostToWorkerMethods[M][1]>((resolve, reject) => {
    if (!childProcess?.stdin?.writable) {
      reject(
        new Error(
          `Cannot call "${method}" — worker for "${pluginId}" is not running`,
        ),
      );
      return;
    }
    const id = nextRequestId++;
    const timeout = Math.min(timeoutMs ?? rpcTimeoutMs, MAX_RPC_TIMEOUT_MS);
    // Guard against double-settlement. When a process exits, all pending
    // requests are rejected via rejectAllPending(), but the timeout timer
    // may still be running. Without this guard the timer would try to settle
    // an already-settled promise. Every completion path (response, timeout,
    // send failure) goes through `settle`, which also clears the timer and
    // removes the pending entry.
    let settled = false;
    const settle = <T>(fn: (value: T) => void, value: T): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      pendingRequests.delete(id);
      fn(value);
    };
    const timer = setTimeout(() => {
      settle(
        reject,
        new JsonRpcCallError({
          code: PLUGIN_RPC_ERROR_CODES.TIMEOUT,
          message: `RPC call "${method}" timed out after ${timeout}ms`,
        }),
      );
    }, timeout);
    const pending: PendingRequest = {
      id,
      method,
      resolve: (response: JsonRpcResponse) => {
        if (isJsonRpcSuccessResponse(response)) {
          settle(resolve, response.result as HostToWorkerMethods[M][1]);
        } else if ("error" in response && response.error) {
          settle(reject, new JsonRpcCallError(response.error));
        } else {
          settle(reject, new Error(`Unexpected response format for "${method}"`));
        }
      },
      timer,
      sentAt: Date.now(),
    };
    pendingRequests.set(id, pending);
    try {
      const request = createRequest(method, params, id);
      sendMessage(request);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      settle(
        reject,
        new Error(`Failed to send "${method}" to worker: ${reason}`),
      );
    }
  });
}
| // ----------------------------------------------------------------------- | |
| // Public API | |
| // ----------------------------------------------------------------------- | |
/**
 * Public handle for this plugin's worker.
 *
 * Lifecycle methods delegate to startInternal()/stopInternal(); `restart()`
 * runs a stop followed by a start. RPC calls are accepted only while the
 * worker is "starting" or "running". Notifications are sent only while it is
 * "running" and are otherwise dropped.
 */
const handle: PluginWorkerHandle = {
  get pluginId() {
    return pluginId;
  },
  get status() {
    return status;
  },
  get supportedMethods() {
    return supportedMethods;
  },
  async start() {
    await startInternal();
  },
  async stop() {
    await stopInternal();
  },
  async restart() {
    await stopInternal();
    await startInternal();
  },
  call<M extends HostToWorkerMethodName>(
    method: M,
    params: HostToWorkerMethods[M][0],
    timeoutMs?: number,
  ): Promise<HostToWorkerMethods[M][1]> {
    if (status !== "running" && status !== "starting") {
      return Promise.reject(
        new Error(
          `Cannot call "${method}" — worker for "${pluginId}" is ${status}`,
        ),
      );
    }
    return callInternal(method, params, timeoutMs);
  },
  notify(method: string, params: unknown) {
    if (status !== "running") return;
    try {
      sendMessage({
        jsonrpc: JSONRPC_VERSION,
        method,
        params,
      });
    } catch (err) {
      // Best-effort delivery: don't throw, but keep the cause so failures
      // can be diagnosed from the logs.
      log.warn(
        { method, err: err instanceof Error ? err.message : String(err) },
        "failed to send notification to worker",
      );
    }
  },
  on<K extends WorkerHandleEventName>(
    event: K,
    listener: (payload: WorkerHandleEvents[K]) => void,
  ) {
    emitter.on(event, listener);
  },
  off<K extends WorkerHandleEventName>(
    event: K,
    listener: (payload: WorkerHandleEvents[K]) => void,
  ) {
    emitter.off(event, listener);
  },
  diagnostics(): WorkerDiagnostics {
    return {
      pluginId,
      status,
      pid: childProcess?.pid ?? null,
      // Uptime is only meaningful while the worker is running.
      uptime:
        startedAt !== null && status === "running"
          ? Date.now() - startedAt
          : null,
      consecutiveCrashes,
      totalCrashes,
      pendingRequests: pendingRequests.size,
      lastCrashAt,
      nextRestartAt,
    };
  },
};
return handle;
| } | |
| // --------------------------------------------------------------------------- | |
| // Implementation: createPluginWorkerManager | |
| // --------------------------------------------------------------------------- | |
/**
 * Configuration accepted by createPluginWorkerManager().
 */
export interface PluginWorkerManagerOptions {
  /**
   * Optional hook for worker lifecycle events, used by the server to publish
   * global live events.
   *
   * As emitted by createPluginWorkerManager():
   * - "plugin.worker.crashed" carries the exit `code`, the terminating
   *   `signal`, and `willRestart`.
   * - "plugin.worker.restarted" carries only `pluginId`. It is emitted when a
   *   worker becomes ready after having crashed at least once.
   */
  onWorkerEvent?: (event: {
    type:
      | "plugin.worker.crashed"
      | "plugin.worker.restarted";
    pluginId: string;
    willRestart?: boolean;
    code?: number | null;
    signal?: string | null;
  }) => void;
}
/**
 * Create a new PluginWorkerManager.
 *
 * The manager holds all plugin worker handles and provides a unified API for
 * starting, stopping, and communicating with plugin workers.
 *
 * Concurrent startWorker() calls for the same plugin share one in-flight
 * start. Stopping only removes a registry entry if it still refers to the
 * handle that was stopped, so a replacement registered mid-stop is kept.
 *
 * @param managerOptions Optional hooks, e.g. lifecycle event forwarding.
 *
 * @example
 * ```ts
 * const manager = createPluginWorkerManager();
 *
 * const handle = await manager.startWorker("acme.linear", {
 *   entrypointPath: "/path/to/worker.cjs",
 *   manifest,
 *   config: resolvedConfig,
 *   instanceInfo: { instanceId: "inst-1", hostVersion: "1.0.0" },
 *   apiVersion: 1,
 *   hostHandlers: { "config.get": async () => resolvedConfig, ... },
 * });
 *
 * // Send RPC call to the worker
 * const health = await manager.call("acme.linear", "health", {});
 *
 * // Shutdown all workers on server exit
 * await manager.stopAll();
 * ```
 */
export function createPluginWorkerManager(
  managerOptions?: PluginWorkerManagerOptions,
): PluginWorkerManager {
  const log = logger.child({ service: "plugin-worker-manager" });
  const workers = new Map<string, PluginWorkerHandle>();
  /** Per-plugin startup locks to prevent concurrent spawn races. */
  const startupLocks = new Map<string, Promise<PluginWorkerHandle>>();

  /**
   * Remove `pluginId` from the registry only if it still maps to `handle`.
   * While we awaited `handle.stop()`, a startWorker() call may have seen the
   * old handle as "stopped" and registered a replacement. That newer handle
   * must not be dropped, or its process would be orphaned.
   */
  const unregisterIfCurrent = (
    pluginId: string,
    handle: PluginWorkerHandle,
  ): void => {
    if (workers.get(pluginId) === handle) {
      workers.delete(pluginId);
    }
  };

  return {
    async startWorker(
      pluginId: string,
      options: WorkerStartOptions,
    ): Promise<PluginWorkerHandle> {
      // Mutex: if a start is already in-flight for this plugin, wait for it
      const inFlight = startupLocks.get(pluginId);
      if (inFlight) {
        log.warn({ pluginId }, "concurrent startWorker call — waiting for in-flight start");
        return inFlight;
      }
      const existing = workers.get(pluginId);
      if (existing && existing.status !== "stopped") {
        throw new Error(
          `Worker already registered for plugin "${pluginId}" (status: ${existing.status})`,
        );
      }
      const handle = createPluginWorkerHandle(pluginId, options);
      workers.set(pluginId, handle);
      // Subscribe to crash/ready events for live event forwarding
      if (managerOptions?.onWorkerEvent) {
        const notify = managerOptions.onWorkerEvent;
        handle.on("crash", (payload) => {
          notify({
            type: "plugin.worker.crashed",
            pluginId: payload.pluginId,
            code: payload.code,
            signal: payload.signal,
            willRestart: payload.willRestart,
          });
        });
        handle.on("ready", (payload) => {
          // Only emit restarted if this was a crash recovery (totalCrashes > 0)
          const diag = handle.diagnostics();
          if (diag.totalCrashes > 0) {
            notify({
              type: "plugin.worker.restarted",
              pluginId: payload.pluginId,
            });
          }
        });
      }
      log.info({ pluginId }, "starting plugin worker");
      // Register the lock synchronously (no await between the in-flight check
      // and here), so a concurrent call cannot spawn a second worker.
      const startPromise = handle.start().then(() => handle).finally(() => {
        startupLocks.delete(pluginId);
      });
      startupLocks.set(pluginId, startPromise);
      return startPromise;
    },
    async stopWorker(pluginId: string): Promise<void> {
      const handle = workers.get(pluginId);
      if (!handle) {
        log.warn({ pluginId }, "no worker registered for plugin, nothing to stop");
        return;
      }
      log.info({ pluginId }, "stopping plugin worker");
      await handle.stop();
      unregisterIfCurrent(pluginId, handle);
    },
    getWorker(pluginId: string): PluginWorkerHandle | undefined {
      return workers.get(pluginId);
    },
    isRunning(pluginId: string): boolean {
      const handle = workers.get(pluginId);
      return handle?.status === "running";
    },
    async stopAll(): Promise<void> {
      // Snapshot the registry: only the handles we actually stop are removed.
      const entries = Array.from(workers.entries());
      log.info({ count: entries.length }, "stopping all plugin workers");
      const promises = entries.map(async ([, handle]) => {
        try {
          await handle.stop();
        } catch (err) {
          log.error(
            {
              pluginId: handle.pluginId,
              err: err instanceof Error ? err.message : String(err),
            },
            "error stopping worker during shutdown",
          );
        }
      });
      await Promise.all(promises);
      for (const [pluginId, handle] of entries) {
        unregisterIfCurrent(pluginId, handle);
      }
    },
    diagnostics(): WorkerDiagnostics[] {
      return Array.from(workers.values()).map((h) => h.diagnostics());
    },
    call<M extends HostToWorkerMethodName>(
      pluginId: string,
      method: M,
      params: HostToWorkerMethods[M][0],
      timeoutMs?: number,
    ): Promise<HostToWorkerMethods[M][1]> {
      const handle = workers.get(pluginId);
      if (!handle) {
        return Promise.reject(
          new Error(`No worker registered for plugin "${pluginId}"`),
        );
      }
      return handle.call(method, params, timeoutMs);
    },
  };
}