| import crypto from "node:crypto"; |
| import fs from "node:fs"; |
| import { lookupContextTokens } from "../../agents/context.js"; |
| import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; |
| import { resolveModelAuthMode } from "../../agents/model-auth.js"; |
| import { isCliProvider } from "../../agents/model-selection.js"; |
| import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js"; |
| import { hasNonzeroUsage } from "../../agents/usage.js"; |
| import { |
| resolveAgentIdFromSessionKey, |
| resolveSessionFilePath, |
| resolveSessionTranscriptPath, |
| type SessionEntry, |
| updateSessionStore, |
| updateSessionStoreEntry, |
| } from "../../config/sessions.js"; |
| import type { TypingMode } from "../../config/types.js"; |
| import { defaultRuntime } from "../../runtime.js"; |
| import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js"; |
| import type { OriginatingChannelType, TemplateContext } from "../templating.js"; |
| import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js"; |
| import type { GetReplyOptions, ReplyPayload } from "../types.js"; |
| import { runAgentTurnWithFallback } from "./agent-runner-execution.js"; |
| import { |
| createShouldEmitToolOutput, |
| createShouldEmitToolResult, |
| finalizeWithFollowup, |
| isAudioPayload, |
| signalTypingIfNeeded, |
| } from "./agent-runner-helpers.js"; |
| import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js"; |
| import { buildReplyPayloads } from "./agent-runner-payloads.js"; |
| import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.js"; |
| import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; |
| import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; |
| import { createFollowupRunner } from "./followup-runner.js"; |
| import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; |
| import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; |
| import { persistSessionUsageUpdate } from "./session-usage.js"; |
| import { incrementCompactionCount } from "./session-updates.js"; |
| import type { TypingController } from "./typing.js"; |
| import { createTypingSignaler } from "./typing-mode.js"; |
| import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; |
|
|
/**
 * Default timeout, in milliseconds, handed to the block-reply pipeline when the
 * caller does not supply `opts.blockReplyTimeoutMs`.
 */
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15 * 1000;
|
|
/**
 * Refreshes `updatedAt` on the in-memory session entry, writes the entry back
 * into the in-memory store, and mirrors the new timestamp into the persisted
 * session store when `storePath` is set.
 *
 * Does nothing unless `sessionEntry`, `sessionStore`, and `sessionKey` are all
 * present. Used by the early-exit paths of `runReplyAgent` (message steered into
 * a live run, or enqueued as a followup) so the session still records activity.
 */
async function touchSessionActivity(params: {
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
}): Promise<void> {
  const { sessionEntry, sessionStore, sessionKey, storePath } = params;
  if (!sessionEntry || !sessionStore || !sessionKey) {
    return;
  }
  const updatedAt = Date.now();
  sessionEntry.updatedAt = updatedAt;
  sessionStore[sessionKey] = sessionEntry;
  if (storePath) {
    await updateSessionStoreEntry({
      storePath,
      sessionKey,
      update: async () => ({ updatedAt }),
    });
  }
}

/** True when `err` is a Node-style error whose `code` is `ENOENT` (file not found). */
function isMissingFileError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    "code" in err &&
    (err as { code?: unknown }).code === "ENOENT"
  );
}

/**
 * Runs (or defers) one agent reply turn for an inbound command.
 *
 * Flow:
 * 1. If steering is requested and a run is streaming, the prompt is handed to
 *    `queueEmbeddedPiMessage`. When that succeeds and no followup is wanted,
 *    session activity is recorded and `undefined` is returned.
 * 2. If a run is already active and followups are wanted (or the queue mode is
 *    `"steer"`), the run is enqueued via `enqueueFollowupRun`, session activity
 *    is recorded, and `undefined` is returned.
 * 3. Otherwise the turn executes:
 *    - signal the run start, run the memory flush, and execute the agent turn
 *      with fallback;
 *    - drain block replies and pending tool tasks, then persist usage;
 *    - build reply payloads, optionally prepend verbose notices, and append a
 *      usage line;
 *    - pass the result to `finalizeWithFollowup`.
 *
 * @returns The result of `finalizeWithFollowup`: a single payload, an array of
 *   payloads, or `undefined`. Also returns `undefined` when the message was
 *   steered or queued instead of run.
 */
