| | import { ExecutionRepository } from '@n8n/db'; |
| | import { Container } from '@n8n/di'; |
| | import { stringify } from 'flatted'; |
| | import { mock } from 'jest-mock-extended'; |
| | import { InstanceSettings } from 'n8n-core'; |
| | import { randomInt } from 'n8n-workflow'; |
| | import assert from 'node:assert'; |
| |
|
| | import { ARTIFICIAL_TASK_DATA } from '@/constants'; |
| | import { NodeCrashedError } from '@/errors/node-crashed.error'; |
| | import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; |
| | import type { EventMessageTypes as EventMessage } from '@/eventbus/event-message-classes'; |
| | import { EventMessageNode } from '@/eventbus/event-message-classes/event-message-node'; |
| | import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; |
| | import { Push } from '@/push'; |
| | import { mockInstance } from '@test/mocking'; |
| | import { createExecution } from '@test-integration/db/executions'; |
| | import { createWorkflow } from '@test-integration/db/workflows'; |
| | import * as testDb from '@test-integration/test-db'; |
| |
|
| | import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants'; |
| | import { setupMessages } from './utils'; |
| |
|
| | describe('ExecutionRecoveryService', () => { |
| | const push = mockInstance(Push); |
| | const instanceSettings = Container.get(InstanceSettings); |
| |
|
| | let executionRecoveryService: ExecutionRecoveryService; |
| | let executionRepository: ExecutionRepository; |
| |
|
| | beforeAll(async () => { |
| | await testDb.init(); |
| | executionRepository = Container.get(ExecutionRepository); |
| |
|
| | executionRecoveryService = new ExecutionRecoveryService( |
| | mock(), |
| | instanceSettings, |
| | push, |
| | executionRepository, |
| | ); |
| | }); |
| |
|
| | beforeEach(() => { |
| | instanceSettings.markAsLeader(); |
| | }); |
| |
|
| | afterEach(async () => { |
| | jest.restoreAllMocks(); |
| | await testDb.truncate(['ExecutionEntity', 'ExecutionData', 'WorkflowEntity']); |
| | }); |
| |
|
| | afterAll(async () => { |
| | await testDb.terminate(); |
| | }); |
| |
|
| | describe('recoverFromLogs', () => { |
| | describe('if follower', () => { |
| | test('should do nothing', async () => { |
| | |
| | |
| | |
| | instanceSettings.markAsFollower(); |
| | |
| | const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); |
| | const messages = setupMessages('123', 'Some workflow'); |
| |
|
| | |
| | |
| | |
| | await executionRecoveryService.recoverFromLogs('123', messages); |
| |
|
| | |
| | |
| | |
| | expect(amendSpy).not.toHaveBeenCalled(); |
| | }); |
| | }); |
| |
|
| | describe('if leader, with 0 messages', () => { |
| | test('should return `null` if no execution found', async () => { |
| | |
| | |
| | |
| | const inexistentExecutionId = randomInt(100).toString(); |
| | const noMessages: EventMessage[] = []; |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | inexistentExecutionId, |
| | noMessages, |
| | ); |
| |
|
| | |
| | |
| | |
| | expect(amendedExecution).toBeNull(); |
| | }); |
| |
|
| | test('should update `status` and `stoppedAt`', async () => { |
| | |
| | |
| | |
| | const workflow = await createWorkflow(OOM_WORKFLOW); |
| | const execution = await createExecution( |
| | { |
| | status: 'running', |
| | data: stringify(IN_PROGRESS_EXECUTION_DATA), |
| | }, |
| | workflow, |
| | ); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs(execution.id, []); |
| |
|
| | |
| | |
| | |
| | if (!amendedExecution) fail('Expected `amendedExecution` to exist'); |
| |
|
| | expect(amendedExecution.status).toBe('crashed'); |
| | expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt); |
| | }); |
| | }); |
| |
|
| | describe('if leader, with 1+ messages', () => { |
| | test('for successful dataful execution, should return `null`', async () => { |
| | |
| | |
| | |
| | const workflow = await createWorkflow(); |
| | const execution = await createExecution( |
| | { status: 'success', data: stringify({ runData: { foo: 'bar' } }) }, |
| | workflow, |
| | ); |
| | const messages = setupMessages(execution.id, 'Some workflow'); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | execution.id, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| | expect(amendedExecution).toBeNull(); |
| | }); |
| |
|
| | test('for errored dataful execution, should return `null`', async () => { |
| | |
| | |
| | |
| | const workflow = await createWorkflow(); |
| | const execution = await createExecution( |
| | { status: 'error', data: stringify({ runData: { foo: 'bar' } }) }, |
| | workflow, |
| | ); |
| | const messages = setupMessages(execution.id, 'Some workflow'); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | execution.id, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| | expect(amendedExecution).toBeNull(); |
| | }); |
| |
|
| | test('should return `null` if no execution found', async () => { |
| | |
| | |
| | |
| | const inexistentExecutionId = randomInt(100).toString(); |
| | const messages = setupMessages(inexistentExecutionId, 'Some workflow'); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | inexistentExecutionId, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| | expect(amendedExecution).toBeNull(); |
| | }); |
| |
|
| | test('for successful dataless execution, should update `status`, `stoppedAt` and `data`', async () => { |
| | |
| | |
| | |
| | const workflow = await createWorkflow(); |
| | const execution = await createExecution( |
| | { |
| | status: 'success', |
| | data: stringify(undefined), |
| | }, |
| | workflow, |
| | ); |
| | const messages = setupMessages(execution.id, 'Some workflow'); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | execution.id, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| | assert(amendedExecution); |
| | expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt); |
| | expect(amendedExecution.data).toEqual({ resultData: { runData: {} } }); |
| | expect(amendedExecution.status).toBe('crashed'); |
| | }); |
| |
|
| | test('for running execution, should update `status`, `stoppedAt` and `data` if last node did not finish', async () => { |
| | |
| | |
| | |
| |
|
| | const workflow = await createWorkflow(OOM_WORKFLOW); |
| |
|
| | const execution = await createExecution( |
| | { |
| | status: 'running', |
| | data: stringify(IN_PROGRESS_EXECUTION_DATA), |
| | }, |
| | workflow, |
| | ); |
| |
|
| | const messages = setupMessages(execution.id, workflow.name); |
| |
|
| | |
| | |
| | |
| |
|
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | execution.id, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| |
|
| | const startOfLastNodeRun = messages |
| | .find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper') |
| | ?.ts.toJSDate(); |
| |
|
| | expect(amendedExecution).toEqual( |
| | expect.objectContaining({ |
| | status: 'crashed', |
| | stoppedAt: startOfLastNodeRun, |
| | }), |
| | ); |
| |
|
| | const resultData = amendedExecution?.data.resultData; |
| |
|
| | if (!resultData) fail('Expected `resultData` to be defined'); |
| |
|
| | expect(resultData.error).toBeInstanceOf(WorkflowCrashedError); |
| | expect(resultData.lastNodeExecuted).toBe('DebugHelper'); |
| |
|
| | const runData = resultData.runData; |
| |
|
| | if (!runData) fail('Expected `runData` to be defined'); |
| |
|
| | const manualTriggerTaskData = runData['When clicking "Execute workflow"'].at(0); |
| | const debugHelperTaskData = runData.DebugHelper.at(0); |
| |
|
| | if (!manualTriggerTaskData) fail("Expected manual trigger's `taskData` to be defined"); |
| | if (!debugHelperTaskData) fail("Expected debug helper's `taskData` to be defined"); |
| |
|
| | const originalManualTriggerTaskData = |
| | IN_PROGRESS_EXECUTION_DATA.resultData.runData['When clicking "Execute workflow"'].at( |
| | 0, |
| | )?.data; |
| |
|
| | expect(manualTriggerTaskData.executionStatus).toBe('success'); |
| | expect(manualTriggerTaskData.error).toBeUndefined(); |
| | expect(manualTriggerTaskData.data).toStrictEqual(originalManualTriggerTaskData); |
| |
|
| | expect(debugHelperTaskData.executionStatus).toBe('crashed'); |
| | expect(debugHelperTaskData.error).toBeInstanceOf(NodeCrashedError); |
| | }); |
| |
|
| | test('should update `status`, `stoppedAt` and `data` if last node finished', async () => { |
| | |
| | |
| | |
| | const workflow = await createWorkflow(OOM_WORKFLOW); |
| |
|
| | const execution = await createExecution( |
| | { |
| | status: 'running', |
| | data: stringify(IN_PROGRESS_EXECUTION_DATA), |
| | }, |
| | workflow, |
| | ); |
| |
|
| | const messages = setupMessages(execution.id, workflow.name); |
| | messages.push( |
| | new EventMessageNode({ |
| | eventName: 'n8n.node.finished', |
| | payload: { |
| | executionId: execution.id, |
| | workflowName: workflow.name, |
| | nodeName: 'DebugHelper', |
| | nodeType: 'n8n-nodes-base.debugHelper', |
| | nodeId: '123', |
| | }, |
| | }), |
| | ); |
| |
|
| | |
| | |
| | |
| | const amendedExecution = await executionRecoveryService.recoverFromLogs( |
| | execution.id, |
| | messages, |
| | ); |
| |
|
| | |
| | |
| | |
| | const endOfLastNoderun = messages |
| | .find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper') |
| | ?.ts.toJSDate(); |
| |
|
| | expect(amendedExecution).toEqual( |
| | expect.objectContaining({ |
| | status: 'crashed', |
| | stoppedAt: endOfLastNoderun, |
| | }), |
| | ); |
| |
|
| | const resultData = amendedExecution?.data.resultData; |
| |
|
| | if (!resultData) fail('Expected `resultData` to be defined'); |
| |
|
| | expect(resultData.error).toBeUndefined(); |
| | expect(resultData.lastNodeExecuted).toBe('DebugHelper'); |
| |
|
| | const runData = resultData.runData; |
| |
|
| | if (!runData) fail('Expected `runData` to be defined'); |
| |
|
| | const manualTriggerTaskData = runData['When clicking "Execute workflow"'].at(0); |
| | const debugHelperTaskData = runData.DebugHelper.at(0); |
| |
|
| | expect(manualTriggerTaskData?.executionStatus).toBe('success'); |
| | expect(manualTriggerTaskData?.error).toBeUndefined(); |
| |
|
| | expect(debugHelperTaskData?.executionStatus).toBe('success'); |
| | expect(debugHelperTaskData?.error).toBeUndefined(); |
| | expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA); |
| | }); |
| | }); |
| | }); |
| | }); |
| |
|