import type { WebhookRequestBody } from "@line/bot-sdk";
import type { IncomingMessage, ServerResponse } from "node:http";
import type { OpenClawConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
import type { LineChannelData, ResolvedLineAccount } from "./types.js";
import { resolveEffectiveMessagesConfig } from "../agents/identity.js";
import { chunkMarkdownText } from "../auto-reply/chunk.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import { danger, logVerbose } from "../globals.js";
import { normalizePluginHttpPath } from "../plugins/http-path.js";
import { registerPluginHttpRoute } from "../plugins/http-registry.js";
import { deliverLineAutoReply } from "./auto-reply-delivery.js";
import { createLineBot } from "./bot.js";
import { processLineMessage } from "./markdown-to-line.js";
import { sendLineReplyChunks } from "./reply-chunks.js";
import {
  replyMessageLine,
  showLoadingAnimation,
  getUserDisplayName,
  createQuickReplyItems,
  createTextMessageWithQuickReplies,
  pushTextMessageWithQuickReplies,
  pushMessageLine,
  pushMessagesLine,
  createFlexMessage,
  createImageMessage,
  createLocationMessage,
} from "./send.js";
import { validateLineSignature } from "./signature.js";
import { buildTemplateMessageFromPayload } from "./template-messages.js";

/** Options accepted by `monitorLineProvider`. */
export interface MonitorLineProviderOptions {
  /** LINE channel access token, forwarded to the bot factory. */
  channelAccessToken: string;
  /** LINE channel secret; also used to validate the `X-Line-Signature` header of incoming webhooks. */
  channelSecret: string;
  /** Account identifier; runtime state is recorded under `"default"` when omitted. */
  accountId?: string;
  config: OpenClawConfig;
  runtime: RuntimeEnv;
  /** When this signal aborts, the provider unregisters its webhook route and records a stopped state. */
  abortSignal?: AbortSignal;
  /** NOTE(review): not read by `monitorLineProvider` in this file — confirm whether it is still needed. */
  webhookUrl?: string;
  /** HTTP path for the webhook route; falls back to `/line/webhook`. */
  webhookPath?: string;
}

/** Handle returned by `monitorLineProvider`. */
export interface LineProviderMonitor {
  account: ResolvedLineAccount;
  /** Feeds an already-parsed webhook payload to the bot. */
  handleWebhook: (body: WebhookRequestBody) => Promise<void>;
  /** Unregisters the webhook route and records the stopped state. */
  stop: () => void;
}

// Track runtime state in memory (simplified version).
// Keys have the form `${channel}:${accountId}`, e.g. `line:default`.
const runtimeState = new Map<
  string,
  {
    running: boolean;
    lastStartAt: number | null;
    lastStopAt: number | null;
    lastError: string | null;
    lastInboundAt?: number | null;
    lastOutboundAt?: number | null;
  }
>();
/**
 * Merges a partial update into the in-memory runtime record of a channel account.
 *
 * The record is keyed by `${channel}:${accountId}`. When no record exists yet, the
 * update is applied on top of a "not running, nothing recorded" baseline.
 */
function recordChannelRuntimeState(params: {
  channel: string;
  accountId: string;
  state: Partial<{
    running: boolean;
    lastStartAt: number | null;
    lastStopAt: number | null;
    lastError: string | null;
    lastInboundAt: number | null;
    lastOutboundAt: number | null;
  }>;
}): void {
  const key = `${params.channel}:${params.accountId}`;
  const existing = runtimeState.get(key) ?? {
    running: false,
    lastStartAt: null,
    lastStopAt: null,
    lastError: null,
  };
  runtimeState.set(key, { ...existing, ...params.state });
}

/**
 * Returns the recorded runtime state of a LINE account, or `undefined` when nothing
 * has been recorded for it yet.
 */
export function getLineRuntimeState(accountId: string) {
  return runtimeState.get(`line:${accountId}`);
}

/** Default upper bound (1 MiB) on the size of a buffered webhook request body. */
const DEFAULT_MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;

/** Raised by `readRequestBody` when a request body exceeds the allowed size. */
class LineWebhookBodyTooLargeError extends Error {
  readonly maxBytes: number;

  constructor(maxBytes: number) {
    super(`line: webhook body exceeds ${maxBytes} bytes`);
    this.name = "LineWebhookBodyTooLargeError";
    this.maxBytes = maxBytes;
  }
}

/**
 * Buffers the full request body and decodes it as UTF-8.
 *
 * The body is read before the signature can be checked, so it is untrusted input:
 * buffering stops as soon as more than `maxBytes` bytes have been received.
 *
 * @param req - Incoming HTTP request to read from.
 * @param maxBytes - Maximum number of body bytes to accept (default 1 MiB).
 * @returns The body decoded as a UTF-8 string.
 * @throws LineWebhookBodyTooLargeError when the body exceeds `maxBytes`.
 * @throws Whatever error the request stream emits.
 */
async function readRequestBody(
  req: IncomingMessage,
  maxBytes: number = DEFAULT_MAX_WEBHOOK_BODY_BYTES,
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let receivedBytes = 0;
    let settled = false;

    req.on("data", (chunk: Buffer | string) => {
      if (settled) {
        // Already rejected: keep draining the stream, but never buffer the data.
        return;
      }
      const buf = typeof chunk === "string" ? Buffer.from(chunk) : chunk;
      receivedBytes += buf.length;
      if (receivedBytes > maxBytes) {
        settled = true;
        chunks.length = 0;
        reject(new LineWebhookBodyTooLargeError(maxBytes));
        return;
      }
      chunks.push(buf);
    });
    req.on("end", () => {
      if (settled) {
        return;
      }
      settled = true;
      // Concatenate before decoding so multi-byte characters split across chunks survive.
      resolve(Buffer.concat(chunks).toString("utf-8"));
    });
    req.on("error", (err) => {
      if (settled) {
        return;
      }
      settled = true;
      reject(err);
    });
  });
}

