// OpenClawBot / src / imessage / monitor / monitor-provider.ts
// (repository page residue) darkfire514's picture
// (repository page residue) Upload 2526 files
// (repository page residue) fb4d8fe verified
import fs from "node:fs/promises";
import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";
import { resolveHumanDelayConfig } from "../../agents/identity.js";
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatInboundFromLabel,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "../../auto-reply/inbound-debounce.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
DEFAULT_GROUP_HISTORY_LIMIT,
recordPendingHistoryEntryIfEnabled,
type HistoryEntry,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { resolveControlCommandGate } from "../../channels/command-gating.js";
import { logInboundDrop } from "../../channels/logging.js";
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
import { recordInboundSession } from "../../channels/session.js";
import { loadConfig } from "../../config/config.js";
import {
resolveChannelGroupPolicy,
resolveChannelGroupRequireMention,
} from "../../config/group-policy.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
readChannelAllowFromStore,
upsertChannelPairingRequest,
} from "../../pairing/pairing-store.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { truncateUtf16Safe } from "../../utils.js";
import { resolveIMessageAccount } from "../accounts.js";
import { createIMessageRpcClient } from "../client.js";
import { probeIMessage } from "../probe.js";
import { sendMessageIMessage } from "../send.js";
import {
formatIMessageChatTarget,
isAllowedIMessageSender,
normalizeIMessageHandle,
} from "../targets.js";
import { deliverReplies } from "./deliver.js";
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
/**
 * Try to detect the remote host from an SSH wrapper script such as:
 *   exec ssh -T openclaw@192.168.64.3 /opt/homebrew/bin/imsg "$@"
 *   exec ssh -T mac-mini imsg "$@"
 *
 * A leading `~` is expanded to `$HOME` only when it is the whole path or is
 * directly followed by `/`, and only when `HOME` is set. `~user/...` paths are
 * left untouched instead of being glued onto the current user's home, and an
 * unset `HOME` no longer turns `~/x` into the root-relative `/x`.
 *
 * @param cliPath - Path of the (possible) wrapper script to inspect.
 * @returns The `user@host` token (preferred) or the bare host that precedes
 *   the `imsg` command on an `ssh` line; `undefined` when the file cannot be
 *   read or nothing matches.
 */
async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | undefined> {
  try {
    const home = process.env.HOME;
    // Plain concatenation (not String.replace) so `$` sequences in HOME are
    // never interpreted as replacement patterns.
    const expanded = home && /^~(?=$|\/)/.test(cliPath) ? `${home}${cliPath.slice(1)}` : cliPath;
    const content = await fs.readFile(expanded, "utf8");
    // Match user@host pattern first (e.g., openclaw@192.168.64.3)
    const userHostMatch = content.match(/\bssh\b[^\n]*?\s+([a-zA-Z0-9._-]+@[a-zA-Z0-9._-]+)/);
    if (userHostMatch) {
      return userHostMatch[1];
    }
    // Fallback: match host-only before imsg command (e.g., ssh -T mac-mini imsg)
    const hostOnlyMatch = content.match(/\bssh\b[^\n]*?\s+([a-zA-Z][a-zA-Z0-9._-]*)\s+\S*\bimsg\b/);
    return hostOnlyMatch?.[1];
  } catch {
    // Best effort: an unreadable or missing script simply means "no remote host".
    return undefined;
  }
}
/** Normalized description of the message an inbound iMessage replies to. */
type IMessageReplyContext = {
// Identifier of the replied-to message (from `reply_to_id`), when present.
id?: string;
// Text of the replied-to message; required — no context is built without it.
body: string;
// Sender of the replied-to message (from `reply_to_sender`), when present.
sender?: string;
};
/**
 * Coerces a raw reply metadata field into a usable string.
 *
 * @param value - Raw field value of unknown type.
 * @returns The trimmed string when it is non-empty, the decimal string form of
 *   a number, or `undefined` for blank strings and every other type.
 */
function normalizeReplyField(value: unknown): string | undefined {
  switch (typeof value) {
    case "string": {
      const text = value.trim();
      return text.length > 0 ? text : undefined;
    }
    case "number":
      return String(value);
    default:
      return undefined;
  }
}
/**
 * Extracts reply metadata from an inbound payload.
 *
 * @param message - Inbound iMessage payload.
 * @returns The normalized reply context, or `null` when the payload carries no
 *   usable `reply_to_text`.
 */
