import {
  VSN,
  CHANNEL_EVENTS,
  TRANSPORTS,
  SOCKET_STATES,
  DEFAULT_TIMEOUT,
  WS_CLOSE_NORMAL,
  DEFAULT_HEADERS,
  CONNECTION_STATE,
} from './lib/constants';
import Timer from './lib/timer';
import Serializer from './lib/serializer';
import RealtimeChannel from './RealtimeChannel';

const noop = () => { };

// True when the runtime provides a global WebSocket constructor.
const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined';

export default class RealtimeClient {
  /**
   * Initializes the Socket.
   *
   * @param endPoint The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol)
   * @param options.transport The WebSocket transport constructor, for example WebSocket.
   * @param options.timeout The default timeout in milliseconds to trigger push timeouts.
   * @param options.params The optional params to pass when connecting.
   * @param options.headers The optional headers to pass when connecting.
   * @param options.heartbeatIntervalMs The interval in milliseconds at which to send a heartbeat message.
   * @param options.logger The optional function for specialized logging, ie: logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
   * @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload))
   * @param options.decode The function to decode incoming messages. Defaults to Serializer's decode.
   * @param options.reconnectAfterMs The optional function that returns the reconnect interval in milliseconds. Defaults to a stepped backoff.
   * @param options.fetch The optional custom fetch implementation used for HTTP requests.
   */
  constructor(endPoint, options) {
    // Normalise once so the individual option reads below stay simple.
    const opts = options || {};

    this.accessToken = null;
    this.channels = [];
    this.endPoint = '';
    this.headers = DEFAULT_HEADERS;
    this.params = {};
    this.timeout = DEFAULT_TIMEOUT;
    this.heartbeatIntervalMs = 30000;
    this.heartbeatTimer = undefined;
    this.pendingHeartbeatRef = null;
    this.ref = 0;
    this.logger = noop;
    this.conn = null;
    this.sendBuffer = [];
    this.serializer = new Serializer();
    this.stateChangeCallbacks = {
      open: [],
      close: [],
      error: [],
      message: [],
    };

    /**
     * Use either custom fetch, if provided, or default fetch to make HTTP requests
     *
     * @internal
     */
    this._resolveFetch = (customFetch) => {
      let _fetch;
      if (customFetch) {
        _fetch = customFetch;
      } else if (typeof fetch === 'undefined') {
        // No global fetch: load the polyfill lazily on first use.
        _fetch = (...args) =>
          import('@supabase/node-fetch').then(({ default: fetch }) => fetch(...args));
      } else {
        _fetch = fetch;
      }
      return (...args) => _fetch(...args);
    };

    this.endPoint = `${endPoint}/${TRANSPORTS.websocket}`;
    this.transport = opts.transport ? opts.transport : null;

    if (opts.params) {
      this.params = opts.params;
    }
    if (opts.headers) {
      // Copy so the shared DEFAULT_HEADERS object is never mutated.
      this.headers = Object.assign({}, this.headers, opts.headers);
    }
    if (opts.timeout) {
      this.timeout = opts.timeout;
    }
    if (opts.logger) {
      this.logger = opts.logger;
    }
    if (opts.heartbeatIntervalMs) {
      this.heartbeatIntervalMs = opts.heartbeatIntervalMs;
    }

    // The connection `apikey` param doubles as the initial access token.
    const accessToken = opts.params && opts.params.apikey;
    if (accessToken) {
      this.accessToken = accessToken;
    }

    this.reconnectAfterMs =
      opts.reconnectAfterMs ||
      ((tries) => {
        return [1000, 2000, 5000, 10000][tries - 1] || 10000;
      });
    this.encode =
      opts.encode ||
      ((payload, callback) => {
        return callback(JSON.stringify(payload));
      });
    this.decode = opts.decode || this.serializer.decode.bind(this.serializer);

    this.reconnectTimer = new Timer(async () => {
      this.disconnect();
      this.connect();
    }, this.reconnectAfterMs);

    this.fetch = this._resolveFetch(opts.fetch);
  }

