doatlas-2 / artifacts /api-server /src /lib /task-ledger.ts
Iostream-Li's picture
Add files using upload-large-folder tool
5871090 verified
/**
* task-ledger — durable per-task state for long-running multi-task jobs
* (Task #176, Wave A).
*
* Each plan (execution_plans row) owns N task_ledger rows keyed by
* (plan_id, task_key). The ledger survives restarts: on resume the
* runner re-walks rows in `pending|running` status, retrying up to
* `max_attempts` and respecting the bounded concurrency.
*
* This module is storage + a simple bounded-pool scheduler. The actual
* task work is supplied by the caller as an async function.
*/
import { and, eq, inArray, sql } from "drizzle-orm";
import { db, taskLedger, type TaskLedgerRow } from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
/**
* Value of a ledger row's `status`.
*
* Transitions performed by this module: rows are seeded as `pending`;
* claiming a row moves it `pending → running`; a successful attempt ends in
* `done`; a failed attempt goes back to `pending` when it will be retried
* and to `failed` otherwise. `skipped` is part of the union but is not
* written by any function visible in this module.
*/
export type TaskLedgerStatus =
| "pending"
| "running"
| "done"
| "failed"
| "skipped";
/** Description of one task row to be inserted by `seedTasks`. */
export interface SeedTaskInput {
/** Plan that owns the task; with `taskKey` it forms the (plan_id, task_key) key. */
planId: string;
/** Key of the task within its plan. */
taskKey: string;
/** Task parameters; `seedTasks` stores `{}` when omitted. */
params?: Record<string, unknown>;
/** Attempt limit for the row; `seedTasks` stores 3 when omitted. */
maxAttempts?: number;
}
/**
 * Idempotently insert task rows. Existing (plan_id, task_key) rows are
 * left untouched so re-seed during retry is safe.
 *
 * Rows are inserted one at a time with status `pending`; `params` defaults
 * to `{}` and `maxAttempts` to 3. An insert rejected as a unique-constraint
 * violation means the row was already seeded and is skipped.
 *
 * @param rows - Tasks to seed; an empty array is a no-op.
 * @returns The number of rows actually inserted by this call.
 * @throws Any insert error that is not a unique-constraint violation.
 */
export async function seedTasks(rows: SeedTaskInput[]): Promise<number> {
  let inserted = 0;
  for (const row of rows) {
    try {
      await db.insert(taskLedger).values({
        id: newId("tldg"),
        planId: row.planId,
        taskKey: row.taskKey,
        params: row.params ?? {},
        maxAttempts: row.maxAttempts ?? 3,
        status: "pending",
      });
      inserted++;
    } catch (err) {
      // Already seeded: keep the existing row and move on to the next one.
      if (!isUniqueViolation(err)) throw err;
    }
  }
  return inserted;
}

/**
 * True when `err`, or any error in its `cause` chain, is a unique-constraint
 * violation.
 */
function isUniqueViolation(err: unknown): boolean {
  let current: unknown = err;
  // Bounded walk so a self-referencing `cause` cannot loop forever.
  for (let depth = 0; depth < 8 && current != null; depth++) {
    const text = current instanceof Error ? current.message : String(current);
    if (/duplicate|unique/i.test(text)) return true;
    if (typeof current !== "object") return false;
    const { code, cause } = current as { code?: unknown; cause?: unknown };
    // 23505 is PostgreSQL's SQLSTATE for unique_violation. Unlike the message
    // text it survives localized server messages and errors that a query
    // layer re-wraps with the driver error attached as `cause`.
    if (code === "23505") return true;
    current = cause;
  }
  return false;
}
/**
 * Fetch the ledger rows of one plan.
 *
 * @param planId - Plan whose rows are returned.
 * @param statusFilter - When given and non-empty, only rows whose status is
 *   in this list are returned; otherwise every row of the plan is returned.
 * @returns The matching ledger rows.
 */
export async function listTasks(
  planId: string,
  statusFilter?: TaskLedgerStatus[],
): Promise<TaskLedgerRow[]> {
  const belongsToPlan = eq(taskLedger.planId, planId);

  // No usable filter: the plan predicate alone selects the rows.
  if (!statusFilter || statusFilter.length === 0) {
    return db.select().from(taskLedger).where(belongsToPlan);
  }

  const hasWantedStatus = inArray(taskLedger.status, statusFilter);
  return db
    .select()
    .from(taskLedger)
    .where(and(belongsToPlan, hasWantedStatus));
}
/**
 * Count a plan's ledger rows per status.
 *
 * Only the `status` column is fetched, so task payloads are not loaded just
 * to be counted. A row whose status is not one of the five known values
 * still contributes to `total` but to no per-status counter.
 *
 * @param planId - Plan to summarize.
 * @returns `total` (all rows of the plan) plus one counter per status.
 */
