OpenClawBot / src /auto-reply /reply /agent-runner-payloads.ts
darkfire514's picture
Upload 2526 files
fb4d8fe verified
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { logVerbose } from "../../globals.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import {
applyReplyThreading,
filterMessagingToolDuplicates,
isRenderablePayload,
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
/**
 * Builds the final list of reply payloads for an agent run.
 *
 * Processing order:
 * 1. For non-heartbeat runs, each payload's text is cleaned up: error payloads
 *    whose text matches `isBunFetchSocketError` are rewritten with
 *    `formatBunFetchSocketError`, and stray `HEARTBEAT_OK` tokens are stripped.
 *    A payload that the stripper marks as skippable is dropped unless it
 *    carries media. Heartbeat runs pass their payloads through untouched.
 * 2. Reply threading is applied, reply directives are parsed out of each text,
 *    and payloads that are not renderable are discarded.
 * 3. Payloads duplicating texts already sent by a messaging tool are removed.
 * 4. Payloads already delivered through block streaming (or sent directly
 *    during tool flush) are removed.
 * 5. If messaging-tool replies must be suppressed, nothing is returned.
 *
 * @param params - Raw payloads plus the streaming, threading and messaging-tool
 *   context needed to decide what still has to be delivered.
 * @returns The payloads left to send and the (possibly updated) flag telling
 *   whether a heartbeat-token strip has already been logged.
 */
export function buildReplyPayloads(params: {
  payloads: ReplyPayload[];
  isHeartbeat: boolean;
  didLogHeartbeatStrip: boolean;
  blockStreamingEnabled: boolean;
  blockReplyPipeline: BlockReplyPipeline | null;
  /** Payload keys sent directly (not via pipeline) during tool flush. */
  directlySentBlockKeys?: Set<string>;
  replyToMode: ReplyToMode;
  replyToChannel?: OriginatingChannelType;
  currentMessageId?: string;
  messageProvider?: string;
  messagingToolSentTexts?: string[];
  messagingToolSentTargets?: Parameters<
    typeof shouldSuppressMessagingToolReplies
  >[0]["messagingToolSentTargets"];
  originatingTo?: string;
  accountId?: string;
}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } {
  let didLogHeartbeatStrip = params.didLogHeartbeatStrip;

  // Step 1: text clean-up (skipped entirely for heartbeat runs).
  let sanitizedPayloads: ReplyPayload[];
  if (params.isHeartbeat) {
    sanitizedPayloads = params.payloads;
  } else {
    sanitizedPayloads = [];
    for (const payload of params.payloads) {
      const needsSocketErrorRewrite =
        payload.isError && payload.text && isBunFetchSocketError(payload.text);
      const candidateText = needsSocketErrorRewrite
        ? formatBunFetchSocketError(payload.text)
        : payload.text;

      // Nothing to strip: keep the payload with its (possibly rewritten) text.
      if (!candidateText || !candidateText.includes("HEARTBEAT_OK")) {
        sanitizedPayloads.push({ ...payload, text: candidateText });
        continue;
      }

      const stripResult = stripHeartbeatToken(candidateText, { mode: "message" });
      // Log the strip at most once per run; the flag is handed back to the caller.
      if (stripResult.didStrip && !didLogHeartbeatStrip) {
        didLogHeartbeatStrip = true;
        logVerbose("Stripped stray HEARTBEAT_OK token from reply");
      }

      // A payload with media is still worth sending even if its text is skippable.
      const carriesMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
      if (stripResult.shouldSkip && !carriesMedia) {
        continue;
      }
      sanitizedPayloads.push({ ...payload, text: stripResult.text });
    }
  }

  // Step 2: threading, directive parsing, and renderability filter.
  const threadedPayloads = applyReplyThreading({
    payloads: sanitizedPayloads,
    replyToMode: params.replyToMode,
    replyToChannel: params.replyToChannel,
    currentMessageId: params.currentMessageId,
  });
  const replyTaggedPayloads: ReplyPayload[] = threadedPayloads
    .map((payload) => {
      const directives = parseReplyDirectives(payload.text ?? "", {
        currentMessageId: params.currentMessageId,
        silentToken: SILENT_REPLY_TOKEN,
      });
      // Values already on the payload win over values parsed from the text.
      const mediaUrls = payload.mediaUrls ?? directives.mediaUrls;
      const mediaUrl = payload.mediaUrl ?? directives.mediaUrl ?? mediaUrls?.[0];
      return {
        ...payload,
        text: directives.text || undefined,
        mediaUrls,
        mediaUrl,
        replyToId: payload.replyToId ?? directives.replyToId,
        replyToTag: payload.replyToTag || directives.replyToTag,
        replyToCurrent: payload.replyToCurrent || directives.replyToCurrent,
        audioAsVoice: Boolean(payload.audioAsVoice || directives.audioAsVoice),
      };
    })
    .filter(isRenderablePayload);

  // Final payloads are redundant only if block streaming actually streamed and
  // was not aborted (e.g. by a timeout); otherwise they serve as the fallback.
  const pipeline = params.blockReplyPipeline;
  const shouldDropFinalPayloads =
    params.blockStreamingEnabled &&
    pipeline != null &&
    Boolean(pipeline.didStream()) &&
    !pipeline.isAborted();

  // Step 3: messaging-tool suppression check and duplicate removal.
  const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
    messageProvider: params.messageProvider,
    messagingToolSentTargets: params.messagingToolSentTargets ?? [],
    originatingTo: params.originatingTo,
    accountId: params.accountId,
  });
  const dedupedPayloads = filterMessagingToolDuplicates({
    payloads: replyTaggedPayloads,
    sentTexts: params.messagingToolSentTexts ?? [],
  });

  // Step 4: remove whatever was already delivered, via the pipeline or directly
  // during tool flush.
  let filteredPayloads: ReplyPayload[];
  if (shouldDropFinalPayloads) {
    filteredPayloads = [];
  } else if (params.blockStreamingEnabled) {
    filteredPayloads = dedupedPayloads.filter((payload) => !pipeline?.hasSentPayload(payload));
  } else {
    const directKeys = params.directlySentBlockKeys;
    filteredPayloads = directKeys?.size
      ? dedupedPayloads.filter((payload) => !directKeys.has(createBlockReplyPayloadKey(payload)))
      : dedupedPayloads;
  }

  // Step 5: suppression overrides everything computed above.
  return {
    replyPayloads: suppressMessagingToolReplies ? [] : filteredPayloads,
    didLogHeartbeatStrip,
  };
}