File size: 4,033 Bytes
aec3094
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import { WorkflowRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import type { IWorkflowBase, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ExecutionBaseError, UnexpectedError, UserError } from 'n8n-workflow';

import { ActiveExecutions } from '@/active-executions';
import { OwnershipService } from '@/services/ownership.service';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { WorkflowRunner } from '@/workflow-runner';

import { BaseCommand } from './base-command';
import config from '../config';

export class Execute extends BaseCommand {
	static description = '\nExecutes a given workflow';

	static examples = ['$ n8n execute --id=5'];

	static flags = {
		help: Flags.help({ char: 'h' }),
		id: Flags.string({
			description: 'id of the workflow to execute',
		}),
		rawOutput: Flags.boolean({
			description: 'Outputs only JSON data, with no other text',
		}),
	};

	override needsCommunityPackages = true;

	override needsTaskRunner = true;

	async init() {
		await super.init();
		await this.initBinaryDataService();
		await this.initDataDeduplicationService();
		await this.initExternalHooks();
	}

	async run() {
		const { flags } = await this.parse(Execute);

		if (!flags.id) {
			this.logger.info('"--id" has to be set!');
			return;
		}

		if (flags.file) {
			throw new UserError(
				'The --file flag is no longer supported. Please first import the workflow and then execute it using the --id flag.',
				{ level: 'warning' },
			);
		}

		let workflowId: string | undefined;
		let workflowData: IWorkflowBase | null = null;

		if (flags.id) {
			// Id of workflow is given
			workflowId = flags.id;
			workflowData = await Container.get(WorkflowRepository).findOneBy({ id: workflowId });
			if (workflowData === null) {
				this.logger.info(`The workflow with the id "${workflowId}" does not exist.`);
				process.exit(1);
			}
		}

		if (!workflowData) {
			throw new UnexpectedError('Failed to retrieve workflow data for requested workflow');
		}

		if (!isWorkflowIdValid(workflowId)) {
			workflowId = undefined;
		}

		const startingNode = findCliWorkflowStart(workflowData.nodes);

		const user = await Container.get(OwnershipService).getInstanceOwner();
		const runData: IWorkflowExecutionDataProcess = {
			executionMode: 'cli',
			startNodes: [{ name: startingNode.name, sourceData: null }],
			workflowData,
			userId: user.id,
		};

		const workflowRunner = Container.get(WorkflowRunner);

		if (config.getEnv('executions.mode') === 'queue') {
			this.logger.warn(
				'CLI command `execute` does not support queue mode. Falling back to regular mode.',
			);
			workflowRunner.setExecutionMode('regular');
		}

		const executionId = await workflowRunner.run(runData);

		const activeExecutions = Container.get(ActiveExecutions);
		const data = await activeExecutions.getPostExecutePromise(executionId);

		if (data === undefined) {
			throw new UnexpectedError('Workflow did not return any data');
		}

		if (data.data.resultData.error) {
			this.logger.info('Execution was NOT successful. See log message for details.');
			this.logger.info('Execution error:');
			this.logger.info('====================================');
			this.logger.info(JSON.stringify(data, null, 2));

			const { error } = data.data.resultData;
			// eslint-disable-next-line @typescript-eslint/no-throw-literal
			throw {
				...error,
				stack: error.stack,
			};
		}
		if (flags.rawOutput === undefined) {
			this.log('Execution was successful:');
			this.log('====================================');
		}
		this.log(JSON.stringify(data, null, 2));
	}

	async catch(error: Error) {
		this.logger.error('Error executing workflow. See log messages for details.');
		this.logger.error('\nExecution error:');
		this.logger.info('====================================');
		this.logger.error(error.message);
		if (error instanceof ExecutionBaseError) this.logger.error(error.description!);
		this.logger.error(error.stack!);
	}
}