| | import { Logger } from '@n8n/backend-common'; |
| | import type { IExecutionResponse } from '@n8n/db'; |
| | import { ExecutionRepository } from '@n8n/db'; |
| | import { Service } from '@n8n/di'; |
| | import type { DateTime } from 'luxon'; |
| | import { InstanceSettings } from 'n8n-core'; |
| | import { sleep } from 'n8n-workflow'; |
| | import type { IRun, ITaskData } from 'n8n-workflow'; |
| |
|
| | import { ARTIFICIAL_TASK_DATA } from '@/constants'; |
| | import { NodeCrashedError } from '@/errors/node-crashed.error'; |
| | import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; |
| | import { getLifecycleHooksForRegularMain } from '@/execution-lifecycle/execution-lifecycle-hooks'; |
| | import { Push } from '@/push'; |
| |
|
| | import type { EventMessageTypes } from '../eventbus/event-message-classes'; |
| |
|
| | |
| | |
| | |
/**
 * Recovers executions whose results were not fully persisted by rebuilding
 * their run data from event log messages and marking them as crashed.
 */
@Service()
export class ExecutionRecoveryService {
	constructor(
		private readonly logger: Logger,
		private readonly instanceSettings: InstanceSettings,
		private readonly push: Push,
		private readonly executionRepository: ExecutionRepository,
	) {}

