Spaces:
Paused
Paused
| import fs from "node:fs/promises"; | |
| import path from "node:path"; | |
| import { execFile as execFileCallback } from "node:child_process"; | |
| import { promisify } from "node:util"; | |
| import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; | |
| import type { Db } from "@paperclipai/db"; | |
| import type { BillingType } from "@paperclipai/shared"; | |
| import { | |
| agents, | |
| agentRuntimeState, | |
| agentTaskSessions, | |
| agentWakeupRequests, | |
| heartbeatRunEvents, | |
| heartbeatRuns, | |
| issues, | |
| projects, | |
| projectWorkspaces, | |
| } from "@paperclipai/db"; | |
| import { conflict, notFound } from "../errors.js"; | |
| import { logger } from "../middleware/logger.js"; | |
| import { publishLiveEvent } from "./live-events.js"; | |
| import { getRunLogStore, type RunLogHandle } from "./run-log-store.js"; | |
| import { getServerAdapter, runningProcesses } from "../adapters/index.js"; | |
| import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec, UsageSummary } from "../adapters/index.js"; | |
| import { createLocalAgentJwt } from "../agent-auth-jwt.js"; | |
| import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js"; | |
| import { costService } from "./costs.js"; | |
| import { budgetService, type BudgetEnforcementScope } from "./budgets.js"; | |
| import { secretService } from "./secrets.js"; | |
| import { resolveDefaultAgentWorkspaceDir, resolveManagedProjectWorkspaceDir } from "../home-paths.js"; | |
| import { summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js"; | |
| import { | |
| buildWorkspaceReadyComment, | |
| cleanupExecutionWorkspaceArtifacts, | |
| ensureRuntimeServicesForRun, | |
| persistAdapterManagedRuntimeServices, | |
| realizeExecutionWorkspace, | |
| releaseRuntimeServicesForRun, | |
| sanitizeRuntimeServiceBaseEnv, | |
| } from "./workspace-runtime.js"; | |
| import { issueService } from "./issues.js"; | |
| import { executionWorkspaceService } from "./execution-workspaces.js"; | |
| import { workspaceOperationService } from "./workspace-operations.js"; | |
| import { | |
| buildExecutionWorkspaceAdapterConfig, | |
| gateProjectExecutionWorkspacePolicy, | |
| parseIssueExecutionWorkspaceSettings, | |
| parseProjectExecutionWorkspacePolicy, | |
| resolveExecutionWorkspaceMode, | |
| } from "./execution-workspace-policy.js"; | |
| import { instanceSettingsService } from "./instance-settings.js"; | |
| import { redactCurrentUserText, redactCurrentUserValue } from "../log-redaction.js"; | |
| import { | |
| hasSessionCompactionThresholds, | |
| resolveSessionCompactionPolicy, | |
| type SessionCompactionPolicy, | |
| } from "@paperclipai/adapter-utils"; | |
// Maximum bytes of log output forwarded in a single live-event chunk.
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
// Default and hard ceiling for per-agent concurrent heartbeat runs
// (see normalizeMaxConcurrentRuns, which clamps configured values to this range).
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
// Snapshot key under which deferred wake context is stashed — consumers are
// outside this chunk; confirm usage against the rest of the file.
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
// Per-agent promise chains used by withAgentStartLock to serialize run starts.
const startLocksByAgent = new Map<string, Promise<void>>();
// Sentinel cwd value; presumably means "repo contents only, no real working
// directory" — verify against the consumers elsewhere in the file.
const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__";
// Abort `git clone` of a managed workspace after 10 minutes.
const MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS = 10 * 60 * 1000;
const execFile = promisify(execFileCallback);
// Local adapter types treated as session-capable; exact consumers are outside
// this chunk — NOTE(review): confirm against the rest of the file.
const SESSIONED_LOCAL_ADAPTERS = new Set([
  "claude_local",
  "codex_local",
  "cursor",
  "gemini_local",
  "opencode_local",
  "pi_local",
]);
| function deriveRepoNameFromRepoUrl(repoUrl: string | null): string | null { | |
| const trimmed = repoUrl?.trim() ?? ""; | |
| if (!trimmed) return null; | |
| try { | |
| const parsed = new URL(trimmed); | |
| const cleanedPath = parsed.pathname.replace(/\/+$/, ""); | |
| const repoName = cleanedPath.split("/").filter(Boolean).pop()?.replace(/\.git$/i, "") ?? ""; | |
| return repoName || null; | |
| } catch { | |
| return null; | |
| } | |
| } | |
/**
 * Ensure the managed on-disk workspace for a project exists and return its
 * path. With a repo URL configured the workspace is cloned on first use;
 * without one an empty directory suffices.
 *
 * Returns a non-null `warning` only when the managed path already holds
 * non-git, non-empty content (it is then used as-is). Throws when the clone
 * fails.
 */
async function ensureManagedProjectWorkspace(input: {
  companyId: string;
  projectId: string;
  repoUrl: string | null;
}): Promise<{ cwd: string; warning: string | null }> {
  const cwd = resolveManagedProjectWorkspaceDir({
    companyId: input.companyId,
    projectId: input.projectId,
    repoName: deriveRepoNameFromRepoUrl(input.repoUrl),
  });
  // Ensure the parent exists before the stat/clone steps below.
  await fs.mkdir(path.dirname(cwd), { recursive: true });
  const stats = await fs.stat(cwd).catch(() => null);
  if (!input.repoUrl) {
    // No repository configured: a plain directory is all that is needed.
    if (!stats) {
      await fs.mkdir(cwd, { recursive: true });
    }
    return { cwd, warning: null };
  }
  const gitDirExists = await fs
    .stat(path.resolve(cwd, ".git"))
    .then((entry) => entry.isDirectory())
    .catch(() => false);
  if (gitDirExists) {
    // Already a git checkout — reuse it without re-cloning.
    return { cwd, warning: null };
  }
  if (stats) {
    const entries = await fs.readdir(cwd).catch(() => []);
    if (entries.length > 0) {
      // Existing non-git content: never clobber it; surface a warning instead.
      return {
        cwd,
        warning: `Managed workspace path "${cwd}" already exists but is not a git checkout. Using it as-is.`,
      };
    }
    // Empty directory: remove it so `git clone` can create it cleanly.
    await fs.rm(cwd, { recursive: true, force: true });
  }
  try {
    // Env is filtered through sanitizeRuntimeServiceBaseEnv (presumably to drop
    // runtime-service-specific variables — see workspace-runtime.js); the clone
    // is bounded by a 10-minute timeout.
    await execFile("git", ["clone", input.repoUrl, cwd], {
      env: sanitizeRuntimeServiceBaseEnv(process.env),
      timeout: MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS,
    });
    return { cwd, warning: null };
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to prepare managed checkout for "${input.repoUrl}" at "${cwd}": ${reason}`);
  }
}
/**
 * Column projection for heartbeat-run list queries. Mirrors the run row shape
 * except stdoutExcerpt/stderrExcerpt are projected as SQL NULL — presumably to
 * keep list payloads small; fetch an individual run to see excerpts.
 */
const heartbeatRunListColumns = {
  id: heartbeatRuns.id,
  companyId: heartbeatRuns.companyId,
  agentId: heartbeatRuns.agentId,
  invocationSource: heartbeatRuns.invocationSource,
  triggerDetail: heartbeatRuns.triggerDetail,
  status: heartbeatRuns.status,
  startedAt: heartbeatRuns.startedAt,
  finishedAt: heartbeatRuns.finishedAt,
  error: heartbeatRuns.error,
  wakeupRequestId: heartbeatRuns.wakeupRequestId,
  exitCode: heartbeatRuns.exitCode,
  signal: heartbeatRuns.signal,
  usageJson: heartbeatRuns.usageJson,
  resultJson: heartbeatRuns.resultJson,
  sessionIdBefore: heartbeatRuns.sessionIdBefore,
  sessionIdAfter: heartbeatRuns.sessionIdAfter,
  logStore: heartbeatRuns.logStore,
  logRef: heartbeatRuns.logRef,
  logBytes: heartbeatRuns.logBytes,
  logSha256: heartbeatRuns.logSha256,
  logCompressed: heartbeatRuns.logCompressed,
  // Excerpts intentionally nulled out in list context (see note above).
  stdoutExcerpt: sql<string | null>`NULL`.as("stdoutExcerpt"),
  stderrExcerpt: sql<string | null>`NULL`.as("stderrExcerpt"),
  errorCode: heartbeatRuns.errorCode,
  externalRunId: heartbeatRuns.externalRunId,
  contextSnapshot: heartbeatRuns.contextSnapshot,
  createdAt: heartbeatRuns.createdAt,
  updatedAt: heartbeatRuns.updatedAt,
} as const;
| function appendExcerpt(prev: string, chunk: string) { | |
| return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES); | |
| } | |
| function normalizeMaxConcurrentRuns(value: unknown) { | |
| const parsed = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT)); | |
| if (!Number.isFinite(parsed)) return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT; | |
| return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed)); | |
| } | |
/**
 * Serialize run starts per agent: each caller chains `fn` after the previous
 * caller's completion so at most one `fn` runs per agent at a time. The map
 * entry is removed once the chain drains so idle agents do not leak entries.
 */
async function withAgentStartLock<T>(agentId: string, fn: () => Promise<T>) {
  const previous = startLocksByAgent.get(agentId) ?? Promise.resolve();
  const run = previous.then(fn);
  // Marker swallows both outcomes so a rejection never blocks the next waiter.
  const marker = run.then(
    () => undefined,
    () => undefined,
  );
  startLocksByAgent.set(agentId, marker);
  try {
    return await run;
  } finally {
    // Only clean up if no later caller has replaced our marker in the map.
    if (startLocksByAgent.get(agentId) === marker) {
      startLocksByAgent.delete(agentId);
    }
  }
}
/** Options accepted when requesting an agent wakeup. */
interface WakeupOptions {
  source?: "timer" | "assignment" | "on_demand" | "automation";
  triggerDetail?: "manual" | "ping" | "callback" | "system";
  reason?: string | null;
  payload?: Record<string, unknown> | null;
  idempotencyKey?: string | null;
  requestedByActorType?: "user" | "agent" | "system";
  requestedByActorId?: string | null;
  contextSnapshot?: Record<string, unknown>;
}
/** Non-negative token counters (raw cumulative or normalized per-run). */
type UsageTotals = {
  inputTokens: number;
  cachedInputTokens: number;
  outputTokens: number;
};
/**
 * Outcome of evaluateSessionCompaction: whether to rotate the session, why,
 * the markdown handoff for the replacement session, and the latest run id.
 */
type SessionCompactionDecision = {
  rotate: boolean;
  reason: string | null;
  handoffMarkdown: string | null;
  previousRunId: string | null;
};
/** Per-assignee adapter overrides parsed from issue settings. */
interface ParsedIssueAssigneeAdapterOverrides {
  adapterConfig: Record<string, unknown> | null;
  useProjectWorkspace: boolean | null;
}
/** Working-directory resolution for a run, plus alternative workspace hints. */
export type ResolvedWorkspaceForRun = {
  cwd: string;
  // Where the cwd came from: project's primary workspace, a task session, or
  // the agent's home fallback directory.
  source: "project_primary" | "task_session" | "agent_home";
  projectId: string | null;
  workspaceId: string | null;
  repoUrl: string | null;
  repoRef: string | null;
  workspaceHints: Array<{
    workspaceId: string;
    cwd: string | null;
    repoUrl: string | null;
    repoRef: string | null;
  }>;
  warnings: string[];
};
/** Minimal shape needed to prioritize project-workspace candidates. */
type ProjectWorkspaceCandidate = {
  id: string;
};
| export function prioritizeProjectWorkspaceCandidatesForRun<T extends ProjectWorkspaceCandidate>( | |
| rows: T[], | |
| preferredWorkspaceId: string | null | undefined, | |
| ): T[] { | |
| if (!preferredWorkspaceId) return rows; | |
| const preferredIndex = rows.findIndex((row) => row.id === preferredWorkspaceId); | |
| if (preferredIndex <= 0) return rows; | |
| return [rows[preferredIndex]!, ...rows.slice(0, preferredIndex), ...rows.slice(preferredIndex + 1)]; | |
| } | |
| function readNonEmptyString(value: unknown): string | null { | |
| return typeof value === "string" && value.trim().length > 0 ? value : null; | |
| } | |
| function normalizeLedgerBillingType(value: unknown): BillingType { | |
| const raw = readNonEmptyString(value); | |
| switch (raw) { | |
| case "api": | |
| case "metered_api": | |
| return "metered_api"; | |
| case "subscription": | |
| case "subscription_included": | |
| return "subscription_included"; | |
| case "subscription_overage": | |
| return "subscription_overage"; | |
| case "credits": | |
| return "credits"; | |
| case "fixed": | |
| return "fixed"; | |
| default: | |
| return "unknown"; | |
| } | |
| } | |
| function resolveLedgerBiller(result: AdapterExecutionResult): string { | |
| return readNonEmptyString(result.biller) ?? readNonEmptyString(result.provider) ?? "unknown"; | |
| } | |
| function normalizeBilledCostCents(costUsd: number | null | undefined, billingType: BillingType): number { | |
| if (billingType === "subscription_included") return 0; | |
| if (typeof costUsd !== "number" || !Number.isFinite(costUsd)) return 0; | |
| return Math.max(0, Math.round(costUsd * 100)); | |
| } | |
/**
 * Determine the ledger scope (issueId / projectId) for a finished run. The
 * run's context snapshot is the source of truth; when it names an issue, the
 * issue row is loaded (company-scoped) so the projectId reflects the issue's
 * current project, falling back to the snapshot's projectId.
 */
async function resolveLedgerScopeForRun(
  db: Db,
  companyId: string,
  run: typeof heartbeatRuns.$inferSelect,
) {
  const context = parseObject(run.contextSnapshot);
  const contextIssueId = readNonEmptyString(context.issueId);
  const contextProjectId = readNonEmptyString(context.projectId);
  if (!contextIssueId) {
    // No issue in scope: bill against the project named by the snapshot (if any).
    return {
      issueId: null,
      projectId: contextProjectId,
    };
  }
  const issue = await db
    .select({
      id: issues.id,
      projectId: issues.projectId,
    })
    .from(issues)
    .where(and(eq(issues.id, contextIssueId), eq(issues.companyId, companyId)))
    .then((rows) => rows[0] ?? null);
  // Issue may have been deleted or belong to another company; fall back to the
  // snapshot's project in that case.
  return {
    issueId: issue?.id ?? null,
    projectId: issue?.projectId ?? contextProjectId,
  };
}
| function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null { | |
| if (!usage) return null; | |
| return { | |
| inputTokens: Math.max(0, Math.floor(asNumber(usage.inputTokens, 0))), | |
| cachedInputTokens: Math.max(0, Math.floor(asNumber(usage.cachedInputTokens, 0))), | |
| outputTokens: Math.max(0, Math.floor(asNumber(usage.outputTokens, 0))), | |
| }; | |
| } | |
| function readRawUsageTotals(usageJson: unknown): UsageTotals | null { | |
| const parsed = parseObject(usageJson); | |
| if (Object.keys(parsed).length === 0) return null; | |
| const inputTokens = Math.max( | |
| 0, | |
| Math.floor(asNumber(parsed.rawInputTokens, asNumber(parsed.inputTokens, 0))), | |
| ); | |
| const cachedInputTokens = Math.max( | |
| 0, | |
| Math.floor(asNumber(parsed.rawCachedInputTokens, asNumber(parsed.cachedInputTokens, 0))), | |
| ); | |
| const outputTokens = Math.max( | |
| 0, | |
| Math.floor(asNumber(parsed.rawOutputTokens, asNumber(parsed.outputTokens, 0))), | |
| ); | |
| if (inputTokens <= 0 && cachedInputTokens <= 0 && outputTokens <= 0) { | |
| return null; | |
| } | |
| return { | |
| inputTokens, | |
| cachedInputTokens, | |
| outputTokens, | |
| }; | |
| } | |
| function deriveNormalizedUsageDelta(current: UsageTotals | null, previous: UsageTotals | null): UsageTotals | null { | |
| if (!current) return null; | |
| if (!previous) return { ...current }; | |
| const inputTokens = current.inputTokens >= previous.inputTokens | |
| ? current.inputTokens - previous.inputTokens | |
| : current.inputTokens; | |
| const cachedInputTokens = current.cachedInputTokens >= previous.cachedInputTokens | |
| ? current.cachedInputTokens - previous.cachedInputTokens | |
| : current.cachedInputTokens; | |
| const outputTokens = current.outputTokens >= previous.outputTokens | |
| ? current.outputTokens - previous.outputTokens | |
| : current.outputTokens; | |
| return { | |
| inputTokens: Math.max(0, inputTokens), | |
| cachedInputTokens: Math.max(0, cachedInputTokens), | |
| outputTokens: Math.max(0, outputTokens), | |
| }; | |
| } | |
| function formatCount(value: number | null | undefined) { | |
| if (typeof value !== "number" || !Number.isFinite(value)) return "0"; | |
| return value.toLocaleString("en-US"); | |
| } | |
| export function parseSessionCompactionPolicy(agent: typeof agents.$inferSelect): SessionCompactionPolicy { | |
| return resolveSessionCompactionPolicy(agent.adapterType, agent.runtimeConfig).policy; | |
| } | |
/**
 * Migrate previously-saved session params from the agent-home fallback cwd to
 * the project's primary workspace once one becomes available.
 *
 * Migration happens only when ALL of these hold: the previous params carry a
 * session id and a cwd; the resolved workspace is the project primary and has
 * a cwd; the previous cwd is exactly the agent-home fallback directory; the
 * project cwd actually differs; and any workspaceId recorded on the previous
 * params matches the resolved workspace. Otherwise the previous params are
 * returned untouched with a null warning.
 */
export function resolveRuntimeSessionParamsForWorkspace(input: {
  agentId: string;
  previousSessionParams: Record<string, unknown> | null;
  resolvedWorkspace: ResolvedWorkspaceForRun;
}) {
  const { agentId, previousSessionParams, resolvedWorkspace } = input;
  const previousSessionId = readNonEmptyString(previousSessionParams?.sessionId);
  const previousCwd = readNonEmptyString(previousSessionParams?.cwd);
  // Nothing to migrate without an existing session anchored to a cwd.
  if (!previousSessionId || !previousCwd) {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  // Only the project's primary workspace triggers a migration.
  if (resolvedWorkspace.source !== "project_primary") {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  const projectCwd = readNonEmptyString(resolvedWorkspace.cwd);
  if (!projectCwd) {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  // Only sessions saved in the agent-home fallback dir are eligible to move.
  const fallbackAgentHomeCwd = resolveDefaultAgentWorkspaceDir(agentId);
  if (path.resolve(previousCwd) !== path.resolve(fallbackAgentHomeCwd)) {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  // Already in the right place — nothing to do.
  if (path.resolve(projectCwd) === path.resolve(previousCwd)) {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  // A mismatched workspaceId means the session belongs to a different
  // workspace; leave it alone.
  const previousWorkspaceId = readNonEmptyString(previousSessionParams?.workspaceId);
  if (
    previousWorkspaceId &&
    resolvedWorkspace.workspaceId &&
    previousWorkspaceId !== resolvedWorkspace.workspaceId
  ) {
    return {
      sessionParams: previousSessionParams,
      warning: null as string | null,
    };
  }
  // Rewrite cwd (and workspace/repo hints when known) onto a copy of the params.
  const migratedSessionParams: Record<string, unknown> = {
    ...(previousSessionParams ?? {}),
    cwd: projectCwd,
  };
  if (resolvedWorkspace.workspaceId) migratedSessionParams.workspaceId = resolvedWorkspace.workspaceId;
  if (resolvedWorkspace.repoUrl) migratedSessionParams.repoUrl = resolvedWorkspace.repoUrl;
  if (resolvedWorkspace.repoRef) migratedSessionParams.repoRef = resolvedWorkspace.repoRef;
  return {
    sessionParams: migratedSessionParams,
    warning:
      `Project workspace "${projectCwd}" is now available. ` +
      `Attempting to resume session "${previousSessionId}" that was previously saved in fallback workspace "${previousCwd}".`,
  };
}
| function parseIssueAssigneeAdapterOverrides( | |
| raw: unknown, | |
| ): ParsedIssueAssigneeAdapterOverrides | null { | |
| const parsed = parseObject(raw); | |
| const parsedAdapterConfig = parseObject(parsed.adapterConfig); | |
| const adapterConfig = | |
| Object.keys(parsedAdapterConfig).length > 0 ? parsedAdapterConfig : null; | |
| const useProjectWorkspace = | |
| typeof parsed.useProjectWorkspace === "boolean" | |
| ? parsed.useProjectWorkspace | |
| : null; | |
| if (!adapterConfig && useProjectWorkspace === null) return null; | |
| return { | |
| adapterConfig, | |
| useProjectWorkspace, | |
| }; | |
| } | |
| function deriveTaskKey( | |
| contextSnapshot: Record<string, unknown> | null | undefined, | |
| payload: Record<string, unknown> | null | undefined, | |
| ) { | |
| return ( | |
| readNonEmptyString(contextSnapshot?.taskKey) ?? | |
| readNonEmptyString(contextSnapshot?.taskId) ?? | |
| readNonEmptyString(contextSnapshot?.issueId) ?? | |
| readNonEmptyString(payload?.taskKey) ?? | |
| readNonEmptyString(payload?.taskId) ?? | |
| readNonEmptyString(payload?.issueId) ?? | |
| null | |
| ); | |
| } | |
| export function shouldResetTaskSessionForWake( | |
| contextSnapshot: Record<string, unknown> | null | undefined, | |
| ) { | |
| if (contextSnapshot?.forceFreshSession === true) return true; | |
| const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason); | |
| if (wakeReason === "issue_assigned") return true; | |
| return false; | |
| } | |
| function describeSessionResetReason( | |
| contextSnapshot: Record<string, unknown> | null | undefined, | |
| ) { | |
| if (contextSnapshot?.forceFreshSession === true) return "forceFreshSession was requested"; | |
| const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason); | |
| if (wakeReason === "issue_assigned") return "wake reason is issue_assigned"; | |
| return null; | |
| } | |
| function deriveCommentId( | |
| contextSnapshot: Record<string, unknown> | null | undefined, | |
| payload: Record<string, unknown> | null | undefined, | |
| ) { | |
| return ( | |
| readNonEmptyString(contextSnapshot?.wakeCommentId) ?? | |
| readNonEmptyString(contextSnapshot?.commentId) ?? | |
| readNonEmptyString(payload?.commentId) ?? | |
| null | |
| ); | |
| } | |
/**
 * Fill in derived wake metadata (reason, issue/task ids, comment ids, source,
 * trigger detail) on a context snapshot, never overwriting a key the caller
 * already set to a non-empty value. Mutates and returns the SAME snapshot
 * object, along with the derived values for reuse by the caller.
 */
function enrichWakeContextSnapshot(input: {
  contextSnapshot: Record<string, unknown>;
  reason: string | null;
  source: WakeupOptions["source"];
  triggerDetail: WakeupOptions["triggerDetail"] | null;
  payload: Record<string, unknown> | null;
}) {
  const { contextSnapshot, reason, source, triggerDetail, payload } = input;
  const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
  const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
  const taskKey = deriveTaskKey(contextSnapshot, payload);
  const wakeCommentId = deriveCommentId(contextSnapshot, payload);
  // Each block below only fills a key when it is currently empty/blank.
  if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
    contextSnapshot.wakeReason = reason;
  }
  if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
    contextSnapshot.issueId = issueIdFromPayload;
  }
  // taskId mirrors the payload's issueId when no explicit taskId was given.
  if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
    contextSnapshot.taskId = issueIdFromPayload;
  }
  if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) {
    contextSnapshot.taskKey = taskKey;
  }
  if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) {
    contextSnapshot.commentId = commentIdFromPayload;
  }
  if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
    contextSnapshot.wakeCommentId = wakeCommentId;
  }
  if (!readNonEmptyString(contextSnapshot["wakeSource"]) && source) {
    contextSnapshot.wakeSource = source;
  }
  if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
    contextSnapshot.wakeTriggerDetail = triggerDetail;
  }
  return {
    contextSnapshot,
    issueIdFromPayload,
    commentIdFromPayload,
    taskKey,
    wakeCommentId,
  };
}
| function mergeCoalescedContextSnapshot( | |
| existingRaw: unknown, | |
| incoming: Record<string, unknown>, | |
| ) { | |
| const existing = parseObject(existingRaw); | |
| const merged: Record<string, unknown> = { | |
| ...existing, | |
| ...incoming, | |
| }; | |
| const commentId = deriveCommentId(incoming, null); | |
| if (commentId) { | |
| merged.commentId = commentId; | |
| merged.wakeCommentId = commentId; | |
| } | |
| return merged; | |
| } | |
| function runTaskKey(run: typeof heartbeatRuns.$inferSelect) { | |
| return deriveTaskKey(run.contextSnapshot as Record<string, unknown> | null, null); | |
| } | |
| function isSameTaskScope(left: string | null, right: string | null) { | |
| return (left ?? null) === (right ?? null); | |
| } | |
| function truncateDisplayId(value: string | null | undefined, max = 128) { | |
| if (!value) return null; | |
| return value.length > max ? value.slice(0, max) : value; | |
| } | |
| function normalizeAgentNameKey(value: string | null | undefined) { | |
| if (typeof value !== "string") return null; | |
| const normalized = value.trim().toLowerCase(); | |
| return normalized.length > 0 ? normalized : null; | |
| } | |
| const defaultSessionCodec: AdapterSessionCodec = { | |
| deserialize(raw: unknown) { | |
| const asObj = parseObject(raw); | |
| if (Object.keys(asObj).length > 0) return asObj; | |
| const sessionId = readNonEmptyString((raw as Record<string, unknown> | null)?.sessionId); | |
| if (sessionId) return { sessionId }; | |
| return null; | |
| }, | |
| serialize(params: Record<string, unknown> | null) { | |
| if (!params || Object.keys(params).length === 0) return null; | |
| return params; | |
| }, | |
| getDisplayId(params: Record<string, unknown> | null) { | |
| return readNonEmptyString(params?.sessionId); | |
| }, | |
| }; | |
| function getAdapterSessionCodec(adapterType: string) { | |
| const adapter = getServerAdapter(adapterType); | |
| return adapter.sessionCodec ?? defaultSessionCodec; | |
| } | |
| function normalizeSessionParams(params: Record<string, unknown> | null | undefined) { | |
| if (!params) return null; | |
| return Object.keys(params).length > 0 ? params : null; | |
| } | |
/**
 * Compute the next persisted session state (params, display id, legacy
 * session id) from an adapter execution result.
 *
 * Precedence: an explicit clearSession wipes everything; explicit
 * sessionParams beat an explicit legacy sessionId, which beats the previous
 * params — and the previous display/legacy ids are only carried forward when
 * the adapter supplied none of the session fields at all. Candidate params
 * are round-tripped through the adapter codec (serialize → deserialize)
 * before the display and legacy ids are derived.
 */
function resolveNextSessionState(input: {
  codec: AdapterSessionCodec;
  adapterResult: AdapterExecutionResult;
  previousParams: Record<string, unknown> | null;
  previousDisplayId: string | null;
  previousLegacySessionId: string | null;
}) {
  const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input;
  if (adapterResult.clearSession) {
    return {
      params: null as Record<string, unknown> | null,
      displayId: null as string | null,
      legacySessionId: null as string | null,
    };
  }
  // "defined" (even if null/empty) is distinct from "not provided" here.
  const explicitParams = adapterResult.sessionParams;
  const hasExplicitParams = adapterResult.sessionParams !== undefined;
  const hasExplicitSessionId = adapterResult.sessionId !== undefined;
  const explicitSessionId = readNonEmptyString(adapterResult.sessionId);
  const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined;
  const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId);
  // Only fall back to previous state when the adapter said nothing about sessions.
  const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay;
  const candidateParams =
    hasExplicitParams
      ? explicitParams
      : hasExplicitSessionId
        ? (explicitSessionId ? { sessionId: explicitSessionId } : null)
        : previousParams;
  // Round-trip through the codec so persisted params match the codec's shape.
  const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null));
  const deserialized = normalizeSessionParams(codec.deserialize(serialized));
  const displayId = truncateDisplayId(
    explicitDisplayId ??
      (codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ??
      readNonEmptyString(deserialized?.sessionId) ??
      (shouldUsePrevious ? previousDisplayId : null) ??
      explicitSessionId ??
      (shouldUsePrevious ? previousLegacySessionId : null),
  );
  const legacySessionId =
    explicitSessionId ??
    readNonEmptyString(deserialized?.sessionId) ??
    displayId ??
    (shouldUsePrevious ? previousLegacySessionId : null);
  return {
    params: serialized,
    displayId,
    legacySessionId,
  };
}
| export function heartbeatService(db: Db) { | |
| const instanceSettings = instanceSettingsService(db); | |
| const runLogStore = getRunLogStore(); | |
| const secretsSvc = secretService(db); | |
| const issuesSvc = issueService(db); | |
| const executionWorkspacesSvc = executionWorkspaceService(db); | |
| const workspaceOperationsSvc = workspaceOperationService(db); | |
| const activeRunExecutions = new Set<string>(); | |
| const budgetHooks = { | |
| cancelWorkForScope: cancelBudgetScopeWork, | |
| }; | |
| const budgets = budgetService(db, budgetHooks); | |
  /** Fetch an agent row by id, or null when it does not exist. */
  async function getAgent(agentId: string) {
    return db
      .select()
      .from(agents)
      .where(eq(agents.id, agentId))
      .then((rows) => rows[0] ?? null);
  }
  /** Fetch a heartbeat-run row by id, or null when it does not exist. */
  async function getRun(runId: string) {
    return db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.id, runId))
      .then((rows) => rows[0] ?? null);
  }
  /** Fetch the agent's runtime-state row, or null when none exists yet. */
  async function getRuntimeState(agentId: string) {
    return db
      .select()
      .from(agentRuntimeState)
      .where(eq(agentRuntimeState.agentId, agentId))
      .then((rows) => rows[0] ?? null);
  }
  /**
   * Fetch the task-scoped session row keyed by (company, agent, adapter type,
   * task key), or null when no session has been recorded for that scope.
   */
  async function getTaskSession(
    companyId: string,
    agentId: string,
    adapterType: string,
    taskKey: string,
  ) {
    return db
      .select()
      .from(agentTaskSessions)
      .where(
        and(
          eq(agentTaskSessions.companyId, companyId),
          eq(agentTaskSessions.agentId, agentId),
          eq(agentTaskSessions.adapterType, adapterType),
          eq(agentTaskSessions.taskKey, taskKey),
        ),
      )
      .then((rows) => rows[0] ?? null);
  }
  /**
   * Most recent run (by createdAt) for an agent whose sessionIdAfter matches
   * `sessionId`, optionally excluding one run id (e.g. the run currently
   * being finalized). Returns null when no such run exists.
   */
  async function getLatestRunForSession(
    agentId: string,
    sessionId: string,
    opts?: { excludeRunId?: string | null },
  ) {
    const conditions = [
      eq(heartbeatRuns.agentId, agentId),
      eq(heartbeatRuns.sessionIdAfter, sessionId),
    ];
    if (opts?.excludeRunId) {
      conditions.push(sql`${heartbeatRuns.id} <> ${opts.excludeRunId}`);
    }
    return db
      .select()
      .from(heartbeatRuns)
      .where(and(...conditions))
      .orderBy(desc(heartbeatRuns.createdAt))
      .limit(1)
      .then((rows) => rows[0] ?? null);
  }
  /**
   * Oldest run (id + createdAt only) for an agent's session, ordered by
   * createdAt with id as a deterministic tiebreak. Used to compute session age.
   */
  async function getOldestRunForSession(agentId: string, sessionId: string) {
    return db
      .select({
        id: heartbeatRuns.id,
        createdAt: heartbeatRuns.createdAt,
      })
      .from(heartbeatRuns)
      .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.sessionIdAfter, sessionId)))
      .orderBy(asc(heartbeatRuns.createdAt), asc(heartbeatRuns.id))
      .limit(1)
      .then((rows) => rows[0] ?? null);
  }
  /**
   * Derive the per-run ("normalized") usage for adapters that report
   * cumulative session totals: subtract the previous run's raw totals for the
   * same session (see deriveNormalizedUsageDelta for counter-reset handling).
   * Without a session id or raw usage, the raw usage passes through unchanged.
   */
  async function resolveNormalizedUsageForSession(input: {
    agentId: string;
    runId: string;
    sessionId: string | null;
    rawUsage: UsageTotals | null;
  }) {
    const { agentId, runId, sessionId, rawUsage } = input;
    if (!sessionId || !rawUsage) {
      return {
        normalizedUsage: rawUsage,
        previousRawUsage: null as UsageTotals | null,
        derivedFromSessionTotals: false,
      };
    }
    // Exclude the run being finalized so we compare against its predecessor.
    const previousRun = await getLatestRunForSession(agentId, sessionId, { excludeRunId: runId });
    const previousRawUsage = readRawUsageTotals(previousRun?.usageJson);
    return {
      normalizedUsage: deriveNormalizedUsageDelta(rawUsage, previousRawUsage),
      previousRawUsage,
      derivedFromSessionTotals: previousRawUsage !== null,
    };
  }
  /**
   * Decide whether the agent's current session should be rotated ("compacted")
   * before the next run, based on the agent's compaction policy thresholds:
   * run count in the session, raw input tokens of the latest run, and session
   * age. When rotating, builds a short markdown handoff note for the
   * replacement session. Never rotates when there is no session or the policy
   * is disabled / has no thresholds.
   */
  async function evaluateSessionCompaction(input: {
    agent: typeof agents.$inferSelect;
    sessionId: string | null;
    issueId: string | null;
  }): Promise<SessionCompactionDecision> {
    const { agent, sessionId, issueId } = input;
    if (!sessionId) {
      return {
        rotate: false,
        reason: null,
        handoffMarkdown: null,
        previousRunId: null,
      };
    }
    const policy = parseSessionCompactionPolicy(agent);
    if (!policy.enabled || !hasSessionCompactionThresholds(policy)) {
      return {
        rotate: false,
        reason: null,
        handoffMarkdown: null,
        previousRunId: null,
      };
    }
    // Fetch one more row than maxSessionRuns (to detect "exceeded"), with a
    // floor of 4 rows so there is some history even for small limits.
    const fetchLimit = Math.max(policy.maxSessionRuns > 0 ? policy.maxSessionRuns + 1 : 0, 4);
    const runs = await db
      .select({
        id: heartbeatRuns.id,
        createdAt: heartbeatRuns.createdAt,
        usageJson: heartbeatRuns.usageJson,
        resultJson: heartbeatRuns.resultJson,
        error: heartbeatRuns.error,
      })
      .from(heartbeatRuns)
      .where(and(eq(heartbeatRuns.agentId, agent.id), eq(heartbeatRuns.sessionIdAfter, sessionId)))
      .orderBy(desc(heartbeatRuns.createdAt))
      .limit(fetchLimit);
    if (runs.length === 0) {
      return {
        rotate: false,
        reason: null,
        handoffMarkdown: null,
        previousRunId: null,
      };
    }
    const latestRun = runs[0] ?? null;
    // For the age check we need the true oldest run; the limited fetch above
    // only approximates it, so query explicitly when the age threshold is on.
    const oldestRun =
      policy.maxSessionAgeHours > 0
        ? await getOldestRunForSession(agent.id, sessionId)
        : runs[runs.length - 1] ?? latestRun;
    const latestRawUsage = readRawUsageTotals(latestRun?.usageJson);
    const sessionAgeHours =
      latestRun && oldestRun
        ? Math.max(
            0,
            (new Date(latestRun.createdAt).getTime() - new Date(oldestRun.createdAt).getTime()) / (1000 * 60 * 60),
          )
        : 0;
    // Threshold checks in priority order: run count, raw input tokens, age.
    let reason: string | null = null;
    if (policy.maxSessionRuns > 0 && runs.length > policy.maxSessionRuns) {
      reason = `session exceeded ${policy.maxSessionRuns} runs`;
    } else if (
      policy.maxRawInputTokens > 0 &&
      latestRawUsage &&
      latestRawUsage.inputTokens >= policy.maxRawInputTokens
    ) {
      reason =
        `session raw input reached ${formatCount(latestRawUsage.inputTokens)} tokens ` +
        `(threshold ${formatCount(policy.maxRawInputTokens)})`;
    } else if (policy.maxSessionAgeHours > 0 && sessionAgeHours >= policy.maxSessionAgeHours) {
      reason = `session age reached ${Math.floor(sessionAgeHours)} hours`;
    }
    if (!reason || !latestRun) {
      return {
        rotate: false,
        reason: null,
        handoffMarkdown: null,
        previousRunId: latestRun?.id ?? null,
      };
    }
    // Build the handoff: prefer a textual summary from the last run's result,
    // falling back to its error message.
    const latestSummary = summarizeHeartbeatRunResultJson(latestRun.resultJson);
    const latestTextSummary =
      readNonEmptyString(latestSummary?.summary) ??
      readNonEmptyString(latestSummary?.result) ??
      readNonEmptyString(latestSummary?.message) ??
      readNonEmptyString(latestRun.error);
    const handoffMarkdown = [
      "Paperclip session handoff:",
      `- Previous session: ${sessionId}`,
      issueId ? `- Issue: ${issueId}` : "",
      `- Rotation reason: ${reason}`,
      latestTextSummary ? `- Last run summary: ${latestTextSummary}` : "",
      "Continue from the current task state. Rebuild only the minimum context you need.",
    ]
      .filter(Boolean)
      .join("\n");
    return {
      rotate: true,
      reason,
      handoffMarkdown,
      previousRunId: latestRun.id,
    };
  }
| async function resolveSessionBeforeForWakeup( | |
| agent: typeof agents.$inferSelect, | |
| taskKey: string | null, | |
| ) { | |
| if (taskKey) { | |
| const codec = getAdapterSessionCodec(agent.adapterType); | |
| const existingTaskSession = await getTaskSession( | |
| agent.companyId, | |
| agent.id, | |
| agent.adapterType, | |
| taskKey, | |
| ); | |
| const parsedParams = normalizeSessionParams( | |
| codec.deserialize(existingTaskSession?.sessionParamsJson ?? null), | |
| ); | |
| return truncateDisplayId( | |
| existingTaskSession?.sessionDisplayId ?? | |
| (codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ?? | |
| readNonEmptyString(parsedParams?.sessionId), | |
| ); | |
| } | |
| const runtimeForRun = await getRuntimeState(agent.id); | |
| return runtimeForRun?.sessionId ?? null; | |
| } | |
/**
 * Resolves the filesystem workspace a heartbeat run should execute in.
 *
 * Resolution order:
 *   1. A configured project workspace (preferring the issue's / caller's
 *      selected workspace), provisioning a managed directory when a candidate
 *      has no local cwd or carries the repo-only sentinel.
 *   2. A managed project workspace when the project has no workspace rows.
 *   3. The cwd saved in the previous task session, if it still exists on disk.
 *   4. The agent's default home workspace, created on demand.
 *
 * Non-fatal problems (missing paths, unavailable preferred workspace) are
 * surfaced through the returned `warnings` array rather than thrown.
 *
 * @param agent Agent row the run belongs to; supplies company scoping.
 * @param context Run context snapshot; `issueId`, `projectId`, and
 *   `projectWorkspaceId` are read when present.
 * @param previousSessionParams Deserialized params of the prior task session;
 *   `cwd`, `workspaceId`, `repoUrl`, `repoRef` are read when present.
 * @param opts `useProjectWorkspace: false` skips project-workspace lookup
 *   entirely; any other value (including undefined) enables it.
 */
async function resolveWorkspaceForRun(
  agent: typeof agents.$inferSelect,
  context: Record<string, unknown>,
  previousSessionParams: Record<string, unknown> | null,
  opts?: { useProjectWorkspace?: boolean | null },
): Promise<ResolvedWorkspaceForRun> {
  const issueId = readNonEmptyString(context.issueId);
  const contextProjectId = readNonEmptyString(context.projectId);
  const contextProjectWorkspaceId = readNonEmptyString(context.projectWorkspaceId);
  // When the run is tied to an issue, the issue's own project/workspace
  // reference takes precedence over the context snapshot values.
  const issueProjectRef = issueId
    ? await db
        .select({
          projectId: issues.projectId,
          projectWorkspaceId: issues.projectWorkspaceId,
        })
        .from(issues)
        .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId)))
        .then((rows) => rows[0] ?? null)
    : null;
  const issueProjectId = issueProjectRef?.projectId ?? null;
  const preferredProjectWorkspaceId =
    issueProjectRef?.projectWorkspaceId ?? contextProjectWorkspaceId ?? null;
  const resolvedProjectId = issueProjectId ?? contextProjectId;
  // Only an explicit `false` disables project-workspace resolution.
  const useProjectWorkspace = opts?.useProjectWorkspace !== false;
  const workspaceProjectId = useProjectWorkspace ? resolvedProjectId : null;
  const unorderedProjectWorkspaceRows = workspaceProjectId
    ? await db
        .select()
        .from(projectWorkspaces)
        .where(
          and(
            eq(projectWorkspaces.companyId, agent.companyId),
            eq(projectWorkspaces.projectId, workspaceProjectId),
          ),
        )
        .orderBy(asc(projectWorkspaces.createdAt), asc(projectWorkspaces.id))
    : [];
  // Reorder candidates so the preferred workspace (if any) is tried first.
  const projectWorkspaceRows = prioritizeProjectWorkspaceCandidatesForRun(
    unorderedProjectWorkspaceRows,
    preferredProjectWorkspaceId,
  );
  // Hints describe every candidate and are returned regardless of which
  // workspace is ultimately chosen.
  const workspaceHints = projectWorkspaceRows.map((workspace) => ({
    workspaceId: workspace.id,
    cwd: readNonEmptyString(workspace.cwd),
    repoUrl: readNonEmptyString(workspace.repoUrl),
    repoRef: readNonEmptyString(workspace.repoRef),
  }));
  if (projectWorkspaceRows.length > 0) {
    const preferredWorkspace = preferredProjectWorkspaceId
      ? projectWorkspaceRows.find((workspace) => workspace.id === preferredProjectWorkspaceId) ?? null
      : null;
    const missingProjectCwds: string[] = [];
    let hasConfiguredProjectCwd = false;
    let preferredWorkspaceWarning: string | null = null;
    if (preferredProjectWorkspaceId && !preferredWorkspace) {
      preferredWorkspaceWarning =
        `Selected project workspace "${preferredProjectWorkspaceId}" is not available on this project.`;
    }
    // First candidate whose directory exists on disk wins.
    for (const workspace of projectWorkspaceRows) {
      let projectCwd = readNonEmptyString(workspace.cwd);
      let managedWorkspaceWarning: string | null = null;
      if (!projectCwd || projectCwd === REPO_ONLY_CWD_SENTINEL) {
        // No usable local path configured: try to provision a managed
        // checkout for the project instead.
        try {
          const managedWorkspace = await ensureManagedProjectWorkspace({
            companyId: agent.companyId,
            projectId: workspaceProjectId ?? resolvedProjectId ?? workspace.projectId,
            repoUrl: readNonEmptyString(workspace.repoUrl),
          });
          projectCwd = managedWorkspace.cwd;
          managedWorkspaceWarning = managedWorkspace.warning;
        } catch (error) {
          // Provisioning failures are only surfaced for the workspace the
          // caller explicitly asked for; other candidates are skipped quietly.
          if (preferredWorkspace?.id === workspace.id) {
            preferredWorkspaceWarning = error instanceof Error ? error.message : String(error);
          }
          continue;
        }
      }
      hasConfiguredProjectCwd = true;
      const projectCwdExists = await fs
        .stat(projectCwd)
        .then((stats) => stats.isDirectory())
        .catch(() => false);
      if (projectCwdExists) {
        return {
          cwd: projectCwd,
          source: "project_primary" as const,
          projectId: resolvedProjectId,
          workspaceId: workspace.id,
          repoUrl: workspace.repoUrl,
          repoRef: workspace.repoRef,
          workspaceHints,
          warnings: [preferredWorkspaceWarning, managedWorkspaceWarning].filter(
            (value): value is string => Boolean(value),
          ),
        };
      }
      if (preferredWorkspace?.id === workspace.id) {
        preferredWorkspaceWarning =
          `Selected project workspace path "${projectCwd}" is not available yet.`;
      }
      missingProjectCwds.push(projectCwd);
    }
    // No candidate was usable: fall back to the agent's default directory, but
    // still report source "project_primary" so callers keep the project link.
    const fallbackCwd = resolveDefaultAgentWorkspaceDir(agent.id);
    await fs.mkdir(fallbackCwd, { recursive: true });
    const warnings: string[] = [];
    if (preferredWorkspaceWarning) {
      warnings.push(preferredWorkspaceWarning);
    }
    if (missingProjectCwds.length > 0) {
      const firstMissing = missingProjectCwds[0];
      const extraMissingCount = Math.max(0, missingProjectCwds.length - 1);
      warnings.push(
        extraMissingCount > 0
          ? `Project workspace path "${firstMissing}" and ${extraMissingCount} other configured path(s) are not available yet. Using fallback workspace "${fallbackCwd}" for this run.`
          : `Project workspace path "${firstMissing}" is not available yet. Using fallback workspace "${fallbackCwd}" for this run.`,
      );
    } else if (!hasConfiguredProjectCwd) {
      warnings.push(
        `Project workspace has no local cwd configured. Using fallback workspace "${fallbackCwd}" for this run.`,
      );
    }
    return {
      cwd: fallbackCwd,
      source: "project_primary" as const,
      projectId: resolvedProjectId,
      workspaceId: projectWorkspaceRows[0]?.id ?? null,
      repoUrl: projectWorkspaceRows[0]?.repoUrl ?? null,
      repoRef: projectWorkspaceRows[0]?.repoRef ?? null,
      workspaceHints,
      warnings,
    };
  }
  // Project exists but has no workspace rows: provision a managed workspace.
  if (workspaceProjectId) {
    const managedWorkspace = await ensureManagedProjectWorkspace({
      companyId: agent.companyId,
      projectId: workspaceProjectId,
      repoUrl: null,
    });
    return {
      cwd: managedWorkspace.cwd,
      source: "project_primary" as const,
      projectId: resolvedProjectId,
      workspaceId: null,
      repoUrl: null,
      repoRef: null,
      workspaceHints,
      warnings: managedWorkspace.warning ? [managedWorkspace.warning] : [],
    };
  }
  // No project context: reuse the previous session's cwd if it still exists.
  const sessionCwd = readNonEmptyString(previousSessionParams?.cwd);
  if (sessionCwd) {
    const sessionCwdExists = await fs
      .stat(sessionCwd)
      .then((stats) => stats.isDirectory())
      .catch(() => false);
    if (sessionCwdExists) {
      return {
        cwd: sessionCwd,
        source: "task_session" as const,
        projectId: resolvedProjectId,
        workspaceId: readNonEmptyString(previousSessionParams?.workspaceId),
        repoUrl: readNonEmptyString(previousSessionParams?.repoUrl),
        repoRef: readNonEmptyString(previousSessionParams?.repoRef),
        workspaceHints,
        warnings: [],
      };
    }
  }
  // Last resort: the agent's default home workspace, created on demand. The
  // warning explains which earlier fallback was attempted and why it failed.
  const cwd = resolveDefaultAgentWorkspaceDir(agent.id);
  await fs.mkdir(cwd, { recursive: true });
  const warnings: string[] = [];
  if (sessionCwd) {
    warnings.push(
      `Saved session workspace "${sessionCwd}" is not available. Using fallback workspace "${cwd}" for this run.`,
    );
  } else if (resolvedProjectId) {
    warnings.push(
      `No project workspace directory is currently available for this issue. Using fallback workspace "${cwd}" for this run.`,
    );
  } else {
    warnings.push(
      `No project or prior session workspace was available. Using fallback workspace "${cwd}" for this run.`,
    );
  }
  return {
    cwd,
    source: "agent_home" as const,
    projectId: resolvedProjectId,
    workspaceId: null,
    repoUrl: null,
    repoRef: null,
    workspaceHints,
    warnings,
  };
}
| async function upsertTaskSession(input: { | |
| companyId: string; | |
| agentId: string; | |
| adapterType: string; | |
| taskKey: string; | |
| sessionParamsJson: Record<string, unknown> | null; | |
| sessionDisplayId: string | null; | |
| lastRunId: string | null; | |
| lastError: string | null; | |
| }) { | |
| const existing = await getTaskSession( | |
| input.companyId, | |
| input.agentId, | |
| input.adapterType, | |
| input.taskKey, | |
| ); | |
| if (existing) { | |
| return db | |
| .update(agentTaskSessions) | |
| .set({ | |
| sessionParamsJson: input.sessionParamsJson, | |
| sessionDisplayId: input.sessionDisplayId, | |
| lastRunId: input.lastRunId, | |
| lastError: input.lastError, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agentTaskSessions.id, existing.id)) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| } | |
| return db | |
| .insert(agentTaskSessions) | |
| .values({ | |
| companyId: input.companyId, | |
| agentId: input.agentId, | |
| adapterType: input.adapterType, | |
| taskKey: input.taskKey, | |
| sessionParamsJson: input.sessionParamsJson, | |
| sessionDisplayId: input.sessionDisplayId, | |
| lastRunId: input.lastRunId, | |
| lastError: input.lastError, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| } | |
| async function clearTaskSessions( | |
| companyId: string, | |
| agentId: string, | |
| opts?: { taskKey?: string | null; adapterType?: string | null }, | |
| ) { | |
| const conditions = [ | |
| eq(agentTaskSessions.companyId, companyId), | |
| eq(agentTaskSessions.agentId, agentId), | |
| ]; | |
| if (opts?.taskKey) { | |
| conditions.push(eq(agentTaskSessions.taskKey, opts.taskKey)); | |
| } | |
| if (opts?.adapterType) { | |
| conditions.push(eq(agentTaskSessions.adapterType, opts.adapterType)); | |
| } | |
| return db | |
| .delete(agentTaskSessions) | |
| .where(and(...conditions)) | |
| .returning() | |
| .then((rows) => rows.length); | |
| } | |
| async function ensureRuntimeState(agent: typeof agents.$inferSelect) { | |
| const existing = await getRuntimeState(agent.id); | |
| if (existing) return existing; | |
| return db | |
| .insert(agentRuntimeState) | |
| .values({ | |
| agentId: agent.id, | |
| companyId: agent.companyId, | |
| adapterType: agent.adapterType, | |
| stateJson: {}, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| } | |
| async function setRunStatus( | |
| runId: string, | |
| status: string, | |
| patch?: Partial<typeof heartbeatRuns.$inferInsert>, | |
| ) { | |
| const updated = await db | |
| .update(heartbeatRuns) | |
| .set({ status, ...patch, updatedAt: new Date() }) | |
| .where(eq(heartbeatRuns.id, runId)) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| if (updated) { | |
| publishLiveEvent({ | |
| companyId: updated.companyId, | |
| type: "heartbeat.run.status", | |
| payload: { | |
| runId: updated.id, | |
| agentId: updated.agentId, | |
| status: updated.status, | |
| invocationSource: updated.invocationSource, | |
| triggerDetail: updated.triggerDetail, | |
| error: updated.error ?? null, | |
| errorCode: updated.errorCode ?? null, | |
| startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null, | |
| finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null, | |
| }, | |
| }); | |
| } | |
| return updated; | |
| } | |
| async function setWakeupStatus( | |
| wakeupRequestId: string | null | undefined, | |
| status: string, | |
| patch?: Partial<typeof agentWakeupRequests.$inferInsert>, | |
| ) { | |
| if (!wakeupRequestId) return; | |
| await db | |
| .update(agentWakeupRequests) | |
| .set({ status, ...patch, updatedAt: new Date() }) | |
| .where(eq(agentWakeupRequests.id, wakeupRequestId)); | |
| } | |
| async function appendRunEvent( | |
| run: typeof heartbeatRuns.$inferSelect, | |
| seq: number, | |
| event: { | |
| eventType: string; | |
| stream?: "system" | "stdout" | "stderr"; | |
| level?: "info" | "warn" | "error"; | |
| color?: string; | |
| message?: string; | |
| payload?: Record<string, unknown>; | |
| }, | |
| ) { | |
| const sanitizedMessage = event.message ? redactCurrentUserText(event.message) : event.message; | |
| const sanitizedPayload = event.payload ? redactCurrentUserValue(event.payload) : event.payload; | |
| await db.insert(heartbeatRunEvents).values({ | |
| companyId: run.companyId, | |
| runId: run.id, | |
| agentId: run.agentId, | |
| seq, | |
| eventType: event.eventType, | |
| stream: event.stream, | |
| level: event.level, | |
| color: event.color, | |
| message: sanitizedMessage, | |
| payload: sanitizedPayload, | |
| }); | |
| publishLiveEvent({ | |
| companyId: run.companyId, | |
| type: "heartbeat.run.event", | |
| payload: { | |
| runId: run.id, | |
| agentId: run.agentId, | |
| seq, | |
| eventType: event.eventType, | |
| stream: event.stream ?? null, | |
| level: event.level ?? null, | |
| color: event.color ?? null, | |
| message: sanitizedMessage ?? null, | |
| payload: sanitizedPayload ?? null, | |
| }, | |
| }); | |
| } | |
| function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) { | |
| const runtimeConfig = parseObject(agent.runtimeConfig); | |
| const heartbeat = parseObject(runtimeConfig.heartbeat); | |
| return { | |
| enabled: asBoolean(heartbeat.enabled, true), | |
| intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)), | |
| wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true), | |
| maxConcurrentRuns: normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns), | |
| }; | |
| } | |
| async function countRunningRunsForAgent(agentId: string) { | |
| const [{ count }] = await db | |
| .select({ count: sql<number>`count(*)` }) | |
| .from(heartbeatRuns) | |
| .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))); | |
| return Number(count ?? 0); | |
| } | |
/**
 * Atomically promotes a queued run to "running".
 *
 * Re-validates preconditions at claim time and cancels the run instead when
 * the agent no longer exists, is not invokable, or a budget block applies.
 * The UPDATE is guarded by `status = 'queued'`, so only one worker can win a
 * concurrent claim. Returns the claimed row; null when the run was cancelled
 * here or another worker already claimed/finalized it; non-queued runs are
 * returned unchanged.
 */
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
  if (run.status !== "queued") return run;
  const agent = await getAgent(run.agentId);
  if (!agent) {
    await cancelRunInternal(run.id, "Cancelled because the agent no longer exists");
    return null;
  }
  if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
    await cancelRunInternal(run.id, "Cancelled because the agent is not invokable");
    return null;
  }
  // Budget enforcement happens at claim time so long-queued runs cannot
  // bypass limits introduced after they were enqueued.
  const context = parseObject(run.contextSnapshot);
  const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, {
    issueId: readNonEmptyString(context.issueId),
    projectId: readNonEmptyString(context.projectId),
  });
  if (budgetBlock) {
    await cancelRunInternal(run.id, budgetBlock.reason);
    return null;
  }
  const claimedAt = new Date();
  // Compare-and-swap: the `status = 'queued'` predicate means the UPDATE
  // matches nothing if another worker claimed the run first.
  const claimed = await db
    .update(heartbeatRuns)
    .set({
      status: "running",
      // Preserve an existing startedAt (e.g. from an earlier attempt).
      startedAt: run.startedAt ?? claimedAt,
      updatedAt: claimedAt,
    })
    .where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued")))
    .returning()
    .then((rows) => rows[0] ?? null);
  if (!claimed) return null;
  publishLiveEvent({
    companyId: claimed.companyId,
    type: "heartbeat.run.status",
    payload: {
      runId: claimed.id,
      agentId: claimed.agentId,
      status: claimed.status,
      invocationSource: claimed.invocationSource,
      triggerDetail: claimed.triggerDetail,
      error: claimed.error ?? null,
      errorCode: claimed.errorCode ?? null,
      startedAt: claimed.startedAt ? new Date(claimed.startedAt).toISOString() : null,
      finishedAt: claimed.finishedAt ? new Date(claimed.finishedAt).toISOString() : null,
    },
  });
  // Mark any associated wakeup request as claimed with the same timestamp.
  await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
  return claimed;
}
| async function finalizeAgentStatus( | |
| agentId: string, | |
| outcome: "succeeded" | "failed" | "cancelled" | "timed_out", | |
| ) { | |
| const existing = await getAgent(agentId); | |
| if (!existing) return; | |
| if (existing.status === "paused" || existing.status === "terminated") { | |
| return; | |
| } | |
| const runningCount = await countRunningRunsForAgent(agentId); | |
| const nextStatus = | |
| runningCount > 0 | |
| ? "running" | |
| : outcome === "succeeded" || outcome === "cancelled" | |
| ? "idle" | |
| : "error"; | |
| const updated = await db | |
| .update(agents) | |
| .set({ | |
| status: nextStatus, | |
| lastHeartbeatAt: new Date(), | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agents.id, agentId)) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| if (updated) { | |
| publishLiveEvent({ | |
| companyId: updated.companyId, | |
| type: "agent.status", | |
| payload: { | |
| agentId: updated.id, | |
| status: updated.status, | |
| lastHeartbeatAt: updated.lastHeartbeatAt | |
| ? new Date(updated.lastHeartbeatAt).toISOString() | |
| : null, | |
| outcome, | |
| }, | |
| }); | |
| } | |
| } | |
/**
 * Marks "running" runs that have no live process and no in-flight execution
 * as failed (typically after a server restart). For each reaped run it also
 * fails the wakeup request, emits a lifecycle event, releases the issue
 * execution slot, refreshes the agent's status, and kicks the agent's queue.
 *
 * @param opts.staleThresholdMs Minimum age since the run's last update before
 *   it is considered orphaned; 0 (the default) reaps immediately.
 * @returns Count and ids of the reaped runs.
 */
async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
  const staleThresholdMs = opts?.staleThresholdMs ?? 0;
  const now = new Date();
  // Find all runs stuck in "running" state (queued runs are legitimately waiting; resumeQueuedRuns handles them)
  const activeRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(eq(heartbeatRuns.status, "running"));
  const reaped: string[] = [];
  for (const run of activeRuns) {
    // Runs this process is actively tracking are not orphans.
    if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue;
    // Apply staleness threshold to avoid false positives
    if (staleThresholdMs > 0) {
      // A run with no updatedAt gets refTime 0, i.e. it always counts as stale.
      const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
      if (now.getTime() - refTime < staleThresholdMs) continue;
    }
    await setRunStatus(run.id, "failed", {
      error: "Process lost -- server may have restarted",
      errorCode: "process_lost",
      finishedAt: now,
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: now,
      error: "Process lost -- server may have restarted",
    });
    // Re-read the row so the appended event carries the post-update state.
    const updatedRun = await getRun(run.id);
    if (updatedRun) {
      await appendRunEvent(updatedRun, 1, {
        eventType: "lifecycle",
        stream: "system",
        level: "error",
        message: "Process lost -- server may have restarted",
      });
      await releaseIssueExecutionAndPromote(updatedRun);
    }
    await finalizeAgentStatus(run.agentId, "failed");
    await startNextQueuedRunForAgent(run.agentId);
    runningProcesses.delete(run.id);
    reaped.push(run.id);
  }
  if (reaped.length > 0) {
    logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs");
  }
  return { reaped: reaped.length, runIds: reaped };
}
| async function resumeQueuedRuns() { | |
| const queuedRuns = await db | |
| .select({ agentId: heartbeatRuns.agentId }) | |
| .from(heartbeatRuns) | |
| .where(eq(heartbeatRuns.status, "queued")); | |
| const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))]; | |
| for (const agentId of agentIds) { | |
| await startNextQueuedRunForAgent(agentId); | |
| } | |
| } | |
/**
 * Folds an adapter execution result into the agent's runtime-state totals and
 * records a cost event when the run consumed tokens or billed a cost.
 *
 * Token/cost totals are incremented with SQL arithmetic rather than
 * read-modify-write, so concurrent runs accumulate without clobbering.
 *
 * @param session.legacySessionId Session id persisted onto the runtime row.
 * @param normalizedUsage Pre-normalized usage totals; when omitted they are
 *   derived from `result.usage`.
 */
async function updateRuntimeState(
  agent: typeof agents.$inferSelect,
  run: typeof heartbeatRuns.$inferSelect,
  result: AdapterExecutionResult,
  session: { legacySessionId: string | null },
  normalizedUsage?: UsageTotals | null,
) {
  // Guarantee a runtime-state row exists before the UPDATE below.
  await ensureRuntimeState(agent);
  const usage = normalizedUsage ?? normalizeUsageTotals(result.usage);
  const inputTokens = usage?.inputTokens ?? 0;
  const outputTokens = usage?.outputTokens ?? 0;
  const cachedInputTokens = usage?.cachedInputTokens ?? 0;
  const billingType = normalizeLedgerBillingType(result.billingType);
  const additionalCostCents = normalizeBilledCostCents(result.costUsd, billingType);
  const hasTokenUsage = inputTokens > 0 || outputTokens > 0 || cachedInputTokens > 0;
  const provider = result.provider ?? "unknown";
  const biller = resolveLedgerBiller(result);
  const ledgerScope = await resolveLedgerScopeForRun(db, agent.companyId, run);
  await db
    .update(agentRuntimeState)
    .set({
      adapterType: agent.adapterType,
      sessionId: session.legacySessionId,
      lastRunId: run.id,
      lastRunStatus: run.status,
      lastError: result.errorMessage ?? null,
      // Increments are done in SQL so concurrent runs don't lose updates.
      totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`,
      totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`,
      totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`,
      totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`,
      updatedAt: new Date(),
    })
    .where(eq(agentRuntimeState.agentId, agent.id));
  // Write a ledger entry only when there is something to record: a positive
  // billed cost or any token usage at all.
  if (additionalCostCents > 0 || hasTokenUsage) {
    const costs = costService(db, budgetHooks);
    await costs.createEvent(agent.companyId, {
      heartbeatRunId: run.id,
      agentId: agent.id,
      issueId: ledgerScope.issueId,
      projectId: ledgerScope.projectId,
      provider,
      biller,
      billingType,
      model: result.model ?? "unknown",
      inputTokens,
      cachedInputTokens,
      outputTokens,
      costCents: additionalCostCents,
      occurredAt: new Date(),
    });
  }
}
| async function startNextQueuedRunForAgent(agentId: string) { | |
| return withAgentStartLock(agentId, async () => { | |
| const agent = await getAgent(agentId); | |
| if (!agent) return []; | |
| if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") { | |
| return []; | |
| } | |
| const policy = parseHeartbeatPolicy(agent); | |
| const runningCount = await countRunningRunsForAgent(agentId); | |
| const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount); | |
| if (availableSlots <= 0) return []; | |
| const queuedRuns = await db | |
| .select() | |
| .from(heartbeatRuns) | |
| .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) | |
| .orderBy(asc(heartbeatRuns.createdAt)) | |
| .limit(availableSlots); | |
| if (queuedRuns.length === 0) return []; | |
| const claimedRuns: Array<typeof heartbeatRuns.$inferSelect> = []; | |
| for (const queuedRun of queuedRuns) { | |
| const claimed = await claimQueuedRun(queuedRun); | |
| if (claimed) claimedRuns.push(claimed); | |
| } | |
| if (claimedRuns.length === 0) return []; | |
| for (const claimedRun of claimedRuns) { | |
| void executeRun(claimedRun.id).catch((err) => { | |
| logger.error({ err, runId: claimedRun.id }, "queued heartbeat execution failed"); | |
| }); | |
| } | |
| return claimedRuns; | |
| }); | |
| } | |
| async function executeRun(runId: string) { | |
| let run = await getRun(runId); | |
| if (!run) return; | |
| if (run.status !== "queued" && run.status !== "running") return; | |
| if (run.status === "queued") { | |
| const claimed = await claimQueuedRun(run); | |
| if (!claimed) { | |
| // Another worker has already claimed or finalized this run. | |
| return; | |
| } | |
| run = claimed; | |
| } | |
| activeRunExecutions.add(run.id); | |
| try { | |
| const agent = await getAgent(run.agentId); | |
| if (!agent) { | |
| await setRunStatus(runId, "failed", { | |
| error: "Agent not found", | |
| errorCode: "agent_not_found", | |
| finishedAt: new Date(), | |
| }); | |
| await setWakeupStatus(run.wakeupRequestId, "failed", { | |
| finishedAt: new Date(), | |
| error: "Agent not found", | |
| }); | |
| const failedRun = await getRun(runId); | |
| if (failedRun) await releaseIssueExecutionAndPromote(failedRun); | |
| return; | |
| } | |
| const runtime = await ensureRuntimeState(agent); | |
| const context = parseObject(run.contextSnapshot); | |
| const taskKey = deriveTaskKey(context, null); | |
| const sessionCodec = getAdapterSessionCodec(agent.adapterType); | |
| const issueId = readNonEmptyString(context.issueId); | |
| const issueContext = issueId | |
| ? await db | |
| .select({ | |
| id: issues.id, | |
| identifier: issues.identifier, | |
| title: issues.title, | |
| projectId: issues.projectId, | |
| projectWorkspaceId: issues.projectWorkspaceId, | |
| executionWorkspaceId: issues.executionWorkspaceId, | |
| executionWorkspacePreference: issues.executionWorkspacePreference, | |
| assigneeAgentId: issues.assigneeAgentId, | |
| assigneeAdapterOverrides: issues.assigneeAdapterOverrides, | |
| executionWorkspaceSettings: issues.executionWorkspaceSettings, | |
| }) | |
| .from(issues) | |
| .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId))) | |
| .then((rows) => rows[0] ?? null) | |
| : null; | |
| const issueAssigneeOverrides = | |
| issueContext && issueContext.assigneeAgentId === agent.id | |
| ? parseIssueAssigneeAdapterOverrides( | |
| issueContext.assigneeAdapterOverrides, | |
| ) | |
| : null; | |
| const isolatedWorkspacesEnabled = (await instanceSettings.getExperimental()).enableIsolatedWorkspaces; | |
| const issueExecutionWorkspaceSettings = isolatedWorkspacesEnabled | |
| ? parseIssueExecutionWorkspaceSettings(issueContext?.executionWorkspaceSettings) | |
| : null; | |
| const contextProjectId = readNonEmptyString(context.projectId); | |
| const executionProjectId = issueContext?.projectId ?? contextProjectId; | |
| const projectExecutionWorkspacePolicy = executionProjectId | |
| ? await db | |
| .select({ executionWorkspacePolicy: projects.executionWorkspacePolicy }) | |
| .from(projects) | |
| .where(and(eq(projects.id, executionProjectId), eq(projects.companyId, agent.companyId))) | |
| .then((rows) => | |
| gateProjectExecutionWorkspacePolicy( | |
| parseProjectExecutionWorkspacePolicy(rows[0]?.executionWorkspacePolicy), | |
| isolatedWorkspacesEnabled, | |
| )) | |
| : null; | |
| const taskSession = taskKey | |
| ? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey) | |
| : null; | |
| const resetTaskSession = shouldResetTaskSessionForWake(context); | |
| const sessionResetReason = describeSessionResetReason(context); | |
| const taskSessionForRun = resetTaskSession ? null : taskSession; | |
| const previousSessionParams = normalizeSessionParams( | |
| sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null), | |
| ); | |
| const config = parseObject(agent.adapterConfig); | |
| const executionWorkspaceMode = resolveExecutionWorkspaceMode({ | |
| projectPolicy: projectExecutionWorkspacePolicy, | |
| issueSettings: issueExecutionWorkspaceSettings, | |
| legacyUseProjectWorkspace: issueAssigneeOverrides?.useProjectWorkspace ?? null, | |
| }); | |
| const resolvedWorkspace = await resolveWorkspaceForRun( | |
| agent, | |
| context, | |
| previousSessionParams, | |
| { useProjectWorkspace: executionWorkspaceMode !== "agent_default" }, | |
| ); | |
| const workspaceManagedConfig = buildExecutionWorkspaceAdapterConfig({ | |
| agentConfig: config, | |
| projectPolicy: projectExecutionWorkspacePolicy, | |
| issueSettings: issueExecutionWorkspaceSettings, | |
| mode: executionWorkspaceMode, | |
| legacyUseProjectWorkspace: issueAssigneeOverrides?.useProjectWorkspace ?? null, | |
| }); | |
| const mergedConfig = issueAssigneeOverrides?.adapterConfig | |
| ? { ...workspaceManagedConfig, ...issueAssigneeOverrides.adapterConfig } | |
| : workspaceManagedConfig; | |
| const { config: resolvedConfig, secretKeys } = await secretsSvc.resolveAdapterConfigForRuntime( | |
| agent.companyId, | |
| mergedConfig, | |
| ); | |
| const issueRef = issueContext | |
| ? { | |
| id: issueContext.id, | |
| identifier: issueContext.identifier, | |
| title: issueContext.title, | |
| projectId: issueContext.projectId, | |
| projectWorkspaceId: issueContext.projectWorkspaceId, | |
| executionWorkspaceId: issueContext.executionWorkspaceId, | |
| executionWorkspacePreference: issueContext.executionWorkspacePreference, | |
| } | |
| : null; | |
| const existingExecutionWorkspace = | |
| issueRef?.executionWorkspaceId ? await executionWorkspacesSvc.getById(issueRef.executionWorkspaceId) : null; | |
| const workspaceOperationRecorder = workspaceOperationsSvc.createRecorder({ | |
| companyId: agent.companyId, | |
| heartbeatRunId: run.id, | |
| executionWorkspaceId: existingExecutionWorkspace?.id ?? null, | |
| }); | |
| const executionWorkspace = await realizeExecutionWorkspace({ | |
| base: { | |
| baseCwd: resolvedWorkspace.cwd, | |
| source: resolvedWorkspace.source, | |
| projectId: resolvedWorkspace.projectId, | |
| workspaceId: resolvedWorkspace.workspaceId, | |
| repoUrl: resolvedWorkspace.repoUrl, | |
| repoRef: resolvedWorkspace.repoRef, | |
| }, | |
| config: resolvedConfig, | |
| issue: issueRef, | |
| agent: { | |
| id: agent.id, | |
| name: agent.name, | |
| companyId: agent.companyId, | |
| }, | |
| recorder: workspaceOperationRecorder, | |
| }); | |
| const resolvedProjectId = executionWorkspace.projectId ?? issueRef?.projectId ?? executionProjectId ?? null; | |
| const resolvedProjectWorkspaceId = issueRef?.projectWorkspaceId ?? resolvedWorkspace.workspaceId ?? null; | |
| const shouldReuseExisting = | |
| issueRef?.executionWorkspacePreference === "reuse_existing" && | |
| existingExecutionWorkspace && | |
| existingExecutionWorkspace.status !== "archived"; | |
| let persistedExecutionWorkspace = null; | |
| try { | |
| persistedExecutionWorkspace = shouldReuseExisting && existingExecutionWorkspace | |
| ? await executionWorkspacesSvc.update(existingExecutionWorkspace.id, { | |
| cwd: executionWorkspace.cwd, | |
| repoUrl: executionWorkspace.repoUrl, | |
| baseRef: executionWorkspace.repoRef, | |
| branchName: executionWorkspace.branchName, | |
| providerType: executionWorkspace.strategy === "git_worktree" ? "git_worktree" : "local_fs", | |
| providerRef: executionWorkspace.worktreePath, | |
| status: "active", | |
| lastUsedAt: new Date(), | |
| metadata: { | |
| ...(existingExecutionWorkspace.metadata ?? {}), | |
| source: executionWorkspace.source, | |
| createdByRuntime: executionWorkspace.created, | |
| }, | |
| }) | |
| : resolvedProjectId | |
| ? await executionWorkspacesSvc.create({ | |
| companyId: agent.companyId, | |
| projectId: resolvedProjectId, | |
| projectWorkspaceId: resolvedProjectWorkspaceId, | |
| sourceIssueId: issueRef?.id ?? null, | |
| mode: | |
| executionWorkspaceMode === "isolated_workspace" | |
| ? "isolated_workspace" | |
| : executionWorkspaceMode === "operator_branch" | |
| ? "operator_branch" | |
| : executionWorkspaceMode === "agent_default" | |
| ? "adapter_managed" | |
| : "shared_workspace", | |
| strategyType: executionWorkspace.strategy === "git_worktree" ? "git_worktree" : "project_primary", | |
| name: executionWorkspace.branchName ?? issueRef?.identifier ?? `workspace-${agent.id.slice(0, 8)}`, | |
| status: "active", | |
| cwd: executionWorkspace.cwd, | |
| repoUrl: executionWorkspace.repoUrl, | |
| baseRef: executionWorkspace.repoRef, | |
| branchName: executionWorkspace.branchName, | |
| providerType: executionWorkspace.strategy === "git_worktree" ? "git_worktree" : "local_fs", | |
| providerRef: executionWorkspace.worktreePath, | |
| lastUsedAt: new Date(), | |
| openedAt: new Date(), | |
| metadata: { | |
| source: executionWorkspace.source, | |
| createdByRuntime: executionWorkspace.created, | |
| }, | |
| }) | |
| : null; | |
| } catch (error) { | |
| if (executionWorkspace.created) { | |
| try { | |
| await cleanupExecutionWorkspaceArtifacts({ | |
| workspace: { | |
| id: existingExecutionWorkspace?.id ?? `transient-${run.id}`, | |
| cwd: executionWorkspace.cwd, | |
| providerType: executionWorkspace.strategy === "git_worktree" ? "git_worktree" : "local_fs", | |
| providerRef: executionWorkspace.worktreePath, | |
| branchName: executionWorkspace.branchName, | |
| repoUrl: executionWorkspace.repoUrl, | |
| baseRef: executionWorkspace.repoRef, | |
| projectId: resolvedProjectId, | |
| projectWorkspaceId: resolvedProjectWorkspaceId, | |
| sourceIssueId: issueRef?.id ?? null, | |
| metadata: { | |
| createdByRuntime: true, | |
| source: executionWorkspace.source, | |
| }, | |
| }, | |
| projectWorkspace: { | |
| cwd: resolvedWorkspace.cwd, | |
| cleanupCommand: null, | |
| }, | |
| teardownCommand: projectExecutionWorkspacePolicy?.workspaceStrategy?.teardownCommand ?? null, | |
| recorder: workspaceOperationRecorder, | |
| }); | |
| } catch (cleanupError) { | |
| logger.warn( | |
| { | |
| runId: run.id, | |
| issueId, | |
| executionWorkspaceCwd: executionWorkspace.cwd, | |
| cleanupError: cleanupError instanceof Error ? cleanupError.message : String(cleanupError), | |
| }, | |
| "Failed to cleanup realized execution workspace after persistence failure", | |
| ); | |
| } | |
| } | |
| throw error; | |
| } | |
| await workspaceOperationRecorder.attachExecutionWorkspaceId(persistedExecutionWorkspace?.id ?? null); | |
| if ( | |
| existingExecutionWorkspace && | |
| persistedExecutionWorkspace && | |
| existingExecutionWorkspace.id !== persistedExecutionWorkspace.id && | |
| existingExecutionWorkspace.status === "active" | |
| ) { | |
| await executionWorkspacesSvc.update(existingExecutionWorkspace.id, { | |
| status: "idle", | |
| cleanupReason: null, | |
| }); | |
| } | |
| if (issueId && persistedExecutionWorkspace && issueRef?.executionWorkspaceId !== persistedExecutionWorkspace.id) { | |
| await issuesSvc.update(issueId, { | |
| executionWorkspaceId: persistedExecutionWorkspace.id, | |
| ...(resolvedProjectWorkspaceId ? { projectWorkspaceId: resolvedProjectWorkspaceId } : {}), | |
| }); | |
| } | |
| if (persistedExecutionWorkspace) { | |
| context.executionWorkspaceId = persistedExecutionWorkspace.id; | |
| await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| contextSnapshot: context, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, run.id)); | |
| } | |
| const runtimeSessionResolution = resolveRuntimeSessionParamsForWorkspace({ | |
| agentId: agent.id, | |
| previousSessionParams, | |
| resolvedWorkspace: { | |
| ...resolvedWorkspace, | |
| cwd: executionWorkspace.cwd, | |
| }, | |
| }); | |
| const runtimeSessionParams = runtimeSessionResolution.sessionParams; | |
| const runtimeWorkspaceWarnings = [ | |
| ...resolvedWorkspace.warnings, | |
| ...executionWorkspace.warnings, | |
| ...(runtimeSessionResolution.warning ? [runtimeSessionResolution.warning] : []), | |
| ...(resetTaskSession && sessionResetReason | |
| ? [ | |
| taskKey | |
| ? `Skipping saved session resume for task "${taskKey}" because ${sessionResetReason}.` | |
| : `Skipping saved session resume because ${sessionResetReason}.`, | |
| ] | |
| : []), | |
| ]; | |
| context.paperclipWorkspace = { | |
| cwd: executionWorkspace.cwd, | |
| source: executionWorkspace.source, | |
| mode: executionWorkspaceMode, | |
| strategy: executionWorkspace.strategy, | |
| projectId: executionWorkspace.projectId, | |
| workspaceId: executionWorkspace.workspaceId, | |
| repoUrl: executionWorkspace.repoUrl, | |
| repoRef: executionWorkspace.repoRef, | |
| branchName: executionWorkspace.branchName, | |
| worktreePath: executionWorkspace.worktreePath, | |
| agentHome: resolveDefaultAgentWorkspaceDir(agent.id), | |
| }; | |
| context.paperclipWorkspaces = resolvedWorkspace.workspaceHints; | |
// Parse optional runtime-service declarations out of config.workspaceRuntime.
// Only object-shaped entries in the `services` array are kept; a missing or
// non-array `services` yields an empty list.
const runtimeServiceIntents = (() => {
  const runtimeConfig = parseObject(resolvedConfig.workspaceRuntime);
  return Array.isArray(runtimeConfig.services)
    ? runtimeConfig.services.filter(
        (value): value is Record<string, unknown> => typeof value === "object" && value !== null,
      )
    : [];
})();
| if (runtimeServiceIntents.length > 0) { | |
| context.paperclipRuntimeServiceIntents = runtimeServiceIntents; | |
| } else { | |
| delete context.paperclipRuntimeServiceIntents; | |
| } | |
| if (executionWorkspace.projectId && !readNonEmptyString(context.projectId)) { | |
| context.projectId = executionWorkspace.projectId; | |
| } | |
| const runtimeSessionFallback = taskKey || resetTaskSession ? null : runtime.sessionId; | |
| let previousSessionDisplayId = truncateDisplayId( | |
| taskSessionForRun?.sessionDisplayId ?? | |
| (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(runtimeSessionParams) : null) ?? | |
| readNonEmptyString(runtimeSessionParams?.sessionId) ?? | |
| runtimeSessionFallback, | |
| ); | |
| let runtimeSessionIdForAdapter = | |
| readNonEmptyString(runtimeSessionParams?.sessionId) ?? runtimeSessionFallback; | |
| let runtimeSessionParamsForAdapter = runtimeSessionParams; | |
| const sessionCompaction = await evaluateSessionCompaction({ | |
| agent, | |
| sessionId: previousSessionDisplayId ?? runtimeSessionIdForAdapter, | |
| issueId, | |
| }); | |
| if (sessionCompaction.rotate) { | |
| context.paperclipSessionHandoffMarkdown = sessionCompaction.handoffMarkdown; | |
| context.paperclipSessionRotationReason = sessionCompaction.reason; | |
| context.paperclipPreviousSessionId = previousSessionDisplayId ?? runtimeSessionIdForAdapter; | |
| runtimeSessionIdForAdapter = null; | |
| runtimeSessionParamsForAdapter = null; | |
| previousSessionDisplayId = null; | |
| if (sessionCompaction.reason) { | |
| runtimeWorkspaceWarnings.push( | |
| `Starting a fresh session because ${sessionCompaction.reason}.`, | |
| ); | |
| } | |
| } else { | |
| delete context.paperclipSessionHandoffMarkdown; | |
| delete context.paperclipSessionRotationReason; | |
| delete context.paperclipPreviousSessionId; | |
| } | |
| const runtimeForAdapter = { | |
| sessionId: runtimeSessionIdForAdapter, | |
| sessionParams: runtimeSessionParamsForAdapter, | |
| sessionDisplayId: previousSessionDisplayId, | |
| taskKey, | |
| }; | |
| let seq = 1; | |
| let handle: RunLogHandle | null = null; | |
| let stdoutExcerpt = ""; | |
| let stderrExcerpt = ""; | |
| try { | |
| const startedAt = run.startedAt ?? new Date(); | |
| const runningWithSession = await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| startedAt, | |
| sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, | |
| contextSnapshot: context, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, run.id)) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| if (runningWithSession) run = runningWithSession; | |
| const runningAgent = await db | |
| .update(agents) | |
| .set({ status: "running", updatedAt: new Date() }) | |
| .where(eq(agents.id, agent.id)) | |
| .returning() | |
| .then((rows) => rows[0] ?? null); | |
| if (runningAgent) { | |
| publishLiveEvent({ | |
| companyId: runningAgent.companyId, | |
| type: "agent.status", | |
| payload: { | |
| agentId: runningAgent.id, | |
| status: runningAgent.status, | |
| outcome: "running", | |
| }, | |
| }); | |
| } | |
| const currentRun = run; | |
| await appendRunEvent(currentRun, seq++, { | |
| eventType: "lifecycle", | |
| stream: "system", | |
| level: "info", | |
| message: "run started", | |
| }); | |
| handle = await runLogStore.begin({ | |
| companyId: run.companyId, | |
| agentId: run.agentId, | |
| runId, | |
| }); | |
| await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| logStore: handle.store, | |
| logRef: handle.logRef, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, runId)); | |
// Streams one chunk of adapter output: redacts user text, accumulates capped
// per-stream excerpts for the run record, appends to the durable run-log
// store (when a handle exists), and fans a possibly tail-truncated copy out
// to live subscribers.
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
  // Scrub before the chunk is persisted or broadcast anywhere.
  const sanitizedChunk = redactCurrentUserText(chunk);
  // Bounded excerpts kept on the run row for quick inspection.
  if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, sanitizedChunk);
  if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, sanitizedChunk);
  const ts = new Date().toISOString();
  if (handle) {
    await runLogStore.append(handle, {
      stream,
      chunk: sanitizedChunk,
      ts,
    });
  }
  // Live events keep only the tail (most recent output) of oversized chunks.
  // NOTE(review): the limit is compared against string length (UTF-16 code
  // units), not bytes, despite the constant's name — confirm intent.
  const payloadChunk =
    sanitizedChunk.length > MAX_LIVE_LOG_CHUNK_BYTES
      ? sanitizedChunk.slice(sanitizedChunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
      : sanitizedChunk;
  publishLiveEvent({
    companyId: run.companyId,
    type: "heartbeat.run.log",
    payload: {
      runId: run.id,
      agentId: run.agentId,
      ts,
      stream,
      chunk: payloadChunk,
      truncated: payloadChunk.length !== sanitizedChunk.length,
    },
  });
};
| for (const warning of runtimeWorkspaceWarnings) { | |
| await onLog("stderr", `[paperclip] ${warning}\n`); | |
| } | |
| const adapterEnv = Object.fromEntries( | |
| Object.entries(parseObject(resolvedConfig.env)).filter( | |
| (entry): entry is [string, string] => typeof entry[0] === "string" && typeof entry[1] === "string", | |
| ), | |
| ); | |
| const runtimeServices = await ensureRuntimeServicesForRun({ | |
| db, | |
| runId: run.id, | |
| agent: { | |
| id: agent.id, | |
| name: agent.name, | |
| companyId: agent.companyId, | |
| }, | |
| issue: issueRef, | |
| workspace: executionWorkspace, | |
| executionWorkspaceId: persistedExecutionWorkspace?.id ?? issueRef?.executionWorkspaceId ?? null, | |
| config: resolvedConfig, | |
| adapterEnv, | |
| onLog, | |
| }); | |
| if (runtimeServices.length > 0) { | |
| context.paperclipRuntimeServices = runtimeServices; | |
| context.paperclipRuntimePrimaryUrl = | |
| runtimeServices.find((service) => readNonEmptyString(service.url))?.url ?? null; | |
| await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| contextSnapshot: context, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, run.id)); | |
| } | |
| if (issueId && (executionWorkspace.created || runtimeServices.some((service) => !service.reused))) { | |
| try { | |
| await issuesSvc.addComment( | |
| issueId, | |
| buildWorkspaceReadyComment({ | |
| workspace: executionWorkspace, | |
| runtimeServices, | |
| }), | |
| { agentId: agent.id }, | |
| ); | |
| } catch (err) { | |
| await onLog( | |
| "stderr", | |
| `[paperclip] Failed to post workspace-ready comment: ${err instanceof Error ? err.message : String(err)}\n`, | |
| ); | |
| } | |
| } | |
// Records an adapter-invocation event in the run timeline, redacting any
// env entries whose keys match known secrets before the metadata is
// persisted. Mutates `meta.env` in place.
const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
  if (meta.env && secretKeys.size > 0) {
    for (const key of secretKeys) {
      if (key in meta.env) meta.env[key] = "***REDACTED***";
    }
  }
  await appendRunEvent(currentRun, seq++, {
    eventType: "adapter.invoke",
    stream: "system",
    level: "info",
    message: "adapter invocation",
    payload: meta as unknown as Record<string, unknown>,
  });
};
| const adapter = getServerAdapter(agent.adapterType); | |
| const authToken = adapter.supportsLocalAgentJwt | |
| ? createLocalAgentJwt(agent.id, agent.companyId, agent.adapterType, run.id) | |
| : null; | |
| if (adapter.supportsLocalAgentJwt && !authToken) { | |
| logger.warn( | |
| { | |
| companyId: agent.companyId, | |
| agentId: agent.id, | |
| runId: run.id, | |
| adapterType: agent.adapterType, | |
| }, | |
| "local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY", | |
| ); | |
| } | |
| const adapterResult = await adapter.execute({ | |
| runId: run.id, | |
| agent, | |
| runtime: runtimeForAdapter, | |
| config: resolvedConfig, | |
| context, | |
| onLog, | |
| onMeta: onAdapterMeta, | |
| authToken: authToken ?? undefined, | |
| }); | |
| const adapterManagedRuntimeServices = adapterResult.runtimeServices | |
| ? await persistAdapterManagedRuntimeServices({ | |
| db, | |
| adapterType: agent.adapterType, | |
| runId: run.id, | |
| agent: { | |
| id: agent.id, | |
| name: agent.name, | |
| companyId: agent.companyId, | |
| }, | |
| issue: issueRef, | |
| workspace: executionWorkspace, | |
| reports: adapterResult.runtimeServices, | |
| }) | |
| : []; | |
| if (adapterManagedRuntimeServices.length > 0) { | |
| const combinedRuntimeServices = [ | |
| ...runtimeServices, | |
| ...adapterManagedRuntimeServices, | |
| ]; | |
| context.paperclipRuntimeServices = combinedRuntimeServices; | |
| context.paperclipRuntimePrimaryUrl = | |
| combinedRuntimeServices.find((service) => readNonEmptyString(service.url))?.url ?? null; | |
| await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| contextSnapshot: context, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, run.id)); | |
| if (issueId) { | |
| try { | |
| await issuesSvc.addComment( | |
| issueId, | |
| buildWorkspaceReadyComment({ | |
| workspace: executionWorkspace, | |
| runtimeServices: adapterManagedRuntimeServices, | |
| }), | |
| { agentId: agent.id }, | |
| ); | |
| } catch (err) { | |
| await onLog( | |
| "stderr", | |
| `[paperclip] Failed to post adapter-managed runtime comment: ${err instanceof Error ? err.message : String(err)}\n`, | |
| ); | |
| } | |
| } | |
| } | |
| const nextSessionState = resolveNextSessionState({ | |
| codec: sessionCodec, | |
| adapterResult, | |
| previousParams: previousSessionParams, | |
| previousDisplayId: runtimeForAdapter.sessionDisplayId, | |
| previousLegacySessionId: runtimeForAdapter.sessionId, | |
| }); | |
| const rawUsage = normalizeUsageTotals(adapterResult.usage); | |
| const sessionUsageResolution = await resolveNormalizedUsageForSession({ | |
| agentId: agent.id, | |
| runId: run.id, | |
| sessionId: nextSessionState.displayId ?? nextSessionState.legacySessionId, | |
| rawUsage, | |
| }); | |
| const normalizedUsage = sessionUsageResolution.normalizedUsage; | |
| let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; | |
| const latestRun = await getRun(run.id); | |
| if (latestRun?.status === "cancelled") { | |
| outcome = "cancelled"; | |
| } else if (adapterResult.timedOut) { | |
| outcome = "timed_out"; | |
| } else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) { | |
| outcome = "succeeded"; | |
| } else { | |
| outcome = "failed"; | |
| } | |
| let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; | |
| if (handle) { | |
| logSummary = await runLogStore.finalize(handle); | |
| } | |
| const status = | |
| outcome === "succeeded" | |
| ? "succeeded" | |
| : outcome === "cancelled" | |
| ? "cancelled" | |
| : outcome === "timed_out" | |
| ? "timed_out" | |
| : "failed"; | |
| const usageJson = | |
| normalizedUsage || adapterResult.costUsd != null | |
| ? ({ | |
| ...(normalizedUsage ?? {}), | |
| ...(rawUsage ? { | |
| rawInputTokens: rawUsage.inputTokens, | |
| rawCachedInputTokens: rawUsage.cachedInputTokens, | |
| rawOutputTokens: rawUsage.outputTokens, | |
| } : {}), | |
| ...(sessionUsageResolution.derivedFromSessionTotals ? { usageSource: "session_delta" } : {}), | |
| ...((nextSessionState.displayId ?? nextSessionState.legacySessionId) | |
| ? { persistedSessionId: nextSessionState.displayId ?? nextSessionState.legacySessionId } | |
| : {}), | |
| sessionReused: runtimeForAdapter.sessionId != null || runtimeForAdapter.sessionDisplayId != null, | |
| taskSessionReused: taskSessionForRun != null, | |
| freshSession: runtimeForAdapter.sessionId == null && runtimeForAdapter.sessionDisplayId == null, | |
| sessionRotated: sessionCompaction.rotate, | |
| sessionRotationReason: sessionCompaction.reason, | |
| provider: readNonEmptyString(adapterResult.provider) ?? "unknown", | |
| biller: resolveLedgerBiller(adapterResult), | |
| model: readNonEmptyString(adapterResult.model) ?? "unknown", | |
| ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}), | |
| billingType: normalizeLedgerBillingType(adapterResult.billingType), | |
| } as Record<string, unknown>) | |
| : null; | |
| await setRunStatus(run.id, status, { | |
| finishedAt: new Date(), | |
| error: | |
| outcome === "succeeded" | |
| ? null | |
| : redactCurrentUserText( | |
| adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"), | |
| ), | |
| errorCode: | |
| outcome === "timed_out" | |
| ? "timeout" | |
| : outcome === "cancelled" | |
| ? "cancelled" | |
| : outcome === "failed" | |
| ? (adapterResult.errorCode ?? "adapter_failed") | |
| : null, | |
| exitCode: adapterResult.exitCode, | |
| signal: adapterResult.signal, | |
| usageJson, | |
| resultJson: adapterResult.resultJson ?? null, | |
| sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId, | |
| stdoutExcerpt, | |
| stderrExcerpt, | |
| logBytes: logSummary?.bytes, | |
| logSha256: logSummary?.sha256, | |
| logCompressed: logSummary?.compressed ?? false, | |
| }); | |
| await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, { | |
| finishedAt: new Date(), | |
| error: adapterResult.errorMessage ?? null, | |
| }); | |
| const finalizedRun = await getRun(run.id); | |
| if (finalizedRun) { | |
| await appendRunEvent(finalizedRun, seq++, { | |
| eventType: "lifecycle", | |
| stream: "system", | |
| level: outcome === "succeeded" ? "info" : "error", | |
| message: `run ${outcome}`, | |
| payload: { | |
| status, | |
| exitCode: adapterResult.exitCode, | |
| }, | |
| }); | |
| await releaseIssueExecutionAndPromote(finalizedRun); | |
| } | |
| if (finalizedRun) { | |
| await updateRuntimeState(agent, finalizedRun, adapterResult, { | |
| legacySessionId: nextSessionState.legacySessionId, | |
| }, normalizedUsage); | |
| if (taskKey) { | |
| if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) { | |
| await clearTaskSessions(agent.companyId, agent.id, { | |
| taskKey, | |
| adapterType: agent.adapterType, | |
| }); | |
| } else { | |
| await upsertTaskSession({ | |
| companyId: agent.companyId, | |
| agentId: agent.id, | |
| adapterType: agent.adapterType, | |
| taskKey, | |
| sessionParamsJson: nextSessionState.params, | |
| sessionDisplayId: nextSessionState.displayId, | |
| lastRunId: finalizedRun.id, | |
| lastError: outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"), | |
| }); | |
| } | |
| } | |
| } | |
| await finalizeAgentStatus(agent.id, outcome); | |
| } catch (err) { | |
| const message = redactCurrentUserText(err instanceof Error ? err.message : "Unknown adapter failure"); | |
| logger.error({ err, runId }, "heartbeat execution failed"); | |
| let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null; | |
| if (handle) { | |
| try { | |
| logSummary = await runLogStore.finalize(handle); | |
| } catch (finalizeErr) { | |
| logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error"); | |
| } | |
| } | |
| const failedRun = await setRunStatus(run.id, "failed", { | |
| error: message, | |
| errorCode: "adapter_failed", | |
| finishedAt: new Date(), | |
| stdoutExcerpt, | |
| stderrExcerpt, | |
| logBytes: logSummary?.bytes, | |
| logSha256: logSummary?.sha256, | |
| logCompressed: logSummary?.compressed ?? false, | |
| }); | |
| await setWakeupStatus(run.wakeupRequestId, "failed", { | |
| finishedAt: new Date(), | |
| error: message, | |
| }); | |
| if (failedRun) { | |
| await appendRunEvent(failedRun, seq++, { | |
| eventType: "error", | |
| stream: "system", | |
| level: "error", | |
| message, | |
| }); | |
| await releaseIssueExecutionAndPromote(failedRun); | |
| await updateRuntimeState(agent, failedRun, { | |
| exitCode: null, | |
| signal: null, | |
| timedOut: false, | |
| errorMessage: message, | |
| }, { | |
| legacySessionId: runtimeForAdapter.sessionId, | |
| }); | |
| if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) { | |
| await upsertTaskSession({ | |
| companyId: agent.companyId, | |
| agentId: agent.id, | |
| adapterType: agent.adapterType, | |
| taskKey, | |
| sessionParamsJson: previousSessionParams, | |
| sessionDisplayId: previousSessionDisplayId, | |
| lastRunId: failedRun.id, | |
| lastError: message, | |
| }); | |
| } | |
| } | |
| await finalizeAgentStatus(agent.id, "failed"); | |
| } | |
| } catch (outerErr) { | |
| // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun). | |
| // The inner catch did not fire, so we must record the failure here. | |
| const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure"; | |
| logger.error({ err: outerErr, runId }, "heartbeat execution setup failed"); | |
| await setRunStatus(runId, "failed", { | |
| error: message, | |
| errorCode: "adapter_failed", | |
| finishedAt: new Date(), | |
| }).catch(() => undefined); | |
| await setWakeupStatus(run.wakeupRequestId, "failed", { | |
| finishedAt: new Date(), | |
| error: message, | |
| }).catch(() => undefined); | |
| const failedRun = await getRun(runId).catch(() => null); | |
| if (failedRun) { | |
| // Emit a run-log event so the failure is visible in the run timeline, | |
| // consistent with what the inner catch block does for adapter failures. | |
| await appendRunEvent(failedRun, 1, { | |
| eventType: "error", | |
| stream: "system", | |
| level: "error", | |
| message, | |
| }).catch(() => undefined); | |
| await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined); | |
| } | |
| // Ensure the agent is not left stuck in "running" if the inner catch handler's | |
| // DB calls threw (e.g. a transient DB error in finalizeAgentStatus). | |
| await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined); | |
| } finally { | |
| await releaseRuntimeServicesForRun(run.id).catch(() => undefined); | |
| activeRunExecutions.delete(run.id); | |
| await startNextQueuedRunForAgent(run.agentId); | |
| } | |
| } | |
| async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) { | |
| const promotedRun = await db.transaction(async (tx) => { | |
| await tx.execute( | |
| sql`select id from issues where company_id = ${run.companyId} and execution_run_id = ${run.id} for update`, | |
| ); | |
| const issue = await tx | |
| .select({ | |
| id: issues.id, | |
| companyId: issues.companyId, | |
| }) | |
| .from(issues) | |
| .where(and(eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id))) | |
| .then((rows) => rows[0] ?? null); | |
| if (!issue) return; | |
| await tx | |
| .update(issues) | |
| .set({ | |
| executionRunId: null, | |
| executionAgentNameKey: null, | |
| executionLockedAt: null, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(issues.id, issue.id)); | |
| while (true) { | |
| const deferred = await tx | |
| .select() | |
| .from(agentWakeupRequests) | |
| .where( | |
| and( | |
| eq(agentWakeupRequests.companyId, issue.companyId), | |
| eq(agentWakeupRequests.status, "deferred_issue_execution"), | |
| sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issue.id}`, | |
| ), | |
| ) | |
| .orderBy(asc(agentWakeupRequests.requestedAt)) | |
| .limit(1) | |
| .then((rows) => rows[0] ?? null); | |
| if (!deferred) return null; | |
| const deferredAgent = await tx | |
| .select() | |
| .from(agents) | |
| .where(eq(agents.id, deferred.agentId)) | |
| .then((rows) => rows[0] ?? null); | |
| if ( | |
| !deferredAgent || | |
| deferredAgent.companyId !== issue.companyId || | |
| deferredAgent.status === "paused" || | |
| deferredAgent.status === "terminated" || | |
| deferredAgent.status === "pending_approval" | |
| ) { | |
| await tx | |
| .update(agentWakeupRequests) | |
| .set({ | |
| status: "failed", | |
| finishedAt: new Date(), | |
| error: "Deferred wake could not be promoted: agent is not invokable", | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agentWakeupRequests.id, deferred.id)); | |
| continue; | |
| } | |
| const deferredPayload = parseObject(deferred.payload); | |
| const deferredContextSeed = parseObject(deferredPayload[DEFERRED_WAKE_CONTEXT_KEY]); | |
| const promotedContextSeed: Record<string, unknown> = { ...deferredContextSeed }; | |
| const promotedReason = readNonEmptyString(deferred.reason) ?? "issue_execution_promoted"; | |
| const promotedSource = | |
| (readNonEmptyString(deferred.source) as WakeupOptions["source"]) ?? "automation"; | |
| const promotedTriggerDetail = | |
| (readNonEmptyString(deferred.triggerDetail) as WakeupOptions["triggerDetail"]) ?? null; | |
| const promotedPayload = deferredPayload; | |
| delete promotedPayload[DEFERRED_WAKE_CONTEXT_KEY]; | |
| const { | |
| contextSnapshot: promotedContextSnapshot, | |
| taskKey: promotedTaskKey, | |
| } = enrichWakeContextSnapshot({ | |
| contextSnapshot: promotedContextSeed, | |
| reason: promotedReason, | |
| source: promotedSource, | |
| triggerDetail: promotedTriggerDetail, | |
| payload: promotedPayload, | |
| }); | |
| const sessionBefore = await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey); | |
| const now = new Date(); | |
| const newRun = await tx | |
| .insert(heartbeatRuns) | |
| .values({ | |
| companyId: deferredAgent.companyId, | |
| agentId: deferredAgent.id, | |
| invocationSource: promotedSource, | |
| triggerDetail: promotedTriggerDetail, | |
| status: "queued", | |
| wakeupRequestId: deferred.id, | |
| contextSnapshot: promotedContextSnapshot, | |
| sessionIdBefore: sessionBefore, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| await tx | |
| .update(agentWakeupRequests) | |
| .set({ | |
| status: "queued", | |
| reason: "issue_execution_promoted", | |
| runId: newRun.id, | |
| claimedAt: null, | |
| finishedAt: null, | |
| error: null, | |
| updatedAt: now, | |
| }) | |
| .where(eq(agentWakeupRequests.id, deferred.id)); | |
| await tx | |
| .update(issues) | |
| .set({ | |
| executionRunId: newRun.id, | |
| executionAgentNameKey: normalizeAgentNameKey(deferredAgent.name), | |
| executionLockedAt: now, | |
| updatedAt: now, | |
| }) | |
| .where(eq(issues.id, issue.id)); | |
| return newRun; | |
| } | |
| }); | |
| if (!promotedRun) return; | |
| publishLiveEvent({ | |
| companyId: promotedRun.companyId, | |
| type: "heartbeat.run.queued", | |
| payload: { | |
| runId: promotedRun.id, | |
| agentId: promotedRun.agentId, | |
| invocationSource: promotedRun.invocationSource, | |
| triggerDetail: promotedRun.triggerDetail, | |
| wakeupRequestId: promotedRun.wakeupRequestId, | |
| }, | |
| }); | |
| await startNextQueuedRunForAgent(promotedRun.agentId); | |
| } | |
| async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) { | |
| const source = opts.source ?? "on_demand"; | |
| const triggerDetail = opts.triggerDetail ?? null; | |
| const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) }; | |
| const reason = opts.reason ?? null; | |
| const payload = opts.payload ?? null; | |
| const { | |
| contextSnapshot: enrichedContextSnapshot, | |
| issueIdFromPayload, | |
| taskKey, | |
| wakeCommentId, | |
| } = enrichWakeContextSnapshot({ | |
| contextSnapshot, | |
| reason, | |
| source, | |
| triggerDetail, | |
| payload, | |
| }); | |
| const issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload; | |
| const agent = await getAgent(agentId); | |
| if (!agent) throw notFound("Agent not found"); | |
| const writeSkippedRequest = async (skipReason: string) => { | |
| await db.insert(agentWakeupRequests).values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason: skipReason, | |
| payload, | |
| status: "skipped", | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| finishedAt: new Date(), | |
| }); | |
| }; | |
| let projectId = readNonEmptyString(enrichedContextSnapshot.projectId); | |
| if (!projectId && issueId) { | |
| projectId = await db | |
| .select({ projectId: issues.projectId }) | |
| .from(issues) | |
| .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId))) | |
| .then((rows) => rows[0]?.projectId ?? null); | |
| } | |
| const budgetBlock = await budgets.getInvocationBlock(agent.companyId, agentId, { | |
| issueId, | |
| projectId, | |
| }); | |
| if (budgetBlock) { | |
| await writeSkippedRequest("budget.blocked"); | |
| throw conflict(budgetBlock.reason, { | |
| scopeType: budgetBlock.scopeType, | |
| scopeId: budgetBlock.scopeId, | |
| }); | |
| } | |
| if ( | |
| agent.status === "paused" || | |
| agent.status === "terminated" || | |
| agent.status === "pending_approval" | |
| ) { | |
| throw conflict("Agent is not invokable in its current state", { status: agent.status }); | |
| } | |
| const policy = parseHeartbeatPolicy(agent); | |
| if (source === "timer" && !policy.enabled) { | |
| await writeSkippedRequest("heartbeat.disabled"); | |
| return null; | |
| } | |
| if (source !== "timer" && !policy.wakeOnDemand) { | |
| await writeSkippedRequest("heartbeat.wakeOnDemand.disabled"); | |
| return null; | |
| } | |
| const bypassIssueExecutionLock = | |
| reason === "issue_comment_mentioned" || | |
| readNonEmptyString(enrichedContextSnapshot.wakeReason) === "issue_comment_mentioned"; | |
| if (issueId && !bypassIssueExecutionLock) { | |
| const agentNameKey = normalizeAgentNameKey(agent.name); | |
| const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); | |
| const outcome = await db.transaction(async (tx) => { | |
| await tx.execute( | |
| sql`select id from issues where id = ${issueId} and company_id = ${agent.companyId} for update`, | |
| ); | |
| const issue = await tx | |
| .select({ | |
| id: issues.id, | |
| companyId: issues.companyId, | |
| executionRunId: issues.executionRunId, | |
| executionAgentNameKey: issues.executionAgentNameKey, | |
| }) | |
| .from(issues) | |
| .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId))) | |
| .then((rows) => rows[0] ?? null); | |
| if (!issue) { | |
| await tx.insert(agentWakeupRequests).values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason: "issue_execution_issue_not_found", | |
| payload, | |
| status: "skipped", | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| finishedAt: new Date(), | |
| }); | |
| return { kind: "skipped" as const }; | |
| } | |
| let activeExecutionRun = issue.executionRunId | |
| ? await tx | |
| .select() | |
| .from(heartbeatRuns) | |
| .where(eq(heartbeatRuns.id, issue.executionRunId)) | |
| .then((rows) => rows[0] ?? null) | |
| : null; | |
| if (activeExecutionRun && activeExecutionRun.status !== "queued" && activeExecutionRun.status !== "running") { | |
| activeExecutionRun = null; | |
| } | |
| if (!activeExecutionRun && issue.executionRunId) { | |
| await tx | |
| .update(issues) | |
| .set({ | |
| executionRunId: null, | |
| executionAgentNameKey: null, | |
| executionLockedAt: null, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(issues.id, issue.id)); | |
| } | |
| if (!activeExecutionRun) { | |
| const legacyRun = await tx | |
| .select() | |
| .from(heartbeatRuns) | |
| .where( | |
| and( | |
| eq(heartbeatRuns.companyId, issue.companyId), | |
| inArray(heartbeatRuns.status, ["queued", "running"]), | |
| sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issue.id}`, | |
| ), | |
| ) | |
| .orderBy( | |
| sql`case when ${heartbeatRuns.status} = 'running' then 0 else 1 end`, | |
| asc(heartbeatRuns.createdAt), | |
| ) | |
| .limit(1) | |
| .then((rows) => rows[0] ?? null); | |
| if (legacyRun) { | |
| activeExecutionRun = legacyRun; | |
| const legacyAgent = await tx | |
| .select({ name: agents.name }) | |
| .from(agents) | |
| .where(eq(agents.id, legacyRun.agentId)) | |
| .then((rows) => rows[0] ?? null); | |
| await tx | |
| .update(issues) | |
| .set({ | |
| executionRunId: legacyRun.id, | |
| executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name), | |
| executionLockedAt: new Date(), | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(issues.id, issue.id)); | |
| } | |
| } | |
| if (activeExecutionRun) { | |
| const executionAgent = await tx | |
| .select({ name: agents.name }) | |
| .from(agents) | |
| .where(eq(agents.id, activeExecutionRun.agentId)) | |
| .then((rows) => rows[0] ?? null); | |
| const executionAgentNameKey = | |
| normalizeAgentNameKey(issue.executionAgentNameKey) ?? | |
| normalizeAgentNameKey(executionAgent?.name); | |
| const isSameExecutionAgent = | |
| Boolean(executionAgentNameKey) && executionAgentNameKey === agentNameKey; | |
| const shouldQueueFollowupForCommentWake = | |
| Boolean(wakeCommentId) && | |
| activeExecutionRun.status === "running" && | |
| isSameExecutionAgent; | |
| if (isSameExecutionAgent && !shouldQueueFollowupForCommentWake) { | |
| const mergedContextSnapshot = mergeCoalescedContextSnapshot( | |
| activeExecutionRun.contextSnapshot, | |
| enrichedContextSnapshot, | |
| ); | |
| const mergedRun = await tx | |
| .update(heartbeatRuns) | |
| .set({ | |
| contextSnapshot: mergedContextSnapshot, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, activeExecutionRun.id)) | |
| .returning() | |
| .then((rows) => rows[0] ?? activeExecutionRun); | |
| await tx.insert(agentWakeupRequests).values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason: "issue_execution_same_name", | |
| payload, | |
| status: "coalesced", | |
| coalescedCount: 1, | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| runId: mergedRun.id, | |
| finishedAt: new Date(), | |
| }); | |
| return { kind: "coalesced" as const, run: mergedRun }; | |
| } | |
| const deferredPayload = { | |
| ...(payload ?? {}), | |
| issueId, | |
| [DEFERRED_WAKE_CONTEXT_KEY]: enrichedContextSnapshot, | |
| }; | |
| const existingDeferred = await tx | |
| .select() | |
| .from(agentWakeupRequests) | |
| .where( | |
| and( | |
| eq(agentWakeupRequests.companyId, agent.companyId), | |
| eq(agentWakeupRequests.agentId, agentId), | |
| eq(agentWakeupRequests.status, "deferred_issue_execution"), | |
| sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issue.id}`, | |
| ), | |
| ) | |
| .orderBy(asc(agentWakeupRequests.requestedAt)) | |
| .limit(1) | |
| .then((rows) => rows[0] ?? null); | |
| if (existingDeferred) { | |
| const existingDeferredPayload = parseObject(existingDeferred.payload); | |
| const existingDeferredContext = parseObject(existingDeferredPayload[DEFERRED_WAKE_CONTEXT_KEY]); | |
| const mergedDeferredContext = mergeCoalescedContextSnapshot( | |
| existingDeferredContext, | |
| enrichedContextSnapshot, | |
| ); | |
| const mergedDeferredPayload = { | |
| ...existingDeferredPayload, | |
| ...(payload ?? {}), | |
| issueId, | |
| [DEFERRED_WAKE_CONTEXT_KEY]: mergedDeferredContext, | |
| }; | |
| await tx | |
| .update(agentWakeupRequests) | |
| .set({ | |
| payload: mergedDeferredPayload, | |
| coalescedCount: (existingDeferred.coalescedCount ?? 0) + 1, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agentWakeupRequests.id, existingDeferred.id)); | |
| return { kind: "deferred" as const }; | |
| } | |
| await tx.insert(agentWakeupRequests).values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason: "issue_execution_deferred", | |
| payload: deferredPayload, | |
| status: "deferred_issue_execution", | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| }); | |
| return { kind: "deferred" as const }; | |
| } | |
| const wakeupRequest = await tx | |
| .insert(agentWakeupRequests) | |
| .values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason, | |
| payload, | |
| status: "queued", | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| const newRun = await tx | |
| .insert(heartbeatRuns) | |
| .values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| invocationSource: source, | |
| triggerDetail, | |
| status: "queued", | |
| wakeupRequestId: wakeupRequest.id, | |
| contextSnapshot: enrichedContextSnapshot, | |
| sessionIdBefore: sessionBefore, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| await tx | |
| .update(agentWakeupRequests) | |
| .set({ | |
| runId: newRun.id, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agentWakeupRequests.id, wakeupRequest.id)); | |
| await tx | |
| .update(issues) | |
| .set({ | |
| executionRunId: newRun.id, | |
| executionAgentNameKey: agentNameKey, | |
| executionLockedAt: new Date(), | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(issues.id, issue.id)); | |
| return { kind: "queued" as const, run: newRun }; | |
| }); | |
| if (outcome.kind === "deferred" || outcome.kind === "skipped") return null; | |
| if (outcome.kind === "coalesced") return outcome.run; | |
| const newRun = outcome.run; | |
| publishLiveEvent({ | |
| companyId: newRun.companyId, | |
| type: "heartbeat.run.queued", | |
| payload: { | |
| runId: newRun.id, | |
| agentId: newRun.agentId, | |
| invocationSource: newRun.invocationSource, | |
| triggerDetail: newRun.triggerDetail, | |
| wakeupRequestId: newRun.wakeupRequestId, | |
| }, | |
| }); | |
| await startNextQueuedRunForAgent(agent.id); | |
| return newRun; | |
| } | |
| const activeRuns = await db | |
| .select() | |
| .from(heartbeatRuns) | |
| .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) | |
| .orderBy(desc(heartbeatRuns.createdAt)); | |
| const sameScopeQueuedRun = activeRuns.find( | |
| (candidate) => candidate.status === "queued" && isSameTaskScope(runTaskKey(candidate), taskKey), | |
| ); | |
| const sameScopeRunningRun = activeRuns.find( | |
| (candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey), | |
| ); | |
| const shouldQueueFollowupForCommentWake = | |
| Boolean(wakeCommentId) && Boolean(sameScopeRunningRun) && !sameScopeQueuedRun; | |
| const coalescedTargetRun = | |
| sameScopeQueuedRun ?? | |
| (shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ?? null); | |
| if (coalescedTargetRun) { | |
| const mergedContextSnapshot = mergeCoalescedContextSnapshot( | |
| coalescedTargetRun.contextSnapshot, | |
| contextSnapshot, | |
| ); | |
| const mergedRun = await db | |
| .update(heartbeatRuns) | |
| .set({ | |
| contextSnapshot: mergedContextSnapshot, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(heartbeatRuns.id, coalescedTargetRun.id)) | |
| .returning() | |
| .then((rows) => rows[0] ?? coalescedTargetRun); | |
| await db.insert(agentWakeupRequests).values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason, | |
| payload, | |
| status: "coalesced", | |
| coalescedCount: 1, | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| runId: mergedRun.id, | |
| finishedAt: new Date(), | |
| }); | |
| return mergedRun; | |
| } | |
| const wakeupRequest = await db | |
| .insert(agentWakeupRequests) | |
| .values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| source, | |
| triggerDetail, | |
| reason, | |
| payload, | |
| status: "queued", | |
| requestedByActorType: opts.requestedByActorType ?? null, | |
| requestedByActorId: opts.requestedByActorId ?? null, | |
| idempotencyKey: opts.idempotencyKey ?? null, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); | |
| const newRun = await db | |
| .insert(heartbeatRuns) | |
| .values({ | |
| companyId: agent.companyId, | |
| agentId, | |
| invocationSource: source, | |
| triggerDetail, | |
| status: "queued", | |
| wakeupRequestId: wakeupRequest.id, | |
| contextSnapshot: enrichedContextSnapshot, | |
| sessionIdBefore: sessionBefore, | |
| }) | |
| .returning() | |
| .then((rows) => rows[0]); | |
| await db | |
| .update(agentWakeupRequests) | |
| .set({ | |
| runId: newRun.id, | |
| updatedAt: new Date(), | |
| }) | |
| .where(eq(agentWakeupRequests.id, wakeupRequest.id)); | |
| publishLiveEvent({ | |
| companyId: newRun.companyId, | |
| type: "heartbeat.run.queued", | |
| payload: { | |
| runId: newRun.id, | |
| agentId: newRun.agentId, | |
| invocationSource: newRun.invocationSource, | |
| triggerDetail: newRun.triggerDetail, | |
| wakeupRequestId: newRun.wakeupRequestId, | |
| }, | |
| }); | |
| await startNextQueuedRunForAgent(agent.id); | |
| return newRun; | |
| } | |
| async function listProjectScopedRunIds(companyId: string, projectId: string) { | |
| const runIssueId = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`; | |
| const effectiveProjectId = sql<string | null>`coalesce(${heartbeatRuns.contextSnapshot} ->> 'projectId', ${issues.projectId}::text)`; | |
| const rows = await db | |
| .selectDistinctOn([heartbeatRuns.id], { id: heartbeatRuns.id }) | |
| .from(heartbeatRuns) | |
| .leftJoin( | |
| issues, | |
| and( | |
| eq(issues.companyId, companyId), | |
| sql`${issues.id}::text = ${runIssueId}`, | |
| ), | |
| ) | |
| .where( | |
| and( | |
| eq(heartbeatRuns.companyId, companyId), | |
| inArray(heartbeatRuns.status, ["queued", "running"]), | |
| sql`${effectiveProjectId} = ${projectId}`, | |
| ), | |
| ); | |
| return rows.map((row) => row.id); | |
| } | |
| async function listProjectScopedWakeupIds(companyId: string, projectId: string) { | |
| const wakeIssueId = sql<string | null>`${agentWakeupRequests.payload} ->> 'issueId'`; | |
| const effectiveProjectId = sql<string | null>`coalesce(${agentWakeupRequests.payload} ->> 'projectId', ${issues.projectId}::text)`; | |
| const rows = await db | |
| .selectDistinctOn([agentWakeupRequests.id], { id: agentWakeupRequests.id }) | |
| .from(agentWakeupRequests) | |
| .leftJoin( | |
| issues, | |
| and( | |
| eq(issues.companyId, companyId), | |
| sql`${issues.id}::text = ${wakeIssueId}`, | |
| ), | |
| ) | |
| .where( | |
| and( | |
| eq(agentWakeupRequests.companyId, companyId), | |
| inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]), | |
| sql`${agentWakeupRequests.runId} is null`, | |
| sql`${effectiveProjectId} = ${projectId}`, | |
| ), | |
| ); | |
| return rows.map((row) => row.id); | |
| } | |
| async function cancelPendingWakeupsForBudgetScope(scope: BudgetEnforcementScope) { | |
| const now = new Date(); | |
| let wakeupIds: string[] = []; | |
| if (scope.scopeType === "company") { | |
| wakeupIds = await db | |
| .select({ id: agentWakeupRequests.id }) | |
| .from(agentWakeupRequests) | |
| .where( | |
| and( | |
| eq(agentWakeupRequests.companyId, scope.companyId), | |
| inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]), | |
| sql`${agentWakeupRequests.runId} is null`, | |
| ), | |
| ) | |
| .then((rows) => rows.map((row) => row.id)); | |
| } else if (scope.scopeType === "agent") { | |
| wakeupIds = await db | |
| .select({ id: agentWakeupRequests.id }) | |
| .from(agentWakeupRequests) | |
| .where( | |
| and( | |
| eq(agentWakeupRequests.companyId, scope.companyId), | |
| eq(agentWakeupRequests.agentId, scope.scopeId), | |
| inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]), | |
| sql`${agentWakeupRequests.runId} is null`, | |
| ), | |
| ) | |
| .then((rows) => rows.map((row) => row.id)); | |
| } else { | |
| wakeupIds = await listProjectScopedWakeupIds(scope.companyId, scope.scopeId); | |
| } | |
| if (wakeupIds.length === 0) return 0; | |
| await db | |
| .update(agentWakeupRequests) | |
| .set({ | |
| status: "cancelled", | |
| finishedAt: now, | |
| error: "Cancelled due to budget pause", | |
| updatedAt: now, | |
| }) | |
| .where(inArray(agentWakeupRequests.id, wakeupIds)); | |
| return wakeupIds.length; | |
| } | |
| async function cancelRunInternal(runId: string, reason = "Cancelled by control plane") { | |
| const run = await getRun(runId); | |
| if (!run) throw notFound("Heartbeat run not found"); | |
| if (run.status !== "running" && run.status !== "queued") return run; | |
| const running = runningProcesses.get(run.id); | |
| if (running) { | |
| running.child.kill("SIGTERM"); | |
| const graceMs = Math.max(1, running.graceSec) * 1000; | |
| setTimeout(() => { | |
| if (!running.child.killed) { | |
| running.child.kill("SIGKILL"); | |
| } | |
| }, graceMs); | |
| } | |
| const cancelled = await setRunStatus(run.id, "cancelled", { | |
| finishedAt: new Date(), | |
| error: reason, | |
| errorCode: "cancelled", | |
| }); | |
| await setWakeupStatus(run.wakeupRequestId, "cancelled", { | |
| finishedAt: new Date(), | |
| error: reason, | |
| }); | |
| if (cancelled) { | |
| await appendRunEvent(cancelled, 1, { | |
| eventType: "lifecycle", | |
| stream: "system", | |
| level: "warn", | |
| message: "run cancelled", | |
| }); | |
| await releaseIssueExecutionAndPromote(cancelled); | |
| } | |
| runningProcesses.delete(run.id); | |
| await finalizeAgentStatus(run.agentId, "cancelled"); | |
| await startNextQueuedRunForAgent(run.agentId); | |
| return cancelled; | |
| } | |
| async function cancelActiveForAgentInternal(agentId: string, reason = "Cancelled due to agent pause") { | |
| const runs = await db | |
| .select() | |
| .from(heartbeatRuns) | |
| .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))); | |
| for (const run of runs) { | |
| await setRunStatus(run.id, "cancelled", { | |
| finishedAt: new Date(), | |
| error: reason, | |
| errorCode: "cancelled", | |
| }); | |
| await setWakeupStatus(run.wakeupRequestId, "cancelled", { | |
| finishedAt: new Date(), | |
| error: reason, | |
| }); | |
| const running = runningProcesses.get(run.id); | |
| if (running) { | |
| running.child.kill("SIGTERM"); | |
| runningProcesses.delete(run.id); | |
| } | |
| await releaseIssueExecutionAndPromote(run); | |
| } | |
| return runs.length; | |
| } | |
| async function cancelBudgetScopeWork(scope: BudgetEnforcementScope) { | |
| if (scope.scopeType === "agent") { | |
| await cancelActiveForAgentInternal(scope.scopeId, "Cancelled due to budget pause"); | |
| await cancelPendingWakeupsForBudgetScope(scope); | |
| return; | |
| } | |
| const runIds = | |
| scope.scopeType === "company" | |
| ? await db | |
| .select({ id: heartbeatRuns.id }) | |
| .from(heartbeatRuns) | |
| .where( | |
| and( | |
| eq(heartbeatRuns.companyId, scope.companyId), | |
| inArray(heartbeatRuns.status, ["queued", "running"]), | |
| ), | |
| ) | |
| .then((rows) => rows.map((row) => row.id)) | |
| : await listProjectScopedRunIds(scope.companyId, scope.scopeId); | |
| for (const runId of runIds) { | |
| await cancelRunInternal(runId, "Cancelled due to budget pause"); | |
| } | |
| await cancelPendingWakeupsForBudgetScope(scope); | |
| } | |
// Public service surface returned by this factory. Every method closes over
// the helpers defined above (db, enqueueWakeup, getRun, runLogStore, ...).
return {
  // Lists runs for a company (optionally filtered to one agent), newest
  // first, with each run's result JSON summarized down for list views.
  list: async (companyId: string, agentId?: string, limit?: number) => {
    const query = db
      .select(heartbeatRunListColumns)
      .from(heartbeatRuns)
      .where(
        agentId
          ? and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))
          : eq(heartbeatRuns.companyId, companyId),
      )
      .orderBy(desc(heartbeatRuns.createdAt));
    // A falsy limit (undefined or 0) means "no limit".
    const rows = limit ? await query.limit(limit) : await query;
    return rows.map((row) => ({
      ...row,
      resultJson: summarizeHeartbeatRunResultJson(row.resultJson),
    }));
  },
  getRun,
  // Returns the agent's runtime state (creating it if missing), enriched with
  // display info from the agent's most recently updated task session.
  getRuntimeState: async (agentId: string) => {
    const state = await getRuntimeState(agentId);
    const agent = await getAgent(agentId);
    if (!agent) return null;
    // Lazily create runtime state the first time it is requested.
    const ensured = state ?? (await ensureRuntimeState(agent));
    const latestTaskSession = await db
      .select()
      .from(agentTaskSessions)
      .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agent.id)))
      .orderBy(desc(agentTaskSessions.updatedAt))
      .limit(1)
      .then((rows) => rows[0] ?? null);
    return {
      ...ensured,
      // Fall back to the raw session id when no task session exists yet.
      sessionDisplayId: latestTaskSession?.sessionDisplayId ?? ensured.sessionId,
      sessionParamsJson: latestTaskSession?.sessionParamsJson ?? null,
    };
  },
  // Lists all task sessions for an agent, most recently touched first.
  listTaskSessions: async (agentId: string) => {
    const agent = await getAgent(agentId);
    if (!agent) throw notFound("Agent not found");
    return db
      .select()
      .from(agentTaskSessions)
      .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agentId)))
      .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt));
  },
  // Clears the agent's runtime session. With a taskKey, only that task's
  // sessions are cleared; without one, all task sessions and the runtime
  // stateJson are wiped.
  resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => {
    const agent = await getAgent(agentId);
    if (!agent) throw notFound("Agent not found");
    await ensureRuntimeState(agent);
    const taskKey = readNonEmptyString(opts?.taskKey);
    const clearedTaskSessions = await clearTaskSessions(
      agent.companyId,
      agent.id,
      taskKey ? { taskKey, adapterType: agent.adapterType } : undefined,
    );
    const runtimePatch: Partial<typeof agentRuntimeState.$inferInsert> = {
      sessionId: null,
      lastError: null,
      updatedAt: new Date(),
    };
    // Only a full reset (no taskKey) clears the persisted state blob.
    if (!taskKey) {
      runtimePatch.stateJson = {};
    }
    const updated = await db
      .update(agentRuntimeState)
      .set(runtimePatch)
      .where(eq(agentRuntimeState.agentId, agentId))
      .returning()
      .then((rows) => rows[0] ?? null);
    if (!updated) return null;
    return {
      ...updated,
      sessionDisplayId: null,
      sessionParamsJson: null,
      clearedTaskSessions,
    };
  },
  // Pages through a run's events in sequence order; limit is clamped to 1..1000.
  listEvents: (runId: string, afterSeq = 0, limit = 200) =>
    db
      .select()
      .from(heartbeatRunEvents)
      .where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq)))
      .orderBy(asc(heartbeatRunEvents.seq))
      .limit(Math.max(1, Math.min(limit, 1000))),
  // Reads a slice of a run's log from the configured log store, with
  // current-user text redacted before returning.
  readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
    const run = await getRun(runId);
    if (!run) throw notFound("Heartbeat run not found");
    if (!run.logStore || !run.logRef) throw notFound("Run log not found");
    const result = await runLogStore.read(
      {
        // NOTE(review): cast assumes local_file is the only store variant
        // in practice — confirm against run-log-store.
        store: run.logStore as "local_file",
        logRef: run.logRef,
      },
      opts,
    );
    return {
      runId,
      store: run.logStore,
      logRef: run.logRef,
      ...result,
      content: redactCurrentUserText(result.content),
    };
  },
  // Convenience wrapper over enqueueWakeup for on-demand invocations.
  invoke: async (
    agentId: string,
    source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
    contextSnapshot: Record<string, unknown> = {},
    triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
    actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
  ) =>
    enqueueWakeup(agentId, {
      source,
      triggerDetail,
      contextSnapshot,
      requestedByActorType: actor?.actorType,
      requestedByActorId: actor?.actorId ?? null,
    }),
  wakeup: enqueueWakeup,
  reapOrphanedRuns,
  resumeQueuedRuns,
  // Scheduler tick: enqueues a heartbeat wakeup for every eligible agent
  // whose heartbeat interval has elapsed. Returns counters for observability.
  tickTimers: async (now = new Date()) => {
    const allAgents = await db.select().from(agents);
    let checked = 0;
    let enqueued = 0;
    let skipped = 0;
    for (const agent of allAgents) {
      // Paused/terminated/unapproved agents never get timer wakeups.
      if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") continue;
      const policy = parseHeartbeatPolicy(agent);
      if (!policy.enabled || policy.intervalSec <= 0) continue;
      checked += 1;
      // Measure elapsed time from the last heartbeat, or creation if none yet.
      const baseline = new Date(agent.lastHeartbeatAt ?? agent.createdAt).getTime();
      const elapsedMs = now.getTime() - baseline;
      if (elapsedMs < policy.intervalSec * 1000) continue;
      const run = await enqueueWakeup(agent.id, {
        source: "timer",
        triggerDetail: "system",
        reason: "heartbeat_timer",
        requestedByActorType: "system",
        requestedByActorId: "heartbeat_scheduler",
        contextSnapshot: {
          source: "scheduler",
          reason: "interval_elapsed",
          now: now.toISOString(),
        },
      });
      // enqueueWakeup returns null when the wakeup was deferred/skipped.
      if (run) enqueued += 1;
      else skipped += 1;
    }
    return { checked, enqueued, skipped };
  },
  cancelRun: (runId: string) => cancelRunInternal(runId),
  cancelActiveForAgent: (agentId: string) => cancelActiveForAgentInternal(agentId),
  cancelBudgetScopeWork,
  // Returns the agent's most recently started run that is still running,
  // or null if the agent is idle.
  getActiveRunForAgent: async (agentId: string) => {
    const [run] = await db
      .select()
      .from(heartbeatRuns)
      .where(
        and(
          eq(heartbeatRuns.agentId, agentId),
          eq(heartbeatRuns.status, "running"),
        ),
      )
      .orderBy(desc(heartbeatRuns.startedAt))
      .limit(1);
    return run ?? null;
  },
};
| } | |