function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null {
  const quotedText = normalizeReplyField(message.reply_to_text);
  // Without the quoted text there is nothing worth reporting.
  if (quotedText === undefined) {
    return null;
  }
  return {
    body: quotedText,
    id: normalizeReplyField(message.reply_to_id),
    sender: normalizeReplyField(message.reply_to_sender),
  };
}
/**
 * Runs the inbound iMessage monitor for one account until the RPC client
 * closes or `opts.abortSignal` fires.
 *
 * Steps performed here:
 * 1. Resolve account configuration (allow lists, group/DM policy, attachment
 *    handling, media size limit, CLI/DB paths, optional remote host).
 * 2. Poll `probeIMessage` through `waitForTransportReady` until the imsg RPC
 *    transport reports ready.
 * 3. Create the RPC client, issue "watch.subscribe", and push every "message"
 *    notification through the inbound debouncer into `handleMessageNow`.
 * 4. On abort or close, send a best-effort "watch.unsubscribe" and stop the
 *    client.
 *
 * @param opts - Optional overrides (config, account id, allow lists, CLI/DB
 *   paths, attachment settings, mention requirement, abort signal).
 * @returns Resolves once the client has closed or the monitor was aborted.
 * @throws Re-throws failures from subscribing / waiting for close unless the
 *   abort signal has fired. Errors thrown by the readiness check or client
 *   creation are not caught here.
 */
