Spaces:
Paused
Paused
| import type { WebhookRequestBody } from "@line/bot-sdk"; | |
| import type { IncomingMessage, ServerResponse } from "node:http"; | |
| import type { OpenClawConfig } from "../config/config.js"; | |
| import type { RuntimeEnv } from "../runtime.js"; | |
| import type { LineChannelData, ResolvedLineAccount } from "./types.js"; | |
| import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; | |
| import { chunkMarkdownText } from "../auto-reply/chunk.js"; | |
| import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; | |
| import { danger, logVerbose } from "../globals.js"; | |
| import { normalizePluginHttpPath } from "../plugins/http-path.js"; | |
| import { registerPluginHttpRoute } from "../plugins/http-registry.js"; | |
| import { deliverLineAutoReply } from "./auto-reply-delivery.js"; | |
| import { createLineBot } from "./bot.js"; | |
| import { processLineMessage } from "./markdown-to-line.js"; | |
| import { sendLineReplyChunks } from "./reply-chunks.js"; | |
| import { | |
| replyMessageLine, | |
| showLoadingAnimation, | |
| getUserDisplayName, | |
| createQuickReplyItems, | |
| createTextMessageWithQuickReplies, | |
| pushTextMessageWithQuickReplies, | |
| pushMessageLine, | |
| pushMessagesLine, | |
| createFlexMessage, | |
| createImageMessage, | |
| createLocationMessage, | |
| } from "./send.js"; | |
| import { validateLineSignature } from "./signature.js"; | |
| import { buildTemplateMessageFromPayload } from "./template-messages.js"; | |
/**
 * Options accepted by `monitorLineProvider`.
 */
export interface MonitorLineProviderOptions {
  /** LINE channel access token; handed to the bot on creation. */
  channelAccessToken: string;
  /** LINE channel secret; handed to the bot and used to validate webhook signatures. */
  channelSecret: string;
  /** Application config; passed to the bot and to the auto-reply dispatcher. */
  config: OpenClawConfig;
  /** Runtime environment; passed to the bot, and its optional `error` hook receives failure reports. */
  runtime: RuntimeEnv;
  /** Account identifier; `monitorLineProvider` falls back to "default" when omitted. */
  accountId?: string;
  /** When this signal aborts, the webhook route is unregistered and the account is recorded as stopped. */
  abortSignal?: AbortSignal;
  /** NOTE(review): not read by `monitorLineProvider` in this file — confirm whether other callers use it. */
  webhookUrl?: string;
  /** Route path for the webhook handler; normalized, with "/line/webhook" as the fallback. */
  webhookPath?: string;
}
/**
 * Handle returned by `monitorLineProvider` for a started LINE account.
 */
export interface LineProviderMonitor {
  /** The account exposed by the underlying bot (`bot.account`). */
  account: ResolvedLineAccount;
  /** Unregisters the webhook route, records the stopped state and detaches the abort listener. */
  stop: () => void;
  /** The bot's webhook entry point, for callers that deliver webhook bodies directly. */
  handleWebhook: (body: WebhookRequestBody) => Promise<void>;
}
// In-memory runtime status per channel account (simplified version).
// Entries are keyed by `${channel}:${accountId}`.
const runtimeState: Map<
  string,
  {
    running: boolean;
    lastStartAt: number | null;
    lastStopAt: number | null;
    lastError: string | null;
    lastInboundAt?: number | null;
    lastOutboundAt?: number | null;
  }
> = new Map();
/**
 * Merges a partial status update into the stored runtime state of one
 * channel account, creating the entry with idle defaults when it is missing.
 *
 * @param params.channel - Channel name; first half of the storage key.
 * @param params.accountId - Account identifier; second half of the storage key.
 * @param params.state - Fields to overwrite on the stored entry.
 */
function recordChannelRuntimeState(params: {
  channel: string;
  accountId: string;
  state: Partial<{
    running: boolean;
    lastStartAt: number | null;
    lastStopAt: number | null;
    lastError: string | null;
    lastInboundAt: number | null;
    lastOutboundAt: number | null;
  }>;
}): void {
  const { channel, accountId, state: patch } = params;
  const storageKey = `${channel}:${accountId}`;

  // Entries that were never recorded start out as "not running, nothing seen yet".
  const previous = runtimeState.get(storageKey) ?? {
    running: false,
    lastStartAt: null,
    lastStopAt: null,
    lastError: null,
  };

  const merged = { ...previous, ...patch };
  runtimeState.set(storageKey, merged);
}
/**
 * Looks up the recorded runtime state of a LINE account.
 *
 * @param accountId - Account identifier used when the state was recorded.
 * @returns The stored state, or `undefined` when nothing was recorded for the account.
 */
