Spaces:
Sleeping
Sleeping
| import type { AgentMessage } from "@mariozechner/pi-agent-core"; | |
| import type { ImageContent } from "@mariozechner/pi-ai"; | |
| import { streamSimple } from "@mariozechner/pi-ai"; | |
| import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent"; | |
| import fs from "node:fs/promises"; | |
| import os from "node:os"; | |
| import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js"; | |
| import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; | |
| import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; | |
| import { getMachineDisplayName } from "../../../infra/machine-name.js"; | |
| import { MAX_IMAGE_BYTES } from "../../../media/constants.js"; | |
| import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; | |
| import { isSubagentSessionKey } from "../../../routing/session-key.js"; | |
| import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js"; | |
| import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js"; | |
| import { resolveTelegramReactionLevel } from "../../../telegram/reaction-level.js"; | |
| import { buildTtsSystemPromptHint } from "../../../tts/tts.js"; | |
| import { resolveUserPath } from "../../../utils.js"; | |
| import { normalizeMessageChannel } from "../../../utils/message-channel.js"; | |
| import { isReasoningTagProvider } from "../../../utils/provider-utils.js"; | |
| import { resolveOpenClawAgentDir } from "../../agent-paths.js"; | |
| import { resolveSessionAgentIds } from "../../agent-scope.js"; | |
| import { createAnthropicPayloadLogger } from "../../anthropic-payload-log.js"; | |
| import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js"; | |
| import { createCacheTrace } from "../../cache-trace.js"; | |
| import { | |
| listChannelSupportedActions, | |
| resolveChannelMessageToolHints, | |
| } from "../../channel-tools.js"; | |
| import { resolveOpenClawDocsPath } from "../../docs-path.js"; | |
| import { isTimeoutError } from "../../failover-error.js"; | |
| import { resolveModelAuthMode } from "../../model-auth.js"; | |
| import { resolveDefaultModelForAgent } from "../../model-selection.js"; | |
| import { | |
| isCloudCodeAssistFormatError, | |
| resolveBootstrapMaxChars, | |
| validateAnthropicTurns, | |
| validateGeminiTurns, | |
| } from "../../pi-embedded-helpers.js"; | |
| import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js"; | |
| import { | |
| ensurePiCompactionReserveTokens, | |
| resolveCompactionReserveTokensFloor, | |
| } from "../../pi-settings.js"; | |
| import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js"; | |
| import { createOpenClawCodingTools } from "../../pi-tools.js"; | |
| import { resolveSandboxContext } from "../../sandbox.js"; | |
| import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js"; | |
| import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js"; | |
| import { acquireSessionWriteLock } from "../../session-write-lock.js"; | |
| import { | |
| applySkillEnvOverrides, | |
| applySkillEnvOverridesFromSnapshot, | |
| loadWorkspaceSkillEntries, | |
| resolveSkillsPromptForRun, | |
| } from "../../skills.js"; | |
| import { buildSystemPromptParams } from "../../system-prompt-params.js"; | |
| import { buildSystemPromptReport } from "../../system-prompt-report.js"; | |
| import { resolveTranscriptPolicy } from "../../transcript-policy.js"; | |
| import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; | |
| import { isAbortError } from "../abort.js"; | |
| import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js"; | |
| import { buildEmbeddedExtensionPaths } from "../extensions.js"; | |
| import { applyExtraParamsToAgent } from "../extra-params.js"; | |
| import { | |
| logToolSchemasForGoogle, | |
| sanitizeSessionHistory, | |
| sanitizeToolsForGoogle, | |
| } from "../google.js"; | |
| import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js"; | |
| import { log } from "../logger.js"; | |
| import { buildModelAliasLines } from "../model.js"; | |
| import { | |
| clearActiveEmbeddedRun, | |
| type EmbeddedPiQueueHandle, | |
| setActiveEmbeddedRun, | |
| } from "../runs.js"; | |
| import { buildEmbeddedSandboxInfo } from "../sandbox-info.js"; | |
| import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js"; | |
| import { prepareSessionManagerForRun } from "../session-manager-init.js"; | |
| import { | |
| applySystemPromptOverrideToSession, | |
| buildEmbeddedSystemPrompt, | |
| createSystemPromptOverride, | |
| } from "../system-prompt.js"; | |
| import { splitSdkTools } from "../tool-split.js"; | |
| import { describeUnknownError, mapThinkingLevel } from "../utils.js"; | |
| import { detectAndLoadPromptImages } from "./images.js"; | |
| export function injectHistoryImagesIntoMessages( | |
| messages: AgentMessage[], | |
| historyImagesByIndex: Map<number, ImageContent[]>, | |
| ): boolean { | |
| if (historyImagesByIndex.size === 0) { | |
| return false; | |
| } | |
| let didMutate = false; | |
| for (const [msgIndex, images] of historyImagesByIndex) { | |
| // Bounds check: ensure index is valid before accessing | |
| if (msgIndex < 0 || msgIndex >= messages.length) { | |
| continue; | |
| } | |
| const msg = messages[msgIndex]; | |
| if (msg && msg.role === "user") { | |
| // Convert string content to array format if needed | |
| if (typeof msg.content === "string") { | |
| msg.content = [{ type: "text", text: msg.content }]; | |
| didMutate = true; | |
| } | |
| if (Array.isArray(msg.content)) { | |
| // Check for existing image content to avoid duplicates across turns | |
| const existingImageData = new Set( | |
| msg.content | |
| .filter( | |
| (c): c is ImageContent => | |
| c != null && | |
| typeof c === "object" && | |
| c.type === "image" && | |
| typeof c.data === "string", | |
| ) | |
| .map((c) => c.data), | |
| ); | |
| for (const img of images) { | |
| // Only add if this image isn't already in the message | |
| if (!existingImageData.has(img.data)) { | |
| msg.content.push(img); | |
| didMutate = true; | |
| } | |
| } | |
| } | |
| } | |
| } | |
| return didMutate; | |
| } | |
| export async function runEmbeddedAttempt( | |
| params: EmbeddedRunAttemptParams, | |
| ): Promise<EmbeddedRunAttemptResult> { | |
| const resolvedWorkspace = resolveUserPath(params.workspaceDir); | |
| const prevCwd = process.cwd(); | |
| const runAbortController = new AbortController(); | |
| log.debug( | |
| `embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`, | |
| ); | |
| await fs.mkdir(resolvedWorkspace, { recursive: true }); | |
| const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId; | |
| const sandbox = await resolveSandboxContext({ | |
| config: params.config, | |
| sessionKey: sandboxSessionKey, | |
| workspaceDir: resolvedWorkspace, | |
| }); | |
| const effectiveWorkspace = sandbox?.enabled | |
| ? sandbox.workspaceAccess === "rw" | |
| ? resolvedWorkspace | |
| : sandbox.workspaceDir | |
| : resolvedWorkspace; | |
| await fs.mkdir(effectiveWorkspace, { recursive: true }); | |
| let restoreSkillEnv: (() => void) | undefined; | |
| process.chdir(effectiveWorkspace); | |
| try { | |
| const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; | |
| const skillEntries = shouldLoadSkillEntries | |
| ? loadWorkspaceSkillEntries(effectiveWorkspace) | |
| : []; | |
| restoreSkillEnv = params.skillsSnapshot | |
| ? applySkillEnvOverridesFromSnapshot({ | |
| snapshot: params.skillsSnapshot, | |
| config: params.config, | |
| }) | |
| : applySkillEnvOverrides({ | |
| skills: skillEntries ?? [], | |
| config: params.config, | |
| }); | |
| const skillsPrompt = resolveSkillsPromptForRun({ | |
| skillsSnapshot: params.skillsSnapshot, | |
| entries: shouldLoadSkillEntries ? skillEntries : undefined, | |
| config: params.config, | |
| workspaceDir: effectiveWorkspace, | |
| }); | |
| const sessionLabel = params.sessionKey ?? params.sessionId; | |
| const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } = | |
| await resolveBootstrapContextForRun({ | |
| workspaceDir: effectiveWorkspace, | |
| config: params.config, | |
| sessionKey: params.sessionKey, | |
| sessionId: params.sessionId, | |
| warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }), | |
| }); | |
| const workspaceNotes = hookAdjustedBootstrapFiles.some( | |
| (file) => file.name === DEFAULT_BOOTSTRAP_FILENAME && !file.missing, | |
| ) | |
| ? ["Reminder: commit your changes in this workspace after edits."] | |
| : undefined; | |
| const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); | |
| // Check if the model supports native image input | |
| const modelHasVision = params.model.input?.includes("image") ?? false; | |
| const toolsRaw = params.disableTools | |
| ? [] | |
| : createOpenClawCodingTools({ | |
| exec: { | |
| ...params.execOverrides, | |
| elevated: params.bashElevated, | |
| }, | |
| sandbox, | |
| messageProvider: params.messageChannel ?? params.messageProvider, | |
| agentAccountId: params.agentAccountId, | |
| messageTo: params.messageTo, | |
| messageThreadId: params.messageThreadId, | |
| groupId: params.groupId, | |
| groupChannel: params.groupChannel, | |
| groupSpace: params.groupSpace, | |
| spawnedBy: params.spawnedBy, | |
| senderId: params.senderId, | |
| senderName: params.senderName, | |
| senderUsername: params.senderUsername, | |
| senderE164: params.senderE164, | |
| sessionKey: params.sessionKey ?? params.sessionId, | |
| agentDir, | |
| workspaceDir: effectiveWorkspace, | |
| config: params.config, | |
| abortSignal: runAbortController.signal, | |
| modelProvider: params.model.provider, | |
| modelId: params.modelId, | |
| modelAuthMode: resolveModelAuthMode(params.model.provider, params.config), | |
| currentChannelId: params.currentChannelId, | |
| currentThreadTs: params.currentThreadTs, | |
| replyToMode: params.replyToMode, | |
| hasRepliedRef: params.hasRepliedRef, | |
| modelHasVision, | |
| }); | |
| const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider: params.provider }); | |
| logToolSchemasForGoogle({ tools, provider: params.provider }); | |
| const machineName = await getMachineDisplayName(); | |
| const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider); | |
| let runtimeCapabilities = runtimeChannel | |
| ? (resolveChannelCapabilities({ | |
| cfg: params.config, | |
| channel: runtimeChannel, | |
| accountId: params.agentAccountId, | |
| }) ?? []) | |
| : undefined; | |
| if (runtimeChannel === "telegram" && params.config) { | |
| const inlineButtonsScope = resolveTelegramInlineButtonsScope({ | |
| cfg: params.config, | |
| accountId: params.agentAccountId ?? undefined, | |
| }); | |
| if (inlineButtonsScope !== "off") { | |
| if (!runtimeCapabilities) { | |
| runtimeCapabilities = []; | |
| } | |
| if ( | |
| !runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons") | |
| ) { | |
| runtimeCapabilities.push("inlineButtons"); | |
| } | |
| } | |
| } | |
| const reactionGuidance = | |
| runtimeChannel && params.config | |
| ? (() => { | |
| if (runtimeChannel === "telegram") { | |
| const resolved = resolveTelegramReactionLevel({ | |
| cfg: params.config, | |
| accountId: params.agentAccountId ?? undefined, | |
| }); | |
| const level = resolved.agentReactionGuidance; | |
| return level ? { level, channel: "Telegram" } : undefined; | |
| } | |
| if (runtimeChannel === "signal") { | |
| const resolved = resolveSignalReactionLevel({ | |
| cfg: params.config, | |
| accountId: params.agentAccountId ?? undefined, | |
| }); | |
| const level = resolved.agentReactionGuidance; | |
| return level ? { level, channel: "Signal" } : undefined; | |
| } | |
| return undefined; | |
| })() | |
| : undefined; | |
| const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ | |
| sessionKey: params.sessionKey, | |
| config: params.config, | |
| }); | |
| const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated); | |
| const reasoningTagHint = isReasoningTagProvider(params.provider); | |
| // Resolve channel-specific message actions for system prompt | |
| const channelActions = runtimeChannel | |
| ? listChannelSupportedActions({ | |
| cfg: params.config, | |
| channel: runtimeChannel, | |
| }) | |
| : undefined; | |
| const messageToolHints = runtimeChannel | |
| ? resolveChannelMessageToolHints({ | |
| cfg: params.config, | |
| channel: runtimeChannel, | |
| accountId: params.agentAccountId, | |
| }) | |
| : undefined; | |
| const defaultModelRef = resolveDefaultModelForAgent({ | |
| cfg: params.config ?? {}, | |
| agentId: sessionAgentId, | |
| }); | |
| const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`; | |
| const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({ | |
| config: params.config, | |
| agentId: sessionAgentId, | |
| workspaceDir: effectiveWorkspace, | |
| cwd: process.cwd(), | |
| runtime: { | |
| host: machineName, | |
| os: `${os.type()} ${os.release()}`, | |
| arch: os.arch(), | |
| node: process.version, | |
| model: `${params.provider}/${params.modelId}`, | |
| defaultModel: defaultModelLabel, | |
| channel: runtimeChannel, | |
| capabilities: runtimeCapabilities, | |
| channelActions, | |
| }, | |
| }); | |
| const isDefaultAgent = sessionAgentId === defaultAgentId; | |
| const promptMode = isSubagentSessionKey(params.sessionKey) ? "minimal" : "full"; | |
| const docsPath = await resolveOpenClawDocsPath({ | |
| workspaceDir: effectiveWorkspace, | |
| argv1: process.argv[1], | |
| cwd: process.cwd(), | |
| moduleUrl: import.meta.url, | |
| }); | |
| const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined; | |
| const appendPrompt = buildEmbeddedSystemPrompt({ | |
| workspaceDir: effectiveWorkspace, | |
| defaultThinkLevel: params.thinkLevel, | |
| reasoningLevel: params.reasoningLevel ?? "off", | |
| extraSystemPrompt: params.extraSystemPrompt, | |
| ownerNumbers: params.ownerNumbers, | |
| reasoningTagHint, | |
| heartbeatPrompt: isDefaultAgent | |
| ? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt) | |
| : undefined, | |
| skillsPrompt, | |
| docsPath: docsPath ?? undefined, | |
| ttsHint, | |
| workspaceNotes, | |
| reactionGuidance, | |
| promptMode, | |
| runtimeInfo, | |
| messageToolHints, | |
| sandboxInfo, | |
| tools, | |
| modelAliasLines: buildModelAliasLines(params.config), | |
| userTimezone, | |
| userTime, | |
| userTimeFormat, | |
| contextFiles, | |
| }); | |
| const systemPromptReport = buildSystemPromptReport({ | |
| source: "run", | |
| generatedAt: Date.now(), | |
| sessionId: params.sessionId, | |
| sessionKey: params.sessionKey, | |
| provider: params.provider, | |
| model: params.modelId, | |
| workspaceDir: effectiveWorkspace, | |
| bootstrapMaxChars: resolveBootstrapMaxChars(params.config), | |
| sandbox: (() => { | |
| const runtime = resolveSandboxRuntimeStatus({ | |
| cfg: params.config, | |
| sessionKey: params.sessionKey ?? params.sessionId, | |
| }); | |
| return { mode: runtime.mode, sandboxed: runtime.sandboxed }; | |
| })(), | |
| systemPrompt: appendPrompt, | |
| bootstrapFiles: hookAdjustedBootstrapFiles, | |
| injectedFiles: contextFiles, | |
| skillsPrompt, | |
| tools, | |
| }); | |
| const systemPromptOverride = createSystemPromptOverride(appendPrompt); | |
| const systemPromptText = systemPromptOverride(); | |
| const sessionLock = await acquireSessionWriteLock({ | |
| sessionFile: params.sessionFile, | |
| }); | |
| let sessionManager: ReturnType<typeof guardSessionManager> | undefined; | |
| let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined; | |
| try { | |
| const hadSessionFile = await fs | |
| .stat(params.sessionFile) | |
| .then(() => true) | |
| .catch(() => false); | |
| const transcriptPolicy = resolveTranscriptPolicy({ | |
| modelApi: params.model?.api, | |
| provider: params.provider, | |
| modelId: params.modelId, | |
| }); | |
| await prewarmSessionFile(params.sessionFile); | |
| sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), { | |
| agentId: sessionAgentId, | |
| sessionKey: params.sessionKey, | |
| allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults, | |
| }); | |
| trackSessionManagerAccess(params.sessionFile); | |
| await prepareSessionManagerForRun({ | |
| sessionManager, | |
| sessionFile: params.sessionFile, | |
| hadSessionFile, | |
| sessionId: params.sessionId, | |
| cwd: effectiveWorkspace, | |
| }); | |
| const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir); | |
| ensurePiCompactionReserveTokens({ | |
| settingsManager, | |
| minReserveTokens: resolveCompactionReserveTokensFloor(params.config), | |
| }); | |
| // Call for side effects (sets compaction/pruning runtime state) | |
| buildEmbeddedExtensionPaths({ | |
| cfg: params.config, | |
| sessionManager, | |
| provider: params.provider, | |
| modelId: params.modelId, | |
| model: params.model, | |
| }); | |
| const { builtInTools, customTools } = splitSdkTools({ | |
| tools, | |
| sandboxEnabled: !!sandbox?.enabled, | |
| }); | |
| // Add client tools (OpenResponses hosted tools) to customTools | |
| let clientToolCallDetected: { name: string; params: Record<string, unknown> } | null = null; | |
| const clientToolDefs = params.clientTools | |
| ? toClientToolDefinitions( | |
| params.clientTools, | |
| (toolName, toolParams) => { | |
| clientToolCallDetected = { name: toolName, params: toolParams }; | |
| }, | |
| { | |
| agentId: sessionAgentId, | |
| sessionKey: params.sessionKey, | |
| }, | |
| ) | |
| : []; | |
| const allCustomTools = [...customTools, ...clientToolDefs]; | |
| ({ session } = await createAgentSession({ | |
| cwd: resolvedWorkspace, | |
| agentDir, | |
| authStorage: params.authStorage, | |
| modelRegistry: params.modelRegistry, | |
| model: params.model, | |
| thinkingLevel: mapThinkingLevel(params.thinkLevel), | |
| tools: builtInTools, | |
| customTools: allCustomTools, | |
| sessionManager, | |
| settingsManager, | |
| })); | |
| applySystemPromptOverrideToSession(session, systemPromptOverride); | |
| if (!session) { | |
| throw new Error("Embedded agent session missing"); | |
| } | |
| const activeSession = session; | |
| const cacheTrace = createCacheTrace({ | |
| cfg: params.config, | |
| env: process.env, | |
| runId: params.runId, | |
| sessionId: activeSession.sessionId, | |
| sessionKey: params.sessionKey, | |
| provider: params.provider, | |
| modelId: params.modelId, | |
| modelApi: params.model.api, | |
| workspaceDir: params.workspaceDir, | |
| }); | |
| const anthropicPayloadLogger = createAnthropicPayloadLogger({ | |
| env: process.env, | |
| runId: params.runId, | |
| sessionId: activeSession.sessionId, | |
| sessionKey: params.sessionKey, | |
| provider: params.provider, | |
| modelId: params.modelId, | |
| modelApi: params.model.api, | |
| workspaceDir: params.workspaceDir, | |
| }); | |
| // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai. | |
| activeSession.agent.streamFn = streamSimple; | |
| applyExtraParamsToAgent( | |
| activeSession.agent, | |
| params.config, | |
| params.provider, | |
| params.modelId, | |
| params.streamParams, | |
| ); | |
| if (cacheTrace) { | |
| cacheTrace.recordStage("session:loaded", { | |
| messages: activeSession.messages, | |
| system: systemPromptText, | |
| note: "after session create", | |
| }); | |
| activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn); | |
| } | |
| if (anthropicPayloadLogger) { | |
| activeSession.agent.streamFn = anthropicPayloadLogger.wrapStreamFn( | |
| activeSession.agent.streamFn, | |
| ); | |
| } | |
| try { | |
| const prior = await sanitizeSessionHistory({ | |
| messages: activeSession.messages, | |
| modelApi: params.model.api, | |
| modelId: params.modelId, | |
| provider: params.provider, | |
| sessionManager, | |
| sessionId: params.sessionId, | |
| policy: transcriptPolicy, | |
| }); | |
| cacheTrace?.recordStage("session:sanitized", { messages: prior }); | |
| const validatedGemini = transcriptPolicy.validateGeminiTurns | |
| ? validateGeminiTurns(prior) | |
| : prior; | |
| const validated = transcriptPolicy.validateAnthropicTurns | |
| ? validateAnthropicTurns(validatedGemini) | |
| : validatedGemini; | |
| const limited = limitHistoryTurns( | |
| validated, | |
| getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), | |
| ); | |
| cacheTrace?.recordStage("session:limited", { messages: limited }); | |
| if (limited.length > 0) { | |
| activeSession.agent.replaceMessages(limited); | |
| } | |
| } catch (err) { | |
| sessionManager.flushPendingToolResults?.(); | |
| activeSession.dispose(); | |
| throw err; | |
| } | |
| let aborted = Boolean(params.abortSignal?.aborted); | |
| let timedOut = false; | |
| const getAbortReason = (signal: AbortSignal): unknown => | |
| "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; | |
| const makeTimeoutAbortReason = (): Error => { | |
| const err = new Error("request timed out"); | |
| err.name = "TimeoutError"; | |
| return err; | |
| }; | |
| const makeAbortError = (signal: AbortSignal): Error => { | |
| const reason = getAbortReason(signal); | |
| const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted"); | |
| err.name = "AbortError"; | |
| return err; | |
| }; | |
| const abortRun = (isTimeout = false, reason?: unknown) => { | |
| aborted = true; | |
| if (isTimeout) { | |
| timedOut = true; | |
| } | |
| if (isTimeout) { | |
| runAbortController.abort(reason ?? makeTimeoutAbortReason()); | |
| } else { | |
| runAbortController.abort(reason); | |
| } | |
| void activeSession.abort(); | |
| }; | |
| const abortable = <T>(promise: Promise<T>): Promise<T> => { | |
| const signal = runAbortController.signal; | |
| if (signal.aborted) { | |
| return Promise.reject(makeAbortError(signal)); | |
| } | |
| return new Promise<T>((resolve, reject) => { | |
| const onAbort = () => { | |
| signal.removeEventListener("abort", onAbort); | |
| reject(makeAbortError(signal)); | |
| }; | |
| signal.addEventListener("abort", onAbort, { once: true }); | |
| promise.then( | |
| (value) => { | |
| signal.removeEventListener("abort", onAbort); | |
| resolve(value); | |
| }, | |
| (err) => { | |
| signal.removeEventListener("abort", onAbort); | |
| reject(err); | |
| }, | |
| ); | |
| }); | |
| }; | |
| const subscription = subscribeEmbeddedPiSession({ | |
| session: activeSession, | |
| runId: params.runId, | |
| verboseLevel: params.verboseLevel, | |
| reasoningMode: params.reasoningLevel ?? "off", | |
| toolResultFormat: params.toolResultFormat, | |
| shouldEmitToolResult: params.shouldEmitToolResult, | |
| shouldEmitToolOutput: params.shouldEmitToolOutput, | |
| onToolResult: params.onToolResult, | |
| onReasoningStream: params.onReasoningStream, | |
| onBlockReply: params.onBlockReply, | |
| onBlockReplyFlush: params.onBlockReplyFlush, | |
| blockReplyBreak: params.blockReplyBreak, | |
| blockReplyChunking: params.blockReplyChunking, | |
| onPartialReply: params.onPartialReply, | |
| onAssistantMessageStart: params.onAssistantMessageStart, | |
| onAgentEvent: params.onAgentEvent, | |
| enforceFinalTag: params.enforceFinalTag, | |
| }); | |
| const { | |
| assistantTexts, | |
| toolMetas, | |
| unsubscribe, | |
| waitForCompactionRetry, | |
| getMessagingToolSentTexts, | |
| getMessagingToolSentTargets, | |
| didSendViaMessagingTool, | |
| getLastToolError, | |
| } = subscription; | |
| const queueHandle: EmbeddedPiQueueHandle = { | |
| queueMessage: async (text: string) => { | |
| await activeSession.steer(text); | |
| }, | |
| isStreaming: () => activeSession.isStreaming, | |
| isCompacting: () => subscription.isCompacting(), | |
| abort: abortRun, | |
| }; | |
| setActiveEmbeddedRun(params.sessionId, queueHandle); | |
| let abortWarnTimer: NodeJS.Timeout | undefined; | |
| const isProbeSession = params.sessionId?.startsWith("probe-") ?? false; | |
| const abortTimer = setTimeout( | |
| () => { | |
| if (!isProbeSession) { | |
| log.warn( | |
| `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, | |
| ); | |
| } | |
| abortRun(true); | |
| if (!abortWarnTimer) { | |
| abortWarnTimer = setTimeout(() => { | |
| if (!activeSession.isStreaming) { | |
| return; | |
| } | |
| if (!isProbeSession) { | |
| log.warn( | |
| `embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`, | |
| ); | |
| } | |
| }, 10_000); | |
| } | |
| }, | |
| Math.max(1, params.timeoutMs), | |
| ); | |
| let messagesSnapshot: AgentMessage[] = []; | |
| let sessionIdUsed = activeSession.sessionId; | |
| const onAbort = () => { | |
| const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; | |
| const timeout = reason ? isTimeoutError(reason) : false; | |
| abortRun(timeout, reason); | |
| }; | |
| if (params.abortSignal) { | |
| if (params.abortSignal.aborted) { | |
| onAbort(); | |
| } else { | |
| params.abortSignal.addEventListener("abort", onAbort, { | |
| once: true, | |
| }); | |
| } | |
| } | |
| // Get hook runner once for both before_agent_start and agent_end hooks | |
| const hookRunner = getGlobalHookRunner(); | |
| let promptError: unknown = null; | |
| try { | |
| const promptStartedAt = Date.now(); | |
| // Run before_agent_start hooks to allow plugins to inject context | |
| let effectivePrompt = params.prompt; | |
| if (hookRunner?.hasHooks("before_agent_start")) { | |
| try { | |
| const hookResult = await hookRunner.runBeforeAgentStart( | |
| { | |
| prompt: params.prompt, | |
| messages: activeSession.messages, | |
| }, | |
| { | |
| agentId: params.sessionKey?.split(":")[0] ?? "main", | |
| sessionKey: params.sessionKey, | |
| workspaceDir: params.workspaceDir, | |
| messageProvider: params.messageProvider ?? undefined, | |
| }, | |
| ); | |
| if (hookResult?.prependContext) { | |
| effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`; | |
| log.debug( | |
| `hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`, | |
| ); | |
| } | |
| } catch (hookErr) { | |
| log.warn(`before_agent_start hook failed: ${String(hookErr)}`); | |
| } | |
| } | |
| log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`); | |
| cacheTrace?.recordStage("prompt:before", { | |
| prompt: effectivePrompt, | |
| messages: activeSession.messages, | |
| }); | |
| // Repair orphaned trailing user messages so new prompts don't violate role ordering. | |
| const leafEntry = sessionManager.getLeafEntry(); | |
| if (leafEntry?.type === "message" && leafEntry.message.role === "user") { | |
| if (leafEntry.parentId) { | |
| sessionManager.branch(leafEntry.parentId); | |
| } else { | |
| sessionManager.resetLeaf(); | |
| } | |
| const sessionContext = sessionManager.buildSessionContext(); | |
| activeSession.agent.replaceMessages(sessionContext.messages); | |
| log.warn( | |
| `Removed orphaned user message to prevent consecutive user turns. ` + | |
| `runId=${params.runId} sessionId=${params.sessionId}`, | |
| ); | |
| } | |
| try { | |
| // Detect and load images referenced in the prompt for vision-capable models. | |
| // This eliminates the need for an explicit "view" tool call by injecting | |
| // images directly into the prompt when the model supports it. | |
| // Also scans conversation history to enable follow-up questions about earlier images. | |
| const imageResult = await detectAndLoadPromptImages({ | |
| prompt: effectivePrompt, | |
| workspaceDir: effectiveWorkspace, | |
| model: params.model, | |
| existingImages: params.images, | |
| historyMessages: activeSession.messages, | |
| maxBytes: MAX_IMAGE_BYTES, | |
| // Enforce sandbox path restrictions when sandbox is enabled | |
| sandboxRoot: sandbox?.enabled ? sandbox.workspaceDir : undefined, | |
| }); | |
| // Inject history images into their original message positions. | |
| // This ensures the model sees images in context (e.g., "compare to the first image"). | |
| const didMutate = injectHistoryImagesIntoMessages( | |
| activeSession.messages, | |
| imageResult.historyImagesByIndex, | |
| ); | |
| if (didMutate) { | |
| // Persist message mutations (e.g., injected history images) so we don't re-scan/reload. | |
| activeSession.agent.replaceMessages(activeSession.messages); | |
| } | |
| cacheTrace?.recordStage("prompt:images", { | |
| prompt: effectivePrompt, | |
| messages: activeSession.messages, | |
| note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`, | |
| }); | |
| const shouldTrackCacheTtl = | |
| params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" && | |
| isCacheTtlEligibleProvider(params.provider, params.modelId); | |
| if (shouldTrackCacheTtl) { | |
| appendCacheTtlTimestamp(sessionManager, { | |
| timestamp: Date.now(), | |
| provider: params.provider, | |
| modelId: params.modelId, | |
| }); | |
| } | |
| // Only pass images option if there are actually images to pass | |
| // This avoids potential issues with models that don't expect the images parameter | |
| if (imageResult.images.length > 0) { | |
| await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images })); | |
| } else { | |
| await abortable(activeSession.prompt(effectivePrompt)); | |
| } | |
| } catch (err) { | |
| promptError = err; | |
| } finally { | |
| log.debug( | |
| `embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`, | |
| ); | |
| } | |
| try { | |
| await waitForCompactionRetry(); | |
| } catch (err) { | |
| if (isAbortError(err)) { | |
| if (!promptError) { | |
| promptError = err; | |
| } | |
| } else { | |
| throw err; | |
| } | |
| } | |
| messagesSnapshot = activeSession.messages.slice(); | |
| sessionIdUsed = activeSession.sessionId; | |
| cacheTrace?.recordStage("session:after", { | |
| messages: messagesSnapshot, | |
| note: promptError ? "prompt error" : undefined, | |
| }); | |
| anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError); | |
| // Run agent_end hooks to allow plugins to analyze the conversation | |
| // This is fire-and-forget, so we don't await | |
| if (hookRunner?.hasHooks("agent_end")) { | |
| hookRunner | |
| .runAgentEnd( | |
| { | |
| messages: messagesSnapshot, | |
| success: !aborted && !promptError, | |
| error: promptError ? describeUnknownError(promptError) : undefined, | |
| durationMs: Date.now() - promptStartedAt, | |
| }, | |
| { | |
| agentId: params.sessionKey?.split(":")[0] ?? "main", | |
| sessionKey: params.sessionKey, | |
| workspaceDir: params.workspaceDir, | |
| messageProvider: params.messageProvider ?? undefined, | |
| }, | |
| ) | |
| .catch((err) => { | |
| log.warn(`agent_end hook failed: ${err}`); | |
| }); | |
| } | |
| } finally { | |
| clearTimeout(abortTimer); | |
| if (abortWarnTimer) { | |
| clearTimeout(abortWarnTimer); | |
| } | |
| unsubscribe(); | |
| clearActiveEmbeddedRun(params.sessionId, queueHandle); | |
| params.abortSignal?.removeEventListener?.("abort", onAbort); | |
| } | |
| const lastAssistant = messagesSnapshot | |
| .slice() | |
| .toReversed() | |
| .find((m) => m.role === "assistant"); | |
| const toolMetasNormalized = toolMetas | |
| .filter( | |
| (entry): entry is { toolName: string; meta?: string } => | |
| typeof entry.toolName === "string" && entry.toolName.trim().length > 0, | |
| ) | |
| .map((entry) => ({ toolName: entry.toolName, meta: entry.meta })); | |
| return { | |
| aborted, | |
| timedOut, | |
| promptError, | |
| sessionIdUsed, | |
| systemPromptReport, | |
| messagesSnapshot, | |
| assistantTexts, | |
| toolMetas: toolMetasNormalized, | |
| lastAssistant, | |
| lastToolError: getLastToolError?.(), | |
| didSendViaMessagingTool: didSendViaMessagingTool(), | |
| messagingToolSentTexts: getMessagingToolSentTexts(), | |
| messagingToolSentTargets: getMessagingToolSentTargets(), | |
| cloudCodeAssistFormatError: Boolean( | |
| lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage), | |
| ), | |
| // Client tool call detected (OpenResponses hosted tools) | |
| clientToolCall: clientToolCallDetected ?? undefined, | |
| }; | |
| } finally { | |
| // Always tear down the session (and release the lock) before we leave this attempt. | |
| sessionManager?.flushPendingToolResults?.(); | |
| session?.dispose(); | |
| await sessionLock.release(); | |
| } | |
| } finally { | |
| restoreSkillEnv?.(); | |
| process.chdir(prevCwd); | |
| } | |
| } | |