Spaces:
Runtime error
Runtime error
| const { ServerResponse } = require('node:http') | |
| const { PassThrough } = require('node:stream') | |
| const { randomBytes } = require('node:crypto') | |
| const fp = require('fastify-plugin') | |
| const WebSocket = require('ws') | |
| const Duplexify = require('duplexify') | |
| const kWs = Symbol('ws-socket') | |
| const kWsHead = Symbol('ws-head') | |
| const statusCodeReg = /HTTP\/1.1 (\d+)/u | |
| function fastifyWebsocket (fastify, opts, next) { | |
| fastify.decorateRequest('ws', null) | |
| let errorHandler = defaultErrorHandler | |
| if (opts.errorHandler) { | |
| if (typeof opts.errorHandler !== 'function') { | |
| return next(new Error('invalid errorHandler function')) | |
| } | |
| errorHandler = opts.errorHandler | |
| } | |
| let preClose = defaultPreClose | |
| if (opts?.preClose) { | |
| if (typeof opts.preClose !== 'function') { | |
| return next(new Error('invalid preClose function')) | |
| } | |
| preClose = opts.preClose | |
| } | |
| if (opts.options?.noServer) { | |
| return next(new Error("fastify-websocket doesn't support the ws noServer option. If you want to create a websocket server detatched from fastify, use the ws library directly.")) | |
| } | |
| const wssOptions = Object.assign({ noServer: true }, opts.options) | |
| if (wssOptions.path) { | |
| fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead') | |
| } | |
| // We always handle upgrading ourselves in this library so that we can dispatch through the fastify stack before actually upgrading | |
| // For this reason, we run the WebSocket.Server in noServer mode, and prevent the user from passing in a http.Server instance for it to attach to. | |
| // Usually, we listen to the upgrade event of the `fastify.server`, but we do still support this server option by just listening to upgrades on it if passed. | |
| const websocketListenServer = wssOptions.server || fastify.server | |
| delete wssOptions.server | |
| const wss = new WebSocket.Server(wssOptions) | |
| fastify.decorate('websocketServer', wss) | |
| async function injectWS (path = '/', upgradeContext = {}) { | |
| const server2Client = new PassThrough() | |
| const client2Server = new PassThrough() | |
| const serverStream = new Duplexify(server2Client, client2Server) | |
| const clientStream = new Duplexify(client2Server, server2Client) | |
| const ws = new WebSocket(null, undefined, { isServer: false }) | |
| const head = Buffer.from([]) | |
| let resolve, reject | |
| const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject }) | |
| ws.on('open', () => { | |
| clientStream.removeListener('data', onData) | |
| resolve(ws) | |
| }) | |
| const onData = (chunk) => { | |
| if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) { | |
| ws._isServer = false | |
| ws.setSocket(clientStream, head, { maxPayload: 0 }) | |
| } else { | |
| clientStream.removeListener('data', onData) | |
| const statusCode = Number(statusCodeReg.exec(chunk.toString())[1]) | |
| reject(new Error('Unexpected server response: ' + statusCode)) | |
| } | |
| } | |
| clientStream.on('data', onData) | |
| const req = { | |
| ...upgradeContext, | |
| method: 'GET', | |
| headers: { | |
| ...upgradeContext.headers, | |
| connection: 'upgrade', | |
| upgrade: 'websocket', | |
| 'sec-websocket-version': 13, | |
| 'sec-websocket-key': randomBytes(16).toString('base64') | |
| }, | |
| httpVersion: '1.1', | |
| url: path, | |
| [kWs]: serverStream, | |
| [kWsHead]: head | |
| } | |
| websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead]) | |
| return promise | |
| } | |
| fastify.decorate('injectWS', injectWS) | |
| function onUpgrade (rawRequest, socket, head) { | |
| // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. | |
| rawRequest[kWs] = socket | |
| rawRequest[kWsHead] = head | |
| const rawResponse = new ServerResponse(rawRequest) | |
| try { | |
| rawResponse.assignSocket(socket) | |
| fastify.routing(rawRequest, rawResponse) | |
| } catch (err) { | |
| fastify.log.warn({ err }, 'websocket upgrade failed') | |
| } | |
| } | |
| websocketListenServer.on('upgrade', onUpgrade) | |
| const handleUpgrade = (rawRequest, callback) => { | |
| wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => { | |
| wss.emit('connection', socket, rawRequest) | |
| socket.on('error', (error) => { | |
| fastify.log.error(error) | |
| }) | |
| callback(socket) | |
| }) | |
| } | |
| fastify.addHook('onRequest', (request, _reply, done) => { // this adds req.ws to the Request object | |
| if (request.raw[kWs]) { | |
| request.ws = true | |
| } else { | |
| request.ws = false | |
| } | |
| done() | |
| }) | |
| fastify.addHook('onResponse', (request, _reply, done) => { | |
| if (request.ws) { | |
| request.raw[kWs].destroy() | |
| } | |
| done() | |
| }) | |
| fastify.addHook('onRoute', routeOptions => { | |
| let isWebsocketRoute = false | |
| let wsHandler = routeOptions.wsHandler | |
| let handler = routeOptions.handler | |
| if (routeOptions.websocket || routeOptions.wsHandler) { | |
| if (routeOptions.method === 'HEAD') { | |
| return | |
| } else if (routeOptions.method !== 'GET') { | |
| throw new Error('websocket handler can only be declared in GET method') | |
| } | |
| isWebsocketRoute = true | |
| if (routeOptions.websocket) { | |
| wsHandler = routeOptions.handler | |
| handler = function (_, reply) { | |
| reply.code(404).send() | |
| } | |
| } | |
| if (typeof wsHandler !== 'function') { | |
| throw new TypeError('invalid wsHandler function') | |
| } | |
| } | |
| // we always override the route handler so we can close websocket connections to routes to handlers that don't support websocket connections | |
| // This is not an arrow function to fetch the encapsulated this | |
| routeOptions.handler = function (request, reply) { | |
| // within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so | |
| if (request.raw[kWs]) { | |
| reply.hijack() | |
| handleUpgrade(request.raw, socket => { | |
| let result | |
| try { | |
| if (isWebsocketRoute) { | |
| result = wsHandler.call(this, socket, request) | |
| } else { | |
| result = noHandle.call(this, socket, request) | |
| } | |
| } catch (err) { | |
| return errorHandler.call(this, err, socket, request, reply) | |
| } | |
| if (result && typeof result.catch === 'function') { | |
| result.catch(err => errorHandler.call(this, err, socket, request, reply)) | |
| } | |
| }) | |
| } else { | |
| return handler.call(this, request, reply) | |
| } | |
| } | |
| }) | |
| // Fastify is missing a pre-close event, or the ability to | |
| // add a hook before the server.close call. We need to resort | |
| // to monkeypatching for now. | |
| fastify.addHook('preClose', preClose) | |
| function defaultPreClose (done) { | |
| const server = this.websocketServer | |
| if (server.clients) { | |
| for (const client of server.clients) { | |
| client.close() | |
| } | |
| } | |
| fastify.server.removeListener('upgrade', onUpgrade) | |
| server.close(done) | |
| done() | |
| } | |
| function noHandle (socket, rawRequest) { | |
| this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler') | |
| socket.close() | |
| } | |
| function defaultErrorHandler (error, socket, request) { | |
| request.log.error(error) | |
| socket.terminate() | |
| } | |
| next() | |
| } | |
| module.exports = fp(fastifyWebsocket, { | |
| fastify: '5.x', | |
| name: '@fastify/websocket' | |
| }) | |
| module.exports.default = fastifyWebsocket | |
| module.exports.fastifyWebsocket = fastifyWebsocket | |