n8cn / packages /cli /src /executions /__tests__ /execution-recovery.service.test.ts
gallyga's picture
Add n8n Chinese version
aec3094
import { ExecutionRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import { stringify } from 'flatted';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { randomInt } from 'n8n-workflow';
import assert from 'node:assert';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import type { EventMessageTypes as EventMessage } from '@/eventbus/event-message-classes';
import { EventMessageNode } from '@/eventbus/event-message-classes/event-message-node';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { Push } from '@/push';
import { mockInstance } from '@test/mocking';
import { createExecution } from '@test-integration/db/executions';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants';
import { setupMessages } from './utils';
describe('ExecutionRecoveryService', () => {
const push = mockInstance(Push);
const instanceSettings = Container.get(InstanceSettings);
let executionRecoveryService: ExecutionRecoveryService;
let executionRepository: ExecutionRepository;
beforeAll(async () => {
await testDb.init();
executionRepository = Container.get(ExecutionRepository);
executionRecoveryService = new ExecutionRecoveryService(
mock(),
instanceSettings,
push,
executionRepository,
);
});
beforeEach(() => {
instanceSettings.markAsLeader();
});
afterEach(async () => {
jest.restoreAllMocks();
await testDb.truncate(['ExecutionEntity', 'ExecutionData', 'WorkflowEntity']);
});
afterAll(async () => {
await testDb.terminate();
});
describe('recoverFromLogs', () => {
describe('if follower', () => {
test('should do nothing', async () => {
/**
* Arrange
*/
instanceSettings.markAsFollower();
// @ts-expect-error Private method
const amendSpy = jest.spyOn(executionRecoveryService, 'amend');
const messages = setupMessages('123', 'Some workflow');
/**
* Act
*/
await executionRecoveryService.recoverFromLogs('123', messages);
/**
* Assert
*/
expect(amendSpy).not.toHaveBeenCalled();
});
});
describe('if leader, with 0 messages', () => {
test('should return `null` if no execution found', async () => {
/**
* Arrange
*/
const inexistentExecutionId = randomInt(100).toString();
const noMessages: EventMessage[] = [];
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
inexistentExecutionId,
noMessages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('should update `status` and `stoppedAt`', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(execution.id, []);
/**
* Assert
*/
if (!amendedExecution) fail('Expected `amendedExecution` to exist');
expect(amendedExecution.status).toBe('crashed');
expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt);
});
});
describe('if leader, with 1+ messages', () => {
test('for successful dataful execution, should return `null`', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow();
const execution = await createExecution(
{ status: 'success', data: stringify({ runData: { foo: 'bar' } }) },
workflow,
);
const messages = setupMessages(execution.id, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('for errored dataful execution, should return `null`', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow();
const execution = await createExecution(
{ status: 'error', data: stringify({ runData: { foo: 'bar' } }) },
workflow,
);
const messages = setupMessages(execution.id, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('should return `null` if no execution found', async () => {
/**
* Arrange
*/
const inexistentExecutionId = randomInt(100).toString();
const messages = setupMessages(inexistentExecutionId, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
inexistentExecutionId,
messages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('for successful dataless execution, should update `status`, `stoppedAt` and `data`', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow();
const execution = await createExecution(
{
status: 'success',
data: stringify(undefined), // saved execution but likely crashed while saving high-volume data
},
workflow,
);
const messages = setupMessages(execution.id, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
assert(amendedExecution);
expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt);
expect(amendedExecution.data).toEqual({ resultData: { runData: {} } });
expect(amendedExecution.status).toBe('crashed');
});
test('for running execution, should update `status`, `stoppedAt` and `data` if last node did not finish', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
const startOfLastNodeRun = messages
.find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: startOfLastNodeRun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeInstanceOf(WorkflowCrashedError);
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Execute workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
if (!manualTriggerTaskData) fail("Expected manual trigger's `taskData` to be defined");
if (!debugHelperTaskData) fail("Expected debug helper's `taskData` to be defined");
const originalManualTriggerTaskData =
IN_PROGRESS_EXECUTION_DATA.resultData.runData['When clicking "Execute workflow"'].at(
0,
)?.data;
expect(manualTriggerTaskData.executionStatus).toBe('success');
expect(manualTriggerTaskData.error).toBeUndefined();
expect(manualTriggerTaskData.data).toStrictEqual(originalManualTriggerTaskData); // unchanged
expect(debugHelperTaskData.executionStatus).toBe('crashed');
expect(debugHelperTaskData.error).toBeInstanceOf(NodeCrashedError);
});
test('should update `status`, `stoppedAt` and `data` if last node finished', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
messages.push(
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId: execution.id,
workflowName: workflow.name,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
nodeId: '123',
},
}),
);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
const endOfLastNoderun = messages
.find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: endOfLastNoderun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeUndefined();
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Execute workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.executionStatus).toBe('success');
expect(debugHelperTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA);
});
});
});
});