/**
 * agent-supervisor — autonomous agent state for one assistant message.
 *
 * The chat loop in `runWithTools` is a single-LLM ReAct cycle. To turn
 * that into a Plan/Act/Observe/Reflect supervisor without doubling the
 * LLM call count, we use an inline-tag protocol: the model is instructed
 * to emit `<plan>...</plan>`, `<step_update>...</step_update>` and
 * `<reflection>...</reflection>` blocks (each on its own line) inside its
 * normal text stream; when long-term memory auto-extraction is enabled it
 * may also emit `<memory_fact>...</memory_fact>` blocks. This module
 * consumes the streamed text, extracts those blocks, hides them from the
 * user-visible delta, validates them against the actual tool calls
 * performed, and persists everything to the `agent_runs` table.
 *
 * It also owns two per-turn safeguards:
 *  - circuit breaker: after `CIRCUIT_BREAKER_THRESHOLD` transient failures
 *    from a single source (tool name plus an optional upstream
 *    disambiguator, see `circuitKey`), `circuitCheck` returns a structured
 *    `circuit_open` payload for the remaining calls to that source, with a
 *    suggestion of alternate tools;
 *  - per-turn cache: calls with identical (tool, args) reuse the prior
 *    successful result so repeated calls don't re-hit the same upstream.
 */
