| | import { createHeartbeatMessage, type PushMessage } from '@n8n/api-types'; |
| | import { Logger } from '@n8n/backend-common'; |
| | import type { User } from '@n8n/db'; |
| | import { Container } from '@n8n/di'; |
| | import { EventEmitter } from 'events'; |
| | import type WebSocket from 'ws'; |
| |
|
| | import { WebSocketPush } from '@/push/websocket.push'; |
| | import { mockInstance } from '@test/mocking'; |
| |
|
| | jest.useFakeTimers(); |
| |
|
| | class MockWebSocket extends EventEmitter { |
| | isAlive = true; |
| |
|
| | ping = jest.fn(); |
| |
|
| | send = jest.fn(); |
| |
|
| | terminate = jest.fn(); |
| |
|
| | close = jest.fn(); |
| | } |
| |
|
| | const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked<WebSocket>; |
| |
|
| | describe('WebSocketPush', () => { |
| | const pushRef1 = 'test-session1'; |
| | const pushRef2 = 'test-session2'; |
| | const userId: User['id'] = 'test-user'; |
| | const pushMessage: PushMessage = { |
| | type: 'executionRecovered', |
| | data: { |
| | executionId: 'test-execution-id', |
| | }, |
| | }; |
| | const expectedMsg = JSON.stringify({ |
| | type: 'executionRecovered', |
| | data: { |
| | executionId: 'test-execution-id', |
| | }, |
| | }); |
| |
|
| | mockInstance(Logger); |
| | const webSocketPush = Container.get(WebSocketPush); |
| | const mockWebSocket1 = createMockWebSocket(); |
| | const mockWebSocket2 = createMockWebSocket(); |
| |
|
| | beforeEach(() => { |
| | jest.resetAllMocks(); |
| | mockWebSocket1.removeAllListeners(); |
| | mockWebSocket2.removeAllListeners(); |
| | }); |
| |
|
| | it('can add a connection', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| |
|
| | expect(mockWebSocket1.listenerCount('close')).toBe(1); |
| | expect(mockWebSocket1.listenerCount('pong')).toBe(1); |
| | expect(mockWebSocket1.listenerCount('message')).toBe(1); |
| | }); |
| |
|
| | it('closes a connection', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| |
|
| | mockWebSocket1.emit('close'); |
| |
|
| | expect(mockWebSocket1.listenerCount('message')).toBe(0); |
| | expect(mockWebSocket1.listenerCount('close')).toBe(0); |
| | expect(mockWebSocket1.listenerCount('pong')).toBe(0); |
| | }); |
| |
|
| | it('sends data to one connection', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| | webSocketPush.sendToOne(pushMessage, pushRef1); |
| |
|
| | expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); |
| | expect(mockWebSocket2.send).not.toHaveBeenCalled(); |
| | }); |
| |
|
| | it('sends data to all connections', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| | webSocketPush.sendToAll(pushMessage); |
| |
|
| | expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); |
| | expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); |
| | }); |
| |
|
| | it('pings all connections', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| |
|
| | jest.runOnlyPendingTimers(); |
| |
|
| | expect(mockWebSocket1.ping).toHaveBeenCalled(); |
| | expect(mockWebSocket2.ping).toHaveBeenCalled(); |
| | }); |
| |
|
| | it('sends data to all users connections', () => { |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| | webSocketPush.sendToUsers(pushMessage, [userId]); |
| |
|
| | expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); |
| | expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); |
| | }); |
| |
|
| | it('emits message event when connection receives data', async () => { |
| | jest.useRealTimers(); |
| | const mockOnMessageReceived = jest.fn(); |
| | webSocketPush.on('message', mockOnMessageReceived); |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| |
|
| | const data = { test: 'data' }; |
| | const buffer = Buffer.from(JSON.stringify(data)); |
| |
|
| | mockWebSocket1.emit('message', buffer); |
| |
|
| | |
| | await new Promise(process.nextTick); |
| |
|
| | expect(mockOnMessageReceived).toHaveBeenCalledWith({ |
| | msg: data, |
| | pushRef: pushRef1, |
| | userId, |
| | }); |
| | }); |
| |
|
| | it("emits doesn' emit message for client heartbeat", async () => { |
| | const mockOnMessageReceived = jest.fn(); |
| | webSocketPush.on('message', mockOnMessageReceived); |
| | webSocketPush.add(pushRef1, userId, mockWebSocket1); |
| | webSocketPush.add(pushRef2, userId, mockWebSocket2); |
| |
|
| | const data = createHeartbeatMessage(); |
| | const buffer = Buffer.from(JSON.stringify(data)); |
| |
|
| | mockWebSocket1.emit('message', buffer); |
| |
|
| | |
| | await new Promise(process.nextTick); |
| |
|
| | expect(mockOnMessageReceived).not.toHaveBeenCalled(); |
| | }); |
| | }); |
| |
|