export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): Promise<void> {
  const runtime = resolveRuntime(opts);
  const cfg = opts.config ?? loadConfig();
  const accountInfo = resolveIMessageAccount({
    cfg,
    accountId: opts.accountId,
  });
  const imessageCfg = accountInfo.config;
  const historyLimit = Math.max(
    0,
    imessageCfg.historyLimit ??
      cfg.messages?.groupChat?.historyLimit ??
      DEFAULT_GROUP_HISTORY_LIMIT,
  );
  const groupHistories = new Map<string, HistoryEntry[]>();
  const textLimit = resolveTextChunkLimit(cfg, "imessage", accountInfo.accountId);
  const allowFrom = normalizeAllowList(opts.allowFrom ?? imessageCfg.allowFrom);
  // Group allow list falls back to the DM allow list when none is configured.
  const groupAllowFrom = normalizeAllowList(
    opts.groupAllowFrom ??
      imessageCfg.groupAllowFrom ??
      (imessageCfg.allowFrom && imessageCfg.allowFrom.length > 0 ? imessageCfg.allowFrom : []),
  );
  const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
  const groupPolicy = imessageCfg.groupPolicy ?? defaultGroupPolicy ?? "open";
  const dmPolicy = imessageCfg.dmPolicy ?? "pairing";
  const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false;
  const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
  const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
  const dbPath = opts.dbPath ?? imessageCfg.dbPath;

  // Resolve remoteHost: explicit config, or auto-detect from SSH wrapper script
  let remoteHost = imessageCfg.remoteHost;
  if (!remoteHost && cliPath && cliPath !== "imsg") {
    remoteHost = await detectRemoteHostFromCliPath(cliPath);
    if (remoteHost) {
      logVerbose(`imessage: detected remoteHost=${remoteHost} from cliPath`);
    }
  }

  const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" });
  const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({
    debounceMs: inboundDebounceMs,
    // Key by account + conversation + sender; a blank sender yields no key.
    buildKey: (entry) => {
      const sender = entry.message.sender?.trim();
      if (!sender) {
        return null;
      }
      const conversationId =
        entry.message.chat_id != null
          ? `chat:${entry.message.chat_id}`
          : (entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown");
      return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`;
    },
    // Only plain-text, non-command messages without attachments are debounced.
    shouldDebounce: (entry) => {
      const text = entry.message.text?.trim() ?? "";
      if (!text) {
        return false;
      }
      if (entry.message.attachments && entry.message.attachments.length > 0) {
        return false;
      }
      return !hasControlCommand(text, cfg);
    },
    onFlush: async (entries) => {
      const last = entries.at(-1);
      if (!last) {
        return;
      }
      if (entries.length === 1) {
        await handleMessageNow(last.message);
        return;
      }
      // Several messages were batched: join their texts and reuse the last
      // message's metadata for the synthetic combined payload.
      const combinedText = entries
        .map((entry) => entry.message.text ?? "")
        .filter(Boolean)
        .join("\n");
      const syntheticMessage: IMessagePayload = {
        ...last.message,
        text: combinedText,
        attachments: null,
      };
      await handleMessageNow(syntheticMessage);
    },
    onError: (err) => {
      runtime.error?.(`imessage debounce flush failed: ${String(err)}`);
    },
  });

  /**
   * Applies access policy to a single (possibly combined) inbound message,
   * builds the inbound context and dispatches it for a reply.
   *
   * NOTE: references `client`, which is declared further down; this function
   * is only reached through notification callbacks of that client.
   */
  async function handleMessageNow(message: IMessagePayload) {
    const senderRaw = message.sender ?? "";
    const sender = senderRaw.trim();
    if (!sender) {
      return;
    }
    const senderNormalized = normalizeIMessageHandle(sender);
    // Never react to our own outgoing messages.
    if (message.is_from_me) {
      return;
    }

    const chatId = message.chat_id ?? undefined;
    const chatGuid = message.chat_guid ?? undefined;
    const chatIdentifier = message.chat_identifier ?? undefined;

    const groupIdCandidate = chatId !== undefined ? String(chatId) : undefined;
    const groupListPolicy = groupIdCandidate
      ? resolveChannelGroupPolicy({
          cfg,
          channel: "imessage",
          accountId: accountInfo.accountId,
          groupId: groupIdCandidate,
        })
      : {
          allowlistEnabled: false,
          allowed: true,
          groupConfig: undefined,
          defaultConfig: undefined,
        };

    // Some iMessage threads can have multiple participants but still report
    // is_group=false depending on how Messages stores the identifier.
    // If the owner explicitly configures a chat_id under imessage.groups, treat
    // that thread as a "group" for permission gating and session isolation.
    const treatAsGroupByConfig = Boolean(
      groupIdCandidate && groupListPolicy.allowlistEnabled && groupListPolicy.groupConfig,
    );

    const isGroup = Boolean(message.is_group) || treatAsGroupByConfig;
    if (isGroup && !chatId) {
      return;
    }
    const groupId = isGroup ? groupIdCandidate : undefined;

    // Merge configured allow lists with entries from the pairing store; a
    // store read failure is treated as an empty list.
    const storeAllowFrom = await readChannelAllowFromStore("imessage").catch(() => []);
    const effectiveDmAllowFrom = Array.from(new Set([...allowFrom, ...storeAllowFrom]))
      .map((v) => String(v).trim())
      .filter(Boolean);
    const effectiveGroupAllowFrom = Array.from(new Set([...groupAllowFrom, ...storeAllowFrom]))
      .map((v) => String(v).trim())
      .filter(Boolean);

    if (isGroup) {
      if (groupPolicy === "disabled") {
        logVerbose("Blocked iMessage group message (groupPolicy: disabled)");
        return;
      }
      if (groupPolicy === "allowlist") {
        if (effectiveGroupAllowFrom.length === 0) {
          logVerbose("Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)");
          return;
        }
        const allowed = isAllowedIMessageSender({
          allowFrom: effectiveGroupAllowFrom,
          sender,
          chatId: chatId ?? undefined,
          chatGuid,
          chatIdentifier,
        });
        if (!allowed) {
          logVerbose(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`);
          return;
        }
      }
      if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) {
        logVerbose(`imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`);
        return;
      }
    }

    const dmHasWildcard = effectiveDmAllowFrom.includes("*");
    const dmAuthorized =
      dmPolicy === "open"
        ? true
        : dmHasWildcard ||
          (effectiveDmAllowFrom.length > 0 &&
            isAllowedIMessageSender({
              allowFrom: effectiveDmAllowFrom,
              sender,
              chatId: chatId ?? undefined,
              chatGuid,
              chatIdentifier,
            }));

    if (!isGroup) {
      if (dmPolicy === "disabled") {
        return;
      }
      if (!dmAuthorized) {
        if (dmPolicy === "pairing") {
          // Unknown DM sender under the pairing policy: register a pairing
          // request and send the pairing reply only when it was newly created.
          const senderId = normalizeIMessageHandle(sender);
          const { code, created } = await upsertChannelPairingRequest({
            channel: "imessage",
            id: senderId,
            meta: {
              sender: senderId,
              chatId: chatId ? String(chatId) : undefined,
            },
          });
          if (created) {
            logVerbose(`imessage pairing request sender=${senderId}`);
            try {
              await sendMessageIMessage(
                sender,
                buildPairingReply({
                  channel: "imessage",
                  idLine: `Your iMessage sender id: ${senderId}`,
                  code,
                }),
                {
                  client,
                  maxBytes: mediaMaxBytes,
                  accountId: accountInfo.accountId,
                  ...(chatId ? { chatId } : {}),
                },
              );
            } catch (err) {
              logVerbose(`imessage pairing reply failed for ${senderId}: ${String(err)}`);
            }
          }
        } else {
          logVerbose(`Blocked iMessage sender ${sender} (dmPolicy=${dmPolicy})`);
        }
        return;
      }
    }

    const route = resolveAgentRoute({
      cfg,
      channel: "imessage",
      accountId: accountInfo.accountId,
      peer: {
        kind: isGroup ? "group" : "dm",
        id: isGroup ? String(chatId ?? "unknown") : normalizeIMessageHandle(sender),
      },
    });
    const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
    const messageText = (message.text ?? "").trim();

    const attachments = includeAttachments ? (message.attachments ?? []) : [];
    // Filter to valid attachments with paths
    const validAttachments = attachments.filter((entry) => entry?.original_path && !entry?.missing);
    const firstAttachment = validAttachments[0];
    const mediaPath = firstAttachment?.original_path ?? undefined;
    const mediaType = firstAttachment?.mime_type ?? undefined;
    // Build arrays for all attachments (for multi-image support)
    const mediaPaths = validAttachments.map((a) => a.original_path).filter(Boolean) as string[];
    const mediaTypes = validAttachments.map((a) => a.mime_type ?? undefined);
    const kind = mediaKindFromMime(mediaType ?? undefined);
    const placeholder = kind ? `<media:${kind}>` : attachments?.length ? "<media:attachment>" : "";
    const bodyText = messageText || placeholder;
    if (!bodyText) {
      return;
    }

    const replyContext = describeReplyContext(message);
    // Date.parse yields NaN for malformed input; treat that as "no timestamp"
    // instead of passing NaN on as a timestamp.
    const parsedCreatedAt = message.created_at ? Date.parse(message.created_at) : undefined;
    const createdAt =
      parsedCreatedAt !== undefined && Number.isFinite(parsedCreatedAt)
        ? parsedCreatedAt
        : undefined;
    const historyKey = isGroup
      ? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown")
      : undefined;

    const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true;
    const requireMention = resolveChannelGroupRequireMention({
      cfg,
      channel: "imessage",
      accountId: accountInfo.accountId,
      groupId,
      requireMentionOverride: opts.requireMention,
      overrideOrder: "before-config",
    });
    const canDetectMention = mentionRegexes.length > 0;

    const useAccessGroups = cfg.commands?.useAccessGroups !== false;
    const ownerAllowedForCommands =
      effectiveDmAllowFrom.length > 0
        ? isAllowedIMessageSender({
            allowFrom: effectiveDmAllowFrom,
            sender,
            chatId: chatId ?? undefined,
            chatGuid,
            chatIdentifier,
          })
        : false;
    const groupAllowedForCommands =
      effectiveGroupAllowFrom.length > 0
        ? isAllowedIMessageSender({
            allowFrom: effectiveGroupAllowFrom,
            sender,
            chatId: chatId ?? undefined,
            chatGuid,
            chatIdentifier,
          })
        : false;
    const hasControlCommandInMessage = hasControlCommand(messageText, cfg);
    const commandGate = resolveControlCommandGate({
      useAccessGroups,
      authorizers: [
        { configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands },
        { configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands },
      ],
      allowTextCommands: true,
      hasControlCommand: hasControlCommandInMessage,
    });
    const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAuthorized;
    if (isGroup && commandGate.shouldBlock) {
      logInboundDrop({
        log: logVerbose,
        channel: "imessage",
        reason: "control command (unauthorized)",
        target: sender,
      });
      return;
    }

    // An authorized control command in a group is processed even without a mention.
    const shouldBypassMention =
      isGroup && requireMention && !mentioned && commandAuthorized && hasControlCommandInMessage;
    const effectiveWasMentioned = mentioned || shouldBypassMention;
    if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) {
      // Not addressed to us: remember the message as pending group history and stop.
      logVerbose(`imessage: skipping group message (no mention)`);
      recordPendingHistoryEntryIfEnabled({
        historyMap: groupHistories,
        historyKey: historyKey ?? "",
        limit: historyLimit,
        entry: historyKey
          ? {
              sender: senderNormalized,
              body: bodyText,
              timestamp: createdAt,
              messageId: message.id ? String(message.id) : undefined,
            }
          : null,
      });
      return;
    }

    const chatTarget = formatIMessageChatTarget(chatId);
    const fromLabel = formatInboundFromLabel({
      isGroup,
      groupLabel: message.chat_name ?? undefined,
      groupId: chatId !== undefined ? String(chatId) : "unknown",
      groupFallback: "Group",
      directLabel: senderNormalized,
      directId: sender,
    });
    const storePath = resolveStorePath(cfg.session?.store, {
      agentId: route.agentId,
    });
    const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
    const previousTimestamp = readSessionUpdatedAt({
      storePath,
      sessionKey: route.sessionKey,
    });
    const replySuffix = replyContext
      ? `\n\n[Replying to ${replyContext.sender ?? "unknown sender"}${
          replyContext.id ? ` id:${replyContext.id}` : ""
        }]\n${replyContext.body}\n[/Replying]`
      : "";
    const body = formatInboundEnvelope({
      channel: "iMessage",
      from: fromLabel,
      timestamp: createdAt,
      body: `${bodyText}${replySuffix}`,
      chatType: isGroup ? "group" : "direct",
      sender: { name: senderNormalized, id: sender },
      previousTimestamp,
      envelope: envelopeOptions,
    });
    let combinedBody = body;
    if (isGroup && historyKey) {
      // Combine the pending group history for this chat with the current message.
      combinedBody = buildPendingHistoryContextFromMap({
        historyMap: groupHistories,
        historyKey,
        limit: historyLimit,
        currentMessage: combinedBody,
        formatEntry: (entry) =>
          formatInboundEnvelope({
            channel: "iMessage",
            from: fromLabel,
            timestamp: entry.timestamp,
            body: `${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`,
            chatType: "group",
            senderLabel: entry.sender,
            envelope: envelopeOptions,
          }),
      });
    }

    const imessageTo = (isGroup ? chatTarget : undefined) || `imessage:${sender}`;
    const ctxPayload = finalizeInboundContext({
      Body: combinedBody,
      RawBody: bodyText,
      CommandBody: bodyText,
      From: isGroup ? `imessage:group:${chatId ?? "unknown"}` : `imessage:${sender}`,
      To: imessageTo,
      SessionKey: route.sessionKey,
      AccountId: route.accountId,
      ChatType: isGroup ? "group" : "direct",
      ConversationLabel: fromLabel,
      GroupSubject: isGroup ? (message.chat_name ?? undefined) : undefined,
      GroupMembers: isGroup ? (message.participants ?? []).filter(Boolean).join(", ") : undefined,
      SenderName: senderNormalized,
      SenderId: sender,
      Provider: "imessage",
      Surface: "imessage",
      MessageSid: message.id ? String(message.id) : undefined,
      ReplyToId: replyContext?.id,
      ReplyToBody: replyContext?.body,
      ReplyToSender: replyContext?.sender,
      Timestamp: createdAt,
      MediaPath: mediaPath,
      MediaType: mediaType,
      MediaUrl: mediaPath,
      MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined,
      MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
      MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined,
      MediaRemoteHost: remoteHost,
      WasMentioned: effectiveWasMentioned,
      CommandAuthorized: commandAuthorized,
      // Originating channel for reply routing.
      OriginatingChannel: "imessage" as const,
      OriginatingTo: imessageTo,
    });

    const updateTarget = (isGroup ? chatTarget : undefined) || sender;
    await recordInboundSession({
      storePath,
      sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
      ctx: ctxPayload,
      // Only direct messages update the last-route record of the main session.
      updateLastRoute:
        !isGroup && updateTarget
          ? {
              sessionKey: route.mainSessionKey,
              channel: "imessage",
              to: updateTarget,
              accountId: route.accountId,
            }
          : undefined,
      onRecordError: (err) => {
        logVerbose(`imessage: failed updating session meta: ${String(err)}`);
      },
    });

    if (shouldLogVerbose()) {
      const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n");
      logVerbose(
        `imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${body.length} preview="${preview}"`,
      );
    }

    const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });

    const dispatcher = createReplyDispatcher({
      responsePrefix: prefixContext.responsePrefix,
      responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
      humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
      deliver: async (payload) => {
        await deliverReplies({
          replies: [payload],
          target: ctxPayload.To,
          client,
          accountId: accountInfo.accountId,
          runtime,
          maxBytes: mediaMaxBytes,
          textLimit,
        });
      },
      onError: (err, info) => {
        runtime.error?.(danger(`imessage ${info.kind} reply failed: ${String(err)}`));
      },
    });

    await dispatchInboundMessage({
      ctx: ctxPayload,
      cfg,
      dispatcher,
      replyOptions: {
        disableBlockStreaming:
          typeof accountInfo.config.blockStreaming === "boolean"
            ? !accountInfo.config.blockStreaming
            : undefined,
        onModelSelected: prefixContext.onModelSelected,
      },
    });

    // Pending group history is cleared after dispatch whether or not a final
    // reply was queued (the original duplicated this call in both branches).
    if (isGroup && historyKey) {
      clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
    }
  }

  /** Unwraps a "message" notification payload and hands it to the debouncer. */
  const handleMessage = async (raw: unknown) => {
    const params = raw as { message?: IMessagePayload | null };
    const message = params?.message ?? null;
    if (!message) {
      return;
    }
    await inboundDebouncer.enqueue({ message });
  };

  await waitForTransportReady({
    label: "imsg rpc",
    timeoutMs: 30_000,
    logAfterMs: 10_000,
    logIntervalMs: 10_000,
    pollIntervalMs: 500,
    abortSignal: opts.abortSignal,
    runtime,
    check: async () => {
      const probe = await probeIMessage(2000, { cliPath, dbPath, runtime });
      if (probe.ok) {
        return { ok: true };
      }
      // A fatal probe result aborts the wait instead of being retried.
      if (probe.fatal) {
        throw new Error(probe.error ?? "imsg rpc unavailable");
      }
      return { ok: false, error: probe.error ?? "unreachable" };
    },
  });

  if (opts.abortSignal?.aborted) {
    return;
  }

  const client = await createIMessageRpcClient({
    cliPath,
    dbPath,
    runtime,
    onNotification: (msg) => {
      if (msg.method === "message") {
        void handleMessage(msg.params).catch((err) => {
          runtime.error?.(`imessage: handler failed: ${String(err)}`);
        });
      } else if (msg.method === "error") {
        runtime.error?.(`imessage: watch error ${JSON.stringify(msg.params)}`);
      }
    },
  });

  let subscriptionId: number | null = null;
  const abort = opts.abortSignal;
  const onAbort = () => {
    // Compare against null explicitly so a subscription id of 0 is still unsubscribed.
    if (subscriptionId !== null) {
      void client
        .request("watch.unsubscribe", {
          subscription: subscriptionId,
        })
        .catch(() => {
          // Ignore disconnect errors during shutdown.
        });
    }
    void client.stop().catch(() => {
      // Ignore disconnect errors during shutdown.
    });
  };
  abort?.addEventListener("abort", onAbort, { once: true });

  try {
    // The signal may have fired while the client was being created; a listener
    // added after that point is never invoked, so bail out here (the `finally`
    // block stops the client) instead of waiting for a close that nobody
    // would trigger.
    if (abort?.aborted) {
      return;
    }
    const result = await client.request<{ subscription?: number }>("watch.subscribe", {
      attachments: includeAttachments,
    });
    subscriptionId = result?.subscription ?? null;
    await client.waitForClose();
  } catch (err) {
    if (abort?.aborted) {
      return;
    }
    runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
    throw err;
  } finally {
    abort?.removeEventListener("abort", onAbort);
    await client.stop();
  }
}