import { eq } from "drizzle-orm";
import {
  db,
  agentRuns,
  conversations,
  type AgentRunRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";

// ----------------------------------------------------------------- types

export interface AgentPlanStep {
  id: string;
  goal: string;
  candidate_tools?: string[];
  success_criteria?: string;
}

export interface AgentPlan {
  steps: AgentPlanStep[];
}

export type AgentStepStatus =
  | "pending"
  | "running"
  | "done"
  | "failed"
  | "skipped";

export interface AgentEvidenceRef {
  /** "url" | "pmid" | "doi" | "uniprot" | "ot" | "id" | "numeric" | "result" */
  type: string;
  ref: string;
  tool: string;
}

export interface AgentStepState {
  id: string;
  goal: string;
  status: AgentStepStatus;
  candidate_tools?: string[];
  success_criteria?: string;
  note?: string;
  evidence_count: number;
  evidence_refs?: AgentEvidenceRef[];
  updated_at: string;
}

export interface AgentToolRoute {
  iteration: number;
  step_id: string | null;
  tool_call_id: string;
  tool_name: string;
  args_summary: string;
  outcome: "success" | "error" | "circuit_open" | "cache_hit";
  error_code?: string;
  retryable?: boolean;
  attempts?: number;
  duration_ms: number;
  at: string;
}

export interface AgentReflection {
  step_states?: Array<{
    id: string;
    status: AgentStepStatus;
    evidence_count?: number;
  }>;
  unresolved?: Array<{ id: string; reason?: string }>;
  confidence?: number;
  summary?: string;
}

export interface AgentValidator {
  passed: boolean;
  downgraded: Array<{ id: string; from: AgentStepStatus; reason: string }>;
  notes: string[];
}

export interface PublicAgentRun {
  id: string;
  conversation_id: string;
  message_id: string;
  status: "running" | "complete" | "error";
  plan: AgentPlan | null;
  steps: AgentStepState[];
  tool_routes: AgentToolRoute[];
  reflection: AgentReflection | null;
  validator: AgentValidator | null;
  iterations: number;
  input_tokens: number;
  output_tokens: number;
  started_at: string;
  finished_at: string | null;
  /**
   * Long-term memory usage for this turn. Only populated on freshly
   * generated runs (lazy historical loads return `null`).
   */
  memory?: AgentMemoryUsage | null;
}

export interface AgentMemoryUnresolved {
  id: string;
  goal: string;
  note?: string;
  source_run_id: string;
  recorded_at: string;
}

export interface AgentMemoryRecord {
  unresolved_steps: AgentMemoryUnresolved[];
  updated_at: string;
}

export interface LearnedMemoryFact {
  kind: string;
  content: string;
  confidence: number;
  salience: number;
  persisted_id?: string | null;
}

/** A single long-term memory fact injected into this turn's prompt. */
export interface InjectedMemoryFact {
  id: string;
  kind: string;
  content: string;
}

/** Per-turn metadata about long-term memory injection + extraction. */
export interface AgentMemoryUsage {
  enabled: boolean;
  used_fact_ids: string[];
  injected: InjectedMemoryFact[];
  injected_tokens: number;
  injected_count: number;
  learned: LearnedMemoryFact[];
  persisted_count: number;
}

// ----------------------------------------------------------------- constants

export const CIRCUIT_BREAKER_THRESHOLD = 2;

const TRANSIENT_ERROR_CODES = new Set<string>([
  "rate_limited",
  "upstream_unavailable",
  "network_error",
  "timeout",
]);

// ----------------------------------------------------------------- AgentRunState

type EmitFn = (ev: { event: string; data: Record<string, unknown> }) => void;

export class AgentRunState {
  readonly id: string;
  readonly conversationId: string;
  readonly userId: string;
  readonly messageId: string;
  readonly startedAt: Date;

  plan: AgentPlan | null = null;
  steps: Map<string, AgentStepState> = new Map();
  toolRoutes: AgentToolRoute[] = [];
  reflection: AgentReflection | null = null;
  validator: AgentValidator | null = null;
  iterations = 0;
  /** Auto-extracted memory facts emitted via `<memory_fact>` this turn. */
  learnedFacts: LearnedMemoryFact[] = [];
  /** Set by messages.ts when memory is enabled — gates auto-extraction. */
  memoryAutoExtractEnabled = false;

  // per-turn safety nets
  private failureCounts: Map<string, number> = new Map();
  private openCircuits: Set<string> = new Set();
  private resultCache: Map<string, unknown> = new Map();

  // streaming-tag parser state
  private pendingLine = "";
  private inZone: { tag: TagName; buffer: string } | null = null;

  // currently active step (set on the most recent step_update->running)
  private activeStepId: string | null = null;

  private readonly emit: EmitFn;

  constructor(args: {
    conversationId: string;
    userId: string;
    messageId: string;
    emit: EmitFn;
  }) {
    this.id = newId("arun");
    this.conversationId = args.conversationId;
    this.userId = args.userId;
    this.messageId = args.messageId;
    this.startedAt = new Date();
    this.emit = args.emit;
  }

  // ------------------------------------------------------ tag streaming

  /**
   * Feed a raw text delta from the LLM stream. Returns the substring that
   * is safe to forward to the user (with `<plan>` / `<step_update>` /
   * `<reflection>` / `<memory_fact>` blocks removed). Side effects: parses
   * any closed tag blocks and emits `agent_plan` / `agent_step` /
   * `agent_reflection` / `agent_memory_learned` SSE events.
   *
   * Text is processed line by line; a trailing partial line is buffered
   * until the next delta (or `flushPending`). Tag blocks must start on
   * their own line, e.g.
   *
   *   <plan>{"steps":[...]}</plan>
   */
  ingestTextDelta(delta: string): string {
    if (!delta) return "";
    const combined = this.pendingLine + delta;
    let visible = "";
    let cursor = 0;
    for (;;) {
      const nl = combined.indexOf("\n", cursor);
      if (nl < 0) {
        // partial last line — keep buffered for next ingest
        this.pendingLine = combined.slice(cursor);
        break;
      }
      const line = combined.slice(cursor, nl);
      cursor = nl + 1;
      const out = this.processLine(line, /* isComplete */ true);
      if (out !== null) visible += out + "\n";
    }
    return visible;
  }

  /** Drain any partial buffered line at the end of an LLM iteration. */
  flushPending(): string {
    if (!this.pendingLine) return "";
    const out = this.processLine(this.pendingLine, /* isComplete */ false);
    this.pendingLine = "";
    return out ?? "";
  }

  /**
   * Process one line of model output. Returns the user-visible text for
   * the line, or `null` when the whole line belonged to a tag block.
   */
  private processLine(line: string, isComplete: boolean): string | null {
    // Already inside a structured zone — keep accumulating until close tag.
    if (this.inZone) {
      const close = `</${this.inZone.tag}>`;
      const idx = line.indexOf(close);
      if (idx < 0) {
        this.inZone.buffer += line + "\n";
        return null;
      }
      this.inZone.buffer += line.slice(0, idx);
      const tag = this.inZone.tag;
      const json = this.inZone.buffer.trim();
      this.inZone = null;
      this.handleTagJson(tag, json);
      return this.processTail(line.slice(idx + close.length), isComplete);
    }

    const trimmed = line.trim();
    // Detect a tag opener. Allow opening + closing on the same line.
    for (const tag of ALL_TAGS) {
      const open = `<${tag}>`;
      if (!trimmed.startsWith(open)) continue;
      const close = `</${tag}>`;
      const restAfterOpen = trimmed.slice(open.length);
      const closeIdx = restAfterOpen.indexOf(close);
      if (closeIdx >= 0) {
        this.handleTagJson(tag, restAfterOpen.slice(0, closeIdx).trim());
        return this.processTail(
          restAfterOpen.slice(closeIdx + close.length),
          isComplete,
        );
      }
      // Tag opened but not closed on this line — enter zone.
      this.inZone = { tag, buffer: restAfterOpen + "\n" };
      return null;
    }
    return line;
  }

  /**
   * Handle text that follows a closing tag on the same line. It is treated
   * as a fresh line so visible prose is kept and a second tag opened on
   * the same line is parsed too. Whitespace-only tails produce no output.
   */
  private processTail(tail: string, isComplete: boolean): string | null {
    if (!tail.trim()) return null;
    return this.processLine(tail, isComplete);
  }

  /** Parse a tag payload as JSON and dispatch it; malformed JSON is logged and ignored. */
  private handleTagJson(tag: TagName, json: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(json);
    } catch (err) {
      logger.debug({ tag, jsonLen: json.length, err }, "agent tag parse failed");
      return;
    }
    switch (tag) {
      case "plan":
        this.applyPlan(parsed);
        break;
      case "step_update":
        this.applyStepUpdate(parsed);
        break;
      case "reflection":
        this.applyReflection(parsed);
        break;
      case "memory_fact":
        this.applyMemoryFact(parsed);
        break;
    }
  }

  /**
   * Record a `<memory_fact>` payload. Ignored unless auto-extraction is
   * enabled, and capped at `MAX_LEARNED_FACTS_PER_TURN` facts per turn.
   * Unknown kinds become "other"; confidence/salience are clamped to [0, 1].
   */
  private applyMemoryFact(raw: unknown): void {
    if (!this.memoryAutoExtractEnabled) return;
    if (this.learnedFacts.length >= MAX_LEARNED_FACTS_PER_TURN) return;
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const content = typeof r.content === "string" ? r.content.trim() : "";
    if (!content) return;
    const kindRaw = typeof r.kind === "string" ? r.kind : "other";
    const kind = VALID_MEMORY_KINDS.has(kindRaw) ? kindRaw : "other";
    const confidence =
      typeof r.confidence === "number" && Number.isFinite(r.confidence)
        ? clamp01(r.confidence)
        : 0.7;
    const salience =
      typeof r.salience === "number" && Number.isFinite(r.salience)
        ? clamp01(r.salience)
        : 0.6;
    const fact: LearnedMemoryFact = {
      kind,
      content: content.slice(0, MAX_FACT_CONTENT_CHARS),
      confidence,
      salience,
    };
    this.learnedFacts.push(fact);
    this.emit({
      event: "agent_memory_learned",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        fact,
      },
    });
  }

  /**
   * Accept the first valid `<plan>` of the turn: seed every step as
   * "pending" and emit `agent_plan`. Steps without a goal are dropped;
   * steps without an id get a positional `s<n>` id.
   */
  private applyPlan(raw: unknown): void {
    // Only accept the first plan emitted in a turn — subsequent plans
    // would invalidate already-recorded step state.
    if (this.plan) return;
    if (!raw || typeof raw !== "object") return;
    const stepsRaw = (raw as { steps?: unknown }).steps;
    if (!Array.isArray(stepsRaw)) return;

    const steps: AgentPlanStep[] = [];
    for (const s of stepsRaw) {
      if (!s || typeof s !== "object") continue;
      const r = s as Record<string, unknown>;
      const id = typeof r.id === "string" && r.id ? r.id : `s${steps.length + 1}`;
      const goal = typeof r.goal === "string" ? r.goal : "";
      if (!goal) continue;
      steps.push({
        id,
        goal,
        candidate_tools: Array.isArray(r.candidate_tools)
          ? r.candidate_tools.filter((x): x is string => typeof x === "string")
          : undefined,
        success_criteria:
          typeof r.success_criteria === "string" ? r.success_criteria : undefined,
      });
    }
    if (!steps.length) return;

    this.plan = { steps };
    const now = new Date().toISOString();
    for (const s of steps) {
      this.steps.set(s.id, {
        id: s.id,
        goal: s.goal,
        status: "pending",
        candidate_tools: s.candidate_tools,
        success_criteria: s.success_criteria,
        evidence_count: 0,
        updated_at: now,
      });
    }
    this.emit({
      event: "agent_plan",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        plan: this.plan,
        steps: Array.from(this.steps.values()),
      },
    });
  }

  /**
   * Apply a `<step_update>`: update status/note of a known step (or create
   * a synthetic one for an unknown id) and emit `agent_step`. A "running"
   * status makes the step the target for subsequent tool evidence.
   */
  private applyStepUpdate(raw: unknown): void {
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const id = typeof r.id === "string" ? r.id : "";
    if (!id) return;

    let step = this.steps.get(id);
    if (!step) {
      // Late step introduced after plan — accept it as a synthetic step
      step = {
        id,
        goal: typeof r.goal === "string" ? r.goal : id,
        status: "pending",
        evidence_count: 0,
        updated_at: new Date().toISOString(),
      };
      this.steps.set(id, step);
    }

    const status = typeof r.status === "string" ? (r.status as AgentStepStatus) : null;
    if (status && VALID_STATUSES.has(status)) {
      step.status = status;
      if (status === "running") this.activeStepId = id;
    }
    if (typeof r.note === "string") step.note = r.note;
    step.updated_at = new Date().toISOString();

    this.emit({
      event: "agent_step",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        step: { ...step },
      },
    });
  }

  /**
   * Apply a `<reflection>`: store the sanitised reflection, copy its
   * `step_states` onto the live step map and emit `agent_reflection`.
   * Non-finite numbers are ignored; confidence is clamped to [0, 1] and
   * evidence counts are floored non-negative integers.
   */
  private applyReflection(raw: unknown): void {
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const reflection: AgentReflection = {};

    if (Array.isArray(r.step_states)) {
      const states: NonNullable<AgentReflection["step_states"]> = [];
      for (const s of r.step_states as unknown[]) {
        if (!s || typeof s !== "object") continue;
        const x = s as Record<string, unknown>;
        if (typeof x.id !== "string") continue;
        const status =
          typeof x.status === "string" ? (x.status as AgentStepStatus) : "pending";
        if (!VALID_STATUSES.has(status)) continue;
        states.push({
          id: x.id,
          status,
          evidence_count: toNonNegativeCount(x.evidence_count),
        });
      }
      reflection.step_states = states;
    }

    if (Array.isArray(r.unresolved)) {
      const unresolved: NonNullable<AgentReflection["unresolved"]> = [];
      for (const u of r.unresolved as unknown[]) {
        if (typeof u === "string") {
          unresolved.push({ id: u });
          continue;
        }
        if (!u || typeof u !== "object") continue;
        const x = u as Record<string, unknown>;
        if (typeof x.id !== "string") continue;
        unresolved.push({
          id: x.id,
          reason: typeof x.reason === "string" ? x.reason : undefined,
        });
      }
      reflection.unresolved = unresolved;
    }

    if (typeof r.confidence === "number" && Number.isFinite(r.confidence)) {
      reflection.confidence = clamp01(r.confidence);
    }
    if (typeof r.summary === "string") reflection.summary = r.summary;
    this.reflection = reflection;

    // Apply step_states to the live map so the panel reflects the model's
    // own self-assessment alongside any in-flight `step_update`s.
    if (reflection.step_states) {
      for (const s of reflection.step_states) {
        const step = this.steps.get(s.id);
        if (!step) continue;
        step.status = s.status;
        if (typeof s.evidence_count === "number") step.evidence_count = s.evidence_count;
        step.updated_at = new Date().toISOString();
      }
    }

    this.emit({
      event: "agent_reflection",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        reflection,
        steps: Array.from(this.steps.values()),
      },
    });
  }

  // ---------------------------------------------------- tool dispatch

  noteIteration(): void {
    this.iterations += 1;
  }

  /**
   * Cache key for de-duplicating identical tool calls within a turn.
   * Object keys are sorted, so argument order does not matter. Returns
   * `null` when the args cannot be serialised (e.g. BigInt or cyclic
   * values); such calls are never cached, because a lossy key could serve
   * one query's result to a different query.
   */
  private cacheKey(toolName: string, args: Record<string, unknown>): string | null {
    const argStr = stableStringify(args);
    return argStr === null ? null : `${toolName}::${argStr}`;
  }

  /** Returns a cached successful result if one exists for this call. */
  cacheLookup(toolName: string, args: Record<string, unknown>): unknown | undefined {
    const key = this.cacheKey(toolName, args);
    return key === null ? undefined : this.resultCache.get(key);
  }

  /**
   * Compute the per-source key used by the circuit breaker. We include the
   * concrete upstream identifier (research-engine `task_mode`, an explicit
   * `source` argument, or the underlying engine endpoint) so a single
   * misbehaving source does not trip the breaker for every other variant
   * of the same tool. Falls back to the bare tool name when no
   * disambiguator is available.
   */
  circuitKey(toolName: string, args: Record<string, unknown>): string {
    const sub =
      typeof args.task_mode === "string" && args.task_mode
        ? `task_mode:${args.task_mode}`
        : typeof args.source === "string" && args.source
          ? `source:${args.source}`
          : typeof args.endpoint === "string" && args.endpoint
            ? `endpoint:${args.endpoint}`
            : typeof args.database === "string" && args.database
              ? `db:${args.database}`
              : null;
    return sub ? `${toolName}#${sub}` : toolName;
  }

  /**
   * Returns a structured `circuit_open` error payload when this call's
   * source has tripped the breaker this turn, otherwise `null`.
   */
  circuitCheck(
    toolName: string,
    args: Record<string, unknown> = {},
  ): null | {
    error: string;
    error_code: "circuit_open";
    retryable: false;
    source: string;
    suggestion: string;
  } {
    const key = this.circuitKey(toolName, args);
    if (!this.openCircuits.has(key)) return null;
    return {
      error: `Source ${key} disabled for this turn after repeated transient failures.`,
      error_code: "circuit_open",
      retryable: false,
      source: key,
      suggestion: alternativeSuggestion(toolName),
    };
  }

  /**
   * Record the outcome of a tool call — update circuit-breaker counters,
   * cache successful results, credit evidence to the active step, append
   * a tool route entry and emit `agent_tool_route`.
   */
  recordToolOutcome(args: {
    toolCallId: string;
    toolName: string;
    args: Record<string, unknown>;
    result: unknown;
    isError: boolean;
    durationMs: number;
    fromCache?: boolean;
    fromCircuit?: boolean;
  }): void {
    const errCode = errorCodeOf(args.result, args.isError);
    const transient = errCode !== null && TRANSIENT_ERROR_CODES.has(errCode);
    const sourceKey = this.circuitKey(args.toolName, args.args);

    if (transient) {
      const next = (this.failureCounts.get(sourceKey) ?? 0) + 1;
      this.failureCounts.set(sourceKey, next);
      if (next >= CIRCUIT_BREAKER_THRESHOLD) {
        this.openCircuits.add(sourceKey);
      }
    } else if (!args.isError) {
      // Cache successful results for the rest of the turn.
      const key = this.cacheKey(args.toolName, args.args);
      if (key !== null) this.resultCache.set(key, args.result);
      this.creditActiveStep(args.result, args.toolName);
    }

    const resultObj =
      args.result && typeof args.result === "object"
        ? (args.result as Record<string, unknown>)
        : null;
    const attempts = resultObj?.attempts;
    const route: AgentToolRoute = {
      iteration: this.iterations,
      step_id: this.activeStepId,
      tool_call_id: args.toolCallId,
      tool_name: args.toolName,
      args_summary: summariseArgs(args.args),
      outcome: args.fromCircuit
        ? "circuit_open"
        : args.fromCache
          ? "cache_hit"
          : args.isError
            ? "error"
            : "success",
      error_code: errCode ?? undefined,
      retryable: args.isError && resultObj ? Boolean(resultObj.retryable) : undefined,
      attempts: typeof attempts === "number" ? attempts : undefined,
      duration_ms: args.durationMs,
      at: new Date().toISOString(),
    };
    this.toolRoutes.push(route);
    this.emit({
      event: "agent_tool_route",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        route,
      },
    });
  }

  /**
   * Credit a successful tool result to the active step: bump its evidence
   * count and merge any source-linked refs (URLs / PMIDs / accessions) so
   * the validator can later check that "done" steps actually have evidence.
   */
  private creditActiveStep(result: unknown, toolName: string): void {
    if (!this.activeStepId) return;
    const step = this.steps.get(this.activeStepId);
    if (!step) return;
    step.evidence_count += 1;
    const refs = extractEvidenceRefs(result, toolName);
    if (refs.length) {
      step.evidence_refs = mergeEvidenceRefs(step.evidence_refs, refs);
    }
    step.updated_at = new Date().toISOString();
  }

  // ---------------------------------------------------- validation + persist

  /**
   * Reconcile model-claimed step states with the actual tool route log
   * and the evidence collected from successful tool results. A `done`
   * step is downgraded to `failed` when *any* of these machine-checkable
   * conditions hold:
   *
   *  - no successful tool route was recorded for the step
   *    (`reason: "no_evidence"`),
   *  - no source-linked evidence reference (URL / PMID / DOI / accession
   *    / structured id) is present on the step
   *    (`reason: "no_source_link"`),
   *  - the step's `success_criteria` calls out a numeric quantity but
   *    none of the captured evidence carries a numeric ref
   *    (`reason: "missing_numeric_evidence"`),
   *  - the model's own reflection lists this step under `unresolved`
   *    while the plan still claims `done`
   *    (`reason: "reflection_inconsistent"`).
   *
   * The first failing condition (in the order above) is recorded as the
   * downgrade `reason`; all of them are appended to the step note.
   *
   * The validator never *upgrades* — it only downgrades and records
   * notes. Recovery (one extra tool round) is the caller's job and is
   * driven by `needsRecoveryRound()`.
   */
  runValidator(): AgentValidator {
    const downgraded: AgentValidator["downgraded"] = [];
    const notes: string[] = [];

    const successByStep = new Map<string, number>();
    for (const r of this.toolRoutes) {
      if (r.outcome !== "success" && r.outcome !== "cache_hit") continue;
      if (!r.step_id) continue;
      successByStep.set(r.step_id, (successByStep.get(r.step_id) ?? 0) + 1);
    }

    const reflectionUnresolved = new Set(
      (this.reflection?.unresolved ?? []).map((u) => u.id),
    );

    for (const step of this.steps.values()) {
      if (step.status !== "done") continue;
      const reasons: string[] = [];
      const success = successByStep.get(step.id) ?? 0;
      const refs = step.evidence_refs ?? [];

      if (success === 0) reasons.push("no_evidence");
      if (success > 0 && !refs.some((r) => SOURCE_LINK_TYPES.has(r.type))) {
        reasons.push("no_source_link");
      }
      if (
        mentionsNumericClaim(step.success_criteria, step.goal) &&
        !refs.some((r) => r.type === "numeric")
      ) {
        reasons.push("missing_numeric_evidence");
      }
      if (reflectionUnresolved.has(step.id)) {
        reasons.push("reflection_inconsistent");
      }

      const [reason] = reasons;
      if (reason === undefined) continue;
      downgraded.push({ id: step.id, from: step.status, reason });
      step.status = "failed";
      const noteSuffix = `downgraded: ${reasons.join(", ")}`;
      step.note = step.note ? `${step.note} | ${noteSuffix}` : noteSuffix;
      step.updated_at = new Date().toISOString();
    }

    if (this.openCircuits.size) {
      notes.push(
        `Circuit breaker tripped for: ${Array.from(this.openCircuits).join(", ")}`,
      );
    }

    const passed = downgraded.length === 0;
    this.validator = { passed, downgraded, notes };
    return this.validator;
  }

  /** True when the last validator run downgraded at least one step. */
  needsRecoveryRound(): boolean {
    if (!this.validator) return false;
    return this.validator.downgraded.length > 0;
  }

  /**
   * Emit a consolidated `agent_run_snapshot` SSE event reflecting the
   * current plan / step / tool-route state. Called after every iteration
   * of the runWithTools loop so an in-chat panel that drops a delta (or
   * a client that reconnects mid-run) can rehydrate from a single payload
   * instead of having to replay every individual delta.
   */
  emitSnapshot(args: {
    status: PublicAgentRun["status"];
    finishedAt?: Date | null;
    inputTokens: number;
    outputTokens: number;
    memory?: AgentMemoryUsage | null;
  }): void {
    this.emit({
      event: "agent_run_snapshot",
      data: {
        agent_run: this.toPublic(
          args.status,
          args.finishedAt ?? null,
          { input: args.inputTokens, output: args.outputTokens },
          args.memory ?? null,
        ),
      },
    });
  }

  /** Serialise the in-memory run into its public (snake_case) shape. */
  toPublic(
    status: PublicAgentRun["status"],
    finishedAt: Date | null,
    tokens: { input: number; output: number },
    memory: AgentMemoryUsage | null = null,
  ): PublicAgentRun {
    return {
      id: this.id,
      conversation_id: this.conversationId,
      message_id: this.messageId,
      status,
      plan: this.plan,
      steps: Array.from(this.steps.values()),
      tool_routes: this.toolRoutes,
      reflection: this.reflection,
      validator: this.validator,
      iterations: this.iterations,
      input_tokens: tokens.input,
      output_tokens: tokens.output,
      started_at: this.startedAt.toISOString(),
      finished_at: finishedAt ? finishedAt.toISOString() : null,
      memory,
    };
  }

  /**
   * Upsert the agent_run row. Called after each iteration so the panel
   * survives client refreshes mid-turn, and again at the end with the
   * final terminal state. Failures are logged, never thrown.
   */
  async persist(args: {
    status: PublicAgentRun["status"];
    finishedAt: Date | null;
    inputTokens: number;
    outputTokens: number;
  }): Promise<void> {
    const payload = {
      id: this.id,
      conversationId: this.conversationId,
      userId: this.userId,
      messageId: this.messageId,
      status: args.status,
      plan: this.plan as unknown,
      steps: Array.from(this.steps.values()) as unknown,
      toolRoutes: this.toolRoutes as unknown,
      reflection: this.reflection as unknown,
      validator: this.validator as unknown,
      inputTokens: args.inputTokens,
      outputTokens: args.outputTokens,
      iterations: this.iterations,
      startedAt: this.startedAt,
      finishedAt: args.finishedAt,
    };
    try {
      await db
        .insert(agentRuns)
        .values(payload as never)
        .onConflictDoUpdate({
          target: agentRuns.id,
          set: {
            status: payload.status,
            plan: payload.plan as never,
            steps: payload.steps as never,
            toolRoutes: payload.toolRoutes as never,
            reflection: payload.reflection as never,
            validator: payload.validator as never,
            inputTokens: payload.inputTokens,
            outputTokens: payload.outputTokens,
            iterations: payload.iterations,
            finishedAt: payload.finishedAt,
          },
        });
    } catch (err) {
      logger.warn({ err, agentRunId: this.id }, "agent_run persist failed");
    }
  }

  /**
   * Build the working-memory record that would be persisted for the next
   * turn. A step is carried over when it is "failed" or "skipped", or when
   * the reflection lists it as unresolved (at most 10 steps are kept).
   * Returns null when there is nothing left unresolved (callers are
   * expected to clear stored memory in that case). Pure / DB-free so the
   * cross-turn carry-over is testable without spinning up Postgres.
   */
  buildPendingWorkingMemory(): AgentMemoryRecord | null {
    const now = new Date().toISOString();
    const reflectionUnresolved = new Set(
      (this.reflection?.unresolved ?? []).map((u) => u.id),
    );
    const unresolved: AgentMemoryUnresolved[] = [];
    for (const step of this.steps.values()) {
      const isLeftover =
        step.status === "failed" ||
        step.status === "skipped" ||
        reflectionUnresolved.has(step.id);
      if (!isLeftover) continue;
      unresolved.push({
        id: step.id,
        goal: step.goal,
        note: step.note,
        source_run_id: this.id,
        recorded_at: now,
      });
    }
    if (!unresolved.length) return null;
    return {
      unresolved_steps: unresolved.slice(0, 10),
      updated_at: now,
    };
  }

  /**
   * Persist unresolved/failed steps into the conversation's working
   * memory so the next turn's system prompt can carry them forward. When
   * nothing is left unresolved the stored memory is cleared (set to null).
   * Failures are logged, never thrown.
   */
  async writeWorkingMemory(): Promise<void> {
    const record = this.buildPendingWorkingMemory();
    try {
      await db
        .update(conversations)
        .set({ agentMemory: record as unknown as object })
        .where(eq(conversations.id, this.conversationId));
    } catch (err) {
      logger.warn(
        { err, conversationId: this.conversationId },
        record ? "write agent_memory failed" : "clear agent_memory failed",
      );
    }
  }
}

