File size: 3,468 Bytes
5c5b371
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/* Queues incoming prompts/responses and periodically flushes them to configured
 * logging backend. */

import { logger } from "../../logger";
import { LogBackend, PromptLogEntry } from ".";
import { sheets, file } from "./backends";
import { config } from "../../config";
import { assertNever } from "../utils";

// How often the queue is flushed to the backend under normal load, in ms.
const FLUSH_INTERVAL = 1000 * 10;
// Maximum number of entries submitted to the backend per flush.
const MAX_BATCH_SIZE = 25;

// In-memory buffer of log entries awaiting submission to the backend.
const queue: PromptLogEntry[] = [];
const log = logger.child({ module: "log-queue" });

// True once start() has successfully initialized a backend; enqueue() drops
// entries while false.
let started = false;
// Handle for the next scheduled flush, so stop() can cancel it.
let timeoutId: NodeJS.Timeout | null = null;
// True while the most recent batch is being retried after one failure.
let retrying = false;
// Number of batches discarded after failing twice; drives the backoff in
// scheduleFlush().
let consecutiveFailedBatches = 0;
// Active logging backend, selected in start().
let backend: LogBackend;

/**
 * Adds a prompt log entry to the in-memory queue. Entries received before
 * `start()` has initialized the backend are dropped with a warning.
 */
export const enqueue = (payload: PromptLogEntry) => {
  if (started) {
    queue.push(payload);
  } else {
    log.warn("Log queue not started, discarding incoming log entry.");
  }
};

/**
 * Submits up to MAX_BATCH_SIZE queued entries to the backend, then schedules
 * the next flush. A failed batch is retried once immediately; if it fails a
 * second time it is discarded and the failure is counted so scheduleFlush()
 * can back off. No-op until start() has completed.
 */
export const flush = async () => {
  if (!started) {
    return;
  }

  if (queue.length > 0) {
    const batchSize = Math.min(MAX_BATCH_SIZE, queue.length);
    const nextBatch = queue.splice(0, batchSize);
    log.info({ size: nextBatch.length }, "Submitting new batch.");
    try {
      await backend.appendBatch(nextBatch);
      retrying = false;
      consecutiveFailedBatches = 0;
    } catch (e: unknown) {
      // Narrow the unknown catch value rather than assuming `any`.
      const err = e instanceof Error ? e : new Error(String(e));
      if (retrying) {
        log.error(
          { message: err.message, stack: err.stack },
          "Failed twice to flush batch, discarding."
        );
        retrying = false;
        consecutiveFailedBatches++;
      } else {
        // Put the batch back at the front of the queue and try again
        log.warn(
          { message: err.message, stack: err.stack },
          "Failed to flush batch. Retrying."
        );
        queue.unshift(...nextBatch);
        retrying = true;
        // Fire-and-forget retry; `void` marks the intentionally-floating promise.
        setImmediate(() => void flush());
        return;
      }
    }
  }

  // Flush twice as often if the queue is filling faster than we drain it.
  const useHalfInterval = queue.length > MAX_BATCH_SIZE / 2;
  scheduleFlush(useHalfInterval);
};

/**
 * Initializes the logging backend named by `config.promptLoggingBackend` and
 * begins the periodic flush cycle. On initialization failure, logs the error
 * and leaves the queue stopped.
 */
export const start = async () => {
  const type = config.promptLoggingBackend!;
  try {
    switch (type) {
      case "google_sheets":
        backend = sheets;
        await sheets.init(() => stop());
        break;
      case "file":
        backend = file;
        await file.init(() => stop());
        break;
      default:
        assertNever(type);
    }
    log.info("Logging backend initialized.");
    started = true;
  } catch (e: unknown) {
    // `catch` bindings are `unknown` under strict mode; narrow before
    // reading `.message` (the original `e.message` would not compile).
    const message = e instanceof Error ? e.message : String(e);
    log.error({ error: message }, "Could not initialize logging backend.");
    return;
  }
  scheduleFlush();
};

/**
 * Cancels any pending flush and stops accepting new log entries. Entries
 * already queued are not flushed. Safe to call multiple times.
 */
export const stop = () => {
  if (timeoutId) {
    clearTimeout(timeoutId);
    // Drop the stale handle so later calls don't clear an already-cancelled
    // (and possibly recycled) timer.
    timeoutId = null;
  }
  log.info("Stopping log queue.");
  started = false;
};

/**
 * Arms the timer for the next flush(). Normally fires every FLUSH_INTERVAL ms
 * (or half that when the queue is backed up); after more than three
 * consecutively discarded batches, switches to a linear per-failure backoff
 * capped at ten minutes.
 */
const scheduleFlush = (halfInterval = false) => {
  if (consecutiveFailedBatches > 3) {
    // TODO: may cause memory issues on busy servers, though if we crash that
    // may actually fix the problem with logs randomly not being flushed.
    const oneMinute = 60 * 1000;
    const maxBackoff = 10 * oneMinute;
    const backoff = Math.min(consecutiveFailedBatches * oneMinute, maxBackoff);
    timeoutId = setTimeout(() => flush(), backoff);
    log.warn(
      { consecutiveFailedBatches, backoffMs: backoff },
      "Failed to flush 3 batches in a row, pausing for a few minutes."
    );
    return;
  }

  if (halfInterval) {
    log.warn(
      { queueSize: queue.length },
      "Queue is falling behind, switching to faster flush interval."
    );
  }

  const delay = halfInterval ? FLUSH_INTERVAL / 2 : FLUSH_INTERVAL;
  timeoutId = setTimeout(() => flush(), delay);
};