export function getLineRuntimeState(accountId: string) {
  const storageKey = "line:" + accountId;
  return runtimeState.get(storageKey);
}
/**
 * Buffers the body of an incoming HTTP request and decodes it as UTF-8.
 *
 * The body comes from an untrusted client, so the amount of buffered data is
 * capped: once more than `maxBytes` bytes have arrived the promise rejects and
 * any further data is discarded instead of being accumulated.
 *
 * @param req - The incoming request (any readable stream of Buffer/string chunks).
 * @param maxBytes - Upper bound on the body size in bytes; defaults to 1 MiB.
 * @returns The complete body as a UTF-8 string.
 * @throws Rejects when the stream errors, when it closes before `end`, or when
 *   the body exceeds `maxBytes`.
 */
function readRequestBody(req: IncomingMessage, maxBytes: number = 1_048_576): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let receivedBytes = 0;
    let settled = false;

    req.on("data", (chunk: Buffer | string) => {
      if (settled) {
        // Already resolved/rejected: keep draining without buffering.
        return;
      }
      // Chunks arrive as strings when an encoding was set on the stream;
      // normalize so Buffer.concat below never sees a non-Buffer.
      const buf = typeof chunk === "string" ? Buffer.from(chunk, "utf-8") : chunk;
      receivedBytes += buf.length;
      if (receivedBytes > maxBytes) {
        settled = true;
        chunks.length = 0;
        reject(new Error(`request body exceeds ${maxBytes} bytes`));
        return;
      }
      chunks.push(buf);
    });

    req.on("end", () => {
      if (settled) {
        return;
      }
      settled = true;
      // Decode once over the whole body so multi-byte characters that were
      // split across chunks are reassembled correctly.
      resolve(Buffer.concat(chunks).toString("utf-8"));
    });

    req.on("error", (err) => {
      if (settled) {
        return;
      }
      settled = true;
      reject(err);
    });

    // A stream that closes without having emitted "end" or "error" would
    // otherwise leave this promise pending forever.
    req.on("close", () => {
      if (settled) {
        return;
      }
      settled = true;
      reject(new Error("request closed before the body was fully received"));
    });
  });
}
/**
 * Shows the LINE loading animation for a user right away and then re-triggers
 * it on a fixed interval until the returned function is called.
 *
 * Failures of individual `showLoadingAnimation` calls are ignored on purpose:
 * the animation is best-effort.
 *
 * @param params.userId - User to show the animation to.
 * @param params.accountId - Account forwarded to `showLoadingAnimation`.
 * @param params.intervalMs - Delay between re-triggers; defaults to 18000 ms.
 * @param params.loadingSeconds - Duration requested per trigger; defaults to 20.
 * @returns A stop function; calling it more than once has no further effect.
 */
function startLineLoadingKeepalive(params: {
  userId: string;
  accountId?: string;
  intervalMs?: number;
  loadingSeconds?: number;
}): () => void {
  const periodMs = params.intervalMs ?? 18_000;
  const animationSeconds = params.loadingSeconds ?? 20;
  let active = true;

  const ping = (): void => {
    if (active) {
      void showLoadingAnimation(params.userId, {
        accountId: params.accountId,
        loadingSeconds: animationSeconds,
      }).catch(() => {});
    }
  };

  // First animation immediately, then refresh on every tick.
  ping();
  const handle = setInterval(ping, periodMs);

  return (): void => {
    if (!active) {
      return;
    }
    active = false;
    clearInterval(handle);
  };
}
/**
 * Starts the LINE provider for one account.
 *
 * Creates the bot, routes every inbound message through the auto-reply
 * dispatcher, registers the HTTP webhook route, and records start/stop and
 * inbound/outbound activity in the in-memory runtime state.
 *
 * @param opts - Credentials, config, runtime, and the optional account id,
 *   abort signal and webhook path.
 * @returns The bot's account, the bot's webhook handler, and a `stop` function
 *   that unregisters the route and marks the account as stopped.
 */
