OpenClawBot / src /discord /monitor /message-handler.process.ts
darkfire514's picture
Upload 2526 files
fb4d8fe verified
import { ChannelType } from "@buape/carbon";
import type { ReplyPayload } from "../../auto-reply/types.js";
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
import { resolveChunkMode } from "../../auto-reply/chunk.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatThreadStarterEnvelope,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import {
removeAckReactionAfterReply,
shouldAckReaction as shouldAckReactionGate,
} from "../../channels/ack-reactions.js";
import { logTypingFailure, logAckFailure } from "../../channels/logging.js";
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
import { recordInboundSession } from "../../channels/session.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
import { truncateUtf16Safe } from "../../utils.js";
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
import { normalizeDiscordSlug } from "./allow-list.js";
import { resolveTimestampMs } from "./format.js";
import {
buildDiscordMediaPayload,
resolveDiscordMessageText,
resolveMediaList,
} from "./message-utils.js";
import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js";
import { deliverDiscordReply } from "./reply-delivery.js";
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
import { sendTyping } from "./typing.js";
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
const {
cfg,
discordConfig,
accountId,
token,
runtime,
guildHistories,
historyLimit,
mediaMaxBytes,
textLimit,
replyToMode,
ackReactionScope,
message,
author,
sender,
data,
client,
channelInfo,
channelName,
isGuildMessage,
isDirectMessage,
isGroupDm,
baseText,
messageText,
shouldRequireMention,
canDetectMention,
effectiveWasMentioned,
shouldBypassMention,
threadChannel,
threadParentId,
threadParentName,
threadParentType,
threadName,
displayChannelSlug,
guildInfo,
guildSlug,
channelConfig,
baseSessionKey,
route,
commandAuthorized,
} = ctx;
const mediaList = await resolveMediaList(message, mediaMaxBytes);
const text = messageText;
if (!text) {
logVerbose(`discord: drop message ${message.id} (empty content)`);
return;
}
const ackReaction = resolveAckReaction(cfg, route.agentId);
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
const shouldAckReaction = () =>
Boolean(
ackReaction &&
shouldAckReactionGate({
scope: ackReactionScope,
isDirect: isDirectMessage,
isGroup: isGuildMessage || isGroupDm,
isMentionableGroup: isGuildMessage,
requireMention: Boolean(shouldRequireMention),
canDetectMention,
effectiveWasMentioned,
shouldBypassMention,
}),
);
const ackReactionPromise = shouldAckReaction()
? reactMessageDiscord(message.channelId, message.id, ackReaction, {
rest: client.rest,
}).then(
() => true,
(err) => {
logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`);
return false;
},
)
: null;
const fromLabel = isDirectMessage
? buildDirectLabel(author)
: buildGuildLabel({
guild: data.guild ?? undefined,
channelName: channelName ?? message.channelId,
channelId: message.channelId,
});
const senderLabel = sender.label;
const isForumParent =
threadParentType === ChannelType.GuildForum || threadParentType === ChannelType.GuildMedia;
const forumParentSlug =
isForumParent && threadParentName ? normalizeDiscordSlug(threadParentName) : "";
const threadChannelId = threadChannel?.id;
const isForumStarter =
Boolean(threadChannelId && isForumParent && forumParentSlug) && message.id === threadChannelId;
const forumContextLine = isForumStarter ? `[Forum parent: #${forumParentSlug}]` : null;
const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
const groupSubject = isDirectMessage ? undefined : groupChannel;
const channelDescription = channelInfo?.topic?.trim();
const senderName = sender.isPluralKit
? (sender.name ?? author.username)
: (data.member?.nickname ?? author.globalName ?? author.username);
const senderUsername = sender.isPluralKit
? (sender.tag ?? sender.name ?? author.username)
: author.username;
const senderTag = sender.tag;
const systemPromptParts = [
channelDescription ? `Channel topic: ${channelDescription}` : null,
channelConfig?.systemPrompt?.trim() || null,
].filter((entry): entry is string => Boolean(entry));
const groupSystemPrompt =
systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;
const storePath = resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
});
let combinedBody = formatInboundEnvelope({
channel: "Discord",
from: fromLabel,
timestamp: resolveTimestampMs(message.timestamp),
body: text,
chatType: isDirectMessage ? "direct" : "channel",
senderLabel,
previousTimestamp,
envelope: envelopeOptions,
});
const shouldIncludeChannelHistory =
!isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
if (shouldIncludeChannelHistory) {
combinedBody = buildPendingHistoryContextFromMap({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
currentMessage: combinedBody,
formatEntry: (entry) =>
formatInboundEnvelope({
channel: "Discord",
from: fromLabel,
timestamp: entry.timestamp,
body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`,
chatType: "channel",
senderLabel: entry.sender,
envelope: envelopeOptions,
}),
});
}
const replyContext = resolveReplyContext(message, resolveDiscordMessageText, {
envelope: envelopeOptions,
});
if (replyContext) {
combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
}
if (forumContextLine) {
combinedBody = `${combinedBody}\n${forumContextLine}`;
}
let threadStarterBody: string | undefined;
let threadLabel: string | undefined;
let parentSessionKey: string | undefined;
if (threadChannel) {
const starter = await resolveDiscordThreadStarter({
channel: threadChannel,
client,
parentId: threadParentId,
parentType: threadParentType,
resolveTimestampMs,
});
if (starter?.text) {
const starterEnvelope = formatThreadStarterEnvelope({
channel: "Discord",
author: starter.author,
timestamp: starter.timestamp,
body: starter.text,
envelope: envelopeOptions,
});
threadStarterBody = starterEnvelope;
}
const parentName = threadParentName ?? "parent";
threadLabel = threadName
? `Discord thread #${normalizeDiscordSlug(parentName)}${threadName}`
: `Discord thread #${normalizeDiscordSlug(parentName)}`;
if (threadParentId) {
parentSessionKey = buildAgentSessionKey({
agentId: route.agentId,
channel: route.channel,
peer: { kind: "channel", id: threadParentId },
});
}
}
const mediaPayload = buildDiscordMediaPayload(mediaList);
const threadKeys = resolveThreadSessionKeys({
baseSessionKey,
threadId: threadChannel ? message.channelId : undefined,
parentSessionKey,
useSuffix: false,
});
const replyPlan = await resolveDiscordAutoThreadReplyPlan({
client,
message,
isGuildMessage,
channelConfig,
threadChannel,
baseText: baseText ?? "",
combinedBody,
replyToMode,
agentId: route.agentId,
channel: route.channel,
});
const deliverTarget = replyPlan.deliverTarget;
const replyTarget = replyPlan.replyTarget;
const replyReference = replyPlan.replyReference;
const autoThreadContext = replyPlan.autoThreadContext;
const effectiveFrom = isDirectMessage
? `discord:${author.id}`
: (autoThreadContext?.From ?? `discord:channel:${message.channelId}`);
const effectiveTo = autoThreadContext?.To ?? replyTarget;
if (!effectiveTo) {
runtime.error?.(danger("discord: missing reply target"));
return;
}
const ctxPayload = finalizeInboundContext({
Body: combinedBody,
RawBody: baseText,
CommandBody: baseText,
From: effectiveFrom,
To: effectiveTo,
SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : "channel",
ConversationLabel: fromLabel,
SenderName: senderName,
SenderId: sender.id,
SenderUsername: senderUsername,
SenderTag: senderTag,
GroupSubject: groupSubject,
GroupChannel: groupChannel,
GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
Provider: "discord" as const,
Surface: "discord" as const,
WasMentioned: effectiveWasMentioned,
MessageSid: message.id,
ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
ThreadStarterBody: threadStarterBody,
ThreadLabel: threadLabel,
Timestamp: resolveTimestampMs(message.timestamp),
...mediaPayload,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
// Originating channel for reply routing.
OriginatingChannel: "discord" as const,
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
});
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "discord",
to: `user:${author.id}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`discord: failed updating session meta: ${String(err)}`);
},
});
if (shouldLogVerbose()) {
const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
logVerbose(
`discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
);
}
const typingChannelId = deliverTarget.startsWith("channel:")
? deliverTarget.slice("channel:".length)
: message.channelId;
const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });
const tableMode = resolveMarkdownTableMode({
cfg,
channel: "discord",
accountId,
});
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
responsePrefix: prefixContext.responsePrefix,
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
const replyToId = replyReference.use();
await deliverDiscordReply({
replies: [payload],
target: deliverTarget,
token,
accountId,
rest: client.rest,
runtime,
replyToId,
textLimit,
maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
tableMode,
chunkMode: resolveChunkMode(cfg, "discord", accountId),
});
replyReference.markSent();
},
onError: (err, info) => {
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
start: () => sendTyping({ client, channelId: typingChannelId }),
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "discord",
target: typingChannelId,
error: err,
});
},
}).onReplyStart,
});
const { queuedFinal, counts } = await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: channelConfig?.skills,
disableBlockStreaming:
typeof discordConfig?.blockStreaming === "boolean"
? !discordConfig.blockStreaming
: undefined,
onModelSelected: (ctx) => {
prefixContext.onModelSelected(ctx);
},
},
});
markDispatchIdle();
if (!queuedFinal) {
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
});
}
return;
}
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
}
removeAckReactionAfterReply({
removeAfterReply: removeAckAfterReply,
ackReactionPromise,
ackReactionValue: ackReaction,
remove: async () => {
await removeReactionDiscord(message.channelId, message.id, ackReaction, {
rest: client.rest,
});
},
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "discord",
target: `${message.channelId}/${message.id}`,
error: err,
});
},
});
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
});
}
}