import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
  CronDeliveryStatus,
  CronJob,
  CronMessageChannel,
  CronRunOutcome,
  CronRunStatus,
  CronRunTelemetry,
} from "../types.js";
import {
  computeJobPreviousRunAtMs,
  computeJobNextRunAtMs,
  nextWakeAtMs,
  recomputeNextRunsForMaintenance,
  recordScheduleComputeError,
  resolveJobPayloadTextForMain,
} from "./jobs.js";
import { locked } from "./locked.js";
import type { CronEvent, CronServiceState } from "./state.js";
import { ensureLoaded, persist } from "./store.js";
import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js";

export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js";

// Upper bound on a single timer arm. The scheduler wakes at least once a
// minute so it recovers quickly when the process was paused or the wall
// clock jumps (see armTimer below).
const MAX_TIMER_DELAY_MS = 60_000;

/**
 * Minimum gap between consecutive fires of the same cron job. This is a
 * safety net that prevents spin-loops when `computeJobNextRunAtMs` returns
 * a value within the same second as the just-completed run. The guard
 * is intentionally generous (2 s) so it never masks a legitimate schedule
 * but always breaks an infinite re-trigger cycle.
(See #17821) */ const MIN_REFIRE_GAP_MS = 2_000; const DEFAULT_MISSED_JOB_STAGGER_MS = 5_000; const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5; const DEFAULT_FAILURE_ALERT_AFTER = 2; const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { jobId: string; delivered?: boolean; deliveryAttempted?: boolean; startedAt: number; endedAt: number; }; type StartupCatchupCandidate = { jobId: string; job: CronJob; }; type StartupCatchupPlan = { candidates: StartupCatchupCandidate[]; deferredJobIds: string[]; }; export async function executeJobCoreWithTimeout( state: CronServiceState, job: CronJob, ): Promise>> { const jobTimeoutMs = resolveCronJobTimeoutMs(job); if (typeof jobTimeoutMs !== "number") { return await executeJobCore(state, job); } const runAbortController = new AbortController(); let timeoutId: NodeJS.Timeout | undefined; try { return await Promise.race([ executeJobCore(state, job, runAbortController.signal), new Promise((_, reject) => { timeoutId = setTimeout(() => { runAbortController.abort(timeoutErrorMessage()); reject(new Error(timeoutErrorMessage())); }, jobTimeoutMs); }), ]); } finally { if (timeoutId) { clearTimeout(timeoutId); } } } function resolveRunConcurrency(state: CronServiceState): number { const raw = state.deps.cronConfig?.maxConcurrentRuns; if (typeof raw !== "number" || !Number.isFinite(raw)) { return 1; } return Math.max(1, Math.floor(raw)); } function timeoutErrorMessage(): string { return "cron: job execution timed out"; } function isAbortError(err: unknown): boolean { if (!(err instanceof Error)) { return false; } return err.name === "AbortError" || err.message === timeoutErrorMessage(); } /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. 
*/ const DEFAULT_BACKOFF_SCHEDULE_MS = [ 30_000, // 1st error → 30 s 60_000, // 2nd error → 1 min 5 * 60_000, // 3rd error → 5 min 15 * 60_000, // 4th error → 15 min 60 * 60_000, // 5th+ error → 60 min ]; function errorBackoffMs( consecutiveErrors: number, scheduleMs = DEFAULT_BACKOFF_SCHEDULE_MS, ): number { const idx = Math.min(consecutiveErrors - 1, scheduleMs.length - 1); return scheduleMs[Math.max(0, idx)]; } /** Default max retries for one-shot jobs on transient errors (#24355). */ const DEFAULT_MAX_TRANSIENT_RETRIES = 3; const TRANSIENT_PATTERNS: Record = { rate_limit: /(rate[_ ]limit|too many requests|429|resource has been exhausted|cloudflare|tokens per day)/i, overloaded: /\b529\b|\boverloaded(?:_error)?\b|high demand|temporar(?:ily|y) overloaded|capacity exceeded/i, network: /(network|econnreset|econnrefused|fetch failed|socket)/i, timeout: /(timeout|etimedout)/i, server_error: /\b5\d{2}\b/, }; function isTransientCronError(error: string | undefined, retryOn?: CronRetryOn[]): boolean { if (!error || typeof error !== "string") { return false; } const keys = retryOn?.length ? retryOn : (Object.keys(TRANSIENT_PATTERNS) as CronRetryOn[]); return keys.some((k) => TRANSIENT_PATTERNS[k]?.test(error)); } function resolveRetryConfig(cronConfig?: CronConfig) { const retry = cronConfig?.retry; return { maxAttempts: typeof retry?.maxAttempts === "number" ? retry.maxAttempts : DEFAULT_MAX_TRANSIENT_RETRIES, backoffMs: Array.isArray(retry?.backoffMs) && retry.backoffMs.length > 0 ? retry.backoffMs : DEFAULT_BACKOFF_SCHEDULE_MS.slice(0, 3), retryOn: Array.isArray(retry?.retryOn) && retry.retryOn.length > 0 ? retry.retryOn : undefined, }; } function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): CronDeliveryStatus { if (params.delivered === true) { return "delivered"; } if (params.delivered === false) { return "not-delivered"; } return resolveCronDeliveryPlan(params.job).requested ? 
"unknown" : "not-requested"; } function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined { if (typeof input !== "string") { return undefined; } const channel = input.trim().toLowerCase(); return channel ? (channel as CronMessageChannel) : undefined; } function normalizeTo(input: unknown): string | undefined { if (typeof input !== "string") { return undefined; } const to = input.trim(); return to ? to : undefined; } function clampPositiveInt(value: unknown, fallback: number): number { if (typeof value !== "number" || !Number.isFinite(value)) { return fallback; } const floored = Math.floor(value); return floored >= 1 ? floored : fallback; } function clampNonNegativeInt(value: unknown, fallback: number): number { if (typeof value !== "number" || !Number.isFinite(value)) { return fallback; } const floored = Math.floor(value); return floored >= 0 ? floored : fallback; } function resolveFailureAlert( state: CronServiceState, job: CronJob, ): { after: number; cooldownMs: number; channel: CronMessageChannel; to?: string; mode?: "announce" | "webhook"; accountId?: string; } | null { const globalConfig = state.deps.cronConfig?.failureAlert; const jobConfig = job.failureAlert === false ? undefined : job.failureAlert; if (job.failureAlert === false) { return null; } if (!jobConfig && globalConfig?.enabled !== true) { return null; } const mode = jobConfig?.mode ?? globalConfig?.mode; const explicitTo = normalizeTo(jobConfig?.to); return { after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER), cooldownMs: clampNonNegativeInt( jobConfig?.cooldownMs ?? globalConfig?.cooldownMs, DEFAULT_FAILURE_ALERT_COOLDOWN_MS, ), channel: normalizeCronMessageChannel(jobConfig?.channel) ?? normalizeCronMessageChannel(job.delivery?.channel) ?? "last", to: mode === "webhook" ? explicitTo : (explicitTo ?? normalizeTo(job.delivery?.to)), mode, accountId: jobConfig?.accountId ?? 
globalConfig?.accountId, }; } function emitFailureAlert( state: CronServiceState, params: { job: CronJob; error?: string; consecutiveErrors: number; channel: CronMessageChannel; to?: string; mode?: "announce" | "webhook"; accountId?: string; }, ) { const safeJobName = params.job.name || params.job.id; const truncatedError = (params.error?.trim() || "unknown error").slice(0, 200); const text = [ `Cron job "${safeJobName}" failed ${params.consecutiveErrors} times`, `Last error: ${truncatedError}`, ].join("\n"); if (state.deps.sendCronFailureAlert) { void state.deps .sendCronFailureAlert({ job: params.job, text, channel: params.channel, to: params.to, mode: params.mode, accountId: params.accountId, }) .catch((err) => { state.deps.log.warn( { jobId: params.job.id, err: String(err) }, "cron: failure alert delivery failed", ); }); return; } state.deps.enqueueSystemEvent(text, { agentId: params.job.agentId }); if (params.job.wakeMode === "now") { state.deps.requestHeartbeatNow({ reason: `cron:${params.job.id}:failure-alert` }); } } const QUOTA_EXHAUSTED_BACKOFF_MS = 12 * 60 * 60_000; // 12 hours const MAX_CONSECUTIVE_QUOTA_ERRORS = 10; function isQuotaExhaustedError(error: string | undefined): boolean { if (!error) return false; return ( /exhausted|limit reached|429|usage limit|try again in/i.test(error) && /min|hour|day/i.test(error) ); } /** * Apply the result of a job execution to the job's state. * Handles consecutive error tracking, exponential backoff, one-shot disable, * and nextRunAtMs computation. Returns `true` if the job should be deleted. */ export function applyJobResult( state: CronServiceState, job: CronJob, result: { status: CronRunStatus; error?: string; delivered?: boolean; startedAt: number; endedAt: number; }, opts?: { // Preserve recurring "every" anchors for manual force runs. 
preserveSchedule?: boolean; }, ): boolean { const prevLastRunAtMs = job.state.lastRunAtMs; const computeNextWithPreservedLastRun = (nowMs: number) => { const saved = job.state.lastRunAtMs; job.state.lastRunAtMs = prevLastRunAtMs; try { return computeJobNextRunAtMs(job, nowMs); } finally { job.state.lastRunAtMs = saved; } }; job.state.runningAtMs = undefined; job.state.lastRunAtMs = result.startedAt; job.state.lastRunStatus = result.status; job.state.lastStatus = result.status; job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt); job.state.lastError = result.error; job.state.lastErrorReason = result.status === "error" && typeof result.error === "string" ? (resolveFailoverReasonFromError(result.error) ?? undefined) : undefined; job.state.lastDelivered = result.delivered; const deliveryStatus = resolveDeliveryStatus({ job, delivered: result.delivered }); job.state.lastDeliveryStatus = deliveryStatus; job.state.lastDeliveryError = deliveryStatus === "not-delivered" && result.error ? result.error : undefined; job.updatedAtMs = result.endedAt; // Track consecutive errors for backoff / auto-disable. if (result.status === "error") { job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1; // Circuit breaker for persistent quota errors if ( isQuotaExhaustedError(result.error) && job.state.consecutiveErrors >= MAX_CONSECUTIVE_QUOTA_ERRORS ) { job.enabled = false; job.state.nextRunAtMs = undefined; state.deps.log.error( { jobId: job.id, jobName: job.name, consecutiveErrors: job.state.consecutiveErrors }, "cron: circuit breaker triggered due to persistent quota exhaustion. 
Disabling job.", ); return false; } const alertConfig = resolveFailureAlert(state, job); if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) { const isBestEffort = job.delivery?.bestEffort === true || (job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true); if (!isBestEffort) { const now = state.deps.nowMs(); const lastAlert = job.state.lastFailureAlertAtMs; const inCooldown = typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs); if (!inCooldown) { emitFailureAlert(state, { job, error: result.error, consecutiveErrors: job.state.consecutiveErrors, channel: alertConfig.channel, to: alertConfig.to, mode: alertConfig.mode, accountId: alertConfig.accountId, }); job.state.lastFailureAlertAtMs = now; } } } } else { job.state.consecutiveErrors = 0; job.state.lastFailureAlertAtMs = undefined; } const shouldDelete = job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok"; if (!shouldDelete) { if (job.schedule.kind === "at") { if (result.status === "ok" || result.status === "skipped") { // One-shot done or skipped: disable to prevent tight-loop (#11452). job.enabled = false; job.state.nextRunAtMs = undefined; } else if (result.status === "error") { const retryConfig = resolveRetryConfig(state.deps.cronConfig); const transient = isTransientCronError(result.error, retryConfig.retryOn); // consecutiveErrors is always set to ≥1 by the increment block above. const consecutive = job.state.consecutiveErrors; if (transient && consecutive <= retryConfig.maxAttempts) { // Schedule retry with backoff (#24355). 
const backoff = errorBackoffMs(consecutive, retryConfig.backoffMs); job.state.nextRunAtMs = result.endedAt + backoff; state.deps.log.info( { jobId: job.id, jobName: job.name, consecutiveErrors: consecutive, backoffMs: backoff, nextRunAtMs: job.state.nextRunAtMs, }, "cron: scheduling one-shot retry after transient error", ); } else { // Permanent error or max retries exhausted: disable. // Note: deleteAfterRun:true only triggers on ok (see shouldDelete above), // so exhausted-retry jobs are disabled but intentionally kept in the store // to preserve the error state for inspection. job.enabled = false; job.state.nextRunAtMs = undefined; state.deps.log.warn( { jobId: job.id, jobName: job.name, consecutiveErrors: consecutive, error: result.error, reason: transient ? "max retries exhausted" : "permanent error", }, "cron: disabling one-shot job after error", ); } } } else if (result.status === "error" && job.enabled) { // Apply exponential backoff for errored jobs to prevent retry storms. let backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1); // Much more aggressive backoff for real quota exhaustion if (isQuotaExhaustedError(result.error)) { backoff = Math.max(backoff, QUOTA_EXHAUSTED_BACKOFF_MS); } let normalNext: number | undefined; try { normalNext = opts?.preserveSchedule && job.schedule.kind === "every" ? computeNextWithPreservedLastRun(result.endedAt) : computeJobNextRunAtMs(job, result.endedAt); } catch (err) { // If the schedule expression/timezone throws (croner edge cases), // record the schedule error (auto-disables after repeated failures) // and fall back to backoff-only schedule so the state update is not lost. recordScheduleComputeError({ state, job, err }); } const backoffNext = result.endedAt + backoff; // Use whichever is later: the natural next run or the backoff delay. job.state.nextRunAtMs = normalNext !== undefined ? 
Math.max(normalNext, backoffNext) : backoffNext; state.deps.log.info( { jobId: job.id, consecutiveErrors: job.state.consecutiveErrors, backoffMs: backoff, nextRunAtMs: job.state.nextRunAtMs, }, "cron: applying error backoff", ); } else if (job.enabled) { let naturalNext: number | undefined; try { naturalNext = opts?.preserveSchedule && job.schedule.kind === "every" ? computeNextWithPreservedLastRun(result.endedAt) : computeJobNextRunAtMs(job, result.endedAt); } catch (err) { // If the schedule expression/timezone throws (croner edge cases), // record the schedule error (auto-disables after repeated failures) // so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop. recordScheduleComputeError({ state, job, err }); } if (job.schedule.kind === "cron") { // Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS // after the current run ended. Prevents spin-loops when the // schedule computation lands in the same second due to // timezone/croner edge cases (see #17821). const minNext = result.endedAt + MIN_REFIRE_GAP_MS; job.state.nextRunAtMs = naturalNext !== undefined ? 
Math.max(naturalNext, minNext) : minNext; } else { job.state.nextRunAtMs = naturalNext; } } else { job.state.nextRunAtMs = undefined; } } return shouldDelete; } function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void { const store = state.store; if (!store) { return; } const jobs = store.jobs; const job = jobs.find((entry) => entry.id === result.jobId); if (!job) { state.deps.log.warn( { jobId: result.jobId }, "cron: applyOutcomeToStoredJob — job not found after forceReload, result discarded", ); return; } const shouldDelete = applyJobResult(state, job, { status: result.status, error: result.error, delivered: result.delivered, startedAt: result.startedAt, endedAt: result.endedAt, }); emitJobFinished(state, job, result, result.startedAt); if (shouldDelete) { store.jobs = jobs.filter((entry) => entry.id !== job.id); emit(state, { jobId: job.id, action: "removed" }); } } export function armTimer(state: CronServiceState) { if (state.timer) { clearTimeout(state.timer); } state.timer = null; if (!state.deps.cronEnabled) { state.deps.log.debug({}, "cron: armTimer skipped - scheduler disabled"); return; } const nextAt = nextWakeAtMs(state); if (!nextAt) { const jobCount = state.store?.jobs.length ?? 0; const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0; const withNextRun = state.store?.jobs.filter( (j) => j.enabled && typeof j.state.nextRunAtMs === "number" && Number.isFinite(j.state.nextRunAtMs), ).length ?? 0; state.deps.log.debug( { jobCount, enabledCount, withNextRun }, "cron: armTimer skipped - no jobs with nextRunAtMs", ); return; } const now = state.deps.nowMs(); const delay = Math.max(nextAt - now, 0); // Floor: when the next wake time is in the past (delay === 0), enforce a // minimum delay to prevent a tight setTimeout(0) loop. 
This can happen // when a job has a stuck runningAtMs marker and a past-due nextRunAtMs: // findDueJobs skips the job (blocked by runningAtMs), while // recomputeNextRunsForMaintenance intentionally does not advance the // past-due nextRunAtMs (per #13992). The finally block in onTimer then // re-invokes armTimer with delay === 0, creating an infinite hot-loop // that saturates the event loop and fills the log file to its size cap. const flooredDelay = delay === 0 ? MIN_REFIRE_GAP_MS : delay; // Wake at least once a minute to avoid schedule drift and recover quickly // when the process was paused or wall-clock time jumps. const clampedDelay = Math.min(flooredDelay, MAX_TIMER_DELAY_MS); // Intentionally avoid an `async` timer callback: // Vitest's fake-timer helpers can await async callbacks, which would block // tests that simulate long-running jobs. Runtime behavior is unchanged. state.timer = setTimeout(() => { void onTimer(state).catch((err) => { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); }); }, clampedDelay); state.deps.log.debug( { nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS }, "cron: timer armed", ); } function armRunningRecheckTimer(state: CronServiceState) { if (state.timer) { clearTimeout(state.timer); } state.timer = setTimeout(() => { void onTimer(state).catch((err) => { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); }); }, MAX_TIMER_DELAY_MS); } export async function onTimer(state: CronServiceState) { if (state.running) { // Re-arm the timer so the scheduler keeps ticking even when a job is // still executing. Without this, a long-running job (e.g. an agentTurn // exceeding MAX_TIMER_DELAY_MS) causes the clamped 60 s timer to fire // while `running` is true. The early return then leaves no timer set, // silently killing the scheduler until the next gateway restart. 
// // We use MAX_TIMER_DELAY_MS as a fixed re-check interval to avoid a // zero-delay hot-loop when past-due jobs are waiting for the current // execution to finish. // See: https://github.com/openclaw/openclaw/issues/12025 armRunningRecheckTimer(state); return; } state.running = true; // Keep a watchdog timer armed while a tick is executing. If execution hangs // (for example in a provider call), the scheduler still wakes to re-check. armRunningRecheckTimer(state); try { const dueJobs = await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); const dueCheckNow = state.deps.nowMs(); const due = collectRunnableJobs(state, dueCheckNow); if (due.length === 0) { // Use maintenance-only recompute to avoid advancing past-due nextRunAtMs // values without execution. This prevents jobs from being silently skipped // when the timer wakes up but findDueJobs returns empty (see #13992). const changed = recomputeNextRunsForMaintenance(state, { recomputeExpired: true, nowMs: dueCheckNow, }); if (changed) { await persist(state); } return []; } const now = state.deps.nowMs(); for (const job of due) { job.state.runningAtMs = now; job.state.lastError = undefined; } await persist(state); return due.map((j) => ({ id: j.id, job: j, })); }); const runDueJob = async (params: { id: string; job: CronJob; }): Promise => { const { id, job } = params; const startedAt = state.deps.nowMs(); job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); const jobTimeoutMs = resolveCronJobTimeoutMs(job); try { const result = await executeJobCoreWithTimeout(state, job); return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }; } catch (err) { const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err); state.deps.log.warn( { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? 
null }, `cron: job failed: ${errorText}`, ); return { jobId: id, status: "error", error: errorText, startedAt, endedAt: state.deps.nowMs(), }; } }; const concurrency = Math.min(resolveRunConcurrency(state), Math.max(1, dueJobs.length)); const results: (TimedCronRunOutcome | undefined)[] = Array.from({ length: dueJobs.length }); let cursor = 0; const workers = Array.from({ length: concurrency }, async () => { for (;;) { const index = cursor++; if (index >= dueJobs.length) { return; } const due = dueJobs[index]; if (!due) { return; } results[index] = await runDueJob(due); } }); await Promise.all(workers); const completedResults: TimedCronRunOutcome[] = results.filter( (entry): entry is TimedCronRunOutcome => entry !== undefined, ); if (completedResults.length > 0) { await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); for (const result of completedResults) { applyOutcomeToStoredJob(state, result); } // Use maintenance-only recompute to avoid advancing past-due // nextRunAtMs values that became due between findDueJobs and this // locked block. The full recomputeNextRuns would silently skip // those jobs (advancing nextRunAtMs without execution), causing // daily cron schedules to jump 48 h instead of 24 h (#17852). recomputeNextRunsForMaintenance(state); await persist(state); }); } } finally { // Piggyback session reaper on timer tick (self-throttled to every 5 min). // Placed in `finally` so the reaper runs even when a long-running job keeps // `state.running` true across multiple timer ticks — the early return at the // top of onTimer would otherwise skip the reaper indefinitely. const storePaths = new Set(); if (state.deps.resolveSessionStorePath) { const defaultAgentId = state.deps.defaultAgentId ?? DEFAULT_AGENT_ID; if (state.store?.jobs?.length) { for (const job of state.store.jobs) { const agentId = typeof job.agentId === "string" && job.agentId.trim() ? 
job.agentId : defaultAgentId; storePaths.add(state.deps.resolveSessionStorePath(agentId)); } } else { storePaths.add(state.deps.resolveSessionStorePath(defaultAgentId)); } } else if (state.deps.sessionStorePath) { storePaths.add(state.deps.sessionStorePath); } if (storePaths.size > 0) { const nowMs = state.deps.nowMs(); for (const storePath of storePaths) { try { await sweepCronRunSessions({ cronConfig: state.deps.cronConfig, sessionStorePath: storePath, nowMs, log: state.deps.log, }); } catch (err) { state.deps.log.warn({ err: String(err), storePath }, "cron: session reaper sweep failed"); } } } state.running = false; armTimer(state); } } function isRunnableJob(params: { job: CronJob; nowMs: number; skipJobIds?: ReadonlySet; skipAtIfAlreadyRan?: boolean; allowCronMissedRunByLastRun?: boolean; }): boolean { const { job, nowMs } = params; if (!job.state) { job.state = {}; } if (!job.enabled) { return false; } if (params.skipJobIds?.has(job.id)) { return false; } if (typeof job.state.runningAtMs === "number") { return false; } if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) { // One-shot with terminal status: skip unless it's a transient-error retry. // Retries have nextRunAtMs > lastRunAtMs (scheduled after the failed run) (#24355). // ok/skipped or error-without-retry always skip (#13845). const lastRun = job.state.lastRunAtMs; const nextRun = job.state.nextRunAtMs; if ( job.state.lastStatus === "error" && job.enabled && typeof nextRun === "number" && typeof lastRun === "number" && nextRun > lastRun ) { return nowMs >= nextRun; } return false; } const next = job.state.nextRunAtMs; if (typeof next === "number" && Number.isFinite(next) && nowMs >= next) { return true; } if ( typeof next === "number" && Number.isFinite(next) && next > nowMs && isErrorBackoffPending(job, nowMs) ) { // Respect active retry backoff windows on restart, but allow missed-slot // replay once the backoff window has elapsed. 
return false; } if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") { return false; } let previousRunAtMs: number | undefined; try { previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs); } catch { return false; } if (typeof previousRunAtMs !== "number" || !Number.isFinite(previousRunAtMs)) { return false; } const lastRunAtMs = job.state.lastRunAtMs; if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) { // Only replay a "missed slot" when there is concrete run history. return false; } return previousRunAtMs > lastRunAtMs; } function isErrorBackoffPending(job: CronJob, nowMs: number): boolean { if (job.schedule.kind === "at" || job.state.lastStatus !== "error") { return false; } const lastRunAtMs = job.state.lastRunAtMs; if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) { return false; } const consecutiveErrorsRaw = job.state.consecutiveErrors; const consecutiveErrors = typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw) ? 
Math.max(1, Math.floor(consecutiveErrorsRaw)) : 1; return nowMs < lastRunAtMs + errorBackoffMs(consecutiveErrors); } function collectRunnableJobs( state: CronServiceState, nowMs: number, opts?: { skipJobIds?: ReadonlySet; skipAtIfAlreadyRan?: boolean; allowCronMissedRunByLastRun?: boolean; }, ): CronJob[] { if (!state.store) { return []; } return state.store.jobs.filter((job) => isRunnableJob({ job, nowMs, skipJobIds: opts?.skipJobIds, skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan, allowCronMissedRunByLastRun: opts?.allowCronMissedRunByLastRun, }), ); } export async function runMissedJobs( state: CronServiceState, opts?: { skipJobIds?: ReadonlySet }, ) { const plan = await planStartupCatchup(state, opts); if (plan.candidates.length === 0 && plan.deferredJobIds.length === 0) { return; } const outcomes = await executeStartupCatchupPlan(state, plan); await applyStartupCatchupOutcomes(state, plan, outcomes); } async function planStartupCatchup( state: CronServiceState, opts?: { skipJobIds?: ReadonlySet }, ): Promise { const maxImmediate = Math.max( 0, state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART, ); return locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); if (!state.store) { return { candidates: [], deferredJobIds: [] }; } const now = state.deps.nowMs(); const missed = collectRunnableJobs(state, now, { skipJobIds: opts?.skipJobIds, skipAtIfAlreadyRan: true, allowCronMissedRunByLastRun: true, }); if (missed.length === 0) { return { candidates: [], deferredJobIds: [] }; } const sorted = missed.toSorted( (a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 
0), ); const startupCandidates = sorted.slice(0, maxImmediate); const deferred = sorted.slice(maxImmediate); if (deferred.length > 0) { state.deps.log.info( { immediateCount: startupCandidates.length, deferredCount: deferred.length, totalMissed: missed.length, }, "cron: staggering missed jobs to prevent gateway overload", ); } if (startupCandidates.length > 0) { state.deps.log.info( { count: startupCandidates.length, jobIds: startupCandidates.map((j) => j.id) }, "cron: running missed jobs after restart", ); } for (const job of startupCandidates) { job.state.runningAtMs = now; job.state.lastError = undefined; } await persist(state); return { candidates: startupCandidates.map((job) => ({ jobId: job.id, job })), deferredJobIds: deferred.map((job) => job.id), }; }); } async function executeStartupCatchupPlan( state: CronServiceState, plan: StartupCatchupPlan, ): Promise { const outcomes: TimedCronRunOutcome[] = []; for (const candidate of plan.candidates) { outcomes.push(await runStartupCatchupCandidate(state, candidate)); } return outcomes; } async function runStartupCatchupCandidate( state: CronServiceState, candidate: StartupCatchupCandidate, ): Promise { const startedAt = state.deps.nowMs(); emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt }); try { const result = await executeJobCoreWithTimeout(state, candidate.job); return { jobId: candidate.jobId, status: result.status, error: result.error, summary: result.summary, delivered: result.delivered, sessionId: result.sessionId, sessionKey: result.sessionKey, model: result.model, provider: result.provider, usage: result.usage, startedAt, endedAt: state.deps.nowMs(), }; } catch (err) { return { jobId: candidate.jobId, status: "error", error: String(err), startedAt, endedAt: state.deps.nowMs(), }; } } async function applyStartupCatchupOutcomes( state: CronServiceState, plan: StartupCatchupPlan, outcomes: TimedCronRunOutcome[], ): Promise { const staggerMs = Math.max(0, 
state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS); await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); if (!state.store) { return; } for (const result of outcomes) { applyOutcomeToStoredJob(state, result); } if (plan.deferredJobIds.length > 0) { const baseNow = state.deps.nowMs(); let offset = staggerMs; for (const jobId of plan.deferredJobIds) { const job = state.store.jobs.find((entry) => entry.id === jobId); if (!job || !job.enabled) { continue; } job.state.nextRunAtMs = baseNow + offset; offset += staggerMs; } } // Preserve any new past-due nextRunAtMs values that became due while // startup catch-up was running. They should execute on a future tick // instead of being silently advanced. recomputeNextRunsForMaintenance(state); await persist(state); }); } export async function runDueJobs(state: CronServiceState) { if (!state.store) { return; } const now = state.deps.nowMs(); const due = collectRunnableJobs(state, now); for (const job of due) { await executeJob(state, job, now, { forced: false }); } } export async function executeJobCore( state: CronServiceState, job: CronJob, abortSignal?: AbortSignal, ): Promise< CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } > { const resolveAbortError = () => ({ status: "error" as const, error: timeoutErrorMessage(), }); const waitWithAbort = async (ms: number) => { if (!abortSignal) { await new Promise((resolve) => setTimeout(resolve, ms)); return; } if (abortSignal.aborted) { return; } await new Promise((resolve) => { const timer = setTimeout(() => { abortSignal.removeEventListener("abort", onAbort); resolve(); }, ms); const onAbort = () => { clearTimeout(timer); abortSignal.removeEventListener("abort", onAbort); resolve(); }; abortSignal.addEventListener("abort", onAbort, { once: true }); }); }; if (abortSignal?.aborted) { return resolveAbortError(); } if (job.sessionTarget === "main") { const text = 
resolveJobPayloadTextForMain(job);
    if (!text) {
      const kind = job.payload.kind;
      return {
        status: "skipped",
        error:
          kind === "systemEvent"
            ? "main job requires non-empty systemEvent text"
            : 'main job requires payload.kind="systemEvent"',
      };
    }
    // Preserve the job session namespace for main-target reminders so heartbeat
    // routing can deliver follow-through in the originating channel/thread.
    // Downstream gateway wiring canonicalizes/guards this key per agent.
    const targetMainSessionKey = job.sessionKey;
    state.deps.enqueueSystemEvent(text, {
      agentId: job.agentId,
      sessionKey: targetMainSessionKey,
      contextKey: `cron:${job.id}`,
    });
    if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
      const reason = `cron:${job.id}`;
      // How long to keep retrying while the heartbeat runner is busy, and
      // how often to poll it. Defaults: 2 minutes / 250 ms.
      const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
      const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
      const waitStartedAt = state.deps.nowMs();
      let heartbeatResult: HeartbeatRunResult;
      for (;;) {
        if (abortSignal?.aborted) {
          return resolveAbortError();
        }
        heartbeatResult = await state.deps.runHeartbeatOnce({
          reason,
          agentId: job.agentId,
          sessionKey: targetMainSessionKey,
          // Cron-triggered heartbeats should deliver to the last active channel.
          // Without this override, heartbeat target defaults to "none" (since
          // e2362d35) and cron main-session responses are silently swallowed.
          // See: https://github.com/openclaw/openclaw/issues/28508
          heartbeat: { target: "last" },
        });
        // Any result other than "busy, requests in flight" ends the retry loop.
        if (
          heartbeatResult.status !== "skipped" ||
          heartbeatResult.reason !== "requests-in-flight"
        ) {
          break;
        }
        if (abortSignal?.aborted) {
          return resolveAbortError();
        }
        if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
          if (abortSignal?.aborted) {
            return resolveAbortError();
          }
          // Gave up waiting for an immediate run: fall back to queueing a
          // heartbeat request and report the enqueue itself as success.
          state.deps.requestHeartbeatNow({
            reason,
            agentId: job.agentId,
            sessionKey: targetMainSessionKey,
          });
          return { status: "ok", summary: text };
        }
        await waitWithAbort(retryDelayMs);
      }
      if (heartbeatResult.status === "ran") {
        return { status: "ok", summary: text };
      } else if (heartbeatResult.status === "skipped") {
        return { status: "skipped", error: heartbeatResult.reason, summary: text };
      } else {
        return { status: "error", error: heartbeatResult.reason, summary: text };
      }
    } else {
      // wakeMode is not "now" (or no direct runner is available): just
      // request a heartbeat and report success.
      if (abortSignal?.aborted) {
        return resolveAbortError();
      }
      state.deps.requestHeartbeatNow({
        reason: `cron:${job.id}`,
        agentId: job.agentId,
        sessionKey: targetMainSessionKey,
      });
      return { status: "ok", summary: text };
    }
  }
  // Non-main (isolated) jobs only support agentTurn payloads.
  if (job.payload.kind !== "agentTurn") {
    return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
  }
  if (abortSignal?.aborted) {
    return resolveAbortError();
  }
  const res = await state.deps.runIsolatedAgentJob({
    job,
    message: job.payload.message,
    abortSignal,
  });
  // A timeout that fired while the isolated run was in flight wins over
  // the run's own result.
  if (abortSignal?.aborted) {
    return { status: "error", error: timeoutErrorMessage() };
  }
  return {
    status: res.status,
    error: res.error,
    summary: res.summary,
    delivered: res.delivered,
    deliveryAttempted: res.deliveryAttempted,
    sessionId: res.sessionId,
    sessionKey: res.sessionKey,
    model: res.model,
    provider: res.provider,
    usage: res.usage,
  };
}