  /**
   * Connects the socket, unless already connected.
   *
   * The transport is chosen in this order: the custom `options.transport`,
   * the global `WebSocket`, then the dynamically imported `ws` package.
   * Connection handlers are attached for every transport.
   */
  connect() {
    if (this.conn) {
      return;
    }

    if (this.transport) {
      this.conn = new this.transport(this._endPointURL(), undefined, {
        headers: this.headers,
      });
      // A custom transport needs the same open/message/close/error handlers as
      // the built-in ones; without them nothing is flushed, no heartbeat runs
      // and no incoming message is dispatched.
      this.setupConnection();
      return;
    }

    if (NATIVE_WEBSOCKET_AVAILABLE) {
      this.conn = new WebSocket(this._endPointURL());
      this.setupConnection();
      return;
    }

    // Stand-in connection held while `ws` loads: it reports "connecting", so
    // pushes are buffered, and a repeated connect() call is a no-op.
    const placeholder = new WSWebSocketDummy(this._endPointURL(), undefined, {
      close: () => {
        this.conn = null;
      },
    });
    this.conn = placeholder;

    import('ws')
      .then(({ default: WS }) => {
        // disconnect() (or a disconnect/connect cycle) may have run while the
        // module was loading; only the connect() call that still owns the
        // placeholder may install the real socket.
        if (this.conn !== placeholder) {
          return;
        }
        this.conn = new WS(this._endPointURL(), undefined, {
          headers: this.headers,
        });
        this.setupConnection();
      })
      .catch((error) => {
        this.log('transport', 'unable to create "ws" WebSocket connection', error);
        // Drop the placeholder so a later connect() can try again.
        if (this.conn === placeholder) {
          this.conn = null;
        }
      });
  }

  /**
   * Disconnects the socket.
   *
   * @param code A numeric status code to send on disconnect.
   * @param reason A custom reason for the disconnect.
   */
  disconnect(code, reason) {
    if (!this.conn) {
      return;
    }

    // Detach the close handler so a deliberate disconnect does not schedule a reconnect.
    this.conn.onclose = function () { }; // noop
    if (code) {
      this.conn.close(code, reason !== null && reason !== undefined ? reason : '');
    } else {
      this.conn.close();
    }
    this.conn = null;

    // remove open handles
    this.heartbeatTimer && clearInterval(this.heartbeatTimer);
    this.reconnectTimer.reset();
  }

  /**
   * Returns all created channels
   */
  getChannels() {
    return this.channels;
  }

  /**
   * Unsubscribes and removes a single channel.
   * Disconnects the socket when no channels remain afterwards.
   *
   * @param channel A RealtimeChannel instance
   */
  async removeChannel(channel) {
    const status = await channel.unsubscribe();
    if (this.channels.length === 0) {
      this.disconnect();
    }
    return status;
  }

  /**
   * Unsubscribes and removes all channels, then disconnects the socket.
   */
  async removeAllChannels() {
    const statuses = await Promise.all(this.channels.map((channel) => channel.unsubscribe()));
    this.disconnect();
    return statuses;
  }

  /**
   * Logs the message.
   *
   * For customized logging, `this.logger` can be overridden.
   */
  log(kind, msg, data) {
    this.logger(kind, msg, data);
  }

  /**
   * Returns the current state of the socket.
   */
  connectionState() {
    switch (this.conn && this.conn.readyState) {
      case SOCKET_STATES.connecting:
        return CONNECTION_STATE.Connecting;
      case SOCKET_STATES.open:
        return CONNECTION_STATE.Open;
      case SOCKET_STATES.closing:
        return CONNECTION_STATE.Closing;
      default:
        return CONNECTION_STATE.Closed;
    }
  }

  /**
   * Returns `true` if the connection is open.
   */
  isConnected() {
    return this.connectionState() === CONNECTION_STATE.Open;
  }

