| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import { CallCredentials } from './call-credentials'; |
| import { |
| Call, |
| DeadlineInfoProvider, |
| InterceptingListener, |
| MessageContext, |
| StatusObject, |
| } from './call-interface'; |
| import { SubchannelCall } from './subchannel-call'; |
| import { ConnectivityState } from './connectivity-state'; |
| import { LogVerbosity, Status } from './constants'; |
| import { Deadline, formatDateDifference, getDeadlineTimeoutString } from './deadline'; |
| import { InternalChannel } from './internal-channel'; |
| import { Metadata } from './metadata'; |
| import { PickResultType } from './picker'; |
| import { CallConfig } from './resolver'; |
| import { splitHostPort } from './uri-parser'; |
| import * as logging from './logging'; |
| import { restrictControlPlaneStatusCode } from './control-plane-status'; |
| import * as http2 from 'http2'; |
|
|
/** Tracer name passed to `logging.trace` for every message emitted by this module. */
const TRACER_NAME = 'load_balancing_call';
|
|
/**
 * Progress marker attached to the final status of a load-balancing call.
 * Within this file, 'DROP' is reported for DROP pick results, 'REFUSED' when
 * the child call's status carries the HTTP/2 REFUSED_STREAM reset code,
 * 'NOT_STARTED' when creating the child call throws, and 'PROCESSED'
 * otherwise.
 */
export type RpcProgress =
  | 'NOT_STARTED'
  | 'DROP'
  | 'REFUSED'
  | 'PROCESSED';
|
|
/**
 * A final call status extended with an {@link RpcProgress} marker describing
 * how far the RPC got before it ended.
 */
export interface StatusObjectWithProgress extends StatusObject {
  /** How far the RPC progressed before this status was produced. */
  progress: RpcProgress;
}
|
|
/**
 * Listener accepted by `LoadBalancingCall.start`. It is an
 * `InterceptingListener` whose status callback is declared to receive the
 * progress-annotated status type.
 */
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
  /** Receives the final status of the call together with its progress marker. */
  onReceiveStatus(status: StatusObjectWithProgress): void;
}
|
|
/**
 * A call that asks the channel for a load-balancing pick and, once a
 * subchannel has been picked and call credentials metadata has been
 * generated, forwards all operations to a call created on that subchannel.
 *
 * Operations requested before the child call exists (read, one message,
 * half-close) are remembered and replayed when the child call is created.
 * The final status is delivered to the listener at most once.
 */
export class LoadBalancingCall implements Call, DeadlineInfoProvider {
  /** Call on the picked subchannel; null until `doPick` successfully creates it. */
  private child: SubchannelCall | null = null;
  /** True when `startRead` was requested before the child call existed. */
  private readPending = false;
  /**
   * Message passed to `sendMessageWithContext` before the child call existed.
   * Only the most recently supplied message is retained.
   */
  private pendingMessage: { context: MessageContext; message: Buffer } | null =
    null;
  /** True when `halfClose` was requested before the child call existed. */
  private pendingHalfClose = false;
  /** Set once the final status has been output; guards against double delivery. */
  private ended = false;
  /** URL handed to the call credentials as `service_url`. */
  private serviceUrl: string;
  /** Metadata supplied to `start`; null until `start` is called. */
  private metadata: Metadata | null = null;
  /** Listener supplied to `start`; null until `start` is called. */
  private listener: InterceptingListener | null = null;
  /** Callback taken from the pick result, invoked with the final status code. */
  private onCallEnded: ((statusCode: Status) => void) | null = null;
  /** Time at which this object was constructed. */
  private startTime: Date;
  /** Time at which the child call was created; null until then. */
  private childStartTime: Date | null = null;

