// doatlas-2 / artifacts / api-server / src / lib / job-tracker.ts
// Uploaded by Iostream-Li ("Add files using upload-large-folder tool"), commit 5871090 (verified).
/**
* job-tracker — persistence + background poller + proactive follow-up for
* async research-engine jobs.
*
* When the chat ReAct loop invokes `run_fulltext_upgrade`,
* `run_validation_workflow` or `run_codex_analysis`, the engine returns a
* `job_id` and runs the heavy work in a background thread. We mirror that
* job into Postgres (`research_jobs` table) so the conversation has a
* durable record, then poll the engine on a fixed interval. On terminal
* transitions we update the row, broadcast a `job_status` event on the
* in-process bus (consumed by `GET /api/conversations/:id/jobs/stream`),
* and post a system-authored assistant message that summarises the result
* and proposes the next action. The follow-up message is bilingual and
* contains no emoji per project rules.
*/
import { and, desc, eq, inArray, sql } from "drizzle-orm";
import {
db,
researchJobs,
messages,
type ContentBlock,
type ResearchJob,
type InsertResearchJob,
type AgentRunSnapshot,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import {
getJob as engineGetJob,
cancelJobOnEngine,
ResearchEngineError,
} from "./research-bridge";
import { publishJobEvent, type PublicJob } from "./job-events";
// ----------------------------------------------------------------- constants
/** Delay between two background sweeps, in milliseconds. */
const POLL_INTERVAL_MS = 2_000;
/** Upper bound on how many in-flight jobs a single sweep polls. */
const POLL_BATCH = 25;
/** Public statuses that mark a job as settled. */
const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled"]);

/**
 * Engine status string -> public job status. Several spellings/synonyms
 * collapse onto the same public value.
 */
const ENGINE_STATUS_MAP: Record<string, PublicJob["status"]> = {
  // still in flight
  queued: "queued",
  running: "running",
  // success
  completed: "succeeded",
  succeeded: "succeeded",
  // failure
  failed: "failed",
  error: "failed",
  // cancellation (both spellings)
  cancelled: "cancelled",
  canceled: "cancelled",
};

/** Chat tool name -> `job_kind` stored on the `research_jobs` row. */
const TOOL_TO_KIND: Record<string, string> = {
  run_fulltext_upgrade: "m3_fulltext_upgrade",
  run_validation_workflow: "m4_validation_workflow",
  run_codex_analysis: "m7_codex_analysis",
};

/** Tool names whose invocation spawns an async engine job. */
export const ASYNC_TOOL_NAMES = new Set(Object.keys(TOOL_TO_KIND));
// ---------------------------------------------------------------- registration
/** Arguments accepted by `registerJob`. */
export interface RegisterJobInput {
  /** Engine-issued job id; stored as the `research_jobs` row id. */
  jobId: string;
  /** Conversation the job was spawned from. */
  conversationId: string;
  /** Owning user; checked against any existing row with the same id. */
  userId: string;
  /** Chat tool that spawned the job, e.g. `run_fulltext_upgrade`. */
  toolName: string;
  /** Explicit job kind; when falsy it is derived from `toolName`. */
  jobKind?: string | null;
  /** Optional human-readable label. */
  label?: string | null;
  /** Optional task id, echoed in the follow-up message. */
  taskId?: string | null;
  /** Arguments the tool was invoked with, persisted as-is. */
  requestArgs?: Record<string, unknown> | null;
  /** Language used for the follow-up message. */
  language: "zh" | "en";
}
/**
 * Persist a job that the chat loop just spawned, then announce it via the
 * in-process pub/sub (`job_started`).
 *
 * Idempotent on (jobId, userId, conversationId): if a row with this id
 * already exists for the *same* owner/conversation it is returned as-is and
 * no event is re-published. If the same id ever surfaces under a different
 * user/conversation (engine restart, id reuse, upstream bug) we refuse to
 * leak it and return `null`.
 *
 * The existence check and the insert are not atomic, so two concurrent
 * registrations of one id can both pass the pre-check. When the insert
 * fails we re-read the row and apply the same ownership rule, instead of
 * reporting a failure for a row that does in fact exist.
 *
 * @returns the persisted row, or `null` on ownership mismatch or DB failure.
 */
export async function registerJob(
  input: RegisterJobInput,
): Promise<ResearchJob | null> {
  const existing = await findRegisteredJob(input);
  if (existing !== undefined) return existing;

  const row: InsertResearchJob = {
    id: input.jobId,
    conversationId: input.conversationId,
    userId: input.userId,
    toolName: input.toolName,
    jobKind: input.jobKind || TOOL_TO_KIND[input.toolName] || input.toolName,
    label: input.label || null,
    status: "running",
    taskId: input.taskId || null,
    requestArgs: input.requestArgs || null,
    language: input.language,
  };

  let inserted: ResearchJob | null;
  try {
    const ret = await db.insert(researchJobs).values(row).returning();
    inserted = ret[0] ?? null;
  } catch (err) {
    // One possible cause is a concurrent registration of the same id that
    // inserted first. Resolve against whatever row now exists before
    // treating this as a hard failure.
    const winner = await findRegisteredJob(input).catch(() => undefined);
    if (winner !== undefined) return winner;
    logger.warn({ err, jobId: input.jobId }, "registerJob insert failed");
    return null;
  }

  if (inserted) {
    publishJobEvent({
      type: "job_started",
      conversationId: input.conversationId,
      job: toPublicJob(inserted),
    });
  }
  return inserted;
}

/**
 * Look up an existing row for `input.jobId` and apply the ownership rule.
 * Returns `undefined` when no row exists. Returns the row when it belongs
 * to the same user and conversation. Returns `null`, after logging, when
 * it belongs to someone else.
 */
async function findRegisteredJob(
  input: RegisterJobInput,
): Promise<ResearchJob | null | undefined> {
  const [found] = await db
    .select()
    .from(researchJobs)
    .where(eq(researchJobs.id, input.jobId))
    .limit(1);
  if (!found) return undefined;
  const owned =
    found.userId === input.userId &&
    found.conversationId === input.conversationId;
  if (owned) return found;
  logger.error(
    {
      jobId: input.jobId,
      existingUser: found.userId,
      existingConv: found.conversationId,
      requestUser: input.userId,
      requestConv: input.conversationId,
    },
    "registerJob ownership mismatch — refusing to surface foreign job",
  );
  return null;
}
// ---------------------------------------------------------------- serialization
/**
 * Project a `research_jobs` row onto the snake_case `PublicJob` shape that
 * is published on the job event bus.
 *
 * Timestamps become ISO-8601 strings (`finished_at` stays null while the
 * job is unfinished). A falsy status falls back to "running" and a falsy
 * language to "zh". Optional JSON columns are normalised to `null`.
 */
export function toPublicJob(row: ResearchJob): PublicJob {
  const status = (row.status || "running") as PublicJob["status"];
  const language = (row.language || "zh") as "zh" | "en";
  const finishedIso = row.finishedAt ? row.finishedAt.toISOString() : null;
  return {
    id: row.id,
    conversation_id: row.conversationId,
    tool_name: row.toolName,
    kind: row.jobKind,
    label: row.label,
    status,
    task_id: row.taskId,
    result_summary: row.resultSummary ?? null,
    agent_run: row.agentRun ?? null,
    error_message: row.errorMessage,
    followup_message_id: row.followupMessageId,
    language,
    started_at: row.startedAt.toISOString(),
    finished_at: finishedIso,
  };
}
// ---------------------------------------------------------------- queries
/** Options for `listJobsForConversation`. */
export interface ListJobsOptions {
  /**
   * When true (the default), return only queued/running jobs plus terminal
   * jobs that finished inside the recent window, so the in-flight tray does
   * not rehydrate old history on refresh. Pass `false` for full history.
   */
  scopeToTray?: boolean;
  /** Width of the recent-terminal window in minutes; ignored unless `scopeToTray`. */
  recentTerminalMinutes?: number;
  /** Maximum number of rows to return. */
  limit?: number;
}
/**
 * List one user's jobs in one conversation, newest `startedAt` first.
 *
 * With `scopeToTray` (the default) only queued/running jobs plus terminal
 * jobs whose `finishedAt` lies within the last `recentTerminalMinutes`
 * (default 10, minimum 1) are returned. `limit` defaults to 50 and is
 * clamped to the integer range [1, 200].
 *
 * Non-finite numeric options (e.g. `NaN` from a failed query-string parse)
 * fall back to the defaults. Previously they propagated through Math.max/min
 * as NaN and were interpolated into LIMIT / the interval expression, which
 * produces invalid SQL parameters.
 */
export async function listJobsForConversation(
  conversationId: string,
  userId: string,
  opts: ListJobsOptions = {},
): Promise<ResearchJob[]> {
  const scopeToTray = opts.scopeToTray !== false;
  const recentMinutes = Math.max(1, finiteOr(opts.recentTerminalMinutes, 10));
  const limit = Math.max(
    1,
    Math.min(200, Math.floor(finiteOr(opts.limit, 50))),
  );
  const trayFilter = scopeToTray
    ? sql`(${researchJobs.status} IN ('queued','running')
        OR (${researchJobs.finishedAt} IS NOT NULL
          AND ${researchJobs.finishedAt} > now() - (${recentMinutes} || ' minutes')::interval))`
    : sql`true`;
  return db
    .select()
    .from(researchJobs)
    .where(
      and(
        eq(researchJobs.conversationId, conversationId),
        eq(researchJobs.userId, userId),
        trayFilter,
      ),
    )
    .orderBy(desc(researchJobs.startedAt))
    .limit(limit);
}

/** Return `value` when it is a finite number, otherwise `fallback`. */
function finiteOr(value: number | undefined, fallback: number): number {
  return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
/**
 * Fetch a single job row, but only if it belongs to `userId`.
 *
 * Resolves to `null` both for unknown ids and for jobs owned by another
 * user, so the two cases look identical to the caller.
 */
export async function getJobOwned(
  jobId: string,
  userId: string,
): Promise<ResearchJob | null> {
  const ownedById = and(
    eq(researchJobs.id, jobId),
    eq(researchJobs.userId, userId),
  );
  const [match] = await db
    .select()
    .from(researchJobs)
    .where(ownedById)
    .limit(1);
  return match ?? null;
}
// ---------------------------------------------------------------- cancel
/** The cancel was carried out; `job` is the row state to display. */
type CancelApplied = { ok: true; job: PublicJob };
/** The job had already settled; `job` is its current state. */
type CancelTooLate = { ok: false; reason: "already_terminal"; job: PublicJob };
/** The engine cancel request threw; `message` carries the error text. */
type CancelEngineFailure = { ok: false; reason: "engine_error"; message: string };

/** Result of `requestCancelJob`, discriminated on `ok` and then `reason`. */
export type CancelOutcome = CancelApplied | CancelTooLate | CancelEngineFailure;
/**
 * User-initiated cancel. Tells the engine to stop, flips the local row to
 * `cancelled` (which posts the standard follow-up message), and broadcasts
 * the new state via the in-process bus so the SSE tray updates at once.
 *
 * Returns `already_terminal` when the job has already settled, so callers
 * can map it to a 409 and the UI knows the chip is no longer cancellable.
 * This happens in three situations:
 *  - the row passed in is already terminal;
 *  - the engine reports that the job finished before the cancel arrived;
 *  - another worker settled the row between our checks and the local write.
 *
 * Returns `engine_error` when the engine cancel request itself throws.
 */
export async function requestCancelJob(row: ResearchJob): Promise<CancelOutcome> {
  if (TERMINAL_STATUSES.has(row.status)) {
    return { ok: false, reason: "already_terminal", job: toPublicJob(row) };
  }
  let engineResp: Awaited<ReturnType<typeof cancelJobOnEngine>>;
  try {
    engineResp = await cancelJobOnEngine(row.id);
  } catch (err) {
    const message =
      err instanceof Error ? err.message : "engine cancel request failed";
    logger.warn({ err, jobId: row.id }, "engine cancel request failed");
    return { ok: false, reason: "engine_error", message };
  }

  // Race guard: the engine refused the cancel and reports a terminal
  // status, so the runner finished between our pre-check and the engine
  // call. Forcing a local "cancelled" write would clobber the real result
  // and post the wrong follow-up message.
  if (engineResp && engineResp.cancelled === false) {
    const engineStatus = String(engineResp.status ?? "").toLowerCase();
    const mapped = ENGINE_STATUS_MAP[engineStatus];
    if (mapped && TERMINAL_STATUSES.has(mapped)) {
      // Await the catch-up poll so the re-read below reflects the engine's
      // real terminal state. Fired without awaiting, the re-read would
      // usually race ahead and return the stale queued/running row.
      await pollOne(row).catch((err) =>
        logger.warn({ err, jobId: row.id }, "race-recovery poll failed"),
      );
      const refreshed = await readJobById(row.id);
      return {
        ok: false,
        reason: "already_terminal",
        job: toPublicJob(refreshed ?? row),
      };
    }
  }

  await applyTerminal(row, {
    status: "cancelled",
    errorMessage: "cancelled by user",
    resultSummary: null,
    agentRun: null,
  });
  // Re-read so we return the freshly-flipped row (with the follow-up id).
  const fresh = await readJobById(row.id);
  // applyTerminal is compare-and-set on queued/running. If the poller
  // settled the row first, our write was a no-op and the row holds the
  // real outcome, so report that instead of claiming the cancel succeeded.
  if (fresh && fresh.status !== "cancelled" && TERMINAL_STATUSES.has(fresh.status)) {
    return { ok: false, reason: "already_terminal", job: toPublicJob(fresh) };
  }
  return { ok: true, job: toPublicJob(fresh ?? row) };
}

/** Re-read one job row by id (no ownership filter — only used for rows the caller already holds). */
async function readJobById(id: string): Promise<ResearchJob | null> {
  const [found] = await db
    .select()
    .from(researchJobs)
    .where(eq(researchJobs.id, id))
    .limit(1);
  return found ?? null;
}
// ---------------------------------------------------------------- follow-up text
/** Inputs that `buildFollowUpText` needs to describe a settled job. */
interface FollowUpContext {
  /** Tool that spawned the job; selects the label and suggested next step. */
  toolName: string;
  /** Optional stored label for the job. */
  label: string | null;
  /** Optional task id, appended as "(task …)" when present. */
  taskId: string | null;
  /** Status being reported; only terminal statuses produce text. */
  status: PublicJob["status"];
  /** Failure reason, if any. */
  errorMessage: string | null;
  /** Compact engine result summary, if any. */
  resultSummary: Record<string, unknown> | null;
}
/**
 * Human-readable name for an async tool in the requested language.
 * Unknown tool names are returned unchanged.
 */
function jobLabelHuman(toolName: string, lang: "zh" | "en"): string {
  const chinese = lang === "zh";
  switch (toolName) {
    case "run_fulltext_upgrade":
      return chinese ? "全文证据升级" : "full-text evidence upgrade";
    case "run_validation_workflow":
      return chinese ? "验证工作流" : "validation workflow";
    case "run_codex_analysis":
      return chinese ? "Codex 深度分析" : "Codex deep analysis";
    default:
      return toolName;
  }
}
/**
 * Compose the proactive follow-up message posted when a job settles.
 *
 * Returns "" for non-terminal statuses; the caller posts nothing then.
 * Chinese copy wraps the job label in 「」 and English copy in double
 * quotes, followed by an optional "(task …)" hint.
 *
 * A failure reason is whitespace-trimmed, capped at 400 code points (so a
 * surrogate pair is never split), and always ends with sentence
 * punctuation. Without that terminator the follow-up question would run
 * straight into the error text.
 */
function buildFollowUpText(ctx: FollowUpContext, lang: "zh" | "en"): string {
  const label = jobLabelHuman(ctx.toolName, lang);
  const taskHint = ctx.taskId ? ` (task ${ctx.taskId})` : "";
  const zh = lang === "zh";
  const subjectZh = `后台任务「${label}」${taskHint}`;
  const subjectEn = `Background job "${label}"${taskHint}`;

  if (ctx.status === "succeeded") {
    if (zh) {
      const next =
        ctx.toolName === "run_fulltext_upgrade"
          ? "需要我基于升级后的全文证据重新整理候选靶点排名,或调用 inspect_target 深入查看某个靶点吗?"
          : ctx.toolName === "run_validation_workflow"
            ? "需要我汇总验证结论、列出未通过的指标,或为下一步实验给出建议吗?"
            : "需要我把 Codex 的分析要点总结成简报,或基于结论提出后续验证方向吗?";
      return `${subjectZh}已完成。${next}`;
    }
    const next =
      ctx.toolName === "run_fulltext_upgrade"
        ? "Want me to re-rank the candidates using the upgraded full-text evidence, or inspect a specific target in detail?"
        : ctx.toolName === "run_validation_workflow"
          ? "Want me to summarise the validation outcome, list the failing checks, or propose follow-up experiments?"
          : "Want me to distil the Codex findings into a brief, or suggest the next validation step based on the conclusions?";
    return `${subjectEn} completed. ${next}`;
  }

  if (ctx.status === "failed") {
    const reason = Array.from((ctx.errorMessage ?? "").trim())
      .slice(0, 400)
      .join("");
    if (zh) {
      const detail = reason ? `:${endSentence(reason, "。")}` : "。";
      return `${subjectZh}执行失败${detail}是否需要我换一组参数重试,或先排查上一步的输入?`;
    }
    const detail = reason ? `: ${endSentence(reason, ".")}` : ".";
    return `${subjectEn} failed${detail} Want me to retry with adjusted parameters, or first check the upstream input?`;
  }

  if (ctx.status === "cancelled") {
    if (zh) {
      return `${subjectZh}已取消。需要我重新启动,还是切换到其他分析?`;
    }
    return `${subjectEn} was cancelled. Should I restart it, or pivot to a different analysis?`;
  }

  return "";
}

/** Append `stop` unless `text` already ends with sentence punctuation. */
function endSentence(text: string, stop: string): string {
  return /[.!?。！？]$/u.test(text) ? text : `${text}${stop}`;
}
// ---------------------------------------------------------------- poller
// Module-level poller state. `pollerBusy` stops ticks from overlapping: if
// the previous sweep is still running when the interval fires, that tick is
// skipped entirely.
let pollerStarted = false;
let pollerHandle: NodeJS.Timeout | null = null;
let pollerBusy = false;

/**
 * Start the background sweep. Calling it again while it is already started
 * does nothing. Each tick runs `pollOnce`, and tick failures are logged. The
 * timer is unref'd, so the poller alone never keeps the process alive.
 */
export function startJobPoller(): void {
  if (pollerStarted) return;
  pollerStarted = true;
  logger.info({ intervalMs: POLL_INTERVAL_MS }, "research job poller started");

  const tick = (): void => {
    if (pollerBusy) return;
    pollerBusy = true;
    pollOnce()
      .catch((err) => logger.error({ err }, "job poller tick failed"))
      .finally(() => {
        pollerBusy = false;
      });
  };

  pollerHandle = setInterval(tick, POLL_INTERVAL_MS);
  pollerHandle.unref?.();
}

/**
 * Stop scheduling further sweeps; a later `startJobPoller()` may restart it.
 * A sweep that is already in flight is not interrupted.
 */
export function stopJobPoller(): void {
  if (pollerHandle !== null) clearInterval(pollerHandle);
  pollerHandle = null;
  pollerStarted = false;
}
/**
 * One sweep: poll up to POLL_BATCH queued/running jobs in parallel.
 *
 * Rows are taken least-recently-polled first, with never-polled rows
 * (NULL `lastPolledAt`) at the front. Without an ORDER BY, Postgres may
 * return the same POLL_BATCH rows on every tick, so when more jobs are in
 * flight the rest could starve. `pollOne` stamps `lastPolledAt` on its
 * normal paths, so this ordering rotates through the backlog.
 *
 * A failure while polling one job is logged and does not abort the others.
 */
async function pollOnce(): Promise<void> {
  const running = await db
    .select()
    .from(researchJobs)
    .where(inArray(researchJobs.status, ["queued", "running"]))
    .orderBy(sql`${researchJobs.lastPolledAt} ASC NULLS FIRST`)
    .limit(POLL_BATCH);
  if (running.length === 0) return;
  await Promise.all(
    running.map((row) =>
      pollOne(row).catch((err) => {
        logger.warn({ err, jobId: row.id }, "poll job failed");
      }),
    ),
  );
}
/**
 * Poll the engine for one job and reconcile the local row.
 *
 * - Engine 404: the job vanished from the engine (e.g. a dev restart). It is
 *   marked failed with `engine_lost_job`, which stops polling and notifies
 *   the user.
 * - Other engine errors, or an empty payload: only `lastPolledAt` is
 *   stamped, and the job is retried on a later tick.
 * - Same status as the row: mirror a changed progress snapshot (and publish
 *   `job_status`), otherwise just stamp `lastPolledAt`.
 * - Terminal status: hand off to `applyTerminal`.
 * - Other non-terminal change (queued -> running): store it and publish.
 *
 * Non-terminal writes are compare-and-set on the status this poll started
 * from (`row.status`). Without the guard, a poll in flight while the job
 * was cancelled or settled elsewhere could write "running" back over the
 * terminal status. The next tick would then run `applyTerminal` a second
 * time and post a duplicate follow-up message.
 */
async function pollOne(row: ResearchJob): Promise<void> {
  let detail: Record<string, unknown> | null = null;
  try {
    detail = (await engineGetJob(row.id)) as Record<string, unknown> | null;
  } catch (err) {
    if (err instanceof ResearchEngineError && err.status === 404) {
      await applyTerminal(row, {
        status: "failed",
        errorMessage: "engine_lost_job",
        resultSummary: null,
        agentRun: null,
      });
      return;
    }
    // Transient error — leave the row as-is; it is retried next tick.
    await touchLastPolled(row.id);
    return;
  }
  if (!detail || typeof detail !== "object") {
    await touchLastPolled(row.id);
    return;
  }

  // Normalise case the same way requestCancelJob does. Otherwise a
  // differently-cased status such as "COMPLETED" would fall through to
  // "running" and the job would never settle.
  const engineStatus = String(detail["status"] || "").toLowerCase();
  const mapped = ENGINE_STATUS_MAP[engineStatus] || "running";
  const partialAgentRun = extractAgentRun(detail);

  if (mapped === row.status) {
    // The status is unchanged, but the engine may have flushed a fresh
    // progressive plan/step snapshot into progress_json. Mirror it so SSE
    // subscribers see live progress, not just the terminal payload.
    if (partialAgentRun && !agentRunEqual(row.agentRun, partialAgentRun)) {
      await updateIfStatusUnchanged(row, {
        agentRun: partialAgentRun,
        lastPolledAt: new Date(),
      });
    } else {
      await touchLastPolled(row.id);
    }
    return;
  }

  if (TERMINAL_STATUSES.has(mapped)) {
    await applyTerminal(row, {
      status: mapped,
      errorMessage: engineErrorMessage(detail),
      resultSummary: summariseEngineResult(detail),
      agentRun: partialAgentRun,
    });
    return;
  }

  // Status moved between non-terminal states (queued -> running).
  await updateIfStatusUnchanged(row, {
    status: mapped,
    lastPolledAt: new Date(),
    ...(partialAgentRun ? { agentRun: partialAgentRun } : {}),
  });
}

/** Stamp `lastPolledAt` on a job row without touching anything else. */
async function touchLastPolled(id: string): Promise<void> {
  await db
    .update(researchJobs)
    .set({ lastPolledAt: new Date() })
    .where(eq(researchJobs.id, id));
}

/**
 * Apply a non-terminal patch only while the row still has `row.status`,
 * then publish `job_status` for the updated row. If the status has moved
 * on in the meantime, nothing is written and no event is published.
 */
async function updateIfStatusUnchanged(
  row: ResearchJob,
  patch: Partial<InsertResearchJob>,
): Promise<void> {
  const updated = await db
    .update(researchJobs)
    .set(patch)
    .where(
      and(eq(researchJobs.id, row.id), eq(researchJobs.status, row.status)),
    )
    .returning();
  const fresh = updated[0];
  if (fresh) {
    publishJobEvent({
      type: "job_status",
      conversationId: fresh.conversationId,
      job: toPublicJob(fresh),
    });
  }
}

/**
 * Extract an error message from an engine job payload. Checks a non-empty
 * string `error_message` first, then a string `result_json.error`.
 */
function engineErrorMessage(detail: Record<string, unknown>): string | null {
  const direct = detail["error_message"];
  if (typeof direct === "string" && direct) return direct;
  const result = detail["result_json"];
  if (result && typeof result === "object") {
    const nested = (result as { error?: unknown }).error;
    if (typeof nested === "string") return nested;
  }
  return null;
}
/**
 * Structural equality for agent-run snapshots.
 *
 * Two null/undefined values are equal only if they are the same value;
 * null versus an object is unequal.
 *
 * Comparison uses a canonical JSON form with object keys sorted at every
 * level; array order still matters. The stored copy (`row.agentRun`)
 * round-trips through the database. If that column is jsonb, Postgres does
 * not preserve object key order (column type not visible here — confirm),
 * so a plain `JSON.stringify` compare could report a change on every tick.
 * That would cause a DB write and an SSE event every POLL_INTERVAL_MS.
 * Serialisation failures (cycles, BigInt) are treated as "not equal".
 */
function agentRunEqual(
  a: AgentRunSnapshot | null | undefined,
  b: AgentRunSnapshot | null | undefined,
): boolean {
  if (a === b) return true;
  if (!a || !b) return false;
  try {
    return canonicalJson(a) === canonicalJson(b);
  } catch {
    return false;
  }
}

/** JSON.stringify with object keys emitted in sorted order at every level. */
function canonicalJson(value: unknown): string {
  return JSON.stringify(value, (_key: string, v: unknown) => {
    if (v && typeof v === "object" && !Array.isArray(v)) {
      return Object.fromEntries(
        Object.entries(v as Record<string, unknown>).sort(([x], [y]) =>
          x < y ? -1 : x > y ? 1 : 0,
        ),
      );
    }
    return v;
  });
}
/**
 * Return the freshest agent-run snapshot carried by an engine job payload.
 *
 * `result_json.agent_run` wins when present: it is the final snapshot once
 * the runner returns. Otherwise `progress_json.agent_run` is used, which the
 * supervisor mirrors at step boundaries while the job is still running.
 * Returns null when neither location holds an object.
 */
function extractAgentRun(
  detail: Record<string, unknown>,
): AgentRunSnapshot | null {
  const containers = [detail["result_json"], detail["progress_json"]];
  for (const container of containers) {
    if (!container || typeof container !== "object") continue;
    const snapshot = (container as Record<string, unknown>)["agent_run"];
    if (snapshot && typeof snapshot === "object") {
      return snapshot as AgentRunSnapshot;
    }
  }
  return null;
}
/**
 * Build a compact copy of the engine's top-level `result_json` for the UI.
 *
 * - `null` / `undefined` values are dropped.
 * - Strings longer than 500 chars are cut to 480 chars plus "...".
 * - Object/array values are kept only if their JSON form is shorter than
 *   600 chars.
 * - Any other value is copied unchanged.
 *
 * Returns null when `result_json` is missing or not an object.
 */
function summariseEngineResult(
  detail: Record<string, unknown>,
): Record<string, unknown> | null {
  const result = detail["result_json"];
  if (!result || typeof result !== "object") return null;

  const compact: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(result)) {
    if (value === null || value === undefined) continue;
    if (typeof value === "object") {
      if (JSON.stringify(value).length < 600) compact[key] = value;
      continue;
    }
    compact[key] =
      typeof value === "string" && value.length > 500
        ? `${value.slice(0, 480)}...`
        : value;
  }
  return compact;
}
/**
 * Settle a job: flip its row to a terminal status, post the follow-up
 * assistant message, link that message to the row, and publish events.
 *
 * The status flip is compare-and-set against queued/running. When several
 * callers race (poller ticks, a user cancel, a dev reload next to the live
 * process), only the caller whose UPDATE matches a row continues. Every
 * other caller returns without posting a message or publishing anything.
 *
 * Order of effects for the winning caller (not wrapped in a transaction):
 *  1. UPDATE the row to `patch.status` plus the result fields, without a
 *     `followup_message_id`.
 *  2. If `buildFollowUpText` produced text, INSERT the assistant message,
 *     then point the row's `followup_message_id` at it. A failure in either
 *     write is logged and swallowed; the row stays terminal regardless.
 *  3. Publish `job_status`, and also `job_followup` if the message insert
 *     succeeded.
 *
 * @param row   Job row as last read by the caller. It supplies the tool,
 *              label, task id and language for the message text, plus the
 *              conversation to post into.
 * @param patch Terminal status and the result fields to store on the row.
 */
async function applyTerminal(
  row: ResearchJob,
  patch: {
    status: PublicJob["status"];
    errorMessage: string | null;
    resultSummary: Record<string, unknown> | null;
    agentRun: AgentRunSnapshot | null;
  },
): Promise<void> {
  const finishedAt = new Date();
  const followupId = newId("msg");
  // Empty string for non-terminal statuses, in which case no message is posted.
  const followupText = buildFollowUpText(
    {
      toolName: row.toolName,
      label: row.label,
      taskId: row.taskId,
      status: patch.status,
      errorMessage: patch.errorMessage,
      resultSummary: patch.resultSummary,
    },
    (row.language as "zh" | "en") || "zh",
  );
  // Compare-and-set on the non-terminal status: only the caller that flips
  // the row from queued/running to terminal gets a row back from
  // `returning()`, so concurrent settlers cannot both post a follow-up.
  //
  // The row is flipped WITHOUT a followup_message_id. That pointer is only
  // written after the message insert below succeeds, so a failed insert
  // never leaves a dangling "view" anchor in the UI.
  const updated = await db
    .update(researchJobs)
    .set({
      status: patch.status,
      errorMessage: patch.errorMessage,
      resultSummary: patch.resultSummary,
      agentRun: patch.agentRun,
      finishedAt,
      lastPolledAt: finishedAt,
    })
    .where(
      and(
        eq(researchJobs.id, row.id),
        inArray(researchJobs.status, ["queued", "running"]),
      ),
    )
    .returning();
  let fresh = updated[0];
  // Lost the race (or the row is gone): someone else owns the follow-up.
  if (!fresh) return;
  let postedMessageId: string | null = null;
  if (followupText) {
    const blocks: ContentBlock[] = [{ type: "text", text: followupText }];
    try {
      await db.insert(messages).values({
        id: followupId,
        conversationId: row.conversationId,
        role: "assistant",
        status: "complete",
        content: blocks,
        modelId: null,
        stopReason: "job_followup",
      });
      // Set before linking. NOTE(review): if the link UPDATE below throws,
      // `job_followup` is still published while the row lacks the pointer.
      postedMessageId = followupId;
      const linked = await db
        .update(researchJobs)
        .set({ followupMessageId: followupId })
        .where(eq(researchJobs.id, row.id))
        .returning();
      if (linked[0]) fresh = linked[0];
    } catch (err) {
      logger.warn(
        { err, jobId: row.id },
        "failed to insert job follow-up message",
      );
    }
  }
  publishJobEvent({
    type: "job_status",
    conversationId: fresh.conversationId,
    job: toPublicJob(fresh),
  });
  if (postedMessageId) {
    publishJobEvent({
      type: "job_followup",
      conversationId: fresh.conversationId,
      job: toPublicJob(fresh),
      messageId: postedMessageId,
    });
  }
}