| | import { Logger } from '@n8n/backend-common'; |
| | import { TaskRunnersConfig } from '@n8n/config'; |
| | import { OnShutdown } from '@n8n/decorators'; |
| | import { Container, Service } from '@n8n/di'; |
| | import { ErrorReporter } from 'n8n-core'; |
| | import { sleep } from 'n8n-workflow'; |
| | import * as a from 'node:assert/strict'; |
| |
|
| | import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error'; |
| | import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; |
| | import type { TaskRunnerProcess } from '@/task-runners/task-runner-process'; |
| | import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; |
| |
|
| | import { MissingAuthTokenError } from './errors/missing-auth-token.error'; |
| | import type { TaskBrokerServer } from './task-broker/task-broker-server'; |
| | import type { LocalTaskRequester } from './task-managers/local-task-requester'; |
| |
|
| | |
| | |
| | |
| | |
| | |
| | @Service() |
| | export class TaskRunnerModule { |
| | private taskBrokerHttpServer: TaskBrokerServer | undefined; |
| |
|
| | private taskBrokerWsServer: TaskBrokerWsServer | undefined; |
| |
|
| | private taskRequester: LocalTaskRequester | undefined; |
| |
|
| | private taskRunnerProcess: TaskRunnerProcess | undefined; |
| |
|
| | private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined; |
| |
|
| | constructor( |
| | private readonly logger: Logger, |
| | private readonly errorReporter: ErrorReporter, |
| | private readonly runnerConfig: TaskRunnersConfig, |
| | ) { |
| | this.logger = this.logger.scoped('task-runner'); |
| | } |
| |
|
| | async start() { |
| | a.ok(this.runnerConfig.enabled, 'Task runner is disabled'); |
| |
|
| | const { mode, authToken } = this.runnerConfig; |
| |
|
| | if (mode === 'external' && !authToken) throw new MissingAuthTokenError(); |
| |
|
| | await this.loadTaskRequester(); |
| | await this.loadTaskBroker(); |
| |
|
| | if (mode === 'internal') { |
| | await this.startInternalTaskRunner(); |
| | } |
| | } |
| |
|
| | @OnShutdown() |
| | async stop() { |
| | const stopRunnerProcessTask = (async () => { |
| | if (this.taskRunnerProcess) { |
| | await this.taskRunnerProcess.stop(); |
| | this.taskRunnerProcess = undefined; |
| | } |
| | })(); |
| |
|
| | const stopRunnerServerTask = (async () => { |
| | if (this.taskBrokerHttpServer) { |
| | await this.taskBrokerHttpServer.stop(); |
| | this.taskBrokerHttpServer = undefined; |
| | } |
| | })(); |
| |
|
| | await Promise.all([stopRunnerProcessTask, stopRunnerServerTask]); |
| | } |
| |
|
| | private async loadTaskRequester() { |
| | const { TaskRequester } = await import('@/task-runners/task-managers/task-requester'); |
| | const { LocalTaskRequester } = await import( |
| | '@/task-runners/task-managers/local-task-requester' |
| | ); |
| | this.taskRequester = Container.get(LocalTaskRequester); |
| | Container.set(TaskRequester, this.taskRequester); |
| | } |
| |
|
| | private async loadTaskBroker() { |
| | |
| | |
| | const { TaskBrokerServer } = await import('@/task-runners/task-broker/task-broker-server'); |
| | this.taskBrokerHttpServer = Container.get(TaskBrokerServer); |
| | this.taskBrokerWsServer = Container.get(TaskBrokerWsServer); |
| |
|
| | await this.taskBrokerHttpServer.start(); |
| | } |
| |
|
| | private async startInternalTaskRunner() { |
| | a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded'); |
| |
|
| | const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process'); |
| | this.taskRunnerProcess = Container.get(TaskRunnerProcess); |
| | this.taskRunnerProcessRestartLoopDetector = new TaskRunnerProcessRestartLoopDetector( |
| | this.taskRunnerProcess, |
| | ); |
| | this.taskRunnerProcessRestartLoopDetector.on( |
| | 'restart-loop-detected', |
| | this.onRunnerRestartLoopDetected, |
| | ); |
| |
|
| | await this.taskRunnerProcess.start(); |
| |
|
| | const { InternalTaskRunnerDisconnectAnalyzer } = await import( |
| | '@/task-runners/internal-task-runner-disconnect-analyzer' |
| | ); |
| | this.taskBrokerWsServer.setDisconnectAnalyzer( |
| | Container.get(InternalTaskRunnerDisconnectAnalyzer), |
| | ); |
| | } |
| |
|
| | private onRunnerRestartLoopDetected = async (error: TaskRunnerRestartLoopError) => { |
| | this.logger.error(error.message); |
| | this.errorReporter.error(error); |
| |
|
| | |
| | await sleep(1000); |
| | process.exit(1); |
| | }; |
| | } |
| |
|