Spaces:
Sleeping
Sleeping
File size: 4,554 Bytes
fb4d8fe | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | import type { FollowupRun } from "./types.js";
import { defaultRuntime } from "../../../runtime.js";
import {
buildCollectPrompt,
buildQueueSummaryPrompt,
hasCrossChannelItems,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void {
const queue = FOLLOWUP_QUEUES.get(key);
if (!queue || queue.draining) {
return;
}
queue.draining = true;
void (async () => {
try {
let forceIndividualCollect = false;
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
// Once the batch is mixed, never collect again within this drain.
// Prevents “collect after shift” collapsing different targets.
//
// Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
if (forceIndividualCollect) {
const next = queue.items.shift();
if (!next) {
break;
}
await runFollowup(next);
continue;
}
// Check if messages span multiple channels.
// If so, process individually to preserve per-message routing.
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
const channel = item.originatingChannel;
const to = item.originatingTo;
const accountId = item.originatingAccountId;
const threadId = item.originatingThreadId;
if (!channel && !to && !accountId && typeof threadId !== "number") {
return {};
}
if (!isRoutableChannel(channel) || !to) {
return { cross: true };
}
const threadKey = typeof threadId === "number" ? String(threadId) : "";
return {
key: [channel, to, accountId || "", threadKey].join("|"),
};
});
if (isCrossChannel) {
forceIndividualCollect = true;
const next = queue.items.shift();
if (!next) {
break;
}
await runFollowup(next);
continue;
}
const items = queue.items.splice(0, queue.items.length);
const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
}
// Preserve originating channel from items when collecting same-channel.
const originatingChannel = items.find((i) => i.originatingChannel)?.originatingChannel;
const originatingTo = items.find((i) => i.originatingTo)?.originatingTo;
const originatingAccountId = items.find(
(i) => i.originatingAccountId,
)?.originatingAccountId;
const originatingThreadId = items.find(
(i) => typeof i.originatingThreadId === "number",
)?.originatingThreadId;
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
items,
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
await runFollowup({
prompt,
run,
enqueuedAt: Date.now(),
originatingChannel,
originatingTo,
originatingAccountId,
originatingThreadId,
});
continue;
}
const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" });
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {
break;
}
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
continue;
}
const next = queue.items.shift();
if (!next) {
break;
}
await runFollowup(next);
}
} catch (err) {
defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
} else {
scheduleFollowupDrain(key, runFollowup);
}
}
})();
}
|