'use strict' const http = require('node:http') const split = require('split2') const test = require('tap').test const Fastify = require('fastify') const fastifyWebsocket = require('..') const WebSocket = require('ws') const { once, on } = require('node:events') let timersPromises try { timersPromises = require('node:timers/promises') } catch {} test('Should expose a websocket', async (t) => { t.plan(2) const fastify = Fastify() t.teardown(() => fastify.close()) await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, (socket) => { t.teardown(() => socket.terminate()) socket.once('message', (chunk) => { t.equal(chunk.toString(), 'hello server') socket.send('hello client') }) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) t.teardown(() => { if (ws.readyState) { ws.close() } }) const chunkPromise = once(ws, 'message') await once(ws, 'open') ws.send('hello server') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello client') ws.close() }) test('Should fail if custom errorHandler is not a function', async (t) => { t.plan(2) const fastify = Fastify() t.teardown(() => fastify.close()) try { await fastify.register(fastifyWebsocket, { errorHandler: {} }) } catch (err) { t.equal(err.message, 'invalid errorHandler function') } fastify.get('/', { websocket: true }, (socket) => { t.teardown(() => socket.terminate()) }) try { await fastify.listen({ port: 0 }) } catch (err) { t.equal(err.message, 'invalid errorHandler function') } }) test('Should run custom errorHandler on wildcard route handler error', async (t) => { t.plan(1) const fastify = Fastify() t.teardown(() => fastify.close()) let _resolve const p = new Promise((resolve) => { _resolve = resolve }) await fastify.register(fastifyWebsocket, { errorHandler: function (error) { t.equal(error.message, 'Fail') _resolve() } }) fastify.get('/*', { websocket: true }, (socket) => { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) return Promise.reject(new Error('Fail')) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) t.teardown(() => { if (ws.readyState) { ws.close() } }) await p }) test('Should run custom errorHandler on error inside websocket handler', async (t) => { t.plan(1) const fastify = Fastify() t.teardown(() => fastify.close()) let _resolve const p = new Promise((resolve) => { _resolve = resolve }) const options = { errorHandler: function (error) { t.equal(error.message, 'Fail') _resolve() } } await fastify.register(fastifyWebsocket, options) fastify.get('/', { websocket: true }, function wsHandler (socket) { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) throw new Error('Fail') }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) t.teardown(() => { if (ws.readyState) { ws.close() } }) await p }) test('Should run custom errorHandler on error inside async websocket handler', async (t) => { t.plan(1) const fastify = Fastify() t.teardown(() => fastify.close()) let _resolve const p = new Promise((resolve) => { _resolve = resolve }) const options = { errorHandler: function (error) { t.equal(error.message, 'Fail') _resolve() } } await fastify.register(fastifyWebsocket, options) fastify.get('/', { websocket: true }, async function wsHandler (socket) { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) throw new Error('Fail') }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) t.teardown(() => { if (ws.readyState) { ws.close() } }) await p }) test('Should be able to pass custom options to ws', async (t) => { t.plan(2) const fastify = Fastify() t.teardown(() => fastify.close()) const options = { verifyClient: function (info) { t.equal(info.req.headers['x-custom-header'], 'fastify is awesome !') return true } } await fastify.register(fastifyWebsocket, { options }) fastify.get('/*', { websocket: true }, (socket) => { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) }) await fastify.listen({ port: 0 }) const clientOptions = { headers: { 'x-custom-header': 'fastify is awesome !' } } const ws = new WebSocket('ws://localhost:' + fastify.server.address().port, clientOptions) const chunkPromise = once(ws, 'message') await once(ws, 'open') t.teardown(() => { if (ws.readyState) { ws.close() } }) ws.send('hello') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello') ws.close() }) test('Should warn if path option is provided to ws', async (t) => { t.plan(3) const logStream = split(JSON.parse) const fastify = Fastify({ logger: { stream: logStream, level: 'warn' } }) logStream.once('data', line => { t.equal(line.msg, 'ws server path option shouldn\'t be provided, use a route instead') t.equal(line.level, 40) }) t.teardown(() => fastify.close()) const options = { path: '/' } await fastify.register(fastifyWebsocket, { options }) fastify.get('/*', { websocket: true }, (socket) => { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) }) await fastify.listen({ port: 0 }) const clientOptions = { headers: { 'x-custom-header': 'fastify is awesome !' } } const ws = new WebSocket('ws://localhost:' + fastify.server.address().port, clientOptions) const chunkPromise = once(ws, 'message') await once(ws, 'open') t.teardown(() => { if (ws.readyState) { ws.close() } }) ws.send('hello') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello') ws.close() }) test('Should be able to pass a custom server option to ws', async (t) => { // We create an external server const externalServerPort = 3000 const externalServer = http .createServer() .on('connection', (socket) => { socket.unref() }) .listen(externalServerPort, 'localhost') const fastify = Fastify() t.teardown(() => { externalServer.close() fastify.close() }) const options = { server: externalServer } await fastify.register(fastifyWebsocket, { options }) fastify.get('/', { websocket: true }, (socket) => { socket.on('message', (data) => socket.send(data)) t.teardown(() => socket.terminate()) }) await fastify.ready() const ws = new WebSocket('ws://localhost:' + externalServerPort) const chunkPromise = once(ws, 'message') await once(ws, 'open') t.teardown(() => { if (ws.readyState) { ws.close() } }) ws.send('hello') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello') ws.close() }) test('Should be able to pass clientTracking option in false to ws', (t) => { t.plan(2) const fastify = Fastify() const options = { clientTracking: false } fastify.register(fastifyWebsocket, { options }) fastify.get('/*', { websocket: true }, (socket) => { socket.close() }) fastify.listen({ port: 0 }, (err) => { t.error(err) fastify.close(err => { t.error(err) }) }) }) test('Should be able to pass preClose option to override default', async (t) => { t.plan(3) const fastify = Fastify() const preClose = (done) => { t.pass('Custom preclose successfully called') for (const connection of fastify.websocketServer.clients) { connection.close() } done() } await fastify.register(fastifyWebsocket, { preClose }) fastify.get('/', { websocket: true }, (socket) => { t.teardown(() => socket.terminate()) socket.once('message', (chunk) => { t.equal(chunk.toString(), 'hello server') socket.send('hello client') }) }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) t.teardown(() => { if (ws.readyState) { ws.close() } }) const chunkPromise = once(ws, 'message') await once(ws, 'open') ws.send('hello server') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello client') ws.close() await fastify.close() }) test('Should fail if custom preClose is not a function', async (t) => { t.plan(2) const fastify = Fastify() t.teardown(() => fastify.close()) const preClose = 'Not a function' try { await fastify.register(fastifyWebsocket, { preClose }) } catch (err) { t.equal(err.message, 'invalid preClose function') } fastify.get('/', { websocket: true }, (socket) => { t.teardown(() => socket.terminate()) }) try { await fastify.listen({ port: 0 }) } catch (err) { t.equal(err.message, 'invalid preClose function') } }) test('Should gracefully close with a connected client', async (t) => { t.plan(2) const fastify = Fastify() await fastify.register(fastifyWebsocket) let serverConnEnded fastify.get('/', { websocket: true }, (socket) => { socket.send('hello client') socket.once('message', (chunk) => { t.equal(chunk.toString(), 'hello server') }) serverConnEnded = once(socket, 'close') // this connection stays alive untile we close the server }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) const chunkPromise = once(ws, 'message') await once(ws, 'open') ws.send('hello server') const ended = once(ws, 'close') const [chunk] = await chunkPromise t.equal(chunk.toString(), 'hello client') await fastify.close() await ended await serverConnEnded }) test('Should gracefully close when clients attempt to connect after calling close', async (t) => { t.plan(3) const fastify = Fastify() const oldClose = fastify.server.close let p fastify.server.close = function (cb) { const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) p = once(ws, 'close').catch((err) => { t.equal(err.message, 'Unexpected server response: 503') oldClose.call(this, cb) }) } await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, (socket) => { t.pass('received client connection') socket.close() // this connection stays alive until we close the server }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) ws.on('close', () => { t.pass('client 1 closed') }) await once(ws, 'open') await fastify.close() await p }) /* This test sends one message every 10 ms. After 50 messages have been sent, we check how many unhandled messages the server has. After 100 messages we check this number has not increased but rather decreased the number of unhandled messages below a threshold, which means it is still able to process message. */ test('Should keep accepting connection', { skip: !timersPromises }, async t => { t.plan(1) const fastify = Fastify() let sent = 0 let unhandled = 0 let threshold = 0 await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, (socket) => { socket.on('message', () => { unhandled-- }) socket.on('error', err => { t.error(err) }) /* This is a safety check - If the socket is stuck, fastify.close will not run. Therefore after 100 messages we forcibly close the socket. */ const safetyInterval = setInterval(() => { if (sent < 100) { return } clearInterval(safetyInterval) socket.terminate() }, 100) }) await fastify.listen({ port: 0 }) // Setup a client that sends a lot of messages to the server const client = new WebSocket('ws://localhost:' + fastify.server.address().port) client.on('error', console.error) await once(client, 'open') const message = Buffer.alloc(1024, Date.now()) /* eslint-disable no-unused-vars */ for await (const _ of timersPromises.setInterval(10)) { client.send(message.toString(), 10) sent++ unhandled++ if (sent === 50) { threshold = unhandled } else if (sent === 100) { await fastify.close() t.ok(unhandled <= threshold) break } } }) test('Should keep processing message when many medium sized messages are sent', async t => { t.plan(1) const fastify = Fastify() const total = 200 let handled = 0 await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, (socket) => { socket.on('message', () => { socket.send('handled') }) socket.on('error', err => { t.error(err) }) }) await fastify.listen({ port: 0 }) // Setup a client that sends a lot of messages to the server const client = new WebSocket('ws://localhost:' + fastify.server.address().port) client.on('error', console.error) await once(client, 'open') for (let i = 0; i < total; i++) { client.send(Buffer.alloc(160, `${i}`).toString('utf-8')) } /* eslint-disable no-unused-vars */ for await (const _ of on(client, 'message')) { handled++ if (handled === total) { break } } await fastify.close() t.equal(handled, total) }) test('Should error server if the noServer option is set', (t) => { t.plan(1) const fastify = Fastify() fastify.register(fastifyWebsocket, { options: { noServer: true } }) t.rejects(fastify.ready()) }) test('Should preserve the prefix in non-websocket routes', (t) => { t.plan(3) const fastify = Fastify() t.teardown(() => fastify.close()) fastify.register(fastifyWebsocket) fastify.register(async function (fastify) { t.equal(fastify.prefix, '/hello') fastify.get('/', function (_, reply) { t.equal(this.prefix, '/hello') reply.send('hello') }) }, { prefix: '/hello' }) fastify.inject('/hello', function (err) { t.error(err) }) }) test('Should Handle WebSocket errors to avoid Node.js crashes', async t => { t.plan(1) const fastify = Fastify() await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, (socket) => { socket.on('error', err => { t.equal(err.code, 'WS_ERR_UNEXPECTED_RSV_2_3') }) }) await fastify.listen({ port: 0 }) const client = new WebSocket('ws://localhost:' + fastify.server.address().port) await once(client, 'open') client._socket.write(Buffer.from([0xa2, 0x00])) await fastify.close() }) test('remove all others websocket handlers on close', async (t) => { const fastify = Fastify() await fastify.register(fastifyWebsocket) await fastify.listen({ port: 0 }) await fastify.close() t.equal(fastify.server.listeners('upgrade').length, 0) }) test('clashing upgrade handler', async (t) => { const fastify = Fastify() t.teardown(() => fastify.close()) fastify.server.on('upgrade', (req, socket) => { const res = new http.ServerResponse(req) res.assignSocket(socket) res.end() socket.destroy() }) await fastify.register(fastifyWebsocket) fastify.get('/', { websocket: true }, () => { t.fail('this should never be invoked') }) await fastify.listen({ port: 0 }) const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) await once(ws, 'error') })