Spaces:
Sleeping
Sleeping
| ; | |
| Object.defineProperty(exports, "__esModule", { value: true }); | |
| exports.WsStream = void 0; | |
| const errors_js_1 = require("../errors.js"); | |
| const queue_js_1 = require("../queue.js"); | |
| const stream_js_1 = require("../stream.js"); | |
| const cursor_js_1 = require("./cursor.js"); | |
| class WsStream extends stream_js_1.Stream { | |
| #client; | |
| #streamId; | |
| #queue; | |
| #cursor; | |
| #closing; | |
| #closed; | |
| /** @private */ | |
| static open(client) { | |
| const streamId = client._streamIdAlloc.alloc(); | |
| const stream = new WsStream(client, streamId); | |
| const responseCallback = () => undefined; | |
| const errorCallback = (e) => stream.#setClosed(e); | |
| const request = { type: "open_stream", streamId }; | |
| client._sendRequest(request, { responseCallback, errorCallback }); | |
| return stream; | |
| } | |
| /** @private */ | |
| constructor(client, streamId) { | |
| super(client.intMode); | |
| this.#client = client; | |
| this.#streamId = streamId; | |
| this.#queue = new queue_js_1.Queue(); | |
| this.#cursor = undefined; | |
| this.#closing = false; | |
| this.#closed = undefined; | |
| } | |
| /** Get the {@link WsClient} object that this stream belongs to. */ | |
| client() { | |
| return this.#client; | |
| } | |
| /** @private */ | |
| _sqlOwner() { | |
| return this.#client; | |
| } | |
| /** @private */ | |
| _execute(stmt) { | |
| return this.#sendStreamRequest({ | |
| type: "execute", | |
| streamId: this.#streamId, | |
| stmt, | |
| }).then((response) => { | |
| return response.result; | |
| }); | |
| } | |
| /** @private */ | |
| _batch(batch) { | |
| return this.#sendStreamRequest({ | |
| type: "batch", | |
| streamId: this.#streamId, | |
| batch, | |
| }).then((response) => { | |
| return response.result; | |
| }); | |
| } | |
| /** @private */ | |
| _describe(protoSql) { | |
| this.#client._ensureVersion(2, "describe()"); | |
| return this.#sendStreamRequest({ | |
| type: "describe", | |
| streamId: this.#streamId, | |
| sql: protoSql.sql, | |
| sqlId: protoSql.sqlId, | |
| }).then((response) => { | |
| return response.result; | |
| }); | |
| } | |
| /** @private */ | |
| _sequence(protoSql) { | |
| this.#client._ensureVersion(2, "sequence()"); | |
| return this.#sendStreamRequest({ | |
| type: "sequence", | |
| streamId: this.#streamId, | |
| sql: protoSql.sql, | |
| sqlId: protoSql.sqlId, | |
| }).then((_response) => { | |
| return undefined; | |
| }); | |
| } | |
| /** Check whether the SQL connection underlying this stream is in autocommit state (i.e., outside of an | |
| * explicit transaction). This requires protocol version 3 or higher. | |
| */ | |
| getAutocommit() { | |
| this.#client._ensureVersion(3, "getAutocommit()"); | |
| return this.#sendStreamRequest({ | |
| type: "get_autocommit", | |
| streamId: this.#streamId, | |
| }).then((response) => { | |
| return response.isAutocommit; | |
| }); | |
| } | |
| #sendStreamRequest(request) { | |
| return new Promise((responseCallback, errorCallback) => { | |
| this.#pushToQueue({ type: "request", request, responseCallback, errorCallback }); | |
| }); | |
| } | |
| /** @private */ | |
| _openCursor(batch) { | |
| this.#client._ensureVersion(3, "cursor"); | |
| return new Promise((cursorCallback, errorCallback) => { | |
| this.#pushToQueue({ type: "cursor", batch, cursorCallback, errorCallback }); | |
| }); | |
| } | |
| /** @private */ | |
| _sendCursorRequest(cursor, request) { | |
| if (cursor !== this.#cursor) { | |
| throw new errors_js_1.InternalError("Cursor not associated with the stream attempted to execute a request"); | |
| } | |
| return new Promise((responseCallback, errorCallback) => { | |
| if (this.#closed !== undefined) { | |
| errorCallback(new errors_js_1.ClosedError("Stream is closed", this.#closed)); | |
| } | |
| else { | |
| this.#client._sendRequest(request, { responseCallback, errorCallback }); | |
| } | |
| }); | |
| } | |
| /** @private */ | |
| _cursorClosed(cursor) { | |
| if (cursor !== this.#cursor) { | |
| throw new errors_js_1.InternalError("Cursor was closed, but it was not associated with the stream"); | |
| } | |
| this.#cursor = undefined; | |
| this.#flushQueue(); | |
| } | |
| #pushToQueue(entry) { | |
| if (this.#closed !== undefined) { | |
| entry.errorCallback(new errors_js_1.ClosedError("Stream is closed", this.#closed)); | |
| } | |
| else if (this.#closing) { | |
| entry.errorCallback(new errors_js_1.ClosedError("Stream is closing", undefined)); | |
| } | |
| else { | |
| this.#queue.push(entry); | |
| this.#flushQueue(); | |
| } | |
| } | |
| #flushQueue() { | |
| for (;;) { | |
| const entry = this.#queue.first(); | |
| if (entry === undefined && this.#cursor === undefined && this.#closing) { | |
| this.#setClosed(new errors_js_1.ClientError("Stream was gracefully closed")); | |
| break; | |
| } | |
| else if (entry?.type === "request" && this.#cursor === undefined) { | |
| const { request, responseCallback, errorCallback } = entry; | |
| this.#queue.shift(); | |
| this.#client._sendRequest(request, { responseCallback, errorCallback }); | |
| } | |
| else if (entry?.type === "cursor" && this.#cursor === undefined) { | |
| const { batch, cursorCallback } = entry; | |
| this.#queue.shift(); | |
| const cursorId = this.#client._cursorIdAlloc.alloc(); | |
| const cursor = new cursor_js_1.WsCursor(this.#client, this, cursorId); | |
| const request = { | |
| type: "open_cursor", | |
| streamId: this.#streamId, | |
| cursorId, | |
| batch, | |
| }; | |
| const responseCallback = () => undefined; | |
| const errorCallback = (e) => cursor._setClosed(e); | |
| this.#client._sendRequest(request, { responseCallback, errorCallback }); | |
| this.#cursor = cursor; | |
| cursorCallback(cursor); | |
| } | |
| else { | |
| break; | |
| } | |
| } | |
| } | |
| #setClosed(error) { | |
| if (this.#closed !== undefined) { | |
| return; | |
| } | |
| this.#closed = error; | |
| if (this.#cursor !== undefined) { | |
| this.#cursor._setClosed(error); | |
| } | |
| for (;;) { | |
| const entry = this.#queue.shift(); | |
| if (entry !== undefined) { | |
| entry.errorCallback(error); | |
| } | |
| else { | |
| break; | |
| } | |
| } | |
| const request = { type: "close_stream", streamId: this.#streamId }; | |
| const responseCallback = () => this.#client._streamIdAlloc.free(this.#streamId); | |
| const errorCallback = () => undefined; | |
| this.#client._sendRequest(request, { responseCallback, errorCallback }); | |
| } | |
| /** Immediately close the stream. */ | |
| close() { | |
| this.#setClosed(new errors_js_1.ClientError("Stream was manually closed")); | |
| } | |
| /** Gracefully close the stream. */ | |
| closeGracefully() { | |
| this.#closing = true; | |
| this.#flushQueue(); | |
| } | |
| /** True if the stream is closed or closing. */ | |
| get closed() { | |
| return this.#closed !== undefined || this.#closing; | |
| } | |
| } | |
| exports.WsStream = WsStream; | |