| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import { and, eq, inArray, sql } from "drizzle-orm"; |
| import { db, taskLedger, type TaskLedgerRow } from "@workspace/db"; |
| import { newId } from "./ids"; |
| import { logger } from "./logger"; |
|
|
/**
 * Every status value a task-ledger row may carry.
 *
 * In this module rows are seeded as `pending`, switched to `running` when
 * claimed, and finish as `done` or `failed` (or go back to `pending` when a
 * failed attempt will be retried).
 */
export type TaskLedgerStatus = "pending" | "running" | "done" | "failed" | "skipped";
|
|
/** Description of a single task to be inserted into the ledger by `seedTasks`. */
export interface SeedTaskInput {
  /** Identifier of the plan the task belongs to. */
  planId: string;

  /** Key naming the task (presumably unique within a plan; confirm against the table's constraints). */
  taskKey: string;

  /** Optional payload stored on the row; `seedTasks` stores `{}` when omitted. */
  params?: Record<string, unknown>;

  /** Optional cap on attempts; `seedTasks` stores `3` when omitted. */
  maxAttempts?: number;
}
|
|
| |
| |
| |
| |
/**
 * Inserts one `pending` ledger row per input and reports how many rows were
 * actually created. Inputs that collide with an existing row are skipped, so
 * the call can safely be repeated with the same inputs.
 *
 * Conflicts are resolved by the database (`ON CONFLICT DO NOTHING`) rather
 * than by inspecting driver error messages: message sniffing breaks as soon as
 * the error is wrapped or reworded, and it can also swallow unrelated errors
 * whose text merely contains "unique" or "duplicate".
 *
 * @param rows - Tasks to seed. `params` defaults to `{}` and `maxAttempts` to `3`.
 * @returns Number of rows newly inserted (skipped duplicates are not counted).
 * @throws Any database error other than a uniqueness conflict. Rows inserted
 *   before the failing one stay inserted.
 */
export async function seedTasks(rows: SeedTaskInput[]): Promise<number> {
  let inserted = 0;
  // One statement per row on purpose: it keeps the partial-progress semantics
  // described above. NOTE(review): claiming orders by created_at, which this
  // insert leaves to the column default — batching into one statement could
  // give all rows the same timestamp and change claim order; confirm before
  // batching.
  for (const r of rows) {
    const written = await db
      .insert(taskLedger)
      .values({
        id: newId("tldg"),
        planId: r.planId,
        taskKey: r.taskKey,
        params: r.params ?? {},
        maxAttempts: r.maxAttempts ?? 3,
        status: "pending",
      })
      .onConflictDoNothing()
      .returning({ id: taskLedger.id });
    // RETURNING yields nothing for a row that was skipped because of a conflict.
    inserted += written.length;
  }
  return inserted;
}
|
|
/**
 * Fetches the ledger rows of one plan, optionally restricted to a set of
 * statuses.
 *
 * @param planId - Plan whose rows are returned.
 * @param statusFilter - When given and non-empty, only rows whose status is in
 *   this list are returned. When omitted or empty, no status restriction applies.
 * @returns The matching rows; the query specifies no ordering.
 */
export async function listTasks(
  planId: string,
  statusFilter?: TaskLedgerStatus[],
): Promise<TaskLedgerRow[]> {
  const belongsToPlan = eq(taskLedger.planId, planId);

  // No usable filter: select by plan only.
  if (!statusFilter || statusFilter.length === 0) {
    return db.select().from(taskLedger).where(belongsToPlan);
  }

  const hasWantedStatus = inArray(taskLedger.status, statusFilter);
  return db
    .select()
    .from(taskLedger)
    .where(and(belongsToPlan, hasWantedStatus));
}
|
|
/**
 * Counts the ledger rows of a plan, overall and per status.
 *
 * Only the `status` column is fetched: the counts do not need the rest of the
 * row, and pulling `params` / `result` / `metrics` payloads just to count them
 * is wasted transfer.
 *
 * @param planId - Plan to summarise.
 * @returns `total` is the number of rows for the plan; the other fields count
 *   rows per status. A row whose status is not one of the five listed values
 *   contributes to `total` only.
 */
