| | import { Logger } from '@n8n/backend-common'; |
| | import { TaskRunnersConfig } from '@n8n/config'; |
| | import { OnShutdown } from '@n8n/decorators'; |
| | import { Service } from '@n8n/di'; |
| | import * as a from 'node:assert/strict'; |
| | import { spawn } from 'node:child_process'; |
| | import * as process from 'node:process'; |
| |
|
| | import { forwardToLogger } from './forward-to-logger'; |
| | import { NodeProcessOomDetector } from './node-process-oom-detector'; |
| | import { TaskBrokerAuthService } from './task-broker/auth/task-broker-auth.service'; |
| | import { TaskRunnerLifecycleEvents } from './task-runner-lifecycle-events'; |
| | import { TypedEmitter } from '../typed-emitter'; |
| |
|
| | type ChildProcess = ReturnType<typeof spawn>; |
| |
|
| | export type ExitReason = 'unknown' | 'oom'; |
| |
|
| | export type TaskRunnerProcessEventMap = { |
| | exit: { |
| | reason: ExitReason; |
| | }; |
| | }; |
| |
|
| | |
| | |
| | |
| | @Service() |
| | export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> { |
| | get isRunning() { |
| | return this.process !== null; |
| | } |
| |
|
| | |
| | get pid() { |
| | return this.process?.pid; |
| | } |
| |
|
| | |
| | get runPromise() { |
| | return this._runPromise; |
| | } |
| |
|
| | private process: ChildProcess | null = null; |
| |
|
| | private _runPromise: Promise<void> | null = null; |
| |
|
| | private oomDetector: NodeProcessOomDetector | null = null; |
| |
|
| | private isShuttingDown = false; |
| |
|
| | private logger: Logger; |
| |
|
| | |
| | private readonly passthroughEnvVars = [ |
| | 'PATH', |
| | 'HOME', |
| | 'GENERIC_TIMEZONE', |
| | 'NODE_FUNCTION_ALLOW_BUILTIN', |
| | 'NODE_FUNCTION_ALLOW_EXTERNAL', |
| | 'N8N_SENTRY_DSN', |
| | 'N8N_RUNNERS_ALLOW_PROTOTYPE_MUTATION', |
| | |
| | 'N8N_VERSION', |
| | 'ENVIRONMENT', |
| | 'DEPLOYMENT_NAME', |
| | 'NODE_PATH', |
| | ] as const; |
| |
|
| | constructor( |
| | logger: Logger, |
| | private readonly runnerConfig: TaskRunnersConfig, |
| | private readonly authService: TaskBrokerAuthService, |
| | private readonly runnerLifecycleEvents: TaskRunnerLifecycleEvents, |
| | ) { |
| | super(); |
| |
|
| | a.ok( |
| | this.runnerConfig.mode !== 'external', |
| | 'Task Runner Process cannot be used in external mode', |
| | ); |
| |
|
| | this.logger = logger.scoped('task-runner'); |
| |
|
| | this.runnerLifecycleEvents.on('runner:failed-heartbeat-check', () => { |
| | this.logger.warn('Task runner failed heartbeat check, restarting...'); |
| | void this.forceRestart(); |
| | }); |
| |
|
| | this.runnerLifecycleEvents.on('runner:timed-out-during-task', () => { |
| | this.logger.warn('Task runner timed out during task, restarting...'); |
| | void this.forceRestart(); |
| | }); |
| | } |
| |
|
| | async start() { |
| | a.ok(!this.process, 'Task Runner Process already running'); |
| |
|
| | const grantToken = await this.authService.createGrantToken(); |
| |
|
| | const taskBrokerUri = `http://127.0.0.1:${this.runnerConfig.port}`; |
| | this.process = this.startNode(grantToken, taskBrokerUri); |
| |
|
| | forwardToLogger(this.logger, this.process, '[Task Runner]: '); |
| |
|
| | this.monitorProcess(this.process); |
| | } |
| |
|
| | startNode(grantToken: string, taskBrokerUri: string) { |
| | const startScript = require.resolve('@n8n/task-runner/start'); |
| |
|
| | return spawn( |
| | 'node', |
| | ['--disallow-code-generation-from-strings', '--disable-proto=delete', startScript], |
| | { |
| | env: this.getProcessEnvVars(grantToken, taskBrokerUri), |
| | }, |
| | ); |
| | } |
| |
|
| | @OnShutdown() |
| | async stop() { |
| | if (!this.process) return; |
| |
|
| | this.isShuttingDown = true; |
| |
|
| | |
| | this.killNode(); |
| | await this._runPromise; |
| |
|
| | this.isShuttingDown = false; |
| | } |
| |
|
| | |
| | async forceRestart() { |
| | if (!this.process) return; |
| |
|
| | this.process.kill('SIGKILL'); |
| |
|
| | await this._runPromise; |
| | } |
| |
|
| | killNode() { |
| | if (!this.process) return; |
| |
|
| | this.process.kill(); |
| | } |
| |
|
| | private monitorProcess(taskRunnerProcess: ChildProcess) { |
| | this._runPromise = new Promise((resolve) => { |
| | this.oomDetector = new NodeProcessOomDetector(taskRunnerProcess); |
| |
|
| | taskRunnerProcess.on('exit', (code) => { |
| | this.onProcessExit(code, resolve); |
| | }); |
| | }); |
| | } |
| |
|
| | private onProcessExit(_code: number | null, resolveFn: () => void) { |
| | this.process = null; |
| | this.emit('exit', { reason: this.oomDetector?.didProcessOom ? 'oom' : 'unknown' }); |
| | resolveFn(); |
| |
|
| | if (!this.isShuttingDown) { |
| | setImmediate(async () => await this.start()); |
| | } |
| | } |
| |
|
| | private getProcessEnvVars(grantToken: string, taskBrokerUri: string) { |
| | const envVars: Record<string, string> = { |
| | N8N_RUNNERS_GRANT_TOKEN: grantToken, |
| | N8N_RUNNERS_TASK_BROKER_URI: taskBrokerUri, |
| | N8N_RUNNERS_MAX_PAYLOAD: this.runnerConfig.maxPayload.toString(), |
| | N8N_RUNNERS_MAX_CONCURRENCY: this.runnerConfig.maxConcurrency.toString(), |
| | N8N_RUNNERS_TASK_TIMEOUT: this.runnerConfig.taskTimeout.toString(), |
| | N8N_RUNNERS_HEARTBEAT_INTERVAL: this.runnerConfig.heartbeatInterval.toString(), |
| | ...this.getPassthroughEnvVars(), |
| | }; |
| |
|
| | if (this.runnerConfig.maxOldSpaceSize) { |
| | envVars.NODE_OPTIONS = `--max-old-space-size=${this.runnerConfig.maxOldSpaceSize}`; |
| | } |
| |
|
| | return envVars; |
| | } |
| |
|
| | private getPassthroughEnvVars() { |
| | return this.passthroughEnvVars.reduce<Record<string, string>>((env, key) => { |
| | if (process.env[key]) { |
| | env[key] = process.env[key]; |
| | } |
| |
|
| | return env; |
| | }, {}); |
| | } |
| | } |
| |
|