	/**
	 * Amend the execution using the given event log messages, persist the
	 * amended execution, run the `workflowExecuteAfter` lifecycle hook, and
	 * schedule an `executionRecovered` push broadcast for the next
	 * `editorUiConnected` event.
	 *
	 * @param executionId ID of the execution to recover.
	 * @param messages Event log messages recorded for that execution.
	 * @returns The amended execution, `null` if there was nothing to amend, or
	 * `undefined` (no work done) when this instance is a follower.
	 */
	async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) {
		if (this.instanceSettings.isFollower) return;

		const amendedExecution = await this.amend(executionId, messages);

		if (!amendedExecution) return null;

		this.logger.info('[Recovery] Logs available, amended execution', {
			executionId: amendedExecution.id,
		});

		await this.executionRepository.updateExistingExecution(executionId, amendedExecution);

		await this.runHooks(amendedExecution);

		this.push.once('editorUiConnected', async () => {
			// Nobody awaits the promise returned by this listener, so a failure in
			// here must be handled locally or it becomes an unhandled rejection.
			try {
				await sleep(1000);
				this.push.broadcast({ type: 'executionRecovered', data: { executionId } });
			} catch (error) {
				this.logger.error('[Recovery] Failed to broadcast recovered execution', {
					executionId,
					error,
				});
			}
		});

		return amendedExecution;
	}

	/**
	 * Build an amended copy of the execution from its event log messages.
	 *
	 * For every workflow node that has a `n8n.node.started` message but no run
	 * data yet, artificial task data is added: `success` if a matching
	 * `n8n.node.finished` message exists, `crashed` otherwise.
	 *
	 * @returns The amended execution, or `null` if there are no node messages,
	 * the execution was not found, or it already ended as `success` / `error`
	 * with data present. With no messages at all, defers to `amendWithoutLogs`.
	 */
	private async amend(executionId: string, messages: EventMessageTypes[]) {
		if (messages.length === 0) return await this.amendWithoutLogs(executionId);

		const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages);

		if (nodeMessages.length === 0) return null;

		const execution = await this.executionRepository.findSingleExecution(executionId, {
			includeData: true,
			unflattenData: true,
		});

		// Nothing to recover if the execution is missing, or if it already ended
		// as `success` or `error` and has data.
		if (!execution || (['success', 'error'].includes(execution.status) && execution.data)) {
			return null;
		}

		const runExecutionData = execution.data ?? { resultData: { runData: {} } };

		let lastNodeRunTimestamp: DateTime | undefined;

		for (const node of execution.workflowData.nodes) {
			const nodeStartedMessage = nodeMessages.find(
				(m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.started',
			);

			if (!nodeStartedMessage) continue;

			// Never overwrite run data that was actually persisted.
			if (runExecutionData.resultData.runData[node.name] !== undefined) continue;

			const nodeFinishedMessage = nodeMessages.find(
				(m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished',
			);

			const taskData: ITaskData = {
				// NOTE(review): Luxon's `toUnixInteger()` yields whole seconds, while
				// `executionTime` below is in milliseconds. Confirm that consumers of
				// `startTime` expect seconds.
				startTime: nodeStartedMessage.ts.toUnixInteger(),
				executionIndex: 0,
				executionTime: -1,
				source: [null],
			};

			let nodeRunTimestamp: DateTime;

			if (nodeFinishedMessage) {
				taskData.executionStatus = 'success';
				taskData.data ??= ARTIFICIAL_TASK_DATA;
				taskData.executionTime = nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis();
				nodeRunTimestamp = nodeFinishedMessage.ts;
			} else {
				taskData.executionStatus = 'crashed';
				taskData.error = new NodeCrashedError(node);
				taskData.executionTime = 0;
				runExecutionData.resultData.error = new WorkflowCrashedError();
				nodeRunTimestamp = nodeStartedMessage.ts;
			}

			runExecutionData.resultData.runData[node.name] = [taskData];

			// Track the chronologically latest amended node rather than whichever
			// node comes last in `workflowData.nodes`, so `stoppedAt` and
			// `lastNodeExecuted` do not depend on array order. Only a strictly
			// earlier timestamp is skipped, so ties (and non-comparable timestamps)
			// still resolve to the later array entry.
			const isEarlier =
				lastNodeRunTimestamp !== undefined &&
				nodeRunTimestamp.toMillis() < lastNodeRunTimestamp.toMillis();

			if (!isEarlier) {
				lastNodeRunTimestamp = nodeRunTimestamp;
				runExecutionData.resultData.lastNodeExecuted = node.name;
			}
		}

		return {
			...execution,
			// An execution already marked as `error` keeps that status.
			status: execution.status === 'error' ? 'error' : 'crashed',
			stoppedAt: this.toStoppedAt(lastNodeRunTimestamp, workflowMessages),
			data: runExecutionData,
		} as IExecutionResponse;
	}

	/**
	 * Handle recovery when no event log messages are available: mark the
	 * execution as crashed in the repository and reload it.
	 *
	 * @returns The reloaded execution, or `null` if it does not exist.
	 */
	private async amendWithoutLogs(executionId: string) {
		const exists = await this.executionRepository.exists({ where: { id: executionId } });

		if (!exists) return null;

		await this.executionRepository.markAsCrashed(executionId);

		const execution = await this.executionRepository.findSingleExecution(executionId, {
			includeData: true,
			unflattenData: true,
		});

		return execution ?? null;
	}

	/**
	 * Split messages into node-related (`n8n.node.*`) and workflow-related
	 * (`n8n.workflow.*`) groups, preserving order. All other messages are dropped.
	 */
	private toRelevantMessages(messages: EventMessageTypes[]) {
		const nodeMessages: EventMessageTypes[] = [];
		const workflowMessages: EventMessageTypes[] = [];

		for (const message of messages) {
			if (message.eventName.startsWith('n8n.node.')) {
				nodeMessages.push(message);
			} else if (message.eventName.startsWith('n8n.workflow.')) {
				workflowMessages.push(message);
			}
		}

		return { nodeMessages, workflowMessages };
	}

	/**
	 * Determine when the execution stopped.
	 *
	 * Uses the given node timestamp if there is one. Otherwise falls back to the
	 * first workflow end event (success, crashed or failed), then to the first
	 * `n8n.workflow.started` event.
	 *
	 * @returns The stop time, or `undefined` if none of these is available.
	 */
	private toStoppedAt(timestamp: DateTime | undefined, messages: EventMessageTypes[]) {
		if (timestamp) return timestamp.toJSDate();

		const WORKFLOW_END_EVENTS = new Set([
			'n8n.workflow.success',
			'n8n.workflow.crashed',
			'n8n.workflow.failed',
		]);

		return (
			messages.find((m) => WORKFLOW_END_EVENTS.has(m.eventName)) ??
			messages.find((m) => m.eventName === 'n8n.workflow.started')
		)?.ts.toJSDate();
	}

	/**
	 * Run the `workflowExecuteAfter` lifecycle hook for the amended execution,
	 * passing it as an unfinished run.
	 *
	 * Note: sets `execution.data` to an empty run-data structure if it is unset.
	 */
	private async runHooks(execution: IExecutionResponse) {
		execution.data ??= { resultData: { runData: {} } };

		const lifecycleHooks = getLifecycleHooksForRegularMain(
			{
				userId: '',
				workflowData: execution.workflowData,
				executionMode: execution.mode,
				executionData: execution.data,
				runData: execution.data.resultData.runData,
				retryOf: execution.retryOf ?? undefined,
			},
			execution.id,
		);

		const run: IRun = {
			data: execution.data,
			finished: false,
			mode: execution.mode,
			waitTill: execution.waitTill ?? undefined,
			startedAt: execution.startedAt,
			stoppedAt: execution.stoppedAt,
			status: execution.status,
		};

		await lifecycleHooks.runHook('workflowExecuteAfter', [run]);
	}
}
| |
|