/**
 * Execute a job. This version is used by the `run` command and other
 * places that need the full execution with state updates.
*/
export async function executeJob(
  state: CronServiceState,
  job: CronJob,
  _nowMs: number,
  _opts: { forced: boolean },
) {
  if (!job.state) {
    job.state = {};
  }
  const startedAt = state.deps.nowMs();
  // Mark the job as running and clear any stale error before emitting
  // "started", so observers see a consistent snapshot.
  job.state.runningAtMs = startedAt;
  job.state.lastError = undefined;
  emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
  let coreResult: {
    status: CronRunStatus;
    delivered?: boolean;
  } & CronRunOutcome &
    CronRunTelemetry;
  try {
    // NOTE(review): this calls executeJobCore directly, not
    // executeJobCoreWithTimeout, so per-job timeouts do not apply on this
    // path — confirm whether that is intentional for `run`/forced execution.
    coreResult = await executeJobCore(state, job);
  } catch (err) {
    coreResult = { status: "error", error: String(err) };
  }
  const endedAt = state.deps.nowMs();
  // applyJobResult (defined elsewhere) records the outcome on the job and
  // reports whether the job should now be removed from the store.
  const shouldDelete = applyJobResult(state, job, {
    status: coreResult.status,
    error: coreResult.error,
    delivered: coreResult.delivered,
    startedAt,
    endedAt,
  });
  emitJobFinished(state, job, coreResult, startedAt);
  if (shouldDelete && state.store) {
    state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
    emit(state, { jobId: job.id, action: "removed" });
  }
}