export async function ledgerSummary(planId: string): Promise<{
  total: number;
  pending: number;
  running: number;
  done: number;
  failed: number;
  skipped: number;
}> {
  const rows = await db
    .select({ status: taskLedger.status })
    .from(taskLedger)
    .where(eq(taskLedger.planId, planId));

  // `total` is deliberately kept out of this object so that a status value can
  // never be mistaken for the total counter.
  const counts = { pending: 0, running: 0, done: 0, failed: 0, skipped: 0 };
  for (const { status } of rows) {
    // Own-property check: unlike `in`, it ignores inherited keys such as "toString".
    if (Object.prototype.hasOwnProperty.call(counts, status)) {
      counts[status as keyof typeof counts] += 1;
    }
  }
  return { total: rows.length, ...counts };
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Claims up to `n` pending rows of a plan in a single statement and returns
 * them in camelCase form.
 *
 * The statement selects pending rows ordered by `created_at, id`, locks them
 * with `FOR UPDATE SKIP LOCKED` (rows locked by another transaction are passed
 * over instead of waited on), and updates them to `running` while incrementing
 * `attempts` and stamping `started_at` / `updated_at`.
 *
 * @param planId - Plan to claim rows from.
 * @param n - Maximum number of rows to claim; values `<= 0` claim nothing and
 *   do not touch the database.
 * @returns The claimed rows as they look after the update.
 */
async function claimNextBatch(
  planId: string,
  n: number,
): Promise<TaskLedgerRow[]> {
  // Row shape produced by the raw query: snake_case column names, and
  // timestamps that may arrive as Date objects or as strings.
  type RawClaimedRow = {
    id: string;
    plan_id: string;
    task_key: string;
    params: Record<string, unknown>;
    status: TaskLedgerStatus;
    attempts: number;
    max_attempts: number;
    result: Record<string, unknown> | null;
    error_text: string | null;
    metrics: Record<string, unknown> | null;
    duration_ms: number | null;
    started_at: Date | string | null;
    finished_at: Date | string | null;
    created_at: Date | string;
    updated_at: Date | string;
  };

  if (n <= 0) return [];

  const claimStatement = sql`
    WITH claimed AS (
      SELECT id FROM task_ledger
      WHERE plan_id = ${planId} AND status = 'pending'
      ORDER BY created_at, id
      LIMIT ${n}
      FOR UPDATE SKIP LOCKED
    )
    UPDATE task_ledger
    SET status = 'running',
        attempts = task_ledger.attempts + 1,
        started_at = NOW(),
        updated_at = NOW()
    FROM claimed
    WHERE task_ledger.id = claimed.id
    RETURNING task_ledger.*
  `;
  const result = await db.execute<RawClaimedRow>(claimStatement);

  // The result is either an object exposing `.rows` or the row array itself;
  // accept both.
  const wrapped = result as unknown as { rows?: unknown[] };
  const rawRows = (wrapped.rows ?? (result as unknown as unknown[])) as RawClaimedRow[];

  const optionalDate = (value: Date | string | null): Date | null =>
    value ? new Date(value) : null;

  const toLedgerRow = (raw: RawClaimedRow) => ({
    id: raw.id,
    planId: raw.plan_id,
    taskKey: raw.task_key,
    params: raw.params,
    status: raw.status,
    attempts: raw.attempts,
    maxAttempts: raw.max_attempts,
    result: raw.result,
    errorText: raw.error_text,
    metrics: raw.metrics,
    durationMs: raw.duration_ms,
    startedAt: optionalDate(raw.started_at),
    finishedAt: optionalDate(raw.finished_at),
    createdAt: new Date(raw.created_at),
    updatedAt: new Date(raw.updated_at),
  });

  return rawRows.map(toLedgerRow) as TaskLedgerRow[];
}
|
|
/**
 * Records a successful attempt: sets the row to `done` and stores the runner's
 * output.
 *
 * @param row - Row that was executed; only its `id` is used to find the record.
 * @param result - Runner result to persist.
 * @param metrics - Runner metrics to persist, or `null` when there are none.
 * @param startMs - `Date.now()` value taken when the attempt began; the stored
 *   duration is the time elapsed since then.
 */
async function markDone(
  row: TaskLedgerRow,
  result: Record<string, unknown>,
  metrics: Record<string, unknown> | null,
  startMs: number,
): Promise<void> {
  const elapsedMs = Date.now() - startMs;
  const successPatch = {
    status: "done" as const,
    result,
    metrics,
    durationMs: elapsedMs,
    finishedAt: new Date(),
    updatedAt: new Date(),
  };
  const isThisRow = eq(taskLedger.id, row.id);
  await db.update(taskLedger).set(successPatch).where(isThisRow);
}
|
|
/**
 * Records a failed attempt. The row goes back to `pending` when it will be
 * retried, otherwise to `failed`; in both cases the error text, duration and
 * timestamps are written.
 *
 * @param row - Row that was executed; only its `id` is used to find the record.
 * @param error - Whatever the attempt threw. An `Error` contributes its
 *   `message`; anything else is converted with `String(...)`.
 * @param startMs - `Date.now()` value taken when the attempt began.
 * @param willRetry - Whether another attempt is allowed.
 */
async function markFailed(
  row: TaskLedgerRow,
  error: unknown,
  startMs: number,
  willRetry: boolean,
): Promise<void> {
  let errText: string;
  if (error instanceof Error) {
    errText = error.message;
  } else {
    errText = String(error);
  }

  const nextStatus = willRetry ? ("pending" as const) : ("failed" as const);
  const failurePatch = {
    status: nextStatus,
    errorText: errText,
    durationMs: Date.now() - startMs,
    finishedAt: new Date(),
    updatedAt: new Date(),
  };
  await db.update(taskLedger).set(failurePatch).where(eq(taskLedger.id, row.id));
}
|
|
/**
 * Callback that executes one claimed ledger row.
 *
 * It receives the row and resolves with the `result` to persist plus optional
 * `metrics`. A rejection is treated by `runPlan` as a failed attempt.
 */
export type TaskRunner<O> = (
  row: TaskLedgerRow,
) => Promise<{ result: O; metrics?: Record<string, unknown> }>;
|
|
/** Optional knobs accepted by `runPlan`. */
export interface RunPlanOptions {
  /**
   * Maximum number of tasks executed at the same time. `runPlan` uses 8 when
   * this is omitted and clamps the value to the range 1..32.
   */
  concurrency?: number;

  /**
   * Called by `runPlan` after each attempt with the running counters.
   * `last`, when present, describes the attempt that just ended; its status is
   * `"pending"` when the attempt failed but will be retried.
   */
  onProgress?: (snapshot: {
    completed: number;
    failed: number;
    total: number;
    last?: { taskKey: string; status: TaskLedgerStatus; durationMs?: number };
  }) => void;
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Runs the pending tasks of a plan through `runner` with bounded concurrency
 * until nothing is left to claim.
 *
 * Flow:
 * 1. Every row of the plan still marked `running` is reset to `pending`.
 * 2. Rows are claimed in batches sized to the free concurrency slots and
 *    executed. A successful attempt is stored as `done`; a failed one goes
 *    back to `pending` while `attempts < maxAttempts`, otherwise to `failed`.
 * 3. The loop ends when no row can be claimed and nothing is in flight.
 *
 * A failure to persist a success (`markDone` throwing) is handled like a
 * failed attempt. A failure to persist a failure is fatal: in-flight tasks
 * are allowed to finish, then the first such error is rethrown. Exceptions
 * thrown by `opts.onProgress` are logged and ignored so a reporting bug cannot
 * alter ledger state or counters.
 *
 * @param planId - Plan to execute.
 * @param runner - Executes one claimed row.
 * @param opts - Concurrency (default 8, clamped to 1..32) and progress callback.
 * @returns `total` is the number of ledger rows of the plan when the run
 *   started; `completed` and `failed` count only rows finished by this call.
 * @throws Database errors from recovery, counting, claiming, or from recording
 *   a failed attempt.
 */
export async function runPlan<O extends Record<string, unknown>>(
  planId: string,
  runner: TaskRunner<O>,
  opts: RunPlanOptions = {},
): Promise<{
  completed: number;
  failed: number;
  total: number;
}> {
  // Sanitise the requested concurrency: NaN would make every slot comparison
  // false (nothing would ever be claimed) and a fractional value would end up
  // as a non-integer LIMIT.
  const requested = opts.concurrency ?? 8;
  const concurrency = Number.isNaN(requested)
    ? 8
    : Math.max(1, Math.min(Math.floor(requested), 32));

  // Rows left in `running` are put back to `pending` so they can be claimed again.
  // NOTE(review): this resets every running row of the plan, so it assumes no
  // other worker is executing the same plan at this moment — confirm.
  const stale = await db
    .update(taskLedger)
    .set({
      status: "pending",
      errorText: "recovered_from_stale_running",
      updatedAt: new Date(),
    })
    .where(
      and(eq(taskLedger.planId, planId), eq(taskLedger.status, "running")),
    )
    .returning({ id: taskLedger.id });
  if (stale.length > 0) {
    logger.warn(
      { planId, recovered: stale.length },
      "task-ledger: recovered stale running rows on resume",
    );
  }

  const planRows = await db
    .select({ id: taskLedger.id, maxAttempts: taskLedger.maxAttempts })
    .from(taskLedger)
    .where(eq(taskLedger.planId, planId));
  const total = planRows.length;
  let completed = 0;
  let failed = 0;

  // Progress reporting is advisory: a throwing callback is logged, never
  // propagated into task bookkeeping.
  const notify = (last: {
    taskKey: string;
    status: TaskLedgerStatus;
    durationMs?: number;
  }): void => {
    if (!opts.onProgress) return;
    try {
      opts.onProgress({ completed, failed, total, last });
    } catch (err) {
      logger.warn(
        { err, planId, taskKey: last.taskKey },
        "task-ledger: onProgress callback threw; ignoring",
      );
    }
  };

  // Executes one claimed row and records the outcome. Rejects only when the
  // failure itself cannot be recorded.
  const runOne = async (row: TaskLedgerRow): Promise<void> => {
    const startMs = Date.now();
    let succeeded = false;
    let taskError: unknown;
    try {
      const out = await runner(row);
      await markDone(row, out.result, out.metrics ?? null, startMs);
      succeeded = true;
    } catch (err) {
      taskError = err;
    }

    if (succeeded) {
      completed++;
      notify({
        taskKey: row.taskKey,
        status: "done",
        durationMs: Date.now() - startMs,
      });
      return;
    }

    // `attempts` was already incremented when the row was claimed.
    const willRetry = row.attempts < row.maxAttempts;
    await markFailed(row, taskError, startMs, willRetry);
    if (!willRetry) {
      failed++;
      logger.warn(
        { err: taskError, taskKey: row.taskKey, attempts: row.attempts },
        "task-ledger: row exhausted retries",
      );
    }
    notify({
      taskKey: row.taskKey,
      status: willRetry ? "pending" : "failed",
      durationMs: Date.now() - startMs,
    });
  };

  const inflight = new Set<Promise<void>>();
  // Bookkeeping failures are collected here instead of rejecting the tracked
  // promise, so none of them can surface as an unhandled rejection.
  const fatalErrors: unknown[] = [];

  const launch = (row: TaskLedgerRow): void => {
    const tracked: Promise<void> = runOne(row)
      .catch((error: unknown) => {
        fatalErrors.push(error);
      })
      .finally(() => {
        inflight.delete(tracked);
      });
    inflight.add(tracked);
  };

  // Upper bound on loop iterations, as a guard against spinning forever. Each
  // attempt costs at most one claiming iteration and one waiting iteration, so
  // the budget follows the rows' attempt caps; it is never smaller than the
  // historical `total * 8 + 64`.
  const attemptBudget = planRows.reduce(
    (sum, r) => sum + Math.max(1, r.maxAttempts ?? 1),
    0,
  );
  let safety = Math.max(total * 8, attemptBudget * 2) + 64;

  try {
    while (fatalErrors.length === 0) {
      if (safety-- <= 0) {
        logger.warn(
          { planId, total, completed, failed },
          "task-ledger: scheduling budget exhausted; stopping with work possibly remaining",
        );
        break;
      }
      const slots = concurrency - inflight.size;
      const claimed = slots > 0 ? await claimNextBatch(planId, slots) : [];
      if (claimed.length > 0) {
        for (const row of claimed) launch(row);
        continue;
      }
      if (inflight.size === 0) {
        // Nothing claimable and nothing running: the plan is drained.
        break;
      }
      // Wait for at least one slot to free up before trying to claim again.
      await Promise.race(inflight);
    }
  } finally {
    // Never leave started tasks running unobserved, whatever ended the loop.
    await Promise.allSettled(inflight);
  }

  if (fatalErrors.length > 0) {
    // Rows affected by the failure stay `running`; the recovery step at the
    // top of this function resets them on the next run.
    throw fatalErrors[0];
  }
  return { completed, failed, total };
}
|
|