import { VSN, CHANNEL_EVENTS, TRANSPORTS, SOCKET_STATES, DEFAULT_TIMEOUT, WS_CLOSE_NORMAL, DEFAULT_HEADERS, CONNECTION_STATE, } from './lib/constants' import Timer from './lib/timer' import Serializer from './lib/serializer' import RealtimeChannel from './RealtimeChannel' import type { RealtimeChannelOptions } from './RealtimeChannel' import type { WebSocket as WSWebSocket } from 'ws' type Fetch = typeof fetch export type RealtimeClientOptions = { transport?: WebSocketLikeConstructor timeout?: number heartbeatIntervalMs?: number logger?: Function encode?: Function decode?: Function reconnectAfterMs?: Function headers?: { [key: string]: string } params?: { [key: string]: any } log_level?: 'info' | 'debug' | 'warn' | 'error' fetch?: Fetch } export type RealtimeMessage = { topic: string event: string payload: any ref: string join_ref?: string } export type RealtimeRemoveChannelResponse = 'ok' | 'timed out' | 'error' const noop = () => {} interface WebSocketLikeConstructor { new ( address: string | URL, _ignored?: any, options?: { headers: Object | undefined } ): WebSocketLike } type WebSocketLike = WebSocket | WSWebSocket | WSWebSocketDummy interface WebSocketLikeError { error: any message: string type: string } const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined' export default class RealtimeClient { accessToken: string | null = null channels: RealtimeChannel[] = [] endPoint: string = '' headers?: { [key: string]: string } = DEFAULT_HEADERS params?: { [key: string]: string } = {} timeout: number = DEFAULT_TIMEOUT transport: WebSocketLikeConstructor | null heartbeatIntervalMs: number = 30000 heartbeatTimer: ReturnType | undefined = undefined pendingHeartbeatRef: string | null = null ref: number = 0 reconnectTimer: Timer logger: Function = noop encode: Function decode: Function reconnectAfterMs: Function conn: WebSocketLike | null = null sendBuffer: Function[] = [] serializer: Serializer = new Serializer() stateChangeCallbacks: { open: Function[] close: Function[] error: Function[] message: Function[] } = { open: [], close: [], error: [], message: [], } fetch: Fetch /** * Initializes the Socket. * * @param endPoint The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol) * @param options.transport The Websocket Transport, for example WebSocket. * @param options.timeout The default timeout in milliseconds to trigger push timeouts. * @param options.params The optional params to pass when connecting. * @param options.headers The optional headers to pass when connecting. * @param options.heartbeatIntervalMs The millisec interval to send a heartbeat message. * @param options.logger The optional function for specialized logging, ie: logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) } * @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload)) * @param options.decode The function to decode incoming messages. Defaults to Serializer's decode. * @param options.reconnectAfterMs he optional function that returns the millsec reconnect interval. Defaults to stepped backoff off. */ constructor(endPoint: string, options?: RealtimeClientOptions) { this.endPoint = `${endPoint}/${TRANSPORTS.websocket}` if (options?.transport) { this.transport = options.transport } else { this.transport = null } if (options?.params) this.params = options.params if (options?.headers) this.headers = { ...this.headers, ...options.headers } if (options?.timeout) this.timeout = options.timeout if (options?.logger) this.logger = options.logger if (options?.heartbeatIntervalMs) this.heartbeatIntervalMs = options.heartbeatIntervalMs const accessToken = options?.params?.apikey if (accessToken) this.accessToken = accessToken this.reconnectAfterMs = options?.reconnectAfterMs ? options.reconnectAfterMs : (tries: number) => { return [1000, 2000, 5000, 10000][tries - 1] || 10000 } this.encode = options?.encode ? options.encode : (payload: JSON, callback: Function) => { return callback(JSON.stringify(payload)) } this.decode = options?.decode ? options.decode : this.serializer.decode.bind(this.serializer) this.reconnectTimer = new Timer(async () => { this.disconnect() this.connect() }, this.reconnectAfterMs) this.fetch = this._resolveFetch(options?.fetch) } /** * Connects the socket, unless already connected. */ connect(): void { if (this.conn) { return } if (this.transport) { this.conn = new this.transport(this._endPointURL(), undefined, { headers: this.headers, }) return } if (NATIVE_WEBSOCKET_AVAILABLE) { this.conn = new WebSocket(this._endPointURL()) this.setupConnection() return } this.conn = new WSWebSocketDummy(this._endPointURL(), undefined, { close: () => { this.conn = null }, }) import('ws').then(({ default: WS }) => { this.conn = new WS(this._endPointURL(), undefined, { headers: this.headers, }) this.setupConnection() }) } /** * Disconnects the socket. * * @param code A numeric status code to send on disconnect. * @param reason A custom reason for the disconnect. */ disconnect(code?: number, reason?: string): void { if (this.conn) { this.conn.onclose = function () {} // noop if (code) { this.conn.close(code, reason ?? '') } else { this.conn.close() } this.conn = null // remove open handles this.heartbeatTimer && clearInterval(this.heartbeatTimer) this.reconnectTimer.reset() } } /** * Returns all created channels */ getChannels(): RealtimeChannel[] { return this.channels } /** * Unsubscribes and removes a single channel * @param channel A RealtimeChannel instance */ async removeChannel( channel: RealtimeChannel ): Promise { const status = await channel.unsubscribe() if (this.channels.length === 0) { this.disconnect() } return status } /** * Unsubscribes and removes all channels */ async removeAllChannels(): Promise { const values_1 = await Promise.all( this.channels.map((channel) => channel.unsubscribe()) ) this.disconnect() return values_1 } /** * Logs the message. * * For customized logging, `this.logger` can be overridden. */ log(kind: string, msg: string, data?: any) { this.logger(kind, msg, data) } /** * Returns the current state of the socket. */ connectionState(): CONNECTION_STATE { switch (this.conn && this.conn.readyState) { case SOCKET_STATES.connecting: return CONNECTION_STATE.Connecting case SOCKET_STATES.open: return CONNECTION_STATE.Open case SOCKET_STATES.closing: return CONNECTION_STATE.Closing default: return CONNECTION_STATE.Closed } } /** * Returns `true` is the connection is open. */ isConnected(): boolean { return this.connectionState() === CONNECTION_STATE.Open } channel( topic: string, params: RealtimeChannelOptions = { config: {} } ): RealtimeChannel { const chan = new RealtimeChannel(`realtime:${topic}`, params, this) this.channels.push(chan) return chan } /** * Push out a message if the socket is connected. * * If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. */ push(data: RealtimeMessage): void { const { topic, event, payload, ref } = data const callback = () => { this.encode(data, (result: any) => { this.conn?.send(result) }) } this.log('push', `${topic} ${event} (${ref})`, payload) if (this.isConnected()) { callback() } else { this.sendBuffer.push(callback) } } /** * Sets the JWT access token used for channel subscription authorization and Realtime RLS. * * @param token A JWT string. */ setAuth(token: string | null): void { this.accessToken = token this.channels.forEach((channel) => { token && channel.updateJoinPayload({ access_token: token }) if (channel.joinedOnce && channel._isJoined()) { channel._push(CHANNEL_EVENTS.access_token, { access_token: token }) } }) } /** * Use either custom fetch, if provided, or default fetch to make HTTP requests * * @internal */ _resolveFetch = (customFetch?: Fetch): Fetch => { let _fetch: Fetch if (customFetch) { _fetch = customFetch } else if (typeof fetch === 'undefined') { _fetch = (...args) => import('@supabase/node-fetch' as any).then(({ default: fetch }) => fetch(...args) ) } else { _fetch = fetch } return (...args) => _fetch(...args) } /** * Return the next message ref, accounting for overflows * * @internal */ _makeRef(): string { let newRef = this.ref + 1 if (newRef === this.ref) { this.ref = 0 } else { this.ref = newRef } return this.ref.toString() } /** * Unsubscribe from channels with the specified topic. * * @internal */ _leaveOpenTopic(topic: string): void { let dupChannel = this.channels.find( (c) => c.topic === topic && (c._isJoined() || c._isJoining()) ) if (dupChannel) { this.log('transport', `leaving duplicate topic "${topic}"`) dupChannel.unsubscribe() } } /** * Removes a subscription from the socket. * * @param channel An open subscription. * * @internal */ _remove(channel: RealtimeChannel) { this.channels = this.channels.filter( (c: RealtimeChannel) => c._joinRef() !== channel._joinRef() ) } /** * Sets up connection handlers. * * @internal */ private setupConnection(): void { if (this.conn) { this.conn.binaryType = 'arraybuffer' this.conn.onopen = () => this._onConnOpen() this.conn.onerror = (error: WebSocketLikeError) => this._onConnError(error as WebSocketLikeError) this.conn.onmessage = (event: any) => this._onConnMessage(event) this.conn.onclose = (event: any) => this._onConnClose(event) } } /** * Returns the URL of the websocket. * * @internal */ private _endPointURL(): string { return this._appendParams( this.endPoint, Object.assign({}, this.params, { vsn: VSN }) ) } /** @internal */ private _onConnMessage(rawMessage: { data: any }) { this.decode(rawMessage.data, (msg: RealtimeMessage) => { let { topic, event, payload, ref } = msg if ( (ref && ref === this.pendingHeartbeatRef) || event === payload?.type ) { this.pendingHeartbeatRef = null } this.log( 'receive', `${payload.status || ''} ${topic} ${event} ${ (ref && '(' + ref + ')') || '' }`, payload ) this.channels .filter((channel: RealtimeChannel) => channel._isMember(topic)) .forEach((channel: RealtimeChannel) => channel._trigger(event, payload, ref) ) this.stateChangeCallbacks.message.forEach((callback) => callback(msg)) }) } /** @internal */ private _onConnOpen() { this.log('transport', `connected to ${this._endPointURL()}`) this._flushSendBuffer() this.reconnectTimer.reset() this.heartbeatTimer && clearInterval(this.heartbeatTimer) this.heartbeatTimer = setInterval( () => this._sendHeartbeat(), this.heartbeatIntervalMs ) this.stateChangeCallbacks.open.forEach((callback) => callback())! } /** @internal */ private _onConnClose(event: any) { this.log('transport', 'close', event) this._triggerChanError() this.heartbeatTimer && clearInterval(this.heartbeatTimer) this.reconnectTimer.scheduleTimeout() this.stateChangeCallbacks.close.forEach((callback) => callback(event)) } /** @internal */ private _onConnError(error: WebSocketLikeError) { this.log('transport', error.message) this._triggerChanError() this.stateChangeCallbacks.error.forEach((callback) => callback(error)) } /** @internal */ private _triggerChanError() { this.channels.forEach((channel: RealtimeChannel) => channel._trigger(CHANNEL_EVENTS.error) ) } /** @internal */ private _appendParams( url: string, params: { [key: string]: string } ): string { if (Object.keys(params).length === 0) { return url } const prefix = url.match(/\?/) ? '&' : '?' const query = new URLSearchParams(params) return `${url}${prefix}${query}` } /** @internal */ private _flushSendBuffer() { if (this.isConnected() && this.sendBuffer.length > 0) { this.sendBuffer.forEach((callback) => callback()) this.sendBuffer = [] } } /** @internal */ private _sendHeartbeat() { if (!this.isConnected()) { return } if (this.pendingHeartbeatRef) { this.pendingHeartbeatRef = null this.log( 'transport', 'heartbeat timeout. Attempting to re-establish connection' ) this.conn?.close(WS_CLOSE_NORMAL, 'hearbeat timeout') return } this.pendingHeartbeatRef = this._makeRef() this.push({ topic: 'phoenix', event: 'heartbeat', payload: {}, ref: this.pendingHeartbeatRef, }) this.setAuth(this.accessToken) } } class WSWebSocketDummy { binaryType: string = 'arraybuffer' close: Function onclose: Function = () => {} onerror: Function = () => {} onmessage: Function = () => {} onopen: Function = () => {} readyState: number = SOCKET_STATES.connecting send: Function = () => {} url: string | URL | null = null constructor( address: string, _protocols: undefined, options: { close: Function } ) { this.url = address this.close = options.close } }