Spaces:
Configuration error
Configuration error
| import type { ChildProcessWithoutNullStreams } from "node:child_process"; | |
| import { createSessionSlug as createSessionSlugId } from "./session-slug.js"; | |
| const DEFAULT_JOB_TTL_MS = 30 * 60 * 1000; // 30 minutes | |
| const MIN_JOB_TTL_MS = 60 * 1000; // 1 minute | |
| const MAX_JOB_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours | |
| const DEFAULT_PENDING_OUTPUT_CHARS = 30_000; | |
| function clampTtl(value: number | undefined) { | |
| if (!value || Number.isNaN(value)) return DEFAULT_JOB_TTL_MS; | |
| return Math.min(Math.max(value, MIN_JOB_TTL_MS), MAX_JOB_TTL_MS); | |
| } | |
| let jobTtlMs = clampTtl(Number.parseInt(process.env.PI_BASH_JOB_TTL_MS ?? "", 10)); | |
| export type ProcessStatus = "running" | "completed" | "failed" | "killed"; | |
| export type SessionStdin = { | |
| write: (data: string, cb?: (err?: Error | null) => void) => void; | |
| end: () => void; | |
| destroyed?: boolean; | |
| }; | |
| export interface ProcessSession { | |
| id: string; | |
| command: string; | |
| scopeKey?: string; | |
| sessionKey?: string; | |
| notifyOnExit?: boolean; | |
| exitNotified?: boolean; | |
| child?: ChildProcessWithoutNullStreams; | |
| stdin?: SessionStdin; | |
| pid?: number; | |
| startedAt: number; | |
| cwd?: string; | |
| maxOutputChars: number; | |
| pendingMaxOutputChars?: number; | |
| totalOutputChars: number; | |
| pendingStdout: string[]; | |
| pendingStderr: string[]; | |
| pendingStdoutChars: number; | |
| pendingStderrChars: number; | |
| aggregated: string; | |
| tail: string; | |
| exitCode?: number | null; | |
| exitSignal?: NodeJS.Signals | number | null; | |
| exited: boolean; | |
| truncated: boolean; | |
| backgrounded: boolean; | |
| } | |
| export interface FinishedSession { | |
| id: string; | |
| command: string; | |
| scopeKey?: string; | |
| startedAt: number; | |
| endedAt: number; | |
| cwd?: string; | |
| status: ProcessStatus; | |
| exitCode?: number | null; | |
| exitSignal?: NodeJS.Signals | number | null; | |
| aggregated: string; | |
| tail: string; | |
| truncated: boolean; | |
| totalOutputChars: number; | |
| } | |
| const runningSessions = new Map<string, ProcessSession>(); | |
| const finishedSessions = new Map<string, FinishedSession>(); | |
| let sweeper: NodeJS.Timeout | null = null; | |
| function isSessionIdTaken(id: string) { | |
| return runningSessions.has(id) || finishedSessions.has(id); | |
| } | |
| export function createSessionSlug(): string { | |
| return createSessionSlugId(isSessionIdTaken); | |
| } | |
| export function addSession(session: ProcessSession) { | |
| runningSessions.set(session.id, session); | |
| startSweeper(); | |
| } | |
| export function getSession(id: string) { | |
| return runningSessions.get(id); | |
| } | |
| export function getFinishedSession(id: string) { | |
| return finishedSessions.get(id); | |
| } | |
| export function deleteSession(id: string) { | |
| runningSessions.delete(id); | |
| finishedSessions.delete(id); | |
| } | |
| export function appendOutput(session: ProcessSession, stream: "stdout" | "stderr", chunk: string) { | |
| session.pendingStdout ??= []; | |
| session.pendingStderr ??= []; | |
| session.pendingStdoutChars ??= sumPendingChars(session.pendingStdout); | |
| session.pendingStderrChars ??= sumPendingChars(session.pendingStderr); | |
| const buffer = stream === "stdout" ? session.pendingStdout : session.pendingStderr; | |
| const bufferChars = stream === "stdout" ? session.pendingStdoutChars : session.pendingStderrChars; | |
| const pendingCap = Math.min( | |
| session.pendingMaxOutputChars ?? DEFAULT_PENDING_OUTPUT_CHARS, | |
| session.maxOutputChars, | |
| ); | |
| buffer.push(chunk); | |
| let pendingChars = bufferChars + chunk.length; | |
| if (pendingChars > pendingCap) { | |
| session.truncated = true; | |
| pendingChars = capPendingBuffer(buffer, pendingChars, pendingCap); | |
| } | |
| if (stream === "stdout") { | |
| session.pendingStdoutChars = pendingChars; | |
| } else { | |
| session.pendingStderrChars = pendingChars; | |
| } | |
| session.totalOutputChars += chunk.length; | |
| const aggregated = trimWithCap(session.aggregated + chunk, session.maxOutputChars); | |
| session.truncated = | |
| session.truncated || aggregated.length < session.aggregated.length + chunk.length; | |
| session.aggregated = aggregated; | |
| session.tail = tail(session.aggregated, 2000); | |
| } | |
| export function drainSession(session: ProcessSession) { | |
| const stdout = session.pendingStdout.join(""); | |
| const stderr = session.pendingStderr.join(""); | |
| session.pendingStdout = []; | |
| session.pendingStderr = []; | |
| session.pendingStdoutChars = 0; | |
| session.pendingStderrChars = 0; | |
| return { stdout, stderr }; | |
| } | |
| export function markExited( | |
| session: ProcessSession, | |
| exitCode: number | null, | |
| exitSignal: NodeJS.Signals | number | null, | |
| status: ProcessStatus, | |
| ) { | |
| session.exited = true; | |
| session.exitCode = exitCode; | |
| session.exitSignal = exitSignal; | |
| session.tail = tail(session.aggregated, 2000); | |
| moveToFinished(session, status); | |
| } | |
| export function markBackgrounded(session: ProcessSession) { | |
| session.backgrounded = true; | |
| } | |
| function moveToFinished(session: ProcessSession, status: ProcessStatus) { | |
| runningSessions.delete(session.id); | |
| if (!session.backgrounded) return; | |
| finishedSessions.set(session.id, { | |
| id: session.id, | |
| command: session.command, | |
| scopeKey: session.scopeKey, | |
| startedAt: session.startedAt, | |
| endedAt: Date.now(), | |
| cwd: session.cwd, | |
| status, | |
| exitCode: session.exitCode, | |
| exitSignal: session.exitSignal, | |
| aggregated: session.aggregated, | |
| tail: session.tail, | |
| truncated: session.truncated, | |
| totalOutputChars: session.totalOutputChars, | |
| }); | |
| } | |
| export function tail(text: string, max = 2000) { | |
| if (text.length <= max) return text; | |
| return text.slice(text.length - max); | |
| } | |
| function sumPendingChars(buffer: string[]) { | |
| let total = 0; | |
| for (const chunk of buffer) total += chunk.length; | |
| return total; | |
| } | |
| function capPendingBuffer(buffer: string[], pendingChars: number, cap: number) { | |
| if (pendingChars <= cap) return pendingChars; | |
| const last = buffer.at(-1); | |
| if (last && last.length >= cap) { | |
| buffer.length = 0; | |
| buffer.push(last.slice(last.length - cap)); | |
| return cap; | |
| } | |
| while (buffer.length && pendingChars - buffer[0].length >= cap) { | |
| pendingChars -= buffer[0].length; | |
| buffer.shift(); | |
| } | |
| if (buffer.length && pendingChars > cap) { | |
| const overflow = pendingChars - cap; | |
| buffer[0] = buffer[0].slice(overflow); | |
| pendingChars = cap; | |
| } | |
| return pendingChars; | |
| } | |
| export function trimWithCap(text: string, max: number) { | |
| if (text.length <= max) return text; | |
| return text.slice(text.length - max); | |
| } | |
| export function listRunningSessions() { | |
| return Array.from(runningSessions.values()).filter((s) => s.backgrounded); | |
| } | |
| export function listFinishedSessions() { | |
| return Array.from(finishedSessions.values()); | |
| } | |
| export function clearFinished() { | |
| finishedSessions.clear(); | |
| } | |
| export function resetProcessRegistryForTests() { | |
| runningSessions.clear(); | |
| finishedSessions.clear(); | |
| stopSweeper(); | |
| } | |
| export function setJobTtlMs(value?: number) { | |
| if (value === undefined || Number.isNaN(value)) return; | |
| jobTtlMs = clampTtl(value); | |
| stopSweeper(); | |
| startSweeper(); | |
| } | |
| function pruneFinishedSessions() { | |
| const cutoff = Date.now() - jobTtlMs; | |
| for (const [id, session] of finishedSessions.entries()) { | |
| if (session.endedAt < cutoff) { | |
| finishedSessions.delete(id); | |
| } | |
| } | |
| } | |
| function startSweeper() { | |
| if (sweeper) return; | |
| sweeper = setInterval(pruneFinishedSessions, Math.max(30_000, jobTtlMs / 6)); | |
| sweeper.unref?.(); | |
| } | |
| function stopSweeper() { | |
| if (!sweeper) return; | |
| clearInterval(sweeper); | |
| sweeper = null; | |
| } | |