| import fs from "node:fs"; |
| import { lookupContextTokens } from "../../agents/context.js"; |
| import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; |
| import { resolveModelAuthMode } from "../../agents/model-auth.js"; |
| import { isCliProvider } from "../../agents/model-selection.js"; |
| import { classifyFailoverReason } from "../../agents/pi-embedded-helpers.js"; |
| import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js"; |
| import { hasNonzeroUsage } from "../../agents/usage.js"; |
| import { |
| canAttemptInterruptedResume, |
| MAX_RATE_LIMIT_INTERRUPTED_RESUME_COUNT, |
| markSessionUnfinishedTurn, |
| markSessionInterruptedResumeAttempt, |
| resolveAgentIdFromSessionKey, |
| resolveSessionFilePath, |
| resolveSessionFilePathOptions, |
| resolveSessionTranscriptPath, |
| type SessionEntry, |
| updateSessionStore, |
| updateSessionStoreEntry, |
| } from "../../config/sessions.js"; |
| import type { TypingMode } from "../../config/types.js"; |
| import { emitAgentEvent } from "../../infra/agent-events.js"; |
| import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; |
| import { generateSecureUuid } from "../../infra/secure-random.js"; |
| import { enqueueSystemEvent } from "../../infra/system-events.js"; |
| import { defaultRuntime } from "../../runtime.js"; |
| import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js"; |
| import { |
| buildFallbackClearedNotice, |
| buildFallbackNotice, |
| resolveFallbackTransition, |
| } from "../fallback-state.js"; |
| import type { OriginatingChannelType, TemplateContext } from "../templating.js"; |
| import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js"; |
| import type { GetReplyOptions, ReplyPayload } from "../types.js"; |
| import { runAgentTurnWithFallback } from "./agent-runner-execution.js"; |
| import type { RuntimeFallbackAttempt } from "./agent-runner-execution.js"; |
| import { |
| createShouldEmitToolOutput, |
| createShouldEmitToolResult, |
| finalizeWithFollowup, |
| isAudioPayload, |
| signalTypingIfNeeded, |
| } from "./agent-runner-helpers.js"; |
| import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js"; |
| import { buildReplyPayloads } from "./agent-runner-payloads.js"; |
| import { |
| appendUnscheduledReminderNote, |
| hasSessionRelatedCronJobs, |
| hasUnbackedReminderCommitment, |
| } from "./agent-runner-reminder-guard.js"; |
| import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.js"; |
| import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; |
| import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; |
| import { createFollowupRunner } from "./followup-runner.js"; |
| import { |
| buildBackgroundTaskStillRunningAutoResumePayload, |
| buildBackgroundTaskStillRunningPayload, |
| buildMissingFinalReplyAutoResumePayload, |
| buildMissingFinalReplyDegradedPayload, |
| resolveInteractiveTurnOutcome, |
| } from "./interactive-turn-outcome.js"; |
| import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js"; |
| import { readPostCompactionContext } from "./post-compaction-context.js"; |
| import { resolveActiveRunQueueAction } from "./queue-policy.js"; |
| import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; |
| import { createReplyMediaPathNormalizer } from "./reply-media-paths.js"; |
| import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; |
| import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; |
| import { createTypingSignaler } from "./typing-mode.js"; |
| import type { TypingController } from "./typing.js"; |
|
|
| const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; |
| const RATE_LIMIT_INTERRUPTED_RESUME_DELAY_MS = 10_000; |
|
|
| function extractRunErrorMessage(error: unknown): string | undefined { |
| if (typeof error === "string") { |
| const trimmed = error.trim(); |
| return trimmed || undefined; |
| } |
| if (!error || typeof error !== "object") { |
| return undefined; |
| } |
| if ("message" in error && typeof error.message === "string") { |
| const trimmed = error.message.trim(); |
| return trimmed || undefined; |
| } |
| return undefined; |
| } |
|
|
| function shouldBackoffInterruptedRetry(params: { |
| fallbackAttempts: RuntimeFallbackAttempt[]; |
| runError?: unknown; |
| }): boolean { |
| if (params.fallbackAttempts.some((attempt) => attempt.reason === "rate_limit")) { |
| return true; |
| } |
| const rawError = extractRunErrorMessage(params.runError); |
| return rawError ? classifyFailoverReason(rawError) === "rate_limit" : false; |
| } |
|
|
| export async function runReplyAgent(params: { |
| commandBody: string; |
| followupRun: FollowupRun; |
| queueKey: string; |
| resolvedQueue: QueueSettings; |
| shouldSteer: boolean; |
| shouldFollowup: boolean; |
| isActive: boolean; |
| isStreaming: boolean; |
| opts?: GetReplyOptions; |
| typing: TypingController; |
| sessionEntry?: SessionEntry; |
| sessionStore?: Record<string, SessionEntry>; |
| sessionKey?: string; |
| storePath?: string; |
| resumePreviewText?: string; |
| defaultModel: string; |
| agentCfgContextTokens?: number; |
| resolvedVerboseLevel: VerboseLevel; |
| isNewSession: boolean; |
| blockStreamingEnabled: boolean; |
| blockReplyChunking?: { |
| minChars: number; |
| maxChars: number; |
| breakPreference: "paragraph" | "newline" | "sentence"; |
| flushOnParagraph?: boolean; |
| }; |
| resolvedBlockStreamingBreak: "text_end" | "message_end"; |
| sessionCtx: TemplateContext; |
| shouldInjectGroupIntro: boolean; |
| typingMode: TypingMode; |
| }): Promise<ReplyPayload | ReplyPayload[] | undefined> { |
| const { |
| commandBody, |
| followupRun, |
| queueKey, |
| resolvedQueue, |
| shouldSteer, |
| shouldFollowup, |
| isActive, |
| isStreaming, |
| opts, |
| typing, |
| sessionEntry, |
| sessionStore, |
| sessionKey, |
| storePath, |
| resumePreviewText, |
| defaultModel, |
| agentCfgContextTokens, |
| resolvedVerboseLevel, |
| isNewSession, |
| blockStreamingEnabled, |
| blockReplyChunking, |
| resolvedBlockStreamingBreak, |
| sessionCtx, |
| shouldInjectGroupIntro, |
| typingMode, |
| } = params; |
|
|
| let activeSessionEntry = sessionEntry; |
| const activeSessionStore = sessionStore; |
| let activeIsNewSession = isNewSession; |
|
|
| const isHeartbeat = opts?.isHeartbeat === true; |
| const typingSignals = createTypingSignaler({ |
| typing, |
| mode: typingMode, |
| isHeartbeat, |
| }); |
|
|
| const currentUnfinishedTurn = await markSessionUnfinishedTurn({ |
| sessionId: followupRun.run.sessionId, |
| sessionKey, |
| sessionStore: activeSessionStore, |
| storePath, |
| prompt: resumePreviewText ?? commandBody, |
| turnId: generateSecureUuid(), |
| messageChannel: sessionCtx.OriginatingChannel ?? sessionCtx.Provider, |
| channel: |
| resolveOriginMessageProvider({ |
| originatingChannel: sessionCtx.OriginatingChannel, |
| provider: sessionCtx.Surface ?? sessionCtx.Provider, |
| }) ?? undefined, |
| to: sessionCtx.To, |
| accountId: sessionCtx.AccountId, |
| threadId: sessionCtx.MessageThreadId, |
| senderIsOwner: followupRun.run.senderIsOwner, |
| }); |
|
|
| const shouldEmitToolResult = createShouldEmitToolResult({ |
| sessionKey, |
| storePath, |
| resolvedVerboseLevel, |
| }); |
| const shouldEmitToolOutput = createShouldEmitToolOutput({ |
| sessionKey, |
| storePath, |
| resolvedVerboseLevel, |
| }); |
|
|
| const pendingToolTasks = new Set<Promise<void>>(); |
| const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS; |
|
|
| const replyToChannel = resolveOriginMessageProvider({ |
| originatingChannel: sessionCtx.OriginatingChannel, |
| provider: sessionCtx.Surface ?? sessionCtx.Provider, |
| }) as OriginatingChannelType | undefined; |
| const replyToMode = resolveReplyToMode( |
| followupRun.run.config, |
| replyToChannel, |
| sessionCtx.AccountId, |
| sessionCtx.ChatType, |
| ); |
| const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel); |
| const cfg = followupRun.run.config; |
| const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({ |
| cfg, |
| sessionKey, |
| workspaceDir: followupRun.run.workspaceDir, |
| }); |
| const blockReplyCoalescing = |
| blockStreamingEnabled && opts?.onBlockReply |
| ? resolveEffectiveBlockStreamingConfig({ |
| cfg, |
| provider: sessionCtx.Provider, |
| accountId: sessionCtx.AccountId, |
| chunking: blockReplyChunking, |
| }).coalescing |
| : undefined; |
| const blockReplyPipeline = |
| blockStreamingEnabled && opts?.onBlockReply |
| ? createBlockReplyPipeline({ |
| onBlockReply: opts.onBlockReply, |
| timeoutMs: blockReplyTimeoutMs, |
| coalescing: blockReplyCoalescing, |
| buffer: createAudioAsVoiceBuffer({ isAudioPayload }), |
| }) |
| : null; |
| const touchActiveSessionEntry = async () => { |
| if (!activeSessionEntry || !activeSessionStore || !sessionKey) { |
| return; |
| } |
| const updatedAt = Date.now(); |
| activeSessionEntry.updatedAt = updatedAt; |
| activeSessionStore[sessionKey] = activeSessionEntry; |
| if (storePath) { |
| await updateSessionStoreEntry({ |
| storePath, |
| sessionKey, |
| update: async () => ({ updatedAt }), |
| }); |
| } |
| }; |
|
|
| if (shouldSteer && isStreaming) { |
| const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt); |
| if (steered && !shouldFollowup) { |
| await touchActiveSessionEntry(); |
| typing.cleanup(); |
| return undefined; |
| } |
| } |
|
|
| const activeRunQueueAction = resolveActiveRunQueueAction({ |
| isActive, |
| isHeartbeat, |
| shouldFollowup, |
| queueMode: resolvedQueue.mode, |
| }); |
|
|
| if (activeRunQueueAction === "drop") { |
| typing.cleanup(); |
| return undefined; |
| } |
|
|
| if (activeRunQueueAction === "enqueue-followup") { |
| enqueueFollowupRun(queueKey, followupRun, resolvedQueue); |
| await touchActiveSessionEntry(); |
| typing.cleanup(); |
| return undefined; |
| } |
|
|
| await typingSignals.signalRunStart(); |
|
|
| activeSessionEntry = await runMemoryFlushIfNeeded({ |
| cfg, |
| followupRun, |
| promptForEstimate: followupRun.prompt, |
| sessionCtx, |
| opts, |
| defaultModel, |
| agentCfgContextTokens, |
| resolvedVerboseLevel, |
| sessionEntry: activeSessionEntry, |
| sessionStore: activeSessionStore, |
| sessionKey, |
| storePath, |
| isHeartbeat, |
| }); |
|
|
| const runFollowupTurn = createFollowupRunner({ |
| opts, |
| typing, |
| typingMode, |
| queueKey, |
| resolvedQueue, |
| sessionEntry: activeSessionEntry, |
| sessionStore: activeSessionStore, |
| sessionKey, |
| storePath, |
| defaultModel, |
| agentCfgContextTokens, |
| }); |
|
|
| const maybeQueueInterruptedRetry = async (options?: { rateLimitBackoff?: boolean }) => { |
| if (!sessionKey || !storePath || !currentUnfinishedTurn) { |
| return false; |
| } |
| const entryForResume = activeSessionStore?.[sessionKey] ?? activeSessionEntry; |
| const maxResumeCount = options?.rateLimitBackoff |
| ? MAX_RATE_LIMIT_INTERRUPTED_RESUME_COUNT |
| : undefined; |
| if (!canAttemptInterruptedResume(entryForResume, { maxResumeCount })) { |
| return false; |
| } |
| const updatedEntry = await markSessionInterruptedResumeAttempt({ |
| sessionKey, |
| sessionStore: activeSessionStore, |
| storePath, |
| }); |
| const nextResumeCount = updatedEntry?.interruptedTurn?.resumeCount ?? 1; |
| const queueSettings = |
| options?.rateLimitBackoff && |
| (resolvedQueue.debounceMs ?? 0) < RATE_LIMIT_INTERRUPTED_RESUME_DELAY_MS |
| ? { |
| ...resolvedQueue, |
| debounceMs: RATE_LIMIT_INTERRUPTED_RESUME_DELAY_MS, |
| } |
| : resolvedQueue; |
| const enqueued = enqueueFollowupRun( |
| queueKey, |
| { |
| ...followupRun, |
| enqueuedAt: Date.now(), |
| messageId: `auto-resume:${currentUnfinishedTurn.turnId}:${nextResumeCount}`, |
| summaryLine: followupRun.summaryLine |
| ? `Auto-resume: ${followupRun.summaryLine}` |
| : "Auto-resume interrupted turn", |
| }, |
| queueSettings, |
| ); |
| if (enqueued && updatedEntry) { |
| activeSessionEntry = updatedEntry; |
| } |
| return enqueued; |
| }; |
|
|
| let responseUsageLine: string | undefined; |
| type SessionResetOptions = { |
| failureLabel: string; |
| buildLogMessage: (nextSessionId: string) => string; |
| cleanupTranscripts?: boolean; |
| }; |
| const resetSession = async ({ |
| failureLabel, |
| buildLogMessage, |
| cleanupTranscripts, |
| }: SessionResetOptions): Promise<boolean> => { |
| if (!sessionKey || !activeSessionStore || !storePath) { |
| return false; |
| } |
| const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry; |
| if (!prevEntry) { |
| return false; |
| } |
| const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined; |
| const nextSessionId = generateSecureUuid(); |
| const nextEntry: SessionEntry = { |
| ...prevEntry, |
| sessionId: nextSessionId, |
| updatedAt: Date.now(), |
| systemSent: false, |
| abortedLastRun: false, |
| modelProvider: undefined, |
| model: undefined, |
| contextTokens: undefined, |
| systemPromptReport: undefined, |
| fallbackNoticeSelectedModel: undefined, |
| fallbackNoticeActiveModel: undefined, |
| fallbackNoticeReason: undefined, |
| }; |
| const agentId = resolveAgentIdFromSessionKey(sessionKey); |
| const nextSessionFile = resolveSessionTranscriptPath( |
| nextSessionId, |
| agentId, |
| sessionCtx.MessageThreadId, |
| ); |
| nextEntry.sessionFile = nextSessionFile; |
| activeSessionStore[sessionKey] = nextEntry; |
| try { |
| await updateSessionStore(storePath, (store) => { |
| store[sessionKey] = nextEntry; |
| }); |
| } catch (err) { |
| defaultRuntime.error( |
| `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`, |
| ); |
| } |
| followupRun.run.sessionId = nextSessionId; |
| followupRun.run.sessionFile = nextSessionFile; |
| activeSessionEntry = nextEntry; |
| activeIsNewSession = true; |
| defaultRuntime.error(buildLogMessage(nextSessionId)); |
| if (cleanupTranscripts && prevSessionId) { |
| const transcriptCandidates = new Set<string>(); |
| const resolved = resolveSessionFilePath( |
| prevSessionId, |
| prevEntry, |
| resolveSessionFilePathOptions({ agentId, storePath }), |
| ); |
| if (resolved) { |
| transcriptCandidates.add(resolved); |
| } |
| transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId)); |
| for (const candidate of transcriptCandidates) { |
| try { |
| fs.unlinkSync(candidate); |
| } catch { |
| |
| } |
| } |
| } |
| return true; |
| }; |
| const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> => |
| resetSession({ |
| failureLabel: "compaction failure", |
| buildLogMessage: (nextSessionId) => |
| `Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`, |
| }); |
| const resetSessionAfterRoleOrderingConflict = async (reason: string): Promise<boolean> => |
| resetSession({ |
| failureLabel: "role ordering conflict", |
| buildLogMessage: (nextSessionId) => |
| `Role ordering conflict (${reason}). Restarting session ${sessionKey} -> ${nextSessionId}.`, |
| cleanupTranscripts: true, |
| }); |
| try { |
| const runStartedAt = Date.now(); |
| const runOutcome = await runAgentTurnWithFallback({ |
| commandBody, |
| followupRun, |
| sessionCtx, |
| opts, |
| typingSignals, |
| blockReplyPipeline, |
| blockStreamingEnabled, |
| blockReplyChunking, |
| resolvedBlockStreamingBreak, |
| applyReplyToMode, |
| shouldEmitToolResult, |
| shouldEmitToolOutput, |
| pendingToolTasks, |
| resetSessionAfterCompactionFailure, |
| resetSessionAfterRoleOrderingConflict, |
| isHeartbeat, |
| sessionKey, |
| getActiveSessionEntry: () => activeSessionEntry, |
| activeSessionStore, |
| storePath, |
| resolvedVerboseLevel, |
| }); |
|
|
| if (runOutcome.kind === "final") { |
| return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn); |
| } |
|
|
| const { |
| runId, |
| runResult, |
| fallbackProvider, |
| fallbackModel, |
| fallbackAttempts, |
| directlySentBlockKeys, |
| sawToolActivity, |
| deliveredVisibleToolReply, |
| hasActiveBackgroundTask, |
| } = runOutcome; |
| let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome; |
|
|
| if ( |
| shouldInjectGroupIntro && |
| activeSessionEntry && |
| activeSessionStore && |
| sessionKey && |
| activeSessionEntry.groupActivationNeedsSystemIntro |
| ) { |
| const updatedAt = Date.now(); |
| activeSessionEntry.groupActivationNeedsSystemIntro = false; |
| activeSessionEntry.updatedAt = updatedAt; |
| activeSessionStore[sessionKey] = activeSessionEntry; |
| if (storePath) { |
| await updateSessionStoreEntry({ |
| storePath, |
| sessionKey, |
| update: async () => ({ |
| groupActivationNeedsSystemIntro: false, |
| updatedAt, |
| }), |
| }); |
| } |
| } |
|
|
| const payloadArray = runResult.payloads ?? []; |
|
|
| if (blockReplyPipeline) { |
| await blockReplyPipeline.flush({ force: true }); |
| blockReplyPipeline.stop(); |
| } |
| if (pendingToolTasks.size > 0) { |
| await Promise.allSettled(pendingToolTasks); |
| } |
|
|
| const usage = runResult.meta?.agentMeta?.usage; |
| const promptTokens = runResult.meta?.agentMeta?.promptTokens; |
| const modelUsed = runResult.meta?.agentMeta?.model ?? fallbackModel ?? defaultModel; |
| const providerUsed = |
| runResult.meta?.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider; |
| const verboseEnabled = resolvedVerboseLevel !== "off"; |
| const selectedProvider = followupRun.run.provider; |
| const selectedModel = followupRun.run.model; |
| const fallbackStateEntry = |
| activeSessionEntry ?? (sessionKey ? activeSessionStore?.[sessionKey] : undefined); |
| const fallbackTransition = resolveFallbackTransition({ |
| selectedProvider, |
| selectedModel, |
| activeProvider: providerUsed, |
| activeModel: modelUsed, |
| attempts: fallbackAttempts, |
| state: fallbackStateEntry, |
| }); |
| if (fallbackTransition.stateChanged) { |
| if (fallbackStateEntry) { |
| fallbackStateEntry.fallbackNoticeSelectedModel = fallbackTransition.nextState.selectedModel; |
| fallbackStateEntry.fallbackNoticeActiveModel = fallbackTransition.nextState.activeModel; |
| fallbackStateEntry.fallbackNoticeReason = fallbackTransition.nextState.reason; |
| fallbackStateEntry.updatedAt = Date.now(); |
| activeSessionEntry = fallbackStateEntry; |
| } |
| if (sessionKey && fallbackStateEntry && activeSessionStore) { |
| activeSessionStore[sessionKey] = fallbackStateEntry; |
| } |
| if (sessionKey && storePath) { |
| await updateSessionStoreEntry({ |
| storePath, |
| sessionKey, |
| update: async () => ({ |
| fallbackNoticeSelectedModel: fallbackTransition.nextState.selectedModel, |
| fallbackNoticeActiveModel: fallbackTransition.nextState.activeModel, |
| fallbackNoticeReason: fallbackTransition.nextState.reason, |
| }), |
| }); |
| } |
| } |
| const cliSessionId = isCliProvider(providerUsed, cfg) |
| ? runResult.meta?.agentMeta?.sessionId?.trim() |
| : undefined; |
| const contextTokensUsed = |
| agentCfgContextTokens ?? |
| lookupContextTokens(modelUsed) ?? |
| activeSessionEntry?.contextTokens ?? |
| DEFAULT_CONTEXT_TOKENS; |
|
|
| await persistRunSessionUsage({ |
| storePath, |
| sessionKey, |
| usage, |
| lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, |
| promptTokens, |
| modelUsed, |
| providerUsed, |
| contextTokensUsed, |
| systemPromptReport: runResult.meta?.systemPromptReport, |
| cliSessionId, |
| }); |
|
|
| const resolveTurnOutcome = (replyCount: number) => |
| resolveInteractiveTurnOutcome({ |
| replyCount, |
| deliveredVisibleBlockReply: |
| Boolean(blockReplyPipeline?.didStream()) || Boolean(directlySentBlockKeys?.size), |
| deliveredVisibleToolReply, |
| deliveredViaMessagingTool: |
| runResult.didSendViaMessagingTool === true || |
| runResult.didSendDeterministicApprovalPrompt === true, |
| sawToolActivity: |
| sawToolActivity || |
| runResult.meta?.stopReason === "toolUse" || |
| runResult.meta?.stopReason === "tool_calls", |
| hadExecutionError: runResult.meta?.stopReason === "error" || Boolean(runResult.meta?.error), |
| hasActiveBackgroundTask, |
| }); |
| const shouldBackoffAutoResume = shouldBackoffInterruptedRetry({ |
| fallbackAttempts, |
| runError: runResult.meta?.error, |
| }); |
|
|
| |
| |
| |
| if (payloadArray.length === 0) { |
| const turnOutcome = resolveTurnOutcome(0); |
| if (turnOutcome.shouldSynthesizeDegradedReply) { |
| const autoResumeQueued = await maybeQueueInterruptedRetry({ |
| rateLimitBackoff: shouldBackoffAutoResume, |
| }); |
| return finalizeWithFollowup( |
| hasActiveBackgroundTask |
| ? autoResumeQueued |
| ? buildBackgroundTaskStillRunningAutoResumePayload() |
| : buildBackgroundTaskStillRunningPayload() |
| : autoResumeQueued |
| ? buildMissingFinalReplyAutoResumePayload() |
| : buildMissingFinalReplyDegradedPayload(), |
| queueKey, |
| runFollowupTurn, |
| ); |
| } |
| return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); |
| } |
|
|
| const payloadResult = await buildReplyPayloads({ |
| payloads: payloadArray, |
| isHeartbeat, |
| didLogHeartbeatStrip, |
| blockStreamingEnabled, |
| blockReplyPipeline, |
| directlySentBlockKeys, |
| replyToMode, |
| replyToChannel, |
| currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid, |
| messageProvider: followupRun.run.messageProvider, |
| messagingToolSentTexts: runResult.messagingToolSentTexts, |
| messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls, |
| messagingToolSentTargets: runResult.messagingToolSentTargets, |
| originatingChannel: sessionCtx.OriginatingChannel, |
| originatingTo: resolveOriginMessageTo({ |
| originatingTo: sessionCtx.OriginatingTo, |
| to: sessionCtx.To, |
| }), |
| accountId: sessionCtx.AccountId, |
| normalizeMediaPaths: normalizeReplyMediaPaths, |
| }); |
| const { replyPayloads } = payloadResult; |
| didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip; |
|
|
| if (replyPayloads.length === 0) { |
| const turnOutcome = resolveTurnOutcome(0); |
| if (turnOutcome.shouldSynthesizeDegradedReply) { |
| const autoResumeQueued = await maybeQueueInterruptedRetry({ |
| rateLimitBackoff: shouldBackoffAutoResume, |
| }); |
| return finalizeWithFollowup( |
| hasActiveBackgroundTask |
| ? autoResumeQueued |
| ? buildBackgroundTaskStillRunningAutoResumePayload() |
| : buildBackgroundTaskStillRunningPayload() |
| : autoResumeQueued |
| ? buildMissingFinalReplyAutoResumePayload() |
| : buildMissingFinalReplyDegradedPayload(), |
| queueKey, |
| runFollowupTurn, |
| ); |
| } |
| return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); |
| } |
|
|
| const successfulCronAdds = runResult.successfulCronAdds ?? 0; |
| const hasReminderCommitment = replyPayloads.some( |
| (payload) => |
| !payload.isError && |
| typeof payload.text === "string" && |
| hasUnbackedReminderCommitment(payload.text), |
| ); |
| |
| |
| const coveredByExistingCron = |
| hasReminderCommitment && successfulCronAdds === 0 |
| ? await hasSessionRelatedCronJobs({ |
| cronStorePath: cfg.cron?.store, |
| sessionKey, |
| }) |
| : false; |
| const guardedReplyPayloads = |
| hasReminderCommitment && successfulCronAdds === 0 && !coveredByExistingCron |
| ? appendUnscheduledReminderNote(replyPayloads) |
| : replyPayloads; |
|
|
| await signalTypingIfNeeded(guardedReplyPayloads, typingSignals); |
|
|
| if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) { |
| const input = usage.input ?? 0; |
| const output = usage.output ?? 0; |
| const cacheRead = usage.cacheRead ?? 0; |
| const cacheWrite = usage.cacheWrite ?? 0; |
| const promptTokens = input + cacheRead + cacheWrite; |
| const totalTokens = usage.total ?? promptTokens + output; |
| const costConfig = resolveModelCostConfig({ |
| provider: providerUsed, |
| model: modelUsed, |
| config: cfg, |
| }); |
| const costUsd = estimateUsageCost({ usage, cost: costConfig }); |
| emitDiagnosticEvent({ |
| type: "model.usage", |
| sessionKey, |
| sessionId: followupRun.run.sessionId, |
| channel: replyToChannel, |
| provider: providerUsed, |
| model: modelUsed, |
| usage: { |
| input, |
| output, |
| cacheRead, |
| cacheWrite, |
| promptTokens, |
| total: totalTokens, |
| }, |
| lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, |
| context: { |
| limit: contextTokensUsed, |
| used: totalTokens, |
| }, |
| costUsd, |
| durationMs: Date.now() - runStartedAt, |
| }); |
| } |
|
|
| const responseUsageRaw = |
| activeSessionEntry?.responseUsage ?? |
| (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined); |
| const responseUsageMode = resolveResponseUsageMode(responseUsageRaw); |
| if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) { |
| const authMode = resolveModelAuthMode(providerUsed, cfg); |
| const showCost = authMode === "api-key"; |
| const costConfig = showCost |
| ? resolveModelCostConfig({ |
| provider: providerUsed, |
| model: modelUsed, |
| config: cfg, |
| }) |
| : undefined; |
| let formatted = formatResponseUsageLine({ |
| usage, |
| showCost, |
| costConfig, |
| }); |
| if (formatted && responseUsageMode === "full" && sessionKey) { |
| formatted = `${formatted} · session \`${sessionKey}\``; |
| } |
| if (formatted) { |
| responseUsageLine = formatted; |
| } |
| } |
|
|
| |
| let finalPayloads = guardedReplyPayloads; |
| const verboseNotices: ReplyPayload[] = []; |
|
|
| if (verboseEnabled && activeIsNewSession) { |
| verboseNotices.push({ text: `🧭 New session: ${followupRun.run.sessionId}` }); |
| } |
|
|
| if (fallbackTransition.fallbackTransitioned) { |
| emitAgentEvent({ |
| runId, |
| sessionKey, |
| stream: "lifecycle", |
| data: { |
| phase: "fallback", |
| selectedProvider, |
| selectedModel, |
| activeProvider: providerUsed, |
| activeModel: modelUsed, |
| reasonSummary: fallbackTransition.reasonSummary, |
| attemptSummaries: fallbackTransition.attemptSummaries, |
| attempts: fallbackAttempts, |
| }, |
| }); |
| if (verboseEnabled) { |
| const fallbackNotice = buildFallbackNotice({ |
| selectedProvider, |
| selectedModel, |
| activeProvider: providerUsed, |
| activeModel: modelUsed, |
| attempts: fallbackAttempts, |
| }); |
| if (fallbackNotice) { |
| verboseNotices.push({ text: fallbackNotice }); |
| } |
| } |
| } |
| if (fallbackTransition.fallbackCleared) { |
| emitAgentEvent({ |
| runId, |
| sessionKey, |
| stream: "lifecycle", |
| data: { |
| phase: "fallback_cleared", |
| selectedProvider, |
| selectedModel, |
| activeProvider: providerUsed, |
| activeModel: modelUsed, |
| previousActiveModel: fallbackTransition.previousState.activeModel, |
| }, |
| }); |
| if (verboseEnabled) { |
| verboseNotices.push({ |
| text: buildFallbackClearedNotice({ |
| selectedProvider, |
| selectedModel, |
| previousActiveModel: fallbackTransition.previousState.activeModel, |
| }), |
| }); |
| } |
| } |
|
|
| if (autoCompactionCompleted) { |
| const count = await incrementRunCompactionCount({ |
| sessionEntry: activeSessionEntry, |
| sessionStore: activeSessionStore, |
| sessionKey, |
| storePath, |
| lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, |
| contextTokensUsed, |
| }); |
|
|
| |
| if (sessionKey) { |
| const workspaceDir = process.cwd(); |
| readPostCompactionContext(workspaceDir, cfg) |
| .then((contextContent) => { |
| if (contextContent) { |
| enqueueSystemEvent(contextContent, { sessionKey }); |
| } |
| }) |
| .catch(() => { |
| |
| }); |
| } |
|
|
| if (verboseEnabled) { |
| const suffix = typeof count === "number" ? ` (count ${count})` : ""; |
| verboseNotices.push({ text: `🧹 Auto-compaction complete${suffix}.` }); |
| } |
| } |
| if (verboseNotices.length > 0) { |
| finalPayloads = [...verboseNotices, ...finalPayloads]; |
| } |
| if (responseUsageLine) { |
| finalPayloads = appendUsageLine(finalPayloads, responseUsageLine); |
| } |
|
|
| return finalizeWithFollowup( |
| finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads, |
| queueKey, |
| runFollowupTurn, |
| ); |
| } catch (error) { |
| |
| |
| finalizeWithFollowup(undefined, queueKey, runFollowupTurn); |
| throw error; |
| } finally { |
| blockReplyPipeline?.stop(); |
| typing.markRunComplete(); |
| |
| |
| |
| |
| |
| |
| typing.markDispatchIdle(); |
| } |
| } |
|
|