| | import { heartbeatMessageSchema } from '@n8n/api-types'; |
| | import type { User } from '@n8n/db'; |
| | import { Service } from '@n8n/di'; |
| | import { UnexpectedError } from 'n8n-workflow'; |
| | import type WebSocket from 'ws'; |
| |
|
| | import { AbstractPush } from './abstract.push'; |
| |
|
| | function heartbeat(this: WebSocket) { |
| | this.isAlive = true; |
| | } |
| |
|
| | @Service() |
| | export class WebSocketPush extends AbstractPush<WebSocket> { |
| | add(pushRef: string, userId: User['id'], connection: WebSocket) { |
| | connection.isAlive = true; |
| | connection.on('pong', heartbeat); |
| |
|
| | super.add(pushRef, userId, connection); |
| |
|
| | const onMessage = async (data: WebSocket.RawData) => { |
| | try { |
| | const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); |
| | const msg: unknown = JSON.parse(buffer.toString('utf8')); |
| |
|
| | |
| | |
| | |
| | if (await this.isClientHeartbeat(msg)) { |
| | return; |
| | } |
| |
|
| | this.onMessageReceived(pushRef, msg); |
| | } catch (error) { |
| | this.errorReporter.error( |
| | new UnexpectedError('Error parsing push message', { |
| | extra: { |
| | userId, |
| | data, |
| | }, |
| | cause: error, |
| | }), |
| | ); |
| | this.logger.error("Couldn't parse message from editor-UI", { |
| | error: error as unknown, |
| | pushRef, |
| | data, |
| | }); |
| | } |
| | }; |
| |
|
| | |
| | connection.once('close', () => { |
| | connection.off('pong', heartbeat); |
| | connection.off('message', onMessage); |
| | this.remove(pushRef); |
| | }); |
| |
|
| | connection.on('message', onMessage); |
| | } |
| |
|
| | protected close(connection: WebSocket): void { |
| | connection.close(); |
| | } |
| |
|
| | protected sendToOneConnection(connection: WebSocket, data: string): void { |
| | connection.send(data); |
| | } |
| |
|
| | protected ping(connection: WebSocket): void { |
| | |
| | if (!connection.isAlive) { |
| | return connection.terminate(); |
| | } |
| | connection.isAlive = false; |
| | connection.ping(); |
| | } |
| |
|
| | private async isClientHeartbeat(msg: unknown) { |
| | const result = await heartbeatMessageSchema.safeParseAsync(msg); |
| |
|
| | return result.success; |
| | } |
| | } |
| |
|