n8cn / packages /cli /src /push /__tests__ /websocket.push.test.ts
gallyga's picture
Add n8n Chinese version
aec3094
import { createHeartbeatMessage, type PushMessage } from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import type { User } from '@n8n/db';
import { Container } from '@n8n/di';
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { WebSocketPush } from '@/push/websocket.push';
import { mockInstance } from '@test/mocking';
// Install Jest's fake timers for this whole test module. The 'pings all connections' test
// below advances them with `jest.runOnlyPendingTimers()`.
// NOTE(review): presumably this has to run before `WebSocketPush` is instantiated so that any
// timer it schedules lands on the fake clock — confirm against the WebSocketPush implementation.
jest.useFakeTimers();
/**
 * Minimal in-memory substitute for a `ws` socket.
 *
 * Event handling (`on`, `emit`, `listenerCount`, ...) is inherited from `EventEmitter`, so tests
 * can emit 'close' / 'message' events by hand. The socket methods are Jest mocks so tests can
 * assert on how they were called.
 */
class MockWebSocket extends EventEmitter {
	// Starts out `true`. NOTE(review): presumably read by the push service's liveness check —
	// confirm against the WebSocketPush implementation.
	isAlive: boolean;

	ping: ReturnType<typeof jest.fn>;

	send: ReturnType<typeof jest.fn>;

	terminate: ReturnType<typeof jest.fn>;

	close: ReturnType<typeof jest.fn>;

	constructor() {
		super();
		this.isAlive = true;
		// Every instance gets its own set of mocks, so assertions on one socket never observe
		// calls made on another.
		this.ping = jest.fn();
		this.send = jest.fn();
		this.terminate = jest.fn();
		this.close = jest.fn();
	}
}
/**
 * Builds a fresh `MockWebSocket` and hands it back typed as a mocked `ws` WebSocket.
 *
 * The mock only implements the members declared on `MockWebSocket`, not the full WebSocket
 * surface, hence the cast through `unknown`.
 */
const createMockWebSocket = (): jest.Mocked<WebSocket> => {
	const socket: unknown = new MockWebSocket();
	return socket as jest.Mocked<WebSocket>;
};
/**
 * Tests for `WebSocketPush`: connection bookkeeping, the different send targets, pinging, and
 * forwarding of messages received from a client.
 *
 * All tests share one `WebSocketPush` instance and two mock sockets; `beforeEach` resets the mocks
 * and detaches the listeners the previous test attached to the sockets.
 */
describe('WebSocketPush', () => {
	const pushRef1 = 'test-session1';
	const pushRef2 = 'test-session2';
	const userId: User['id'] = 'test-user';
	const pushMessage: PushMessage = {
		type: 'executionRecovered',
		data: {
			executionId: 'test-execution-id',
		},
	};
	// Serialized payload the sockets are expected to receive. Derived from `pushMessage` so the
	// fixture and the expectation cannot drift apart.
	const expectedMsg = JSON.stringify(pushMessage);

	mockInstance(Logger);
	const webSocketPush = Container.get(WebSocketPush);
	const mockWebSocket1 = createMockWebSocket();
	const mockWebSocket2 = createMockWebSocket();

	/** Waits one `process.nextTick` turn. Only resolves while real timers are active. */
	const flushEventLoop = async () => {
		await new Promise(process.nextTick);
	};

	beforeEach(() => {
		jest.resetAllMocks();
		// `add()` attaches 'close' / 'pong' / 'message' listeners to the socket; drop whatever the
		// previous test left behind so listener counts start from zero.
		mockWebSocket1.removeAllListeners();
		mockWebSocket2.removeAllListeners();
	});

	it('can add a connection', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);

		expect(mockWebSocket1.listenerCount('close')).toBe(1);
		expect(mockWebSocket1.listenerCount('pong')).toBe(1);
		expect(mockWebSocket1.listenerCount('message')).toBe(1);
	});

	it('closes a connection', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);

		mockWebSocket1.emit('close');

		expect(mockWebSocket1.listenerCount('message')).toBe(0);
		expect(mockWebSocket1.listenerCount('close')).toBe(0);
		expect(mockWebSocket1.listenerCount('pong')).toBe(0);
	});

	it('sends data to one connection', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		webSocketPush.sendToOne(pushMessage, pushRef1);

		expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
		expect(mockWebSocket2.send).not.toHaveBeenCalled();
	});

	it('sends data to all connections', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		webSocketPush.sendToAll(pushMessage);

		expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
		expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
	});

	it('pings all connections', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		// Relies on the module-level fake timers still being installed.
		jest.runOnlyPendingTimers();

		expect(mockWebSocket1.ping).toHaveBeenCalled();
		expect(mockWebSocket2.ping).toHaveBeenCalled();
	});

	it('sends data to all users connections', () => {
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		webSocketPush.sendToUsers(pushMessage, [userId]);

		expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
		expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
	});

	// NOTE(review): the two tests below switch to real timers and never switch back, so any test
	// that drives fake timers (e.g. 'pings all connections') has to stay above them.
	it('emits message event when connection receives data', async () => {
		// Fake timers also replace `process.nextTick`, which `flushEventLoop` depends on.
		jest.useRealTimers();
		const mockOnMessageReceived = jest.fn();
		webSocketPush.on('message', mockOnMessageReceived);
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		const data = { test: 'data' };
		const buffer = Buffer.from(JSON.stringify(data));
		mockWebSocket1.emit('message', buffer);

		await flushEventLoop();

		expect(mockOnMessageReceived).toHaveBeenCalledWith({
			msg: data,
			pushRef: pushRef1,
			userId,
		});
	});

	it("doesn't emit message for client heartbeat", async () => {
		// Switch to real timers here as well instead of relying on the previous test having done
		// so; otherwise this test hangs on the `nextTick` flush when run in isolation.
		jest.useRealTimers();
		const mockOnMessageReceived = jest.fn();
		webSocketPush.on('message', mockOnMessageReceived);
		webSocketPush.add(pushRef1, userId, mockWebSocket1);
		webSocketPush.add(pushRef2, userId, mockWebSocket2);

		const data = createHeartbeatMessage();
		const buffer = Buffer.from(JSON.stringify(data));
		mockWebSocket1.emit('message', buffer);

		await flushEventLoop();

		expect(mockOnMessageReceived).not.toHaveBeenCalled();
	});
});