| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import { and, desc, eq, inArray, sql } from "drizzle-orm"; |
| import { |
| db, |
| researchJobs, |
| messages, |
| type ContentBlock, |
| type ResearchJob, |
| type InsertResearchJob, |
| type AgentRunSnapshot, |
| } from "@workspace/db"; |
| import { newId } from "./ids"; |
| import { logger } from "./logger"; |
| import { |
| getJob as engineGetJob, |
| cancelJobOnEngine, |
| ResearchEngineError, |
| } from "./research-bridge"; |
| import { publishJobEvent, type PublicJob } from "./job-events"; |
|
|
| |
|
|
| const POLL_INTERVAL_MS = 2_000; |
| const POLL_BATCH = 25; |
| const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled"]); |
|
|
| const ENGINE_STATUS_MAP: Record<string, PublicJob["status"]> = { |
| queued: "queued", |
| running: "running", |
| completed: "succeeded", |
| succeeded: "succeeded", |
| failed: "failed", |
| error: "failed", |
| cancelled: "cancelled", |
| canceled: "cancelled", |
| }; |
|
|
| const TOOL_TO_KIND: Record<string, string> = { |
| run_fulltext_upgrade: "m3_fulltext_upgrade", |
| run_validation_workflow: "m4_validation_workflow", |
| run_codex_analysis: "m7_codex_analysis", |
| }; |
|
|
| export const ASYNC_TOOL_NAMES = new Set(Object.keys(TOOL_TO_KIND)); |
|
|
| |
|
|
| export interface RegisterJobInput { |
| jobId: string; |
| conversationId: string; |
| userId: string; |
| toolName: string; |
| jobKind?: string | null; |
| label?: string | null; |
| taskId?: string | null; |
| requestArgs?: Record<string, unknown> | null; |
| language: "zh" | "en"; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| export async function registerJob( |
| input: RegisterJobInput, |
| ): Promise<ResearchJob | null> { |
| const existing = await db |
| .select() |
| .from(researchJobs) |
| .where(eq(researchJobs.id, input.jobId)) |
| .limit(1); |
| if (existing[0]) { |
| const owned = |
| existing[0].userId === input.userId && |
| existing[0].conversationId === input.conversationId; |
| if (!owned) { |
| logger.error( |
| { |
| jobId: input.jobId, |
| existingUser: existing[0].userId, |
| existingConv: existing[0].conversationId, |
| requestUser: input.userId, |
| requestConv: input.conversationId, |
| }, |
| "registerJob ownership mismatch — refusing to surface foreign job", |
| ); |
| return null; |
| } |
| return existing[0]; |
| } |
|
|
| const row: InsertResearchJob = { |
| id: input.jobId, |
| conversationId: input.conversationId, |
| userId: input.userId, |
| toolName: input.toolName, |
| jobKind: input.jobKind || TOOL_TO_KIND[input.toolName] || input.toolName, |
| label: input.label || null, |
| status: "running", |
| taskId: input.taskId || null, |
| requestArgs: input.requestArgs || null, |
| language: input.language, |
| }; |
| let inserted: ResearchJob | null = null; |
| try { |
| const ret = await db.insert(researchJobs).values(row).returning(); |
| inserted = ret[0] || null; |
| } catch (err) { |
| logger.warn({ err, jobId: input.jobId }, "registerJob insert failed"); |
| return null; |
| } |
| if (inserted) { |
| publishJobEvent({ |
| type: "job_started", |
| conversationId: input.conversationId, |
| job: toPublicJob(inserted), |
| }); |
| } |
| return inserted; |
| } |
|
|
| |
|
|
| export function toPublicJob(row: ResearchJob): PublicJob { |
| return { |
| id: row.id, |
| conversation_id: row.conversationId, |
| tool_name: row.toolName, |
| kind: row.jobKind, |
| label: row.label, |
| status: (row.status as PublicJob["status"]) || "running", |
| task_id: row.taskId, |
| result_summary: row.resultSummary ?? null, |
| agent_run: row.agentRun ?? null, |
| error_message: row.errorMessage, |
| followup_message_id: row.followupMessageId, |
| language: (row.language as "zh" | "en") || "zh", |
| started_at: row.startedAt.toISOString(), |
| finished_at: row.finishedAt ? row.finishedAt.toISOString() : null, |
| }; |
| } |
|
|
| |
|
|
| export interface ListJobsOptions { |
| |
| |
| |
| scopeToTray?: boolean; |
| |
| recentTerminalMinutes?: number; |
| |
| limit?: number; |
| } |
|
|
| export async function listJobsForConversation( |
| conversationId: string, |
| userId: string, |
| opts: ListJobsOptions = {}, |
| ): Promise<ResearchJob[]> { |
| const scopeToTray = opts.scopeToTray !== false; |
| const recentMinutes = Math.max(1, opts.recentTerminalMinutes ?? 10); |
| const limit = Math.max(1, Math.min(200, opts.limit ?? 50)); |
| const base = db |
| .select() |
| .from(researchJobs) |
| .where( |
| and( |
| eq(researchJobs.conversationId, conversationId), |
| eq(researchJobs.userId, userId), |
| scopeToTray |
| ? sql`(${researchJobs.status} IN ('queued','running') |
| OR (${researchJobs.finishedAt} IS NOT NULL |
| AND ${researchJobs.finishedAt} > now() - (${recentMinutes} || ' minutes')::interval))` |
| : sql`true`, |
| ), |
| ) |
| .orderBy(desc(researchJobs.startedAt)) |
| .limit(limit); |
| return base; |
| } |
|
|
| export async function getJobOwned( |
| jobId: string, |
| userId: string, |
| ): Promise<ResearchJob | null> { |
| const rows = await db |
| .select() |
| .from(researchJobs) |
| .where(and(eq(researchJobs.id, jobId), eq(researchJobs.userId, userId))) |
| .limit(1); |
| return rows[0] ?? null; |
| } |
|
|
| |
|
|
| export type CancelOutcome = |
| | { ok: true; job: PublicJob } |
| | { ok: false; reason: "already_terminal"; job: PublicJob } |
| | { ok: false; reason: "engine_error"; message: string }; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| export async function requestCancelJob(row: ResearchJob): Promise<CancelOutcome> { |
| if (TERMINAL_STATUSES.has(row.status)) { |
| return { ok: false, reason: "already_terminal", job: toPublicJob(row) }; |
| } |
| let engineResp: Awaited<ReturnType<typeof cancelJobOnEngine>>; |
| try { |
| engineResp = await cancelJobOnEngine(row.id); |
| } catch (err) { |
| const message = |
| err instanceof Error ? err.message : "engine cancel request failed"; |
| logger.warn({ err, jobId: row.id }, "engine cancel request failed"); |
| return { ok: false, reason: "engine_error", message }; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| if (engineResp && engineResp.cancelled === false) { |
| const engineStatus = engineResp.status.toLowerCase(); |
| const mapped = |
| ENGINE_STATUS_MAP[engineStatus] || |
| (TERMINAL_STATUSES.has(engineStatus) |
| ? (engineStatus as PublicJob["status"]) |
| : null); |
| if (mapped && TERMINAL_STATUSES.has(mapped)) { |
| |
| |
| |
| pollOne(row).catch((err) => |
| logger.warn({ err, jobId: row.id }, "race-recovery poll failed"), |
| ); |
| const refreshed = await db |
| .select() |
| .from(researchJobs) |
| .where(eq(researchJobs.id, row.id)) |
| .limit(1); |
| return { |
| ok: false, |
| reason: "already_terminal", |
| job: toPublicJob(refreshed[0] || row), |
| }; |
| } |
| } |
|
|
| await applyTerminal(row, { |
| status: "cancelled", |
| errorMessage: "cancelled by user", |
| resultSummary: null, |
| agentRun: null, |
| }); |
| |
| const fresh = await db |
| .select() |
| .from(researchJobs) |
| .where(eq(researchJobs.id, row.id)) |
| .limit(1); |
| return { ok: true, job: toPublicJob(fresh[0] || row) }; |
| } |
|
|
| |
|
|
| interface FollowUpContext { |
| toolName: string; |
| label: string | null; |
| taskId: string | null; |
| status: PublicJob["status"]; |
| errorMessage: string | null; |
| resultSummary: Record<string, unknown> | null; |
| } |
|
|
| function jobLabelHuman(toolName: string, lang: "zh" | "en"): string { |
| if (lang === "zh") { |
| if (toolName === "run_fulltext_upgrade") return "全文证据升级"; |
| if (toolName === "run_validation_workflow") return "验证工作流"; |
| if (toolName === "run_codex_analysis") return "Codex 深度分析"; |
| return toolName; |
| } |
| if (toolName === "run_fulltext_upgrade") return "full-text evidence upgrade"; |
| if (toolName === "run_validation_workflow") return "validation workflow"; |
| if (toolName === "run_codex_analysis") return "Codex deep analysis"; |
| return toolName; |
| } |
|
|
| function buildFollowUpText(ctx: FollowUpContext, lang: "zh" | "en"): string { |
| const label = jobLabelHuman(ctx.toolName, lang); |
| const taskHint = ctx.taskId ? ` (task ${ctx.taskId})` : ""; |
| if (ctx.status === "succeeded") { |
| if (lang === "zh") { |
| const next = |
| ctx.toolName === "run_fulltext_upgrade" |
| ? "需要我基于升级后的全文证据重新整理候选靶点排名,或调用 inspect_target 深入查看某个靶点吗?" |
| : ctx.toolName === "run_validation_workflow" |
| ? "需要我汇总验证结论、列出未通过的指标,或为下一步实验给出建议吗?" |
| : "需要我把 Codex 的分析要点总结成简报,或基于结论提出后续验证方向吗?"; |
| return `后台任务「${label}」${taskHint}已完成。${next}`; |
| } |
| const next = |
| ctx.toolName === "run_fulltext_upgrade" |
| ? "Want me to re-rank the candidates using the upgraded full-text evidence, or inspect a specific target in detail?" |
| : ctx.toolName === "run_validation_workflow" |
| ? "Want me to summarise the validation outcome, list the failing checks, or propose follow-up experiments?" |
| : "Want me to distil the Codex findings into a brief, or suggest the next validation step based on the conclusions?"; |
| return `Background job "${label}"${taskHint} completed. ${next}`; |
| } |
| if (ctx.status === "failed") { |
| const reason = (ctx.errorMessage || "").slice(0, 400); |
| if (lang === "zh") { |
| return `后台任务「${label}」${taskHint}执行失败${reason ? `:${reason}` : "。"}是否需要我换一组参数重试,或先排查上一步的输入?`; |
| } |
| return `Background job "${label}"${taskHint} failed${reason ? `: ${reason}` : "."} Want me to retry with adjusted parameters, or first check the upstream input?`; |
| } |
| if (ctx.status === "cancelled") { |
| if (lang === "zh") { |
| return `后台任务「${label}」${taskHint}已取消。需要我重新启动,还是切换到其他分析?`; |
| } |
| return `Background job "${label}"${taskHint} was cancelled. Should I restart it, or pivot to a different analysis?`; |
| } |
| return ""; |
| } |
|
|
| |
|
|
| let pollerStarted = false; |
| let pollerHandle: NodeJS.Timeout | null = null; |
| let pollerBusy = false; |
|
|
| export function startJobPoller(): void { |
| if (pollerStarted) return; |
| pollerStarted = true; |
| logger.info({ intervalMs: POLL_INTERVAL_MS }, "research job poller started"); |
| pollerHandle = setInterval(() => { |
| if (pollerBusy) return; |
| pollerBusy = true; |
| pollOnce() |
| .catch((err) => logger.error({ err }, "job poller tick failed")) |
| .finally(() => { |
| pollerBusy = false; |
| }); |
| }, POLL_INTERVAL_MS); |
| |
| pollerHandle.unref?.(); |
| } |
|
|
| export function stopJobPoller(): void { |
| if (pollerHandle) clearInterval(pollerHandle); |
| pollerHandle = null; |
| pollerStarted = false; |
| } |
|
|
| async function pollOnce(): Promise<void> { |
| const running = await db |
| .select() |
| .from(researchJobs) |
| .where(inArray(researchJobs.status, ["queued", "running"])) |
| .limit(POLL_BATCH); |
| if (running.length === 0) return; |
| await Promise.all(running.map((row) => pollOne(row).catch((err) => { |
| logger.warn({ err, jobId: row.id }, "poll job failed"); |
| }))); |
| } |
|
|
| async function pollOne(row: ResearchJob): Promise<void> { |
| let detail: Record<string, unknown> | null = null; |
| try { |
| detail = (await engineGetJob(row.id)) as Record<string, unknown> | null; |
| } catch (err) { |
| if (err instanceof ResearchEngineError && err.status === 404) { |
| |
| |
| await applyTerminal(row, { |
| status: "failed", |
| errorMessage: "engine_lost_job", |
| resultSummary: null, |
| agentRun: null, |
| }); |
| return; |
| } |
| |
| await db |
| .update(researchJobs) |
| .set({ lastPolledAt: new Date() }) |
| .where(eq(researchJobs.id, row.id)); |
| return; |
| } |
| if (!detail || typeof detail !== "object") { |
| await db |
| .update(researchJobs) |
| .set({ lastPolledAt: new Date() }) |
| .where(eq(researchJobs.id, row.id)); |
| return; |
| } |
| const engineStatus = String(detail["status"] || ""); |
| const mapped = ENGINE_STATUS_MAP[engineStatus] || "running"; |
| const partialAgentRun = extractAgentRun(detail); |
| if (mapped === row.status) { |
| |
| |
| |
| |
| if ( |
| partialAgentRun && |
| !agentRunEqual(row.agentRun, partialAgentRun) |
| ) { |
| const updated = await db |
| .update(researchJobs) |
| .set({ agentRun: partialAgentRun, lastPolledAt: new Date() }) |
| .where(eq(researchJobs.id, row.id)) |
| .returning(); |
| const fresh = updated[0]; |
| if (fresh) { |
| publishJobEvent({ |
| type: "job_status", |
| conversationId: fresh.conversationId, |
| job: toPublicJob(fresh), |
| }); |
| } |
| } else { |
| await db |
| .update(researchJobs) |
| .set({ lastPolledAt: new Date() }) |
| .where(eq(researchJobs.id, row.id)); |
| } |
| return; |
| } |
| if (TERMINAL_STATUSES.has(mapped)) { |
| await applyTerminal(row, { |
| status: mapped, |
| errorMessage: |
| (detail["error_message"] as string | undefined) || |
| (typeof (detail["result_json"] as { error?: string })?.error === "string" |
| ? ((detail["result_json"] as { error?: string }).error as string) |
| : null), |
| resultSummary: summariseEngineResult(detail), |
| agentRun: partialAgentRun, |
| }); |
| } else { |
| |
| const updated = await db |
| .update(researchJobs) |
| .set({ |
| status: mapped, |
| lastPolledAt: new Date(), |
| ...(partialAgentRun ? { agentRun: partialAgentRun } : {}), |
| }) |
| .where(eq(researchJobs.id, row.id)) |
| .returning(); |
| const fresh = updated[0]; |
| if (fresh) { |
| publishJobEvent({ |
| type: "job_status", |
| conversationId: fresh.conversationId, |
| job: toPublicJob(fresh), |
| }); |
| } |
| } |
| } |
|
|
| function agentRunEqual( |
| a: AgentRunSnapshot | null | undefined, |
| b: AgentRunSnapshot | null | undefined, |
| ): boolean { |
| if (a === b) return true; |
| if (!a || !b) return false; |
| |
| |
| |
| try { |
| return JSON.stringify(a) === JSON.stringify(b); |
| } catch { |
| return false; |
| } |
| } |
|
|
| function extractAgentRun( |
| detail: Record<string, unknown>, |
| ): AgentRunSnapshot | null { |
| |
| |
| const result = detail["result_json"]; |
| if (result && typeof result === "object") { |
| const ar = (result as Record<string, unknown>)["agent_run"]; |
| if (ar && typeof ar === "object") { |
| return ar as AgentRunSnapshot; |
| } |
| } |
| |
| |
| |
| const progress = detail["progress_json"]; |
| if (progress && typeof progress === "object") { |
| const ar = (progress as Record<string, unknown>)["agent_run"]; |
| if (ar && typeof ar === "object") { |
| return ar as AgentRunSnapshot; |
| } |
| } |
| return null; |
| } |
|
|
| function summariseEngineResult( |
| detail: Record<string, unknown>, |
| ): Record<string, unknown> | null { |
| const result = detail["result_json"]; |
| if (result && typeof result === "object") { |
| |
| |
| const small: Record<string, unknown> = {}; |
| for (const [k, v] of Object.entries(result)) { |
| if (v == null) continue; |
| if (typeof v === "string" && v.length > 500) { |
| small[k] = `${v.slice(0, 480)}...`; |
| } else if (typeof v === "object") { |
| const s = JSON.stringify(v); |
| if (s.length < 600) small[k] = v; |
| } else { |
| small[k] = v; |
| } |
| } |
| return small; |
| } |
| return null; |
| } |
|
|
| async function applyTerminal( |
| row: ResearchJob, |
| patch: { |
| status: PublicJob["status"]; |
| errorMessage: string | null; |
| resultSummary: Record<string, unknown> | null; |
| agentRun: AgentRunSnapshot | null; |
| }, |
| ): Promise<void> { |
| const finishedAt = new Date(); |
| const followupId = newId("msg"); |
| const followupText = buildFollowUpText( |
| { |
| toolName: row.toolName, |
| label: row.label, |
| taskId: row.taskId, |
| status: patch.status, |
| errorMessage: patch.errorMessage, |
| resultSummary: patch.resultSummary, |
| }, |
| (row.language as "zh" | "en") || "zh", |
| ); |
|
|
| |
| |
| |
| |
| |
| |
| |
| const updated = await db |
| .update(researchJobs) |
| .set({ |
| status: patch.status, |
| errorMessage: patch.errorMessage, |
| resultSummary: patch.resultSummary, |
| agentRun: patch.agentRun, |
| finishedAt, |
| lastPolledAt: finishedAt, |
| }) |
| .where( |
| and( |
| eq(researchJobs.id, row.id), |
| inArray(researchJobs.status, ["queued", "running"]), |
| ), |
| ) |
| .returning(); |
| let fresh = updated[0]; |
| if (!fresh) return; |
|
|
| let postedMessageId: string | null = null; |
| if (followupText) { |
| const blocks: ContentBlock[] = [{ type: "text", text: followupText }]; |
| try { |
| await db.insert(messages).values({ |
| id: followupId, |
| conversationId: row.conversationId, |
| role: "assistant", |
| status: "complete", |
| content: blocks, |
| modelId: null, |
| stopReason: "job_followup", |
| }); |
| postedMessageId = followupId; |
| const linked = await db |
| .update(researchJobs) |
| .set({ followupMessageId: followupId }) |
| .where(eq(researchJobs.id, row.id)) |
| .returning(); |
| if (linked[0]) fresh = linked[0]; |
| } catch (err) { |
| logger.warn( |
| { err, jobId: row.id }, |
| "failed to insert job follow-up message", |
| ); |
| } |
| } |
|
|
| publishJobEvent({ |
| type: "job_status", |
| conversationId: fresh.conversationId, |
| job: toPublicJob(fresh), |
| }); |
| if (postedMessageId) { |
| publishJobEvent({ |
| type: "job_followup", |
| conversationId: fresh.conversationId, |
| job: toPublicJob(fresh), |
| messageId: postedMessageId, |
| }); |
| } |
| } |
|
|