Spaces:
Paused
Paused
| import { randomUUID } from "node:crypto"; | |
| import fs from "node:fs"; | |
| import type { GatewayRequestHandlers } from "./types.js"; | |
| import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js"; | |
| import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; | |
| import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; | |
| import { loadConfig } from "../../config/config.js"; | |
| import { | |
| loadSessionStore, | |
| snapshotSessionOrigin, | |
| resolveMainSessionKey, | |
| type SessionEntry, | |
| updateSessionStore, | |
| } from "../../config/sessions.js"; | |
| import { | |
| ErrorCodes, | |
| errorShape, | |
| formatValidationErrors, | |
| validateSessionsCompactParams, | |
| validateSessionsDeleteParams, | |
| validateSessionsListParams, | |
| validateSessionsPatchParams, | |
| validateSessionsPreviewParams, | |
| validateSessionsResetParams, | |
| validateSessionsResolveParams, | |
| } from "../protocol/index.js"; | |
| import { | |
| archiveFileOnDisk, | |
| listSessionsFromStore, | |
| loadCombinedSessionStoreForGateway, | |
| loadSessionEntry, | |
| readSessionPreviewItemsFromTranscript, | |
| resolveGatewaySessionStoreTarget, | |
| resolveSessionTranscriptCandidates, | |
| type SessionsPatchResult, | |
| type SessionsPreviewEntry, | |
| type SessionsPreviewResult, | |
| } from "../session-utils.js"; | |
| import { applySessionsPatchToStore } from "../sessions-patch.js"; | |
| import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; | |
| export const sessionsHandlers: GatewayRequestHandlers = { | |
| "sessions.list": ({ params, respond }) => { | |
| if (!validateSessionsListParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const cfg = loadConfig(); | |
| const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); | |
| const result = listSessionsFromStore({ | |
| cfg, | |
| storePath, | |
| store, | |
| opts: p, | |
| }); | |
| respond(true, result, undefined); | |
| }, | |
| "sessions.preview": ({ params, respond }) => { | |
| if (!validateSessionsPreviewParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.preview params: ${formatValidationErrors( | |
| validateSessionsPreviewParams.errors, | |
| )}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const keysRaw = Array.isArray(p.keys) ? p.keys : []; | |
| const keys = keysRaw | |
| .map((key) => String(key ?? "").trim()) | |
| .filter(Boolean) | |
| .slice(0, 64); | |
| const limit = | |
| typeof p.limit === "number" && Number.isFinite(p.limit) ? Math.max(1, p.limit) : 12; | |
| const maxChars = | |
| typeof p.maxChars === "number" && Number.isFinite(p.maxChars) | |
| ? Math.max(20, p.maxChars) | |
| : 240; | |
| if (keys.length === 0) { | |
| respond(true, { ts: Date.now(), previews: [] } satisfies SessionsPreviewResult, undefined); | |
| return; | |
| } | |
| const cfg = loadConfig(); | |
| const storeCache = new Map<string, Record<string, SessionEntry>>(); | |
| const previews: SessionsPreviewEntry[] = []; | |
| for (const key of keys) { | |
| try { | |
| const target = resolveGatewaySessionStoreTarget({ cfg, key }); | |
| const store = storeCache.get(target.storePath) ?? loadSessionStore(target.storePath); | |
| storeCache.set(target.storePath, store); | |
| const entry = | |
| target.storeKeys.map((candidate) => store[candidate]).find(Boolean) ?? | |
| store[target.canonicalKey]; | |
| if (!entry?.sessionId) { | |
| previews.push({ key, status: "missing", items: [] }); | |
| continue; | |
| } | |
| const items = readSessionPreviewItemsFromTranscript( | |
| entry.sessionId, | |
| target.storePath, | |
| entry.sessionFile, | |
| target.agentId, | |
| limit, | |
| maxChars, | |
| ); | |
| previews.push({ | |
| key, | |
| status: items.length > 0 ? "ok" : "empty", | |
| items, | |
| }); | |
| } catch { | |
| previews.push({ key, status: "error", items: [] }); | |
| } | |
| } | |
| respond(true, { ts: Date.now(), previews } satisfies SessionsPreviewResult, undefined); | |
| }, | |
| "sessions.resolve": ({ params, respond }) => { | |
| if (!validateSessionsResolveParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.resolve params: ${formatValidationErrors(validateSessionsResolveParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const cfg = loadConfig(); | |
| const resolved = resolveSessionKeyFromResolveParams({ cfg, p }); | |
| if (!resolved.ok) { | |
| respond(false, undefined, resolved.error); | |
| return; | |
| } | |
| respond(true, { ok: true, key: resolved.key }, undefined); | |
| }, | |
| "sessions.patch": async ({ params, respond, context }) => { | |
| if (!validateSessionsPatchParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const key = String(p.key ?? "").trim(); | |
| if (!key) { | |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); | |
| return; | |
| } | |
| const cfg = loadConfig(); | |
| const target = resolveGatewaySessionStoreTarget({ cfg, key }); | |
| const storePath = target.storePath; | |
| const applied = await updateSessionStore(storePath, async (store) => { | |
| const primaryKey = target.storeKeys[0] ?? key; | |
| const existingKey = target.storeKeys.find((candidate) => store[candidate]); | |
| if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { | |
| store[primaryKey] = store[existingKey]; | |
| delete store[existingKey]; | |
| } | |
| return await applySessionsPatchToStore({ | |
| cfg, | |
| store, | |
| storeKey: primaryKey, | |
| patch: p, | |
| loadGatewayModelCatalog: context.loadGatewayModelCatalog, | |
| }); | |
| }); | |
| if (!applied.ok) { | |
| respond(false, undefined, applied.error); | |
| return; | |
| } | |
| const result: SessionsPatchResult = { | |
| ok: true, | |
| path: storePath, | |
| key: target.canonicalKey, | |
| entry: applied.entry, | |
| }; | |
| respond(true, result, undefined); | |
| }, | |
| "sessions.reset": async ({ params, respond }) => { | |
| if (!validateSessionsResetParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const key = String(p.key ?? "").trim(); | |
| if (!key) { | |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); | |
| return; | |
| } | |
| const cfg = loadConfig(); | |
| const target = resolveGatewaySessionStoreTarget({ cfg, key }); | |
| const storePath = target.storePath; | |
| const next = await updateSessionStore(storePath, (store) => { | |
| const primaryKey = target.storeKeys[0] ?? key; | |
| const existingKey = target.storeKeys.find((candidate) => store[candidate]); | |
| if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { | |
| store[primaryKey] = store[existingKey]; | |
| delete store[existingKey]; | |
| } | |
| const entry = store[primaryKey]; | |
| const now = Date.now(); | |
| const nextEntry: SessionEntry = { | |
| sessionId: randomUUID(), | |
| updatedAt: now, | |
| systemSent: false, | |
| abortedLastRun: false, | |
| thinkingLevel: entry?.thinkingLevel, | |
| verboseLevel: entry?.verboseLevel, | |
| reasoningLevel: entry?.reasoningLevel, | |
| responseUsage: entry?.responseUsage, | |
| model: entry?.model, | |
| contextTokens: entry?.contextTokens, | |
| sendPolicy: entry?.sendPolicy, | |
| label: entry?.label, | |
| origin: snapshotSessionOrigin(entry), | |
| lastChannel: entry?.lastChannel, | |
| lastTo: entry?.lastTo, | |
| skillsSnapshot: entry?.skillsSnapshot, | |
| // Reset token counts to 0 on session reset (#1523) | |
| inputTokens: 0, | |
| outputTokens: 0, | |
| totalTokens: 0, | |
| }; | |
| store[primaryKey] = nextEntry; | |
| return nextEntry; | |
| }); | |
| respond(true, { ok: true, key: target.canonicalKey, entry: next }, undefined); | |
| }, | |
| "sessions.delete": async ({ params, respond }) => { | |
| if (!validateSessionsDeleteParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const key = String(p.key ?? "").trim(); | |
| if (!key) { | |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); | |
| return; | |
| } | |
| const cfg = loadConfig(); | |
| const mainKey = resolveMainSessionKey(cfg); | |
| const target = resolveGatewaySessionStoreTarget({ cfg, key }); | |
| if (target.canonicalKey === mainKey) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape(ErrorCodes.INVALID_REQUEST, `Cannot delete the main session (${mainKey}).`), | |
| ); | |
| return; | |
| } | |
| const deleteTranscript = typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true; | |
| const storePath = target.storePath; | |
| const { entry } = loadSessionEntry(key); | |
| const sessionId = entry?.sessionId; | |
| const existed = Boolean(entry); | |
| const queueKeys = new Set<string>(target.storeKeys); | |
| queueKeys.add(target.canonicalKey); | |
| if (sessionId) { | |
| queueKeys.add(sessionId); | |
| } | |
| clearSessionQueues([...queueKeys]); | |
| stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey }); | |
| if (sessionId) { | |
| abortEmbeddedPiRun(sessionId); | |
| const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000); | |
| if (!ended) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.UNAVAILABLE, | |
| `Session ${key} is still active; try again in a moment.`, | |
| ), | |
| ); | |
| return; | |
| } | |
| } | |
| await updateSessionStore(storePath, (store) => { | |
| const primaryKey = target.storeKeys[0] ?? key; | |
| const existingKey = target.storeKeys.find((candidate) => store[candidate]); | |
| if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { | |
| store[primaryKey] = store[existingKey]; | |
| delete store[existingKey]; | |
| } | |
| if (store[primaryKey]) { | |
| delete store[primaryKey]; | |
| } | |
| }); | |
| const archived: string[] = []; | |
| if (deleteTranscript && sessionId) { | |
| for (const candidate of resolveSessionTranscriptCandidates( | |
| sessionId, | |
| storePath, | |
| entry?.sessionFile, | |
| target.agentId, | |
| )) { | |
| if (!fs.existsSync(candidate)) { | |
| continue; | |
| } | |
| try { | |
| archived.push(archiveFileOnDisk(candidate, "deleted")); | |
| } catch { | |
| // Best-effort. | |
| } | |
| } | |
| } | |
| respond(true, { ok: true, key: target.canonicalKey, deleted: existed, archived }, undefined); | |
| }, | |
| "sessions.compact": async ({ params, respond }) => { | |
| if (!validateSessionsCompactParams(params)) { | |
| respond( | |
| false, | |
| undefined, | |
| errorShape( | |
| ErrorCodes.INVALID_REQUEST, | |
| `invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`, | |
| ), | |
| ); | |
| return; | |
| } | |
| const p = params; | |
| const key = String(p.key ?? "").trim(); | |
| if (!key) { | |
| respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "key required")); | |
| return; | |
| } | |
| const maxLines = | |
| typeof p.maxLines === "number" && Number.isFinite(p.maxLines) | |
| ? Math.max(1, Math.floor(p.maxLines)) | |
| : 400; | |
| const cfg = loadConfig(); | |
| const target = resolveGatewaySessionStoreTarget({ cfg, key }); | |
| const storePath = target.storePath; | |
| // Lock + read in a short critical section; transcript work happens outside. | |
| const compactTarget = await updateSessionStore(storePath, (store) => { | |
| const primaryKey = target.storeKeys[0] ?? key; | |
| const existingKey = target.storeKeys.find((candidate) => store[candidate]); | |
| if (existingKey && existingKey !== primaryKey && !store[primaryKey]) { | |
| store[primaryKey] = store[existingKey]; | |
| delete store[existingKey]; | |
| } | |
| return { entry: store[primaryKey], primaryKey }; | |
| }); | |
| const entry = compactTarget.entry; | |
| const sessionId = entry?.sessionId; | |
| if (!sessionId) { | |
| respond( | |
| true, | |
| { | |
| ok: true, | |
| key: target.canonicalKey, | |
| compacted: false, | |
| reason: "no sessionId", | |
| }, | |
| undefined, | |
| ); | |
| return; | |
| } | |
| const filePath = resolveSessionTranscriptCandidates( | |
| sessionId, | |
| storePath, | |
| entry?.sessionFile, | |
| target.agentId, | |
| ).find((candidate) => fs.existsSync(candidate)); | |
| if (!filePath) { | |
| respond( | |
| true, | |
| { | |
| ok: true, | |
| key: target.canonicalKey, | |
| compacted: false, | |
| reason: "no transcript", | |
| }, | |
| undefined, | |
| ); | |
| return; | |
| } | |
| const raw = fs.readFileSync(filePath, "utf-8"); | |
| const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0); | |
| if (lines.length <= maxLines) { | |
| respond( | |
| true, | |
| { | |
| ok: true, | |
| key: target.canonicalKey, | |
| compacted: false, | |
| kept: lines.length, | |
| }, | |
| undefined, | |
| ); | |
| return; | |
| } | |
| const archived = archiveFileOnDisk(filePath, "bak"); | |
| const keptLines = lines.slice(-maxLines); | |
| fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); | |
| await updateSessionStore(storePath, (store) => { | |
| const entryKey = compactTarget.primaryKey; | |
| const entryToUpdate = store[entryKey]; | |
| if (!entryToUpdate) { | |
| return; | |
| } | |
| delete entryToUpdate.inputTokens; | |
| delete entryToUpdate.outputTokens; | |
| delete entryToUpdate.totalTokens; | |
| entryToUpdate.updatedAt = Date.now(); | |
| }); | |
| respond( | |
| true, | |
| { | |
| ok: true, | |
| key: target.canonicalKey, | |
| compacted: true, | |
| archived, | |
| kept: keptLines.length, | |
| }, | |
| undefined, | |
| ); | |
| }, | |
| }; | |