/**
 * Shows the LINE loading animation immediately and then re-triggers it on a timer.
 *
 * Failures of the animation call are deliberately ignored (best-effort UI hint).
 *
 * @param params.userId - User the animation is shown to.
 * @param params.accountId - Account used for the API call.
 * @param params.intervalMs - Re-trigger interval in milliseconds (default 18 000).
 * @param params.loadingSeconds - Duration requested per animation call (default 20).
 * @returns An idempotent function that stops the keepalive timer.
 */
function startLineLoadingKeepalive(params: {
  userId: string;
  accountId?: string;
  intervalMs?: number;
  loadingSeconds?: number;
}): () => void {
  // The default interval is shorter than the default animation duration,
  // presumably so that consecutive animations overlap without a gap.
  const intervalMs = params.intervalMs ?? 18_000;
  const loadingSeconds = params.loadingSeconds ?? 20;
  let stopped = false;

  const trigger = () => {
    if (stopped) {
      return;
    }
    void showLoadingAnimation(params.userId, {
      accountId: params.accountId,
      loadingSeconds,
    }).catch(() => {});
  };

  trigger();
  const timer = setInterval(trigger, intervalMs);

  return () => {
    if (stopped) {
      return;
    }
    stopped = true;
    clearInterval(timer);
  };
}

/** Writes a JSON response with the given status code and ends the response. */
function sendJsonResponse(res: ServerResponse, statusCode: number, payload: unknown): void {
  res.statusCode = statusCode;
  res.setHeader("Content-Type", "application/json");
  res.end(JSON.stringify(payload));
}

/**
 * Starts the LINE provider for one account.
 *
 * Creates the bot, dispatches inbound messages to the auto-reply system, registers
 * the HTTP webhook route, and wires the optional abort signal to shutdown.
 *
 * @param opts - Credentials, configuration and runtime hooks for the provider.
 * @returns A monitor exposing the resolved account, the bot's webhook entry point,
 *   and an idempotent `stop()` function.
 */
