/**
 * job-tracker — persistence + background poller + proactive follow-up for
 * async research-engine jobs.
 *
 * When the chat ReAct loop invokes `run_fulltext_upgrade`,
 * `run_validation_workflow` or `run_codex_analysis`, the engine returns a
 * `job_id` and runs the heavy work in a background thread. We mirror that
 * job into Postgres (`research_jobs` table) so the conversation has a
 * durable record, then poll the engine on a fixed interval. On terminal
 * transitions we update the row, broadcast a `job_status` event on the
 * in-process bus (consumed by `GET /api/conversations/:id/jobs/stream`),
 * and post a system-authored assistant message that summarises the result
 * and proposes the next action. The follow-up message is bilingual and
 * contains no emoji per project rules.
 */
import { and, desc, eq, inArray, sql } from "drizzle-orm";
import {
  db,
  researchJobs,
  messages,
  type ContentBlock,
  type ResearchJob,
  type InsertResearchJob,
  type AgentRunSnapshot,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import {
  getJob as engineGetJob,
  cancelJobOnEngine,
  ResearchEngineError,
} from "./research-bridge";
import { publishJobEvent, type PublicJob } from "./job-events";

// ----------------------------------------------------------------- constants

/** Delay between two poller ticks. */
const POLL_INTERVAL_MS = 2_000;

/** Maximum number of queued/running jobs refreshed per poller tick. */
const POLL_BATCH = 25;

/** Local statuses after which a job row is no longer polled or flipped. */
const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled"]);

/**
 * Engine status string -> local status. Several spellings map to the same
 * local value (`completed`/`succeeded`, `error`/`failed`,
 * `canceled`/`cancelled`).
 */
const ENGINE_STATUS_MAP: Record<string, PublicJob["status"]> = {
  queued: "queued",
  running: "running",
  completed: "succeeded",
  succeeded: "succeeded",
  failed: "failed",
  error: "failed",
  cancelled: "cancelled",
  canceled: "cancelled",
};

/** Chat tool name -> persisted `job_kind` used when the caller supplies none. */
const TOOL_TO_KIND: Record<string, string> = {
  run_fulltext_upgrade: "m3_fulltext_upgrade",
  run_validation_workflow: "m4_validation_workflow",
  run_codex_analysis: "m7_codex_analysis",
};

/** Tool names whose invocation spawns a tracked background job. */
export const ASYNC_TOOL_NAMES = new Set(Object.keys(TOOL_TO_KIND));

// ---------------------------------------------------------------- registration

/** Everything the chat loop knows about a freshly spawned engine job. */
export interface RegisterJobInput {
  /** Engine-issued job id; also the id of the local `research_jobs` row. */
  jobId: string;
  conversationId: string;
  userId: string;
  /** Chat tool that spawned the job (e.g. `run_fulltext_upgrade`). */
  toolName: string;
  /** Explicit job kind; falls back to `TOOL_TO_KIND[toolName]`, then `toolName`. */
  jobKind?: string | null;
  label?: string | null;
  taskId?: string | null;
  /** Tool arguments, stored as-is on the row. */
  requestArgs?: Record<string, unknown> | null;
  /** Language used for the follow-up message posted on completion. */
  language: "zh" | "en";
}

/**
 * Persist a job that the chat loop just spawned, then announce it via the
 * in-process pub/sub.
 *
 * Idempotent on (jobId, userId, conversationId): if the row already exists
 * for the *same* owner/conversation it is returned unchanged and no second
 * `job_started` event is published. If the same id ever surfaces under a
 * different user/conversation (engine restart, id reuse, upstream bug) we
 * refuse to leak it and return `null`.
 *
 * The insert uses `ON CONFLICT DO NOTHING`, so two concurrent registrations
 * of the same id both resolve through the ownership check above instead of
 * the loser failing on the unique constraint and returning `null`.
 *
 * @returns the persisted row, or `null` on ownership mismatch or when the
 *   insert failed (logged; registration is best-effort).
 */
export async function registerJob(
  input: RegisterJobInput,
): Promise<ResearchJob | null> {
  const findById = async (): Promise<ResearchJob | null> => {
    const rows = await db
      .select()
      .from(researchJobs)
      .where(eq(researchJobs.id, input.jobId))
      .limit(1);
    return rows[0] ?? null;
  };

  // Apply the ownership rule to a row that already holds this id.
  const resolveExisting = (found: ResearchJob): ResearchJob | null => {
    const owned =
      found.userId === input.userId &&
      found.conversationId === input.conversationId;
    if (owned) return found;
    logger.error(
      {
        jobId: input.jobId,
        existingUser: found.userId,
        existingConv: found.conversationId,
        requestUser: input.userId,
        requestConv: input.conversationId,
      },
      "registerJob ownership mismatch — refusing to surface foreign job",
    );
    return null;
  };

  const existing = await findById();
  if (existing) return resolveExisting(existing);

  const row: InsertResearchJob = {
    id: input.jobId,
    conversationId: input.conversationId,
    userId: input.userId,
    toolName: input.toolName,
    jobKind: input.jobKind || TOOL_TO_KIND[input.toolName] || input.toolName,
    label: input.label || null,
    status: "running",
    taskId: input.taskId || null,
    requestArgs: input.requestArgs || null,
    language: input.language,
  };

  let inserted: ResearchJob | null = null;
  try {
    const ret = await db
      .insert(researchJobs)
      .values(row)
      .onConflictDoNothing()
      .returning();
    inserted = ret[0] ?? null;
  } catch (err) {
    logger.warn({ err, jobId: input.jobId }, "registerJob insert failed");
    return null;
  }

  if (!inserted) {
    // Another registration inserted this id between our lookup and our
    // insert; resolve against the row that won, with the same ownership rule.
    const raced = await findById();
    if (raced) return resolveExisting(raced);
    logger.warn(
      { jobId: input.jobId },
      "registerJob insert skipped on conflict",
    );
    return null;
  }

  publishJobEvent({
    type: "job_started",
    conversationId: input.conversationId,
    job: toPublicJob(inserted),
  });
  return inserted;
}

// ---------------------------------------------------------------- serialization
/**
 * Convert a `research_jobs` row into the snake_case `PublicJob` shape that
 * is published on the job event bus and returned to API callers.
 */
export function toPublicJob(row: ResearchJob): PublicJob {
  return {
    id: row.id,
    conversation_id: row.conversationId,
    tool_name: row.toolName,
    kind: row.jobKind,
    label: row.label,
    status: (row.status as PublicJob["status"]) || "running",
    task_id: row.taskId,
    result_summary: row.resultSummary ?? null,
    agent_run: row.agentRun ?? null,
    error_message: row.errorMessage,
    followup_message_id: row.followupMessageId,
    language: (row.language as "zh" | "en") || "zh",
    started_at: row.startedAt.toISOString(),
    finished_at: row.finishedAt ? row.finishedAt.toISOString() : null,
  };
}

// ---------------------------------------------------------------- queries

export interface ListJobsOptions {
  /** Include only queued/running jobs plus terminal jobs that finished
   * within the recent window. Defaults to true so the in-flight tray
   * never rehydrates stale historical jobs on refresh. */
  scopeToTray?: boolean;
  /** Recent terminal window in minutes (only used when scopeToTray). */
  recentTerminalMinutes?: number;
  /** Hard cap on returned rows. */
  limit?: number;
}

/**
 * `value` when it is a finite number, otherwise `fallback`. Guards numeric
 * options against NaN/Infinity (and null from untyped JS callers).
 */
function finiteOr(value: number | null | undefined, fallback: number): number {
  return typeof value === "number" && Number.isFinite(value)
    ? value
    : fallback;
}

/**
 * List one user's jobs in a conversation, newest first.
 *
 * With `scopeToTray` (default) only queued/running jobs and jobs that
 * finished within the last `recentTerminalMinutes` (default 10, minimum 1)
 * are returned. `limit` defaults to 50, is truncated to an integer and is
 * clamped to [1, 200]. Non-finite numeric options fall back to their
 * defaults so they can never yield an invalid SQL interval or LIMIT.
 */
export async function listJobsForConversation(
  conversationId: string,
  userId: string,
  opts: ListJobsOptions = {},
): Promise<ResearchJob[]> {
  const scopeToTray = opts.scopeToTray !== false;
  const recentMinutes = Math.max(1, finiteOr(opts.recentTerminalMinutes, 10));
  const limit = Math.max(1, Math.min(200, Math.floor(finiteOr(opts.limit, 50))));

  const trayFilter = scopeToTray
    ? sql`(${researchJobs.status} IN ('queued','running') OR (${researchJobs.finishedAt} IS NOT NULL AND ${researchJobs.finishedAt} > now() - (${recentMinutes} || ' minutes')::interval))`
    : sql`true`;

  return db
    .select()
    .from(researchJobs)
    .where(
      and(
        eq(researchJobs.conversationId, conversationId),
        eq(researchJobs.userId, userId),
        trayFilter,
      ),
    )
    .orderBy(desc(researchJobs.startedAt))
    .limit(limit);
}

/**
 * Fetch a single job scoped to its owner. Returns `null` when the id is
 * unknown or belongs to another user.
 */
export async function getJobOwned(
  jobId: string,
  userId: string,
): Promise<ResearchJob | null> {
  const rows = await db
    .select()
    .from(researchJobs)
    .where(and(eq(researchJobs.id, jobId), eq(researchJobs.userId, userId)))
    .limit(1);
  return rows[0] ?? null;
}

/** Re-read a job row by id, without an ownership filter (internal use). */
async function loadJobById(jobId: string): Promise<ResearchJob | null> {
  const rows = await db
    .select()
    .from(researchJobs)
    .where(eq(researchJobs.id, jobId))
    .limit(1);
  return rows[0] ?? null;
}

/**
 * Translate a raw engine status (case-insensitive) into a local status, or
 * `null` when it is not a recognised engine status. Uses an own-property
 * check so values such as "constructor" never resolve to inherited members.
 */
function mapEngineStatus(raw: unknown): PublicJob["status"] | null {
  const key = String(raw ?? "").toLowerCase();
  return Object.prototype.hasOwnProperty.call(ENGINE_STATUS_MAP, key)
    ? (ENGINE_STATUS_MAP[key] ?? null)
    : null;
}

// ---------------------------------------------------------------- cancel

export type CancelOutcome =
  | { ok: true; job: PublicJob }
  | { ok: false; reason: "already_terminal"; job: PublicJob }
  | { ok: false; reason: "engine_error"; message: string };

/**
 * User-initiated cancel: tells the engine to stop, flips the local row to
 * `cancelled` (which fires the standard follow-up message), and broadcasts
 * the new state via the in-process bus so the SSE tray flips immediately.
 *
 * Returns `already_terminal` when the row has already settled — either
 * before the call, because the engine reports the job already finished, or
 * because the poller recorded a different terminal status first. Callers
 * can map that to a 409 so the UI knows the chip is no longer cancellable.
 */
export async function requestCancelJob(
  row: ResearchJob,
): Promise<CancelOutcome> {
  if (TERMINAL_STATUSES.has(row.status)) {
    return { ok: false, reason: "already_terminal", job: toPublicJob(row) };
  }

  let engineResp: Awaited<ReturnType<typeof cancelJobOnEngine>>;
  try {
    engineResp = await cancelJobOnEngine(row.id);
  } catch (err) {
    const message =
      err instanceof Error ? err.message : "engine cancel request failed";
    logger.warn({ err, jobId: row.id }, "engine cancel request failed");
    return { ok: false, reason: "engine_error", message };
  }

  // Race guard: if the engine reports the job has already settled
  // (cancelled === false with a terminal status), the runner finished
  // between our pre-check and the engine call. Forcing a local
  // "cancelled" write here would clobber the true completed/failed
  // result and post the wrong follow-up message. Instead we pull the
  // real terminal state right now and surface `already_terminal`.
  if (engineResp && engineResp.cancelled === false) {
    const mapped = mapEngineStatus(engineResp.status);
    if (mapped && TERMINAL_STATUSES.has(mapped)) {
      // Await the catch-up poll so the row we return reflects the engine's
      // terminal state rather than the stale queued/running snapshot.
      try {
        await pollOne(row);
      } catch (err) {
        logger.warn({ err, jobId: row.id }, "race-recovery poll failed");
      }
      const refreshed = await loadJobById(row.id);
      return {
        ok: false,
        reason: "already_terminal",
        job: toPublicJob(refreshed ?? row),
      };
    }
  }

  await applyTerminal(row, {
    status: "cancelled",
    errorMessage: "cancelled by user",
    resultSummary: null,
    agentRun: null,
  });

  // Re-read so we return the freshly-flipped row (with followup id).
  const fresh = (await loadJobById(row.id)) ?? row;
  if (fresh.status !== "cancelled" && TERMINAL_STATUSES.has(fresh.status)) {
    // The poller won the compare-and-set with a different terminal status
    // (e.g. succeeded) — the job was not actually cancelled.
    return { ok: false, reason: "already_terminal", job: toPublicJob(fresh) };
  }
  return { ok: true, job: toPublicJob(fresh) };
}

// ---------------------------------------------------------------- follow-up text

interface FollowUpContext {
  toolName: string;
  label: string | null;
  taskId: string | null;
  status: PublicJob["status"];
  errorMessage: string | null;
  resultSummary: Record<string, unknown> | null;
}

/** Human-readable name for an async tool, in the requested language. */
function jobLabelHuman(toolName: string, lang: "zh" | "en"): string {
  if (lang === "zh") {
    if (toolName === "run_fulltext_upgrade") return "全文证据升级";
    if (toolName === "run_validation_workflow") return "验证工作流";
    if (toolName === "run_codex_analysis") return "Codex 深度分析";
    return toolName;
  }
  if (toolName === "run_fulltext_upgrade") return "full-text evidence upgrade";
  if (toolName === "run_validation_workflow") return "validation workflow";
  if (toolName === "run_codex_analysis") return "Codex deep analysis";
  return toolName;
}

/**
 * Build the assistant follow-up text for a terminal job. Returns "" for
 * non-terminal statuses (no message is posted in that case).
 */
function buildFollowUpText(ctx: FollowUpContext, lang: "zh" | "en"): string {
  const label = jobLabelHuman(ctx.toolName, lang);
  const taskHint = ctx.taskId ? ` (task ${ctx.taskId})` : "";

  if (ctx.status === "succeeded") {
    if (lang === "zh") {
      const next =
        ctx.toolName === "run_fulltext_upgrade"
          ? "需要我基于升级后的全文证据重新整理候选靶点排名,或调用 inspect_target 深入查看某个靶点吗?"
          : ctx.toolName === "run_validation_workflow"
            ? "需要我汇总验证结论、列出未通过的指标,或为下一步实验给出建议吗?"
            : "需要我把 Codex 的分析要点总结成简报,或基于结论提出后续验证方向吗?";
      return `后台任务「${label}」${taskHint}已完成。${next}`;
    }
    const next =
      ctx.toolName === "run_fulltext_upgrade"
        ? "Want me to re-rank the candidates using the upgraded full-text evidence, or inspect a specific target in detail?"
        : ctx.toolName === "run_validation_workflow"
          ? "Want me to summarise the validation outcome, list the failing checks, or propose follow-up experiments?"
          : "Want me to distil the Codex findings into a brief, or suggest the next validation step based on the conclusions?";
    return `Background job "${label}"${taskHint} completed. ${next}`;
  }

  if (ctx.status === "failed") {
    // Cap the engine error so a huge stack trace cannot flood the chat.
    const reason = (ctx.errorMessage || "").slice(0, 400);
    if (lang === "zh") {
      return `后台任务「${label}」${taskHint}执行失败${reason ? `:${reason}` : "。"}是否需要我换一组参数重试,或先排查上一步的输入?`;
    }
    return `Background job "${label}"${taskHint} failed${reason ? `: ${reason}` : "."} Want me to retry with adjusted parameters, or first check the upstream input?`;
  }

  if (ctx.status === "cancelled") {
    if (lang === "zh") {
      return `后台任务「${label}」${taskHint}已取消。需要我重新启动,还是切换到其他分析?`;
    }
    return `Background job "${label}"${taskHint} was cancelled. Should I restart it, or pivot to a different analysis?`;
  }

  return "";
}

// ---------------------------------------------------------------- poller

let pollerStarted = false;
let pollerHandle: NodeJS.Timeout | null = null;
// Set while a tick is in flight so slow ticks never overlap.
let pollerBusy = false;

/** Start the background poller (no-op if it is already running). */
export function startJobPoller(): void {
  if (pollerStarted) return;
  pollerStarted = true;
  logger.info({ intervalMs: POLL_INTERVAL_MS }, "research job poller started");
  pollerHandle = setInterval(() => {
    if (pollerBusy) return;
    pollerBusy = true;
    pollOnce()
      .catch((err) => logger.error({ err }, "job poller tick failed"))
      .finally(() => {
        pollerBusy = false;
      });
  }, POLL_INTERVAL_MS);
  // Don't keep the process alive purely for the poller.
  pollerHandle.unref?.();
}

/** Stop the background poller; it may be started again afterwards. */
export function stopJobPoller(): void {
  if (pollerHandle) clearInterval(pollerHandle);
  pollerHandle = null;
  pollerStarted = false;
}

/** One poller tick: refresh up to POLL_BATCH in-flight jobs concurrently. */
async function pollOnce(): Promise<void> {
  const active = await db
    .select()
    .from(researchJobs)
    .where(inArray(researchJobs.status, ["queued", "running"]))
    // Least-recently-polled first. Each poll stamps `lastPolledAt`, so when
    // more than POLL_BATCH jobs are in flight they are served round-robin
    // instead of the same arbitrary subset being picked every tick.
    .orderBy(sql`${researchJobs.lastPolledAt} asc nulls first`)
    .limit(POLL_BATCH);
  if (active.length === 0) return;
  await Promise.all(
    active.map((row) =>
      pollOne(row).catch((err) => {
        logger.warn({ err, jobId: row.id }, "poll job failed");
      }),
    ),
  );
}

/** Record that a job was polled without changing anything else. */
async function markPolled(jobId: string): Promise<void> {
  await db
    .update(researchJobs)
    .set({ lastPolledAt: new Date() })
    .where(eq(researchJobs.id, jobId));
}

/** Publish a `job_status` event for an updated row, if there is one. */
function publishStatus(job: ResearchJob | undefined): void {
  if (!job) return;
  publishJobEvent({
    type: "job_status",
    conversationId: job.conversationId,
    job: toPublicJob(job),
  });
}

/** Pull the engine error text from a job detail payload, if any. */
function extractEngineError(detail: Record<string, unknown>): string | null {
  const direct = detail["error_message"];
  if (typeof direct === "string" && direct) return direct;
  const result = detail["result_json"];
  if (result && typeof result === "object") {
    const nested = (result as Record<string, unknown>)["error"];
    if (typeof nested === "string") return nested;
  }
  return null;
}

/**
 * Refresh one in-flight job from the engine and mirror any change into the
 * local row.
 *
 * Non-terminal writes are guarded on the row still being queued/running, so
 * a poll that races with a cancel (or another terminal flip) can never
 * revert a settled row back to running.
 */
async function pollOne(row: ResearchJob): Promise<void> {
  let detail: Record<string, unknown> | null = null;
  try {
    detail = (await engineGetJob(row.id)) as Record<string, unknown> | null;
  } catch (err) {
    if (err instanceof ResearchEngineError && err.status === 404) {
      // Job vanished from the engine (e.g. dev restart). Mark failed so we
      // stop polling and the user is notified.
      await applyTerminal(row, {
        status: "failed",
        errorMessage: "engine_lost_job",
        resultSummary: null,
        agentRun: null,
      });
      return;
    }
    // Transient error — leave row as-is, will retry next tick.
    await markPolled(row.id);
    return;
  }

  if (!detail || typeof detail !== "object") {
    await markPolled(row.id);
    return;
  }

  // Case-insensitive, like the cancel path; unknown statuses count as running.
  const mapped = mapEngineStatus(detail["status"]) ?? "running";
  const partialAgentRun = extractAgentRun(detail);
  const stillActive = and(
    eq(researchJobs.id, row.id),
    inArray(researchJobs.status, ["queued", "running"]),
  );

  if (mapped === row.status) {
    // Status didn't change, but the engine may have flushed a fresh
    // progressive plan/step snapshot into progress_json. Mirror it into
    // the local row so SSE subscribers see live progress, not just the
    // terminal payload.
    if (partialAgentRun && !agentRunEqual(row.agentRun, partialAgentRun)) {
      const updated = await db
        .update(researchJobs)
        .set({ agentRun: partialAgentRun, lastPolledAt: new Date() })
        .where(stillActive)
        .returning();
      publishStatus(updated[0]);
    } else {
      await markPolled(row.id);
    }
    return;
  }

  if (TERMINAL_STATUSES.has(mapped)) {
    await applyTerminal(row, {
      status: mapped,
      errorMessage: extractEngineError(detail),
      resultSummary: summariseEngineResult(detail),
      // Keep the last progressive snapshot if the terminal payload has none,
      // matching the non-terminal branch below.
      agentRun: partialAgentRun ?? row.agentRun ?? null,
    });
    return;
  }

  // Status moved between non-terminal states (queued -> running).
  const updated = await db
    .update(researchJobs)
    .set({
      status: mapped,
      lastPolledAt: new Date(),
      ...(partialAgentRun ? { agentRun: partialAgentRun } : {}),
    })
    .where(stillActive)
    .returning();
  publishStatus(updated[0]);
}

/**
 * Structural, key-order-insensitive equality for JSON-like values. `undefined`
 * object members are ignored, mirroring how JSON serialisation drops them.
 */
function jsonValueEqual(a: unknown, b: unknown): boolean {
  if (a === b) return true;
  if (
    a === null ||
    b === null ||
    typeof a !== "object" ||
    typeof b !== "object"
  ) {
    return false;
  }
  const aIsArray = Array.isArray(a);
  if (aIsArray !== Array.isArray(b)) return false;
  if (aIsArray) {
    const xs = a as unknown[];
    const ys = b as unknown[];
    return (
      xs.length === ys.length &&
      xs.every((item, i) => jsonValueEqual(item, ys[i]))
    );
  }
  const left = a as Record<string, unknown>;
  const right = b as Record<string, unknown>;
  const leftKeys = Object.keys(left).filter((k) => left[k] !== undefined);
  const rightKeys = Object.keys(right).filter((k) => right[k] !== undefined);
  if (leftKeys.length !== rightKeys.length) return false;
  return leftKeys.every(
    (k) =>
      Object.prototype.hasOwnProperty.call(right, k) &&
      jsonValueEqual(left[k], right[k]),
  );
}

/**
 * Whether two agent-run snapshots carry the same content.
 *
 * The comparison is structural and ignores object key order. A snapshot
 * read back from the database may not preserve the engine's key order.
 * NOTE(review): Postgres `jsonb` normalises key order, and this assumes the
 * agent_run column is jsonb; confirm against the schema. With a
 * `JSON.stringify` comparison, every tick would look like a change, so the
 * row would be rewritten and `job_status` re-published on every poll.
 */
function agentRunEqual(
  a: AgentRunSnapshot | null | undefined,
  b: AgentRunSnapshot | null | undefined,
): boolean {
  if (a === b) return true;
  if (!a || !b) return false;
  return jsonValueEqual(a, b);
}

/** `container.agent_run` when `container` is an object holding an object there. */
function readAgentRun(container: unknown): AgentRunSnapshot | null {
  if (!container || typeof container !== "object") return null;
  const ar = (container as Record<string, unknown>)["agent_run"];
  return ar && typeof ar === "object" ? (ar as AgentRunSnapshot) : null;
}

/**
 * Pick the best available agent-run snapshot from an engine job detail.
 *
 * Terminal payload: result_json.agent_run is the authoritative final
 * snapshot once the runner returns. Pre-terminal: the supervisor mirrors
 * progressive snapshots into progress_json.agent_run after every
 * begin/finish_step boundary so pollers can show plan + step state long
 * before the runner finishes.
 */
function extractAgentRun(
  detail: Record<string, unknown>,
): AgentRunSnapshot | null {
  return (
    readAgentRun(detail["result_json"]) ?? readAgentRun(detail["progress_json"])
  );
}

/**
 * Compact copy of the engine's result_json for the UI: null members are
 * dropped, long strings are truncated, and nested values whose JSON form is
 * 600+ characters are omitted.
 */
function summariseEngineResult(
  detail: Record<string, unknown>,
): Record<string, unknown> | null {
  const result = detail["result_json"];
  if (!result || typeof result !== "object") return null;
  const small: Record<string, unknown> = {};
  for (const [k, v] of Object.entries(result)) {
    if (v == null) continue;
    if (typeof v === "string" && v.length > 500) {
      small[k] = `${v.slice(0, 480)}...`;
    } else if (typeof v === "object") {
      const s = JSON.stringify(v);
      if (s.length < 600) small[k] = v;
    } else {
      small[k] = v;
    }
  }
  return small;
}

/**
 * Flip a job to a terminal status exactly once, post the follow-up
 * assistant message, and broadcast `job_status` (plus `job_followup` when a
 * message was posted).
 */
async function applyTerminal(
  row: ResearchJob,
  patch: {
    status: PublicJob["status"];
    errorMessage: string | null;
    resultSummary: Record<string, unknown> | null;
    agentRun: AgentRunSnapshot | null;
  },
): Promise<void> {
  const finishedAt = new Date();

  // Compare-and-set on the non-terminal status so concurrent pollers (or a
  // dev reload racing the live process) cannot both win and post duplicate
  // follow-up messages. Only the worker that flips the row from
  // queued/running to terminal proceeds; everyone else exits silently.
  // The row is flipped WITHOUT a followup_message_id; that pointer is only
  // persisted once the message insert below succeeds, so a failed insert
  // can never leave a dangling "view" anchor in the UI.
  const updated = await db
    .update(researchJobs)
    .set({
      status: patch.status,
      errorMessage: patch.errorMessage,
      resultSummary: patch.resultSummary,
      agentRun: patch.agentRun,
      finishedAt,
      lastPolledAt: finishedAt,
    })
    .where(
      and(
        eq(researchJobs.id, row.id),
        inArray(researchJobs.status, ["queued", "running"]),
      ),
    )
    .returning();
  let fresh = updated[0];
  if (!fresh) return;

  const followupText = buildFollowUpText(
    {
      toolName: row.toolName,
      label: row.label,
      taskId: row.taskId,
      status: patch.status,
      errorMessage: patch.errorMessage,
      resultSummary: patch.resultSummary,
    },
    (row.language as "zh" | "en") || "zh",
  );

  let postedMessageId: string | null = null;
  if (followupText) {
    const followupId = newId("msg");
    const blocks: ContentBlock[] = [{ type: "text", text: followupText }];
    try {
      await db.insert(messages).values({
        id: followupId,
        conversationId: row.conversationId,
        role: "assistant",
        status: "complete",
        content: blocks,
        modelId: null,
        stopReason: "job_followup",
      });
      postedMessageId = followupId;
      const linked = await db
        .update(researchJobs)
        .set({ followupMessageId: followupId })
        .where(eq(researchJobs.id, row.id))
        .returning();
      if (linked[0]) fresh = linked[0];
    } catch (err) {
      logger.warn(
        { err, jobId: row.id },
        "failed to insert job follow-up message",
      );
    }
  }

  publishJobEvent({
    type: "job_status",
    conversationId: fresh.conversationId,
    job: toPublicJob(fresh),
  });
  if (postedMessageId) {
    publishJobEvent({
      type: "job_followup",
      conversationId: fresh.conversationId,
      job: toPublicJob(fresh),
      messageId: postedMessageId,
    });
  }
}