File size: 4,328 Bytes
aec3094 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | import { Logger } from '@n8n/backend-common';
import { TaskRunnersConfig } from '@n8n/config';
import { OnShutdown } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { ErrorReporter } from 'n8n-core';
import { sleep } from 'n8n-workflow';
import * as a from 'node:assert/strict';
import type { TaskRunnerRestartLoopError } from '@/task-runners/errors/task-runner-restart-loop-error';
import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server';
import type { TaskRunnerProcess } from '@/task-runners/task-runner-process';
import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector';
import { MissingAuthTokenError } from './errors/missing-auth-token.error';
import type { TaskBrokerServer } from './task-broker/task-broker-server';
import type { LocalTaskRequester } from './task-managers/local-task-requester';
/**
 * Wires up the task runner subsystem: the local task requester, the task
 * broker servers and, when the configured mode is `internal`, the task runner
 * child process launched by n8n itself. In `external` mode the runner is
 * launched by some other orchestrator, so only the requester and the broker
 * are set up here.
 */
@Service()
export class TaskRunnerModule {
	private taskBrokerHttpServer: TaskBrokerServer | undefined;

	private taskBrokerWsServer: TaskBrokerWsServer | undefined;

	private taskRequester: LocalTaskRequester | undefined;

	private taskRunnerProcess: TaskRunnerProcess | undefined;

	private taskRunnerProcessRestartLoopDetector: TaskRunnerProcessRestartLoopDetector | undefined;

	constructor(
		private readonly logger: Logger,
		private readonly errorReporter: ErrorReporter,
		private readonly runnerConfig: TaskRunnersConfig,
	) {
		// Swap the injected logger for one scoped to 'task-runner'.
		this.logger = logger.scoped('task-runner');
	}

	/**
	 * Loads the task requester and the task broker, then starts the internal
	 * task runner when the configured mode is `internal`.
	 *
	 * Fails the assertion when task runners are disabled in the config.
	 *
	 * @throws {MissingAuthTokenError} when the mode is `external` and no auth
	 * token is configured.
	 */
	async start() {
		a.ok(this.runnerConfig.enabled, 'Task runner is disabled');

		const { mode: runnerMode, authToken: runnerAuthToken } = this.runnerConfig;
		const isExternalWithoutToken = runnerMode === 'external' && !runnerAuthToken;
		if (isExternalWithoutToken) {
			throw new MissingAuthTokenError();
		}

		// The requester must be registered before the broker is loaded.
		await this.loadTaskRequester();
		await this.loadTaskBroker();

		if (runnerMode !== 'internal') return;

		await this.startInternalTaskRunner();
	}

	/**
	 * Shutdown hook: stops the runner process and the broker HTTP server (each
	 * only if it was created) and clears the corresponding references. Both
	 * stop operations are kicked off before either one is awaited.
	 */
	@OnShutdown()
	async stop() {
		const haltRunnerProcess = async () => {
			const runnerProcess = this.taskRunnerProcess;
			if (!runnerProcess) return;

			await runnerProcess.stop();
			this.taskRunnerProcess = undefined;
		};

		const haltBrokerHttpServer = async () => {
			const httpServer = this.taskBrokerHttpServer;
			if (!httpServer) return;

			await httpServer.stop();
			this.taskBrokerHttpServer = undefined;
		};

		await Promise.all([haltRunnerProcess(), haltBrokerHttpServer()]);
	}

	/**
	 * Resolves the `LocalTaskRequester` from the container, keeps a reference
	 * to it and registers that same instance under the `TaskRequester` token.
	 */
	private async loadTaskRequester() {
		const { TaskRequester } = await import('@/task-runners/task-managers/task-requester');
		const { LocalTaskRequester } = await import(
			'@/task-runners/task-managers/local-task-requester'
		);

		const requester = Container.get(LocalTaskRequester);
		this.taskRequester = requester;
		Container.set(TaskRequester, requester);
	}

	/**
	 * Resolves the broker HTTP and WebSocket servers from the container and
	 * starts the HTTP server.
	 */
	private async loadTaskBroker() {
		// The broker server module is imported lazily: the task manager instance
		// has to be set before this import happens.
		const { TaskBrokerServer } = await import('@/task-runners/task-broker/task-broker-server');

		const httpServer = Container.get(TaskBrokerServer);
		this.taskBrokerHttpServer = httpServer;
		this.taskBrokerWsServer = Container.get(TaskBrokerWsServer);

		await httpServer.start();
	}

	/**
	 * Starts the runner child process, subscribes to restart-loop detection for
	 * it and installs the internal disconnect analyzer on the broker WebSocket
	 * server. Requires the broker WebSocket server to be loaded already.
	 */
	private async startInternalTaskRunner() {
		a.ok(this.taskBrokerWsServer, 'Task Runner WS Server not loaded');

		const { TaskRunnerProcess } = await import('@/task-runners/task-runner-process');
		const runnerProcess = Container.get(TaskRunnerProcess);
		this.taskRunnerProcess = runnerProcess;

		// Subscribe before the process is started.
		const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(runnerProcess);
		this.taskRunnerProcessRestartLoopDetector = restartLoopDetector;
		restartLoopDetector.on('restart-loop-detected', this.onRunnerRestartLoopDetected);

		await runnerProcess.start();

		const { InternalTaskRunnerDisconnectAnalyzer } = await import(
			'@/task-runners/internal-task-runner-disconnect-analyzer'
		);
		const disconnectAnalyzer = Container.get(InternalTaskRunnerDisconnectAnalyzer);
		this.taskBrokerWsServer.setDisconnectAnalyzer(disconnectAnalyzer);
	}

	/**
	 * Handler for the detector's 'restart-loop-detected' event: logs and
	 * reports the error, waits briefly, then exits the process with code 1.
	 * Declared as an arrow property so `this` stays bound when it is passed as
	 * a listener.
	 */
	private onRunnerRestartLoopDetected = async (loopError: TaskRunnerRestartLoopError) => {
		const flushDelayMs = 1000;

		this.logger.error(loopError.message);
		this.errorReporter.error(loopError);

		// Give the error report some time to be flushed before exiting.
		await sleep(flushDelayMs);

		process.exit(1);
	};
}
|