| import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js"; |
| import { |
| readCronRunLogEntriesPage, |
| readCronRunLogEntriesPageAll, |
| resolveCronRunLogPath, |
| } from "../../cron/run-log.js"; |
| import type { CronJobCreate, CronJobPatch } from "../../cron/types.js"; |
| import { validateScheduleTimestamp } from "../../cron/validate-timestamp.js"; |
| import { |
| ErrorCodes, |
| errorShape, |
| formatValidationErrors, |
| validateCronAddParams, |
| validateCronListParams, |
| validateCronRemoveParams, |
| validateCronRunParams, |
| validateCronRunsParams, |
| validateCronStatusParams, |
| validateCronUpdateParams, |
| validateWakeParams, |
| } from "../protocol/index.js"; |
| import type { GatewayRequestHandlers } from "./types.js"; |
|
|
/** How `respond` is invoked when a request is rejected: not ok, no payload, an error. */
type CronFailureResponder = (
  ok: false,
  payload: undefined,
  error: ReturnType<typeof errorShape>,
) => void;

/** Rejects the request with an `INVALID_REQUEST` error carrying `message`. */
function rejectCronRequest(respond: CronFailureResponder, message: string): void {
  respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, message));
}

/**
 * Rejects a request whose params failed schema validation.
 *
 * @param respond - The handler's `respond` callback.
 * @param method - Method name exactly as it should appear in the message (e.g. `cron.add`).
 * @param errors - The validator's `errors`, read by the caller right after the failed call.
 */
function rejectInvalidCronParams(
  respond: CronFailureResponder,
  method: string,
  errors: Parameters<typeof formatValidationErrors>[0],
): void {
  rejectCronRequest(respond, `invalid ${method} params: ${formatValidationErrors(errors)}`);
}

/** Rejects a request that passed validation but carries no usable `id` / `jobId`. */
function rejectMissingCronJobId(respond: CronFailureResponder, method: string): void {
  rejectCronRequest(respond, `invalid ${method} params: missing id`);
}

/**
 * Picks the job identifier from request params, preferring `id` over the `jobId` alias.
 * Callers treat any falsy result (including an empty string) as "missing".
 */
function pickCronJobId(p: { id?: string; jobId?: string }): string | undefined {
  return p.id ?? p.jobId;
}

/**
 * Gateway request handlers for `wake` and the `cron.*` methods.
 *
 * Every handler validates its params first and answers with an `INVALID_REQUEST`
 * error (`invalid <method> params: …`) when validation fails. On success it
 * delegates to `context.cron` (or to the run-log readers for `cron.runs`) and
 * responds with whatever that call returned.
 */
