| import fs from "node:fs"; |
| import path from "node:path"; |
| import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; |
| import { resolveSessionAgentId } from "../../agents/agent-scope.js"; |
| import { resolveThinkingDefault } from "../../agents/model-selection.js"; |
| import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; |
| import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; |
| import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; |
| import type { MsgContext } from "../../auto-reply/templating.js"; |
| import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; |
| import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; |
| import { resolveSessionFilePath } from "../../config/sessions.js"; |
| import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js"; |
| import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; |
| import { resolveSendPolicy } from "../../sessions/send-policy.js"; |
| import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; |
| import { |
| stripInlineDirectiveTagsForDisplay, |
| stripInlineDirectiveTagsFromMessageForDisplay, |
| } from "../../utils/directive-tags.js"; |
| import { |
| INTERNAL_MESSAGE_CHANNEL, |
| isGatewayCliClient, |
| isWebchatClient, |
| normalizeMessageChannel, |
| } from "../../utils/message-channel.js"; |
| import { |
| abortChatRunById, |
| type ChatAbortControllerEntry, |
| type ChatAbortOps, |
| isChatStopCommandText, |
| resolveChatRunExpiresAtMs, |
| } from "../chat-abort.js"; |
| import { type ChatImageContent, parseMessageWithAttachments } from "../chat-attachments.js"; |
| import { stripEnvelopeFromMessage, stripEnvelopeFromMessages } from "../chat-sanitize.js"; |
| import { ADMIN_SCOPE } from "../method-scopes.js"; |
| import { |
| GATEWAY_CLIENT_CAPS, |
| GATEWAY_CLIENT_MODES, |
| GATEWAY_CLIENT_NAMES, |
| hasGatewayClientCap, |
| } from "../protocol/client-info.js"; |
| import { |
| ErrorCodes, |
| errorShape, |
| formatValidationErrors, |
| validateChatAbortParams, |
| validateChatHistoryParams, |
| validateChatInjectParams, |
| validateChatSendParams, |
| } from "../protocol/index.js"; |
| import { CHAT_SEND_SESSION_KEY_MAX_LENGTH } from "../protocol/schema/primitives.js"; |
| import { getMaxChatHistoryMessagesBytes } from "../server-constants.js"; |
| import { |
| capArrayByJsonBytes, |
| loadSessionEntry, |
| readSessionMessages, |
| resolveSessionModelRef, |
| } from "../session-utils.js"; |
| import { formatForLog } from "../ws-log.js"; |
| import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; |
| import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js"; |
| import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js"; |
| import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; |
| import type { |
| GatewayRequestContext, |
| GatewayRequestHandlerOptions, |
| GatewayRequestHandlers, |
| } from "./types.js"; |
|
|
| type TranscriptAppendResult = { |
| ok: boolean; |
| messageId?: string; |
| message?: Record<string, unknown>; |
| error?: string; |
| }; |
|
|
| type AbortOrigin = "rpc" | "stop-command"; |
|
|
| type AbortedPartialSnapshot = { |
| runId: string; |
| sessionId: string; |
| text: string; |
| abortOrigin: AbortOrigin; |
| }; |
|
|
| type ChatAbortRequester = { |
| connId?: string; |
| deviceId?: string; |
| isAdmin: boolean; |
| }; |
|
|
| const CHAT_HISTORY_TEXT_MAX_CHARS = 12_000; |
| const CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES = 128 * 1024; |
| const CHAT_HISTORY_OVERSIZED_PLACEHOLDER = "[chat.history omitted: message too large]"; |
| let chatHistoryPlaceholderEmitCount = 0; |
| const CHANNEL_AGNOSTIC_SESSION_SCOPES = new Set([ |
| "main", |
| "direct", |
| "dm", |
| "group", |
| "channel", |
| "cron", |
| "run", |
| "subagent", |
| "acp", |
| "thread", |
| "topic", |
| ]); |
| const CHANNEL_SCOPED_SESSION_SHAPES = new Set(["direct", "dm", "group", "channel"]); |
|
|
| type ChatSendDeliveryEntry = { |
| deliveryContext?: { |
| channel?: string; |
| to?: string; |
| accountId?: string; |
| threadId?: string | number; |
| }; |
| lastChannel?: string; |
| lastTo?: string; |
| lastAccountId?: string; |
| lastThreadId?: string | number; |
| }; |
|
|
| type ChatSendOriginatingRoute = { |
| originatingChannel: string; |
| originatingTo?: string; |
| accountId?: string; |
| messageThreadId?: string | number; |
| explicitDeliverRoute: boolean; |
| }; |
|
|
| function resolveChatSendOriginatingRoute(params: { |
| client?: { mode?: string | null; id?: string | null } | null; |
| deliver?: boolean; |
| entry?: ChatSendDeliveryEntry; |
| hasConnectedClient?: boolean; |
| mainKey?: string; |
| sessionKey: string; |
| }): ChatSendOriginatingRoute { |
| const shouldDeliverExternally = params.deliver === true; |
| if (!shouldDeliverExternally) { |
| return { |
| originatingChannel: INTERNAL_MESSAGE_CHANNEL, |
| explicitDeliverRoute: false, |
| }; |
| } |
|
|
| const routeChannelCandidate = normalizeMessageChannel( |
| params.entry?.deliveryContext?.channel ?? params.entry?.lastChannel, |
| ); |
| const routeToCandidate = params.entry?.deliveryContext?.to ?? params.entry?.lastTo; |
| const routeAccountIdCandidate = |
| params.entry?.deliveryContext?.accountId ?? params.entry?.lastAccountId ?? undefined; |
| const routeThreadIdCandidate = |
| params.entry?.deliveryContext?.threadId ?? params.entry?.lastThreadId; |
| if (params.sessionKey.length > CHAT_SEND_SESSION_KEY_MAX_LENGTH) { |
| return { |
| originatingChannel: INTERNAL_MESSAGE_CHANNEL, |
| explicitDeliverRoute: false, |
| }; |
| } |
|
|
| const parsedSessionKey = parseAgentSessionKey(params.sessionKey); |
| const sessionScopeParts = (parsedSessionKey?.rest ?? params.sessionKey) |
| .split(":", 3) |
| .filter(Boolean); |
| const sessionScopeHead = sessionScopeParts[0]; |
| const sessionChannelHint = normalizeMessageChannel(sessionScopeHead); |
| const normalizedSessionScopeHead = (sessionScopeHead ?? "").trim().toLowerCase(); |
| const sessionPeerShapeCandidates = [sessionScopeParts[1], sessionScopeParts[2]] |
| .map((part) => (part ?? "").trim().toLowerCase()) |
| .filter(Boolean); |
| const isChannelAgnosticSessionScope = CHANNEL_AGNOSTIC_SESSION_SCOPES.has( |
| normalizedSessionScopeHead, |
| ); |
| const isChannelScopedSession = sessionPeerShapeCandidates.some((part) => |
| CHANNEL_SCOPED_SESSION_SHAPES.has(part), |
| ); |
| const hasLegacyChannelPeerShape = |
| !isChannelScopedSession && |
| typeof sessionScopeParts[1] === "string" && |
| sessionChannelHint === routeChannelCandidate; |
| const isFromWebchatClient = isWebchatClient(params.client); |
| const isFromGatewayCliClient = isGatewayCliClient(params.client); |
| const hasClientMetadata = |
| (typeof params.client?.mode === "string" && params.client.mode.trim().length > 0) || |
| (typeof params.client?.id === "string" && params.client.id.trim().length > 0); |
| const configuredMainKey = (params.mainKey ?? "main").trim().toLowerCase(); |
| const isConfiguredMainSessionScope = |
| normalizedSessionScopeHead.length > 0 && normalizedSessionScopeHead === configuredMainKey; |
| const canInheritConfiguredMainRoute = |
| isConfiguredMainSessionScope && |
| params.hasConnectedClient && |
| (isFromGatewayCliClient || !hasClientMetadata); |
|
|
| |
| |
| |
| const canInheritDeliverableRoute = Boolean( |
| !isFromWebchatClient && |
| sessionChannelHint && |
| sessionChannelHint !== INTERNAL_MESSAGE_CHANNEL && |
| ((!isChannelAgnosticSessionScope && (isChannelScopedSession || hasLegacyChannelPeerShape)) || |
| canInheritConfiguredMainRoute), |
| ); |
| const hasDeliverableRoute = |
| canInheritDeliverableRoute && |
| routeChannelCandidate && |
| routeChannelCandidate !== INTERNAL_MESSAGE_CHANNEL && |
| typeof routeToCandidate === "string" && |
| routeToCandidate.trim().length > 0; |
|
|
| if (!hasDeliverableRoute) { |
| return { |
| originatingChannel: INTERNAL_MESSAGE_CHANNEL, |
| explicitDeliverRoute: false, |
| }; |
| } |
|
|
| return { |
| originatingChannel: routeChannelCandidate, |
| originatingTo: routeToCandidate, |
| accountId: routeAccountIdCandidate, |
| messageThreadId: routeThreadIdCandidate, |
| explicitDeliverRoute: true, |
| }; |
| } |
|
|
| function stripDisallowedChatControlChars(message: string): string { |
| let output = ""; |
| for (const char of message) { |
| const code = char.charCodeAt(0); |
| if (code === 9 || code === 10 || code === 13 || (code >= 32 && code !== 127)) { |
| output += char; |
| } |
| } |
| return output; |
| } |
|
|
| export function sanitizeChatSendMessageInput( |
| message: string, |
| ): { ok: true; message: string } | { ok: false; error: string } { |
| const normalized = message.normalize("NFC"); |
| if (normalized.includes("\u0000")) { |
| return { ok: false, error: "message must not contain null bytes" }; |
| } |
| return { ok: true, message: stripDisallowedChatControlChars(normalized) }; |
| } |
|
|
| function normalizeOptionalChatSystemReceipt( |
| value: unknown, |
| ): { ok: true; receipt?: string } | { ok: false; error: string } { |
| if (value == null) { |
| return { ok: true }; |
| } |
| if (typeof value !== "string") { |
| return { ok: false, error: "systemProvenanceReceipt must be a string" }; |
| } |
| const sanitized = sanitizeChatSendMessageInput(value); |
| if (!sanitized.ok) { |
| return sanitized; |
| } |
| const receipt = sanitized.message.trim(); |
| return { ok: true, receipt: receipt || undefined }; |
| } |
|
|
| function isAcpBridgeClient(client: GatewayRequestHandlerOptions["client"]): boolean { |
| const info = client?.connect?.client; |
| return ( |
| info?.id === GATEWAY_CLIENT_NAMES.CLI && |
| info?.mode === GATEWAY_CLIENT_MODES.CLI && |
| info?.displayName === "ACP" && |
| info?.version === "acp" |
| ); |
| } |
|
|
| function truncateChatHistoryText(text: string): { text: string; truncated: boolean } { |
| if (text.length <= CHAT_HISTORY_TEXT_MAX_CHARS) { |
| return { text, truncated: false }; |
| } |
| return { |
| text: `${text.slice(0, CHAT_HISTORY_TEXT_MAX_CHARS)}\n...(truncated)...`, |
| truncated: true, |
| }; |
| } |
|
|
| function sanitizeChatHistoryContentBlock(block: unknown): { block: unknown; changed: boolean } { |
| if (!block || typeof block !== "object") { |
| return { block, changed: false }; |
| } |
| const entry = { ...(block as Record<string, unknown>) }; |
| let changed = false; |
| if (typeof entry.text === "string") { |
| const stripped = stripInlineDirectiveTagsForDisplay(entry.text); |
| const res = truncateChatHistoryText(stripped.text); |
| entry.text = res.text; |
| changed ||= stripped.changed || res.truncated; |
| } |
| if (typeof entry.partialJson === "string") { |
| const res = truncateChatHistoryText(entry.partialJson); |
| entry.partialJson = res.text; |
| changed ||= res.truncated; |
| } |
| if (typeof entry.arguments === "string") { |
| const res = truncateChatHistoryText(entry.arguments); |
| entry.arguments = res.text; |
| changed ||= res.truncated; |
| } |
| if (typeof entry.thinking === "string") { |
| const res = truncateChatHistoryText(entry.thinking); |
| entry.thinking = res.text; |
| changed ||= res.truncated; |
| } |
| if ("textSignature" in entry) { |
| delete entry.textSignature; |
| changed = true; |
| } |
| if ("thinkingSignature" in entry) { |
| delete entry.thinkingSignature; |
| changed = true; |
| } |
| const type = typeof entry.type === "string" ? entry.type : ""; |
| if (type === "image" && typeof entry.data === "string") { |
| const bytes = Buffer.byteLength(entry.data, "utf8"); |
| delete entry.data; |
| entry.omitted = true; |
| entry.bytes = bytes; |
| changed = true; |
| } |
| return { block: changed ? entry : block, changed }; |
| } |
|
|
| |
| |
| |
| function toFiniteNumber(x: unknown): number | undefined { |
| return typeof x === "number" && Number.isFinite(x) ? x : undefined; |
| } |
|
|
| |
| |
| |
| |
| function sanitizeUsage(raw: unknown): Record<string, number> | undefined { |
| if (!raw || typeof raw !== "object") { |
| return undefined; |
| } |
| const u = raw as Record<string, unknown>; |
| const out: Record<string, number> = {}; |
|
|
| |
| const knownFields = [ |
| "input", |
| "output", |
| "totalTokens", |
| "inputTokens", |
| "outputTokens", |
| "cacheRead", |
| "cacheWrite", |
| "cache_read_input_tokens", |
| "cache_creation_input_tokens", |
| ]; |
|
|
| for (const k of knownFields) { |
| const n = toFiniteNumber(u[k]); |
| if (n !== undefined) { |
| out[k] = n; |
| } |
| } |
|
|
| |
| if ("cost" in u && u.cost != null && typeof u.cost === "object") { |
| const sanitizedCost = sanitizeCost(u.cost); |
| if (sanitizedCost) { |
| (out as Record<string, unknown>).cost = sanitizedCost; |
| } |
| } |
|
|
| return Object.keys(out).length > 0 ? out : undefined; |
| } |
|
|
| |
| |
| |
| |
| function sanitizeCost(raw: unknown): { total?: number } | undefined { |
| if (!raw || typeof raw !== "object") { |
| return undefined; |
| } |
| const c = raw as Record<string, unknown>; |
| const total = toFiniteNumber(c.total); |
| return total !== undefined ? { total } : undefined; |
| } |
|
|
| function sanitizeChatHistoryMessage(message: unknown): { message: unknown; changed: boolean } { |
| if (!message || typeof message !== "object") { |
| return { message, changed: false }; |
| } |
| const entry = { ...(message as Record<string, unknown>) }; |
| let changed = false; |
|
|
| if ("details" in entry) { |
| delete entry.details; |
| changed = true; |
| } |
|
|
| |
| |
| if (entry.role !== "assistant") { |
| if ("usage" in entry) { |
| delete entry.usage; |
| changed = true; |
| } |
| if ("cost" in entry) { |
| delete entry.cost; |
| changed = true; |
| } |
| } else { |
| |
| if ("usage" in entry) { |
| const sanitized = sanitizeUsage(entry.usage); |
| if (sanitized) { |
| entry.usage = sanitized; |
| } else { |
| delete entry.usage; |
| } |
| changed = true; |
| } |
| if ("cost" in entry) { |
| const sanitized = sanitizeCost(entry.cost); |
| if (sanitized) { |
| entry.cost = sanitized; |
| } else { |
| delete entry.cost; |
| } |
| changed = true; |
| } |
| } |
|
|
| if (typeof entry.content === "string") { |
| const stripped = stripInlineDirectiveTagsForDisplay(entry.content); |
| const res = truncateChatHistoryText(stripped.text); |
| entry.content = res.text; |
| changed ||= stripped.changed || res.truncated; |
| } else if (Array.isArray(entry.content)) { |
| const updated = entry.content.map((block) => sanitizeChatHistoryContentBlock(block)); |
| if (updated.some((item) => item.changed)) { |
| entry.content = updated.map((item) => item.block); |
| changed = true; |
| } |
| } |
|
|
| if (typeof entry.text === "string") { |
| const stripped = stripInlineDirectiveTagsForDisplay(entry.text); |
| const res = truncateChatHistoryText(stripped.text); |
| entry.text = res.text; |
| changed ||= stripped.changed || res.truncated; |
| } |
|
|
| return { message: changed ? entry : message, changed }; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| function extractAssistantTextForSilentCheck(message: unknown): string | undefined { |
| if (!message || typeof message !== "object") { |
| return undefined; |
| } |
| const entry = message as Record<string, unknown>; |
| if (entry.role !== "assistant") { |
| return undefined; |
| } |
| if (typeof entry.text === "string") { |
| return entry.text; |
| } |
| if (typeof entry.content === "string") { |
| return entry.content; |
| } |
| if (!Array.isArray(entry.content) || entry.content.length === 0) { |
| return undefined; |
| } |
|
|
| const texts: string[] = []; |
| for (const block of entry.content) { |
| if (!block || typeof block !== "object") { |
| return undefined; |
| } |
| const typed = block as { type?: unknown; text?: unknown }; |
| if (typed.type !== "text" || typeof typed.text !== "string") { |
| return undefined; |
| } |
| texts.push(typed.text); |
| } |
| return texts.length > 0 ? texts.join("\n") : undefined; |
| } |
|
|
| function sanitizeChatHistoryMessages(messages: unknown[]): unknown[] { |
| if (messages.length === 0) { |
| return messages; |
| } |
| let changed = false; |
| const next: unknown[] = []; |
| for (const message of messages) { |
| const res = sanitizeChatHistoryMessage(message); |
| changed ||= res.changed; |
| |
| const text = extractAssistantTextForSilentCheck(res.message); |
| if (text !== undefined && isSilentReplyText(text, SILENT_REPLY_TOKEN)) { |
| changed = true; |
| continue; |
| } |
| next.push(res.message); |
| } |
| return changed ? next : messages; |
| } |
|
|
| function buildOversizedHistoryPlaceholder(message?: unknown): Record<string, unknown> { |
| const role = |
| message && |
| typeof message === "object" && |
| typeof (message as { role?: unknown }).role === "string" |
| ? (message as { role: string }).role |
| : "assistant"; |
| const timestamp = |
| message && |
| typeof message === "object" && |
| typeof (message as { timestamp?: unknown }).timestamp === "number" |
| ? (message as { timestamp: number }).timestamp |
| : Date.now(); |
| return { |
| role, |
| timestamp, |
| content: [{ type: "text", text: CHAT_HISTORY_OVERSIZED_PLACEHOLDER }], |
| __openclaw: { truncated: true, reason: "oversized" }, |
| }; |
| } |
|
|
| function replaceOversizedChatHistoryMessages(params: { |
| messages: unknown[]; |
| maxSingleMessageBytes: number; |
| }): { messages: unknown[]; replacedCount: number } { |
| const { messages, maxSingleMessageBytes } = params; |
| if (messages.length === 0) { |
| return { messages, replacedCount: 0 }; |
| } |
| let replacedCount = 0; |
| const next = messages.map((message) => { |
| if (jsonUtf8Bytes(message) <= maxSingleMessageBytes) { |
| return message; |
| } |
| replacedCount += 1; |
| return buildOversizedHistoryPlaceholder(message); |
| }); |
| return { messages: replacedCount > 0 ? next : messages, replacedCount }; |
| } |
|
|
| function enforceChatHistoryFinalBudget(params: { messages: unknown[]; maxBytes: number }): { |
| messages: unknown[]; |
| placeholderCount: number; |
| } { |
| const { messages, maxBytes } = params; |
| if (messages.length === 0) { |
| return { messages, placeholderCount: 0 }; |
| } |
| if (jsonUtf8Bytes(messages) <= maxBytes) { |
| return { messages, placeholderCount: 0 }; |
| } |
| const last = messages.at(-1); |
| if (last && jsonUtf8Bytes([last]) <= maxBytes) { |
| return { messages: [last], placeholderCount: 0 }; |
| } |
| const placeholder = buildOversizedHistoryPlaceholder(last); |
| if (jsonUtf8Bytes([placeholder]) <= maxBytes) { |
| return { messages: [placeholder], placeholderCount: 1 }; |
| } |
| return { messages: [], placeholderCount: 0 }; |
| } |
|
|
| function resolveTranscriptPath(params: { |
| sessionId: string; |
| storePath: string | undefined; |
| sessionFile?: string; |
| agentId?: string; |
| }): string | null { |
| const { sessionId, storePath, sessionFile, agentId } = params; |
| if (!storePath && !sessionFile) { |
| return null; |
| } |
| try { |
| const sessionsDir = storePath ? path.dirname(storePath) : undefined; |
| return resolveSessionFilePath( |
| sessionId, |
| sessionFile ? { sessionFile } : undefined, |
| sessionsDir || agentId ? { sessionsDir, agentId } : undefined, |
| ); |
| } catch { |
| return null; |
| } |
| } |
|
|
| function ensureTranscriptFile(params: { transcriptPath: string; sessionId: string }): { |
| ok: boolean; |
| error?: string; |
| } { |
| if (fs.existsSync(params.transcriptPath)) { |
| return { ok: true }; |
| } |
| try { |
| fs.mkdirSync(path.dirname(params.transcriptPath), { recursive: true }); |
| const header = { |
| type: "session", |
| version: CURRENT_SESSION_VERSION, |
| id: params.sessionId, |
| timestamp: new Date().toISOString(), |
| cwd: process.cwd(), |
| }; |
| fs.writeFileSync(params.transcriptPath, `${JSON.stringify(header)}\n`, { |
| encoding: "utf-8", |
| mode: 0o600, |
| }); |
| return { ok: true }; |
| } catch (err) { |
| return { ok: false, error: err instanceof Error ? err.message : String(err) }; |
| } |
| } |
|
|
| function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean { |
| try { |
| const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/); |
| for (const line of lines) { |
| if (!line.trim()) { |
| continue; |
| } |
| const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } }; |
| if (parsed?.message?.idempotencyKey === idempotencyKey) { |
| return true; |
| } |
| } |
| return false; |
| } catch { |
| return false; |
| } |
| } |
|
|
| function appendAssistantTranscriptMessage(params: { |
| message: string; |
| label?: string; |
| sessionId: string; |
| storePath: string | undefined; |
| sessionFile?: string; |
| agentId?: string; |
| createIfMissing?: boolean; |
| idempotencyKey?: string; |
| abortMeta?: { |
| aborted: true; |
| origin: AbortOrigin; |
| runId: string; |
| }; |
| }): TranscriptAppendResult { |
| const transcriptPath = resolveTranscriptPath({ |
| sessionId: params.sessionId, |
| storePath: params.storePath, |
| sessionFile: params.sessionFile, |
| agentId: params.agentId, |
| }); |
| if (!transcriptPath) { |
| return { ok: false, error: "transcript path not resolved" }; |
| } |
|
|
| if (!fs.existsSync(transcriptPath)) { |
| if (!params.createIfMissing) { |
| return { ok: false, error: "transcript file not found" }; |
| } |
| const ensured = ensureTranscriptFile({ |
| transcriptPath, |
| sessionId: params.sessionId, |
| }); |
| if (!ensured.ok) { |
| return { ok: false, error: ensured.error ?? "failed to create transcript file" }; |
| } |
| } |
|
|
| if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) { |
| return { ok: true }; |
| } |
|
|
| return appendInjectedAssistantMessageToTranscript({ |
| transcriptPath, |
| message: params.message, |
| label: params.label, |
| idempotencyKey: params.idempotencyKey, |
| abortMeta: params.abortMeta, |
| }); |
| } |
|
|
| function collectSessionAbortPartials(params: { |
| chatAbortControllers: Map<string, ChatAbortControllerEntry>; |
| chatRunBuffers: Map<string, string>; |
| runIds: ReadonlySet<string>; |
| abortOrigin: AbortOrigin; |
| }): AbortedPartialSnapshot[] { |
| const out: AbortedPartialSnapshot[] = []; |
| for (const [runId, active] of params.chatAbortControllers) { |
| if (!params.runIds.has(runId)) { |
| continue; |
| } |
| const text = params.chatRunBuffers.get(runId); |
| if (!text || !text.trim()) { |
| continue; |
| } |
| out.push({ |
| runId, |
| sessionId: active.sessionId, |
| text, |
| abortOrigin: params.abortOrigin, |
| }); |
| } |
| return out; |
| } |
|
|
| function persistAbortedPartials(params: { |
| context: Pick<GatewayRequestContext, "logGateway">; |
| sessionKey: string; |
| snapshots: AbortedPartialSnapshot[]; |
| }) { |
| if (params.snapshots.length === 0) { |
| return; |
| } |
| const { storePath, entry } = loadSessionEntry(params.sessionKey); |
| for (const snapshot of params.snapshots) { |
| const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId; |
| const appended = appendAssistantTranscriptMessage({ |
| message: snapshot.text, |
| sessionId, |
| storePath, |
| sessionFile: entry?.sessionFile, |
| createIfMissing: true, |
| idempotencyKey: `${snapshot.runId}:assistant`, |
| abortMeta: { |
| aborted: true, |
| origin: snapshot.abortOrigin, |
| runId: snapshot.runId, |
| }, |
| }); |
| if (!appended.ok) { |
| params.context.logGateway.warn( |
| `chat.abort transcript append failed: ${appended.error ?? "unknown error"}`, |
| ); |
| } |
| } |
| } |
|
|
| function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps { |
| return { |
| chatAbortControllers: context.chatAbortControllers, |
| chatRunBuffers: context.chatRunBuffers, |
| chatDeltaSentAt: context.chatDeltaSentAt, |
| chatAbortedRuns: context.chatAbortedRuns, |
| removeChatRun: context.removeChatRun, |
| agentRunSeq: context.agentRunSeq, |
| broadcast: context.broadcast, |
| nodeSendToSession: context.nodeSendToSession, |
| }; |
| } |
|
|
| function normalizeOptionalText(value?: string | null): string | undefined { |
| const trimmed = value?.trim(); |
| return trimmed || undefined; |
| } |
|
|
| function resolveChatAbortRequester( |
| client: GatewayRequestHandlerOptions["client"], |
| ): ChatAbortRequester { |
| const scopes = Array.isArray(client?.connect?.scopes) ? client.connect.scopes : []; |
| return { |
| connId: normalizeOptionalText(client?.connId), |
| deviceId: normalizeOptionalText(client?.connect?.device?.id), |
| isAdmin: scopes.includes(ADMIN_SCOPE), |
| }; |
| } |
|
|
| function canRequesterAbortChatRun( |
| entry: ChatAbortControllerEntry, |
| requester: ChatAbortRequester, |
| ): boolean { |
| if (requester.isAdmin) { |
| return true; |
| } |
| const ownerDeviceId = normalizeOptionalText(entry.ownerDeviceId); |
| const ownerConnId = normalizeOptionalText(entry.ownerConnId); |
| if (!ownerDeviceId && !ownerConnId) { |
| return true; |
| } |
| if (ownerDeviceId && requester.deviceId && ownerDeviceId === requester.deviceId) { |
| return true; |
| } |
| if (ownerConnId && requester.connId && ownerConnId === requester.connId) { |
| return true; |
| } |
| return false; |
| } |
|
|
| function resolveAuthorizedRunIdsForSession(params: { |
| chatAbortControllers: Map<string, ChatAbortControllerEntry>; |
| sessionKey: string; |
| requester: ChatAbortRequester; |
| }) { |
| const authorizedRunIds: string[] = []; |
| let matchedSessionRuns = 0; |
| for (const [runId, active] of params.chatAbortControllers) { |
| if (active.sessionKey !== params.sessionKey) { |
| continue; |
| } |
| matchedSessionRuns += 1; |
| if (canRequesterAbortChatRun(active, params.requester)) { |
| authorizedRunIds.push(runId); |
| } |
| } |
| return { |
| matchedSessionRuns, |
| authorizedRunIds, |
| }; |
| } |
|
|
| function abortChatRunsForSessionKeyWithPartials(params: { |
| context: GatewayRequestContext; |
| ops: ChatAbortOps; |
| sessionKey: string; |
| abortOrigin: AbortOrigin; |
| stopReason?: string; |
| requester: ChatAbortRequester; |
| }) { |
| const { matchedSessionRuns, authorizedRunIds } = resolveAuthorizedRunIdsForSession({ |
| chatAbortControllers: params.context.chatAbortControllers, |
| sessionKey: params.sessionKey, |
| requester: params.requester, |
| }); |
| if (authorizedRunIds.length === 0) { |
| return { |
| aborted: false, |
| runIds: [], |
| unauthorized: matchedSessionRuns > 0, |
| }; |
| } |
| const authorizedRunIdSet = new Set(authorizedRunIds); |
| const snapshots = collectSessionAbortPartials({ |
| chatAbortControllers: params.context.chatAbortControllers, |
| chatRunBuffers: params.context.chatRunBuffers, |
| runIds: authorizedRunIdSet, |
| abortOrigin: params.abortOrigin, |
| }); |
| const runIds: string[] = []; |
| for (const runId of authorizedRunIds) { |
| const res = abortChatRunById(params.ops, { |
| runId, |
| sessionKey: params.sessionKey, |
| stopReason: params.stopReason, |
| }); |
| if (res.aborted) { |
| runIds.push(runId); |
| } |
| } |
| const res = { aborted: runIds.length > 0, runIds, unauthorized: false }; |
| if (res.aborted) { |
| persistAbortedPartials({ |
| context: params.context, |
| sessionKey: params.sessionKey, |
| snapshots, |
| }); |
| } |
| return res; |
| } |
|
|
| function nextChatSeq(context: { agentRunSeq: Map<string, number> }, runId: string) { |
| const next = (context.agentRunSeq.get(runId) ?? 0) + 1; |
| context.agentRunSeq.set(runId, next); |
| return next; |
| } |
|
|
| function broadcastChatFinal(params: { |
| context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">; |
| runId: string; |
| sessionKey: string; |
| message?: Record<string, unknown>; |
| }) { |
| const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId); |
| const strippedEnvelopeMessage = stripEnvelopeFromMessage(params.message) as |
| | Record<string, unknown> |
| | undefined; |
| const payload = { |
| runId: params.runId, |
| sessionKey: params.sessionKey, |
| seq, |
| state: "final" as const, |
| message: stripInlineDirectiveTagsFromMessageForDisplay(strippedEnvelopeMessage), |
| }; |
| params.context.broadcast("chat", payload); |
| params.context.nodeSendToSession(params.sessionKey, "chat", payload); |
| params.context.agentRunSeq.delete(params.runId); |
| } |
|
|
| function broadcastChatError(params: { |
| context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">; |
| runId: string; |
| sessionKey: string; |
| errorMessage?: string; |
| }) { |
| const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId); |
| const payload = { |
| runId: params.runId, |
| sessionKey: params.sessionKey, |
| seq, |
| state: "error" as const, |
| errorMessage: params.errorMessage, |
| }; |
| params.context.broadcast("chat", payload); |
| params.context.nodeSendToSession(params.sessionKey, "chat", payload); |
| params.context.agentRunSeq.delete(params.runId); |
| } |
|
|
| export const chatHandlers: GatewayRequestHandlers = { |
| "chat.history": async ({ params, respond, context }) => { |
| if (!validateChatHistoryParams(params)) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.INVALID_REQUEST, |
| `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`, |
| ), |
| ); |
| return; |
| } |
| const { sessionKey, limit } = params as { |
| sessionKey: string; |
| limit?: number; |
| }; |
| const { cfg, storePath, entry } = loadSessionEntry(sessionKey); |
| const sessionId = entry?.sessionId; |
| const rawMessages = |
| sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; |
| const hardMax = 1000; |
| const defaultLimit = 200; |
| const requested = typeof limit === "number" ? limit : defaultLimit; |
| const max = Math.min(hardMax, requested); |
| const sliced = rawMessages.length > max ? rawMessages.slice(-max) : rawMessages; |
| const sanitized = stripEnvelopeFromMessages(sliced); |
| const normalized = sanitizeChatHistoryMessages(sanitized); |
| const maxHistoryBytes = getMaxChatHistoryMessagesBytes(); |
| const perMessageHardCap = Math.min(CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes); |
| const replaced = replaceOversizedChatHistoryMessages({ |
| messages: normalized, |
| maxSingleMessageBytes: perMessageHardCap, |
| }); |
| const capped = capArrayByJsonBytes(replaced.messages, maxHistoryBytes).items; |
| const bounded = enforceChatHistoryFinalBudget({ messages: capped, maxBytes: maxHistoryBytes }); |
| const placeholderCount = replaced.replacedCount + bounded.placeholderCount; |
| if (placeholderCount > 0) { |
| chatHistoryPlaceholderEmitCount += placeholderCount; |
| context.logGateway.debug( |
| `chat.history omitted oversized payloads placeholders=${placeholderCount} total=${chatHistoryPlaceholderEmitCount}`, |
| ); |
| } |
| let thinkingLevel = entry?.thinkingLevel; |
| if (!thinkingLevel) { |
| const sessionAgentId = resolveSessionAgentId({ sessionKey, config: cfg }); |
| const { provider, model } = resolveSessionModelRef(cfg, entry, sessionAgentId); |
| const catalog = await context.loadGatewayModelCatalog(); |
| thinkingLevel = resolveThinkingDefault({ |
| cfg, |
| provider, |
| model, |
| catalog, |
| }); |
| } |
| const verboseLevel = entry?.verboseLevel ?? cfg.agents?.defaults?.verboseDefault; |
| respond(true, { |
| sessionKey, |
| sessionId, |
| messages: bounded.messages, |
| thinkingLevel, |
| fastMode: entry?.fastMode, |
| verboseLevel, |
| }); |
| }, |
| "chat.abort": ({ params, respond, context, client }) => { |
| if (!validateChatAbortParams(params)) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.INVALID_REQUEST, |
| `invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`, |
| ), |
| ); |
| return; |
| } |
| const { sessionKey: rawSessionKey, runId } = params as { |
| sessionKey: string; |
| runId?: string; |
| }; |
|
|
| const ops = createChatAbortOps(context); |
| const requester = resolveChatAbortRequester(client); |
|
|
| if (!runId) { |
| const res = abortChatRunsForSessionKeyWithPartials({ |
| context, |
| ops, |
| sessionKey: rawSessionKey, |
| abortOrigin: "rpc", |
| stopReason: "rpc", |
| requester, |
| }); |
| if (res.unauthorized) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized")); |
| return; |
| } |
| respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); |
| return; |
| } |
|
|
| const active = context.chatAbortControllers.get(runId); |
| if (!active) { |
| respond(true, { ok: true, aborted: false, runIds: [] }); |
| return; |
| } |
| if (active.sessionKey !== rawSessionKey) { |
| respond( |
| false, |
| undefined, |
| errorShape(ErrorCodes.INVALID_REQUEST, "runId does not match sessionKey"), |
| ); |
| return; |
| } |
| if (!canRequesterAbortChatRun(active, requester)) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized")); |
| return; |
| } |
|
|
| const partialText = context.chatRunBuffers.get(runId); |
| const res = abortChatRunById(ops, { |
| runId, |
| sessionKey: rawSessionKey, |
| stopReason: "rpc", |
| }); |
| if (res.aborted && partialText && partialText.trim()) { |
| persistAbortedPartials({ |
| context, |
| sessionKey: rawSessionKey, |
| snapshots: [ |
| { |
| runId, |
| sessionId: active.sessionId, |
| text: partialText, |
| abortOrigin: "rpc", |
| }, |
| ], |
| }); |
| } |
| respond(true, { |
| ok: true, |
| aborted: res.aborted, |
| runIds: res.aborted ? [runId] : [], |
| }); |
| }, |
| "chat.send": async ({ params, respond, context, client }) => { |
| if (!validateChatSendParams(params)) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.INVALID_REQUEST, |
| `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`, |
| ), |
| ); |
| return; |
| } |
| const p = params as { |
| sessionKey: string; |
| message: string; |
| thinking?: string; |
| deliver?: boolean; |
| attachments?: Array<{ |
| type?: string; |
| mimeType?: string; |
| fileName?: string; |
| content?: unknown; |
| }>; |
| timeoutMs?: number; |
| systemInputProvenance?: InputProvenance; |
| systemProvenanceReceipt?: string; |
| idempotencyKey: string; |
| }; |
| if ((p.systemInputProvenance || p.systemProvenanceReceipt) && !isAcpBridgeClient(client)) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.INVALID_REQUEST, |
| "system provenance fields are reserved for the ACP bridge", |
| ), |
| ); |
| return; |
| } |
| const sanitizedMessageResult = sanitizeChatSendMessageInput(p.message); |
| if (!sanitizedMessageResult.ok) { |
| respond( |
| false, |
| undefined, |
| errorShape(ErrorCodes.INVALID_REQUEST, sanitizedMessageResult.error), |
| ); |
| return; |
| } |
| const systemReceiptResult = normalizeOptionalChatSystemReceipt(p.systemProvenanceReceipt); |
| if (!systemReceiptResult.ok) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, systemReceiptResult.error)); |
| return; |
| } |
| const inboundMessage = sanitizedMessageResult.message; |
| const systemInputProvenance = normalizeInputProvenance(p.systemInputProvenance); |
| const systemProvenanceReceipt = systemReceiptResult.receipt; |
| const stopCommand = isChatStopCommandText(inboundMessage); |
| const normalizedAttachments = normalizeRpcAttachmentsToChatAttachments(p.attachments); |
| const rawMessage = inboundMessage.trim(); |
| if (!rawMessage && normalizedAttachments.length === 0) { |
| respond( |
| false, |
| undefined, |
| errorShape(ErrorCodes.INVALID_REQUEST, "message or attachment required"), |
| ); |
| return; |
| } |
| let parsedMessage = inboundMessage; |
| let parsedImages: ChatImageContent[] = []; |
| if (normalizedAttachments.length > 0) { |
| try { |
| const parsed = await parseMessageWithAttachments(inboundMessage, normalizedAttachments, { |
| maxBytes: 5_000_000, |
| log: context.logGateway, |
| }); |
| parsedMessage = parsed.message; |
| parsedImages = parsed.images; |
| } catch (err) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, String(err))); |
| return; |
| } |
| } |
| const rawSessionKey = p.sessionKey; |
| const { cfg, entry, canonicalKey: sessionKey } = loadSessionEntry(rawSessionKey); |
| const timeoutMs = resolveAgentTimeoutMs({ |
| cfg, |
| overrideMs: p.timeoutMs, |
| }); |
| const now = Date.now(); |
| const clientRunId = p.idempotencyKey; |
|
|
| const sendPolicy = resolveSendPolicy({ |
| cfg, |
| entry, |
| sessionKey, |
| channel: entry?.channel, |
| chatType: entry?.chatType, |
| }); |
| if (sendPolicy === "deny") { |
| respond( |
| false, |
| undefined, |
| errorShape(ErrorCodes.INVALID_REQUEST, "send blocked by session policy"), |
| ); |
| return; |
| } |
|
|
| if (stopCommand) { |
| const res = abortChatRunsForSessionKeyWithPartials({ |
| context, |
| ops: createChatAbortOps(context), |
| sessionKey: rawSessionKey, |
| abortOrigin: "stop-command", |
| stopReason: "stop", |
| requester: resolveChatAbortRequester(client), |
| }); |
| if (res.unauthorized) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized")); |
| return; |
| } |
| respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); |
| return; |
| } |
|
|
| const cached = context.dedupe.get(`chat:${clientRunId}`); |
| if (cached) { |
| respond(cached.ok, cached.payload, cached.error, { |
| cached: true, |
| }); |
| return; |
| } |
|
|
| const activeExisting = context.chatAbortControllers.get(clientRunId); |
| if (activeExisting) { |
| respond(true, { runId: clientRunId, status: "in_flight" as const }, undefined, { |
| cached: true, |
| runId: clientRunId, |
| }); |
| return; |
| } |
|
|
| try { |
| const abortController = new AbortController(); |
| context.chatAbortControllers.set(clientRunId, { |
| controller: abortController, |
| sessionId: entry?.sessionId ?? clientRunId, |
| sessionKey: rawSessionKey, |
| startedAtMs: now, |
| expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), |
| ownerConnId: normalizeOptionalText(client?.connId), |
| ownerDeviceId: normalizeOptionalText(client?.connect?.device?.id), |
| }); |
| const ackPayload = { |
| runId: clientRunId, |
| status: "started" as const, |
| }; |
| respond(true, ackPayload, undefined, { runId: clientRunId }); |
|
|
| const trimmedMessage = parsedMessage.trim(); |
| const injectThinking = Boolean( |
| p.thinking && trimmedMessage && !trimmedMessage.startsWith("/"), |
| ); |
| const commandBody = injectThinking ? `/think ${p.thinking} ${parsedMessage}` : parsedMessage; |
| const messageForAgent = systemProvenanceReceipt |
| ? [systemProvenanceReceipt, parsedMessage].filter(Boolean).join("\n\n") |
| : parsedMessage; |
| const clientInfo = client?.connect?.client; |
| const { |
| originatingChannel, |
| originatingTo, |
| accountId, |
| messageThreadId, |
| explicitDeliverRoute, |
| } = resolveChatSendOriginatingRoute({ |
| client: clientInfo, |
| deliver: p.deliver, |
| entry, |
| hasConnectedClient: client?.connect !== undefined, |
| mainKey: cfg.session?.mainKey, |
| sessionKey, |
| }); |
| |
| |
| |
| const stampedMessage = injectTimestamp(messageForAgent, timestampOptsFromConfig(cfg)); |
|
|
| const ctx: MsgContext = { |
| Body: messageForAgent, |
| BodyForAgent: stampedMessage, |
| BodyForCommands: commandBody, |
| RawBody: parsedMessage, |
| CommandBody: commandBody, |
| InputProvenance: systemInputProvenance, |
| SessionKey: sessionKey, |
| Provider: INTERNAL_MESSAGE_CHANNEL, |
| Surface: INTERNAL_MESSAGE_CHANNEL, |
| OriginatingChannel: originatingChannel, |
| OriginatingTo: originatingTo, |
| ExplicitDeliverRoute: explicitDeliverRoute, |
| AccountId: accountId, |
| MessageThreadId: messageThreadId, |
| ChatType: "direct", |
| CommandAuthorized: true, |
| MessageSid: clientRunId, |
| SenderId: clientInfo?.id, |
| SenderName: clientInfo?.displayName, |
| SenderUsername: clientInfo?.displayName, |
| GatewayClientScopes: client?.connect?.scopes, |
| }; |
|
|
| const agentId = resolveSessionAgentId({ |
| sessionKey, |
| config: cfg, |
| }); |
| const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ |
| cfg, |
| agentId, |
| channel: INTERNAL_MESSAGE_CHANNEL, |
| }); |
| const finalReplyParts: string[] = []; |
| const dispatcher = createReplyDispatcher({ |
| ...prefixOptions, |
| onError: (err) => { |
| context.logGateway.warn(`webchat dispatch failed: ${formatForLog(err)}`); |
| }, |
| deliver: async (payload, info) => { |
| if (info.kind !== "final") { |
| return; |
| } |
| const text = payload.text?.trim() ?? ""; |
| if (!text) { |
| return; |
| } |
| finalReplyParts.push(text); |
| }, |
| }); |
|
|
| let agentRunStarted = false; |
| void dispatchInboundMessage({ |
| ctx, |
| cfg, |
| dispatcher, |
| replyOptions: { |
| runId: clientRunId, |
| abortSignal: abortController.signal, |
| images: parsedImages.length > 0 ? parsedImages : undefined, |
| onAgentRunStart: (runId) => { |
| agentRunStarted = true; |
| const connId = typeof client?.connId === "string" ? client.connId : undefined; |
| const wantsToolEvents = hasGatewayClientCap( |
| client?.connect?.caps, |
| GATEWAY_CLIENT_CAPS.TOOL_EVENTS, |
| ); |
| if (connId && wantsToolEvents) { |
| context.registerToolEventRecipient(runId, connId); |
| |
| |
| |
| for (const [activeRunId, active] of context.chatAbortControllers) { |
| if (activeRunId !== runId && active.sessionKey === p.sessionKey) { |
| context.registerToolEventRecipient(activeRunId, connId); |
| } |
| } |
| } |
| }, |
| onModelSelected, |
| }, |
| }) |
| .then(() => { |
| if (!agentRunStarted) { |
| const combinedReply = finalReplyParts |
| .map((part) => part.trim()) |
| .filter(Boolean) |
| .join("\n\n") |
| .trim(); |
| let message: Record<string, unknown> | undefined; |
| if (combinedReply) { |
| const { storePath: latestStorePath, entry: latestEntry } = |
| loadSessionEntry(sessionKey); |
| const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId; |
| const appended = appendAssistantTranscriptMessage({ |
| message: combinedReply, |
| sessionId, |
| storePath: latestStorePath, |
| sessionFile: latestEntry?.sessionFile, |
| agentId, |
| createIfMissing: true, |
| }); |
| if (appended.ok) { |
| message = appended.message; |
| } else { |
| context.logGateway.warn( |
| `webchat transcript append failed: ${appended.error ?? "unknown error"}`, |
| ); |
| const now = Date.now(); |
| message = { |
| role: "assistant", |
| content: [{ type: "text", text: combinedReply }], |
| timestamp: now, |
| |
| |
| stopReason: "stop", |
| usage: { input: 0, output: 0, totalTokens: 0 }, |
| }; |
| } |
| } |
| broadcastChatFinal({ |
| context, |
| runId: clientRunId, |
| sessionKey: rawSessionKey, |
| message, |
| }); |
| } |
| setGatewayDedupeEntry({ |
| dedupe: context.dedupe, |
| key: `chat:${clientRunId}`, |
| entry: { |
| ts: Date.now(), |
| ok: true, |
| payload: { runId: clientRunId, status: "ok" as const }, |
| }, |
| }); |
| }) |
| .catch((err) => { |
| const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); |
| setGatewayDedupeEntry({ |
| dedupe: context.dedupe, |
| key: `chat:${clientRunId}`, |
| entry: { |
| ts: Date.now(), |
| ok: false, |
| payload: { |
| runId: clientRunId, |
| status: "error" as const, |
| summary: String(err), |
| }, |
| error, |
| }, |
| }); |
| broadcastChatError({ |
| context, |
| runId: clientRunId, |
| sessionKey: rawSessionKey, |
| errorMessage: String(err), |
| }); |
| }) |
| .finally(() => { |
| context.chatAbortControllers.delete(clientRunId); |
| }); |
| } catch (err) { |
| const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); |
| const payload = { |
| runId: clientRunId, |
| status: "error" as const, |
| summary: String(err), |
| }; |
| setGatewayDedupeEntry({ |
| dedupe: context.dedupe, |
| key: `chat:${clientRunId}`, |
| entry: { |
| ts: Date.now(), |
| ok: false, |
| payload, |
| error, |
| }, |
| }); |
| respond(false, payload, error, { |
| runId: clientRunId, |
| error: formatForLog(err), |
| }); |
| } |
| }, |
| "chat.inject": async ({ params, respond, context }) => { |
| if (!validateChatInjectParams(params)) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.INVALID_REQUEST, |
| `invalid chat.inject params: ${formatValidationErrors(validateChatInjectParams.errors)}`, |
| ), |
| ); |
| return; |
| } |
| const p = params as { |
| sessionKey: string; |
| message: string; |
| label?: string; |
| }; |
|
|
| |
| const rawSessionKey = p.sessionKey; |
| const { cfg, storePath, entry } = loadSessionEntry(rawSessionKey); |
| const sessionId = entry?.sessionId; |
| if (!sessionId || !storePath) { |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "session not found")); |
| return; |
| } |
|
|
| const appended = appendAssistantTranscriptMessage({ |
| message: p.message, |
| label: p.label, |
| sessionId, |
| storePath, |
| sessionFile: entry?.sessionFile, |
| agentId: resolveSessionAgentId({ sessionKey: rawSessionKey, config: cfg }), |
| createIfMissing: true, |
| }); |
| if (!appended.ok || !appended.messageId || !appended.message) { |
| respond( |
| false, |
| undefined, |
| errorShape( |
| ErrorCodes.UNAVAILABLE, |
| `failed to write transcript: ${appended.error ?? "unknown error"}`, |
| ), |
| ); |
| return; |
| } |
|
|
| |
| const chatPayload = { |
| runId: `inject-${appended.messageId}`, |
| sessionKey: rawSessionKey, |
| seq: 0, |
| state: "final" as const, |
| message: stripInlineDirectiveTagsFromMessageForDisplay( |
| stripEnvelopeFromMessage(appended.message) as Record<string, unknown>, |
| ), |
| }; |
| context.broadcast("chat", chatPayload); |
| context.nodeSendToSession(rawSessionKey, "chat", chatPayload); |
|
|
| respond(true, { ok: true, messageId: appended.messageId }); |
| }, |
| }; |
|
|