/**
 * Emit the "finished" event for a run, merging the core run outcome with
 * delivery/duration/schedule fields already recorded on `job.state`.
 */
function emitJobFinished(
  state: CronServiceState,
  job: CronJob,
  result: {
    status: CronRunStatus;
    delivered?: boolean;
  } & CronRunOutcome &
    CronRunTelemetry,
  runAtMs: number,
) {
  emit(state, {
    jobId: job.id,
    action: "finished",
    status: result.status,
    error: result.error,
    summary: result.summary,
    delivered: result.delivered,
    deliveryStatus: job.state.lastDeliveryStatus,
    deliveryError: job.state.lastDeliveryError,
    sessionId: result.sessionId,
    sessionKey: result.sessionKey,
    runAtMs,
    durationMs: job.state.lastDurationMs,
    nextRunAtMs: job.state.nextRunAtMs,
    model: result.model,
    provider: result.provider,
    usage: result.usage,
  });
}

/**
 * Enqueue free-form wake text as a system event. With mode "now" an
 * immediate heartbeat is also requested; "next-heartbeat" leaves it for the
 * regular cycle. Returns `{ ok: false }` when the trimmed text is empty.
 */
export function wake(
  state: CronServiceState,
  opts: { mode: "now" | "next-heartbeat"; text: string },
) {
  const text = opts.text.trim();
  if (!text) {
    return { ok: false } as const;
  }
  state.deps.enqueueSystemEvent(text);
  if (opts.mode === "now") {
    state.deps.requestHeartbeatNow({ reason: "wake" });
  }
  return { ok: true } as const;
}

/** Cancel the pending scheduler timer, if any, and clear the handle. */
export function stopTimer(state: CronServiceState) {
  if
(state.timer) {
    clearTimeout(state.timer);
  }
  state.timer = null;
}

/**
 * Forward a cron event to the configured listener, if one is set.
 * Listener exceptions are deliberately swallowed so a faulty observer can
 * never break the scheduler itself.
 */
export function emit(state: CronServiceState, evt: CronEvent) {
  try {
    state.deps.onEvent?.(evt);
  } catch {
    /* ignore */
  }
}