// ----------------------------------------------------------------- helpers

const VALID_STATUSES: Set<AgentStepStatus> = new Set<AgentStepStatus>([
  "pending",
  "running",
  "done",
  "failed",
  "skipped",
]);

type TagName = "plan" | "step_update" | "reflection" | "memory_fact";
const ALL_TAGS: readonly TagName[] = ["plan", "step_update", "reflection", "memory_fact"];

const VALID_MEMORY_KINDS: ReadonlySet<string> = new Set<string>([
  "preference",
  "fact",
  "interest",
  "domain",
  "terminology",
  "summary",
]);

/** Hard cap on auto-extracted memory facts per turn. */
export const MAX_LEARNED_FACTS_PER_TURN = 5;
const MAX_FACT_CONTENT_CHARS = 500;

/** Maximum number of evidence refs kept per step / per extraction. */
const MAX_EVIDENCE_REFS = 25;

/** Evidence ref types that count as a source link for the validator. */
const SOURCE_LINK_TYPES: ReadonlySet<string> = new Set<string>([
  "url",
  "pmid",
  "doi",
  "uniprot",
  "ot",
  "id",
]);

/** Clamp a finite number into [0, 1]. */
function clamp01(value: number): number {
  return Math.max(0, Math.min(1, value));
}

/** A finite, non-negative number floored to an integer, else `undefined`. */
function toNonNegativeCount(value: unknown): number | undefined {
  if (typeof value !== "number" || !Number.isFinite(value) || value < 0) {
    return undefined;
  }
  return Math.floor(value);
}

