Spaces:
Running
Running
| import type { SignalEventHandlerDeps, SignalReceivePayload } from "./event-handler.types.js"; | |
| import { resolveHumanDelayConfig } from "../../agents/identity.js"; | |
| import { hasControlCommand } from "../../auto-reply/command-detection.js"; | |
| import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; | |
| import { | |
| formatInboundEnvelope, | |
| formatInboundFromLabel, | |
| resolveEnvelopeFormatOptions, | |
| } from "../../auto-reply/envelope.js"; | |
| import { | |
| createInboundDebouncer, | |
| resolveInboundDebounceMs, | |
| } from "../../auto-reply/inbound-debounce.js"; | |
| import { | |
| buildPendingHistoryContextFromMap, | |
| clearHistoryEntriesIfEnabled, | |
| } from "../../auto-reply/reply/history.js"; | |
| import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; | |
| import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; | |
| import { resolveControlCommandGate } from "../../channels/command-gating.js"; | |
| import { logInboundDrop, logTypingFailure } from "../../channels/logging.js"; | |
| import { createReplyPrefixContext } from "../../channels/reply-prefix.js"; | |
| import { recordInboundSession } from "../../channels/session.js"; | |
| import { createTypingCallbacks } from "../../channels/typing.js"; | |
| import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; | |
| import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; | |
| import { enqueueSystemEvent } from "../../infra/system-events.js"; | |
| import { mediaKindFromMime } from "../../media/constants.js"; | |
| import { buildPairingReply } from "../../pairing/pairing-messages.js"; | |
| import { | |
| readChannelAllowFromStore, | |
| upsertChannelPairingRequest, | |
| } from "../../pairing/pairing-store.js"; | |
| import { resolveAgentRoute } from "../../routing/resolve-route.js"; | |
| import { normalizeE164 } from "../../utils.js"; | |
| import { | |
| formatSignalPairingIdLine, | |
| formatSignalSenderDisplay, | |
| formatSignalSenderId, | |
| isSignalSenderAllowed, | |
| resolveSignalPeerId, | |
| resolveSignalRecipient, | |
| resolveSignalSender, | |
| } from "../identity.js"; | |
| import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js"; | |
| export function createSignalEventHandler(deps: SignalEventHandlerDeps) { | |
| const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" }); | |
| type SignalInboundEntry = { | |
| senderName: string; | |
| senderDisplay: string; | |
| senderRecipient: string; | |
| senderPeerId: string; | |
| groupId?: string; | |
| groupName?: string; | |
| isGroup: boolean; | |
| bodyText: string; | |
| timestamp?: number; | |
| messageId?: string; | |
| mediaPath?: string; | |
| mediaType?: string; | |
| commandAuthorized: boolean; | |
| }; | |
| async function handleSignalInboundMessage(entry: SignalInboundEntry) { | |
| const fromLabel = formatInboundFromLabel({ | |
| isGroup: entry.isGroup, | |
| groupLabel: entry.groupName ?? undefined, | |
| groupId: entry.groupId ?? "unknown", | |
| groupFallback: "Group", | |
| directLabel: entry.senderName, | |
| directId: entry.senderDisplay, | |
| }); | |
| const route = resolveAgentRoute({ | |
| cfg: deps.cfg, | |
| channel: "signal", | |
| accountId: deps.accountId, | |
| peer: { | |
| kind: entry.isGroup ? "group" : "dm", | |
| id: entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId, | |
| }, | |
| }); | |
| const storePath = resolveStorePath(deps.cfg.session?.store, { | |
| agentId: route.agentId, | |
| }); | |
| const envelopeOptions = resolveEnvelopeFormatOptions(deps.cfg); | |
| const previousTimestamp = readSessionUpdatedAt({ | |
| storePath, | |
| sessionKey: route.sessionKey, | |
| }); | |
| const body = formatInboundEnvelope({ | |
| channel: "Signal", | |
| from: fromLabel, | |
| timestamp: entry.timestamp ?? undefined, | |
| body: entry.bodyText, | |
| chatType: entry.isGroup ? "group" : "direct", | |
| sender: { name: entry.senderName, id: entry.senderDisplay }, | |
| previousTimestamp, | |
| envelope: envelopeOptions, | |
| }); | |
| let combinedBody = body; | |
| const historyKey = entry.isGroup ? String(entry.groupId ?? "unknown") : undefined; | |
| if (entry.isGroup && historyKey) { | |
| combinedBody = buildPendingHistoryContextFromMap({ | |
| historyMap: deps.groupHistories, | |
| historyKey, | |
| limit: deps.historyLimit, | |
| currentMessage: combinedBody, | |
| formatEntry: (historyEntry) => | |
| formatInboundEnvelope({ | |
| channel: "Signal", | |
| from: fromLabel, | |
| timestamp: historyEntry.timestamp, | |
| body: `${historyEntry.body}${ | |
| historyEntry.messageId ? ` [id:${historyEntry.messageId}]` : "" | |
| }`, | |
| chatType: "group", | |
| senderLabel: historyEntry.sender, | |
| envelope: envelopeOptions, | |
| }), | |
| }); | |
| } | |
| const signalTo = entry.isGroup ? `group:${entry.groupId}` : `signal:${entry.senderRecipient}`; | |
| const ctxPayload = finalizeInboundContext({ | |
| Body: combinedBody, | |
| RawBody: entry.bodyText, | |
| CommandBody: entry.bodyText, | |
| From: entry.isGroup | |
| ? `group:${entry.groupId ?? "unknown"}` | |
| : `signal:${entry.senderRecipient}`, | |
| To: signalTo, | |
| SessionKey: route.sessionKey, | |
| AccountId: route.accountId, | |
| ChatType: entry.isGroup ? "group" : "direct", | |
| ConversationLabel: fromLabel, | |
| GroupSubject: entry.isGroup ? (entry.groupName ?? undefined) : undefined, | |
| SenderName: entry.senderName, | |
| SenderId: entry.senderDisplay, | |
| Provider: "signal" as const, | |
| Surface: "signal" as const, | |
| MessageSid: entry.messageId, | |
| Timestamp: entry.timestamp ?? undefined, | |
| MediaPath: entry.mediaPath, | |
| MediaType: entry.mediaType, | |
| MediaUrl: entry.mediaPath, | |
| CommandAuthorized: entry.commandAuthorized, | |
| OriginatingChannel: "signal" as const, | |
| OriginatingTo: signalTo, | |
| }); | |
| await recordInboundSession({ | |
| storePath, | |
| sessionKey: ctxPayload.SessionKey ?? route.sessionKey, | |
| ctx: ctxPayload, | |
| updateLastRoute: !entry.isGroup | |
| ? { | |
| sessionKey: route.mainSessionKey, | |
| channel: "signal", | |
| to: entry.senderRecipient, | |
| accountId: route.accountId, | |
| } | |
| : undefined, | |
| onRecordError: (err) => { | |
| logVerbose(`signal: failed updating session meta: ${String(err)}`); | |
| }, | |
| }); | |
| if (shouldLogVerbose()) { | |
| const preview = body.slice(0, 200).replace(/\\n/g, "\\\\n"); | |
| logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`); | |
| } | |
| const prefixContext = createReplyPrefixContext({ cfg: deps.cfg, agentId: route.agentId }); | |
| const typingCallbacks = createTypingCallbacks({ | |
| start: async () => { | |
| if (!ctxPayload.To) { | |
| return; | |
| } | |
| await sendTypingSignal(ctxPayload.To, { | |
| baseUrl: deps.baseUrl, | |
| account: deps.account, | |
| accountId: deps.accountId, | |
| }); | |
| }, | |
| onStartError: (err) => { | |
| logTypingFailure({ | |
| log: logVerbose, | |
| channel: "signal", | |
| target: ctxPayload.To ?? undefined, | |
| error: err, | |
| }); | |
| }, | |
| }); | |
| const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ | |
| responsePrefix: prefixContext.responsePrefix, | |
| responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, | |
| humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId), | |
| deliver: async (payload) => { | |
| await deps.deliverReplies({ | |
| replies: [payload], | |
| target: ctxPayload.To, | |
| baseUrl: deps.baseUrl, | |
| account: deps.account, | |
| accountId: deps.accountId, | |
| runtime: deps.runtime, | |
| maxBytes: deps.mediaMaxBytes, | |
| textLimit: deps.textLimit, | |
| }); | |
| }, | |
| onError: (err, info) => { | |
| deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); | |
| }, | |
| onReplyStart: typingCallbacks.onReplyStart, | |
| }); | |
| const { queuedFinal } = await dispatchInboundMessage({ | |
| ctx: ctxPayload, | |
| cfg: deps.cfg, | |
| dispatcher, | |
| replyOptions: { | |
| ...replyOptions, | |
| disableBlockStreaming: | |
| typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, | |
| onModelSelected: (ctx) => { | |
| prefixContext.onModelSelected(ctx); | |
| }, | |
| }, | |
| }); | |
| markDispatchIdle(); | |
| if (!queuedFinal) { | |
| if (entry.isGroup && historyKey) { | |
| clearHistoryEntriesIfEnabled({ | |
| historyMap: deps.groupHistories, | |
| historyKey, | |
| limit: deps.historyLimit, | |
| }); | |
| } | |
| return; | |
| } | |
| if (entry.isGroup && historyKey) { | |
| clearHistoryEntriesIfEnabled({ | |
| historyMap: deps.groupHistories, | |
| historyKey, | |
| limit: deps.historyLimit, | |
| }); | |
| } | |
| } | |
| const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({ | |
| debounceMs: inboundDebounceMs, | |
| buildKey: (entry) => { | |
| const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId; | |
| if (!conversationId || !entry.senderPeerId) { | |
| return null; | |
| } | |
| return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`; | |
| }, | |
| shouldDebounce: (entry) => { | |
| if (!entry.bodyText.trim()) { | |
| return false; | |
| } | |
| if (entry.mediaPath || entry.mediaType) { | |
| return false; | |
| } | |
| return !hasControlCommand(entry.bodyText, deps.cfg); | |
| }, | |
| onFlush: async (entries) => { | |
| const last = entries.at(-1); | |
| if (!last) { | |
| return; | |
| } | |
| if (entries.length === 1) { | |
| await handleSignalInboundMessage(last); | |
| return; | |
| } | |
| const combinedText = entries | |
| .map((entry) => entry.bodyText) | |
| .filter(Boolean) | |
| .join("\\n"); | |
| if (!combinedText.trim()) { | |
| return; | |
| } | |
| await handleSignalInboundMessage({ | |
| ...last, | |
| bodyText: combinedText, | |
| mediaPath: undefined, | |
| mediaType: undefined, | |
| }); | |
| }, | |
| onError: (err) => { | |
| deps.runtime.error?.(`signal debounce flush failed: ${String(err)}`); | |
| }, | |
| }); | |
| return async (event: { event?: string; data?: string }) => { | |
| if (event.event !== "receive" || !event.data) { | |
| return; | |
| } | |
| let payload: SignalReceivePayload | null = null; | |
| try { | |
| payload = JSON.parse(event.data) as SignalReceivePayload; | |
| } catch (err) { | |
| deps.runtime.error?.(`failed to parse event: ${String(err)}`); | |
| return; | |
| } | |
| if (payload?.exception?.message) { | |
| deps.runtime.error?.(`receive exception: ${payload.exception.message}`); | |
| } | |
| const envelope = payload?.envelope; | |
| if (!envelope) { | |
| return; | |
| } | |
| if (envelope.syncMessage) { | |
| return; | |
| } | |
| const sender = resolveSignalSender(envelope); | |
| if (!sender) { | |
| return; | |
| } | |
| if (deps.account && sender.kind === "phone") { | |
| if (sender.e164 === normalizeE164(deps.account)) { | |
| return; | |
| } | |
| } | |
| const dataMessage = envelope.dataMessage ?? envelope.editMessage?.dataMessage; | |
| const reaction = deps.isSignalReactionMessage(envelope.reactionMessage) | |
| ? envelope.reactionMessage | |
| : deps.isSignalReactionMessage(dataMessage?.reaction) | |
| ? dataMessage?.reaction | |
| : null; | |
| const messageText = (dataMessage?.message ?? "").trim(); | |
| const quoteText = dataMessage?.quote?.text?.trim() ?? ""; | |
| const hasBodyContent = | |
| Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length); | |
| if (reaction && !hasBodyContent) { | |
| if (reaction.isRemove) { | |
| return; | |
| } // Ignore reaction removals | |
| const emojiLabel = reaction.emoji?.trim() || "emoji"; | |
| const senderDisplay = formatSignalSenderDisplay(sender); | |
| const senderName = envelope.sourceName ?? senderDisplay; | |
| logVerbose(`signal reaction: ${emojiLabel} from ${senderName}`); | |
| const targets = deps.resolveSignalReactionTargets(reaction); | |
| const shouldNotify = deps.shouldEmitSignalReactionNotification({ | |
| mode: deps.reactionMode, | |
| account: deps.account, | |
| targets, | |
| sender, | |
| allowlist: deps.reactionAllowlist, | |
| }); | |
| if (!shouldNotify) { | |
| return; | |
| } | |
| const groupId = reaction.groupInfo?.groupId ?? undefined; | |
| const groupName = reaction.groupInfo?.groupName ?? undefined; | |
| const isGroup = Boolean(groupId); | |
| const senderPeerId = resolveSignalPeerId(sender); | |
| const route = resolveAgentRoute({ | |
| cfg: deps.cfg, | |
| channel: "signal", | |
| accountId: deps.accountId, | |
| peer: { | |
| kind: isGroup ? "group" : "dm", | |
| id: isGroup ? (groupId ?? "unknown") : senderPeerId, | |
| }, | |
| }); | |
| const groupLabel = isGroup ? `${groupName ?? "Signal Group"} id:${groupId}` : undefined; | |
| const messageId = reaction.targetSentTimestamp | |
| ? String(reaction.targetSentTimestamp) | |
| : "unknown"; | |
| const text = deps.buildSignalReactionSystemEventText({ | |
| emojiLabel, | |
| actorLabel: senderName, | |
| messageId, | |
| targetLabel: targets[0]?.display, | |
| groupLabel, | |
| }); | |
| const senderId = formatSignalSenderId(sender); | |
| const contextKey = [ | |
| "signal", | |
| "reaction", | |
| "added", | |
| messageId, | |
| senderId, | |
| emojiLabel, | |
| groupId ?? "", | |
| ] | |
| .filter(Boolean) | |
| .join(":"); | |
| enqueueSystemEvent(text, { sessionKey: route.sessionKey, contextKey }); | |
| return; | |
| } | |
| if (!dataMessage) { | |
| return; | |
| } | |
| const senderDisplay = formatSignalSenderDisplay(sender); | |
| const senderRecipient = resolveSignalRecipient(sender); | |
| const senderPeerId = resolveSignalPeerId(sender); | |
| const senderAllowId = formatSignalSenderId(sender); | |
| if (!senderRecipient) { | |
| return; | |
| } | |
| const senderIdLine = formatSignalPairingIdLine(sender); | |
| const groupId = dataMessage.groupInfo?.groupId ?? undefined; | |
| const groupName = dataMessage.groupInfo?.groupName ?? undefined; | |
| const isGroup = Boolean(groupId); | |
| const storeAllowFrom = await readChannelAllowFromStore("signal").catch(() => []); | |
| const effectiveDmAllow = [...deps.allowFrom, ...storeAllowFrom]; | |
| const effectiveGroupAllow = [...deps.groupAllowFrom, ...storeAllowFrom]; | |
| const dmAllowed = | |
| deps.dmPolicy === "open" ? true : isSignalSenderAllowed(sender, effectiveDmAllow); | |
| if (!isGroup) { | |
| if (deps.dmPolicy === "disabled") { | |
| return; | |
| } | |
| if (!dmAllowed) { | |
| if (deps.dmPolicy === "pairing") { | |
| const senderId = senderAllowId; | |
| const { code, created } = await upsertChannelPairingRequest({ | |
| channel: "signal", | |
| id: senderId, | |
| meta: { name: envelope.sourceName ?? undefined }, | |
| }); | |
| if (created) { | |
| logVerbose(`signal pairing request sender=${senderId}`); | |
| try { | |
| await sendMessageSignal( | |
| `signal:${senderRecipient}`, | |
| buildPairingReply({ | |
| channel: "signal", | |
| idLine: senderIdLine, | |
| code, | |
| }), | |
| { | |
| baseUrl: deps.baseUrl, | |
| account: deps.account, | |
| maxBytes: deps.mediaMaxBytes, | |
| accountId: deps.accountId, | |
| }, | |
| ); | |
| } catch (err) { | |
| logVerbose(`signal pairing reply failed for ${senderId}: ${String(err)}`); | |
| } | |
| } | |
| } else { | |
| logVerbose(`Blocked signal sender ${senderDisplay} (dmPolicy=${deps.dmPolicy})`); | |
| } | |
| return; | |
| } | |
| } | |
| if (isGroup && deps.groupPolicy === "disabled") { | |
| logVerbose("Blocked signal group message (groupPolicy: disabled)"); | |
| return; | |
| } | |
| if (isGroup && deps.groupPolicy === "allowlist") { | |
| if (effectiveGroupAllow.length === 0) { | |
| logVerbose("Blocked signal group message (groupPolicy: allowlist, no groupAllowFrom)"); | |
| return; | |
| } | |
| if (!isSignalSenderAllowed(sender, effectiveGroupAllow)) { | |
| logVerbose(`Blocked signal group sender ${senderDisplay} (not in groupAllowFrom)`); | |
| return; | |
| } | |
| } | |
| const useAccessGroups = deps.cfg.commands?.useAccessGroups !== false; | |
| const ownerAllowedForCommands = isSignalSenderAllowed(sender, effectiveDmAllow); | |
| const groupAllowedForCommands = isSignalSenderAllowed(sender, effectiveGroupAllow); | |
| const hasControlCommandInMessage = hasControlCommand(messageText, deps.cfg); | |
| const commandGate = resolveControlCommandGate({ | |
| useAccessGroups, | |
| authorizers: [ | |
| { configured: effectiveDmAllow.length > 0, allowed: ownerAllowedForCommands }, | |
| { configured: effectiveGroupAllow.length > 0, allowed: groupAllowedForCommands }, | |
| ], | |
| allowTextCommands: true, | |
| hasControlCommand: hasControlCommandInMessage, | |
| }); | |
| const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAllowed; | |
| if (isGroup && commandGate.shouldBlock) { | |
| logInboundDrop({ | |
| log: logVerbose, | |
| channel: "signal", | |
| reason: "control command (unauthorized)", | |
| target: senderDisplay, | |
| }); | |
| return; | |
| } | |
| let mediaPath: string | undefined; | |
| let mediaType: string | undefined; | |
| let placeholder = ""; | |
| const firstAttachment = dataMessage.attachments?.[0]; | |
| if (firstAttachment?.id && !deps.ignoreAttachments) { | |
| try { | |
| const fetched = await deps.fetchAttachment({ | |
| baseUrl: deps.baseUrl, | |
| account: deps.account, | |
| attachment: firstAttachment, | |
| sender: senderRecipient, | |
| groupId, | |
| maxBytes: deps.mediaMaxBytes, | |
| }); | |
| if (fetched) { | |
| mediaPath = fetched.path; | |
| mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined; | |
| } | |
| } catch (err) { | |
| deps.runtime.error?.(danger(`attachment fetch failed: ${String(err)}`)); | |
| } | |
| } | |
| const kind = mediaKindFromMime(mediaType ?? undefined); | |
| if (kind) { | |
| placeholder = `<media:${kind}>`; | |
| } else if (dataMessage.attachments?.length) { | |
| placeholder = "<media:attachment>"; | |
| } | |
| const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || ""; | |
| if (!bodyText) { | |
| return; | |
| } | |
| const receiptTimestamp = | |
| typeof envelope.timestamp === "number" | |
| ? envelope.timestamp | |
| : typeof dataMessage.timestamp === "number" | |
| ? dataMessage.timestamp | |
| : undefined; | |
| if (deps.sendReadReceipts && !deps.readReceiptsViaDaemon && !isGroup && receiptTimestamp) { | |
| try { | |
| await sendReadReceiptSignal(`signal:${senderRecipient}`, receiptTimestamp, { | |
| baseUrl: deps.baseUrl, | |
| account: deps.account, | |
| accountId: deps.accountId, | |
| }); | |
| } catch (err) { | |
| logVerbose(`signal read receipt failed for ${senderDisplay}: ${String(err)}`); | |
| } | |
| } else if ( | |
| deps.sendReadReceipts && | |
| !deps.readReceiptsViaDaemon && | |
| !isGroup && | |
| !receiptTimestamp | |
| ) { | |
| logVerbose(`signal read receipt skipped (missing timestamp) for ${senderDisplay}`); | |
| } | |
| const senderName = envelope.sourceName ?? senderDisplay; | |
| const messageId = | |
| typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined; | |
| await inboundDebouncer.enqueue({ | |
| senderName, | |
| senderDisplay, | |
| senderRecipient, | |
| senderPeerId, | |
| groupId, | |
| groupName, | |
| isGroup, | |
| bodyText, | |
| timestamp: envelope.timestamp ?? undefined, | |
| messageId, | |
| mediaPath, | |
| mediaType, | |
| commandAuthorized, | |
| }); | |
| }; | |
| } | |