| | import { Logger } from '@n8n/backend-common'; |
| | import type { CreateExecutionPayload, IExecutionDb } from '@n8n/db'; |
| | import { ExecutionRepository } from '@n8n/db'; |
| | import { Service } from '@n8n/di'; |
| | import type { |
| | IDeferredPromise, |
| | IExecuteResponsePromiseData, |
| | IRun, |
| | ExecutionStatus, |
| | IWorkflowExecutionDataProcess, |
| | } from 'n8n-workflow'; |
| | import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow'; |
| | import { strict as assert } from 'node:assert'; |
| | import type PCancelable from 'p-cancelable'; |
| |
|
| | import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; |
| | import type { IExecutingWorkflowData, IExecutionsCurrentSummary } from '@/interfaces'; |
| | import { isWorkflowIdValid } from '@/utils'; |
| |
|
| | import { ConcurrencyControlService } from './concurrency/concurrency-control.service'; |
| | import config from './config'; |
| |
|
| | @Service() |
| | export class ActiveExecutions { |
| | |
| | |
| | |
| | private activeExecutions: { |
| | [executionId: string]: IExecutingWorkflowData; |
| | } = {}; |
| |
|
| | constructor( |
| | private readonly logger: Logger, |
| | private readonly executionRepository: ExecutionRepository, |
| | private readonly concurrencyControl: ConcurrencyControlService, |
| | ) {} |
| |
|
| | has(executionId: string) { |
| | return this.activeExecutions[executionId] !== undefined; |
| | } |
| |
|
| | |
| | |
| | |
| | async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> { |
| | let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; |
| | const mode = executionData.executionMode; |
| | if (executionId === undefined) { |
| | |
| |
|
| | const fullExecutionData: CreateExecutionPayload = { |
| | data: executionData.executionData!, |
| | mode, |
| | finished: false, |
| | workflowData: executionData.workflowData, |
| | status: executionStatus, |
| | workflowId: executionData.workflowData.id, |
| | }; |
| |
|
| | fullExecutionData.retryOf = executionData.retryOf ?? undefined; |
| |
|
| | const workflowId = executionData.workflowData.id; |
| | if (workflowId !== undefined && isWorkflowIdValid(workflowId)) { |
| | fullExecutionData.workflowId = workflowId; |
| | } |
| |
|
| | executionId = await this.executionRepository.createNewExecution(fullExecutionData); |
| | assert(executionId); |
| |
|
| | if (config.getEnv('executions.mode') === 'regular') { |
| | await this.concurrencyControl.throttle({ mode, executionId }); |
| | await this.executionRepository.setRunning(executionId); |
| | } |
| | executionStatus = 'running'; |
| | } else { |
| | |
| |
|
| | await this.concurrencyControl.throttle({ mode, executionId }); |
| |
|
| | const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = { |
| | id: executionId, |
| | data: executionData.executionData!, |
| | waitTill: null, |
| | status: executionStatus, |
| | |
| | }; |
| |
|
| | await this.executionRepository.updateExistingExecution(executionId, execution); |
| | } |
| |
|
| | const resumingExecution = this.activeExecutions[executionId]; |
| | const postExecutePromise = createDeferredPromise<IRun | undefined>(); |
| |
|
| | const execution: IExecutingWorkflowData = { |
| | executionData, |
| | startedAt: resumingExecution?.startedAt ?? new Date(), |
| | postExecutePromise, |
| | status: executionStatus, |
| | responsePromise: resumingExecution?.responsePromise, |
| | }; |
| | this.activeExecutions[executionId] = execution; |
| |
|
| | |
| | void postExecutePromise.promise |
| | .catch((error) => { |
| | if (error instanceof ExecutionCancelledError) return; |
| | throw error; |
| | }) |
| | .finally(() => { |
| | this.concurrencyControl.release({ mode: executionData.executionMode }); |
| | if (execution.status === 'waiting') { |
| | |
| | delete execution.workflowExecution; |
| | } else { |
| | delete this.activeExecutions[executionId]; |
| | this.logger.debug('Execution removed', { executionId }); |
| | } |
| | }); |
| |
|
| | this.logger.debug('Execution added', { executionId }); |
| |
|
| | return executionId; |
| | } |
| |
|
| | |
| | |
| | |
| |
|
| | attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) { |
| | this.getExecutionOrFail(executionId).workflowExecution = workflowExecution; |
| | } |
| |
|
| | attachResponsePromise( |
| | executionId: string, |
| | responsePromise: IDeferredPromise<IExecuteResponsePromiseData>, |
| | ): void { |
| | this.getExecutionOrFail(executionId).responsePromise = responsePromise; |
| | } |
| |
|
| | resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { |
| | const execution = this.activeExecutions[executionId]; |
| | execution?.responsePromise?.resolve(response); |
| | } |
| |
|
| | |
| | stopExecution(executionId: string): void { |
| | const execution = this.activeExecutions[executionId]; |
| | if (execution === undefined) { |
| | |
| | return; |
| | } |
| | const error = new ExecutionCancelledError(executionId); |
| | execution.responsePromise?.reject(error); |
| | if (execution.status === 'waiting') { |
| | |
| | |
| | delete this.activeExecutions[executionId]; |
| | } else { |
| | execution.workflowExecution?.cancel(); |
| | execution.postExecutePromise.reject(error); |
| | } |
| | this.logger.debug('Execution cancelled', { executionId }); |
| | } |
| |
|
| | |
| | finalizeExecution(executionId: string, fullRunData?: IRun) { |
| | if (!this.has(executionId)) return; |
| | const execution = this.getExecutionOrFail(executionId); |
| | execution.postExecutePromise.resolve(fullRunData); |
| | this.logger.debug('Execution finalized', { executionId }); |
| | } |
| |
|
| | |
| | resolveExecutionResponsePromise(executionId: string) { |
| | |
| | |
| | |
| | |
| |
|
| | if (!this.has(executionId)) return; |
| | const execution = this.getExecutionOrFail(executionId); |
| |
|
| | if (execution.status !== 'waiting' && execution?.responsePromise) { |
| | execution.responsePromise.resolve({}); |
| | this.logger.debug('Execution response promise cleaned', { executionId }); |
| | } |
| | } |
| |
|
| | |
| | |
| | |
| | async getPostExecutePromise(executionId: string): Promise<IRun | undefined> { |
| | return await this.getExecutionOrFail(executionId).postExecutePromise.promise; |
| | } |
| |
|
| | |
| | |
| | |
| | getActiveExecutions(): IExecutionsCurrentSummary[] { |
| | const returnData: IExecutionsCurrentSummary[] = []; |
| |
|
| | let data; |
| |
|
| | for (const id of Object.keys(this.activeExecutions)) { |
| | data = this.activeExecutions[id]; |
| | returnData.push({ |
| | id, |
| | retryOf: data.executionData.retryOf ?? undefined, |
| | startedAt: data.startedAt, |
| | mode: data.executionData.executionMode, |
| | workflowId: data.executionData.workflowData.id, |
| | status: data.status, |
| | }); |
| | } |
| |
|
| | return returnData; |
| | } |
| |
|
| | setStatus(executionId: string, status: ExecutionStatus) { |
| | this.getExecutionOrFail(executionId).status = status; |
| | } |
| |
|
| | getStatus(executionId: string): ExecutionStatus { |
| | return this.getExecutionOrFail(executionId).status; |
| | } |
| |
|
| | |
| | async shutdown(cancelAll = false) { |
| | const isRegularMode = config.getEnv('executions.mode') === 'regular'; |
| | if (isRegularMode) { |
| | |
| | |
| | this.concurrencyControl.disable(); |
| | } |
| |
|
| | let executionIds = Object.keys(this.activeExecutions); |
| | const toCancel: string[] = []; |
| | for (const executionId of executionIds) { |
| | const { responsePromise, status } = this.activeExecutions[executionId]; |
| | if (!!responsePromise || (isRegularMode && cancelAll)) { |
| | |
| | this.stopExecution(executionId); |
| | toCancel.push(executionId); |
| | } else if (status === 'waiting' || status === 'new') { |
| | |
| | delete this.activeExecutions[executionId]; |
| | } |
| | } |
| |
|
| | await this.concurrencyControl.removeAll(toCancel); |
| |
|
| | let count = 0; |
| | executionIds = Object.keys(this.activeExecutions); |
| | while (executionIds.length !== 0) { |
| | if (count++ % 4 === 0) { |
| | this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`); |
| | } |
| |
|
| | await sleep(500); |
| | executionIds = Object.keys(this.activeExecutions); |
| | } |
| | } |
| |
|
| | getExecutionOrFail(executionId: string): IExecutingWorkflowData { |
| | const execution = this.activeExecutions[executionId]; |
| | if (!execution) { |
| | throw new ExecutionNotFoundError(executionId); |
| | } |
| | return execution; |
| | } |
| | } |
| |
|