export async function runReplyAgent(params: {
  commandBody: string;
  followupRun: FollowupRun;
  queueKey: string;
  resolvedQueue: QueueSettings;
  shouldSteer: boolean;
  shouldFollowup: boolean;
  isActive: boolean;
  isStreaming: boolean;
  opts?: GetReplyOptions;
  typing: TypingController;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  isNewSession: boolean;
  blockStreamingEnabled: boolean;
  blockReplyChunking?: {
    minChars: number;
    maxChars: number;
    breakPreference: "paragraph" | "newline" | "sentence";
  };
  resolvedBlockStreamingBreak: "text_end" | "message_end";
  sessionCtx: TemplateContext;
  shouldInjectGroupIntro: boolean;
  typingMode: TypingMode;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
  const {
    commandBody,
    followupRun,
    queueKey,
    resolvedQueue,
    shouldSteer,
    shouldFollowup,
    isActive,
    isStreaming,
    opts,
    typing,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    isNewSession,
    blockStreamingEnabled,
    blockReplyChunking,
    resolvedBlockStreamingBreak,
    sessionCtx,
    shouldInjectGroupIntro,
    typingMode,
  } = params;

  // resetSession() below may swap in a fresh session entry mid-run; later stages
  // read these locals rather than the original params.
  let activeSessionEntry = sessionEntry;
  const activeSessionStore = sessionStore;
  let activeIsNewSession = isNewSession;

  const isHeartbeat = opts?.isHeartbeat === true;
  const typingSignals = createTypingSignaler({
    typing,
    mode: typingMode,
    isHeartbeat,
  });

  const shouldEmitToolResult = createShouldEmitToolResult({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });
  const shouldEmitToolOutput = createShouldEmitToolOutput({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });

  // Promises registered during the agent turn (passed to runAgentTurnWithFallback);
  // all of them are settled before reply payloads are built.
  const pendingToolTasks = new Set<Promise<void>>();
  const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;

  // Prefer the explicit originating channel; otherwise fall back to the lower-cased
  // surface/provider name (cast, not validated, to OriginatingChannelType).
  const replyToChannel =
    sessionCtx.OriginatingChannel ??
    ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
      | OriginatingChannelType
      | undefined);
  const replyToMode = resolveReplyToMode(
    followupRun.run.config,
    replyToChannel,
    sessionCtx.AccountId,
    sessionCtx.ChatType,
  );
  const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
  const cfg = followupRun.run.config;

  // Block streaming is only wired up when enabled AND the caller provided a sink.
  const blockReplyCoalescing =
    blockStreamingEnabled && opts?.onBlockReply
      ? resolveBlockStreamingCoalescing(
          cfg,
          sessionCtx.Provider,
          sessionCtx.AccountId,
          blockReplyChunking,
        )
      : undefined;
  const blockReplyPipeline =
    blockStreamingEnabled && opts?.onBlockReply
      ? createBlockReplyPipeline({
          onBlockReply: opts.onBlockReply,
          timeoutMs: blockReplyTimeoutMs,
          coalescing: blockReplyCoalescing,
          buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
        })
      : null;

  // Early exit 1: hand the prompt to the streaming run instead of starting a new one.
  if (shouldSteer && isStreaming) {
    const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt);
    if (steered && !shouldFollowup) {
      await touchSessionActivity({
        sessionEntry: activeSessionEntry,
        sessionStore: activeSessionStore,
        sessionKey,
        storePath,
      });
      typing.cleanup();
      return undefined;
    }
  }

  // Early exit 2: a run is already active, so queue this one as a followup.
  if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
    enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
    await touchSessionActivity({
      sessionEntry: activeSessionEntry,
      sessionStore: activeSessionStore,
      sessionKey,
      storePath,
    });
    typing.cleanup();
    return undefined;
  }

  await typingSignals.signalRunStart();

  activeSessionEntry = await runMemoryFlushIfNeeded({
    cfg,
    followupRun,
    sessionCtx,
    opts,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    isHeartbeat,
  });

  // NOTE(review): this captures activeSessionEntry as it is *now*; a resetSession()
  // during the run reassigns activeSessionEntry but this runner keeps the earlier
  // reference — confirm createFollowupRunner re-reads the store if that matters.
  const runFollowupTurn = createFollowupRunner({
    opts,
    typing,
    typingMode,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  });

  let responseUsageLine: string | undefined;
  type SessionResetOptions = {
    failureLabel: string;
    buildLogMessage: (nextSessionId: string) => string;
    cleanupTranscripts?: boolean;
  };

  /**
   * Replaces the current session with a fresh session id and transcript path.
   *
   * - Updates the in-memory store and `followupRun.run`, then persists the new
   *   entry. A persistence failure is logged, not thrown.
   * - When `cleanupTranscripts` is set, best-effort deletes the previous
   *   session's transcript file(s).
   *
   * @returns false when there is no session key, store, store path, or prior entry
   *   to reset; true otherwise.
   */
  const resetSession = async ({
    failureLabel,
    buildLogMessage,
    cleanupTranscripts,
  }: SessionResetOptions): Promise<boolean> => {
    if (!sessionKey || !activeSessionStore || !storePath) {
      return false;
    }
    const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
    if (!prevEntry) {
      return false;
    }
    const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
    const nextSessionId = crypto.randomUUID();
    const nextEntry: SessionEntry = {
      ...prevEntry,
      sessionId: nextSessionId,
      updatedAt: Date.now(),
      systemSent: false,
      abortedLastRun: false,
    };
    const agentId = resolveAgentIdFromSessionKey(sessionKey);
    const nextSessionFile = resolveSessionTranscriptPath(
      nextSessionId,
      agentId,
      sessionCtx.MessageThreadId,
    );
    nextEntry.sessionFile = nextSessionFile;
    activeSessionStore[sessionKey] = nextEntry;
    try {
      await updateSessionStore(storePath, (store) => {
        store[sessionKey] = nextEntry;
      });
    } catch (err) {
      defaultRuntime.error(
        `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
      );
    }
    followupRun.run.sessionId = nextSessionId;
    followupRun.run.sessionFile = nextSessionFile;
    activeSessionEntry = nextEntry;
    activeIsNewSession = true;
    defaultRuntime.error(buildLogMessage(nextSessionId));
    if (cleanupTranscripts && prevSessionId) {
      // The entry's recorded file and the default transcript path may differ;
      // the Set de-duplicates them when they coincide.
      const transcriptCandidates = new Set<string>();
      const resolved = resolveSessionFilePath(prevSessionId, prevEntry, { agentId });
      if (resolved) {
        transcriptCandidates.add(resolved);
      }
      transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
      for (const candidate of transcriptCandidates) {
        try {
          fs.unlinkSync(candidate);
        } catch (err) {
          // Best-effort cleanup: a candidate that does not exist is skipped
          // silently; any other failure is logged but never aborts the reset.
          if (!isMissingFileError(err)) {
            defaultRuntime.error(
              `Failed to remove transcript ${candidate} after ${failureLabel} (${sessionKey}): ${String(err)}`,
            );
          }
        }
      }
    }
    return true;
  };
  const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "compaction failure",
      buildLogMessage: (nextSessionId) =>
        `Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`,
    });
  const resetSessionAfterRoleOrderingConflict = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "role ordering conflict",
      buildLogMessage: (nextSessionId) =>
        `Role ordering conflict (${reason}). Restarting session ${sessionKey} -> ${nextSessionId}.`,
      cleanupTranscripts: true,
    });

  try {
    const runStartedAt = Date.now();
    const runOutcome = await runAgentTurnWithFallback({
      commandBody,
      followupRun,
      sessionCtx,
      opts,
      typingSignals,
      blockReplyPipeline,
      blockStreamingEnabled,
      blockReplyChunking,
      resolvedBlockStreamingBreak,
      applyReplyToMode,
      shouldEmitToolResult,
      shouldEmitToolOutput,
      pendingToolTasks,
      resetSessionAfterCompactionFailure,
      resetSessionAfterRoleOrderingConflict,
      isHeartbeat,
      sessionKey,
      getActiveSessionEntry: () => activeSessionEntry,
      activeSessionStore,
      storePath,
      resolvedVerboseLevel,
    });

    // The turn already produced its final payload; skip all post-processing.
    if (runOutcome.kind === "final") {
      return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
    }

    const {
      runResult,
      fallbackProvider,
      fallbackModel,
      directlySentBlockKeys,
      didLogHeartbeatStrip,
      autoCompactionCompleted,
    } = runOutcome;

    // The group intro was injected for this turn; clear the one-shot flag in memory and on disk.
    if (
      shouldInjectGroupIntro &&
      activeSessionEntry &&
      activeSessionStore &&
      sessionKey &&
      activeSessionEntry.groupActivationNeedsSystemIntro
    ) {
      const updatedAt = Date.now();
      activeSessionEntry.groupActivationNeedsSystemIntro = false;
      activeSessionEntry.updatedAt = updatedAt;
      activeSessionStore[sessionKey] = activeSessionEntry;
      if (storePath) {
        await updateSessionStoreEntry({
          storePath,
          sessionKey,
          update: async () => ({
            groupActivationNeedsSystemIntro: false,
            updatedAt,
          }),
        });
      }
    }

    const payloadArray = runResult.payloads ?? [];

    // Drain streamed block replies and outstanding tool tasks before deciding what to send.
    if (blockReplyPipeline) {
      await blockReplyPipeline.flush({ force: true });
      blockReplyPipeline.stop();
    }
    if (pendingToolTasks.size > 0) {
      await Promise.allSettled(pendingToolTasks);
    }

    // Resolve what actually ran: run metadata first, then the fallback choice, then defaults.
    const usage = runResult.meta.agentMeta?.usage;
    const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
    const providerUsed =
      runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
    const cliSessionId = isCliProvider(providerUsed, cfg)
      ? runResult.meta.agentMeta?.sessionId?.trim()
      : undefined;
    const contextTokensUsed =
      agentCfgContextTokens ??
      lookupContextTokens(modelUsed) ??
      activeSessionEntry?.contextTokens ??
      DEFAULT_CONTEXT_TOKENS;

    await persistSessionUsageUpdate({
      storePath,
      sessionKey,
      usage,
      modelUsed,
      providerUsed,
      contextTokensUsed,
      systemPromptReport: runResult.meta.systemPromptReport,
      cliSessionId,
    });

    // Nothing to send. Block replies and pending tool tasks were already drained
    // above, so finishing here does not abandon in-flight deliveries.
    // NOTE(review): the model.usage diagnostic below is only reached when there
    // is something to send, so turns with no reply emit no usage event (session
    // usage was still persisted above) — confirm that is intended.
    if (payloadArray.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    const { replyPayloads } = buildReplyPayloads({
      payloads: payloadArray,
      isHeartbeat,
      didLogHeartbeatStrip,
      blockStreamingEnabled,
      blockReplyPipeline,
      directlySentBlockKeys,
      replyToMode,
      replyToChannel,
      currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
      messageProvider: followupRun.run.messageProvider,
      messagingToolSentTexts: runResult.messagingToolSentTexts,
      messagingToolSentTargets: runResult.messagingToolSentTargets,
      originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
      accountId: sessionCtx.AccountId,
    });

    // Every payload was filtered out during reply building.
    if (replyPayloads.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    await signalTypingIfNeeded(replyPayloads, typingSignals);

    if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
      const input = usage.input ?? 0;
      const output = usage.output ?? 0;
      const cacheRead = usage.cacheRead ?? 0;
      const cacheWrite = usage.cacheWrite ?? 0;
      // Prompt tokens here count fresh input plus cache reads and cache writes.
      const promptTokens = input + cacheRead + cacheWrite;
      const totalTokens = usage.total ?? promptTokens + output;
      const costConfig = resolveModelCostConfig({
        provider: providerUsed,
        model: modelUsed,
        config: cfg,
      });
      const costUsd = estimateUsageCost({ usage, cost: costConfig });
      emitDiagnosticEvent({
        type: "model.usage",
        sessionKey,
        sessionId: followupRun.run.sessionId,
        channel: replyToChannel,
        provider: providerUsed,
        model: modelUsed,
        usage: {
          input,
          output,
          cacheRead,
          cacheWrite,
          promptTokens,
          total: totalTokens,
        },
        context: {
          limit: contextTokensUsed,
          used: totalTokens,
        },
        costUsd,
        durationMs: Date.now() - runStartedAt,
      });
    }

    // Optional per-response usage footer. Cost is only shown for api-key auth.
    const responseUsageRaw =
      activeSessionEntry?.responseUsage ??
      (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
    const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);
    if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
      const authMode = resolveModelAuthMode(providerUsed, cfg);
      const showCost = authMode === "api-key";
      const costConfig = showCost
        ? resolveModelCostConfig({
            provider: providerUsed,
            model: modelUsed,
            config: cfg,
          })
        : undefined;
      let formatted = formatResponseUsageLine({
        usage,
        showCost,
        costConfig,
      });
      if (formatted && responseUsageMode === "full" && sessionKey) {
        formatted = `${formatted} · session ${sessionKey}`;
      }
      if (formatted) {
        responseUsageLine = formatted;
      }
    }

    // Verbose notices are prepended. The new-session notice is added last, so it
    // ends up first.
    // The compaction count is incremented whether or not verbose output is enabled.
    let finalPayloads = replyPayloads;
    const verboseEnabled = resolvedVerboseLevel !== "off";
    if (autoCompactionCompleted) {
      const count = await incrementCompactionCount({
        sessionEntry: activeSessionEntry,
        sessionStore: activeSessionStore,
        sessionKey,
        storePath,
      });
      if (verboseEnabled) {
        const suffix = typeof count === "number" ? ` (count ${count})` : "";
        finalPayloads = [{ text: `🧹 Auto-compaction complete${suffix}.` }, ...finalPayloads];
      }
    }
    if (verboseEnabled && activeIsNewSession) {
      finalPayloads = [{ text: `🧭 New session: ${followupRun.run.sessionId}` }, ...finalPayloads];
    }
    if (responseUsageLine) {
      finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
    }

    // Unwrap a single payload; otherwise return the array.
    return finalizeWithFollowup(
      finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
      queueKey,
      runFollowupTurn,
    );
  } finally {
    // On the normal path the pipeline was already stopped above. This covers
    // early returns and thrown errors; presumably stop() is safe to call twice.
    blockReplyPipeline?.stop();
    typing.markRunComplete();
  }
}
|
|