// Harry9233's picture
// Upload 20790 files
// 5c05829 verified
import { VSN, CHANNEL_EVENTS, TRANSPORTS, SOCKET_STATES, DEFAULT_TIMEOUT, WS_CLOSE_NORMAL, DEFAULT_HEADERS, CONNECTION_STATE, } from './lib/constants';
import Timer from './lib/timer';
import Serializer from './lib/serializer';
import RealtimeChannel from './RealtimeChannel';
/** Do-nothing function; used as the default logger so logging calls are always safe. */
const noop = () => undefined;
/** `true` when the runtime exposes a global `WebSocket` constructor. */
const NATIVE_WEBSOCKET_AVAILABLE = 'undefined' !== typeof WebSocket;
export default class RealtimeClient {
    /**
     * Initializes the Socket.
     *
     * @param endPoint The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol)
     * @param options.transport The Websocket Transport, for example WebSocket.
     * @param options.timeout The default timeout in milliseconds to trigger push timeouts.
     * @param options.params The optional params to pass when connecting. `params.apikey`, when present, is also used as the initial access token.
     * @param options.headers The optional headers to pass when connecting. Merged over the default headers.
     * @param options.heartbeatIntervalMs The interval in milliseconds at which a heartbeat message is sent.
     * @param options.logger The optional function for specialized logging, ie: logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
     * @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload))
     * @param options.decode The function to decode incoming messages. Defaults to Serializer's decode.
     * @param options.reconnectAfterMs The optional function that returns the reconnect interval in milliseconds. Defaults to a stepped backoff (1s, 2s, 5s, then 10s).
     * @param options.fetch The optional custom fetch implementation used for HTTP requests.
     */
    constructor(endPoint, options) {
        // A missing `options` argument simply means "no overrides".
        const opts = options || {};
        this.accessToken = null;
        this.channels = [];
        this.endPoint = '';
        this.headers = DEFAULT_HEADERS;
        this.params = {};
        this.timeout = DEFAULT_TIMEOUT;
        this.heartbeatIntervalMs = 30000;
        this.heartbeatTimer = undefined;
        this.pendingHeartbeatRef = null;
        this.ref = 0;
        this.logger = noop;
        this.conn = null;
        this.sendBuffer = [];
        this.serializer = new Serializer();
        this.stateChangeCallbacks = {
            open: [],
            close: [],
            error: [],
            message: [],
        };
        /**
         * Use either custom fetch, if provided, or default fetch to make HTTP requests
         *
         * @internal
         */
        this._resolveFetch = (customFetch) => {
            let _fetch;
            if (customFetch) {
                _fetch = customFetch;
            }
            else if (typeof fetch === 'undefined') {
                // No global fetch: load the polyfill lazily on first use.
                _fetch = (...args) => import('@supabase/node-fetch').then(({ default: fetch }) => fetch(...args));
            }
            else {
                _fetch = fetch;
            }
            return (...args) => _fetch(...args);
        };
        this.endPoint = `${endPoint}/${TRANSPORTS.websocket}`;
        this.transport = opts.transport || null;
        if (opts.params) {
            this.params = opts.params;
        }
        if (opts.headers) {
            // Copy so the shared default headers object is never mutated.
            this.headers = Object.assign({}, this.headers, opts.headers);
        }
        if (opts.timeout) {
            this.timeout = opts.timeout;
        }
        if (opts.logger) {
            this.logger = opts.logger;
        }
        if (opts.heartbeatIntervalMs) {
            this.heartbeatIntervalMs = opts.heartbeatIntervalMs;
        }
        const accessToken = opts.params && opts.params.apikey;
        if (accessToken) {
            this.accessToken = accessToken;
        }
        this.reconnectAfterMs = opts.reconnectAfterMs
            ? opts.reconnectAfterMs
            : (tries) => [1000, 2000, 5000, 10000][tries - 1] || 10000;
        this.encode = opts.encode
            ? opts.encode
            : (payload, callback) => callback(JSON.stringify(payload));
        this.decode = opts.decode
            ? opts.decode
            : this.serializer.decode.bind(this.serializer);
        this.reconnectTimer = new Timer(async () => {
            this.disconnect();
            this.connect();
        }, this.reconnectAfterMs);
        this.fetch = this._resolveFetch(opts.fetch);
    }
    /**
     * Connects the socket, unless already connected.
     *
     * Transport selection, in order: the custom `options.transport`, the
     * runtime's native `WebSocket`, and finally the dynamically imported `ws`
     * package. Every path installs the connection handlers.
     */
    connect() {
        if (this.conn) {
            return;
        }
        if (this.transport) {
            this.conn = new this.transport(this._endPointURL(), undefined, {
                headers: this.headers,
            });
            // Without handlers a custom transport would never report open,
            // message, error or close events back to this client.
            this.setupConnection();
            return;
        }
        if (NATIVE_WEBSOCKET_AVAILABLE) {
            this.conn = new WebSocket(this._endPointURL());
            this.setupConnection();
            return;
        }
        // Placeholder connection used while `ws` is being loaded; closing it
        // just clears `this.conn`.
        const placeholder = new WSWebSocketDummy(this._endPointURL(), undefined, {
            close: () => {
                this.conn = null;
            },
        });
        this.conn = placeholder;
        import('ws')
            .then(({ default: WS }) => {
            // disconnect() (possibly followed by a new connect()) ran while
            // the import was pending: this attempt is stale, do not open a
            // socket nobody owns.
            if (this.conn !== placeholder) {
                return;
            }
            this.conn = new WS(this._endPointURL(), undefined, {
                headers: this.headers,
            });
            this.setupConnection();
        })
            .catch((error) => {
            // Loading or constructing the socket failed: drop the placeholder
            // so a later connect() can retry, and report through the normal
            // error path instead of leaving an unhandled rejection.
            if (this.conn === placeholder) {
                this.conn = null;
            }
            this._onConnError(error);
        });
    }
    /**
     * Disconnects the socket.
     *
     * @param code A numeric status code to send on disconnect.
     * @param reason A custom reason for the disconnect.
     */
    disconnect(code, reason) {
        if (!this.conn) {
            return;
        }
        // Silence the close handler so this deliberate close does not
        // schedule a reconnect.
        this.conn.onclose = function () { }; // noop
        if (code) {
            this.conn.close(code, reason == null ? '' : reason);
        }
        else {
            this.conn.close();
        }
        this.conn = null;
        // remove open handles
        this.heartbeatTimer && clearInterval(this.heartbeatTimer);
        this.reconnectTimer.reset();
    }
    /**
     * Returns all created channels
     */
    getChannels() {
        return this.channels;
    }
    /**
     * Unsubscribes and removes a single channel.
     * Disconnects the socket when no channels remain afterwards.
     *
     * @param channel A RealtimeChannel instance
     * @returns The value resolved by `channel.unsubscribe()`.
     */
    async removeChannel(channel) {
        const status = await channel.unsubscribe();
        if (this.channels.length === 0) {
            this.disconnect();
        }
        return status;
    }
    /**
     * Unsubscribes and removes all channels, then disconnects the socket.
     *
     * @returns The values resolved by each channel's `unsubscribe()`, in channel order.
     */
    async removeAllChannels() {
        const statuses = await Promise.all(this.channels.map((channel) => channel.unsubscribe()));
        this.disconnect();
        return statuses;
    }
    /**
     * Logs the message.
     *
     * For customized logging, `this.logger` can be overridden.
     */
    log(kind, msg, data) {
        this.logger(kind, msg, data);
    }
    /**
     * Returns the current state of the socket.
     * Reports `Closed` when there is no connection or its `readyState` is not
     * connecting, open or closing.
     */
    connectionState() {
        switch (this.conn && this.conn.readyState) {
            case SOCKET_STATES.connecting:
                return CONNECTION_STATE.Connecting;
            case SOCKET_STATES.open:
                return CONNECTION_STATE.Open;
            case SOCKET_STATES.closing:
                return CONNECTION_STATE.Closing;
            default:
                return CONNECTION_STATE.Closed;
        }
    }
    /**
     * Returns `true` if the connection is open.
     */
    isConnected() {
        return this.connectionState() === CONNECTION_STATE.Open;
    }
    /**
     * Creates a channel for `realtime:${topic}` and registers it with this client.
     *
     * @param topic The topic name; it is prefixed with `realtime:`.
     * @param params Channel parameters handed to the RealtimeChannel constructor.
     * @returns The newly created RealtimeChannel.
     */
    channel(topic, params = { config: {} }) {
        const chan = new RealtimeChannel(`realtime:${topic}`, params, this);
        this.channels.push(chan);
        return chan;
    }
    /**
     * Push out a message if the socket is connected.
     *
     * If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established.
     */
    push(data) {
        const { topic, event, payload, ref } = data;
        const send = () => {
            this.encode(data, (result) => {
                // The connection may have gone away between enqueueing and encoding.
                if (this.conn) {
                    this.conn.send(result);
                }
            });
        };
        this.log('push', `${topic} ${event} (${ref})`, payload);
        if (this.isConnected()) {
            send();
        }
        else {
            this.sendBuffer.push(send);
        }
    }
    /**
     * Sets the JWT access token used for channel subscription authorization and Realtime RLS.
     *
     * Every channel's join payload is updated when the token is truthy;
     * channels that have joined once and are currently joined are also pushed
     * the token.
     *
     * @param token A JWT string.
     */
    setAuth(token) {
        this.accessToken = token;
        this.channels.forEach((channel) => {
            token && channel.updateJoinPayload({ access_token: token });
            if (channel.joinedOnce && channel._isJoined()) {
                channel._push(CHANNEL_EVENTS.access_token, { access_token: token });
            }
        });
    }
    /**
     * Return the next message ref, accounting for overflows
     *
     * @internal
     */
    _makeRef() {
        const next = this.ref + 1;
        // Once the counter is too large for `+ 1` to change it, wrap to 0.
        this.ref = next === this.ref ? 0 : next;
        return this.ref.toString();
    }
    /**
     * Unsubscribe from channels with the specified topic.
     *
     * @internal
     */
    _leaveOpenTopic(topic) {
        const dupChannel = this.channels.find((c) => c.topic === topic && (c._isJoined() || c._isJoining()));
        if (dupChannel) {
            this.log('transport', `leaving duplicate topic "${topic}"`);
            dupChannel.unsubscribe();
        }
    }
    /**
     * Removes a subscription from the socket.
     *
     * @param channel An open subscription.
     *
     * @internal
     */
    _remove(channel) {
        this.channels = this.channels.filter((c) => c._joinRef() !== channel._joinRef());
    }
    /**
     * Sets up connection handlers.
     *
     * @internal
     */
    setupConnection() {
        if (!this.conn) {
            return;
        }
        this.conn.binaryType = 'arraybuffer';
        this.conn.onopen = () => this._onConnOpen();
        this.conn.onerror = (error) => this._onConnError(error);
        this.conn.onmessage = (event) => this._onConnMessage(event);
        this.conn.onclose = (event) => this._onConnClose(event);
    }
    /**
     * Returns the URL of the websocket.
     *
     * @internal
     */
    _endPointURL() {
        return this._appendParams(this.endPoint, Object.assign({}, this.params, { vsn: VSN }));
    }
    /**
     * Decodes an incoming socket message, clears the pending heartbeat when
     * the message answers it, and dispatches the message to member channels
     * and `message` callbacks.
     *
     * @internal
     */
    _onConnMessage(rawMessage) {
        this.decode(rawMessage.data, (msg) => {
            const { topic, event, payload, ref } = msg;
            // `payload` may be absent; guard every read of it.
            const hasPayload = payload !== null && payload !== undefined;
            if ((ref && ref === this.pendingHeartbeatRef) ||
                event === (hasPayload ? payload.type : undefined)) {
                this.pendingHeartbeatRef = null;
            }
            const status = (hasPayload && payload.status) || '';
            const refLabel = (ref && '(' + ref + ')') || '';
            this.log('receive', `${status} ${topic} ${event} ${refLabel}`, payload);
            this.channels
                .filter((channel) => channel._isMember(topic))
                .forEach((channel) => channel._trigger(event, payload, ref));
            this.stateChangeCallbacks.message.forEach((callback) => callback(msg));
        });
    }
    /**
     * Handles the socket opening: flushes buffered pushes, resets reconnect
     * and heartbeat state, starts the heartbeat interval and runs the `open`
     * callbacks.
     *
     * @internal
     */
    _onConnOpen() {
        this.log('transport', `connected to ${this._endPointURL()}`);
        this._flushSendBuffer();
        this.reconnectTimer.reset();
        // A heartbeat left unanswered by the previous connection must not be
        // counted against this fresh one; otherwise the first heartbeat tick
        // would report a timeout and close a healthy socket.
        this.pendingHeartbeatRef = null;
        this.heartbeatTimer && clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = setInterval(() => this._sendHeartbeat(), this.heartbeatIntervalMs);
        this.stateChangeCallbacks.open.forEach((callback) => callback());
    }
    /**
     * Handles the socket closing: errors all channels, stops the heartbeat,
     * schedules a reconnect and runs the `close` callbacks.
     *
     * @internal
     */
    _onConnClose(event) {
        this.log('transport', 'close', event);
        this._triggerChanError();
        this.heartbeatTimer && clearInterval(this.heartbeatTimer);
        this.reconnectTimer.scheduleTimeout();
        this.stateChangeCallbacks.close.forEach((callback) => callback(event));
    }
    /**
     * Handles a socket error: logs it, errors all channels and runs the
     * `error` callbacks.
     *
     * @internal
     */
    _onConnError(error) {
        this.log('transport', error.message);
        this._triggerChanError();
        this.stateChangeCallbacks.error.forEach((callback) => callback(error));
    }
    /**
     * Triggers the channel error event on every channel.
     *
     * @internal
     */
    _triggerChanError() {
        this.channels.forEach((channel) => channel._trigger(CHANNEL_EVENTS.error));
    }
    /**
     * Appends `params` to `url` as a query string, using `&` when the URL
     * already contains a `?`. Returns `url` unchanged when `params` is empty.
     *
     * @internal
     */
    _appendParams(url, params) {
        if (Object.keys(params).length === 0) {
            return url;
        }
        const separator = url.includes('?') ? '&' : '?';
        const query = new URLSearchParams(params);
        return `${url}${separator}${query}`;
    }
    /**
     * Sends every buffered push and empties the buffer, if connected.
     *
     * @internal
     */
    _flushSendBuffer() {
        if (this.isConnected() && this.sendBuffer.length > 0) {
            this.sendBuffer.forEach((callback) => callback());
            this.sendBuffer = [];
        }
    }
    /**
     * Sends a heartbeat, or closes the connection when the previous heartbeat
     * is still unanswered. Does nothing while disconnected.
     *
     * @internal
     */
    _sendHeartbeat() {
        if (!this.isConnected()) {
            return;
        }
        if (this.pendingHeartbeatRef) {
            // Previous heartbeat never got a reply: treat the link as dead.
            this.pendingHeartbeatRef = null;
            this.log('transport', 'heartbeat timeout. Attempting to re-establish connection');
            if (this.conn) {
                this.conn.close(WS_CLOSE_NORMAL, 'hearbeat timeout');
            }
            return;
        }
        this.pendingHeartbeatRef = this._makeRef();
        this.push({
            topic: 'phoenix',
            event: 'heartbeat',
            payload: {},
            ref: this.pendingHeartbeatRef,
        });
        // Re-send the current access token to the channels on every heartbeat.
        this.setAuth(this.accessToken);
    }
}
/**
 * Inert stand-in for a WebSocket: it stays in the `connecting` state, its
 * handlers and `send` do nothing, and `close` is whatever the creator
 * supplies in `options.close`.
 */
class WSWebSocketDummy {
    /**
     * @param address The URL, exposed as `url`.
     * @param _protocols Unused; present to mirror the WebSocket constructor signature.
     * @param options.close Function installed as this object's `close` method.
     */
    constructor(address, _protocols, options) {
        Object.assign(this, {
            binaryType: 'arraybuffer',
            onclose: () => { },
            onerror: () => { },
            onmessage: () => { },
            onopen: () => { },
            readyState: SOCKET_STATES.connecting,
            send: () => { },
            url: address,
            close: options.close,
        });
    }
}
//# sourceMappingURL=RealtimeClient.js.map