/**
 * JSON-serialise `value` with object keys sorted at every level, so two
 * argument objects that differ only in key order produce the same string.
 * Returns `null` when the value cannot be serialised (BigInt, cycles, ...).
 */
function stableStringify(value: unknown): string | null {
  try {
    const json = JSON.stringify(value, (_key, v: unknown) => {
      if (!v || typeof v !== "object" || Array.isArray(v)) return v;
      // Null-prototype target so a literal "__proto__" key stays an own property.
      const sorted = Object.create(null) as Record<string, unknown>;
      for (const k of Object.keys(v).sort()) {
        sorted[k] = (v as Record<string, unknown>)[k];
      }
      return sorted;
    });
    return typeof json === "string" ? json : null;
  } catch {
    return null;
  }
}

/**
 * Append `incoming` refs to `existing`, dropping duplicates (same type and
 * ref) and keeping at most `MAX_EVIDENCE_REFS` entries, oldest first.
 */
function mergeEvidenceRefs(
  existing: AgentEvidenceRef[] | undefined,
  incoming: AgentEvidenceRef[],
): AgentEvidenceRef[] {
  const merged: AgentEvidenceRef[] = [];
  const seen = new Set<string>();
  for (const ref of [...(existing ?? []), ...incoming]) {
    if (merged.length >= MAX_EVIDENCE_REFS) break;
    const key = `${ref.type}:${ref.ref}`;
    if (seen.has(key)) continue;
    seen.add(key);
    merged.push(ref);
  }
  return merged;
}

