| import type { RunningJobSummary } from '@n8n/api-types'; |
| import { Logger } from '@n8n/backend-common'; |
| import { ExecutionRepository, WorkflowRepository } from '@n8n/db'; |
| import { Service } from '@n8n/di'; |
| import { WorkflowHasIssuesError, InstanceSettings, WorkflowExecute } from 'n8n-core'; |
| import type { |
| ExecutionStatus, |
| IExecuteResponsePromiseData, |
| IRun, |
| IWorkflowExecutionDataProcess, |
| } from 'n8n-workflow'; |
| import { BINARY_ENCODING, Workflow, UnexpectedError } from 'n8n-workflow'; |
| import type PCancelable from 'p-cancelable'; |
|
|
| import config from '@/config'; |
| import { getLifecycleHooksForScalingWorker } from '@/execution-lifecycle/execution-lifecycle-hooks'; |
| import { ManualExecutionService } from '@/manual-execution.service'; |
| import { NodeTypes } from '@/node-types'; |
| import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; |
|
|
| import type { |
| Job, |
| JobFinishedMessage, |
| JobId, |
| JobResult, |
| RespondToWebhookMessage, |
| RunningJob, |
| } from './scaling.types'; |
|
|
| |
| |
| |
| @Service() |
| export class JobProcessor { |
| private readonly runningJobs: Record<JobId, RunningJob> = {}; |
|
|
| constructor( |
| private readonly logger: Logger, |
| private readonly executionRepository: ExecutionRepository, |
| private readonly workflowRepository: WorkflowRepository, |
| private readonly nodeTypes: NodeTypes, |
| private readonly instanceSettings: InstanceSettings, |
| private readonly manualExecutionService: ManualExecutionService, |
| ) { |
| this.logger = this.logger.scoped('scaling'); |
| } |
|
|
| async processJob(job: Job): Promise<JobResult> { |
| const { executionId, loadStaticData } = job.data; |
|
|
| const execution = await this.executionRepository.findSingleExecution(executionId, { |
| includeData: true, |
| unflattenData: true, |
| }); |
|
|
| if (!execution) { |
| throw new UnexpectedError( |
| `Worker failed to find data for execution ${executionId} (job ${job.id})`, |
| ); |
| } |
|
|
| |
| |
| |
| |
| |
| if (execution.status === 'crashed') return { success: false }; |
|
|
| const workflowId = execution.workflowData.id; |
|
|
| this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, { |
| executionId, |
| jobId: job.id, |
| }); |
|
|
| const startedAt = await this.executionRepository.setRunning(executionId); |
|
|
| let { staticData } = execution.workflowData; |
|
|
| if (loadStaticData) { |
| const workflowData = await this.workflowRepository.findOne({ |
| select: ['id', 'staticData'], |
| where: { id: workflowId }, |
| }); |
|
|
| if (workflowData === null) { |
| throw new UnexpectedError( |
| `Worker failed to find workflow ${workflowId} to run execution ${executionId} (job ${job.id})`, |
| ); |
| } |
|
|
| staticData = workflowData.staticData; |
| } |
|
|
| const workflowSettings = execution.workflowData.settings ?? {}; |
|
|
| let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); |
|
|
| let executionTimeoutTimestamp: number | undefined; |
|
|
| if (workflowTimeout > 0) { |
| workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); |
| executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000; |
| } |
|
|
| const workflow = new Workflow({ |
| id: workflowId, |
| name: execution.workflowData.name, |
| nodes: execution.workflowData.nodes, |
| connections: execution.workflowData.connections, |
| active: execution.workflowData.active, |
| nodeTypes: this.nodeTypes, |
| staticData, |
| settings: execution.workflowData.settings, |
| }); |
|
|
| const additionalData = await WorkflowExecuteAdditionalData.getBase( |
| undefined, |
| undefined, |
| executionTimeoutTimestamp, |
| ); |
|
|
| const { pushRef } = job.data; |
|
|
| const lifecycleHooks = getLifecycleHooksForScalingWorker( |
| { |
| executionMode: execution.mode, |
| workflowData: execution.workflowData, |
| retryOf: execution.retryOf, |
| pushRef, |
| }, |
| executionId, |
| ); |
| additionalData.hooks = lifecycleHooks; |
|
|
| if (pushRef) { |
| |
| additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({ pushRef }); |
| } |
|
|
| lifecycleHooks.addHandler('sendResponse', async (response): Promise<void> => { |
| const msg: RespondToWebhookMessage = { |
| kind: 'respond-to-webhook', |
| executionId, |
| response: this.encodeWebhookResponse(response), |
| workerId: this.instanceSettings.hostId, |
| }; |
|
|
| await job.progress(msg); |
| }); |
|
|
| additionalData.executionId = executionId; |
|
|
| additionalData.setExecutionStatus = (status: ExecutionStatus) => { |
| |
| this.logger.debug( |
| `Queued worker execution status for execution ${executionId} (job ${job.id}) is "${status}"`, |
| ); |
| }; |
|
|
| let workflowExecute: WorkflowExecute; |
| let workflowRun: PCancelable<IRun>; |
|
|
| const { startData, resultData, manualData } = execution.data; |
|
|
| if (execution.data?.executionData) { |
| workflowExecute = new WorkflowExecute(additionalData, execution.mode, execution.data); |
| workflowRun = workflowExecute.processRunExecutionData(workflow); |
| } else { |
| const data: IWorkflowExecutionDataProcess = { |
| executionMode: execution.mode, |
| workflowData: execution.workflowData, |
| destinationNode: startData?.destinationNode, |
| startNodes: startData?.startNodes, |
| runData: resultData.runData, |
| pinData: resultData.pinData, |
| partialExecutionVersion: manualData?.partialExecutionVersion, |
| dirtyNodeNames: manualData?.dirtyNodeNames, |
| triggerToStartFrom: manualData?.triggerToStartFrom, |
| userId: manualData?.userId, |
| }; |
|
|
| try { |
| workflowRun = this.manualExecutionService.runManually( |
| data, |
| workflow, |
| additionalData, |
| executionId, |
| resultData.pinData, |
| ); |
| } catch (error) { |
| if (error instanceof WorkflowHasIssuesError) { |
| |
|
|
| const now = new Date(); |
| const runData: IRun = { |
| mode: 'manual', |
| status: 'error', |
| finished: false, |
| startedAt: now, |
| stoppedAt: now, |
| data: { resultData: { error, runData: {} } }, |
| }; |
|
|
| await lifecycleHooks.runHook('workflowExecuteAfter', [runData]); |
| return { success: false }; |
| } |
| throw error; |
| } |
| } |
|
|
| const runningJob: RunningJob = { |
| run: workflowRun, |
| executionId, |
| workflowId: execution.workflowId, |
| workflowName: execution.workflowData.name, |
| mode: execution.mode, |
| startedAt, |
| retryOf: execution.retryOf ?? undefined, |
| status: execution.status, |
| }; |
|
|
| this.runningJobs[job.id] = runningJob; |
|
|
| await workflowRun; |
|
|
| delete this.runningJobs[job.id]; |
|
|
| this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, { |
| executionId, |
| jobId: job.id, |
| }); |
|
|
| const msg: JobFinishedMessage = { |
| kind: 'job-finished', |
| executionId, |
| workerId: this.instanceSettings.hostId, |
| }; |
|
|
| await job.progress(msg); |
|
|
| |
| |
| |
| |
|
|
| return { success: true }; |
| } |
|
|
| stopJob(jobId: JobId) { |
| this.runningJobs[jobId]?.run.cancel(); |
| delete this.runningJobs[jobId]; |
| } |
|
|
| getRunningJobIds(): JobId[] { |
| return Object.keys(this.runningJobs); |
| } |
|
|
| getRunningJobsSummary(): RunningJobSummary[] { |
| return Object.values(this.runningJobs).map(({ run, ...summary }) => summary); |
| } |
|
|
| private encodeWebhookResponse( |
| response: IExecuteResponsePromiseData, |
| ): IExecuteResponsePromiseData { |
| if (typeof response === 'object' && Buffer.isBuffer(response.body)) { |
| response.body = { |
| '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), |
| }; |
| } |
|
|
| return response; |
| } |
| } |
|
|