export async function ledgerSummary(planId: string): Promise<{
  total: number;
  pending: number;
  running: number;
  done: number;
  failed: number;
  skipped: number;
}> {
  const rows = await db
    .select({ status: taskLedger.status })
    .from(taskLedger)
    .where(eq(taskLedger.planId, planId));

  const counts = { pending: 0, running: 0, done: 0, failed: 0, skipped: 0 };
  for (const { status } of rows) {
    // Own-key check: a stray status such as "total" or "toString" must not
    // touch `total` or inherited properties.
    if (Object.prototype.hasOwnProperty.call(counts, status)) {
      counts[status as keyof typeof counts]++;
    }
  }
  return { total: rows.length, ...counts };
}
/**
 * Atomically claim up to `n` pending rows for `planId`, transitioning them
 * `pending → running` in a single statement protected by `FOR UPDATE SKIP
 * LOCKED`. This is the race-free replacement for SELECT-then-UPDATE: two
 * concurrent callers (or two iterations of the drain loop) cannot pick
 * the same row.
 *
 * The claim also increments `attempts` and stamps `started_at` /
 * `updated_at`, so the returned rows already reflect the in-flight attempt.
 *
 * @param planId - Plan whose pending rows may be claimed.
 * @param n - Maximum number of rows to claim. It is floored to an integer;
 *   zero, negative, `NaN` and non-finite values claim nothing and issue no
 *   query.
 * @returns The freshly-claimed `TaskLedgerRow`s (snake-case columns from the
 *   RETURNING clause are mapped back into the camel-case row shape).
 */
async function claimNextBatch(
  planId: string,
  n: number,
): Promise<TaskLedgerRow[]> {
  // LIMIT needs a whole, finite number; anything else must not reach SQL.
  const limit = Number.isFinite(n) ? Math.floor(n) : 0;
  if (limit <= 0) return [];

  // Raw row as returned by RETURNING task_ledger.*; timestamps are accepted
  // as either Date or string and normalized below.
  type ClaimedRow = {
    id: string;
    plan_id: string;
    task_key: string;
    params: Record<string, unknown>;
    status: TaskLedgerStatus;
    attempts: number;
    max_attempts: number;
    result: Record<string, unknown> | null;
    error_text: string | null;
    metrics: Record<string, unknown> | null;
    duration_ms: number | null;
    started_at: Date | string | null;
    finished_at: Date | string | null;
    created_at: Date | string;
    updated_at: Date | string;
  };

  const result = await db.execute<ClaimedRow>(sql`
    WITH claimed AS (
      SELECT id FROM task_ledger
      WHERE plan_id = ${planId} AND status = 'pending'
      ORDER BY created_at, id
      LIMIT ${limit}
      FOR UPDATE SKIP LOCKED
    )
    UPDATE task_ledger
    SET status = 'running',
        attempts = task_ledger.attempts + 1,
        started_at = NOW(),
        updated_at = NOW()
    FROM claimed
    WHERE task_ledger.id = claimed.id
    RETURNING task_ledger.*
  `);

  // The result is either an object carrying `.rows` or the row array itself.
  const raw =
    (result as unknown as { rows?: unknown[] }).rows ??
    (result as unknown as unknown[]);

  return (raw as ClaimedRow[]).map((r) => ({
    id: r.id,
    planId: r.plan_id,
    taskKey: r.task_key,
    params: r.params,
    status: r.status,
    attempts: r.attempts,
    maxAttempts: r.max_attempts,
    result: r.result,
    errorText: r.error_text,
    metrics: r.metrics,
    durationMs: r.duration_ms,
    startedAt: r.started_at ? new Date(r.started_at) : null,
    finishedAt: r.finished_at ? new Date(r.finished_at) : null,
    createdAt: new Date(r.created_at),
    updatedAt: new Date(r.updated_at),
  })) as TaskLedgerRow[];
}
/**
 * Record a successful attempt: set the row to `done` and store its result,
 * metrics, duration and finish/update timestamps.
 *
 * @param row - Ledger row that finished; only its `id` is used.
 * @param result - Result payload to persist.
 * @param metrics - Metrics to persist, or `null`.
 * @param startMs - `Date.now()` value captured when the attempt started;
 *   used to compute `durationMs`.
 */
