| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import { EventEmitter } from 'events'; |
| import { Duplex, Readable, Writable } from 'stream'; |
|
|
| import { StatusObject, MessageContext } from './call-interface'; |
| import { Status } from './constants'; |
| import { EmitterAugmentation1 } from './events'; |
| import { Metadata } from './metadata'; |
| import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream'; |
| import { InterceptingCallInterface } from './client-interceptors'; |
|
|
| |
| |
| |
| export type ServiceError = StatusObject & Error; |
|
|
| |
| |
| |
| export type SurfaceCall = { |
| call?: InterceptingCallInterface; |
| cancel(): void; |
| getPeer(): string; |
| } & EmitterAugmentation1<'metadata', Metadata> & |
| EmitterAugmentation1<'status', StatusObject> & |
| EventEmitter; |
|
|
| |
| |
| |
| export type ClientUnaryCall = SurfaceCall; |
|
|
| |
| |
| |
| export type ClientReadableStream<ResponseType> = { |
| deserialize: (chunk: Buffer) => ResponseType; |
| } & SurfaceCall & |
| ObjectReadable<ResponseType>; |
|
|
| |
| |
| |
| export type ClientWritableStream<RequestType> = { |
| serialize: (value: RequestType) => Buffer; |
| } & SurfaceCall & |
| ObjectWritable<RequestType>; |
|
|
| |
| |
| |
| export type ClientDuplexStream<RequestType, ResponseType> = |
| ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>; |
|
|
| |
| |
| |
| |
| |
| |
| export function callErrorFromStatus( |
| status: StatusObject, |
| callerStack: string |
| ): ServiceError { |
| const message = `${status.code} ${Status[status.code]}: ${status.details}`; |
| const error = new Error(message); |
| const stack = `${error.stack}\nfor call at\n${callerStack}`; |
| return Object.assign(new Error(message), status, { stack }); |
| } |
|
|
| export class ClientUnaryCallImpl |
| extends EventEmitter |
| implements ClientUnaryCall |
| { |
| public call?: InterceptingCallInterface; |
| constructor() { |
| super(); |
| } |
|
|
| cancel(): void { |
| this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); |
| } |
|
|
| getPeer(): string { |
| return this.call?.getPeer() ?? 'unknown'; |
| } |
| } |
|
|
| export class ClientReadableStreamImpl<ResponseType> |
| extends Readable |
| implements ClientReadableStream<ResponseType> |
| { |
| public call?: InterceptingCallInterface; |
| constructor(readonly deserialize: (chunk: Buffer) => ResponseType) { |
| super({ objectMode: true }); |
| } |
|
|
| cancel(): void { |
| this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); |
| } |
|
|
| getPeer(): string { |
| return this.call?.getPeer() ?? 'unknown'; |
| } |
|
|
| _read(_size: number): void { |
| this.call?.startRead(); |
| } |
| } |
|
|
| export class ClientWritableStreamImpl<RequestType> |
| extends Writable |
| implements ClientWritableStream<RequestType> |
| { |
| public call?: InterceptingCallInterface; |
| constructor(readonly serialize: (value: RequestType) => Buffer) { |
| super({ objectMode: true }); |
| } |
|
|
| cancel(): void { |
| this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); |
| } |
|
|
| getPeer(): string { |
| return this.call?.getPeer() ?? 'unknown'; |
| } |
|
|
| _write(chunk: RequestType, encoding: string, cb: WriteCallback) { |
| const context: MessageContext = { |
| callback: cb, |
| }; |
| const flags = Number(encoding); |
| if (!Number.isNaN(flags)) { |
| context.flags = flags; |
| } |
| this.call?.sendMessageWithContext(context, chunk); |
| } |
|
|
| _final(cb: Function) { |
| this.call?.halfClose(); |
| cb(); |
| } |
| } |
|
|
| export class ClientDuplexStreamImpl<RequestType, ResponseType> |
| extends Duplex |
| implements ClientDuplexStream<RequestType, ResponseType> |
| { |
| public call?: InterceptingCallInterface; |
| constructor( |
| readonly serialize: (value: RequestType) => Buffer, |
| readonly deserialize: (chunk: Buffer) => ResponseType |
| ) { |
| super({ objectMode: true }); |
| } |
|
|
| cancel(): void { |
| this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client'); |
| } |
|
|
| getPeer(): string { |
| return this.call?.getPeer() ?? 'unknown'; |
| } |
|
|
| _read(_size: number): void { |
| this.call?.startRead(); |
| } |
|
|
| _write(chunk: RequestType, encoding: string, cb: WriteCallback) { |
| const context: MessageContext = { |
| callback: cb, |
| }; |
| const flags = Number(encoding); |
| if (!Number.isNaN(flags)) { |
| context.flags = flags; |
| } |
| this.call?.sendMessageWithContext(context, chunk); |
| } |
|
|
| _final(cb: Function) { |
| this.call?.halfClose(); |
| cb(); |
| } |
| } |
|
|