  /**
   * @param channel Channel used to perform picks and to queue this call for a later pick.
   * @param callConfig Supplies `pickInformation` for picks and an optional `onCommitted` callback.
   * @param methodName Full method path; its second '/'-separated segment is used as the service name.
   * @param host Host passed to the child call and used to derive the service URL.
   * @param credentials Call credentials used to generate per-call metadata.
   * @param deadline Call deadline; when not `Infinity` a `grpc-timeout` header is set.
   * @param callNumber Identifier included in trace output and returned by `getCallNumber`.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number
  ) {
    const splitPath: string[] = this.methodName.split('/');
    let serviceName = '';
    /* For a path shaped like "/{serviceName}/{methodName}", splitting on '/'
     * yields an empty first element and the service name as the second. */
    if (splitPath.length >= 2) {
      serviceName = splitPath[1];
    }
    const hostname = splitHostPort(this.host)?.host ?? 'localhost';
    /* The service URL is always built with the "https" scheme.
     * NOTE(review): presumably because call credentials require a secure
     * connection - confirm against the credentials implementation. */
    this.serviceUrl = `https://${hostname}/${serviceName}`;
    this.startTime = new Date();
  }

  /**
   * Describes where the call currently stands, for deadline diagnostics.
   *
   * @returns Before the child call exists: an optional 'wait_for_ready' entry
   *   followed by 'Waiting for LB pick'. Afterwards: the LB pick duration
   *   (only when the child started strictly after this call) followed by the
   *   child call's own deadline info.
   */
  getDeadlineInfo(): string[] {
    const deadlineInfo: string[] = [];
    if (this.childStartTime) {
      if (this.childStartTime > this.startTime) {
        if (this.metadata?.getOptions().waitForReady) {
          deadlineInfo.push('wait_for_ready');
        }
        deadlineInfo.push(
          `LB pick: ${formatDateDifference(this.startTime, this.childStartTime)}`
        );
      }
      // childStartTime is only assigned right after child has been assigned.
      deadlineInfo.push(...this.child!.getDeadlineInfo());
      return deadlineInfo;
    }
    if (this.metadata?.getOptions().waitForReady) {
      deadlineInfo.push('wait_for_ready');
    }
    deadlineInfo.push('Waiting for LB pick');
    return deadlineInfo;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Ends the call with the given status. Only the first invocation has any
   * effect: it marks the call as ended, notifies the listener, and invokes
   * the pick result's `onCallEnded` callback if one has been recorded.
   *
   * @param status Final status to report.
   * @param progress Progress marker added to the reported status.
   */
  private outputStatus(status: StatusObject, progress: RpcProgress) {
    if (!this.ended) {
      this.ended = true;
      this.trace(
        'ended with status: code=' +
          status.code +
          ' details="' +
          status.details +
          '" start time=' +
          this.startTime.toISOString()
      );
      const finalStatus = { ...status, progress };
      this.listener?.onReceiveStatus(finalStatus);
      this.onCallEnded?.(finalStatus.code);
    }
  }

  /**
   * Asks the channel for a pick and acts on the result:
   *
   * - COMPLETE: generates credentials metadata, then creates the child call
   *   on the picked subchannel and replays any pending operations. If the
   *   subchannel is no longer READY once the metadata is available, the pick
   *   is retried.
   * - DROP: ends the call asynchronously with progress 'DROP'.
   * - TRANSIENT_FAILURE: queues the call for a later pick when the metadata
   *   has `waitForReady` set, otherwise ends the call asynchronously.
   * - QUEUE: queues the call for a later pick.
   *
   * Does nothing if the call has already ended.
   *
   * @throws Error if called before `start` has supplied the metadata.
   */
  doPick() {
    if (this.ended) {
      return;
    }
    if (!this.metadata) {
      throw new Error('doPick called before start');
    }
    this.trace('Pick called');
    /* Work on a clone so that this.metadata stays unmodified; doPick can run
     * again (retry or queued pick) and must start from the original values. */
    const finalMetadata = this.metadata.clone();
    const pickResult = this.channel.doPick(
      finalMetadata,
      this.callConfig.pickInformation
    );
    const subchannelString = pickResult.subchannel
      ? '(' +
        pickResult.subchannel.getChannelzRef().id +
        ') ' +
        pickResult.subchannel.getAddress()
      : '' + pickResult.subchannel;
    this.trace(
      'Pick result: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        this.credentials
          .generateMetadata({
            method_name: this.methodName,
            service_url: this.serviceUrl,
          })
          .then(
            credsMetadata => {
              /* If the call ended (for example through cancelWithStatus)
               * while the metadata was being generated, there is nothing
               * left to do with the result. */
              if (this.ended) {
                this.trace(
                  'Credentials metadata generation finished after call ended'
                );
                return;
              }
              finalMetadata.merge(credsMetadata);
              if (finalMetadata.get('authorization').length > 1) {
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      '"authorization" metadata cannot have multiple values',
                    metadata: new Metadata(),
                  },
                  'PROCESSED'
                );
                /* The call has ended: stop here so that no child call is
                 * created and no pick callbacks fire for a finished call. */
                return;
              }
              if (
                pickResult.subchannel!.getConnectivityState() !==
                ConnectivityState.READY
              ) {
                this.trace(
                  'Picked subchannel ' +
                    subchannelString +
                    ' has state ' +
                    ConnectivityState[
                      pickResult.subchannel!.getConnectivityState()
                    ] +
                    ' after getting credentials metadata. Retrying pick'
                );
                this.doPick();
                return;
              }

              if (this.deadline !== Infinity) {
                finalMetadata.set(
                  'grpc-timeout',
                  getDeadlineTimeoutString(this.deadline)
                );
              }
              try {
                this.child = pickResult
                  .subchannel!.getRealSubchannel()
                  .createCall(finalMetadata, this.host, this.methodName, {
                    onReceiveMetadata: metadata => {
                      this.trace('Received metadata');
                      this.listener!.onReceiveMetadata(metadata);
                    },
                    onReceiveMessage: message => {
                      this.trace('Received message');
                      this.listener!.onReceiveMessage(message);
                    },
                    onReceiveStatus: status => {
                      this.trace('Received status');
                      // A REFUSED_STREAM reset is reported with its own progress value.
                      if (
                        status.rstCode ===
                        http2.constants.NGHTTP2_REFUSED_STREAM
                      ) {
                        this.outputStatus(status, 'REFUSED');
                      } else {
                        this.outputStatus(status, 'PROCESSED');
                      }
                    },
                  });
                this.childStartTime = new Date();
              } catch (error) {
                this.trace(
                  'Failed to start call on picked subchannel ' +
                    subchannelString +
                    ' with error ' +
                    (error as Error).message
                );
                this.outputStatus(
                  {
                    code: Status.INTERNAL,
                    details:
                      'Failed to start HTTP/2 stream with error ' +
                      (error as Error).message,
                    metadata: new Metadata(),
                  },
                  'NOT_STARTED'
                );
                return;
              }
              this.callConfig.onCommitted?.();
              pickResult.onCallStarted?.();
              this.onCallEnded = pickResult.onCallEnded;
              this.trace(
                'Created child call [' + this.child.getCallNumber() + ']'
              );
              // Replay operations that were requested before the child existed.
              if (this.readPending) {
                this.child.startRead();
              }
              if (this.pendingMessage) {
                this.child.sendMessageWithContext(
                  this.pendingMessage.context,
                  this.pendingMessage.message
                );
              }
              if (this.pendingHalfClose) {
                this.child.halfClose();
              }
            },
            (error: Error & { code: number }) => {
              /* Use the error's numeric code when it has one, otherwise
               * UNKNOWN. NOTE(review): this assumes the code is never
               * Status.OK - confirm against the credentials plugins. */
              const { code, details } = restrictControlPlaneStatusCode(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Getting metadata from plugin failed with error: ${error.message}`
              );
              this.outputStatus(
                {
                  code: code,
                  details: details,
                  metadata: new Metadata(),
                },
                'PROCESSED'
              );
            }
          );
        break;
      case PickResultType.DROP: {
        const { code, details } = restrictControlPlaneStatusCode(
          pickResult.status!.code,
          pickResult.status!.details
        );
        // Deliver the status on a later tick rather than synchronously.
        setImmediate(() => {
          this.outputStatus(
            { code, details, metadata: pickResult.status!.metadata },
            'DROP'
          );
        });
        break;
      }
      case PickResultType.TRANSIENT_FAILURE:
        if (this.metadata.getOptions().waitForReady) {
          this.channel.queueCallForPick(this);
        } else {
          const { code, details } = restrictControlPlaneStatusCode(
            pickResult.status!.code,
            pickResult.status!.details
          );
          // Deliver the status on a later tick rather than synchronously.
          setImmediate(() => {
            this.outputStatus(
              { code, details, metadata: pickResult.status!.metadata },
              'PROCESSED'
            );
          });
        }
        break;
      case PickResultType.QUEUE:
        this.channel.queueCallForPick(this);
        break;
    }
  }

  /**
   * Cancels the child call, if one exists, and ends this call with the given
   * status code and details, empty metadata and progress 'PROCESSED'.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.child?.cancelWithStatus(status, details);
    this.outputStatus(
      { code: status, details: details, metadata: new Metadata() },
      'PROCESSED'
    );
  }

  /** Returns the child call's peer, falling back to the channel target. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Records the metadata and listener for this call and performs the first
   * pick.
   */
  start(
    metadata: Metadata,
    listener: LoadBalancingCallInterceptingListener
  ): void {
    this.trace('start called');
    this.listener = listener;
    this.metadata = metadata;
    this.doPick();
  }

  /**
   * Sends a message on the child call, or stores it to be sent once the child
   * call is created. A stored message replaces any previously stored one.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.child.sendMessageWithContext(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /**
   * Starts a read on the child call, or records that a read is wanted so it
   * can be started once the child call is created.
   */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child call, or records that a half-close is wanted so it
   * can be applied once the child call is created.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /**
   * Not supported on this call type; credentials are fixed at construction.
   *
   * @throws Error always.
   */
  setCredentials(credentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** Returns the call number supplied at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
|
|