export async function monitorLineProvider(
  opts: MonitorLineProviderOptions,
): Promise<LineProviderMonitor> {
  const {
    channelAccessToken,
    channelSecret,
    accountId,
    config,
    runtime,
    abortSignal,
    webhookPath,
  } = opts;
  const resolvedAccountId = accountId ?? "default";

  // Record starting state
  recordChannelRuntimeState({
    channel: "line",
    accountId: resolvedAccountId,
    state: {
      running: true,
      lastStartAt: Date.now(),
    },
  });

  // Create the bot
  const bot = createLineBot({
    channelAccessToken,
    channelSecret,
    accountId,
    runtime,
    config,
    onMessage: async (ctx) => {
      if (!ctx) {
        return;
      }
      const { ctxPayload, replyToken, route } = ctx;

      // Record inbound activity
      recordChannelRuntimeState({
        channel: "line",
        accountId: resolvedAccountId,
        state: {
          lastInboundAt: Date.now(),
        },
      });

      // Kick off the display-name lookup without awaiting it yet. The name is
      // only used for logging, so a failed lookup falls back to the sender id
      // instead of aborting message handling.
      const displayNamePromise = (async () => {
        try {
          return ctx.userId
            ? await getUserDisplayName(ctx.userId, { accountId: ctx.accountId })
            : ctxPayload.From;
        } catch (lookupErr) {
          logVerbose(`line: display name lookup failed: ${String(lookupErr)}`);
          return ctxPayload.From;
        }
      })();

      // Show loading animation while processing (non-blocking, best-effort).
      // Only shown when a user id is present and the message is not from a group.
      const loadingUserId = ctx.userId && !ctx.isGroup ? ctx.userId : undefined;
      const stopLoading = loadingUserId
        ? startLineLoadingKeepalive({ userId: loadingUserId, accountId: ctx.accountId })
        : null;

      // Declared outside the try block so the error path can tell whether the
      // one-time reply token has already been consumed.
      let replyTokenUsed = false;

      // Everything after the keepalive has started runs inside try/finally so
      // the keepalive timer is always cleared.
      try {
        const displayName = await displayNamePromise;
        logVerbose(`line: received message from ${displayName} (${ctxPayload.From})`);

        // Dispatch to auto-reply system for AI response
        const textLimit = 5000; // LINE max message length
        const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
          ctx: ctxPayload,
          cfg: config,
          dispatcherOptions: {
            responsePrefix: resolveEffectiveMessagesConfig(config, route.agentId).responsePrefix,
            deliver: async (payload, _info) => {
              const lineData = (payload.channelData?.line as LineChannelData | undefined) ?? {};

              // Show loading animation before each delivery (non-blocking)
              if (loadingUserId) {
                void showLoadingAnimation(loadingUserId, { accountId: ctx.accountId }).catch(
                  () => {},
                );
              }

              const { replyTokenUsed: nextReplyTokenUsed } = await deliverLineAutoReply({
                payload,
                lineData,
                to: ctxPayload.From,
                replyToken,
                replyTokenUsed,
                accountId: ctx.accountId,
                textLimit,
                deps: {
                  buildTemplateMessageFromPayload,
                  processLineMessage,
                  chunkMarkdownText,
                  sendLineReplyChunks,
                  replyMessageLine,
                  pushMessageLine,
                  pushTextMessageWithQuickReplies,
                  createQuickReplyItems,
                  createTextMessageWithQuickReplies,
                  pushMessagesLine,
                  createFlexMessage,
                  createImageMessage,
                  createLocationMessage,
                  onReplyError: (replyErr) => {
                    logVerbose(
                      `line: reply token failed, falling back to push: ${String(replyErr)}`,
                    );
                  },
                },
              });
              replyTokenUsed = nextReplyTokenUsed;

              recordChannelRuntimeState({
                channel: "line",
                accountId: resolvedAccountId,
                state: {
                  lastOutboundAt: Date.now(),
                },
              });
            },
            onError: (err, info) => {
              runtime.error?.(danger(`line ${info.kind} reply failed: ${String(err)}`));
            },
          },
          replyOptions: {},
        });

        if (!queuedFinal) {
          logVerbose(`line: no response generated for message from ${ctxPayload.From}`);
        }
      } catch (err) {
        runtime.error?.(danger(`line: auto-reply failed: ${String(err)}`));

        // Send error message to user, but only while the one-time reply token
        // is still unused; a second reply with the same token cannot succeed.
        if (replyToken && !replyTokenUsed) {
          try {
            await replyMessageLine(
              replyToken,
              [{ type: "text", text: "Sorry, I encountered an error processing your message." }],
              { accountId: ctx.accountId },
            );
          } catch (replyErr) {
            runtime.error?.(danger(`line: error reply failed: ${String(replyErr)}`));
          }
        } else if (replyToken) {
          logVerbose("line: reply token already used, skipping error reply");
        }
      } finally {
        stopLoading?.();
      }
    },
  });

  // Writes a JSON response with the given status code.
  const sendJson = (res: ServerResponse, statusCode: number, payload: unknown): void => {
    res.statusCode = statusCode;
    res.setHeader("Content-Type", "application/json");
    res.end(JSON.stringify(payload));
  };

  // Register HTTP webhook handler
  const normalizedPath = normalizePluginHttpPath(webhookPath, "/line/webhook") ?? "/line/webhook";
  const unregisterHttp = registerPluginHttpRoute({
    path: normalizedPath,
    pluginId: "line",
    accountId: resolvedAccountId,
    log: (msg) => logVerbose(msg),
    handler: async (req: IncomingMessage, res: ServerResponse) => {
      // Handle GET requests for webhook verification
      if (req.method === "GET") {
        res.statusCode = 200;
        res.setHeader("Content-Type", "text/plain");
        res.end("OK");
        return;
      }

      // Only accept POST requests
      if (req.method !== "POST") {
        res.setHeader("Allow", "GET, POST");
        sendJson(res, 405, { error: "Method Not Allowed" });
        return;
      }

      try {
        const rawBody = await readRequestBody(req);
        const signature = req.headers["x-line-signature"];

        // Validate signature
        if (!signature || typeof signature !== "string") {
          logVerbose("line: webhook missing X-Line-Signature header");
          sendJson(res, 400, { error: "Missing X-Line-Signature header" });
          return;
        }
        if (!validateLineSignature(rawBody, signature, channelSecret)) {
          logVerbose("line: webhook signature validation failed");
          sendJson(res, 401, { error: "Invalid signature" });
          return;
        }

        // Parse and process the webhook body
        const body = JSON.parse(rawBody) as WebhookRequestBody;

        // Respond immediately with 200 to avoid LINE timeout
        sendJson(res, 200, { status: "ok" });

        // Process events after the response has been sent
        if (body.events && body.events.length > 0) {
          logVerbose(`line: received ${body.events.length} webhook events`);
          try {
            await bot.handleWebhook(body);
          } catch (err) {
            runtime.error?.(danger(`line webhook handler failed: ${String(err)}`));
          }
        }
      } catch (err) {
        runtime.error?.(danger(`line webhook error: ${String(err)}`));
        if (!res.headersSent) {
          sendJson(res, 500, { error: "Internal server error" });
        }
      }
    },
  });
  logVerbose(`line: registered webhook handler at ${normalizedPath}`);

  // Handle abort signal. The handler is idempotent so that an abort followed
  // by an explicit stop() (or the reverse) unregisters and records only once.
  let stopped = false;
  const stopHandler = (): void => {
    if (stopped) {
      return;
    }
    stopped = true;
    logVerbose(`line: stopping provider for account ${resolvedAccountId}`);
    unregisterHttp();
    recordChannelRuntimeState({
      channel: "line",
      accountId: resolvedAccountId,
      state: {
        running: false,
        lastStopAt: Date.now(),
      },
    });
  };

  if (abortSignal?.aborted) {
    // An already-aborted signal never fires "abort" again, so stop right away.
    stopHandler();
  } else {
    abortSignal?.addEventListener("abort", stopHandler, { once: true });
  }

  return {
    account: bot.account,
    handleWebhook: bot.handleWebhook,
    stop: () => {
      stopHandler();
      abortSignal?.removeEventListener("abort", stopHandler);
    },
  };
}