  /**
   * Creates a channel for the given topic and registers it with this client.
   *
   * @param topic The topic name; it is prefixed with `realtime:`.
   * @param params The channel parameters. Defaults to `{ config: {} }`.
   */
  channel(topic, params = { config: {} }) {
    const chan = new RealtimeChannel(`realtime:${topic}`, params, this);
    this.channels.push(chan);
    return chan;
  }

  /**
   * Push out a message if the socket is connected.
   *
   * If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established.
   */
  push(data) {
    const { topic, event, payload, ref } = data;
    const send = () => {
      this.encode(data, (result) => {
        // The connection may have been dropped between buffering and flushing.
        if (this.conn) {
          this.conn.send(result);
        }
      });
    };

    this.log('push', `${topic} ${event} (${ref})`, payload);

    if (this.isConnected()) {
      send();
    } else {
      this.sendBuffer.push(send);
    }
  }

  /**
   * Sets the JWT access token used for channel subscription authorization and Realtime RLS.
   *
   * @param token A JWT string.
   */
  setAuth(token) {
    this.accessToken = token;
    this.channels.forEach((channel) => {
      if (token) {
        channel.updateJoinPayload({ access_token: token });
      }
      if (channel.joinedOnce && channel._isJoined()) {
        channel._push(CHANNEL_EVENTS.access_token, { access_token: token });
      }
    });
  }

  /**
   * Return the next message ref, accounting for overflows
   *
   * @internal
   */
  _makeRef() {
    const newRef = this.ref + 1;
    // Once the counter is too large for `+ 1` to change it, start again from 0.
    this.ref = newRef === this.ref ? 0 : newRef;
    return this.ref.toString();
  }

  /**
   * Unsubscribe from channels with the specified topic.
   *
   * @internal
   */
  _leaveOpenTopic(topic) {
    const dupChannel = this.channels.find(
      (c) => c.topic === topic && (c._isJoined() || c._isJoining())
    );
    if (dupChannel) {
      this.log('transport', `leaving duplicate topic "${topic}"`);
      dupChannel.unsubscribe();
    }
  }

  /**
   * Removes a subscription from the socket.
   *
   * @param channel An open subscription.
   *
   * @internal
   */
  _remove(channel) {
    this.channels = this.channels.filter((c) => c._joinRef() !== channel._joinRef());
  }

  /**
   * Sets up connection handlers.
   *
   * @internal
   */
  setupConnection() {
    if (!this.conn) {
      return;
    }
    this.conn.binaryType = 'arraybuffer';
    this.conn.onopen = () => this._onConnOpen();
    this.conn.onerror = (error) => this._onConnError(error);
    this.conn.onmessage = (event) => this._onConnMessage(event);
    this.conn.onclose = (event) => this._onConnClose(event);
  }

  /**
   * Returns the URL of the websocket.
   *
   * @internal
   */
  _endPointURL() {
    return this._appendParams(this.endPoint, Object.assign({}, this.params, { vsn: VSN }));
  }

  /**
   * Decodes an incoming frame, clears a pending heartbeat when it is
   * answered, and dispatches the message to member channels and to the
   * registered `message` callbacks.
   *
   * @internal
   */
  _onConnMessage(rawMessage) {
    this.decode(rawMessage.data, (msg) => {
      const { topic, event, payload, ref } = msg;
      // Messages are not guaranteed to carry a payload; read its fields null-safely.
      const hasPayload = payload !== null && payload !== undefined;
      const payloadType = hasPayload ? payload.type : undefined;
      const payloadStatus = (hasPayload ? payload.status : undefined) || '';

      if ((ref && ref === this.pendingHeartbeatRef) || event === payloadType) {
        this.pendingHeartbeatRef = null;
      }

      this.log(
        'receive',
        `${payloadStatus} ${topic} ${event} ${(ref && '(' + ref + ')') || ''}`,
        payload
      );

      this.channels
        .filter((channel) => channel._isMember(topic))
        .forEach((channel) => channel._trigger(event, payload, ref));
      this.stateChangeCallbacks.message.forEach((callback) => callback(msg));
    });
  }

