| import fs from "node:fs/promises"; |
| import os from "node:os"; |
| import path from "node:path"; |
| import { afterEach, describe, expect, it, vi } from "vitest"; |
| import type { SubagentRunRecord } from "../../agents/subagent-registry.js"; |
| import type { OpenClawConfig } from "../../config/config.js"; |
| import { |
| getAbortMemory, |
| getAbortMemorySizeForTest, |
| isAbortRequestText, |
| isAbortTrigger, |
| resetAbortMemoryForTest, |
| resolveAbortCutoffFromContext, |
| resolveSessionEntryForKey, |
| setAbortMemory, |
| shouldSkipMessageByAbortCutoff, |
| tryFastAbortFromMessage, |
| } from "./abort.js"; |
| import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js"; |
| import { initSessionState } from "./session.js"; |
| import { buildTestCtx } from "./test-ctx.js"; |
|
|
| vi.mock("../../agents/pi-embedded.js", () => ({ |
| abortEmbeddedPiRun: vi.fn().mockReturnValue(true), |
| resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, |
| })); |
|
|
| const commandQueueMocks = vi.hoisted(() => ({ |
| clearCommandLane: vi.fn(), |
| })); |
|
|
| vi.mock("../../process/command-queue.js", () => commandQueueMocks); |
|
|
| const subagentRegistryMocks = vi.hoisted(() => ({ |
| listSubagentRunsForRequester: vi.fn<(requesterSessionKey: string) => SubagentRunRecord[]>( |
| () => [], |
| ), |
| markSubagentRunTerminated: vi.fn(() => 1), |
| })); |
|
|
| vi.mock("../../agents/subagent-registry.js", () => ({ |
| listSubagentRunsForRequester: subagentRegistryMocks.listSubagentRunsForRequester, |
| markSubagentRunTerminated: subagentRegistryMocks.markSubagentRunTerminated, |
| })); |
|
|
| const acpManagerMocks = vi.hoisted(() => ({ |
| resolveSession: vi.fn< |
| () => |
| | { kind: "none" } |
| | { |
| kind: "ready"; |
| sessionKey: string; |
| meta: unknown; |
| } |
| >(() => ({ kind: "none" })), |
| cancelSession: vi.fn(async () => {}), |
| })); |
|
|
| vi.mock("../../acp/control-plane/manager.js", () => ({ |
| getAcpSessionManager: () => ({ |
| resolveSession: acpManagerMocks.resolveSession, |
| cancelSession: acpManagerMocks.cancelSession, |
| }), |
| })); |
|
|
| describe("abort detection", () => { |
| async function writeSessionStore( |
| storePath: string, |
| sessionIdsByKey: Record<string, string>, |
| nowMs = Date.now(), |
| ) { |
| const storeEntries = Object.fromEntries( |
| Object.entries(sessionIdsByKey).map(([key, sessionId]) => [ |
| key, |
| { sessionId, updatedAt: nowMs }, |
| ]), |
| ); |
| await fs.writeFile(storePath, JSON.stringify(storeEntries, null, 2)); |
| } |
|
|
| async function createAbortConfig(params?: { |
| commandsTextEnabled?: boolean; |
| sessionIdsByKey?: Record<string, string>; |
| nowMs?: number; |
| }) { |
| const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-abort-")); |
| const storePath = path.join(root, "sessions.json"); |
| const cfg = { |
| session: { store: storePath }, |
| ...(typeof params?.commandsTextEnabled === "boolean" |
| ? { commands: { text: params.commandsTextEnabled } } |
| : {}), |
| } as OpenClawConfig; |
| if (params?.sessionIdsByKey) { |
| await writeSessionStore(storePath, params.sessionIdsByKey, params.nowMs); |
| } |
| return { root, storePath, cfg }; |
| } |
|
|
| async function runStopCommand(params: { |
| cfg: OpenClawConfig; |
| sessionKey: string; |
| from: string; |
| to: string; |
| targetSessionKey?: string; |
| messageSid?: string; |
| timestamp?: number; |
| }) { |
| return tryFastAbortFromMessage({ |
| ctx: buildTestCtx({ |
| CommandBody: "/stop", |
| RawBody: "/stop", |
| CommandAuthorized: true, |
| SessionKey: params.sessionKey, |
| Provider: "telegram", |
| Surface: "telegram", |
| From: params.from, |
| To: params.to, |
| ...(params.targetSessionKey ? { CommandTargetSessionKey: params.targetSessionKey } : {}), |
| ...(params.messageSid ? { MessageSid: params.messageSid } : {}), |
| ...(typeof params.timestamp === "number" ? { Timestamp: params.timestamp } : {}), |
| }), |
| cfg: params.cfg, |
| }); |
| } |
|
|
| function enqueueQueuedFollowupRun(params: { |
| root: string; |
| cfg: OpenClawConfig; |
| sessionId: string; |
| sessionKey: string; |
| }) { |
| const followupRun: FollowupRun = { |
| prompt: "queued", |
| enqueuedAt: Date.now(), |
| run: { |
| agentId: "main", |
| agentDir: path.join(params.root, "agent"), |
| sessionId: params.sessionId, |
| sessionKey: params.sessionKey, |
| messageProvider: "telegram", |
| agentAccountId: "acct", |
| sessionFile: path.join(params.root, "session.jsonl"), |
| workspaceDir: path.join(params.root, "workspace"), |
| config: params.cfg, |
| provider: "anthropic", |
| model: "claude-opus-4-5", |
| timeoutMs: 1000, |
| blockReplyBreak: "text_end", |
| }, |
| }; |
| enqueueFollowupRun( |
| params.sessionKey, |
| followupRun, |
| { mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" }, |
| "none", |
| ); |
| } |
|
|
| function expectSessionLaneCleared(sessionKey: string) { |
| expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); |
| } |
|
|
| afterEach(() => { |
| resetAbortMemoryForTest(); |
| acpManagerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" }); |
| acpManagerMocks.cancelSession.mockReset().mockResolvedValue(undefined); |
| }); |
|
|
| it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => { |
| const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-abort-")); |
| const storePath = path.join(root, "sessions.json"); |
| const cfg = { session: { store: storePath } } as OpenClawConfig; |
|
|
| const groupMessageCtx = { |
| Body: `[Context]\nJake: /stop\n[from: Jake]`, |
| RawBody: "/stop", |
| ChatType: "group", |
| SessionKey: "agent:main:whatsapp:group:g1", |
| }; |
|
|
| const result = await initSessionState({ |
| ctx: groupMessageCtx, |
| cfg, |
| commandAuthorized: true, |
| }); |
|
|
| |
| expect(result.triggerBodyNormalized).toBe("/stop"); |
| }); |
|
|
| it("isAbortTrigger matches standalone abort trigger phrases", () => { |
| const positives = [ |
| "stop", |
| "esc", |
| "abort", |
| "wait", |
| "exit", |
| "interrupt", |
| "stop openclaw", |
| "openclaw stop", |
| "stop action", |
| "stop current action", |
| "stop run", |
| "stop current run", |
| "stop agent", |
| "stop the agent", |
| "stop don't do anything", |
| "stop dont do anything", |
| "stop do not do anything", |
| "stop doing anything", |
| "do not do that", |
| "please stop", |
| "stop please", |
| "STOP OPENCLAW", |
| "stop openclaw!!!", |
| "stop don’t do anything", |
| "detente", |
| "detén", |
| "arrête", |
| "停止", |
| "やめて", |
| "止めて", |
| "रुको", |
| "توقف", |
| "стоп", |
| "остановись", |
| "останови", |
| "остановить", |
| "прекрати", |
| "halt", |
| "anhalten", |
| "aufhören", |
| "hoer auf", |
| "stopp", |
| "pare", |
| ]; |
| for (const candidate of positives) { |
| expect(isAbortTrigger(candidate)).toBe(true); |
| } |
|
|
| expect(isAbortTrigger("hello")).toBe(false); |
| expect(isAbortTrigger("please do not do that")).toBe(false); |
| |
| expect(isAbortTrigger("/stop")).toBe(false); |
| }); |
|
|
| it("isAbortRequestText aligns abort command semantics", () => { |
| expect(isAbortRequestText("/stop")).toBe(true); |
| expect(isAbortRequestText("/STOP")).toBe(true); |
| expect(isAbortRequestText("/stop!!!")).toBe(true); |
| expect(isAbortRequestText("/Stop!!!")).toBe(true); |
| expect(isAbortRequestText("stop")).toBe(true); |
| expect(isAbortRequestText("Stop")).toBe(true); |
| expect(isAbortRequestText("STOP")).toBe(true); |
| expect(isAbortRequestText("stop action")).toBe(true); |
| expect(isAbortRequestText("stop openclaw!!!")).toBe(true); |
| expect(isAbortRequestText("やめて")).toBe(true); |
| expect(isAbortRequestText("остановись")).toBe(true); |
| expect(isAbortRequestText("halt")).toBe(true); |
| expect(isAbortRequestText("stopp")).toBe(true); |
| expect(isAbortRequestText("pare")).toBe(true); |
| expect(isAbortRequestText(" توقف ")).toBe(true); |
| expect(isAbortRequestText("/stop@openclaw_bot", { botUsername: "openclaw_bot" })).toBe(true); |
| expect(isAbortRequestText("/Stop@openclaw_bot", { botUsername: "openclaw_bot" })).toBe(true); |
|
|
| expect(isAbortRequestText("/status")).toBe(false); |
| expect(isAbortRequestText("do not do that")).toBe(true); |
| expect(isAbortRequestText("please do not do that")).toBe(false); |
| expect(isAbortRequestText("/abort")).toBe(false); |
| }); |
|
|
| it("removes abort memory entry when flag is reset", () => { |
| setAbortMemory("session-1", true); |
| expect(getAbortMemory("session-1")).toBe(true); |
|
|
| setAbortMemory("session-1", false); |
| expect(getAbortMemory("session-1")).toBeUndefined(); |
| expect(getAbortMemorySizeForTest()).toBe(0); |
| }); |
|
|
| it("caps abort memory tracking to a bounded max size", () => { |
| for (let i = 0; i < 2105; i += 1) { |
| setAbortMemory(`session-${i}`, true); |
| } |
| expect(getAbortMemorySizeForTest()).toBe(2000); |
| expect(getAbortMemory("session-0")).toBeUndefined(); |
| expect(getAbortMemory("session-2104")).toBe(true); |
| }); |
|
|
| it("extracts abort cutoff metadata from context", () => { |
| expect( |
| resolveAbortCutoffFromContext( |
| buildTestCtx({ |
| MessageSid: "42", |
| Timestamp: 123, |
| }), |
| ), |
| ).toEqual({ |
| messageSid: "42", |
| timestamp: 123, |
| }); |
| }); |
|
|
| it("treats numeric message IDs at or before cutoff as stale", () => { |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffMessageSid: "200", |
| messageSid: "199", |
| }), |
| ).toBe(true); |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffMessageSid: "200", |
| messageSid: "200", |
| }), |
| ).toBe(true); |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffMessageSid: "200", |
| messageSid: "201", |
| }), |
| ).toBe(false); |
| }); |
|
|
| it("falls back to timestamp cutoff when message IDs are unavailable", () => { |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffTimestamp: 2000, |
| timestamp: 1999, |
| }), |
| ).toBe(true); |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffTimestamp: 2000, |
| timestamp: 2000, |
| }), |
| ).toBe(true); |
| expect( |
| shouldSkipMessageByAbortCutoff({ |
| cutoffTimestamp: 2000, |
| timestamp: 2001, |
| }), |
| ).toBe(false); |
| }); |
|
|
| it("resolves session entry when key exists in store", () => { |
| const store = { |
| "session-1": { sessionId: "abc", updatedAt: 0 }, |
| } as const; |
| expect(resolveSessionEntryForKey(store, "session-1")).toEqual({ |
| entry: store["session-1"], |
| key: "session-1", |
| }); |
| expect(resolveSessionEntryForKey(store, "session-2")).toEqual({}); |
| expect(resolveSessionEntryForKey(undefined, "session-1")).toEqual({}); |
| }); |
|
|
| it("resolves Telegram forum topic session when lookup key has different casing than store", () => { |
| |
| const storeKey = "agent:main:telegram:group:-1001234567890:topic:99"; |
| const lookupKey = "Agent:Main:Telegram:Group:-1001234567890:Topic:99"; |
| const store = { |
| [storeKey]: { sessionId: "pi-topic-99", updatedAt: 0 }, |
| } as Record<string, { sessionId: string; updatedAt: number }>; |
| |
| expect(store[lookupKey]).toBeUndefined(); |
| const result = resolveSessionEntryForKey(store, lookupKey); |
| expect(result.entry?.sessionId).toBe("pi-topic-99"); |
| expect(result.key).toBe(storeKey); |
| }); |
|
|
| it("fast-aborts even when text commands are disabled", async () => { |
| const { cfg } = await createAbortConfig({ commandsTextEnabled: false }); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey: "telegram:123", |
| from: "telegram:123", |
| to: "telegram:123", |
| }); |
|
|
| expect(result.handled).toBe(true); |
| }); |
|
|
| it("fast-abort clears queued followups and session lane", async () => { |
| const sessionKey = "telegram:123"; |
| const sessionId = "session-123"; |
| const { root, cfg } = await createAbortConfig({ |
| sessionIdsByKey: { [sessionKey]: sessionId }, |
| }); |
| enqueueQueuedFollowupRun({ root, cfg, sessionId, sessionKey }); |
| expect(getFollowupQueueDepth(sessionKey)).toBe(1); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:123", |
| to: "telegram:123", |
| }); |
|
|
| expect(result.handled).toBe(true); |
| expect(getFollowupQueueDepth(sessionKey)).toBe(0); |
| expectSessionLaneCleared(sessionKey); |
| }); |
|
|
| it("plain-language stop on ACP-bound session triggers ACP cancel", async () => { |
| const sessionKey = "agent:codex:acp:test-1"; |
| const sessionId = "session-123"; |
| const { cfg } = await createAbortConfig({ |
| sessionIdsByKey: { [sessionKey]: sessionId }, |
| }); |
| acpManagerMocks.resolveSession.mockReturnValue({ |
| kind: "ready", |
| sessionKey, |
| meta: {} as never, |
| }); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:123", |
| to: "telegram:123", |
| targetSessionKey: sessionKey, |
| }); |
|
|
| expect(result.handled).toBe(true); |
| expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({ |
| cfg, |
| sessionKey, |
| reason: "fast-abort", |
| }); |
| }); |
|
|
| it("ACP cancel failures do not skip queue and lane cleanup", async () => { |
| const sessionKey = "agent:codex:acp:test-2"; |
| const sessionId = "session-456"; |
| const { root, cfg } = await createAbortConfig({ |
| sessionIdsByKey: { [sessionKey]: sessionId }, |
| }); |
| enqueueQueuedFollowupRun({ root, cfg, sessionId, sessionKey }); |
| acpManagerMocks.resolveSession.mockReturnValue({ |
| kind: "ready", |
| sessionKey, |
| meta: {} as never, |
| }); |
| acpManagerMocks.cancelSession.mockRejectedValueOnce(new Error("cancel failed")); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:123", |
| to: "telegram:123", |
| targetSessionKey: sessionKey, |
| }); |
|
|
| expect(result.handled).toBe(true); |
| expect(getFollowupQueueDepth(sessionKey)).toBe(0); |
| expectSessionLaneCleared(sessionKey); |
| }); |
|
|
| it("persists abort cutoff metadata on /stop when command and target session match", async () => { |
| const sessionKey = "telegram:123"; |
| const sessionId = "session-123"; |
| const { storePath, cfg } = await createAbortConfig({ |
| sessionIdsByKey: { [sessionKey]: sessionId }, |
| }); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:123", |
| to: "telegram:123", |
| messageSid: "55", |
| timestamp: 1234567890000, |
| }); |
|
|
| expect(result.handled).toBe(true); |
| const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>; |
| const entry = store[sessionKey] as { |
| abortedLastRun?: boolean; |
| abortCutoffMessageSid?: string; |
| abortCutoffTimestamp?: number; |
| }; |
| expect(entry.abortedLastRun).toBe(true); |
| expect(entry.abortCutoffMessageSid).toBe("55"); |
| expect(entry.abortCutoffTimestamp).toBe(1234567890000); |
| }); |
|
|
| it("does not persist cutoff metadata when native /stop targets a different session", async () => { |
| const slashSessionKey = "telegram:slash:123"; |
| const targetSessionKey = "agent:main:telegram:group:123"; |
| const targetSessionId = "session-target"; |
| const { storePath, cfg } = await createAbortConfig({ |
| sessionIdsByKey: { [targetSessionKey]: targetSessionId }, |
| }); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey: slashSessionKey, |
| from: "telegram:123", |
| to: "telegram:123", |
| targetSessionKey, |
| messageSid: "999", |
| timestamp: 1234567890000, |
| }); |
|
|
| expect(result.handled).toBe(true); |
| const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>; |
| const entry = store[targetSessionKey] as { |
| abortedLastRun?: boolean; |
| abortCutoffMessageSid?: string; |
| abortCutoffTimestamp?: number; |
| }; |
| expect(entry.abortedLastRun).toBe(true); |
| expect(entry.abortCutoffMessageSid).toBeUndefined(); |
| expect(entry.abortCutoffTimestamp).toBeUndefined(); |
| }); |
|
|
| it("fast-abort stops active subagent runs for requester session", async () => { |
| const sessionKey = "telegram:parent"; |
| const childKey = "agent:main:subagent:child-1"; |
| const sessionId = "session-parent"; |
| const childSessionId = "session-child"; |
| const { cfg } = await createAbortConfig({ |
| sessionIdsByKey: { |
| [sessionKey]: sessionId, |
| [childKey]: childSessionId, |
| }, |
| }); |
|
|
| subagentRegistryMocks.listSubagentRunsForRequester.mockReturnValueOnce([ |
| { |
| runId: "run-1", |
| childSessionKey: childKey, |
| requesterSessionKey: sessionKey, |
| requesterDisplayKey: "telegram:parent", |
| task: "do work", |
| cleanup: "keep", |
| createdAt: Date.now(), |
| }, |
| ]); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:parent", |
| to: "telegram:parent", |
| }); |
|
|
| expect(result.stoppedSubagents).toBe(1); |
| expectSessionLaneCleared(childKey); |
| }); |
|
|
| it("cascade stop kills depth-2 children when stopping depth-1 agent", async () => { |
| const sessionKey = "telegram:parent"; |
| const depth1Key = "agent:main:subagent:child-1"; |
| const depth2Key = "agent:main:subagent:child-1:subagent:grandchild-1"; |
| const sessionId = "session-parent"; |
| const depth1SessionId = "session-child"; |
| const depth2SessionId = "session-grandchild"; |
| const { cfg } = await createAbortConfig({ |
| sessionIdsByKey: { |
| [sessionKey]: sessionId, |
| [depth1Key]: depth1SessionId, |
| [depth2Key]: depth2SessionId, |
| }, |
| }); |
|
|
| |
| |
| |
| subagentRegistryMocks.listSubagentRunsForRequester |
| .mockReturnValueOnce([ |
| { |
| runId: "run-1", |
| childSessionKey: depth1Key, |
| requesterSessionKey: sessionKey, |
| requesterDisplayKey: "telegram:parent", |
| task: "orchestrator", |
| cleanup: "keep", |
| createdAt: Date.now(), |
| }, |
| ]) |
| .mockReturnValueOnce([ |
| { |
| runId: "run-2", |
| childSessionKey: depth2Key, |
| requesterSessionKey: depth1Key, |
| requesterDisplayKey: depth1Key, |
| task: "leaf worker", |
| cleanup: "keep", |
| createdAt: Date.now(), |
| }, |
| ]) |
| .mockReturnValueOnce([]); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:parent", |
| to: "telegram:parent", |
| }); |
|
|
| |
| expect(result.stoppedSubagents).toBe(2); |
| expectSessionLaneCleared(depth1Key); |
| expectSessionLaneCleared(depth2Key); |
| }); |
|
|
| it("cascade stop traverses ended depth-1 parents to stop active depth-2 children", async () => { |
| subagentRegistryMocks.listSubagentRunsForRequester.mockClear(); |
| subagentRegistryMocks.markSubagentRunTerminated.mockClear(); |
| const sessionKey = "telegram:parent"; |
| const depth1Key = "agent:main:subagent:child-ended"; |
| const depth2Key = "agent:main:subagent:child-ended:subagent:grandchild-active"; |
| const now = Date.now(); |
| const { cfg } = await createAbortConfig({ |
| nowMs: now, |
| sessionIdsByKey: { |
| [sessionKey]: "session-parent", |
| [depth1Key]: "session-child-ended", |
| [depth2Key]: "session-grandchild-active", |
| }, |
| }); |
|
|
| |
| |
| |
| subagentRegistryMocks.listSubagentRunsForRequester |
| .mockReturnValueOnce([ |
| { |
| runId: "run-1", |
| childSessionKey: depth1Key, |
| requesterSessionKey: sessionKey, |
| requesterDisplayKey: "telegram:parent", |
| task: "orchestrator", |
| cleanup: "keep", |
| createdAt: now - 1_000, |
| endedAt: now - 500, |
| outcome: { status: "ok" }, |
| }, |
| ]) |
| .mockReturnValueOnce([ |
| { |
| runId: "run-2", |
| childSessionKey: depth2Key, |
| requesterSessionKey: depth1Key, |
| requesterDisplayKey: depth1Key, |
| task: "leaf worker", |
| cleanup: "keep", |
| createdAt: now - 500, |
| }, |
| ]) |
| .mockReturnValueOnce([]); |
|
|
| const result = await runStopCommand({ |
| cfg, |
| sessionKey, |
| from: "telegram:parent", |
| to: "telegram:parent", |
| }); |
|
|
| |
| expect(result.stoppedSubagents).toBe(1); |
| expectSessionLaneCleared(depth2Key); |
| expect(subagentRegistryMocks.markSubagentRunTerminated).toHaveBeenCalledWith( |
| expect.objectContaining({ runId: "run-2", childSessionKey: depth2Key }), |
| ); |
| }); |
| }); |
|
|