File size: 3,468 Bytes
5c5b371 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
/* Queues incoming prompts/responses and periodically flushes them to configured
* logging backend. */
import { logger } from "../../logger";
import { LogBackend, PromptLogEntry } from ".";
import { sheets, file } from "./backends";
import { config } from "../../config";
import { assertNever } from "../utils";
// Normal flush cadence in milliseconds; halved when the queue falls behind.
const FLUSH_INTERVAL = 1000 * 10;
// Maximum number of entries submitted to the backend per flush.
const MAX_BATCH_SIZE = 25;
// Pending log entries awaiting submission to the backend.
const queue: PromptLogEntry[] = [];
const log = logger.child({ module: "log-queue" });
// Whether `start()` has successfully initialized a backend; `enqueue` drops
// entries while false.
let started = false;
// Handle for the next scheduled flush, so `stop()` can cancel it.
let timeoutId: NodeJS.Timeout | null = null;
// True while the current batch is on its second (final) submission attempt.
let retrying = false;
// Batches discarded in a row; drives the backoff in `scheduleFlush`.
let consecutiveFailedBatches = 0;
// Active logging backend, selected in `start()`.
let backend: LogBackend;
/** Adds a prompt log entry to the queue; drops it if the queue isn't running. */
export const enqueue = (payload: PromptLogEntry) => {
  if (started) {
    queue.push(payload);
  } else {
    log.warn("Log queue not started, discarding incoming log entry.");
  }
};
/**
 * Drains up to MAX_BATCH_SIZE queued entries and submits them to the backend.
 * On the first failure the batch is put back at the head of the queue and
 * retried on the next tick; a second consecutive failure discards the batch
 * and bumps the failure counter used for backoff. Always reschedules the next
 * flush unless the queue has been stopped.
 */
export const flush = async () => {
  if (!started) {
    return;
  }
  if (queue.length > 0) {
    const batchSize = Math.min(MAX_BATCH_SIZE, queue.length);
    const nextBatch = queue.splice(0, batchSize);
    log.info({ size: nextBatch.length }, "Submitting new batch.");
    try {
      await backend.appendBatch(nextBatch);
      retrying = false;
      consecutiveFailedBatches = 0;
    } catch (e: unknown) {
      // Narrow the unknown catch value so message/stack can be logged safely
      // (strict mode types catch variables as `unknown`, not `any`).
      const err = e instanceof Error ? e : new Error(String(e));
      if (retrying) {
        log.error(
          { message: err.message, stack: err.stack },
          "Failed twice to flush batch, discarding."
        );
        retrying = false;
        consecutiveFailedBatches++;
      } else {
        // Put the batch back at the front of the queue and try again
        log.warn(
          { message: err.message, stack: err.stack },
          "Failed to flush batch. Retrying."
        );
        queue.unshift(...nextBatch);
        retrying = true;
        setImmediate(() => flush());
        return;
      }
    }
  }
  // Flush twice as often while the backlog exceeds half a batch.
  const useHalfInterval = queue.length > MAX_BATCH_SIZE / 2;
  scheduleFlush(useHalfInterval);
};
/**
 * Initializes the configured logging backend and begins the flush loop.
 * If backend initialization throws, the queue is left stopped and subsequent
 * `enqueue` calls will discard their entries.
 */
export const start = async () => {
  // Non-null assertion: presumably callers only start the queue when a
  // backend is configured — TODO confirm against call sites.
  const type = config.promptLoggingBackend!;
  try {
    switch (type) {
      case "google_sheets":
        backend = sheets;
        await sheets.init(() => stop());
        break;
      case "file":
        backend = file;
        await file.init(() => stop());
        break;
      default:
        assertNever(type);
    }
    log.info("Logging backend initialized.");
    started = true;
  } catch (e: unknown) {
    // Catch variables are `unknown` under strict mode; narrow before reading
    // `.message` (the original `e.message` would not compile there).
    const error = e instanceof Error ? e.message : String(e);
    log.error({ error }, "Could not initialize logging backend.");
    return;
  }
  scheduleFlush();
};
/** Cancels any pending flush timer and marks the queue as stopped. */
export const stop = () => {
  if (timeoutId) clearTimeout(timeoutId);
  log.info("Stopping log queue.");
  started = false;
};
/**
 * Arms the timer for the next flush. After more than three consecutively
 * discarded batches, backs off linearly (one minute per failed batch, capped
 * at ten minutes). Otherwise flushes on the normal interval, or at half that
 * interval when the queue is falling behind.
 */
const scheduleFlush = (halfInterval = false) => {
  if (consecutiveFailedBatches > 3) {
    // TODO: may cause memory issues on busy servers, though if we crash that
    // may actually fix the problem with logs randomly not being flushed.
    const oneMinute = 60 * 1000;
    const maxBackoff = 10 * oneMinute;
    const backoff = Math.min(consecutiveFailedBatches * oneMinute, maxBackoff);
    timeoutId = setTimeout(() => flush(), backoff);
    log.warn(
      { consecutiveFailedBatches, backoffMs: backoff },
      "Failed to flush 3 batches in a row, pausing for a few minutes."
    );
    return;
  }
  if (halfInterval) {
    log.warn(
      { queueSize: queue.length },
      "Queue is falling behind, switching to faster flush interval."
    );
  }
  const delay = halfInterval ? FLUSH_INTERVAL / 2 : FLUSH_INTERVAL;
  timeoutId = setTimeout(() => flush(), delay);
};
|