Spaces:
Configuration error
Configuration error
| import path from "node:path"; | |
| import { afterEach, describe, expect, it, vi } from "vitest"; | |
| import { pollUntil } from "../../test/helpers/poll.js"; | |
| import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; | |
| import { | |
| isEmbeddedPiRunActive, | |
| isEmbeddedPiRunStreaming, | |
| runEmbeddedPiAgent, | |
| } from "../agents/pi-embedded.js"; | |
| import { getReplyFromConfig } from "./reply.js"; | |
// Replace the embedded Pi agent module with stubs so each test can decide
// whether a run is reported as active/streaming and what the agent returns.
// Vitest hoists `vi.mock` calls above the imports, so the modules imported
// at the top of the file already see these stubs.
vi.mock("../agents/pi-embedded.js", () => ({
  abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
  // No default implementation; tests install one via `mockImplementation`.
  runEmbeddedPiAgent: vi.fn(),
  queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
  // Plain function (not a spy): prefixes the trimmed key with "session:",
  // falling back to "main" when the trimmed key is empty.
  resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
  // Both default to "no run in progress"; tests flip them with `mockReturnValue`.
  isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
  isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
}));
/**
 * Builds a canned agent-run result carrying a single text payload.
 *
 * The metadata values (`durationMs`, `sessionId`, `provider`, `model`) are
 * fixed placeholders; only `text` varies between calls.
 *
 * @param text - Text placed in the sole payload entry.
 * @returns An object with one `{ text }` payload and placeholder `meta`.
 */
function makeResult(text: string) {
  return {
    payloads: [{ text }],
    meta: {
      durationMs: 5,
      agentMeta: { sessionId: "s", provider: "p", model: "m" },
    },
  };
}
/**
 * Runs `fn` through the shared temp-home helper using the "moltbot-queue-"
 * prefix, resetting the `runEmbeddedPiAgent` mock before `fn` is invoked.
 *
 * The reset clears recorded calls and any implementation installed by an
 * earlier test, so call-count assertions start from zero.
 *
 * @param fn - Test body; receives the `home` value supplied by the base helper
 *   (presumably the path of a temporary home directory — confirm in
 *   `test/helpers/temp-home`).
 * @returns Whatever `fn` resolves to.
 */
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
  return withTempHomeBase(
    async (home) => {
      vi.mocked(runEmbeddedPiAgent).mockReset();
      return await fn(home);
    },
    { prefix: "moltbot-queue-" },
  );
}
/**
 * Builds the config object passed to `getReplyFromConfig` in these tests.
 *
 * The agent workspace (`<home>/clawd`) and session store
 * (`<home>/sessions.json`) are both placed under `home`, and the WhatsApp
 * channel allows every sender (`allowFrom: ["*"]`).
 *
 * @param home - Base directory for the workspace and session store paths.
 * @param queue - Optional queue settings; when provided they are exposed as
 *   `messages.queue`, otherwise `messages` is `undefined`.
 * @returns The assembled config object.
 */
function makeCfg(home: string, queue?: Record<string, unknown>) {
  return {
    agents: {
      defaults: {
        model: "anthropic/claude-opus-4-5",
        workspace: path.join(home, "clawd"),
      },
    },
    channels: { whatsapp: { allowFrom: ["*"] } },
    session: { store: path.join(home, "sessions.json") },
    messages: queue ? { queue } : undefined,
  };
}
// Exercises how `getReplyFromConfig` handles messages that arrive while the
// (mocked) embedded agent reports a run in progress.
describe("queue followups", () => {
  afterEach(() => {
    // The first test installs fake timers; always restore the real ones.
    vi.useRealTimers();
  });

  it("collects queued messages and drains after run completes", async () => {
    // Fake timers let the 200ms debounce below be advanced deterministically.
    vi.useFakeTimers();
    await withTempHome(async (home) => {
      // Every prompt handed to the agent, in call order.
      const prompts: string[] = [];
      vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
        prompts.push(params.prompt);
        // Answer the drained-queue prompt differently from a normal run.
        if (params.prompt.includes("[Queued messages while agent was busy]")) {
          return makeResult("followup");
        }
        return makeResult("main");
      });

      // Phase 1: report an active, streaming run.
      vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
      vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(true);
      const cfg = makeCfg(home, {
        mode: "collect",
        debounceMs: 200,
        cap: 10,
        drop: "summarize",
      });
      const first = await getReplyFromConfig(
        { Body: "first", From: "+1001", To: "+2000", MessageSid: "m-1" },
        {},
        cfg,
      );
      // The busy-time message yields no reply and must not start an agent run.
      expect(first).toBeUndefined();
      expect(runEmbeddedPiAgent).not.toHaveBeenCalled();

      // Phase 2: report idle; the next message runs the agent normally.
      vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
      vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
      const second = await getReplyFromConfig(
        { Body: "second", From: "+1001", To: "+2000" },
        {},
        cfg,
      );
      // The reply may be a single payload or an array of payloads.
      const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
      expect(secondText).toBe("main");

      // Phase 3: advance past the 200ms debounce, then yield one microtask
      // tick before asserting on the follow-up run.
      await vi.advanceTimersByTimeAsync(500);
      await Promise.resolve();

      // One normal run plus one follow-up run for the queued message.
      expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
      const queuedPrompt = prompts.find((p) =>
        p.includes("[Queued messages while agent was busy]"),
      );
      expect(queuedPrompt).toBeTruthy();
      // The follow-up prompt must reference the first message's id.
      expect(queuedPrompt).toContain("[message_id: m-1]");
    });
  });

  it("summarizes dropped followups when cap is exceeded", async () => {
    await withTempHome(async (home) => {
      // Every prompt handed to the agent, in call order.
      const prompts: string[] = [];
      vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
        prompts.push(params.prompt);
        return makeResult("ok");
      });

      // Report an active (but not streaming) run while the first two arrive.
      vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
      vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
      // cap: 1 means the second busy-time message exceeds the queue cap.
      const cfg = makeCfg(home, {
        mode: "followup",
        debounceMs: 0,
        cap: 1,
        drop: "summarize",
      });
      await getReplyFromConfig({ Body: "one", From: "+1002", To: "+2000" }, {}, cfg);
      await getReplyFromConfig({ Body: "two", From: "+1002", To: "+2000" }, {}, cfg);

      // Report idle and send a third message.
      vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
      await getReplyFromConfig({ Body: "three", From: "+1002", To: "+2000" }, {}, cfg);

      // Real timers are in use here, so poll (up to 2s) until some prompt
      // carries the overflow marker.
      await pollUntil(
        async () => (prompts.some((p) => p.includes("[Queue overflow]")) ? true : null),
        { timeoutMs: 2000 },
      );
      expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true);
    });
  });
});