Spaces:
Paused
Paused
| import type { ReplyToMode } from "../../config/types.js"; | |
| import type { OriginatingChannelType } from "../templating.js"; | |
| import type { ReplyPayload } from "../types.js"; | |
| import { logVerbose } from "../../globals.js"; | |
| import { stripHeartbeatToken } from "../heartbeat.js"; | |
| import { SILENT_REPLY_TOKEN } from "../tokens.js"; | |
| import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js"; | |
| import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; | |
| import { parseReplyDirectives } from "./reply-directives.js"; | |
| import { | |
| applyReplyThreading, | |
| filterMessagingToolDuplicates, | |
| isRenderablePayload, | |
| shouldSuppressMessagingToolReplies, | |
| } from "./reply-payloads.js"; | |
/**
 * Converts the raw payloads produced by an agent run into the final list of
 * replies that still need to be delivered.
 *
 * Stages, in order:
 * 1. Outside heartbeat runs, error payloads whose text is recognised by
 *    `isBunFetchSocketError` are reformatted, and any text containing
 *    `HEARTBEAT_OK` is passed through `stripHeartbeatToken`. A payload the
 *    stripper marks as skippable is dropped unless it carries media.
 *    Heartbeat runs forward `params.payloads` untouched.
 * 2. `applyReplyThreading` is applied, then `parseReplyDirectives` is run on
 *    each text. Values already present on the payload win over parsed ones.
 * 3. Payloads rejected by `isRenderablePayload` are removed.
 * 4. `filterMessagingToolDuplicates` is applied using the messaging-tool texts.
 * 5. Payloads already delivered are removed: everything when block streaming
 *    streamed without aborting, otherwise those the pipeline reports as sent
 *    (streaming enabled) or those whose key is in `directlySentBlockKeys`.
 * 6. If `shouldSuppressMessagingToolReplies` returns true, nothing is returned.
 *
 * @param params - Run output plus routing/streaming context; see field types.
 * @returns The payloads left to send and the updated "heartbeat strip already
 *   logged" flag (set once the first strip has been logged).
 */
export function buildReplyPayloads(params: {
  payloads: ReplyPayload[];
  isHeartbeat: boolean;
  didLogHeartbeatStrip: boolean;
  blockStreamingEnabled: boolean;
  blockReplyPipeline: BlockReplyPipeline | null;
  /** Payload keys sent directly (not via pipeline) during tool flush. */
  directlySentBlockKeys?: Set<string>;
  replyToMode: ReplyToMode;
  replyToChannel?: OriginatingChannelType;
  currentMessageId?: string;
  messageProvider?: string;
  messagingToolSentTexts?: string[];
  messagingToolSentTargets?: Parameters<
    typeof shouldSuppressMessagingToolReplies
  >[0]["messagingToolSentTargets"];
  originatingTo?: string;
  accountId?: string;
}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } {
  let heartbeatStripLogged = params.didLogHeartbeatStrip;

  // Stage 1: clean up text on non-heartbeat runs.
  const cleanNonHeartbeatPayloads = (source: ReplyPayload[]): ReplyPayload[] => {
    const kept: ReplyPayload[] = [];
    source.forEach((payload) => {
      const rawText = payload.text;
      const text =
        payload.isError && rawText && isBunFetchSocketError(rawText)
          ? formatBunFetchSocketError(rawText)
          : rawText;

      // Nothing to strip: keep the payload with the (possibly reformatted) text.
      if (!text || !text.includes("HEARTBEAT_OK")) {
        kept.push({ ...payload, text });
        return;
      }

      const stripResult = stripHeartbeatToken(text, { mode: "message" });
      if (stripResult.didStrip && !heartbeatStripLogged) {
        // Log only the first strip; the flag is handed back to the caller.
        heartbeatStripLogged = true;
        logVerbose("Stripped stray HEARTBEAT_OK token from reply");
      }

      const carriesMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
      if (stripResult.shouldSkip && !carriesMedia) {
        return;
      }
      kept.push({ ...payload, text: stripResult.text });
    });
    return kept;
  };

  const sanitizedPayloads = params.isHeartbeat
    ? params.payloads
    : cleanNonHeartbeatPayloads(params.payloads);

  // Stage 2: threading, then merge inline directives into each payload.
  const threadedPayloads = applyReplyThreading({
    payloads: sanitizedPayloads,
    replyToMode: params.replyToMode,
    replyToChannel: params.replyToChannel,
    currentMessageId: params.currentMessageId,
  });
  const directiveMergedPayloads = threadedPayloads.map((payload) => {
    const directives = parseReplyDirectives(payload.text ?? "", {
      currentMessageId: params.currentMessageId,
      silentToken: SILENT_REPLY_TOKEN,
    });
    const mediaUrls = payload.mediaUrls ?? directives.mediaUrls;
    // Fall back to the first URL of the list when no single URL is given.
    const mediaUrl = payload.mediaUrl ?? directives.mediaUrl ?? mediaUrls?.[0];
    return {
      ...payload,
      // Empty parsed text is normalised to undefined.
      text: directives.text || undefined,
      mediaUrls,
      mediaUrl,
      replyToId: payload.replyToId ?? directives.replyToId,
      replyToTag: payload.replyToTag || directives.replyToTag,
      replyToCurrent: payload.replyToCurrent || directives.replyToCurrent,
      audioAsVoice: Boolean(payload.audioAsVoice) || Boolean(directives.audioAsVoice),
    };
  });

  // Stage 3: keep only what can actually be rendered.
  const replyTaggedPayloads: ReplyPayload[] = directiveMergedPayloads.filter(isRenderablePayload);

  // Final payloads are discarded only when block streaming succeeded
  // end-to-end; an aborted stream (e.g. timeout) falls back to them.
  const pipeline = params.blockReplyPipeline;
  let streamedEverything = false;
  if (params.blockStreamingEnabled && pipeline?.didStream()) {
    streamedEverything = !pipeline.isAborted();
  }

  const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
    messageProvider: params.messageProvider,
    messagingToolSentTargets: params.messagingToolSentTargets ?? [],
    originatingTo: params.originatingTo,
    accountId: params.accountId,
  });

  // Stage 4: filter against texts the messaging tool already sent.
  const dedupedPayloads = filterMessagingToolDuplicates({
    payloads: replyTaggedPayloads,
    sentTexts: params.messagingToolSentTexts ?? [],
  });

  // Stage 5: remove payloads already sent via the pipeline or directly
  // during tool flush.
  const directKeys = params.directlySentBlockKeys;
  let undeliveredPayloads: ReplyPayload[];
  if (streamedEverything) {
    undeliveredPayloads = [];
  } else if (params.blockStreamingEnabled) {
    undeliveredPayloads = dedupedPayloads.filter((payload) => !pipeline?.hasSentPayload(payload));
  } else if (directKeys?.size) {
    undeliveredPayloads = dedupedPayloads.filter(
      (payload) => !directKeys.has(createBlockReplyPayloadKey(payload)),
    );
  } else {
    undeliveredPayloads = dedupedPayloads;
  }

  // Stage 6: suppression by the messaging-tool check overrides everything.
  return {
    replyPayloads: suppressMessagingToolReplies ? [] : undeliveredPayloads,
    didLogHeartbeatStrip: heartbeatStripLogged,
  };
}