n8cn/packages/cli/src/push/__tests__/sse.push.test.ts
gallyga's picture
Add n8n Chinese version
aec3094
import { type PushMessage } from '@n8n/api-types';
import { mock } from 'jest-mock-extended';
import EventEmitter from 'node:events';
import { SSEPush } from '@/push/sse.push';
import type { PushRequest, PushResponse } from '@/push/types';
// Install Jest's fake timers for this file so scheduled callbacks only run
// when a test explicitly triggers them (e.g. via `jest.runOnlyPendingTimers()`).
jest.useFakeTimers();
/**
 * Builds a mocked push connection for the tests.
 *
 * Both halves are jest-mock-extended mocks wrapped around a fresh
 * `EventEmitter`, and the request additionally gets a mocked `socket`.
 *
 * @returns the mocked request/response pair as `{ req, res }`.
 */
const createMockConnection = () => {
  const request = mock(new EventEmitter() as PushRequest);
  const response = mock(new EventEmitter() as PushResponse);

  // Give the request its own mocked socket so calls made on it can be asserted.
  request.socket = mock();

  return { req: request, res: response };
};
/**
 * Unit tests for `SSEPush`.
 *
 * Two mocked connections are created once for the whole suite and registered
 * for the same user on a fresh `SSEPush` instance before every test.
 */
describe('SSEPush', () => {
  const userId = 'test-user';
  const executionId = 'test-execution-id';

  const pushRef = 'push-ref';
  const connection = createMockConnection();
  const { req, res } = connection;

  const pushRef2 = 'push-ref-2';
  const connection2 = createMockConnection();

  const pushMessage: PushMessage = { type: 'executionRecovered', data: { executionId } };
  const expectedMsg = JSON.stringify(pushMessage);
  // Exact chunk the tests expect to be written to a response for `pushMessage`.
  const expectedFrame = `data: ${expectedMsg}\n\n`;

  let ssePush: SSEPush;

  beforeEach(() => {
    // The connection mocks are shared between tests, so reset them before
    // registering them on a new SSEPush instance.
    jest.resetAllMocks();
    ssePush = new SSEPush(mock(), mock());
    ssePush.add(pushRef, userId, connection);
    ssePush.add(pushRef2, userId, connection2);
  });

  describe('add', () => {
    it('adds a connection', () => {
      expect(req.socket.setTimeout).toHaveBeenCalledWith(0);
      expect(req.socket.setNoDelay).toHaveBeenCalledWith(true);
      expect(req.socket.setKeepAlive).toHaveBeenCalledWith(true);

      expect(res.setHeader).toHaveBeenCalledWith(
        'Content-Type',
        'text/event-stream; charset=UTF-8',
      );
      expect(res.setHeader).toHaveBeenCalledWith('Cache-Control', 'no-cache');
      expect(res.setHeader).toHaveBeenCalledWith('Connection', 'keep-alive');
      expect(res.writeHead).toHaveBeenCalledWith(200);
      expect(res.write).toHaveBeenCalledWith(':ok\n\n');
      expect(res.flush).toHaveBeenCalled();
    });
  });

  describe('closes a connection', () => {
    test.each([
      ['end', req],
      ['close', req],
      ['finish', res],
    ])('on "%s" event', (event, emitter) => {
      expect(ssePush.hasPushRef(pushRef)).toBe(true);

      emitter.emit(event);

      expect(ssePush.hasPushRef(pushRef)).toBe(false);
    });
  });

  describe('sends data', () => {
    // Drop the calls recorded while the connections were added, so the
    // assertions below only observe calls made by the send under test.
    beforeEach(() => jest.clearAllMocks());

    it('to one connection', () => {
      ssePush.sendToOne(pushMessage, pushRef);

      expect(connection.res.write).toHaveBeenCalledWith(expectedFrame);
      expect(connection.res.flush).toHaveBeenCalled();

      // The other connection must stay completely untouched.
      expect(connection2.res.write).not.toHaveBeenCalled();
      expect(connection2.res.flush).not.toHaveBeenCalled();
    });

    it('to all connections', () => {
      ssePush.sendToAll(pushMessage);

      expect(connection.res.write).toHaveBeenCalledWith(expectedFrame);
      expect(connection.res.flush).toHaveBeenCalled();
      expect(connection2.res.write).toHaveBeenCalledWith(expectedFrame);
      expect(connection2.res.flush).toHaveBeenCalled();
    });

    it('to a specific user', () => {
      // Both connections were registered for `userId`, so both should receive it.
      ssePush.sendToUsers(pushMessage, [userId]);

      expect(connection.res.write).toHaveBeenCalledWith(expectedFrame);
      expect(connection.res.flush).toHaveBeenCalled();
      expect(connection2.res.write).toHaveBeenCalledWith(expectedFrame);
      expect(connection2.res.flush).toHaveBeenCalled();
    });
  });

  it('pings all connections', () => {
    // Adding the connections already flushed each response once; clear those
    // recorded calls so the flush assertions can only be satisfied by the ping.
    jest.clearAllMocks();

    jest.runOnlyPendingTimers();

    expect(connection.res.write).toHaveBeenCalledWith(':ping\n\n');
    expect(connection.res.flush).toHaveBeenCalled();
    expect(connection2.res.write).toHaveBeenCalledWith(':ping\n\n');
    expect(connection2.res.flush).toHaveBeenCalled();
  });

  it('closes all connections', () => {
    ssePush.closeAllConnections();

    expect(connection.res.end).toHaveBeenCalled();
    expect(connection2.res.end).toHaveBeenCalled();
  });
});