Spaces:
Sleeping
Sleeping
| import { ChannelType } from "@buape/carbon"; | |
| import type { ReplyPayload } from "../../auto-reply/types.js"; | |
| import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; | |
| import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js"; | |
| import { resolveChunkMode } from "../../auto-reply/chunk.js"; | |
| import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; | |
| import { | |
| formatInboundEnvelope, | |
| formatThreadStarterEnvelope, | |
| resolveEnvelopeFormatOptions, | |
| } from "../../auto-reply/envelope.js"; | |
| import { | |
| buildPendingHistoryContextFromMap, | |
| clearHistoryEntriesIfEnabled, | |
| } from "../../auto-reply/reply/history.js"; | |
| import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; | |
| import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; | |
| import { | |
| removeAckReactionAfterReply, | |
| shouldAckReaction as shouldAckReactionGate, | |
| } from "../../channels/ack-reactions.js"; | |
| import { logTypingFailure, logAckFailure } from "../../channels/logging.js"; | |
| import { createReplyPrefixContext } from "../../channels/reply-prefix.js"; | |
| import { recordInboundSession } from "../../channels/session.js"; | |
| import { createTypingCallbacks } from "../../channels/typing.js"; | |
| import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; | |
| import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; | |
| import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; | |
| import { buildAgentSessionKey } from "../../routing/resolve-route.js"; | |
| import { resolveThreadSessionKeys } from "../../routing/session-key.js"; | |
| import { truncateUtf16Safe } from "../../utils.js"; | |
| import { reactMessageDiscord, removeReactionDiscord } from "../send.js"; | |
| import { normalizeDiscordSlug } from "./allow-list.js"; | |
| import { resolveTimestampMs } from "./format.js"; | |
| import { | |
| buildDiscordMediaPayload, | |
| resolveDiscordMessageText, | |
| resolveMediaList, | |
| } from "./message-utils.js"; | |
| import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js"; | |
| import { deliverDiscordReply } from "./reply-delivery.js"; | |
| import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; | |
| import { sendTyping } from "./typing.js"; | |
| export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) { | |
| const { | |
| cfg, | |
| discordConfig, | |
| accountId, | |
| token, | |
| runtime, | |
| guildHistories, | |
| historyLimit, | |
| mediaMaxBytes, | |
| textLimit, | |
| replyToMode, | |
| ackReactionScope, | |
| message, | |
| author, | |
| sender, | |
| data, | |
| client, | |
| channelInfo, | |
| channelName, | |
| isGuildMessage, | |
| isDirectMessage, | |
| isGroupDm, | |
| baseText, | |
| messageText, | |
| shouldRequireMention, | |
| canDetectMention, | |
| effectiveWasMentioned, | |
| shouldBypassMention, | |
| threadChannel, | |
| threadParentId, | |
| threadParentName, | |
| threadParentType, | |
| threadName, | |
| displayChannelSlug, | |
| guildInfo, | |
| guildSlug, | |
| channelConfig, | |
| baseSessionKey, | |
| route, | |
| commandAuthorized, | |
| } = ctx; | |
| const mediaList = await resolveMediaList(message, mediaMaxBytes); | |
| const text = messageText; | |
| if (!text) { | |
| logVerbose(`discord: drop message ${message.id} (empty content)`); | |
| return; | |
| } | |
| const ackReaction = resolveAckReaction(cfg, route.agentId); | |
| const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false; | |
| const shouldAckReaction = () => | |
| Boolean( | |
| ackReaction && | |
| shouldAckReactionGate({ | |
| scope: ackReactionScope, | |
| isDirect: isDirectMessage, | |
| isGroup: isGuildMessage || isGroupDm, | |
| isMentionableGroup: isGuildMessage, | |
| requireMention: Boolean(shouldRequireMention), | |
| canDetectMention, | |
| effectiveWasMentioned, | |
| shouldBypassMention, | |
| }), | |
| ); | |
| const ackReactionPromise = shouldAckReaction() | |
| ? reactMessageDiscord(message.channelId, message.id, ackReaction, { | |
| rest: client.rest, | |
| }).then( | |
| () => true, | |
| (err) => { | |
| logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`); | |
| return false; | |
| }, | |
| ) | |
| : null; | |
| const fromLabel = isDirectMessage | |
| ? buildDirectLabel(author) | |
| : buildGuildLabel({ | |
| guild: data.guild ?? undefined, | |
| channelName: channelName ?? message.channelId, | |
| channelId: message.channelId, | |
| }); | |
| const senderLabel = sender.label; | |
| const isForumParent = | |
| threadParentType === ChannelType.GuildForum || threadParentType === ChannelType.GuildMedia; | |
| const forumParentSlug = | |
| isForumParent && threadParentName ? normalizeDiscordSlug(threadParentName) : ""; | |
| const threadChannelId = threadChannel?.id; | |
| const isForumStarter = | |
| Boolean(threadChannelId && isForumParent && forumParentSlug) && message.id === threadChannelId; | |
| const forumContextLine = isForumStarter ? `[Forum parent: #${forumParentSlug}]` : null; | |
| const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined; | |
| const groupSubject = isDirectMessage ? undefined : groupChannel; | |
| const channelDescription = channelInfo?.topic?.trim(); | |
| const senderName = sender.isPluralKit | |
| ? (sender.name ?? author.username) | |
| : (data.member?.nickname ?? author.globalName ?? author.username); | |
| const senderUsername = sender.isPluralKit | |
| ? (sender.tag ?? sender.name ?? author.username) | |
| : author.username; | |
| const senderTag = sender.tag; | |
| const systemPromptParts = [ | |
| channelDescription ? `Channel topic: ${channelDescription}` : null, | |
| channelConfig?.systemPrompt?.trim() || null, | |
| ].filter((entry): entry is string => Boolean(entry)); | |
| const groupSystemPrompt = | |
| systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; | |
| const storePath = resolveStorePath(cfg.session?.store, { | |
| agentId: route.agentId, | |
| }); | |
| const envelopeOptions = resolveEnvelopeFormatOptions(cfg); | |
| const previousTimestamp = readSessionUpdatedAt({ | |
| storePath, | |
| sessionKey: route.sessionKey, | |
| }); | |
| let combinedBody = formatInboundEnvelope({ | |
| channel: "Discord", | |
| from: fromLabel, | |
| timestamp: resolveTimestampMs(message.timestamp), | |
| body: text, | |
| chatType: isDirectMessage ? "direct" : "channel", | |
| senderLabel, | |
| previousTimestamp, | |
| envelope: envelopeOptions, | |
| }); | |
| const shouldIncludeChannelHistory = | |
| !isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel); | |
| if (shouldIncludeChannelHistory) { | |
| combinedBody = buildPendingHistoryContextFromMap({ | |
| historyMap: guildHistories, | |
| historyKey: message.channelId, | |
| limit: historyLimit, | |
| currentMessage: combinedBody, | |
| formatEntry: (entry) => | |
| formatInboundEnvelope({ | |
| channel: "Discord", | |
| from: fromLabel, | |
| timestamp: entry.timestamp, | |
| body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`, | |
| chatType: "channel", | |
| senderLabel: entry.sender, | |
| envelope: envelopeOptions, | |
| }), | |
| }); | |
| } | |
| const replyContext = resolveReplyContext(message, resolveDiscordMessageText, { | |
| envelope: envelopeOptions, | |
| }); | |
| if (replyContext) { | |
| combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`; | |
| } | |
| if (forumContextLine) { | |
| combinedBody = `${combinedBody}\n${forumContextLine}`; | |
| } | |
| let threadStarterBody: string | undefined; | |
| let threadLabel: string | undefined; | |
| let parentSessionKey: string | undefined; | |
| if (threadChannel) { | |
| const starter = await resolveDiscordThreadStarter({ | |
| channel: threadChannel, | |
| client, | |
| parentId: threadParentId, | |
| parentType: threadParentType, | |
| resolveTimestampMs, | |
| }); | |
| if (starter?.text) { | |
| const starterEnvelope = formatThreadStarterEnvelope({ | |
| channel: "Discord", | |
| author: starter.author, | |
| timestamp: starter.timestamp, | |
| body: starter.text, | |
| envelope: envelopeOptions, | |
| }); | |
| threadStarterBody = starterEnvelope; | |
| } | |
| const parentName = threadParentName ?? "parent"; | |
| threadLabel = threadName | |
| ? `Discord thread #${normalizeDiscordSlug(parentName)} › ${threadName}` | |
| : `Discord thread #${normalizeDiscordSlug(parentName)}`; | |
| if (threadParentId) { | |
| parentSessionKey = buildAgentSessionKey({ | |
| agentId: route.agentId, | |
| channel: route.channel, | |
| peer: { kind: "channel", id: threadParentId }, | |
| }); | |
| } | |
| } | |
| const mediaPayload = buildDiscordMediaPayload(mediaList); | |
| const threadKeys = resolveThreadSessionKeys({ | |
| baseSessionKey, | |
| threadId: threadChannel ? message.channelId : undefined, | |
| parentSessionKey, | |
| useSuffix: false, | |
| }); | |
| const replyPlan = await resolveDiscordAutoThreadReplyPlan({ | |
| client, | |
| message, | |
| isGuildMessage, | |
| channelConfig, | |
| threadChannel, | |
| baseText: baseText ?? "", | |
| combinedBody, | |
| replyToMode, | |
| agentId: route.agentId, | |
| channel: route.channel, | |
| }); | |
| const deliverTarget = replyPlan.deliverTarget; | |
| const replyTarget = replyPlan.replyTarget; | |
| const replyReference = replyPlan.replyReference; | |
| const autoThreadContext = replyPlan.autoThreadContext; | |
| const effectiveFrom = isDirectMessage | |
| ? `discord:${author.id}` | |
| : (autoThreadContext?.From ?? `discord:channel:${message.channelId}`); | |
| const effectiveTo = autoThreadContext?.To ?? replyTarget; | |
| if (!effectiveTo) { | |
| runtime.error?.(danger("discord: missing reply target")); | |
| return; | |
| } | |
| const ctxPayload = finalizeInboundContext({ | |
| Body: combinedBody, | |
| RawBody: baseText, | |
| CommandBody: baseText, | |
| From: effectiveFrom, | |
| To: effectiveTo, | |
| SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey, | |
| AccountId: route.accountId, | |
| ChatType: isDirectMessage ? "direct" : "channel", | |
| ConversationLabel: fromLabel, | |
| SenderName: senderName, | |
| SenderId: sender.id, | |
| SenderUsername: senderUsername, | |
| SenderTag: senderTag, | |
| GroupSubject: groupSubject, | |
| GroupChannel: groupChannel, | |
| GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined, | |
| GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined, | |
| Provider: "discord" as const, | |
| Surface: "discord" as const, | |
| WasMentioned: effectiveWasMentioned, | |
| MessageSid: message.id, | |
| ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey, | |
| ThreadStarterBody: threadStarterBody, | |
| ThreadLabel: threadLabel, | |
| Timestamp: resolveTimestampMs(message.timestamp), | |
| ...mediaPayload, | |
| CommandAuthorized: commandAuthorized, | |
| CommandSource: "text" as const, | |
| // Originating channel for reply routing. | |
| OriginatingChannel: "discord" as const, | |
| OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget, | |
| }); | |
| await recordInboundSession({ | |
| storePath, | |
| sessionKey: ctxPayload.SessionKey ?? route.sessionKey, | |
| ctx: ctxPayload, | |
| updateLastRoute: isDirectMessage | |
| ? { | |
| sessionKey: route.mainSessionKey, | |
| channel: "discord", | |
| to: `user:${author.id}`, | |
| accountId: route.accountId, | |
| } | |
| : undefined, | |
| onRecordError: (err) => { | |
| logVerbose(`discord: failed updating session meta: ${String(err)}`); | |
| }, | |
| }); | |
| if (shouldLogVerbose()) { | |
| const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n"); | |
| logVerbose( | |
| `discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`, | |
| ); | |
| } | |
| const typingChannelId = deliverTarget.startsWith("channel:") | |
| ? deliverTarget.slice("channel:".length) | |
| : message.channelId; | |
| const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId }); | |
| const tableMode = resolveMarkdownTableMode({ | |
| cfg, | |
| channel: "discord", | |
| accountId, | |
| }); | |
| const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ | |
| responsePrefix: prefixContext.responsePrefix, | |
| responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, | |
| humanDelay: resolveHumanDelayConfig(cfg, route.agentId), | |
| deliver: async (payload: ReplyPayload) => { | |
| const replyToId = replyReference.use(); | |
| await deliverDiscordReply({ | |
| replies: [payload], | |
| target: deliverTarget, | |
| token, | |
| accountId, | |
| rest: client.rest, | |
| runtime, | |
| replyToId, | |
| textLimit, | |
| maxLinesPerMessage: discordConfig?.maxLinesPerMessage, | |
| tableMode, | |
| chunkMode: resolveChunkMode(cfg, "discord", accountId), | |
| }); | |
| replyReference.markSent(); | |
| }, | |
| onError: (err, info) => { | |
| runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); | |
| }, | |
| onReplyStart: createTypingCallbacks({ | |
| start: () => sendTyping({ client, channelId: typingChannelId }), | |
| onStartError: (err) => { | |
| logTypingFailure({ | |
| log: logVerbose, | |
| channel: "discord", | |
| target: typingChannelId, | |
| error: err, | |
| }); | |
| }, | |
| }).onReplyStart, | |
| }); | |
| const { queuedFinal, counts } = await dispatchInboundMessage({ | |
| ctx: ctxPayload, | |
| cfg, | |
| dispatcher, | |
| replyOptions: { | |
| ...replyOptions, | |
| skillFilter: channelConfig?.skills, | |
| disableBlockStreaming: | |
| typeof discordConfig?.blockStreaming === "boolean" | |
| ? !discordConfig.blockStreaming | |
| : undefined, | |
| onModelSelected: (ctx) => { | |
| prefixContext.onModelSelected(ctx); | |
| }, | |
| }, | |
| }); | |
| markDispatchIdle(); | |
| if (!queuedFinal) { | |
| if (isGuildMessage) { | |
| clearHistoryEntriesIfEnabled({ | |
| historyMap: guildHistories, | |
| historyKey: message.channelId, | |
| limit: historyLimit, | |
| }); | |
| } | |
| return; | |
| } | |
| if (shouldLogVerbose()) { | |
| const finalCount = counts.final; | |
| logVerbose( | |
| `discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, | |
| ); | |
| } | |
| removeAckReactionAfterReply({ | |
| removeAfterReply: removeAckAfterReply, | |
| ackReactionPromise, | |
| ackReactionValue: ackReaction, | |
| remove: async () => { | |
| await removeReactionDiscord(message.channelId, message.id, ackReaction, { | |
| rest: client.rest, | |
| }); | |
| }, | |
| onError: (err) => { | |
| logAckFailure({ | |
| log: logVerbose, | |
| channel: "discord", | |
| target: `${message.channelId}/${message.id}`, | |
| error: err, | |
| }); | |
| }, | |
| }); | |
| if (isGuildMessage) { | |
| clearHistoryEntriesIfEnabled({ | |
| historyMap: guildHistories, | |
| historyKey: message.channelId, | |
| limit: historyLimit, | |
| }); | |
| } | |
| } | |