import { WorkerClient } from "./WorkerClient.js"; import { ResourceStore } from "./resourceStore.js"; import type { Config } from "../config.js"; import { newId } from "../utils/ids.js"; import { ToolError } from "../utils/errors.js"; export interface CreateSessionOptions { initialFiles?: Record; cwd?: string; ttlSeconds?: number; capabilities?: { network?: boolean; python?: boolean; javascript?: boolean; }; ownerTokenHash?: string; } export interface SessionRecord { id: string; worker: WorkerClient; cwd: string; initialFiles: Record; capabilities: { network: boolean; python: boolean; javascript: boolean }; createdAt: number; lastActiveAt: number; expiresAt: number; idleTtlMs: number; valid: boolean; invalidReason?: string; execQueue: Promise; ownerTokenHash?: string; } export class SessionManager { private sessions = new Map(); private resources: ResourceStore; private sweepTimer: NodeJS.Timeout | null = null; private draining = false; constructor(private readonly cfg: Config) { this.resources = new ResourceStore( cfg.maxExecsRetainedSession, cfg.maxResourceBytesSession, cfg.maxResourceBytesGlobal, ); } get resourceStore() { return this.resources; } get config() { return this.cfg; } count(): number { return this.sessions.size; } startSweeper() { if (this.sweepTimer) return; this.sweepTimer = setInterval(() => this.sweep(), this.cfg.sessionSweepIntervalSeconds * 1000); // Don't keep the event loop alive only for sweeping if (typeof this.sweepTimer.unref === "function") this.sweepTimer.unref(); } stopSweeper() { if (this.sweepTimer) clearInterval(this.sweepTimer); this.sweepTimer = null; } setDraining(value: boolean) { this.draining = value; } isDraining() { return this.draining; } async createSession(opts: CreateSessionOptions): Promise { if (this.draining) { throw new ToolError("INTERNAL", "Server is draining; cannot create sessions"); } if (this.sessions.size >= this.cfg.maxSessions) { throw new ToolError("QUOTA_EXCEEDED", `Max sessions reached (${this.cfg.maxSessions})`); } if (opts.ownerTokenHash) { let perTokenCount = 0; for (const s of this.sessions.values()) { if (s.ownerTokenHash === opts.ownerTokenHash) perTokenCount++; } if (perTokenCount >= this.cfg.maxSessionsPerToken) { throw new ToolError( "QUOTA_EXCEEDED", `Max sessions per token reached (${this.cfg.maxSessionsPerToken})`, ); } } const cwd = opts.cwd ?? "/home/user"; // Default each capability to "whatever the server allows". Callers can still // explicitly opt out by passing `false`. They cannot opt in beyond what the server allows. const askedNetwork = opts.capabilities?.network ?? this.cfg.enableNetwork; const askedPython = opts.capabilities?.python ?? this.cfg.enablePython; const askedJs = opts.capabilities?.javascript ?? this.cfg.enableJavaScript; const capabilities = { network: askedNetwork && this.cfg.enableNetwork, python: askedPython && this.cfg.enablePython, javascript: askedJs && this.cfg.enableJavaScript, }; const ttlSeconds = Math.min( Math.max(opts.ttlSeconds ?? this.cfg.sessionIdleTtlSeconds, 30), this.cfg.sessionIdleTtlSeconds * 4, ); const id = newId("jb"); const worker = new WorkerClient({ workerMaxOldGenMb: this.cfg.workerMaxOldGenMb }); try { await worker.init({ sessionId: id, cwd, initialFiles: opts.initialFiles ?? {}, capabilities, networkConfig: capabilities.network ? { dangerouslyAllowFullInternetAccess: this.cfg.dangerouslyAllowFullInternetAccess, allowedUrlPrefixes: this.cfg.allowedUrlPrefixes, allowedMethods: this.cfg.allowedMethods, } : undefined, executionLimits: { maxCallDepth: 50, maxCommandCount: 5000, maxLoopIterations: 5000, maxAwkIterations: 5000, maxSedIterations: 5000, }, maxSessionFsBytes: this.cfg.maxSessionFsBytes, maxSingleFileBytes: this.cfg.maxSingleFileBytes, maxWriteBatchBytes: this.cfg.maxWriteBatchBytes, maxReadBytesPerCall: this.cfg.maxReadBytesPerCall, maxCapturedOutputBytesExec: this.cfg.maxCapturedOutputBytesExec, }); } catch (e) { await worker.terminate("init_failed"); throw e; } const now = Date.now(); const rec: SessionRecord = { id, worker, cwd, initialFiles: opts.initialFiles ?? {}, capabilities, createdAt: now, lastActiveAt: now, expiresAt: now + ttlSeconds * 1000, idleTtlMs: ttlSeconds * 1000, valid: true, execQueue: Promise.resolve(), ownerTokenHash: opts.ownerTokenHash, }; this.sessions.set(id, rec); return rec; } get(id: string): SessionRecord | null { const s = this.sessions.get(id); if (!s) return null; if (Date.now() > s.expiresAt) { void this.delete(id, "expired"); return null; } if (!s.valid) { return s; // caller will see invalid } return s; } require(id: string): SessionRecord { const s = this.get(id); if (!s) throw new ToolError("UNKNOWN_SESSION", `Unknown or expired session_id`); if (!s.valid) throw new ToolError( "SESSION_INVALID", `Session is invalid: ${s.invalidReason ?? "terminated"}`, ); return s; } touch(id: string) { const s = this.sessions.get(id); if (!s) return; s.lastActiveAt = Date.now(); s.expiresAt = s.lastActiveAt + s.idleTtlMs; } async delete(id: string, reason: string = "deleted"): Promise { const s = this.sessions.get(id); if (!s) return false; this.sessions.delete(id); this.resources.deleteSession(id); await s.worker.terminate(reason); return true; } async reset(id: string, mode: "empty" | "initial_files"): Promise { const s = this.require(id); const initialFiles = mode === "initial_files" ? s.initialFiles : {}; await s.worker.terminate("reset"); const newWorker = new WorkerClient({ workerMaxOldGenMb: this.cfg.workerMaxOldGenMb }); try { await newWorker.init({ sessionId: s.id, cwd: s.cwd, initialFiles, capabilities: s.capabilities, networkConfig: s.capabilities.network ? { dangerouslyAllowFullInternetAccess: this.cfg.dangerouslyAllowFullInternetAccess, allowedUrlPrefixes: this.cfg.allowedUrlPrefixes, allowedMethods: this.cfg.allowedMethods, } : undefined, executionLimits: { maxCallDepth: 50, maxCommandCount: 5000, maxLoopIterations: 5000, maxAwkIterations: 5000, maxSedIterations: 5000, }, maxSessionFsBytes: this.cfg.maxSessionFsBytes, maxSingleFileBytes: this.cfg.maxSingleFileBytes, maxWriteBatchBytes: this.cfg.maxWriteBatchBytes, maxReadBytesPerCall: this.cfg.maxReadBytesPerCall, maxCapturedOutputBytesExec: this.cfg.maxCapturedOutputBytesExec, }); } catch (e) { await newWorker.terminate("reset_init_failed"); this.sessions.delete(s.id); throw e; } s.worker = newWorker; s.lastActiveAt = Date.now(); s.expiresAt = s.lastActiveAt + s.idleTtlMs; s.valid = true; s.invalidReason = undefined; s.execQueue = Promise.resolve(); this.resources.deleteSession(s.id); return s; } invalidate(id: string, reason: string) { const s = this.sessions.get(id); if (!s) return; s.valid = false; s.invalidReason = reason; this.resources.deleteSession(id); void s.worker.terminate(reason); } // Serialize exec calls per session. runOnSessionQueue(sessionId: string, fn: () => Promise): Promise { const s = this.require(sessionId); const next = s.execQueue.then(fn, fn); s.execQueue = next.catch(() => undefined); return next; } sweep() { const now = Date.now(); for (const s of Array.from(this.sessions.values())) { if (now > s.expiresAt || !s.valid) { void this.delete(s.id, !s.valid ? `invalid:${s.invalidReason}` : "expired"); } } this.resources.sweepExpired(); } async shutdown(graceMs: number): Promise { this.setDraining(true); this.stopSweeper(); // Wait for in-flight execs per session const deadline = Date.now() + graceMs; while (Date.now() < deadline) { let inflight = 0; for (const s of this.sessions.values()) { // Probe queue const start = Date.now(); let resolved = false; await Promise.race([ s.execQueue.then(() => { resolved = true; }), new Promise((r) => setTimeout(r, 50)), ]); if (!resolved && Date.now() - start >= 50) inflight++; } if (inflight === 0) break; } for (const s of Array.from(this.sessions.values())) { await this.delete(s.id, "shutdown"); } } }