| | import { Container } from '@n8n/di'; |
| |
|
| | import { TaskBrokerWsServer } from '@/task-runners/task-broker/task-broker-ws-server'; |
| | import { TaskBroker } from '@/task-runners/task-broker/task-broker.service'; |
| | import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; |
| | import { TaskRunnerProcessRestartLoopDetector } from '@/task-runners/task-runner-process-restart-loop-detector'; |
| | import { retryUntil } from '@test-integration/retry-until'; |
| | import { setupBrokerTestServer } from '@test-integration/utils/task-broker-test-server'; |
| |
|
| | describe('TaskRunnerProcess', () => { |
| | const { config, server: taskRunnerServer } = setupBrokerTestServer({ |
| | mode: 'internal', |
| | }); |
| | const runnerProcess = Container.get(TaskRunnerProcess); |
| | const taskBroker = Container.get(TaskBroker); |
| | const taskRunnerService = Container.get(TaskBrokerWsServer); |
| |
|
| | beforeAll(async () => { |
| | await taskRunnerServer.start(); |
| | |
| | config.port = taskRunnerServer.port; |
| | }); |
| |
|
| | afterAll(async () => { |
| | await taskRunnerServer.stop(); |
| | }); |
| |
|
| | afterEach(async () => { |
| | await runnerProcess.stop(); |
| | }); |
| |
|
| | const getNumConnectedRunners = () => taskRunnerService.runnerConnections.size; |
| | const getNumRegisteredRunners = () => taskBroker.getKnownRunners().size; |
| |
|
| | it('should start and connect the task runner', async () => { |
| | |
| | await runnerProcess.start(); |
| |
|
| | |
| | expect(runnerProcess.isRunning).toBeTruthy(); |
| |
|
| | |
| | await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); |
| | expect(getNumRegisteredRunners()).toBe(1); |
| | }); |
| |
|
| | it('should stop an disconnect the task runner', async () => { |
| | |
| | await runnerProcess.start(); |
| |
|
| | |
| | await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); |
| | expect(getNumRegisteredRunners()).toBe(1); |
| |
|
| | |
| | await runnerProcess.stop(); |
| |
|
| | |
| | |
| | await retryUntil(() => expect(getNumConnectedRunners()).toBe(0)); |
| |
|
| | expect(runnerProcess.isRunning).toBeFalsy(); |
| | expect(getNumRegisteredRunners()).toBe(0); |
| | }); |
| |
|
| | it('should restart the task runner if it exits', async () => { |
| | |
| | await runnerProcess.start(); |
| |
|
| | |
| | await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); |
| | const processId = runnerProcess.pid; |
| |
|
| | |
| | |
| | runnerProcess.process?.kill('SIGKILL'); |
| |
|
| | |
| | await runnerProcess.runPromise; |
| |
|
| | |
| | |
| | await retryUntil(() => expect(getNumConnectedRunners()).toBe(1)); |
| | expect(getNumConnectedRunners()).toBe(1); |
| | expect(getNumRegisteredRunners()).toBe(1); |
| | expect(runnerProcess.pid).not.toBe(processId); |
| | }); |
| |
|
| | it('should work together with restart loop detector', async () => { |
| | |
| | const restartLoopDetector = new TaskRunnerProcessRestartLoopDetector(runnerProcess); |
| | let restartLoopDetectedEventEmitted = false; |
| | restartLoopDetector.once('restart-loop-detected', () => { |
| | restartLoopDetectedEventEmitted = true; |
| | }); |
| |
|
| | |
| | await runnerProcess.start(); |
| |
|
| | |
| | for (let i = 0; i < 5; i++) { |
| | await retryUntil(() => { |
| | expect(runnerProcess.pid).toBeDefined(); |
| | }); |
| |
|
| | |
| | runnerProcess.process?.kill(); |
| |
|
| | await new Promise((resolve) => { |
| | runnerProcess.once('exit', resolve); |
| | }); |
| | } |
| |
|
| | |
| | expect(restartLoopDetectedEventEmitted).toBe(true); |
| | }); |
| | }); |
| |
|