| import { resolveHumanDelayConfig } from "../../../agents/identity.js"; |
| import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; |
| import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js"; |
| import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js"; |
| import { logAckFailure, logTypingFailure } from "../../../channels/logging.js"; |
| import { createReplyPrefixContext } from "../../../channels/reply-prefix.js"; |
| import { createTypingCallbacks } from "../../../channels/typing.js"; |
| import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js"; |
| import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; |
| import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; |
| import { removeSlackReaction } from "../../actions.js"; |
| import { resolveSlackThreadTargets } from "../../threading.js"; |
|
|
| import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js"; |
|
|
| import type { PreparedSlackMessage } from "./types.js"; |
|
|
/**
 * Runs an already-prepared inbound Slack message through the auto-reply
 * dispatcher and performs the follow-up bookkeeping.
 *
 * Steps, in order:
 * 1. For direct messages, records the last route for the main session key.
 * 2. Builds the reply delivery plan and the typing-status callbacks.
 * 3. Creates the reply dispatcher and awaits `dispatchInboundMessage`.
 * 4. If any reply was delivered, writes a verbose log line and hands the
 *    ack-reaction cleanup to `removeAckReactionAfterReply`.
 * 5. For room-like conversations (`prepared.isRoomish`), calls
 *    `clearHistoryEntriesIfEnabled` whether or not a reply was delivered.
 *
 * Rejections from `updateLastRoute` or `dispatchInboundMessage` are not caught
 * here and propagate to the caller.
 *
 * @param prepared - The prepared Slack message (context, account, raw message,
 *   route, reply target and ack-reaction state).
 * @returns A promise that resolves once dispatch and follow-up work are done.
 */
export async function dispatchPreparedSlackMessage(
  prepared: PreparedSlackMessage,
): Promise<void> {
  const { ctx, account, message, route } = prepared;
  const cfg = ctx.cfg;
  const runtime = ctx.runtime;

  if (prepared.isDirectMessage) {
    const sessionCfg = cfg.session;
    const storePath = resolveStorePath(sessionCfg?.store, {
      agentId: route.agentId,
    });
    await updateLastRoute({
      storePath,
      sessionKey: route.mainSessionKey,
      deliveryContext: {
        channel: "slack",
        to: `user:${message.user}`,
        accountId: route.accountId,
      },
      ctx: prepared.ctxPayload,
    });
  }

  const { statusThreadTs } = resolveSlackThreadTargets({
    message,
    replyToMode: ctx.replyToMode,
  });

  const messageTs = message.ts ?? message.event_ts;
  const incomingThreadTs = message.thread_ts;
  let didSetStatus = false;

  // The same mutable ref object is handed to both the delivery plan and the
  // dispatch reply options below.
  const hasRepliedRef = { value: false };
  const replyPlan = createSlackReplyDeliveryPlan({
    replyToMode: ctx.replyToMode,
    incomingThreadTs,
    messageTs,
    hasRepliedRef,
  });

  // Only used to label typing failures in the log output.
  const typingTarget = statusThreadTs ? `${message.channel}/${statusThreadTs}` : message.channel;
  const typingCallbacks = createTypingCallbacks({
    start: async () => {
      // Set before the await: if the status call rejects, a later stop() will
      // still pass its guard and send the clearing status.
      didSetStatus = true;
      await ctx.setSlackThreadStatus({
        channelId: message.channel,
        threadTs: statusThreadTs,
        status: "is typing...",
      });
    },
    stop: async () => {
      // Nothing to clear if start() never ran (or stop() already did).
      if (!didSetStatus) {
        return;
      }
      didSetStatus = false;
      await ctx.setSlackThreadStatus({
        channelId: message.channel,
        threadTs: statusThreadTs,
        status: "",
      });
    },
    onStartError: (err) => {
      logTypingFailure({
        log: (text) => runtime.error?.(danger(text)),
        channel: "slack",
        action: "start",
        target: typingTarget,
        error: err,
      });
    },
    onStopError: (err) => {
      logTypingFailure({
        log: (text) => runtime.error?.(danger(text)),
        channel: "slack",
        action: "stop",
        target: typingTarget,
        error: err,
      });
    },
  });

  const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });

  const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
    responsePrefix: prefixContext.responsePrefix,
    responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
    humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
    deliver: async (payload) => {
      // Ask the plan for the thread target first; mark the plan as sent only
      // after the delivery call has resolved.
      const replyThreadTs = replyPlan.nextThreadTs();
      await deliverReplies({
        replies: [payload],
        target: prepared.replyTarget,
        token: ctx.botToken,
        accountId: account.accountId,
        runtime,
        textLimit: ctx.textLimit,
        replyThreadTs,
      });
      replyPlan.markSent();
    },
    onError: (err, info) => {
      runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`));
      // A failed delivery also fires the idle callback (when one is defined).
      typingCallbacks.onIdle?.();
    },
    onReplyStart: typingCallbacks.onReplyStart,
    onIdle: typingCallbacks.onIdle,
  });

  // markDispatchIdle() runs in `finally` so the dispatcher is told that
  // dispatch has ended even when dispatchInboundMessage rejects; the rejection
  // itself still propagates to the caller.
  // NOTE(review): presumably this is what lets the typing state wind down —
  // confirm against createReplyDispatcherWithTyping.
  const runDispatch = async () => {
    try {
      return await dispatchInboundMessage({
        ctx: prepared.ctxPayload,
        cfg,
        dispatcher,
        replyOptions: {
          ...replyOptions,
          skillFilter: prepared.channelConfig?.skills,
          hasRepliedRef,
          // The account setting is inverted when it is an explicit boolean;
          // otherwise the option is left undefined.
          disableBlockStreaming:
            typeof account.config.blockStreaming === "boolean"
              ? !account.config.blockStreaming
              : undefined,
          onModelSelected: (modelCtx) => {
            prefixContext.onModelSelected(modelCtx);
          },
        },
      });
    } finally {
      markDispatchIdle();
    }
  };
  const { queuedFinal, counts } = await runDispatch();

  const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;

  if (anyReplyDelivered) {
    if (shouldLogVerbose()) {
      // Default to 0 (as in the delivery check above) so the log never prints
      // "undefined", and pluralise "reply" -> "replies" correctly.
      const finalCount = counts.final ?? 0;
      logVerbose(
        `slack: delivered ${finalCount} repl${finalCount === 1 ? "y" : "ies"} to ${prepared.replyTarget}`,
      );
    }

    removeAckReactionAfterReply({
      removeAfterReply: ctx.removeAckAfterReply,
      ackReactionPromise: prepared.ackReactionPromise,
      ackReactionValue: prepared.ackReactionValue,
      remove: () =>
        removeSlackReaction(
          message.channel,
          // Falls back to an empty timestamp when none was recorded.
          prepared.ackReactionMessageTs ?? "",
          prepared.ackReactionValue,
          {
            token: ctx.botToken,
            client: ctx.app.client,
          },
        ),
      onError: (err) => {
        logAckFailure({
          log: logVerbose,
          channel: "slack",
          target: `${message.channel}/${message.ts}`,
          error: err,
        });
      },
    });
  }

  // Runs on both paths: with or without a delivered reply.
  if (prepared.isRoomish) {
    clearHistoryEntriesIfEnabled({
      historyMap: ctx.channelHistories,
      historyKey: prepared.historyKey,
      limit: ctx.historyLimit,
    });
  }
}
|
|