Spaces:
Sleeping
Sleeping
| import type { WebChannelStatus, WebInboundMsg, WebMonitorTuning } from "./types.js"; | |
| import { hasControlCommand } from "../../auto-reply/command-detection.js"; | |
| import { resolveInboundDebounceMs } from "../../auto-reply/inbound-debounce.js"; | |
| import { getReplyFromConfig } from "../../auto-reply/reply.js"; | |
| import { DEFAULT_GROUP_HISTORY_LIMIT } from "../../auto-reply/reply/history.js"; | |
| import { formatCliCommand } from "../../cli/command-format.js"; | |
| import { waitForever } from "../../cli/wait.js"; | |
| import { loadConfig } from "../../config/config.js"; | |
| import { logVerbose } from "../../globals.js"; | |
| import { formatDurationMs } from "../../infra/format-duration.js"; | |
| import { enqueueSystemEvent } from "../../infra/system-events.js"; | |
| import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js"; | |
| import { getChildLogger } from "../../logging.js"; | |
| import { resolveAgentRoute } from "../../routing/resolve-route.js"; | |
| import { defaultRuntime, type RuntimeEnv } from "../../runtime.js"; | |
| import { resolveWhatsAppAccount } from "../accounts.js"; | |
| import { setActiveWebListener } from "../active-listener.js"; | |
| import { monitorWebInbox } from "../inbound.js"; | |
| import { | |
| computeBackoff, | |
| newConnectionId, | |
| resolveHeartbeatSeconds, | |
| resolveReconnectPolicy, | |
| sleepWithAbort, | |
| } from "../reconnect.js"; | |
| import { formatError, getWebAuthAgeMs, readWebSelfId } from "../session.js"; | |
| import { DEFAULT_WEB_MEDIA_BYTES } from "./constants.js"; | |
| import { whatsappHeartbeatLog, whatsappLog } from "./loggers.js"; | |
| import { buildMentionConfig } from "./mentions.js"; | |
| import { createEchoTracker } from "./monitor/echo.js"; | |
| import { createWebOnMessageHandler } from "./monitor/on-message.js"; | |
| import { isLikelyWhatsAppCryptoError } from "./util.js"; | |
/**
 * Runs the WhatsApp Web monitor: opens an inbound listener, dispatches incoming
 * messages to the auto-reply handler, and reconnects with backoff whenever the
 * listener closes, until aborted, logged out, or the retry budget is exhausted.
 *
 * Status snapshots are pushed to `tuning.statusSink` (when provided) on every
 * state change. Connect/disconnect notices are queued as system events for the
 * session resolved by `resolveAgentRoute`.
 *
 * @param verbose - Forwarded to the on-message handler and the listener factory.
 * @param listenerFactory - Opens the inbound listener; falls back to `monitorWebInbox`.
 * @param keepAlive - When `false`, the function connects once, closes the listener
 *   and returns without installing the heartbeat/watchdog timers or waiting for a
 *   close event.
 * @param replyResolver - Forwarded to the on-message handler; falls back to
 *   `getReplyFromConfig`.
 * @param runtime - Receives user-facing error lines via `runtime.error`.
 * @param abortSignal - When aborted, the loop closes the listener and exits; it is
 *   also the default signal for the backoff sleep.
 * @param tuning - Optional overrides read here: `statusSink`, `accountId`,
 *   `heartbeatSeconds`, `reconnect`, and `sleep`.
 * @returns Resolves once monitoring has stopped.
 */