/** The string `error_code` of an error result object, else `null`. */
function errorCodeOf(result: unknown, isError: boolean): string | null {
  if (!isError) return null;
  if (!result || typeof result !== "object") return null;
  const code = (result as Record<string, unknown>).error_code;
  return typeof code === "string" ? code : null;
}

/** Compact JSON summary of tool args, truncated to 160 characters. */
function summariseArgs(args: Record<string, unknown>): string {
  try {
    const json = JSON.stringify(args);
    return json.length > 160 ? json.slice(0, 157) + "..." : json;
  } catch {
    return Object.keys(args).join(",");
  }
}

/**
 * Heuristic detector for numeric claims in a step's goal / success
 * criteria. Triggers on explicit digits, percent signs, and bilingual
 * keywords like "数量 (quantity) / 个数 (count) / count / number / how
 * many". Used by the validator to enforce numeric-claim traceability.
 * NOTE: any digit triggers it, including digits inside identifiers such
 * as gene or protein symbols (e.g. "TP53").
 */
function mentionsNumericClaim(
  criteria: string | undefined,
  goal: string | undefined,
): boolean {
  const text = `${criteria ?? ""}\n${goal ?? ""}`.toLowerCase();
  if (!text.trim()) return false;
  if (/\d/.test(text)) return true;
  if (/%/.test(text)) return true;
  return /(count|number|how many|n=|总数|数量|个数|条数|篇数|多少)/i.test(
    text,
  );
}

