| | import { OnPubSubEvent } from '@n8n/decorators'; |
| | import { Service } from '@n8n/di'; |
| | import type express from 'express'; |
| | import { InstanceSettings } from 'n8n-core'; |
| | import { WebhookPathTakenError, Workflow } from 'n8n-workflow'; |
| | import type { |
| | IWebhookData, |
| | IWorkflowExecuteAdditionalData, |
| | IHttpRequestMethods, |
| | IRunData, |
| | IWorkflowBase, |
| | } from 'n8n-workflow'; |
| |
|
| | import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; |
| | import { NotFoundError } from '@/errors/response-errors/not-found.error'; |
| | import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; |
| | import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error'; |
| | import { NodeTypes } from '@/node-types'; |
| | import { Push } from '@/push'; |
| | import { Publisher } from '@/scaling/pubsub/publisher.service'; |
| | import { removeTrailingSlash } from '@/utils'; |
| | import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service'; |
| | import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service'; |
| | import * as WebhookHelpers from '@/webhooks/webhook-helpers'; |
| | import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; |
| | import type { WorkflowRequest } from '@/workflows/workflow.request'; |
| |
|
| | import { WebhookService } from './webhook.service'; |
| | import type { |
| | IWebhookResponseCallbackData, |
| | IWebhookManager, |
| | WebhookAccessControlOptions, |
| | WebhookRequest, |
| | } from './webhook.types'; |
| |
|
| | |
| | |
| | |
| | |
| | @Service() |
| | export class TestWebhooks implements IWebhookManager { |
| | constructor( |
| | private readonly push: Push, |
| | private readonly nodeTypes: NodeTypes, |
| | private readonly registrations: TestWebhookRegistrationsService, |
| | private readonly instanceSettings: InstanceSettings, |
| | private readonly publisher: Publisher, |
| | private readonly webhookService: WebhookService, |
| | ) {} |
| |
|
| | private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; |
| |
|
| | |
| | |
| | |
| | |
| | async executeWebhook( |
| | request: WebhookRequest, |
| | response: express.Response, |
| | ): Promise<IWebhookResponseCallbackData> { |
| | const httpMethod = request.method; |
| |
|
| | let path = removeTrailingSlash(request.params.path); |
| |
|
| | request.params = {} as WebhookRequest['params']; |
| |
|
| | let webhook = await this.getActiveWebhook(httpMethod, path); |
| |
|
| | if (!webhook) { |
| | |
| | |
| |
|
| | const [webhookId, ...segments] = path.split('/'); |
| |
|
| | webhook = await this.getActiveWebhook(httpMethod, segments.join('/'), webhookId); |
| |
|
| | if (!webhook) |
| | throw new WebhookNotFoundError({ |
| | path, |
| | httpMethod, |
| | webhookMethods: await this.getWebhookMethods(path), |
| | }); |
| |
|
| | path = webhook.path; |
| |
|
| | path.split('/').forEach((segment, index) => { |
| | if (segment.startsWith(':')) { |
| | request.params[segment.slice(1)] = segments[index]; |
| | } |
| | }); |
| | } |
| |
|
| | const key = this.registrations.toKey(webhook); |
| |
|
| | const registration = await this.registrations.get(key); |
| |
|
| | if (!registration) { |
| | throw new WebhookNotFoundError({ |
| | path, |
| | httpMethod, |
| | webhookMethods: await this.getWebhookMethods(path), |
| | }); |
| | } |
| |
|
| | const { destinationNode, pushRef, workflowEntity, webhook: testWebhook } = registration; |
| |
|
| | const workflow = this.toWorkflow(workflowEntity); |
| |
|
| | if (testWebhook.staticData) workflow.setTestStaticData(testWebhook.staticData); |
| |
|
| | const workflowStartNode = workflow.getNode(webhook.node); |
| |
|
| | if (workflowStartNode === null) { |
| | throw new NotFoundError('Could not find node to process webhook.'); |
| | } |
| |
|
| | return await new Promise(async (resolve, reject) => { |
| | try { |
| | const executionMode = 'manual'; |
| | const executionId = await WebhookHelpers.executeWebhook( |
| | workflow, |
| | webhook, |
| | workflowEntity, |
| | workflowStartNode, |
| | executionMode, |
| | pushRef, |
| | undefined, |
| | undefined, |
| | request, |
| | response, |
| | (error: Error | null, data: IWebhookResponseCallbackData) => { |
| | if (error !== null) reject(error); |
| | else resolve(data); |
| | }, |
| | destinationNode, |
| | ); |
| |
|
| | |
| | |
| | |
| | if (executionId === undefined) return; |
| |
|
| | |
| | if (pushRef !== undefined) { |
| | this.push.send( |
| | { type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } }, |
| | pushRef, |
| | ); |
| | } |
| | } catch {} |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) { |
| | void this.publisher.publishCommand({ |
| | command: 'clear-test-webhooks', |
| | payload: { webhookKey: key, workflowEntity, pushRef }, |
| | }); |
| | return; |
| | } |
| |
|
| | this.clearTimeout(key); |
| |
|
| | await this.deactivateWebhooks(workflow); |
| | }); |
| | } |
| |
|
| | @OnPubSubEvent('clear-test-webhooks', { instanceType: 'main' }) |
| | async handleClearTestWebhooks({ |
| | webhookKey, |
| | workflowEntity, |
| | pushRef, |
| | }: { |
| | webhookKey: string; |
| | workflowEntity: IWorkflowBase; |
| | pushRef: string; |
| | }) { |
| | if (!this.push.hasPushRef(pushRef)) return; |
| |
|
| | this.clearTimeout(webhookKey); |
| |
|
| | const workflow = this.toWorkflow(workflowEntity); |
| |
|
| | await this.deactivateWebhooks(workflow); |
| | } |
| |
|
| | clearTimeout(key: string) { |
| | const timeout = this.timeouts[key]; |
| |
|
| | if (timeout) clearTimeout(timeout); |
| | } |
| |
|
| | async getWebhookMethods(rawPath: string) { |
| | const path = removeTrailingSlash(rawPath); |
| | const allKeys = await this.registrations.getAllKeys(); |
| |
|
| | const webhookMethods = allKeys |
| | .filter((key) => key.includes(path)) |
| | .map((key) => key.split('|')[0] as IHttpRequestMethods); |
| |
|
| | if (!webhookMethods.length) throw new WebhookNotFoundError({ path }); |
| |
|
| | return webhookMethods; |
| | } |
| |
|
| | async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) { |
| | const allKeys = await this.registrations.getAllKeys(); |
| |
|
| | const webhookKey = allKeys.find((key) => key.includes(path) && key.startsWith(httpMethod)); |
| |
|
| | if (!webhookKey) return; |
| |
|
| | const registration = await this.registrations.get(webhookKey); |
| |
|
| | if (!registration) return; |
| |
|
| | const { workflowEntity } = registration; |
| |
|
| | const workflow = this.toWorkflow(workflowEntity); |
| |
|
| | const webhookNode = Object.values(workflow.nodes).find( |
| | ({ type, parameters, typeVersion }) => |
| | parameters?.path === path && |
| | (parameters?.httpMethod ?? 'GET') === httpMethod && |
| | 'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion), |
| | ); |
| |
|
| | return webhookNode?.parameters?.options as WebhookAccessControlOptions; |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | async needsWebhook(options: { |
| | userId: string; |
| | workflowEntity: IWorkflowBase; |
| | additionalData: IWorkflowExecuteAdditionalData; |
| | runData?: IRunData; |
| | pushRef?: string; |
| | destinationNode?: string; |
| | triggerToStartFrom?: WorkflowRequest.ManualRunPayload['triggerToStartFrom']; |
| | }) { |
| | const { |
| | userId, |
| | workflowEntity, |
| | additionalData, |
| | runData, |
| | pushRef, |
| | destinationNode, |
| | triggerToStartFrom, |
| | } = options; |
| |
|
| | if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity); |
| |
|
| | const workflow = this.toWorkflow(workflowEntity); |
| |
|
| | let webhooks = WebhookHelpers.getWorkflowWebhooks( |
| | workflow, |
| | additionalData, |
| | destinationNode, |
| | true, |
| | ); |
| |
|
| | |
| | |
| | if (triggerToStartFrom?.data) { |
| | return false; |
| | } |
| |
|
| | |
| | |
| | if (triggerToStartFrom) { |
| | webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name); |
| | } |
| |
|
| | if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) { |
| | return false; |
| | } |
| |
|
| | const timeout = setTimeout( |
| | async () => await this.cancelWebhook(workflow.id), |
| | TEST_WEBHOOK_TIMEOUT, |
| | ); |
| |
|
| | for (const webhook of webhooks) { |
| | const key = this.registrations.toKey(webhook); |
| | const registrationByKey = await this.registrations.get(key); |
| |
|
| | if (runData && webhook.node in runData) { |
| | return false; |
| | } |
| |
|
| | |
| | if ( |
| | registrationByKey && |
| | !webhook.webhookId && |
| | !registrationByKey.webhook.isTest && |
| | registrationByKey.webhook.userId !== userId && |
| | registrationByKey.webhook.workflowId !== workflow.id |
| | ) { |
| | throw new WebhookPathTakenError(webhook.node); |
| | } |
| |
|
| | webhook.path = removeTrailingSlash(webhook.path); |
| | webhook.isTest = true; |
| |
|
| | |
| | |
| | |
| | |
| | const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook; |
| |
|
| | cacheableWebhook.userId = userId; |
| |
|
| | const registration: TestWebhookRegistration = { |
| | pushRef, |
| | workflowEntity, |
| | destinationNode, |
| | webhook: cacheableWebhook as IWebhookData, |
| | }; |
| |
|
| | try { |
| | |
| | |
| | |
| | |
| | await this.registrations.register(registration); |
| |
|
| | await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual'); |
| |
|
| | cacheableWebhook.staticData = workflow.staticData; |
| |
|
| | await this.registrations.register(registration); |
| |
|
| | this.timeouts[key] = timeout; |
| | } catch (error) { |
| | await this.deactivateWebhooks(workflow); |
| |
|
| | delete this.timeouts[key]; |
| |
|
| | throw error; |
| | } |
| | } |
| |
|
| | return true; |
| | } |
| |
|
| | async cancelWebhook(workflowId: string) { |
| | let foundWebhook = false; |
| |
|
| | const allWebhookKeys = await this.registrations.getAllKeys(); |
| |
|
| | for (const key of allWebhookKeys) { |
| | const registration = await this.registrations.get(key); |
| |
|
| | if (!registration) continue; |
| |
|
| | const { pushRef, workflowEntity } = registration; |
| |
|
| | const workflow = this.toWorkflow(workflowEntity); |
| |
|
| | if (workflowEntity.id !== workflowId) continue; |
| |
|
| | this.clearTimeout(key); |
| |
|
| | if (pushRef !== undefined) { |
| | try { |
| | this.push.send({ type: 'testWebhookDeleted', data: { workflowId } }, pushRef); |
| | } catch { |
| | |
| | } |
| | } |
| |
|
| | if (!foundWebhook) { |
| | |
| | void this.deactivateWebhooks(workflow); |
| | } |
| |
|
| | foundWebhook = true; |
| | } |
| |
|
| | return foundWebhook; |
| | } |
| |
|
| | async getActiveWebhook(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) { |
| | const key = this.registrations.toKey({ httpMethod, path, webhookId }); |
| |
|
| | let webhook: IWebhookData | undefined; |
| | let maxMatches = 0; |
| | const pathElementsSet = new Set(path.split('/')); |
| | |
| | |
| | const registration = await this.registrations.get(key); |
| |
|
| | if (!registration) return; |
| |
|
| | const { webhook: dynamicWebhook } = registration; |
| |
|
| | const staticElements = dynamicWebhook.path.split('/').filter((ele) => !ele.startsWith(':')); |
| | const allStaticExist = staticElements.every((staticEle) => pathElementsSet.has(staticEle)); |
| |
|
| | if (allStaticExist && staticElements.length > maxMatches) { |
| | maxMatches = staticElements.length; |
| | webhook = dynamicWebhook; |
| | } |
| | |
| | else if (staticElements.length === 0 && !webhook) { |
| | webhook = dynamicWebhook; |
| | } |
| |
|
| | return webhook; |
| | } |
| |
|
| | |
| | |
| | |
| | async deactivateWebhooks(workflow: Workflow) { |
| | const allRegistrations = await this.registrations.getAllRegistrations(); |
| |
|
| | if (!allRegistrations.length) return; |
| |
|
| | type WebhooksByWorkflow = { [workflowId: string]: IWebhookData[] }; |
| |
|
| | const webhooksByWorkflow = allRegistrations.reduce<WebhooksByWorkflow>((acc, cur) => { |
| | const { workflowId } = cur.webhook; |
| |
|
| | acc[workflowId] ||= []; |
| | acc[workflowId].push(cur.webhook); |
| |
|
| | return acc; |
| | }, {}); |
| |
|
| | const webhooks = webhooksByWorkflow[workflow.id]; |
| |
|
| | if (!webhooks) return; |
| |
|
| | for (const webhook of webhooks) { |
| | const { userId, staticData } = webhook; |
| |
|
| | if (userId) { |
| | webhook.workflowExecuteAdditionalData = await WorkflowExecuteAdditionalData.getBase(userId); |
| | } |
| |
|
| | if (staticData) workflow.staticData = staticData; |
| |
|
| | await this.webhookService.deleteWebhook(workflow, webhook, 'internal', 'update'); |
| | } |
| |
|
| | await this.registrations.deregisterAll(); |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | toWorkflow(workflowEntity: IWorkflowBase) { |
| | return new Workflow({ |
| | id: workflowEntity.id, |
| | name: workflowEntity.name, |
| | nodes: workflowEntity.nodes, |
| | connections: workflowEntity.connections, |
| | active: false, |
| | nodeTypes: this.nodeTypes, |
| | staticData: {}, |
| | settings: workflowEntity.settings, |
| | }); |
| | } |
| | } |
| |
|