export async function monitorLineProvider(
  opts: MonitorLineProviderOptions,
): Promise<LineProviderMonitor> {
  const {
    channelAccessToken,
    channelSecret,
    accountId,
    config,
    runtime,
    abortSignal,
    webhookPath,
  } = opts;
  const resolvedAccountId = accountId ?? "default";

  // Record starting state
  recordChannelRuntimeState({
    channel: "line",
    accountId: resolvedAccountId,
    state: {
      running: true,
      lastStartAt: Date.now(),
    },
  });

  // Create the bot
  const bot = createLineBot({
    channelAccessToken,
    channelSecret,
    accountId,
    runtime,
    config,
    onMessage: async (ctx) => {
      if (!ctx) {
        return;
      }

      const { ctxPayload, replyToken, route } = ctx;

      // Record inbound activity
      recordChannelRuntimeState({
        channel: "line",
        accountId: resolvedAccountId,
        state: {
          lastInboundAt: Date.now(),
        },
      });

      // The loading animation is only shown when a user id is known and the
      // message is not from a group; narrowing once avoids non-null assertions.
      const loadingUserId = ctx.userId && !ctx.isGroup ? ctx.userId : undefined;

      // Fetch display name for logging (non-blocking). The name is only used in a
      // log line, so a failed lookup falls back to the sender id instead of
      // aborting message handling.
      const displayNamePromise = ctx.userId
        ? getUserDisplayName(ctx.userId, { accountId: ctx.accountId }).catch(
            () => ctxPayload.From,
          )
        : Promise.resolve(ctxPayload.From);

      // Show loading animation while processing (non-blocking, best-effort)
      const stopLoading = loadingUserId
        ? startLineLoadingKeepalive({ userId: loadingUserId, accountId: ctx.accountId })
        : null;

      // Track if we've used the one-time reply token. Declared outside the try so
      // the error path can tell whether the token is still usable.
      let replyTokenUsed = false;

      // Everything after the keepalive starts runs inside try/finally so the
      // keepalive timer is always stopped.
      try {
        const displayName = await displayNamePromise;
        logVerbose(`line: received message from ${displayName} (${ctxPayload.From})`);

        // Dispatch to auto-reply system for AI response
        const textLimit = 5000; // LINE max message length

        const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
          ctx: ctxPayload,
          cfg: config,
          dispatcherOptions: {
            responsePrefix: resolveEffectiveMessagesConfig(config, route.agentId).responsePrefix,
            deliver: async (payload, _info) => {
              const lineData = (payload.channelData?.line as LineChannelData | undefined) ?? {};

              // Show loading animation before each delivery (non-blocking)
              if (loadingUserId) {
                void showLoadingAnimation(loadingUserId, { accountId: ctx.accountId }).catch(
                  () => {},
                );
              }

              const { replyTokenUsed: nextReplyTokenUsed } = await deliverLineAutoReply({
                payload,
                lineData,
                to: ctxPayload.From,
                replyToken,
                replyTokenUsed,
                accountId: ctx.accountId,
                textLimit,
                deps: {
                  buildTemplateMessageFromPayload,
                  processLineMessage,
                  chunkMarkdownText,
                  sendLineReplyChunks,
                  replyMessageLine,
                  pushMessageLine,
                  pushTextMessageWithQuickReplies,
                  createQuickReplyItems,
                  createTextMessageWithQuickReplies,
                  pushMessagesLine,
                  createFlexMessage,
                  createImageMessage,
                  createLocationMessage,
                  onReplyError: (replyErr) => {
                    logVerbose(
                      `line: reply token failed, falling back to push: ${String(replyErr)}`,
                    );
                  },
                },
              });
              replyTokenUsed = nextReplyTokenUsed;

              recordChannelRuntimeState({
                channel: "line",
                accountId: resolvedAccountId,
                state: {
                  lastOutboundAt: Date.now(),
                },
              });
            },
            onError: (err, info) => {
              runtime.error?.(danger(`line ${info.kind} reply failed: ${String(err)}`));
            },
          },
          replyOptions: {},
        });

        if (!queuedFinal) {
          logVerbose(`line: no response generated for message from ${ctxPayload.From}`);
        }
      } catch (err) {
        runtime.error?.(danger(`line: auto-reply failed: ${String(err)}`));

        // Send error message to user. The reply token is one-time: once a delivery
        // has consumed it, another reply attempt could only fail, so skip it.
        if (replyToken && !replyTokenUsed) {
          try {
            await replyMessageLine(
              replyToken,
              [{ type: "text", text: "Sorry, I encountered an error processing your message." }],
              { accountId: ctx.accountId },
            );
          } catch (replyErr) {
            runtime.error?.(danger(`line: error reply failed: ${String(replyErr)}`));
          }
        }
      } finally {
        stopLoading?.();
      }
    },
  });

  // Register HTTP webhook handler
  const normalizedPath = normalizePluginHttpPath(webhookPath, "/line/webhook") ?? "/line/webhook";
  const unregisterHttp = registerPluginHttpRoute({
    path: normalizedPath,
    pluginId: "line",
    accountId: resolvedAccountId,
    log: (msg) => logVerbose(msg),
    handler: async (req: IncomingMessage, res: ServerResponse) => {
      // Handle GET requests for webhook verification
      if (req.method === "GET") {
        res.statusCode = 200;
        res.setHeader("Content-Type", "text/plain");
        res.end("OK");
        return;
      }

      // Only accept POST requests
      if (req.method !== "POST") {
        res.setHeader("Allow", "GET, POST");
        sendJsonResponse(res, 405, { error: "Method Not Allowed" });
        return;
      }

      try {
        const rawBody = await readRequestBody(req);
        const signature = req.headers["x-line-signature"];

        // Validate signature
        if (!signature || typeof signature !== "string") {
          logVerbose("line: webhook missing X-Line-Signature header");
          sendJsonResponse(res, 400, { error: "Missing X-Line-Signature header" });
          return;
        }

        if (!validateLineSignature(rawBody, signature, channelSecret)) {
          logVerbose("line: webhook signature validation failed");
          sendJsonResponse(res, 401, { error: "Invalid signature" });
          return;
        }

        // Parse the webhook body. A correctly signed body that is not a JSON object
        // is a client error, not a server failure.
        let parsed: unknown;
        try {
          parsed = JSON.parse(rawBody);
        } catch {
          parsed = undefined;
        }
        if (typeof parsed !== "object" || parsed === null) {
          logVerbose("line: webhook body is not a JSON object");
          sendJsonResponse(res, 400, { error: "Invalid request body" });
          return;
        }
        const body = parsed as WebhookRequestBody;

        // Respond immediately with 200 to avoid LINE timeout
        sendJsonResponse(res, 200, { status: "ok" });

        // Process events after the response has been sent
        if (body.events && body.events.length > 0) {
          logVerbose(`line: received ${body.events.length} webhook events`);
          try {
            await bot.handleWebhook(body);
          } catch (err) {
            runtime.error?.(danger(`line webhook handler failed: ${String(err)}`));
          }
        }
      } catch (err) {
        if (err instanceof LineWebhookBodyTooLargeError) {
          logVerbose(`line: webhook body rejected: ${err.message}`);
          if (!res.headersSent) {
            // Ask the server to close the connection rather than keep reading the oversized body.
            res.setHeader("Connection", "close");
            sendJsonResponse(res, 413, { error: "Payload Too Large" });
          }
          return;
        }
        runtime.error?.(danger(`line webhook error: ${String(err)}`));
        if (!res.headersSent) {
          sendJsonResponse(res, 500, { error: "Internal server error" });
        }
      }
    },
  });

  logVerbose(`line: registered webhook handler at ${normalizedPath}`);

  // Handle abort signal. The handler is idempotent because it can be reached both
  // through the abort event and through an explicit stop() call.
  let stopped = false;
  const stopHandler = () => {
    if (stopped) {
      return;
    }
    stopped = true;
    logVerbose(`line: stopping provider for account ${resolvedAccountId}`);
    unregisterHttp();
    recordChannelRuntimeState({
      channel: "line",
      accountId: resolvedAccountId,
      state: {
        running: false,
        lastStopAt: Date.now(),
      },
    });
  };

  if (abortSignal?.aborted) {
    // An already-aborted signal never fires "abort" again, so stop right away.
    stopHandler();
  } else {
    abortSignal?.addEventListener("abort", stopHandler, { once: true });
  }

  return {
    account: bot.account,
    handleWebhook: bot.handleWebhook,
    stop: () => {
      stopHandler();
      abortSignal?.removeEventListener("abort", stopHandler);
    },
  };
}