|
|
import pino from "pino"; |
|
|
import { Duplex, Readable } from "stream"; |
|
|
import { EventStreamMarshaller } from "@smithy/eventstream-serde-node"; |
|
|
import { fromUtf8, toUtf8 } from "@smithy/util-utf8"; |
|
|
import { Message } from "@smithy/eventstream-codec"; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Wraps a raw AWS event-stream byte source in an object-mode Duplex.
 *
 * The bytes from `params.input` are fed through an `EventStreamMarshaller`
 * configured with the UTF-8 helpers. Each deserialized event is a record keyed
 * by its event type:
 * - a `"chunk"` event is unwrapped, so its message is passed on directly;
 * - any other event is passed on still wrapped under its event-type key.
 *
 * @param params.input  Readable carrying the encoded event stream.
 * @param params.logger Logger handed to the decoder stream.
 * @returns The Duplex that exposes the deserialized events.
 */
export function getAwsEventStreamDecoder(params: {
  input: Readable;
  logger: pino.Logger;
}): Duplex {
  const { input: source, logger } = params;

  const marshaller = new EventStreamMarshaller({
    utf8Encoder: toUtf8,
    utf8Decoder: fromUtf8,
  });

  const eventStream = marshaller.deserialize(
    source,
    async (event: Record<string, Message>) => {
      // Only the first key is inspected; it names the event type.
      const [eventType] = Object.keys(event);

      return eventType === "chunk"
        ? event[eventType]
        : ({ [eventType]: event[eventType] } as any);
    }
  );

  return new AWSEventStreamDecoder(eventStream, { logger });
}
|
|
|
|
|
/**
 * Object-mode Duplex whose readable side emits the items produced by an async
 * iterable of decoded event-stream messages.
 *
 * The writable side is a sink: every write is acknowledged and discarded.
 *
 * Failure handling while pulling from the iterable:
 * - a thrown non-Error object is treated as an AWS error event: it is logged,
 *   pushed downstream as the last item, and the readable side is ended
 *   (NOTE(review): presumably the marshaller throws the decoded exception
 *   message itself for exception events; confirm against the marshaller);
 * - a thrown Error destroys the stream with that error;
 * - any other thrown value (null, undefined, primitives) is wrapped in an
 *   Error and destroys the stream, instead of crashing the error handler.
 */
class AWSEventStreamDecoder extends Duplex {
  private readonly asyncIterable: AsyncIterable<Message>;
  private readonly iterator: AsyncIterator<Message>;
  /** True while a pull loop is in flight, so overlapping `_read` calls are ignored. */
  private reading: boolean;
  private readonly logger: pino.Logger;

  /**
   * @param asyncIterable Source of decoded messages; its iterator is obtained once, here.
   * @param options.logger Parent logger; a child tagged with this module's name is used.
   */
  constructor(
    asyncIterable: AsyncIterable<Message>,
    options: { logger: pino.Logger }
  ) {
    // Only stream options go to the base class; the logger is not one of them.
    super({ objectMode: true });
    this.asyncIterable = asyncIterable;
    this.iterator = this.asyncIterable[Symbol.asyncIterator]();
    this.reading = false;
    this.logger = options.logger.child({ module: "aws-eventstream-decoder" });
  }

  /**
   * Starts a pull loop unless one is already running.
   */
  _read(_size: number): void {
    if (this.reading) return;
    this.reading = true;

    // pump() deals with iterator failures itself; this catch is a last resort
    // for a failure inside that handling, so no rejection goes unhandled.
    this.pump().catch((err: unknown) => {
      this.destroy(AWSEventStreamDecoder.toError(err));
    });
  }

  /**
   * Pulls from the iterator and pushes downstream until the iterator is done
   * or `push` reports back-pressure.
   */
  private async pump(): Promise<void> {
    try {
      while (true) {
        const { value, done } = await this.iterator.next();
        if (done) {
          this.push(null);
          break;
        }
        // A false return means the buffer is full; wait for the next _read.
        if (!this.push(value)) break;
      }
    } catch (err: unknown) {
      this.handlePullFailure(err);
    } finally {
      this.reading = false;
    }
  }

  /**
   * Routes a value thrown by the iterator: non-Error objects are forwarded as
   * a final data item, everything else destroys the stream.
   */
  private handlePullFailure(err: unknown): void {
    if (typeof err === "object" && err !== null && !(err instanceof Error)) {
      const { headers } = err as { headers?: unknown };
      this.logger.warn({ err: headers }, "Received AWS error event");
      this.push(err);
      this.push(null);
      return;
    }

    const failure = AWSEventStreamDecoder.toError(err);
    this.logger.error(failure, "Error during AWS stream deserialization");
    this.destroy(failure);
  }

  /**
   * Returns `thrown` unchanged when it is an Error, otherwise wraps it in one.
   */
  private static toError(thrown: unknown): Error {
    if (thrown instanceof Error) return thrown;
    if (typeof thrown === "string") return new Error(thrown);
    const kind = thrown === null ? "null" : typeof thrown;
    return new Error(
      `Non-Error value of type ${kind} thrown during AWS stream deserialization`
    );
  }

  /**
   * Discards written data; the writable side carries no information.
   */
  _write(_chunk: unknown, _encoding: string, callback: () => void): void {
    callback();
  }

  /**
   * Nothing to flush when the writable side ends.
   */
  _final(callback: () => void): void {
    callback();
  }

  /**
   * Signals the source iterator that no more items will be requested, so it
   * can run its own cleanup. This is best-effort and not awaited: destruction
   * completes immediately and any failure from `return()` is ignored.
   */
  _destroy(
    error: Error | null,
    callback: (error?: Error | null) => void
  ): void {
    try {
      const released = this.iterator.return?.();
      if (released) {
        Promise.resolve(released).catch(() => undefined);
      }
    } catch {
      // A misbehaving iterator must not block destruction.
    }
    callback(error);
  }
}
|
|
|