| "use strict"; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| Object.defineProperty(exports, "__esModule", { value: true }); |
| exports.Http2CallStream = exports.InterceptingListenerImpl = exports.isInterceptingListener = void 0; |
| const http2 = require("http2"); |
| const os = require("os"); |
| const constants_1 = require("./constants"); |
| const metadata_1 = require("./metadata"); |
| const stream_decoder_1 = require("./stream-decoder"); |
| const logging = require("./logging"); |
| const constants_2 = require("./constants"); |
| const error_1 = require("./error"); |
| const TRACER_NAME = 'call_stream'; |
| const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants; |
| |
| |
| |
| |
| |
| |
| function getSystemErrorName(errno) { |
| for (const [name, num] of Object.entries(os.constants.errno)) { |
| if (num === errno) { |
| return name; |
| } |
| } |
| return 'Unknown system error ' + errno; |
| } |
| function getMinDeadline(deadlineList) { |
| let minValue = Infinity; |
| for (const deadline of deadlineList) { |
| const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline; |
| if (deadlineMsecs < minValue) { |
| minValue = deadlineMsecs; |
| } |
| } |
| return minValue; |
| } |
| function isInterceptingListener(listener) { |
| return (listener.onReceiveMetadata !== undefined && |
| listener.onReceiveMetadata.length === 1); |
| } |
| exports.isInterceptingListener = isInterceptingListener; |
| class InterceptingListenerImpl { |
| constructor(listener, nextListener) { |
| this.listener = listener; |
| this.nextListener = nextListener; |
| this.processingMetadata = false; |
| this.hasPendingMessage = false; |
| this.processingMessage = false; |
| this.pendingStatus = null; |
| } |
| processPendingMessage() { |
| if (this.hasPendingMessage) { |
| this.nextListener.onReceiveMessage(this.pendingMessage); |
| this.pendingMessage = null; |
| this.hasPendingMessage = false; |
| } |
| } |
| processPendingStatus() { |
| if (this.pendingStatus) { |
| this.nextListener.onReceiveStatus(this.pendingStatus); |
| } |
| } |
| onReceiveMetadata(metadata) { |
| this.processingMetadata = true; |
| this.listener.onReceiveMetadata(metadata, (metadata) => { |
| this.processingMetadata = false; |
| this.nextListener.onReceiveMetadata(metadata); |
| this.processPendingMessage(); |
| this.processPendingStatus(); |
| }); |
| } |
| |
| onReceiveMessage(message) { |
| |
| |
| this.processingMessage = true; |
| this.listener.onReceiveMessage(message, (msg) => { |
| this.processingMessage = false; |
| if (this.processingMetadata) { |
| this.pendingMessage = msg; |
| this.hasPendingMessage = true; |
| } |
| else { |
| this.nextListener.onReceiveMessage(msg); |
| this.processPendingStatus(); |
| } |
| }); |
| } |
| onReceiveStatus(status) { |
| this.listener.onReceiveStatus(status, (processedStatus) => { |
| if (this.processingMetadata || this.processingMessage) { |
| this.pendingStatus = processedStatus; |
| } |
| else { |
| this.nextListener.onReceiveStatus(processedStatus); |
| } |
| }); |
| } |
| } |
| exports.InterceptingListenerImpl = InterceptingListenerImpl; |
| class Http2CallStream { |
| constructor(methodName, channel, options, filterStackFactory, channelCallCredentials, callNumber) { |
| this.methodName = methodName; |
| this.channel = channel; |
| this.options = options; |
| this.channelCallCredentials = channelCallCredentials; |
| this.callNumber = callNumber; |
| this.http2Stream = null; |
| this.pendingRead = false; |
| this.isWriteFilterPending = false; |
| this.pendingWrite = null; |
| this.pendingWriteCallback = null; |
| this.writesClosed = false; |
| this.decoder = new stream_decoder_1.StreamDecoder(); |
| this.isReadFilterPending = false; |
| this.canPush = false; |
| |
| |
| |
| |
| this.readsClosed = false; |
| this.statusOutput = false; |
| this.unpushedReadMessages = []; |
| this.unfilteredReadMessages = []; |
| |
| this.mappedStatusCode = constants_1.Status.UNKNOWN; |
| |
| this.finalStatus = null; |
| this.subchannel = null; |
| this.listener = null; |
| this.internalError = null; |
| this.configDeadline = Infinity; |
| this.statusWatchers = []; |
| this.streamEndWatchers = []; |
| this.callStatsTracker = null; |
| this.filterStack = filterStackFactory.createFilter(this); |
| this.credentials = channelCallCredentials; |
| this.disconnectListener = () => { |
| this.endCall({ |
| code: constants_1.Status.UNAVAILABLE, |
| details: 'Connection dropped', |
| metadata: new metadata_1.Metadata(), |
| }); |
| }; |
| if (this.options.parentCall && |
| this.options.flags & constants_1.Propagate.CANCELLATION) { |
| this.options.parentCall.on('cancelled', () => { |
| this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call'); |
| }); |
| } |
| } |
| outputStatus() { |
| |
| if (this.listener && !this.statusOutput) { |
| this.statusOutput = true; |
| const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus); |
| this.trace('ended with status: code=' + |
| filteredStatus.code + |
| ' details="' + |
| filteredStatus.details + |
| '"'); |
| this.statusWatchers.forEach(watcher => watcher(filteredStatus)); |
| |
| |
| |
| |
| |
| |
| process.nextTick(() => { |
| var _a; |
| (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus); |
| }); |
| if (this.subchannel) { |
| this.subchannel.callUnref(); |
| this.subchannel.removeDisconnectListener(this.disconnectListener); |
| } |
| } |
| } |
| trace(text) { |
| logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text); |
| } |
| |
| |
| |
| |
| |
| endCall(status) { |
| |
| |
| if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) { |
| this.finalStatus = status; |
| this.maybeOutputStatus(); |
| } |
| this.destroyHttp2Stream(); |
| } |
| maybeOutputStatus() { |
| if (this.finalStatus !== null) { |
| |
| |
| |
| if (this.finalStatus.code !== constants_1.Status.OK || |
| (this.readsClosed && |
| this.unpushedReadMessages.length === 0 && |
| this.unfilteredReadMessages.length === 0 && |
| !this.isReadFilterPending)) { |
| this.outputStatus(); |
| } |
| } |
| } |
| push(message) { |
| this.trace('pushing to reader message of length ' + |
| (message instanceof Buffer ? message.length : null)); |
| this.canPush = false; |
| process.nextTick(() => { |
| var _a; |
| |
| |
| |
| |
| if (this.statusOutput) { |
| return; |
| } |
| (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMessage(message); |
| this.maybeOutputStatus(); |
| }); |
| } |
| handleFilterError(error) { |
| this.cancelWithStatus(constants_1.Status.INTERNAL, error.message); |
| } |
| handleFilteredRead(message) { |
| |
| |
| |
| if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) { |
| this.maybeOutputStatus(); |
| return; |
| } |
| this.isReadFilterPending = false; |
| if (this.canPush) { |
| this.http2Stream.pause(); |
| this.push(message); |
| } |
| else { |
| this.trace('unpushedReadMessages.push message of length ' + message.length); |
| this.unpushedReadMessages.push(message); |
| } |
| if (this.unfilteredReadMessages.length > 0) { |
| |
| |
| const nextMessage = this.unfilteredReadMessages.shift(); |
| this.filterReceivedMessage(nextMessage); |
| } |
| } |
| filterReceivedMessage(framedMessage) { |
| |
| |
| |
| if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) { |
| this.maybeOutputStatus(); |
| return; |
| } |
| this.trace('filterReceivedMessage of length ' + framedMessage.length); |
| this.isReadFilterPending = true; |
| this.filterStack |
| .receiveMessage(Promise.resolve(framedMessage)) |
| .then(this.handleFilteredRead.bind(this), this.handleFilterError.bind(this)); |
| } |
| tryPush(messageBytes) { |
| if (this.isReadFilterPending) { |
| this.trace('unfilteredReadMessages.push message of length ' + |
| (messageBytes && messageBytes.length)); |
| this.unfilteredReadMessages.push(messageBytes); |
| } |
| else { |
| this.filterReceivedMessage(messageBytes); |
| } |
| } |
| handleTrailers(headers) { |
| this.streamEndWatchers.forEach(watcher => watcher(true)); |
| let headersString = ''; |
| for (const header of Object.keys(headers)) { |
| headersString += '\t\t' + header + ': ' + headers[header] + '\n'; |
| } |
| this.trace('Received server trailers:\n' + headersString); |
| let metadata; |
| try { |
| metadata = metadata_1.Metadata.fromHttp2Headers(headers); |
| } |
| catch (e) { |
| metadata = new metadata_1.Metadata(); |
| } |
| const metadataMap = metadata.getMap(); |
| let code = this.mappedStatusCode; |
| if (code === constants_1.Status.UNKNOWN && |
| typeof metadataMap['grpc-status'] === 'string') { |
| const receivedStatus = Number(metadataMap['grpc-status']); |
| if (receivedStatus in constants_1.Status) { |
| code = receivedStatus; |
| this.trace('received status code ' + receivedStatus + ' from server'); |
| } |
| metadata.remove('grpc-status'); |
| } |
| let details = ''; |
| if (typeof metadataMap['grpc-message'] === 'string') { |
| details = decodeURI(metadataMap['grpc-message']); |
| metadata.remove('grpc-message'); |
| this.trace('received status details string "' + details + '" from server'); |
| } |
| const status = { code, details, metadata }; |
| |
| this.endCall(status); |
| } |
| writeMessageToStream(message, callback) { |
| var _a; |
| (_a = this.callStatsTracker) === null || _a === void 0 ? void 0 : _a.addMessageSent(); |
| this.http2Stream.write(message, callback); |
| } |
| attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) { |
| this.filterStack.push(extraFilters); |
| if (this.finalStatus !== null) { |
| stream.close(NGHTTP2_CANCEL); |
| } |
| else { |
| this.trace('attachHttp2Stream from subchannel ' + subchannel.getAddress()); |
| this.http2Stream = stream; |
| this.subchannel = subchannel; |
| this.callStatsTracker = callStatsTracker; |
| subchannel.addDisconnectListener(this.disconnectListener); |
| subchannel.callRef(); |
| stream.on('response', (headers, flags) => { |
| var _a; |
| let headersString = ''; |
| for (const header of Object.keys(headers)) { |
| headersString += '\t\t' + header + ': ' + headers[header] + '\n'; |
| } |
| this.trace('Received server headers:\n' + headersString); |
| switch (headers[':status']) { |
| |
| case 400: |
| this.mappedStatusCode = constants_1.Status.INTERNAL; |
| break; |
| case 401: |
| this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED; |
| break; |
| case 403: |
| this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED; |
| break; |
| case 404: |
| this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED; |
| break; |
| case 429: |
| case 502: |
| case 503: |
| case 504: |
| this.mappedStatusCode = constants_1.Status.UNAVAILABLE; |
| break; |
| default: |
| this.mappedStatusCode = constants_1.Status.UNKNOWN; |
| } |
| if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) { |
| this.handleTrailers(headers); |
| } |
| else { |
| let metadata; |
| try { |
| metadata = metadata_1.Metadata.fromHttp2Headers(headers); |
| } |
| catch (error) { |
| this.endCall({ |
| code: constants_1.Status.UNKNOWN, |
| details: (0, error_1.getErrorMessage)(error), |
| metadata: new metadata_1.Metadata(), |
| }); |
| return; |
| } |
| try { |
| const finalMetadata = this.filterStack.receiveMetadata(metadata); |
| (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMetadata(finalMetadata); |
| } |
| catch (error) { |
| this.endCall({ |
| code: constants_1.Status.UNKNOWN, |
| details: (0, error_1.getErrorMessage)(error), |
| metadata: new metadata_1.Metadata(), |
| }); |
| } |
| } |
| }); |
| stream.on('trailers', this.handleTrailers.bind(this)); |
| stream.on('data', (data) => { |
| this.trace('receive HTTP/2 data frame of length ' + data.length); |
| const messages = this.decoder.write(data); |
| for (const message of messages) { |
| this.trace('parsed message of length ' + message.length); |
| this.callStatsTracker.addMessageReceived(); |
| this.tryPush(message); |
| } |
| }); |
| stream.on('end', () => { |
| this.readsClosed = true; |
| this.maybeOutputStatus(); |
| }); |
| stream.on('close', () => { |
| |
| |
| |
| process.nextTick(() => { |
| var _a; |
| this.trace('HTTP/2 stream closed with code ' + stream.rstCode); |
| |
| |
| |
| |
| if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) { |
| return; |
| } |
| let code; |
| let details = ''; |
| switch (stream.rstCode) { |
| case http2.constants.NGHTTP2_NO_ERROR: |
| |
| |
| |
| if (this.finalStatus !== null) { |
| return; |
| } |
| code = constants_1.Status.INTERNAL; |
| details = `Received RST_STREAM with code ${stream.rstCode}`; |
| break; |
| case http2.constants.NGHTTP2_REFUSED_STREAM: |
| code = constants_1.Status.UNAVAILABLE; |
| details = 'Stream refused by server'; |
| break; |
| case http2.constants.NGHTTP2_CANCEL: |
| code = constants_1.Status.CANCELLED; |
| details = 'Call cancelled'; |
| break; |
| case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM: |
| code = constants_1.Status.RESOURCE_EXHAUSTED; |
| details = 'Bandwidth exhausted or memory limit exceeded'; |
| break; |
| case http2.constants.NGHTTP2_INADEQUATE_SECURITY: |
| code = constants_1.Status.PERMISSION_DENIED; |
| details = 'Protocol not secure enough'; |
| break; |
| case http2.constants.NGHTTP2_INTERNAL_ERROR: |
| code = constants_1.Status.INTERNAL; |
| if (this.internalError === null) { |
| |
| |
| |
| |
| |
| details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`; |
| } |
| else { |
| if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') { |
| code = constants_1.Status.UNAVAILABLE; |
| details = this.internalError.message; |
| } |
| else { |
| |
| |
| |
| |
| details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`; |
| } |
| } |
| break; |
| default: |
| code = constants_1.Status.INTERNAL; |
| details = `Received RST_STREAM with code ${stream.rstCode}`; |
| } |
| |
| |
| |
| |
| this.endCall({ code, details, metadata: new metadata_1.Metadata() }); |
| }); |
| }); |
| stream.on('error', (err) => { |
| |
| |
| |
| |
| |
| |
| |
| if (err.code !== 'ERR_HTTP2_STREAM_ERROR') { |
| this.trace('Node error event: message=' + |
| err.message + |
| ' code=' + |
| err.code + |
| ' errno=' + |
| getSystemErrorName(err.errno) + |
| ' syscall=' + |
| err.syscall); |
| this.internalError = err; |
| } |
| this.streamEndWatchers.forEach(watcher => watcher(false)); |
| }); |
| if (!this.pendingRead) { |
| stream.pause(); |
| } |
| if (this.pendingWrite) { |
| if (!this.pendingWriteCallback) { |
| throw new Error('Invalid state in write handling code'); |
| } |
| this.trace('sending data chunk of length ' + |
| this.pendingWrite.length + |
| ' (deferred)'); |
| try { |
| this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback); |
| } |
| catch (error) { |
| this.endCall({ |
| code: constants_1.Status.UNAVAILABLE, |
| details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`, |
| metadata: new metadata_1.Metadata() |
| }); |
| } |
| } |
| this.maybeCloseWrites(); |
| } |
| } |
| start(metadata, listener) { |
| this.trace('Sending metadata'); |
| this.listener = listener; |
| this.channel._startCallStream(this, metadata); |
| this.maybeOutputStatus(); |
| } |
| destroyHttp2Stream() { |
| var _a; |
| |
| |
| if (this.http2Stream !== null && !this.http2Stream.destroyed) { |
| |
| |
| |
| let code; |
| if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) { |
| code = http2.constants.NGHTTP2_NO_ERROR; |
| } |
| else { |
| code = http2.constants.NGHTTP2_CANCEL; |
| } |
| this.trace('close http2 stream with code ' + code); |
| this.http2Stream.close(code); |
| } |
| } |
| cancelWithStatus(status, details) { |
| this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); |
| this.endCall({ code: status, details, metadata: new metadata_1.Metadata() }); |
| } |
| getDeadline() { |
| const deadlineList = [this.options.deadline]; |
| if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) { |
| deadlineList.push(this.options.parentCall.getDeadline()); |
| } |
| if (this.configDeadline) { |
| deadlineList.push(this.configDeadline); |
| } |
| return getMinDeadline(deadlineList); |
| } |
| getCredentials() { |
| return this.credentials; |
| } |
| setCredentials(credentials) { |
| this.credentials = this.channelCallCredentials.compose(credentials); |
| } |
| getStatus() { |
| return this.finalStatus; |
| } |
| getPeer() { |
| var _a, _b; |
| return (_b = (_a = this.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) !== null && _b !== void 0 ? _b : this.channel.getTarget(); |
| } |
| getMethod() { |
| return this.methodName; |
| } |
| getHost() { |
| return this.options.host; |
| } |
| setConfigDeadline(configDeadline) { |
| this.configDeadline = configDeadline; |
| } |
| addStatusWatcher(watcher) { |
| this.statusWatchers.push(watcher); |
| } |
| addStreamEndWatcher(watcher) { |
| this.streamEndWatchers.push(watcher); |
| } |
| addFilters(extraFilters) { |
| this.filterStack.push(extraFilters); |
| } |
| getCallNumber() { |
| return this.callNumber; |
| } |
| startRead() { |
| |
| |
| if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) { |
| this.readsClosed = true; |
| this.maybeOutputStatus(); |
| return; |
| } |
| this.canPush = true; |
| if (this.http2Stream === null) { |
| this.pendingRead = true; |
| } |
| else { |
| if (this.unpushedReadMessages.length > 0) { |
| const nextMessage = this.unpushedReadMessages.shift(); |
| this.push(nextMessage); |
| return; |
| } |
| |
| |
| this.http2Stream.resume(); |
| } |
| } |
| maybeCloseWrites() { |
| if (this.writesClosed && |
| !this.isWriteFilterPending && |
| this.http2Stream !== null) { |
| this.trace('calling end() on HTTP/2 stream'); |
| this.http2Stream.end(); |
| } |
| } |
| sendMessageWithContext(context, message) { |
| var _a; |
| this.trace('write() called with message of length ' + message.length); |
| const writeObj = { |
| message, |
| flags: context.flags, |
| }; |
| const cb = (_a = context.callback) !== null && _a !== void 0 ? _a : (() => { }); |
| this.isWriteFilterPending = true; |
| this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => { |
| this.isWriteFilterPending = false; |
| if (this.http2Stream === null) { |
| this.trace('deferring writing data chunk of length ' + message.message.length); |
| this.pendingWrite = message.message; |
| this.pendingWriteCallback = cb; |
| } |
| else { |
| this.trace('sending data chunk of length ' + message.message.length); |
| try { |
| this.writeMessageToStream(message.message, cb); |
| } |
| catch (error) { |
| this.endCall({ |
| code: constants_1.Status.UNAVAILABLE, |
| details: `Write failed with error ${(0, error_1.getErrorMessage)(error)}`, |
| metadata: new metadata_1.Metadata() |
| }); |
| } |
| this.maybeCloseWrites(); |
| } |
| }, this.handleFilterError.bind(this)); |
| } |
| halfClose() { |
| this.trace('end() called'); |
| this.writesClosed = true; |
| this.maybeCloseWrites(); |
| } |
| } |
| exports.Http2CallStream = Http2CallStream; |
| |