// File size: 6,282 Bytes | fc93158
import { defaultRuntime } from "../../../runtime.js";
import { resolveGlobalMap } from "../../../shared/global-singleton.js";
import {
buildCollectPrompt,
beginQueueDrain,
clearQueueSummaryState,
drainCollectQueueStep,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js";
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import type { FollowupRun } from "./types.js";
/** Signature of the callback that executes one follow-up run. */
type FollowupRunCallback = (run: FollowupRun) => Promise<void>;

// Registry of the most recent runFollowup callback per queue key. It lets
// enqueueFollowupRun restart a drain after the previous one finished and
// deleted its queue.
// The map is looked up through a `Symbol.for` key (global symbol registry);
// presumably resolveGlobalMap returns one shared Map per symbol — verify in
// shared/global-singleton.
const FOLLOWUP_DRAIN_CALLBACKS_KEY = Symbol.for("openclaw.followupDrainCallbacks");
const FOLLOWUP_RUN_CALLBACKS = resolveGlobalMap<string, FollowupRunCallback>(
  FOLLOWUP_DRAIN_CALLBACKS_KEY,
);
/**
 * Forget the drain callback remembered for `key`.
 *
 * Only the callback registry is touched; the queue stored under the same key
 * is not modified by this function.
 *
 * @param key - Queue key whose stored callback should be removed.
 */
export function clearFollowupDrainCallback(key: string): void {
  const registry = FOLLOWUP_RUN_CALLBACKS;
  // Map#delete reports whether an entry existed; callers do not need that here.
  registry.delete(key);
}
/**
 * Re-trigger the drain for `key` using the callback remembered from an
 * earlier drain. Does nothing when no callback is stored for that key.
 *
 * The "idle" decision itself is delegated to scheduleFollowupDrain, which
 * returns early when beginQueueDrain yields no queue to work on.
 *
 * @param key - Queue key whose drain should be restarted.
 */
export function kickFollowupDrainIfIdle(key: string): void {
  const rememberedRun = FOLLOWUP_RUN_CALLBACKS.get(key);
  if (rememberedRun) {
    scheduleFollowupDrain(key, rememberedRun);
  }
}
/** Names of the FollowupRun fields that describe where a message originated. */
type OriginRoutingField =
  | "originatingChannel"
  | "originatingTo"
  | "originatingAccountId"
  | "originatingThreadId";

/** The origin-routing subset of a FollowupRun. */
type OriginRoutingMetadata = Pick<FollowupRun, OriginRoutingField>;
/**
 * Build one set of origin-routing fields for a batch of queued items.
 *
 * Each field is resolved independently: it is taken from the first item in
 * `items` that carries a usable value for that field, so different fields may
 * come from different items. A field stays `undefined` when no item has it.
 *
 * @param items - Queued follow-up runs, in queue order.
 * @returns The four origin-routing fields for the batch.
 */
function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetadata {
  // Thread IDs may be numbers (Telegram topic) or strings (Slack thread_ts),
  // so a truthiness test is not used: only null/undefined and "" count as absent.
  const carriesThreadId = (candidate: FollowupRun): boolean => {
    const id = candidate.originatingThreadId;
    return id != null && id !== "";
  };

  const channelSource = items.find((candidate) => candidate.originatingChannel);
  const toSource = items.find((candidate) => candidate.originatingTo);
  const accountSource = items.find((candidate) => candidate.originatingAccountId);
  const threadSource = items.find(carriesThreadId);

  return {
    originatingChannel: channelSource?.originatingChannel,
    originatingTo: toSource?.originatingTo,
    originatingAccountId: accountSource?.originatingAccountId,
    originatingThreadId: threadSource?.originatingThreadId,
  };
}
/**
 * Classify a queued item by its origin so a batch can be checked for mixed
 * targets.
 *
 * - `{}` when the item carries no origin information at all.
 * - `{ cross: true }` when origin information is present but the channel is
 *   not routable or the recipient is missing.
 * - `{ key }` otherwise, where `key` joins channel, recipient, account and
 *   thread with `|`.
 *
 * @param item - Queued follow-up run to classify.
 */
function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string } {
  const channel = item.originatingChannel;
  const to = item.originatingTo;
  const accountId = item.originatingAccountId;
  const threadId = item.originatingThreadId;

  // Thread IDs may be numbers (Telegram topic IDs) or strings (Slack
  // thread_ts); only null/undefined and "" are treated as "no thread".
  const hasThread = threadId != null && threadId !== "";

  if (!channel && !to && !accountId && !hasThread) {
    return {};
  }

  if (!isRoutableChannel(channel) || !to) {
    return { cross: true };
  }

  const threadKey = hasThread ? String(threadId) : "";
  const segments = [channel, to, accountId || "", threadKey];
  return { key: segments.join("|") };
}
/**
 * Start a background drain of the follow-up queue stored under `key`.
 *
 * The drain runs in a detached async function: this call returns immediately
 * and failures are reported through `defaultRuntime.error` instead of being
 * thrown to the caller. While the queue has items or a non-zero dropped count,
 * each pass waits for the queue debounce and then forwards work to
 * `runFollowup`, either as one combined "collect" prompt, as a summary prompt,
 * or item by item.
 *
 * When the drain ends, the queue is removed from FOLLOWUP_QUEUES if it is
 * empty; otherwise another drain is scheduled with the same callback.
 *
 * @param key - Queue key in FOLLOWUP_QUEUES.
 * @param runFollowup - Callback that executes one follow-up run; it is also
 *   stored in FOLLOWUP_RUN_CALLBACKS for later restarts.
 */
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void {
// No queue handed back means there is nothing to start here.
// NOTE(review): presumably beginQueueDrain also refuses when a drain is
// already in progress and marks the queue as draining — confirm in queue-helpers.
const queue = beginQueueDrain(FOLLOWUP_QUEUES, key);
if (!queue) {
return;
}
// Cache callback only when a drain actually starts. Avoid keeping stale
// callbacks around from finalize calls where no queue work is pending.
FOLLOWUP_RUN_CALLBACKS.set(key, runFollowup);
// Fire-and-forget: `void` marks the promise as intentionally not awaited.
void (async () => {
try {
// Shared with drainCollectQueueStep across iterations of this drain.
const collectState = { forceIndividualCollect: false };
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
// Once the batch is mixed, never collect again within this drain.
// Prevents "collect after shift" collapsing different targets.
//
// Debug: `pnpm test src/auto-reply/reply/reply-flow.test.ts`
// Check if messages span multiple channels.
// If so, process individually to preserve per-message routing.
const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
const collectDrainResult = await drainCollectQueueStep({
collectState,
isCrossChannel,
items: queue.items,
run: runFollowup,
});
// "empty" ends the drain; "drained" means this step already handled the
// work, so go around the loop again. Any other result falls through to the
// combined-prompt path below.
if (collectDrainResult === "empty") {
break;
}
if (collectDrainResult === "drained") {
continue;
}
// Snapshot the items so that anything enqueued while runFollowup is awaited
// is not removed by the splice below.
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
// Use the run of the newest snapshot item, falling back to the queue's last run.
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
}
const routing = resolveOriginRoutingMetadata(items);
const prompt = buildCollectPrompt({
title: "[Queued messages while agent was busy]",
items,
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
await runFollowup({
prompt,
run,
enqueuedAt: Date.now(),
...routing,
});
// Remove only after runFollowup resolved: if it throws, the items stay queued.
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
}
continue;
}
// Non-collect modes: a pending summary prompt is delivered before plain items.
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {
break;
}
// NOTE(review): the callback forwards the summary prompt together with the
// handled item's routing fields, but not the item's own prompt — confirm
// this is intended.
if (
!(await drainNextQueueItem(queue.items, async (item) => {
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
originatingChannel: item.originatingChannel,
originatingTo: item.originatingTo,
originatingAccountId: item.originatingAccountId,
originatingThreadId: item.originatingThreadId,
});
}))
) {
break;
}
clearQueueSummaryState(queue);
continue;
}
// Plain path: hand the next item to runFollowup; a falsy result ends the drain.
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
}
} catch (err) {
// NOTE(review): resetting lastEnqueuedAt presumably restarts the debounce
// window before the retry scheduled in `finally` — confirm against
// waitForQueueDebounce.
queue.lastEnqueuedAt = Date.now();
defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;
// Fully drained: drop the queue. Otherwise (leftover items or dropped count,
// e.g. after an error or an early break) schedule another drain.
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
} else {
scheduleFollowupDrain(key, runFollowup);
}
}
})();
}
|