File size: 2,988 Bytes
5c5b371
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import pino from "pino";
import { Duplex, Readable } from "stream";
import { EventStreamMarshaller } from "@smithy/eventstream-serde-node";
import { fromUtf8, toUtf8 } from "@smithy/util-utf8";
import { Message } from "@smithy/eventstream-codec";

/**
 * Decodes a Readable stream, such as a proxied HTTP response, into a stream of
 * Message objects using the AWS SDK's EventStreamMarshaller. Error events in
 * the amazon eventstream protocol are decoded as Message objects and will not
 * emit an error event on the decoder stream.
 */
export function getAwsEventStreamDecoder(params: {
  input: Readable;
  logger: pino.Logger;
}): Duplex {
  const { input, logger } = params;
  const config = { utf8Encoder: toUtf8, utf8Decoder: fromUtf8 };
  const eventStream = new EventStreamMarshaller(config).deserialize(
    input,
    async (input: Record<string, Message>) => {
      const eventType = Object.keys(input)[0];
      let result;
      if (eventType === "chunk") {
        result = input[eventType];
      } else {
        // AWS unmarshaller treats non-chunk events (errors and exceptions) oddly.
        result = { [eventType]: input[eventType] } as any;
      }
      return result;
    }
  );
  return new AWSEventStreamDecoder(eventStream, { logger });
}

class AWSEventStreamDecoder extends Duplex {
  private readonly asyncIterable: AsyncIterable<Message>;
  private iterator: AsyncIterator<Message>;
  private reading: boolean;
  private logger: pino.Logger;

  constructor(
    asyncIterable: AsyncIterable<Message>,
    options: { logger: pino.Logger }
  ) {
    super({ ...options, objectMode: true });
    this.asyncIterable = asyncIterable;
    this.iterator = this.asyncIterable[Symbol.asyncIterator]();
    this.reading = false;
    this.logger = options.logger.child({ module: "aws-eventstream-decoder" });
  }

  async _read(_size: number) {
    if (this.reading) return;
    this.reading = true;

    try {
      while (true) {
        const { value, done } = await this.iterator.next();
        if (done) {
          this.push(null);
          break;
        }
        if (!this.push(value)) break;
      }
    } catch (err) {
      // AWS SDK's EventStreamMarshaller emits errors in the stream itself as
      // whatever our deserializer returns, which will not be Error objects
      // because we want to pass the Message to the next stream for processing.
      // Any actual Error thrown here is some failure during deserialization.
      const isAwsError = !(err instanceof Error);

      if (isAwsError) {
        this.logger.warn({ err: err.headers }, "Received AWS error event");
        this.push(err);
        this.push(null);
      } else {
        this.logger.error(err, "Error during AWS stream deserialization");
        this.destroy(err);
      }
    } finally {
      this.reading = false;
    }
  }

  _write(_chunk: any, _encoding: string, callback: () => void) {
    callback();
  }

  _final(callback: () => void) {
    callback();
  }
}