import fs from "node:fs"; import { resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { loadConfig } from "../../config/config.js"; import { loadSessionStore, resolveMainSessionKey, type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; import { normalizeAgentId, parseAgentSessionKey } from "../../routing/session-key.js"; import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, validateSessionsCompactParams, validateSessionsDeleteParams, validateSessionsListParams, validateSessionsMessagesSubscribeParams, validateSessionsMessagesUnsubscribeParams, validateSessionsPatchParams, validateSessionsPreviewParams, validateSessionsResetParams, validateSessionsResolveParams, } from "../protocol/index.js"; import { archiveSessionTranscriptsForSession, cleanupSessionBeforeMutation, emitSessionUnboundLifecycleEvent, performGatewaySessionReset, } from "../session-reset-service.js"; import { archiveFileOnDisk, listSessionsFromStore, loadCombinedSessionStoreForGateway, loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, readSessionPreviewItemsFromTranscript, resolveGatewaySessionStoreTarget, resolveSessionModelRef, resolveSessionTranscriptCandidates, type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, readSessionMessages, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; import type { GatewayClient, GatewayRequestHandlers, RespondFn } from "./types.js"; import { assertValidParams } from "./validation.js"; function requireSessionKey(key: unknown, respond: RespondFn): string | null { const raw = typeof key === "string" ? key : typeof key === "number" ? String(key) : typeof key === "bigint" ? String(key) : ""; const normalized = raw.trim(); if (!normalized) { respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); return null; } return normalized; } function resolveGatewaySessionTargetFromKey(key: string) { const cfg = loadConfig(); const target = resolveGatewaySessionStoreTarget({ cfg, key }); return { cfg, target, storePath: target.storePath }; } function rejectWebchatSessionMutation(params: { action: "patch" | "delete"; client: GatewayClient | null; isWebchatConnect: (params: GatewayClient["connect"] | null | undefined) => boolean; respond: RespondFn; }): boolean { if (!params.client?.connect || !params.isWebchatConnect(params.client.connect)) { return false; } if (params.client.connect.client.id === GATEWAY_CLIENT_IDS.CONTROL_UI) { return false; } params.respond( false, undefined, errorShape( ErrorCodes.INVALID_REQUEST, `webchat clients cannot ${params.action} sessions; use chat.send for session-scoped updates`, ), ); return true; } export const sessionsHandlers: GatewayRequestHandlers = { "sessions.list": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsListParams, "sessions.list", respond)) { return; } const p = params; const cfg = loadConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); const result = listSessionsFromStore({ cfg, storePath, store, opts: p, }); respond(true, result, undefined); }, "sessions.subscribe": ({ client, context, respond }) => { const connId = client?.connId?.trim(); if (connId) { context.subscribeSessionEvents(connId); } respond(true, { subscribed: Boolean(connId) }, undefined); }, "sessions.unsubscribe": ({ client, context, respond }) => { const connId = client?.connId?.trim(); if (connId) { context.unsubscribeSessionEvents(connId); } respond(true, { subscribed: false }, undefined); }, "sessions.messages.subscribe": ({ params, client, context, respond }) => { if ( !assertValidParams( params, validateSessionsMessagesSubscribeParams, "sessions.messages.subscribe", respond, ) ) { return; } const connId = client?.connId?.trim(); const key = requireSessionKey((params as { key?: unknown }).key, respond); if (!key) { return; } const { canonicalKey } = loadSessionEntry(key); if (connId) { context.subscribeSessionMessageEvents(connId, canonicalKey); respond(true, { subscribed: true, key: canonicalKey }, undefined); return; } respond(true, { subscribed: false, key: canonicalKey }, undefined); }, "sessions.messages.unsubscribe": ({ params, client, context, respond }) => { if ( !assertValidParams( params, validateSessionsMessagesUnsubscribeParams, "sessions.messages.unsubscribe", respond, ) ) { return; } const connId = client?.connId?.trim(); const key = requireSessionKey((params as { key?: unknown }).key, respond); if (!key) { return; } const { canonicalKey } = loadSessionEntry(key); if (connId) { context.unsubscribeSessionMessageEvents(connId, canonicalKey); } respond(true, { subscribed: false, key: canonicalKey }, undefined); }, "sessions.preview": ({ params, respond }) => { if (!assertValidParams(params, validateSessionsPreviewParams, "sessions.preview", respond)) { return; } const p = params; const keysRaw = Array.isArray(p.keys) ? p.keys : []; const keys = keysRaw .map((key) => String(key ?? "").trim()) .filter(Boolean) .slice(0, 64); const limit = typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, p.limit) : 12; const maxChars = typeof p.maxChars === "number" && Number.isFinite(p.maxChars) ? Math.max(20, p.maxChars) : 240; if (keys.length === 0) { respond(true, { ts: Date.now(), previews: [] } satisfies SessionsPreviewResult, undefined); return; } const cfg = loadConfig(); const storeCache = new Map>(); const previews: SessionsPreviewEntry[] = []; for (const key of keys) { try { const storeTarget = resolveGatewaySessionStoreTarget({ cfg, key, scanLegacyKeys: false }); const store = storeCache.get(storeTarget.storePath) ?? loadSessionStore(storeTarget.storePath); storeCache.set(storeTarget.storePath, store); const target = resolveGatewaySessionStoreTarget({ cfg, key, store, }); const entry = target.storeKeys.map((candidate) => store[candidate]).find(Boolean); if (!entry?.sessionId) { previews.push({ key, status: "missing", items: [] }); continue; } const items = readSessionPreviewItemsFromTranscript( entry.sessionId, target.storePath, entry.sessionFile, target.agentId, limit, maxChars, ); previews.push({ key, status: items.length > 0 ? "ok" : "empty", items, }); } catch { previews.push({ key, status: "error", items: [] }); } } respond(true, { ts: Date.now(), previews } satisfies SessionsPreviewResult, undefined); }, "sessions.resolve": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsResolveParams, "sessions.resolve", respond)) { return; } const p = params; const cfg = loadConfig(); const resolved = await resolveSessionKeyFromResolveParams({ cfg, p }); if (!resolved.ok) { respond(false, undefined, resolved.error); return; } respond(true, { ok: true, key: resolved.key }, undefined); }, "sessions.patch": async ({ params, respond, context, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsPatchParams, "sessions.patch", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } if (rejectWebchatSessionMutation({ action: "patch", client, isWebchatConnect, respond })) { return; } const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); const applied = await updateSessionStore(storePath, async (store) => { const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store }); return await applySessionsPatchToStore({ cfg, store, storeKey: primaryKey, patch: p, loadGatewayModelCatalog: context.loadGatewayModelCatalog, }); }); if (!applied.ok) { respond(false, undefined, applied.error); return; } const parsed = parseAgentSessionKey(target.canonicalKey ?? key); const agentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); const resolved = resolveSessionModelRef(cfg, applied.entry, agentId); const result: SessionsPatchResult = { ok: true, path: storePath, key: target.canonicalKey, entry: applied.entry, resolved: { modelProvider: resolved.provider, model: resolved.model, }, }; respond(true, result, undefined); }, "sessions.reset": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsResetParams, "sessions.reset", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } const reason = p.reason === "new" ? "new" : "reset"; const result = await performGatewaySessionReset({ key, reason, commandSource: "gateway:sessions.reset", }); if (!result.ok) { respond(false, undefined, result.error); return; } respond(true, { ok: true, key: result.key, entry: result.entry }, undefined); }, "sessions.delete": async ({ params, respond, client, isWebchatConnect }) => { if (!assertValidParams(params, validateSessionsDeleteParams, "sessions.delete", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } if (rejectWebchatSessionMutation({ action: "delete", client, isWebchatConnect, respond })) { return; } const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); const mainKey = resolveMainSessionKey(cfg); if (target.canonicalKey === mainKey) { respond( false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`), ); return; } const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; const { entry, legacyKey, canonicalKey } = loadSessionEntry(key); const mutationCleanupError = await cleanupSessionBeforeMutation({ cfg, key, target, entry, legacyKey, canonicalKey, reason: "session-delete", }); if (mutationCleanupError) { respond(false, undefined, mutationCleanupError); return; } const sessionId = entry?.sessionId; const deleted = await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store }); const hadEntry = Boolean(store[primaryKey]); if (hadEntry) { delete store[primaryKey]; } return hadEntry; }); const archived = deleted && deleteTranscript ? archiveSessionTranscriptsForSession({ sessionId, storePath, sessionFile: entry?.sessionFile, agentId: target.agentId, reason: "deleted", }) : []; if (deleted) { const emitLifecycleHooks = p.emitLifecycleHooks !== false; await emitSessionUnboundLifecycleEvent({ targetSessionKey: target.canonicalKey ?? key, reason: "session-delete", emitHooks: emitLifecycleHooks, }); } respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined); }, "sessions.get": ({ params, respond }) => { const p = params; const key = requireSessionKey(p.key ?? p.sessionKey, respond); if (!key) { return; } const limit = typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, Math.floor(p.limit)) : 200; const { target, storePath } = resolveGatewaySessionTargetFromKey(key); const store = loadSessionStore(storePath); const entry = target.storeKeys.map((k) => store[k]).find(Boolean); if (!entry?.sessionId) { respond(true, { messages: [] }, undefined); return; } const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages; respond(true, { messages }, undefined); }, "sessions.compact": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsCompactParams, "sessions.compact", respond)) { return; } const p = params; const key = requireSessionKey(p.key, respond); if (!key) { return; } const maxLines = typeof p.maxLines === "number" && Number.isFinite(p.maxLines) ? Math.max(1, Math.floor(p.maxLines)) : 400; const { cfg, target, storePath } = resolveGatewaySessionTargetFromKey(key); // Lock + read in a short critical section; transcript work happens outside. const compactTarget = await updateSessionStore(storePath, (store) => { const { entry, primaryKey } = migrateAndPruneGatewaySessionStoreKey({ cfg, key, store }); return { entry, primaryKey }; }); const entry = compactTarget.entry; const sessionId = entry?.sessionId; if (!sessionId) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no sessionId", }, undefined, ); return; } const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, entry?.sessionFile, target.agentId, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, reason: "no transcript", }, undefined, ); return; } const raw = fs.readFileSync(filePath, "utf-8"); const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); if (lines.length <= maxLines) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, kept: lines.length, }, undefined, ); return; } const archived = archiveFileOnDisk(filePath, "bak"); const keptLines = lines.slice(-maxLines); fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); await updateSessionStore(storePath, (store) => { const entryKey = compactTarget.primaryKey; const entryToUpdate = store[entryKey]; if (!entryToUpdate) { return; } delete entryToUpdate.inputTokens; delete entryToUpdate.outputTokens; delete entryToUpdate.totalTokens; delete entryToUpdate.totalTokensFresh; entryToUpdate.updatedAt = Date.now(); }); respond( true, { ok: true, key: target.canonicalKey, compacted: true, archived, kept: keptLines.length, }, undefined, ); }, };