export const cronHandlers: GatewayRequestHandlers = {
  /** Forwards `mode` and `text` to `context.cron.wake` and responds with its result. */
  wake: ({ params, respond, context }) => {
    if (!validateWakeParams(params)) {
      rejectInvalidCronParams(respond, "wake", validateWakeParams.errors);
      return;
    }
    const p = params as {
      mode: "now" | "next-heartbeat";
      text: string;
    };
    const result = context.cron.wake({ mode: p.mode, text: p.text });
    respond(true, result, undefined);
  },

  /** Responds with one page of jobs from `context.cron.listPage`. */
  "cron.list": async ({ params, respond, context }) => {
    if (!validateCronListParams(params)) {
      rejectInvalidCronParams(respond, "cron.list", validateCronListParams.errors);
      return;
    }
    const p = params as {
      includeDisabled?: boolean;
      limit?: number;
      offset?: number;
      query?: string;
      enabled?: "all" | "enabled" | "disabled";
      sortBy?: "nextRunAtMs" | "updatedAtMs" | "name";
      sortDir?: "asc" | "desc";
    };
    // Copy field by field so only the known paging/filter options are forwarded.
    const page = await context.cron.listPage({
      includeDisabled: p.includeDisabled,
      limit: p.limit,
      offset: p.offset,
      query: p.query,
      enabled: p.enabled,
      sortBy: p.sortBy,
      sortDir: p.sortDir,
    });
    respond(true, page, undefined);
  },

  /** Responds with the result of `context.cron.status()`. */
  "cron.status": async ({ params, respond, context }) => {
    if (!validateCronStatusParams(params)) {
      rejectInvalidCronParams(respond, "cron.status", validateCronStatusParams.errors);
      return;
    }
    const status = await context.cron.status();
    respond(true, status, undefined);
  },

  /**
   * Creates a job. Params are normalized before validation (falling back to the
   * raw params when normalization yields nothing), then the schedule timestamp
   * is checked separately before the job is added and the creation is logged.
   */
  "cron.add": async ({ params, respond, context }) => {
    const normalized = normalizeCronJobCreate(params) ?? params;
    if (!validateCronAddParams(normalized)) {
      rejectInvalidCronParams(respond, "cron.add", validateCronAddParams.errors);
      return;
    }
    const jobCreate = normalized as unknown as CronJobCreate;
    const timestampValidation = validateScheduleTimestamp(jobCreate.schedule);
    if (!timestampValidation.ok) {
      rejectCronRequest(respond, timestampValidation.message);
      return;
    }
    const job = await context.cron.add(jobCreate);
    context.logGateway.info("cron: job created", { jobId: job.id, schedule: jobCreate.schedule });
    respond(true, job, undefined);
  },

  /**
   * Applies a patch to the job named by `id` (or `jobId`). Only the nested
   * `patch` is normalized; the rest of the params are validated as sent. A
   * patch that changes the schedule must also pass the timestamp check.
   */
  "cron.update": async ({ params, respond, context }) => {
    const normalizedPatch = normalizeCronJobPatch((params as { patch?: unknown } | null)?.patch);
    // Swap in the normalized patch only when there is one and params can be spread.
    const candidate =
      normalizedPatch && typeof params === "object" && params !== null
        ? { ...params, patch: normalizedPatch }
        : params;
    if (!validateCronUpdateParams(candidate)) {
      rejectInvalidCronParams(respond, "cron.update", validateCronUpdateParams.errors);
      return;
    }
    const p = candidate as {
      id?: string;
      jobId?: string;
      patch: Record<string, unknown>;
    };
    const jobId = pickCronJobId(p);
    if (!jobId) {
      rejectMissingCronJobId(respond, "cron.update");
      return;
    }
    const patch = p.patch as unknown as CronJobPatch;
    if (patch.schedule) {
      const timestampValidation = validateScheduleTimestamp(patch.schedule);
      if (!timestampValidation.ok) {
        rejectCronRequest(respond, timestampValidation.message);
        return;
      }
    }
    const job = await context.cron.update(jobId, patch);
    context.logGateway.info("cron: job updated", { jobId });
    respond(true, job, undefined);
  },

  /** Removes the job named by `id` (or `jobId`); logs only when something was removed. */
  "cron.remove": async ({ params, respond, context }) => {
    if (!validateCronRemoveParams(params)) {
      rejectInvalidCronParams(respond, "cron.remove", validateCronRemoveParams.errors);
      return;
    }
    const jobId = pickCronJobId(params as { id?: string; jobId?: string });
    if (!jobId) {
      rejectMissingCronJobId(respond, "cron.remove");
      return;
    }
    const result = await context.cron.remove(jobId);
    if (result.removed) {
      context.logGateway.info("cron: job removed", { jobId });
    }
    respond(true, result, undefined);
  },

  /** Enqueues a run of the job named by `id` (or `jobId`); `mode` defaults to `"force"`. */
  "cron.run": async ({ params, respond, context }) => {
    if (!validateCronRunParams(params)) {
      rejectInvalidCronParams(respond, "cron.run", validateCronRunParams.errors);
      return;
    }
    const p = params as { id?: string; jobId?: string; mode?: "due" | "force" };
    const jobId = pickCronJobId(p);
    if (!jobId) {
      rejectMissingCronJobId(respond, "cron.run");
      return;
    }
    const result = await context.cron.enqueueRun(jobId, p.mode ?? "force");
    respond(true, result, undefined);
  },

  /**
   * Responds with a page of run-log entries, either for one job or across all jobs.
   *
   * When `scope` is omitted it is inferred: `"job"` if an id was supplied,
   * otherwise `"all"`. An explicit `"all"` scope ignores any supplied id.
   */
  "cron.runs": async ({ params, respond, context }) => {
    if (!validateCronRunsParams(params)) {
      rejectInvalidCronParams(respond, "cron.runs", validateCronRunsParams.errors);
      return;
    }
    const p = params as {
      scope?: "job" | "all";
      id?: string;
      jobId?: string;
      limit?: number;
      offset?: number;
      statuses?: Array<"ok" | "error" | "skipped">;
      status?: "all" | "ok" | "error" | "skipped";
      deliveryStatuses?: Array<"delivered" | "not-delivered" | "unknown" | "not-requested">;
      deliveryStatus?: "delivered" | "not-delivered" | "unknown" | "not-requested";
      query?: string;
      sortDir?: "asc" | "desc";
    };
    const jobId = pickCronJobId(p);
    const scope: "job" | "all" = p.scope ?? (jobId ? "job" : "all");

    // Paging and filter options shared by both run-log readers.
    const filters = {
      limit: p.limit,
      offset: p.offset,
      statuses: p.statuses,
      status: p.status,
      deliveryStatuses: p.deliveryStatuses,
      deliveryStatus: p.deliveryStatus,
      query: p.query,
      sortDir: p.sortDir,
    };

    if (scope === "all") {
      // Hand the reader an id -> name lookup built from every job, disabled ones included.
      const jobs = await context.cron.list({ includeDisabled: true });
      const jobNameById = Object.fromEntries(
        jobs
          .filter((job) => typeof job.id === "string" && typeof job.name === "string")
          .map((job) => [job.id, job.name]),
      );
      const page = await readCronRunLogEntriesPageAll({
        storePath: context.cronStorePath,
        ...filters,
        jobNameById,
      });
      respond(true, page, undefined);
      return;
    }

    // Scope is "job" from here on, which requires an id.
    if (!jobId) {
      rejectMissingCronJobId(respond, "cron.runs");
      return;
    }

    let logPath: string;
    try {
      logPath = resolveCronRunLogPath({ storePath: context.cronStorePath, jobId });
    } catch {
      // Any failure to resolve a log path for this id is reported as an invalid id.
      rejectCronRequest(respond, "invalid cron.runs params: invalid id");
      return;
    }
    const page = await readCronRunLogEntriesPage(logPath, { ...filters, jobId });
    respond(true, page, undefined);
  },
};
|
|