Spaces:
Runtime error
Runtime error
File size: 2,086 Bytes
4327358 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import * as grpc from '@grpc/grpc-js';
import { messages } from '@waha/core/engines/gows/grpc/gows';
import { EnginePayload } from '@waha/structures/webhooks.dto';
import { sleep } from '@waha/utils/promiseTimeout';
import { Logger } from 'pino';
import { Observable } from 'rxjs';
/**
* Observable that listens to a gRPC stream and emits EnginePayload objects.
* Pass a factory function that returns a client and a stream.
*/
export class GowsEventStreamObservable extends Observable<EnginePayload> {
_client: grpc.Client;
CLIENT_CLOSE_TIMEOUT = 1_000;
constructor(
private logger: Logger,
factory: () => {
client: grpc.Client;
stream: grpc.ClientReadableStream<messages.EventJson>;
},
) {
super((subscriber) => {
this.logger.debug('Creating grpc client and stream...');
const { client, stream } = factory();
this._client = client;
stream.on('data', (raw) => {
const obj = raw.toObject();
obj.data = JSON.parse(obj.data);
subscriber?.next(obj);
});
stream.on('end', (...args) => {
this.logger.debug('Stream ended', args);
subscriber?.complete();
subscriber = null;
});
stream.on('error', async (err: any) => {
const CLIENT_CANCELLED_CODE = grpc.status.CANCELLED;
if (err.code === CLIENT_CANCELLED_CODE) {
this.logger.debug('Stream cancelled by client');
return;
}
this.logger.error(err, 'Stream error');
// Give some time to node event loop to process the error
await sleep(100);
subscriber?.error(err);
subscriber = null;
});
return async () => {
this.logger.debug('Closing stream client...');
client.close();
await sleep(this.CLIENT_CLOSE_TIMEOUT);
this.logger.debug('Stream client closed');
this.logger.debug('Cancelling stream...');
stream.cancel();
this.logger.debug('Stream cancelled');
};
});
}
get client(): Omit<grpc.Client, 'close'> {
return this._client;
}
}
|