async function markDone(
  row: TaskLedgerRow,
  result: Record<string, unknown>,
  metrics: Record<string, unknown> | null,
  startMs: number,
): Promise<void> {
  const completion = {
    status: "done" as const,
    result,
    metrics,
    durationMs: Date.now() - startMs,
    finishedAt: new Date(),
    updatedAt: new Date(),
  };
  const thisRow = eq(taskLedger.id, row.id);
  await db.update(taskLedger).set(completion).where(thisRow);
}
/**
 * Record a failed attempt. The row goes back to `pending` when it will be
 * retried and to `failed` otherwise; in both cases the error text, duration
 * and finish/update timestamps are stored.
 *
 * @param row - Ledger row whose attempt failed; only its `id` is used.
 * @param error - Whatever the attempt threw.
 * @param startMs - `Date.now()` value captured when the attempt started;
 *   used to compute `durationMs`.
 * @param willRetry - Whether the row should become claimable again.
 */
async function markFailed(
  row: TaskLedgerRow,
  error: unknown,
  startMs: number,
  willRetry: boolean,
): Promise<void> {
  await db
    .update(taskLedger)
    .set({
      status: willRetry ? "pending" : "failed",
      errorText: describeTaskError(error),
      durationMs: Date.now() - startMs,
      finishedAt: new Date(),
      updatedAt: new Date(),
    })
    .where(eq(taskLedger.id, row.id));
}

/**
 * Turn a thrown value into text worth persisting, so that thrown plain
 * objects are not stored as "[object Object]".
 */
function describeTaskError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "object" && error !== null) {
    const { message } = error as { message?: unknown };
    if (typeof message === "string") return message;
    try {
      const json: unknown = JSON.stringify(error);
      if (typeof json === "string" && json !== "{}") return json;
    } catch {
      // Circular or otherwise unserializable value: fall back to String().
    }
  }
  return String(error);
}
/**
* Caller-supplied worker that `runPlan` invokes once per claimed ledger row.
*
* Resolve with the task's `result` (and optional `metrics`) to have the row
* recorded as done; reject/throw to have the attempt recorded as failed.
*/
export type TaskRunner<O> = (row: TaskLedgerRow) => Promise<{
result: O;
metrics?: Record<string, unknown>;
}>;
/** Optional knobs for `runPlan`. */
export interface RunPlanOptions {
/** Maximum rows worked on at once; `runPlan` defaults to 8 and clamps to 1..32. */
concurrency?: number;
/**
* Called by `runPlan` after each attempt settles. `completed` and `failed`
* count rows finished by the current `runPlan` call, `total` is the number
* of ledger rows of the plan, and `last` describes the attempt that just
* settled (`status` is "done", "failed", or "pending" when the row will be
* retried).
*/
onProgress?: (snapshot: {
completed: number;
failed: number;
total: number;
last?: { taskKey: string; status: TaskLedgerStatus; durationMs?: number };
}) => void;
}
/**
 * Walk all `pending` rows for a plan, dispatching them to `runner` with
 * bounded concurrency. Each row exhausts its `max_attempts` before
 * being parked in `failed`. Returns when no more `pending` rows remain
 * (regardless of failures).
 *
 * Behavior worth knowing:
 * - Rows left in `running` by an earlier run are reset to `pending` first.
 * - `opts.concurrency` is floored and clamped to 1..32 (default 8); `NaN`
 *   falls back to the default.
 * - `opts.onProgress` is observational: an exception it throws is logged
 *   and never changes a row's outcome.
 * - If a ledger write fails, no new rows are started, in-flight workers are
 *   awaited, and the first such error is rethrown.
 * - The drain loop has an iteration budget (`total * 8 + 64`); hitting it
 *   is logged because pending rows may then remain.
 *
 * @param planId - Plan whose ledger rows are executed.
 * @param runner - Performs the work for one claimed row.
 * @param opts - Concurrency limit and progress callback.
 * @returns `completed` / `failed` counts for this call and the plan's
 *   `total` row count.
 * @throws The first error raised while claiming rows or writing outcomes.
 */
