import fs from "node:fs/promises"; import os from "node:os"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { createAgentSession, DefaultResourceLoader, estimateTokens, SessionManager, } from "@mariozechner/pi-coding-agent"; import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js"; import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js"; import { resolveChannelCapabilities } from "../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../config/config.js"; import { ensureContextEnginesInitialized, resolveContextEngine, } from "../../context-engine/index.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getMemorySearchManager } from "../../memory/index.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { resolveSignalReactionLevel } from "../../signal/reaction-level.js"; import { resolveTelegramInlineButtonsScope } from "../../telegram/inline-buttons.js"; import { resolveTelegramReactionLevel } from "../../telegram/reaction-level.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; import { resolveUserPath } from "../../utils.js"; import { normalizeMessageChannel } from "../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { resolveOpenClawAgentDir } from "../agent-paths.js"; import { resolveSessionAgentId, resolveSessionAgentIds } from "../agent-scope.js"; import type { ExecElevatedDefaults } from "../bash-tools.js"; import { 
makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js"; import { resolveContextWindowInfo } from "../context-window-guard.js"; import { ensureCustomApiRegistered } from "../custom-api-registry.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; import { resolveMemorySearchConfig } from "../memory-search.js"; import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js"; import { supportsModelTools } from "../model-tool-support.js"; import { ensureOpenClawModelsJson } from "../models-config.js"; import { createConfiguredOllamaStreamFn } from "../ollama-stream.js"; import { resolveOwnerDisplaySetting } from "../owner-display.js"; import { ensureSessionHeader, validateAnthropicTurns, validateGeminiTurns, } from "../pi-embedded-helpers.js"; import { createPreparedEmbeddedPiSettingsManager } from "../pi-project-settings.js"; import { createOpenClawCodingTools } from "../pi-tools.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; import { resolveSandboxContext } from "../sandbox.js"; import { repairSessionFileIfNeeded } from "../session-file-repair.js"; import { guardSessionManager } from "../session-tool-result-guard-wrapper.js"; import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js"; import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, } from "../session-write-lock.js"; import { detectRuntimeShell } from "../shell-utils.js"; import { applySkillEnvOverrides, applySkillEnvOverridesFromSnapshot, resolveSkillsPromptForRun, type SkillSnapshot, } from "../skills.js"; import { resolveTranscriptPolicy } from "../transcript-policy.js"; import { compactWithSafetyTimeout, 
EMBEDDED_COMPACTION_TIMEOUT_MS, } from "./compaction-safety-timeout.js"; import { buildEmbeddedExtensionFactories } from "./extensions.js"; import { logToolSchemasForGoogle, sanitizeSessionHistory, sanitizeToolsForGoogle, } from "./google.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { buildModelAliasLines, resolveModel } from "./model.js"; import { buildEmbeddedSandboxInfo } from "./sandbox-info.js"; import { prewarmSessionFile, trackSessionManagerAccess } from "./session-manager-cache.js"; import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js"; import { applySystemPromptOverrideToSession, buildEmbeddedSystemPrompt, createSystemPromptOverride, } from "./system-prompt.js"; import { collectAllowedToolNames } from "./tool-name-allowlist.js"; import { splitSdkTools } from "./tool-split.js"; import type { EmbeddedPiCompactResult } from "./types.js"; import { describeUnknownError, mapThinkingLevel } from "./utils.js"; import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js"; export type CompactEmbeddedPiSessionParams = { sessionId: string; runId?: string; sessionKey?: string; messageChannel?: string; messageProvider?: string; agentAccountId?: string; authProfileId?: string; /** Group id for channel-level tool policy resolution. */ groupId?: string | null; /** Group channel label (e.g. #general) for channel-level tool policy resolution. */ groupChannel?: string | null; /** Group space label (e.g. guild/team id) for channel-level tool policy resolution. */ groupSpace?: string | null; /** Parent session key for subagent policy inheritance. */ spawnedBy?: string | null; /** Whether the sender is an owner (required for owner-only tools). */ senderIsOwner?: boolean; sessionFile: string; /** Optional caller-observed live prompt tokens used for compaction diagnostics. 
*/ currentTokenCount?: number; workspaceDir: string; agentDir?: string; config?: OpenClawConfig; skillsSnapshot?: SkillSnapshot; provider?: string; model?: string; thinkLevel?: ThinkLevel; reasoningLevel?: ReasoningLevel; bashElevated?: ExecElevatedDefaults; customInstructions?: string; tokenBudget?: number; force?: boolean; trigger?: "overflow" | "manual"; diagId?: string; attempt?: number; maxAttempts?: number; lane?: string; enqueue?: typeof enqueueCommand; extraSystemPrompt?: string; ownerNumbers?: string[]; }; type CompactionMessageMetrics = { messages: number; historyTextChars: number; toolResultChars: number; estTokens?: number; contributors: Array<{ role: string; chars: number; tool?: string }>; }; function hasRealConversationContent(msg: AgentMessage): boolean { return msg.role === "user" || msg.role === "assistant" || msg.role === "toolResult"; } function createCompactionDiagId(): string { return `cmp-${Date.now().toString(36)}-${generateSecureToken(4)}`; } function normalizeObservedTokenCount(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? Math.floor(value) : undefined; } function getMessageTextChars(msg: AgentMessage): number { const content = (msg as { content?: unknown }).content; if (typeof content === "string") { return content.length; } if (!Array.isArray(content)) { return 0; } let total = 0; for (const block of content) { if (!block || typeof block !== "object") { continue; } const text = (block as { text?: unknown }).text; if (typeof text === "string") { total += text.length; } } return total; } function resolveMessageToolLabel(msg: AgentMessage): string | undefined { const candidate = (msg as { toolName?: unknown }).toolName ?? (msg as { name?: unknown }).name ?? (msg as { tool?: unknown }).tool; return typeof candidate === "string" && candidate.trim().length > 0 ? 
candidate : undefined; } function summarizeCompactionMessages(messages: AgentMessage[]): CompactionMessageMetrics { let historyTextChars = 0; let toolResultChars = 0; const contributors: Array<{ role: string; chars: number; tool?: string }> = []; let estTokens = 0; let tokenEstimationFailed = false; for (const msg of messages) { const role = typeof msg.role === "string" ? msg.role : "unknown"; const chars = getMessageTextChars(msg); historyTextChars += chars; if (role === "toolResult") { toolResultChars += chars; } contributors.push({ role, chars, tool: resolveMessageToolLabel(msg) }); if (!tokenEstimationFailed) { try { estTokens += estimateTokens(msg); } catch { tokenEstimationFailed = true; } } } return { messages: messages.length, historyTextChars, toolResultChars, estTokens: tokenEstimationFailed ? undefined : estTokens, contributors: contributors.toSorted((a, b) => b.chars - a.chars).slice(0, 3), }; } function classifyCompactionReason(reason?: string): string { const text = (reason ?? 
"").trim().toLowerCase(); if (!text) { return "unknown"; } if (text.includes("nothing to compact")) { return "no_compactable_entries"; } if (text.includes("below threshold")) { return "below_threshold"; } if (text.includes("already compacted")) { return "already_compacted_recently"; } if (text.includes("still exceeds target")) { return "live_context_still_exceeds_target"; } if (text.includes("guard")) { return "guard_blocked"; } if (text.includes("summary")) { return "summary_failed"; } if (text.includes("timed out") || text.includes("timeout")) { return "timeout"; } if ( text.includes("400") || text.includes("401") || text.includes("403") || text.includes("429") ) { return "provider_error_4xx"; } if ( text.includes("500") || text.includes("502") || text.includes("503") || text.includes("504") ) { return "provider_error_5xx"; } return "unknown"; } function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" { const mode = config?.agents?.defaults?.compaction?.postIndexSync; if (mode === "off" || mode === "async" || mode === "await") { return mode; } return "async"; } async function runPostCompactionSessionMemorySync(params: { config?: OpenClawConfig; sessionKey?: string; sessionFile: string; }): Promise { if (!params.config) { return; } try { const sessionFile = params.sessionFile.trim(); if (!sessionFile) { return; } const agentId = resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.config, }); const resolvedMemory = resolveMemorySearchConfig(params.config, agentId); if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) { return; } if (!resolvedMemory.sync.sessions.postCompactionForce) { return; } const { manager } = await getMemorySearchManager({ cfg: params.config, agentId, }); if (!manager?.sync) { return; } const syncTask = manager.sync({ reason: "post-compaction", sessionFiles: [sessionFile], }); await syncTask; } catch (err) { log.warn(`memory sync skipped (post-compaction): 
${String(err)}`); } } function syncPostCompactionSessionMemory(params: { config?: OpenClawConfig; sessionKey?: string; sessionFile: string; mode: "off" | "async" | "await"; }): Promise { if (params.mode === "off" || !params.config) { return Promise.resolve(); } const syncTask = runPostCompactionSessionMemorySync({ config: params.config, sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); if (params.mode === "await") { return syncTask; } void syncTask; return Promise.resolve(); } async function runPostCompactionSideEffects(params: { config?: OpenClawConfig; sessionKey?: string; sessionFile: string; }): Promise { const sessionFile = params.sessionFile.trim(); if (!sessionFile) { return; } emitSessionTranscriptUpdate(sessionFile); await syncPostCompactionSessionMemory({ config: params.config, sessionKey: params.sessionKey, sessionFile, mode: resolvePostCompactionIndexSyncMode(params.config), }); } /** * Core compaction logic without lane queueing. * Use this when already inside a session/global lane to avoid deadlocks. */ export async function compactEmbeddedPiSessionDirect( params: CompactEmbeddedPiSessionParams, ): Promise { const startedAt = Date.now(); const diagId = params.diagId?.trim() || createCompactionDiagId(); const trigger = params.trigger ?? "manual"; const attempt = params.attempt ?? 1; const maxAttempts = params.maxAttempts ?? 1; const runId = params.runId ?? params.sessionId; const resolvedWorkspace = resolveUserPath(params.workspaceDir); ensureRuntimePluginsLoaded({ config: params.config, workspaceDir: resolvedWorkspace, }); const prevCwd = process.cwd(); // Resolve compaction model: prefer config override, then fall back to caller-supplied model const compactionModelOverride = params.config?.agents?.defaults?.compaction?.model?.trim(); let provider: string; let modelId: string; // When switching provider via override, drop the primary auth profile to avoid // sending the wrong credentials (e.g. OpenAI profile token to OpenRouter). 
let authProfileId: string | undefined = params.authProfileId; if (compactionModelOverride) { const slashIdx = compactionModelOverride.indexOf("/"); if (slashIdx > 0) { provider = compactionModelOverride.slice(0, slashIdx).trim(); modelId = compactionModelOverride.slice(slashIdx + 1).trim() || DEFAULT_MODEL; // Provider changed — drop primary auth profile so getApiKeyForModel // falls back to provider-based key resolution for the override model. if (provider !== (params.provider ?? "").trim()) { authProfileId = undefined; } } else { provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; modelId = compactionModelOverride; } } else { provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; } const fail = (reason: string): EmbeddedPiCompactResult => { log.warn( `[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` + `diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` + `attempt=${attempt} maxAttempts=${maxAttempts} outcome=failed reason=${classifyCompactionReason(reason)} ` + `durationMs=${Date.now() - startedAt}`, ); return { ok: false, compacted: false, reason, }; }; const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); await ensureOpenClawModelsJson(params.config, agentDir); const { model, error, authStorage, modelRegistry } = resolveModel( provider, modelId, agentDir, params.config, ); if (!model) { const reason = error ?? 
`Unknown model: ${provider}/${modelId}`; return fail(reason); } try { const apiKeyInfo = await getApiKeyForModel({ model, cfg: params.config, profileId: authProfileId, agentDir, }); if (!apiKeyInfo.apiKey) { if (apiKeyInfo.mode !== "aws-sdk") { throw new Error( `No API key resolved for provider "${model.provider}" (auth mode: ${apiKeyInfo.mode}).`, ); } } else if (model.provider === "github-copilot") { const { resolveCopilotApiToken } = await import("../../providers/github-copilot-token.js"); const copilotToken = await resolveCopilotApiToken({ githubToken: apiKeyInfo.apiKey, }); authStorage.setRuntimeApiKey(model.provider, copilotToken.token); } else { authStorage.setRuntimeApiKey(model.provider, apiKeyInfo.apiKey); } } catch (err) { const reason = describeUnknownError(err); return fail(reason); } await fs.mkdir(resolvedWorkspace, { recursive: true }); const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; const sandbox = await resolveSandboxContext({ config: params.config, sessionKey: sandboxSessionKey, workspaceDir: resolvedWorkspace, }); const effectiveWorkspace = sandbox?.enabled ? sandbox.workspaceAccess === "rw" ? resolvedWorkspace : sandbox.workspaceDir : resolvedWorkspace; await fs.mkdir(effectiveWorkspace, { recursive: true }); await ensureSessionHeader({ sessionFile: params.sessionFile, sessionId: params.sessionId, cwd: effectiveWorkspace, }); let restoreSkillEnv: (() => void) | undefined; process.chdir(effectiveWorkspace); try { const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({ workspaceDir: effectiveWorkspace, config: params.config, skillsSnapshot: params.skillsSnapshot, }); restoreSkillEnv = params.skillsSnapshot ? applySkillEnvOverridesFromSnapshot({ snapshot: params.skillsSnapshot, config: params.config, }) : applySkillEnvOverrides({ skills: skillEntries ?? 
[], config: params.config, }); const skillsPrompt = resolveSkillsPromptForRun({ skillsSnapshot: params.skillsSnapshot, entries: shouldLoadSkillEntries ? skillEntries : undefined, config: params.config, workspaceDir: effectiveWorkspace, }); const sessionLabel = params.sessionKey ?? params.sessionId; const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; const { contextFiles } = await resolveBootstrapContextForRun({ workspaceDir: effectiveWorkspace, config: params.config, sessionKey: params.sessionKey, sessionId: params.sessionId, warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }), }); // Apply contextTokens cap to model so pi-coding-agent's auto-compaction // threshold uses the effective limit, not the native context window. const ctxInfo = resolveContextWindowInfo({ cfg: params.config, provider, modelId, modelContextWindow: model.contextWindow, defaultTokens: DEFAULT_CONTEXT_TOKENS, }); const effectiveModel = ctxInfo.tokens < (model.contextWindow ?? Infinity) ? { ...model, contextWindow: ctxInfo.tokens } : model; const runAbortController = new AbortController(); const toolsRaw = createOpenClawCodingTools({ exec: { elevated: params.bashElevated, }, sandbox, messageProvider: resolvedMessageProvider, agentAccountId: params.agentAccountId, sessionKey: sandboxSessionKey, sessionId: params.sessionId, runId: params.runId, groupId: params.groupId, groupChannel: params.groupChannel, groupSpace: params.groupSpace, spawnedBy: params.spawnedBy, senderIsOwner: params.senderIsOwner, agentDir, workspaceDir: effectiveWorkspace, config: params.config, abortSignal: runAbortController.signal, modelProvider: model.provider, modelId, modelContextWindowTokens: ctxInfo.tokens, modelAuthMode: resolveModelAuthMode(model.provider, params.config), }); const tools = sanitizeToolsForGoogle({ tools: supportsModelTools(model) ? 
toolsRaw : [], provider, }); const allowedToolNames = collectAllowedToolNames({ tools }); logToolSchemasForGoogle({ tools, provider }); const machineName = await getMachineDisplayName(); const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider); let runtimeCapabilities = runtimeChannel ? (resolveChannelCapabilities({ cfg: params.config, channel: runtimeChannel, accountId: params.agentAccountId, }) ?? []) : undefined; if (runtimeChannel === "telegram" && params.config) { const inlineButtonsScope = resolveTelegramInlineButtonsScope({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); if (inlineButtonsScope !== "off") { if (!runtimeCapabilities) { runtimeCapabilities = []; } if ( !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons") ) { runtimeCapabilities.push("inlineButtons"); } } } const reactionGuidance = runtimeChannel && params.config ? (() => { if (runtimeChannel === "telegram") { const resolved = resolveTelegramReactionLevel({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); const level = resolved.agentReactionGuidance; return level ? { level, channel: "Telegram" } : undefined; } if (runtimeChannel === "signal") { const resolved = resolveSignalReactionLevel({ cfg: params.config, accountId: params.agentAccountId ?? undefined, }); const level = resolved.agentReactionGuidance; return level ? { level, channel: "Signal" } : undefined; } return undefined; })() : undefined; // Resolve channel-specific message actions for system prompt const channelActions = runtimeChannel ? listChannelSupportedActions({ cfg: params.config, channel: runtimeChannel, }) : undefined; const messageToolHints = runtimeChannel ? 
resolveChannelMessageToolHints({ cfg: params.config, channel: runtimeChannel, accountId: params.agentAccountId, }) : undefined; const runtimeInfo = { host: machineName, os: `${os.type()} ${os.release()}`, arch: os.arch(), node: process.version, model: `${provider}/${modelId}`, shell: detectRuntimeShell(), channel: runtimeChannel, capabilities: runtimeCapabilities, channelActions, }; const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated); const reasoningTagHint = isReasoningTagProvider(provider); const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone); const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat); const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat); const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ sessionKey: params.sessionKey, config: params.config, }); const isDefaultAgent = sessionAgentId === defaultAgentId; const promptMode = isSubagentSessionKey(params.sessionKey) || isCronSessionKey(params.sessionKey) ? "minimal" : "full"; const docsPath = await resolveOpenClawDocsPath({ workspaceDir: effectiveWorkspace, argv1: process.argv[1], cwd: process.cwd(), moduleUrl: import.meta.url, }); const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined; const ownerDisplay = resolveOwnerDisplaySetting(params.config); const appendPrompt = buildEmbeddedSystemPrompt({ workspaceDir: effectiveWorkspace, defaultThinkLevel: params.thinkLevel, reasoningLevel: params.reasoningLevel ?? "off", extraSystemPrompt: params.extraSystemPrompt, ownerNumbers: params.ownerNumbers, ownerDisplay: ownerDisplay.ownerDisplay, ownerDisplaySecret: ownerDisplay.ownerDisplaySecret, reasoningTagHint, heartbeatPrompt: isDefaultAgent ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt) : undefined, skillsPrompt, docsPath: docsPath ?? 
undefined, ttsHint, promptMode, acpEnabled: params.config?.acp?.enabled !== false, runtimeInfo, reactionGuidance, messageToolHints, sandboxInfo, tools, modelAliasLines: buildModelAliasLines(params.config), userTimezone, userTime, userTimeFormat, contextFiles, memoryCitationsMode: params.config?.memory?.citations, }); const systemPromptOverride = createSystemPromptOverride(appendPrompt); const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ timeoutMs: EMBEDDED_COMPACTION_TIMEOUT_MS, }), }); try { await repairSessionFileIfNeeded({ sessionFile: params.sessionFile, warn: (message) => log.warn(message), }); await prewarmSessionFile(params.sessionFile); const transcriptPolicy = resolveTranscriptPolicy({ modelApi: model.api, provider, modelId, }); const sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), { agentId: sessionAgentId, sessionKey: params.sessionKey, allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults, allowedToolNames, }); trackSessionManagerAccess(params.sessionFile); const settingsManager = createPreparedEmbeddedPiSettingsManager({ cwd: effectiveWorkspace, agentDir, cfg: params.config, }); // Sets compaction/pruning runtime state and returns extension factories // that must be passed to the resource loader for the safeguard to be active. const extensionFactories = buildEmbeddedExtensionFactories({ cfg: params.config, sessionManager, provider, modelId, model, }); // Only create an explicit resource loader when there are extension factories // to register; otherwise let createAgentSession use its built-in default. 
let resourceLoader: DefaultResourceLoader | undefined; if (extensionFactories.length > 0) { resourceLoader = new DefaultResourceLoader({ cwd: resolvedWorkspace, agentDir, settingsManager, extensionFactories, }); await resourceLoader.reload(); } const { builtInTools, customTools } = splitSdkTools({ tools, sandboxEnabled: !!sandbox?.enabled, }); const { session } = await createAgentSession({ cwd: effectiveWorkspace, agentDir, authStorage, modelRegistry, model: effectiveModel, thinkingLevel: mapThinkingLevel(params.thinkLevel), tools: builtInTools, customTools, sessionManager, settingsManager, resourceLoader, }); applySystemPromptOverrideToSession(session, systemPromptOverride()); if (model.api === "ollama") { const providerBaseUrl = typeof params.config?.models?.providers?.[model.provider]?.baseUrl === "string" ? params.config.models.providers[model.provider]?.baseUrl : undefined; ensureCustomApiRegistered( model.api, createConfiguredOllamaStreamFn({ model, providerBaseUrl, }), ); } try { const prior = await sanitizeSessionHistory({ messages: session.messages, modelApi: model.api, modelId, provider, allowedToolNames, config: params.config, sessionManager, sessionId: params.sessionId, policy: transcriptPolicy, }); const validatedGemini = transcriptPolicy.validateGeminiTurns ? validateGeminiTurns(prior) : prior; const validated = transcriptPolicy.validateAnthropicTurns ? validateAnthropicTurns(validatedGemini) : validatedGemini; // Apply validated transcript to the live session even when no history limit is configured, // so compaction and hook metrics are based on the same message set. session.agent.replaceMessages(validated); // "Original" compaction metrics should describe the validated transcript that enters // limiting/compaction, not the raw on-disk session snapshot. 
const originalMessages = session.messages.slice(); const truncated = limitHistoryTurns( session.messages, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), ); // Re-run tool_use/tool_result pairing repair after truncation, since // limitHistoryTurns can orphan tool_result blocks by removing the // assistant message that contained the matching tool_use. const limited = transcriptPolicy.repairToolUseResultPairing ? sanitizeToolUseResultPairing(truncated) : truncated; if (limited.length > 0) { session.agent.replaceMessages(limited); } const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); const hookSessionKey = params.sessionKey?.trim() || params.sessionId; const hookRunner = getGlobalHookRunner(); const observedTokenCount = normalizeObservedTokenCount(params.currentTokenCount); const messageCountOriginal = originalMessages.length; let tokenCountOriginal: number | undefined; try { tokenCountOriginal = 0; for (const message of originalMessages) { tokenCountOriginal += estimateTokens(message); } } catch { tokenCountOriginal = undefined; } const messageCountBefore = session.messages.length; let tokenCountBefore = observedTokenCount; if (tokenCountBefore === undefined) { try { tokenCountBefore = 0; for (const message of session.messages) { tokenCountBefore += estimateTokens(message); } } catch { tokenCountBefore = undefined; } } // TODO(#7175): Consider exposing full message snapshots or pre-compaction injection // hooks; current events only report counts/metadata. try { const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { sessionId: params.sessionId, missingSessionKey, messageCount: messageCountBefore, tokenCount: tokenCountBefore, messageCountOriginal, tokenCountOriginal, }); await triggerInternalHook(hookEvent); } catch (err) { log.warn("session:compact:before hook failed", { errorMessage: err instanceof Error ? err.message : String(err), errorStack: err instanceof Error ? 
err.stack : undefined, }); } if (hookRunner?.hasHooks("before_compaction")) { try { await hookRunner.runBeforeCompaction( { messageCount: messageCountBefore, tokenCount: tokenCountBefore, }, { sessionId: params.sessionId, agentId: sessionAgentId, sessionKey: hookSessionKey, workspaceDir: effectiveWorkspace, messageProvider: resolvedMessageProvider, }, ); } catch (err) { log.warn("before_compaction hook failed", { errorMessage: err instanceof Error ? err.message : String(err), errorStack: err instanceof Error ? err.stack : undefined, }); } } const diagEnabled = log.isEnabled("debug"); const preMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined; if (diagEnabled && preMetrics) { log.debug( `[compaction-diag] start runId=${runId} sessionKey=${params.sessionKey ?? params.sessionId} ` + `diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` + `attempt=${attempt} maxAttempts=${maxAttempts} ` + `pre.messages=${preMetrics.messages} pre.historyTextChars=${preMetrics.historyTextChars} ` + `pre.toolResultChars=${preMetrics.toolResultChars} pre.estTokens=${preMetrics.estTokens ?? "unknown"}`, ); log.debug( `[compaction-diag] contributors diagId=${diagId} top=${JSON.stringify(preMetrics.contributors)}`, ); } if (!session.messages.some(hasRealConversationContent)) { log.info( `[compaction] skipping — no real conversation messages (sessionKey=${params.sessionKey ?? params.sessionId})`, ); return { ok: true, compacted: false, reason: "no real conversation messages", }; } const compactStartedAt = Date.now(); // Measure compactedCount from the original pre-limiting transcript so compaction // lifecycle metrics represent total reduction through the compaction pipeline. const messageCountCompactionInput = messageCountOriginal; // Estimate full session tokens BEFORE compaction (including system prompt, // bootstrap context, workspace files, and all history). 
This is needed for // a correct sanity check — result.tokensBefore only covers the summarizable // history subset, not the full session. let fullSessionTokensBefore = 0; try { fullSessionTokensBefore = limited.reduce((sum, msg) => sum + estimateTokens(msg), 0); } catch { // If token estimation throws on a malformed message, fall back to 0 so // the sanity check below becomes a no-op instead of crashing compaction. } const result = await compactWithSafetyTimeout(() => session.compact(params.customInstructions), ); await runPostCompactionSideEffects({ config: params.config, sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); // Estimate tokens after compaction by summing token estimates for remaining messages let tokensAfter: number | undefined; try { tokensAfter = 0; for (const message of session.messages) { tokensAfter += estimateTokens(message); } // Sanity check: compare against the best full-session pre-compaction baseline. // Prefer the provider-observed live count when available; otherwise use the // heuristic full-session estimate with a 10% margin for counter jitter. const sanityCheckBaseline = observedTokenCount ?? fullSessionTokensBefore; if ( sanityCheckBaseline > 0 && tokensAfter > (observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1) ) { tokensAfter = undefined; // Don't trust the estimate } } catch { // If estimation fails, leave tokensAfter undefined tokensAfter = undefined; } const messageCountAfter = session.messages.length; const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter); const postMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined; if (diagEnabled && preMetrics && postMetrics) { log.debug( `[compaction-diag] end runId=${runId} sessionKey=${params.sessionKey ?? 
params.sessionId} ` + `diagId=${diagId} trigger=${trigger} provider=${provider}/${modelId} ` + `attempt=${attempt} maxAttempts=${maxAttempts} outcome=compacted reason=none ` + `durationMs=${Date.now() - compactStartedAt} retrying=false ` + `post.messages=${postMetrics.messages} post.historyTextChars=${postMetrics.historyTextChars} ` + `post.toolResultChars=${postMetrics.toolResultChars} post.estTokens=${postMetrics.estTokens ?? "unknown"} ` + `delta.messages=${postMetrics.messages - preMetrics.messages} ` + `delta.historyTextChars=${postMetrics.historyTextChars - preMetrics.historyTextChars} ` + `delta.toolResultChars=${postMetrics.toolResultChars - preMetrics.toolResultChars} ` + `delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`, ); } // TODO(#9611): Consider exposing compaction summaries or post-compaction injection; // current events only report summary metadata. try { const hookEvent = createInternalHookEvent("session", "compact:after", hookSessionKey, { sessionId: params.sessionId, missingSessionKey, messageCount: messageCountAfter, tokenCount: tokensAfter, compactedCount, summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, tokensBefore: result.tokensBefore, tokensAfter, firstKeptEntryId: result.firstKeptEntryId, }); await triggerInternalHook(hookEvent); } catch (err) { log.warn("session:compact:after hook failed", { errorMessage: err instanceof Error ? err.message : String(err), errorStack: err instanceof Error ? 
err.stack : undefined,
          });
        }
        if (hookRunner?.hasHooks("after_compaction")) {
          try {
            await hookRunner.runAfterCompaction(
              {
                messageCount: messageCountAfter,
                tokenCount: tokensAfter,
                compactedCount,
              },
              {
                sessionId: params.sessionId,
                agentId: sessionAgentId,
                sessionKey: hookSessionKey,
                workspaceDir: effectiveWorkspace,
                messageProvider: resolvedMessageProvider,
              },
            );
          } catch (err) {
            log.warn("after_compaction hook failed", {
              errorMessage: err instanceof Error ? err.message : String(err),
              errorStack: err instanceof Error ? err.stack : undefined,
            });
          }
        }
        return {
          ok: true,
          compacted: true,
          result: {
            summary: result.summary,
            firstKeptEntryId: result.firstKeptEntryId,
            tokensBefore: observedTokenCount ?? result.tokensBefore,
            tokensAfter,
            details: result.details,
          },
        };
      } finally {
        await flushPendingToolResultsAfterIdle({
          agent: session?.agent,
          sessionManager,
          clearPendingOnTimeout: true,
        });
        session.dispose();
      }
    } finally {
      await sessionLock.release();
    }
  } catch (err) {
    const reason = describeUnknownError(err);
    return fail(reason);
  } finally {
    restoreSkillEnv?.();
    process.chdir(prevCwd);
  }
}

/**
 * Compacts a session with lane queueing (session lane + global lane).
 * Use this from outside a lane context. If already inside a lane, use
 * `compactEmbeddedPiSessionDirect` to avoid deadlocks.
 *
 * The work is queued on the session lane and, nested inside it, on the global
 * lane (or through `params.enqueue` when the caller supplies one). Inside the
 * lanes it loads runtime plugins, resolves the configured context engine,
 * derives a token budget from the model's context window, and delegates to
 * `contextEngine.compact()`. The engine's optional `dispose()` always runs via
 * `finally`.
 *
 * When the engine reports `info.ownsCompaction === true`, its compaction does
 * not go through `compactEmbeddedPiSessionDirect`, so this function itself
 * fires the `before_compaction` / `after_compaction` plugin hooks and runs the
 * post-compaction side effects.
 *
 * @param params Compaction parameters (session identity, model/provider,
 *   workspace, trigger, optional custom instructions and enqueue override).
 * @returns The engine's outcome, with `summary` / `firstKeptEntryId`
 *   defaulted to `""` when the engine omits them.
 */
export async function compactEmbeddedPiSession(
  params: CompactEmbeddedPiSessionParams,
): Promise {
  const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
  const globalLane = resolveGlobalLane(params.lane);
  // A caller-provided enqueue replaces the default global-lane queueing.
  const enqueueGlobal =
    params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts));
  return enqueueCommandInLane(sessionLane, () =>
    enqueueGlobal(async () => {
      ensureRuntimePluginsLoaded({
        config: params.config,
        workspaceDir: params.workspaceDir,
      });
      ensureContextEnginesInitialized();
      const contextEngine = await resolveContextEngine(params.config);
      try {
        // Resolve token budget from model context window so the context engine
        // knows the compaction target. The runner's afterTurn path passes this
        // automatically, but the /compact command path needs to compute it here.
        const ceProvider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
        const ceModelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
        const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
        const { model: ceModel } = resolveModel(ceProvider, ceModelId, agentDir, params.config);
        const ceCtxInfo = resolveContextWindowInfo({
          cfg: params.config,
          provider: ceProvider,
          modelId: ceModelId,
          modelContextWindow: ceModel?.contextWindow,
          defaultTokens: DEFAULT_CONTEXT_TOKENS,
        });

        // When the context engine owns compaction, its compact() implementation
        // bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally).
        // Fire before_compaction / after_compaction hooks here so plugin subscribers
        // are notified regardless of which engine is active.
        const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
        const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null;
        const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
        const { sessionAgentId } = resolveSessionAgentIds({
          sessionKey: params.sessionKey,
          config: params.config,
        });
        const resolvedMessageProvider = params.messageChannel ?? params.messageProvider;
        const hookCtx = {
          sessionId: params.sessionId,
          agentId: sessionAgentId,
          sessionKey: hookSessionKey,
          workspaceDir: resolveUserPath(params.workspaceDir),
          messageProvider: resolvedMessageProvider,
        };

        // Engine-owned compaction doesn't load the transcript at this level, so
        // message counts are unavailable (-1 is passed as a sentinel). We pass
        // sessionFile so hook subscribers can read the transcript themselves if
        // they need exact counts.
        if (hookRunner?.hasHooks("before_compaction")) {
          try {
            await hookRunner.runBeforeCompaction(
              {
                messageCount: -1,
                sessionFile: params.sessionFile,
              },
              hookCtx,
            );
          } catch (err) {
            // Hook failures are best-effort: log (with stack, matching the
            // direct compaction path) and continue compacting.
            log.warn("before_compaction hook failed", {
              errorMessage: err instanceof Error ? err.message : String(err),
              errorStack: err instanceof Error ? err.stack : undefined,
            });
          }
        }

        const result = await contextEngine.compact({
          sessionId: params.sessionId,
          sessionKey: params.sessionKey,
          sessionFile: params.sessionFile,
          tokenBudget: ceCtxInfo.tokens,
          currentTokenCount: params.currentTokenCount,
          customInstructions: params.customInstructions,
          // Only an explicit manual trigger forces compaction.
          force: params.trigger === "manual",
          runtimeContext: params as Record,
        });

        // The non-owning path runs these side effects inside
        // compactEmbeddedPiSessionDirect, so only run them here when the
        // engine compacted on its own.
        if (engineOwnsCompaction && result.ok && result.compacted) {
          await runPostCompactionSideEffects({
            config: params.config,
            sessionKey: params.sessionKey,
            sessionFile: params.sessionFile,
          });
        }

        if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) {
          try {
            await hookRunner.runAfterCompaction(
              {
                messageCount: -1,
                compactedCount: -1,
                tokenCount: result.result?.tokensAfter,
                sessionFile: params.sessionFile,
              },
              hookCtx,
            );
          } catch (err) {
            log.warn("after_compaction hook failed", {
              errorMessage: err instanceof Error ? err.message : String(err),
              errorStack: err instanceof Error ? err.stack : undefined,
            });
          }
        }

        return {
          ok: result.ok,
          compacted: result.compacted,
          reason: result.reason,
          result: result.result
            ? {
                summary: result.result.summary ?? "",
                firstKeptEntryId: result.result.firstKeptEntryId ?? "",
                tokensBefore: result.result.tokensBefore,
                tokensAfter: result.result.tokensAfter,
                details: result.result.details,
              }
            : undefined,
        };
      } finally {
        await contextEngine.dispose?.();
      }
    }),
  );
}