| | import { Container } from '@n8n/di'; |
| | import { Flags } from '@oclif/core'; |
| |
|
| | import { ActiveExecutions } from '@/active-executions'; |
| | import config from '@/config'; |
| | import { Publisher } from '@/scaling/pubsub/publisher.service'; |
| | import { PubSubRegistry } from '@/scaling/pubsub/pubsub.registry'; |
| | import { Subscriber } from '@/scaling/pubsub/subscriber.service'; |
| | import { WebhookServer } from '@/webhooks/webhook-server'; |
| |
|
| | import { BaseCommand } from './base-command'; |
| |
|
/**
 * CLI command that starts a dedicated n8n webhook process.
 *
 * The command refuses to start unless the `executions.mode` setting is
 * `queue`. Once initialised, it sets up the scaling queue, starts the
 * {@link WebhookServer}, and then blocks indefinitely.
 */
export class Webhook extends BaseCommand {
  static description = 'Starts n8n webhook process. Intercepts only production URLs.';

  static examples = ['$ n8n webhook'];

  static flags = {
    help: Flags.help({ char: 'h' }),
  };

  /** HTTP server for this process, resolved from the DI container at construction time. */
  protected server = Container.get(WebhookServer);

  override needsCommunityPackages = true;

  /**
   * Shuts the process down: runs the `n8n.stop` external hook (when external
   * hooks are present), shuts down active executions, and then exits.
   *
   * Any error thrown by the hook or by the shutdown is handed to
   * `exitWithCrash`.
   */
  async stopProcess() {
    this.logger.info('\nStopping n8n...');

    try {
      await this.externalHooks?.run('n8n.stop');

      const activeExecutions = Container.get(ActiveExecutions);
      await activeExecutions.shutdown();
    } catch (error) {
      // NOTE(review): if `exitWithCrash` ever returns, control continues to
      // `exitSuccessFully()` below — presumably it terminates the process; confirm.
      await this.exitWithCrash('There was an error shutting down n8n.', error);
    }

    await this.exitSuccessFully();
  }

  /**
   * Prepares the webhook process.
   *
   * Order of operations: execution-mode guard, crash journal, base-command
   * initialisation, then license, orchestration, binary data service, data
   * deduplication service and external hooks (each followed by a debug log),
   * and finally module initialisation.
   */
  async init() {
    const executionMode = config.getEnv('executions.mode');
    const isQueueMode = executionMode === 'queue';
    if (!isQueueMode) {
      // NOTE(review): `this.error` is expected to abort here (oclif's
      // `Command.error` throws by default) — confirm against BaseCommand.
      this.error('Webhook processes can only run with execution mode as queue.');
    }

    await this.initCrashJournal();
    this.logger.debug('Crash journal initialized');

    this.logger.info('Starting n8n webhook process...');
    this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`);

    await super.init();

    // Ordered start-up steps; each one is awaited before its completion
    // message is logged and before the next step begins.
    const startupSteps: Array<readonly [() => Promise<unknown>, string]> = [
      [() => this.initLicense(), 'License init complete'],
      [() => this.initOrchestration(), 'Orchestration init complete'],
      [() => this.initBinaryDataService(), 'Binary data service init complete'],
      [() => this.initDataDeduplicationService(), 'Data deduplication service init complete'],
      [() => this.initExternalHooks(), 'External hooks init complete'],
    ];

    for (const [runStep, completionMessage] of startupSteps) {
      await runStep();
      this.logger.debug(completionMessage);
    }

    await this.moduleRegistry.initModules();
  }

  /**
   * Sets up the scaling queue, starts the webhook server, and then waits
   * forever so that this method never returns.
   */
  async run() {
    // The scaling service module is loaded dynamically, only once `run()` executes.
    const scalingModule = await import('@/scaling/scaling.service');
    const scalingService = Container.get(scalingModule.ScalingService);
    await scalingService.setupQueue();

    await this.server.start();
    this.logger.info('Webhook listener waiting for requests.');

    // The executor never settles this promise, so the await below never completes.
    await new Promise(() => {});
  }

  /**
   * Error handler for the command: forwards the error to `exitWithCrash`.
   *
   * @param error - The error that reached the command's error handler.
   */
  async catch(error: Error) {
    await this.exitWithCrash('Exiting due to an error.', error);
  }

  /**
   * Wires up pub/sub for this process: resolves the publisher, initialises
   * the pub/sub registry, and subscribes to the `n8n.commands` channel.
   */
  async initOrchestration() {
    // The resolved instance is discarded — presumably this call exists only to
    // instantiate the publisher through the container; confirm.
    Container.get(Publisher);

    const pubSubRegistry = Container.get(PubSubRegistry);
    pubSubRegistry.init();

    const subscriber = Container.get(Subscriber);
    await subscriber.subscribe('n8n.commands');
  }
}
| |
|