export async function runPlan<O extends Record<string, unknown>>(
  planId: string,
  runner: TaskRunner<O>,
  opts: RunPlanOptions = {},
): Promise<{
  completed: number;
  failed: number;
  total: number;
}> {
  const defaultConcurrency = 8;
  const maxConcurrency = 32;
  // The slot count ends up in SQL `LIMIT`, so it must be a whole number.
  const requested = Math.floor(opts.concurrency ?? defaultConcurrency);
  const concurrency = Number.isNaN(requested)
    ? defaultConcurrency
    : Math.max(1, Math.min(requested, maxConcurrency));

  // Recover stale `running` rows from a previous crashed/killed runner so
  // `seedTasks → process death → runPlan` resumes correctly. We can do
  // this safely here because runPlan is the single writer for a planId
  // and the API only allows one in-flight execute at a time (status gate
  // in benchmark.ts: only `approved` may run, and we flip to `running`
  // before claiming). For multi-process deployments this should later
  // grow a heartbeat / claim-lease check.
  const stale = await db
    .update(taskLedger)
    .set({
      status: "pending",
      errorText: "recovered_from_stale_running",
      updatedAt: new Date(),
    })
    .where(
      and(eq(taskLedger.planId, planId), eq(taskLedger.status, "running")),
    )
    .returning({ id: taskLedger.id });
  if (stale.length > 0) {
    logger.warn(
      { planId, recovered: stale.length },
      "task-ledger: recovered stale running rows on resume",
    );
  }

  const totalRows = await db
    .select({ id: taskLedger.id })
    .from(taskLedger)
    .where(eq(taskLedger.planId, planId));
  const total = totalRows.length;
  let completed = 0;
  let failed = 0;

  // Progress reporting must never influence task outcomes, so a throwing
  // callback is logged and swallowed.
  const report = (
    taskKey: string,
    status: TaskLedgerStatus,
    startMs: number,
  ): void => {
    if (!opts.onProgress) return;
    try {
      opts.onProgress({
        completed,
        failed,
        total,
        last: { taskKey, status, durationMs: Date.now() - startMs },
      });
    } catch (err) {
      logger.warn(
        { err, planId, taskKey },
        "task-ledger: onProgress callback threw; ignoring",
      );
    }
  };

  // NOTE: rows passed to runOne have already been atomically transitioned
  // to "running" by claimNextBatch (with attempts incremented), so nothing
  // here marks the start of an attempt again.
  const runOne = async (row: TaskLedgerRow): Promise<void> => {
    const startMs = Date.now();
    try {
      const out = await runner(row);
      await markDone(row, out.result, out.metrics ?? null, startMs);
    } catch (err) {
      // `row.attempts` already reflects the in-flight attempt because
      // claimNextBatch incremented it during the atomic claim. So if
      // attempts == maxAttempts, this attempt was the final one.
      const willRetry = row.attempts < row.maxAttempts;
      await markFailed(row, err, startMs, willRetry);
      if (!willRetry) {
        failed++;
        logger.warn(
          { err, taskKey: row.taskKey, attempts: row.attempts },
          "task-ledger: row exhausted retries",
        );
      }
      report(row.taskKey, willRetry ? "pending" : "failed", startMs);
      return;
    }
    // Counted and reported outside the try block so that only runner or
    // markDone failures can route a row into the failure path.
    completed++;
    report(row.taskKey, "done", startMs);
  };

  const inflight = new Set<Promise<void>>();
  // Errors from ledger writes (markFailed throwing); the first is rethrown.
  const fatalErrors: unknown[] = [];
  // Number of workers that have settled so far; see the drain check below.
  let settled = 0;

  // Tracked promises never reject: failures are parked in `fatalErrors`, so
  // no rejection can go unobserved while the loop awaits a claim.
  const launch = (row: TaskLedgerRow): void => {
    const tracked: Promise<void> = runOne(row)
      .catch((error: unknown) => {
        fatalErrors.push(error);
      })
      .finally(() => {
        settled++;
        inflight.delete(tracked);
      });
    inflight.add(tracked);
  };

  // Drain loop with race-free claim + slot-availability waiting:
  //   1) try to atomically claim up to `slots` rows
  //   2) launch a worker per claimed row
  //   3) if no rows were claimed but inflight workers exist, wait for the
  //      next one to finish — failed-with-retry rows surface back to
  //      `pending` via markFailed and become claimable again next iteration.
  // Cap unbounded growth in catastrophic-retry scenarios.
  let budget = total * 8 + 64;
  let drained = false;
  try {
    while (budget-- > 0) {
      if (fatalErrors.length > 0) break;
      const slots = concurrency - inflight.size;
      const settledBeforeClaim = settled;
      const claimed = slots > 0 ? await claimNextBatch(planId, slots) : [];
      if (claimed.length > 0) {
        for (const row of claimed) launch(row);
        continue;
      }
      if (inflight.size > 0) {
        await Promise.race(inflight);
        continue;
      }
      // Nothing claimable and nothing in flight. That only means "drained"
      // if no worker settled while the claim was running; otherwise that
      // worker may have re-queued its row after the claim looked, so claim
      // once more.
      if (settled === settledBeforeClaim) {
        drained = true;
        break;
      }
    }
  } finally {
    // Never leave workers running behind the caller's back, even when a
    // claim throws.
    if (inflight.size > 0) {
      await Promise.allSettled(inflight);
    }
  }

  if (fatalErrors.length > 0) throw fatalErrors[0];
  if (!drained) {
    logger.warn(
      { planId, total },
      "task-ledger: drain loop hit its iteration budget; pending rows may remain",
    );
  }
  return { completed, failed, total };
}