  /**
   * Flushes buffered pushes, resets the reconnect timer and (re)starts the heartbeat.
   *
   * @internal
   */
  _onConnOpen() {
    this.log('transport', `connected to ${this._endPointURL()}`);
    this._flushSendBuffer();
    this.reconnectTimer.reset();
    this.heartbeatTimer && clearInterval(this.heartbeatTimer);
    this.heartbeatTimer = setInterval(() => this._sendHeartbeat(), this.heartbeatIntervalMs);
    this.stateChangeCallbacks.open.forEach((callback) => callback());
  }

  /**
   * Signals an error to every channel, stops the heartbeat and schedules a reconnect.
   *
   * @internal
   */
  _onConnClose(event) {
    this.log('transport', 'close', event);
    this._triggerChanError();
    this.heartbeatTimer && clearInterval(this.heartbeatTimer);
    this.reconnectTimer.scheduleTimeout();
    this.stateChangeCallbacks.close.forEach((callback) => callback(event));
  }

  /**
   * Logs the error, signals it to every channel and runs the `error` callbacks.
   *
   * @internal
   */
  _onConnError(error) {
    this.log('transport', error.message);
    this._triggerChanError();
    this.stateChangeCallbacks.error.forEach((callback) => callback(error));
  }

  /**
   * Triggers the channel error event on every channel.
   *
   * @internal
   */
  _triggerChanError() {
    this.channels.forEach((channel) => channel._trigger(CHANNEL_EVENTS.error));
  }

  /**
   * Appends `params` to `url` as a query string, using `&` when the URL
   * already contains a `?`.
   *
   * @internal
   */
  _appendParams(url, params) {
    if (Object.keys(params).length === 0) {
      return url;
    }
    const prefix = url.includes('?') ? '&' : '?';
    const query = new URLSearchParams(params);
    return `${url}${prefix}${query}`;
  }

  /**
   * Sends every buffered push, provided the socket is open.
   *
   * @internal
   */
  _flushSendBuffer() {
    if (this.isConnected() && this.sendBuffer.length > 0) {
      this.sendBuffer.forEach((callback) => callback());
      this.sendBuffer = [];
    }
  }

  /**
   * Sends a heartbeat, or closes the connection when the previous heartbeat
   * is still unanswered.
   *
   * @internal
   */
  _sendHeartbeat() {
    if (!this.isConnected()) {
      return;
    }

    if (this.pendingHeartbeatRef) {
      // The previous heartbeat was never answered: close the socket and let
      // the close handler schedule the reconnect.
      this.pendingHeartbeatRef = null;
      this.log('transport', 'heartbeat timeout. Attempting to re-establish connection');
      if (this.conn) {
        this.conn.close(WS_CLOSE_NORMAL, 'hearbeat timeout');
      }
      return;
    }

    this.pendingHeartbeatRef = this._makeRef();
    this.push({
      topic: 'phoenix',
      event: 'heartbeat',
      payload: {},
      ref: this.pendingHeartbeatRef,
    });
    // Re-send the current access token to joined channels on every heartbeat.
    this.setAuth(this.accessToken);
  }
}

/**
 * Inert stand-in connection used while the `ws` module is being loaded.
 * It stays in the "connecting" state and discards anything sent to it;
 * `close` is delegated to the callback supplied by the owner.
 */
class WSWebSocketDummy {
  constructor(address, _protocols, options) {
    this.binaryType = 'arraybuffer';
    this.onclose = () => { };
    this.onerror = () => { };
    this.onmessage = () => { };
    this.onopen = () => { };
    this.readyState = SOCKET_STATES.connecting;
    this.send = () => { };
    this.url = address;
    this.close = options.close;
  }
}
//# sourceMappingURL=RealtimeClient.js.map