export async function monitorWebChannel(
  verbose: boolean,
  listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
  keepAlive = true,
  replyResolver: typeof getReplyFromConfig | undefined = getReplyFromConfig,
  runtime: RuntimeEnv = defaultRuntime,
  abortSignal?: AbortSignal,
  tuning: WebMonitorTuning = {},
): Promise<void> {
  const runId = newConnectionId();
  const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
  const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
  const reconnectLogger = getChildLogger({ module: "web-reconnect", runId });
  const status: WebChannelStatus = {
    running: true,
    connected: false,
    reconnectAttempts: 0,
    lastConnectedAt: null,
    lastDisconnect: null,
    lastMessageAt: null,
    lastEventAt: null,
    lastError: null,
  };
  // Hand the sink a copy (including a copy of lastDisconnect) so later mutations
  // of `status` do not alter snapshots the sink already received.
  const emitStatus = () => {
    tuning.statusSink?.({
      ...status,
      lastDisconnect: status.lastDisconnect ? { ...status.lastDisconnect } : null,
    });
  };
  emitStatus();

  // Overlay the resolved account's settings onto channels.whatsapp so downstream
  // helpers that read the config see the per-account values.
  const baseCfg = loadConfig();
  const account = resolveWhatsAppAccount({
    cfg: baseCfg,
    accountId: tuning.accountId,
  });
  const cfg = {
    ...baseCfg,
    channels: {
      ...baseCfg.channels,
      whatsapp: {
        ...baseCfg.channels?.whatsapp,
        ackReaction: account.ackReaction,
        messagePrefix: account.messagePrefix,
        allowFrom: account.allowFrom,
        groupAllowFrom: account.groupAllowFrom,
        groupPolicy: account.groupPolicy,
        textChunkLimit: account.textChunkLimit,
        chunkMode: account.chunkMode,
        mediaMaxMb: account.mediaMaxMb,
        blockStreaming: account.blockStreaming,
        groups: account.groups,
      },
    },
  } satisfies ReturnType<typeof loadConfig>;

  const configuredMaxMb = cfg.agents?.defaults?.mediaMaxMb;
  const maxMediaBytes =
    typeof configuredMaxMb === "number" && configuredMaxMb > 0
      ? configuredMaxMb * 1024 * 1024
      : DEFAULT_WEB_MEDIA_BYTES;
  const heartbeatSeconds = resolveHeartbeatSeconds(cfg, tuning.heartbeatSeconds);
  const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
  const baseMentionConfig = buildMentionConfig(cfg);
  // Most specific setting wins: account, then channel, then global group chat.
  const groupHistoryLimit =
    cfg.channels?.whatsapp?.accounts?.[tuning.accountId ?? ""]?.historyLimit ??
    cfg.channels?.whatsapp?.historyLimit ??
    cfg.messages?.groupChat?.historyLimit ??
    DEFAULT_GROUP_HISTORY_LIMIT;

  // These collections are created once and shared by every connection attempt.
  const groupHistories = new Map<
    string,
    Array<{
      sender: string;
      body: string;
      timestamp?: number;
      id?: string;
      senderJid?: string;
    }>
  >();
  const groupMemberNames = new Map<string, Map<string, string>>();
  const echoTracker = createEchoTracker({ maxItems: 100, logVerbose });

  const sleep =
    tuning.sleep ??
    ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal));
  const stopRequested = () => abortSignal?.aborted === true;
  // Resolves with "aborted" when the signal fires; undefined when no signal was given.
  const abortPromise =
    abortSignal &&
    new Promise<"aborted">((resolve) =>
      abortSignal.addEventListener("abort", () => resolve("aborted"), {
        once: true,
      }),
    );

  // Avoid noisy MaxListenersExceeded warnings in test environments where
  // multiple gateway instances may be constructed.
  const currentMaxListeners = process.getMaxListeners?.() ?? 10;
  if (process.setMaxListeners && currentMaxListeners < 50) {
    process.setMaxListeners(50);
  }

  let sigintStop = false;
  const handleSigint = () => {
    sigintStop = true;
  };
  process.once("SIGINT", handleSigint);

  // The try/finally guarantees the SIGINT listener is removed on every exit path,
  // including the `!keepAlive` early return and thrown errors. A lingering
  // listener would otherwise swallow the next Ctrl+C.
  try {
    let reconnectAttempts = 0;
    while (true) {
      if (stopRequested()) {
        break;
      }
      const connectionId = newConnectionId();
      const startedAt = Date.now();
      let heartbeat: NodeJS.Timeout | null = null;
      let watchdogTimer: NodeJS.Timeout | null = null;
      let lastMessageAt: number | null = null;
      let handledMessages = 0;
      let unregisterUnhandled: (() => void) | null = null;
      // Watchdog to detect stuck message processing (e.g., event emitter died)
      const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
      const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
      const backgroundTasks = new Set<Promise<unknown>>();

      const onMessage = createWebOnMessageHandler({
        cfg,
        verbose,
        connectionId,
        maxMediaBytes,
        groupHistoryLimit,
        groupHistories,
        groupMemberNames,
        echoTracker,
        backgroundTasks,
        replyResolver: replyResolver ?? getReplyFromConfig,
        replyLogger,
        baseMentionConfig,
        account,
      });

      const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" });
      // Only plain text is debounced: media, locations, replies and control
      // commands are delivered immediately.
      const shouldDebounce = (msg: WebInboundMsg) => {
        if (msg.mediaPath || msg.mediaType) {
          return false;
        }
        if (msg.location) {
          return false;
        }
        if (msg.replyToId || msg.replyToBody) {
          return false;
        }
        return !hasControlCommand(msg.body, cfg);
      };

      const listener = await (listenerFactory ?? monitorWebInbox)({
        verbose,
        accountId: account.accountId,
        authDir: account.authDir,
        mediaMaxMb: account.mediaMaxMb,
        sendReadReceipts: account.sendReadReceipts,
        debounceMs: inboundDebounceMs,
        shouldDebounce,
        onMessage: async (msg: WebInboundMsg) => {
          handledMessages += 1;
          lastMessageAt = Date.now();
          status.lastMessageAt = lastMessageAt;
          status.lastEventAt = lastMessageAt;
          emitStatus();
          await onMessage(msg);
        },
      });

      status.connected = true;
      status.lastConnectedAt = Date.now();
      status.lastEventAt = status.lastConnectedAt;
      status.lastError = null;
      emitStatus();

      // Surface a concise connection event for the next main-session turn/heartbeat.
      const { e164: selfE164 } = readWebSelfId(account.authDir);
      const connectRoute = resolveAgentRoute({
        cfg,
        channel: "whatsapp",
        accountId: account.accountId,
      });
      enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, {
        sessionKey: connectRoute.sessionKey,
      });

      setActiveWebListener(account.accountId, listener);

      // Claim unhandled rejections that look like WhatsApp crypto errors and turn
      // them into a close signal so the loop below reconnects. Returning false
      // declines any other rejection.
      unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
        if (!isLikelyWhatsAppCryptoError(reason)) {
          return false;
        }
        const errorStr = formatError(reason);
        reconnectLogger.warn(
          { connectionId, error: errorStr },
          "web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect",
        );
        listener.signalClose?.({
          status: 499,
          isLoggedOut: false,
          error: reason,
        });
        return true;
      });

      // Tears down everything owned by this connection attempt. It may run more
      // than once per attempt (the watchdog and the main loop both call it).
      const closeListener = async () => {
        setActiveWebListener(account.accountId, null);
        if (unregisterUnhandled) {
          unregisterUnhandled();
          unregisterUnhandled = null;
        }
        if (heartbeat) {
          clearInterval(heartbeat);
        }
        if (watchdogTimer) {
          clearInterval(watchdogTimer);
        }
        // Let in-flight background work settle before closing the socket.
        if (backgroundTasks.size > 0) {
          await Promise.allSettled(backgroundTasks);
          backgroundTasks.clear();
        }
        try {
          await listener.close();
        } catch (err) {
          logVerbose(`Socket close failed: ${formatError(err)}`);
        }
      };

      if (keepAlive) {
        // Periodic heartbeat log; escalates to a warning after 30+ idle minutes.
        heartbeat = setInterval(() => {
          const authAgeMs = getWebAuthAgeMs(account.authDir);
          const minutesSinceLastMessage = lastMessageAt
            ? Math.floor((Date.now() - lastMessageAt) / 60000)
            : null;
          const logData = {
            connectionId,
            reconnectAttempts,
            messagesHandled: handledMessages,
            lastMessageAt,
            authAgeMs,
            uptimeMs: Date.now() - startedAt,
            ...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
              ? { minutesSinceLastMessage }
              : {}),
          };
          if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
            heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes");
          } else {
            heartbeatLogger.info(logData, "web gateway heartbeat");
          }
        }, heartbeatSeconds * 1000);

        // Watchdog: forces a reconnect when messages stop arriving. It stays idle
        // until this connection has received at least one message.
        watchdogTimer = setInterval(() => {
          if (!lastMessageAt) {
            return;
          }
          const timeSinceLastMessage = Date.now() - lastMessageAt;
          if (timeSinceLastMessage <= MESSAGE_TIMEOUT_MS) {
            return;
          }
          const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
          heartbeatLogger.warn(
            {
              connectionId,
              minutesSinceLastMessage,
              lastMessageAt: new Date(lastMessageAt),
              messagesHandled: handledMessages,
            },
            "Message timeout detected - forcing reconnect",
          );
          whatsappHeartbeatLog.warn(
            `No messages received in ${minutesSinceLastMessage}m - restarting connection`,
          );
          void closeListener().catch((err) => {
            logVerbose(`Close listener failed: ${formatError(err)}`);
          });
          listener.signalClose?.({
            status: 499,
            isLoggedOut: false,
            error: "watchdog-timeout",
          });
        }, WATCHDOG_CHECK_MS);
      }

      whatsappLog.info("Listening for personal WhatsApp inbound messages.");
      if (process.stdout.isTTY || process.stderr.isTTY) {
        whatsappLog.raw("Ctrl+C to stop.");
      }

      if (!keepAlive) {
        // One-shot mode: close right away. The enclosing finally still removes
        // the SIGINT listener.
        await closeListener();
        return;
      }

      // Block until the listener reports a close or the abort signal fires. A
      // rejected onClose is converted into a synthetic close reason.
      const reason = await Promise.race([
        listener.onClose?.catch((err) => {
          reconnectLogger.error({ error: formatError(err) }, "listener.onClose rejected");
          return { status: 500, isLoggedOut: false, error: err };
        }) ?? waitForever(),
        abortPromise ?? waitForever(),
      ]);

      const uptimeMs = Date.now() - startedAt;
      if (uptimeMs > heartbeatSeconds * 1000) {
        reconnectAttempts = 0; // Healthy stretch; reset the backoff.
      }
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();

      if (stopRequested() || sigintStop || reason === "aborted") {
        await closeListener();
        break;
      }

      const statusCode =
        (typeof reason === "object" && reason && "status" in reason
          ? (reason as { status?: number }).status
          : undefined) ?? "unknown";
      const loggedOut =
        typeof reason === "object" &&
        reason &&
        "isLoggedOut" in reason &&
        (reason as { isLoggedOut?: boolean }).isLoggedOut;
      const errorStr = formatError(reason);

      status.connected = false;
      status.lastEventAt = Date.now();
      status.lastDisconnect = {
        at: status.lastEventAt,
        status: typeof statusCode === "number" ? statusCode : undefined,
        error: errorStr,
        loggedOut: Boolean(loggedOut),
      };
      status.lastError = errorStr;
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();

      reconnectLogger.info(
        {
          connectionId,
          status: statusCode,
          loggedOut,
          reconnectAttempts,
          error: errorStr,
        },
        "web reconnect: connection closed",
      );
      // statusCode already falls back to "unknown", so no extra default is needed.
      enqueueSystemEvent(`WhatsApp gateway disconnected (status ${statusCode})`, {
        sessionKey: connectRoute.sessionKey,
      });

      if (loggedOut) {
        // The loop stops on a logged-out close and asks the user to relink.
        runtime.error(
          `WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`,
        );
        await closeListener();
        break;
      }

      reconnectAttempts += 1;
      status.reconnectAttempts = reconnectAttempts;
      emitStatus();

      // A maxAttempts of 0 (or less) means unlimited retries.
      if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) {
        reconnectLogger.warn(
          {
            connectionId,
            status: statusCode,
            reconnectAttempts,
            maxAttempts: reconnectPolicy.maxAttempts,
          },
          "web reconnect: max attempts reached; continuing in degraded mode",
        );
        runtime.error(
          `WhatsApp Web reconnect: max attempts reached (${reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`,
        );
        await closeListener();
        break;
      }

      const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
      reconnectLogger.info(
        {
          connectionId,
          status: statusCode,
          reconnectAttempts,
          maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
          delayMs: delay,
        },
        "web reconnect: scheduling retry",
      );
      runtime.error(
        `WhatsApp Web connection closed (status ${statusCode}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationMs(delay)}… (${errorStr})`,
      );
      await closeListener();
      try {
        await sleep(delay, abortSignal);
      } catch {
        // A rejected sleep ends the loop instead of retrying.
        break;
      }
    }

    status.running = false;
    status.connected = false;
    status.lastEventAt = Date.now();
    emitStatus();
  } finally {
    process.removeListener("SIGINT", handleSigint);
  }
}