const URL_REGEX = /https?:\/\/[^\s"'<>)]+/gi;
const PMID_REGEX = /\b(?:pmid|pubmed[_\s-]*id)[:\s]*([0-9]{4,9})\b/gi;
const DOI_REGEX = /\b10\.\d{4,9}\/[\w./()\-:;]+/gi;
const UNIPROT_REGEX =
  /\b[OPQ][0-9][A-Z0-9]{3}[0-9]\b|\b[A-NR-Z][0-9](?:[A-Z][A-Z0-9]{2}[0-9]){1,2}\b/g;
const OT_REGEX = /\b(?:ENSG\d{6,}|EFO_\d{4,}|MONDO_\d{4,})\b/g;

/**
 * Pull source-linked evidence refs out of a successful tool result. We
 * do a depth-limited traversal (depth ≤ 4, first 20 array items) so a
 * deeply nested object (e.g. a full PubMed hit list) still yields useful
 * refs without blowing the stack. At most `MAX_EVIDENCE_REFS` unique refs
 * are returned.
 */
function extractEvidenceRefs(
  result: unknown,
  toolName: string,
): AgentEvidenceRef[] {
  if (!result || (typeof result !== "object" && typeof result !== "string")) {
    return [];
  }
  const refs: AgentEvidenceRef[] = [];
  const seen = new Set<string>();
  const push = (type: string, ref: string): void => {
    if (refs.length >= MAX_EVIDENCE_REFS) return;
    const key = `${type}:${ref}`;
    if (seen.has(key)) return;
    seen.add(key);
    refs.push({ type, ref, tool: toolName });
  };
  const scan = (regex: RegExp, text: string, type: string, group: number): void => {
    // The regexes are module-level /g instances, so reset their cursor first.
    regex.lastIndex = 0;
    let m: RegExpExecArray | null;
    while ((m = regex.exec(text))) {
      const hit = m[group];
      if (hit !== undefined) push(type, hit);
    }
  };
  const visit = (node: unknown, depth: number): void => {
    if (refs.length >= MAX_EVIDENCE_REFS) return;
    if (depth > 4 || node == null) return;
    if (typeof node === "string") {
      scan(URL_REGEX, node, "url", 0);
      scan(PMID_REGEX, node, "pmid", 1);
      scan(DOI_REGEX, node, "doi", 0);
      scan(UNIPROT_REGEX, node, "uniprot", 0);
      scan(OT_REGEX, node, "ot", 0);
      return;
    }
    if (typeof node === "number" && Number.isFinite(node)) {
      push("numeric", String(node));
      return;
    }
    if (Array.isArray(node)) {
      for (const x of node.slice(0, 20)) visit(x, depth + 1);
      return;
    }
    if (typeof node === "object") {
      for (const [k, v] of Object.entries(node as Record<string, unknown>)) {
        const lk = k.toLowerCase();
        if (typeof v === "string") {
          if (lk === "pmid") push("pmid", v);
          else if (lk === "doi") push("doi", v);
          else if (lk === "url" || lk.endsWith("_url")) push("url", v);
          else if (lk === "accession" || lk === "uniprot_id") push("uniprot", v);
          else if (lk === "id" || lk.endsWith("_id")) push("id", v);
        }
        if (typeof v === "number" && Number.isFinite(v)) {
          if (lk === "count" || lk === "n" || lk === "total" || lk.endsWith("_count")) {
            push("numeric", `${k}=${v}`);
          }
        }
        visit(v, depth + 1);
      }
    }
  };
  visit(result, 0);
  return refs;
}

/** Human-readable hint naming alternative tools for a tripped source. */
function alternativeSuggestion(toolName: string): string {
  switch (toolName) {
    case "search_pubmed":
      return "Try query_opentargets for target↔disease evidence or lookup_uniprot for protein metadata.";
    case "lookup_uniprot":
      return "Try search_pubmed for the same protein name, or query_opentargets if you need disease links.";
    case "query_opentargets":
      return "Try search_pubmed with a clinical/mechanistic query, or create_research_task for a structured pipeline.";
    default:
      return "Switch to an alternative tool or rephrase the query for a different source.";
  }
}

/**
 * Format the next-turn system-prompt section that carries forward
 * unresolved steps. Returns an empty string when there is nothing to
 * carry forward, so callers can `if (text) sysParts.push(text)`.
 *
 * Exported so tests can verify the cross-turn working-memory payload
 * actually lands in the prompt without having to spin up the full
 * /messages/stream pipeline.
 */
export function formatCarryOverPrompt(
  memory: AgentMemoryRecord | null,
): string {
  if (!memory || !memory.unresolved_steps.length) return "";
  const lines = memory.unresolved_steps
    .map((u) => `- ${u.id}: ${u.goal}${u.note ? ` (${u.note})` : ""}`)
    .join("\n");
  return (
    "Carry-over from the previous turn — these subgoals were left " +
    "unresolved or failed. If the user has not redirected you, " +
    "pick them up; otherwise acknowledge briefly and proceed:\n" +
    lines
  );
}

// ----------------------------------------------------------------- public lookup

/**
 * Load the agent run recorded for `messageId`, returning `null` when none
 * exists or when it belongs to a different user.
 */
export async function loadAgentRunByMessage(
  messageId: string,
  userId: string,
): Promise<PublicAgentRun | null> {
  const rows = await db
    .select()
    .from(agentRuns)
    .where(eq(agentRuns.messageId, messageId))
    .limit(1);
  const row = rows[0];
  if (!row) return null;
  if (row.userId !== userId) return null;
  return rowToPublic(row);
}

/** Convert a DB row into the public shape; historical rows carry no memory usage. */
export function rowToPublic(row: AgentRunRow): PublicAgentRun {
  return {
    id: row.id,
    conversation_id: row.conversationId,
    message_id: row.messageId,
    status: (row.status as PublicAgentRun["status"]) ?? "complete",
    plan: (row.plan as AgentPlan | null) ?? null,
    steps: Array.isArray(row.steps) ? (row.steps as AgentStepState[]) : [],
    tool_routes: Array.isArray(row.toolRoutes)
      ? (row.toolRoutes as AgentToolRoute[])
      : [],
    reflection: (row.reflection as AgentReflection | null) ?? null,
    validator: (row.validator as AgentValidator | null) ?? null,
    iterations: row.iterations,
    input_tokens: row.inputTokens,
    output_tokens: row.outputTokens,
    started_at: row.startedAt.toISOString(),
    finished_at: row.finishedAt ? row.finishedAt.toISOString() : null,
    memory: null,
  };
}

/**
 * Read the conversation's stored working memory; returns `null` when it
 * is missing or has no unresolved steps.
 */
export async function loadConversationAgentMemory(
  conversationId: string,
): Promise<AgentMemoryRecord | null> {
  const rows = await db
    .select({ agentMemory: conversations.agentMemory })
    .from(conversations)
    .where(eq(conversations.id, conversationId))
    .limit(1);
  const m = rows[0]?.agentMemory as AgentMemoryRecord | null | undefined;
  if (!m || !Array.isArray(m.unresolved_steps) || !m.unresolved_steps.length) {
    return null;
  }
  return m;
}