import { createHeartbeatMessage, type PushMessage } from '@n8n/api-types'; import { Logger } from '@n8n/backend-common'; import type { User } from '@n8n/db'; import { Container } from '@n8n/di'; import { EventEmitter } from 'events'; import type WebSocket from 'ws'; import { WebSocketPush } from '@/push/websocket.push'; import { mockInstance } from '@test/mocking'; jest.useFakeTimers(); class MockWebSocket extends EventEmitter { isAlive = true; ping = jest.fn(); send = jest.fn(); terminate = jest.fn(); close = jest.fn(); } const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked; describe('WebSocketPush', () => { const pushRef1 = 'test-session1'; const pushRef2 = 'test-session2'; const userId: User['id'] = 'test-user'; const pushMessage: PushMessage = { type: 'executionRecovered', data: { executionId: 'test-execution-id', }, }; const expectedMsg = JSON.stringify({ type: 'executionRecovered', data: { executionId: 'test-execution-id', }, }); mockInstance(Logger); const webSocketPush = Container.get(WebSocketPush); const mockWebSocket1 = createMockWebSocket(); const mockWebSocket2 = createMockWebSocket(); beforeEach(() => { jest.resetAllMocks(); mockWebSocket1.removeAllListeners(); mockWebSocket2.removeAllListeners(); }); it('can add a connection', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); expect(mockWebSocket1.listenerCount('close')).toBe(1); expect(mockWebSocket1.listenerCount('pong')).toBe(1); expect(mockWebSocket1.listenerCount('message')).toBe(1); }); it('closes a connection', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); mockWebSocket1.emit('close'); expect(mockWebSocket1.listenerCount('message')).toBe(0); expect(mockWebSocket1.listenerCount('close')).toBe(0); expect(mockWebSocket1.listenerCount('pong')).toBe(0); }); it('sends data to one connection', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); webSocketPush.sendToOne(pushMessage, pushRef1); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).not.toHaveBeenCalled(); }); it('sends data to all connections', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); webSocketPush.sendToAll(pushMessage); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); }); it('pings all connections', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); jest.runOnlyPendingTimers(); expect(mockWebSocket1.ping).toHaveBeenCalled(); expect(mockWebSocket2.ping).toHaveBeenCalled(); }); it('sends data to all users connections', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); webSocketPush.sendToUsers(pushMessage, [userId]); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); }); it('emits message event when connection receives data', async () => { jest.useRealTimers(); const mockOnMessageReceived = jest.fn(); webSocketPush.on('message', mockOnMessageReceived); webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); const data = { test: 'data' }; const buffer = Buffer.from(JSON.stringify(data)); mockWebSocket1.emit('message', buffer); // Flush the event loop await new Promise(process.nextTick); expect(mockOnMessageReceived).toHaveBeenCalledWith({ msg: data, pushRef: pushRef1, userId, }); }); it("emits doesn' emit message for client heartbeat", async () => { const mockOnMessageReceived = jest.fn(); webSocketPush.on('message', mockOnMessageReceived); webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); const data = createHeartbeatMessage(); const buffer = Buffer.from(JSON.stringify(data)); mockWebSocket1.emit('message', buffer); // Flush the event loop await new Promise(process.nextTick); expect(mockOnMessageReceived).not.toHaveBeenCalled(); }); });