// Path: doatlas-2/artifacts/api-server/src/lib/agent-supervisor.ts
// Repository-page residue (not part of the module): "Iostream-Li's picture",
// "Add files using upload-large-folder tool", commit 5871090 (verified).
/**
* agent-supervisor — autonomous agent state for one assistant message.
*
* The chat loop in `runWithTools` is a single-LLM ReAct cycle. To turn
* that into a Plan/Act/Observe/Reflect supervisor without doubling the
* LLM call count, we use an inline-tag protocol: the model is instructed
* to emit `<plan>...</plan>`, `<step_update>...</step_update>` and
* `<reflection>...</reflection>` blocks (each on its own line;
* `<memory_fact>...</memory_fact>` blocks are parsed the same way) inside its
* normal text stream. This module consumes the streamed text, extracts
* those blocks, hides them from the user-visible delta, validates them
* against the actual tool calls performed, and persists everything to
* the `agent_runs` table.
*
* It also owns two per-turn safeguards:
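* - circuit breaker: after `CIRCUIT_BREAKER_THRESHOLD` transient failures
* from a single source (the tool name plus an upstream discriminator
* such as `task_mode` or `source`, see `circuitKey`), the remaining
* calls to that source short-circuit with a structured `circuit_open`
* payload that suggests alternate sources;
* - per-turn cache: identical (tool, args) calls reuse the prior
* successful result so repeating the exact same call doesn't re-hit
* the same upstream (a reworded query has different args and is
* therefore not a cache hit).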
*/
import { eq } from "drizzle-orm";
import {
db,
agentRuns,
conversations,
type AgentRunRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
// ----------------------------------------------------------------- types
/** One step of the model-authored plan, as parsed from a `<plan>` block. */
export interface AgentPlanStep {
/** Step identifier; `applyPlan` generates an `s<N>` id when the model omits it. */
id: string;
/** What the step should achieve; `applyPlan` drops steps without a goal. */
goal: string;
/** Tool names the model listed for this step (non-string entries are filtered out). */
candidate_tools?: string[];
/** Free-text completion criterion; the validator also scans it for numeric claims. */
success_criteria?: string;
}
/** The plan accepted for a turn: an ordered list of steps. */
export interface AgentPlan {
steps: AgentPlanStep[];
}
/** Lifecycle states a plan step can be in. */
export type AgentStepStatus =
| "pending"
| "running"
| "done"
| "failed"
| "skipped";
/** A pointer to evidence extracted from a successful tool result. */
export interface AgentEvidenceRef {
/** "url" | "pmid" | "doi" | "uniprot" | "ot" | "id" | "numeric" | "result" */
type: string;
/** The extracted reference value (e.g. a URL, a PMID, or `count=3`). */
ref: string;
/** Name of the tool whose result produced this reference. */
tool: string;
}
/** Live state of one plan step; emitted in agent events and persisted with the run. */
export interface AgentStepState {
id: string;
goal: string;
status: AgentStepStatus;
candidate_tools?: string[];
success_criteria?: string;
/** Free-text note from `<step_update>`; the validator appends `downgraded: …` to it. */
note?: string;
/**
* Incremented for every non-error tool outcome recorded while this step
* is the active one; a `<reflection>` block may overwrite it.
*/
evidence_count: number;
/** Evidence refs collected from tool results for this step (capped at 25). */
evidence_refs?: AgentEvidenceRef[];
/** ISO-8601 timestamp of the last mutation of this step. */
updated_at: string;
}
/** One entry of the tool route log written by `recordToolOutcome`. */
export interface AgentToolRoute {
/** Value of the run's iteration counter when the outcome was recorded. */
iteration: number;
/** Step that was active (last set to `running`) at record time, or null. */
step_id: string | null;
tool_call_id: string;
tool_name: string;
/** JSON rendering of the call arguments, truncated to 160 characters. */
args_summary: string;
/** How the call was resolved: real call (success/error), breaker short-circuit, or cache reuse. */
outcome: "success" | "error" | "circuit_open" | "cache_hit";
/** `error_code` string taken from the result of an errored call. */
error_code?: string;
/** `retryable` flag taken from the result object; only set for errored calls. */
retryable?: boolean;
/** `attempts` taken from the result object when it is a number. */
attempts?: number;
/** Caller-supplied duration of the call in milliseconds. */
duration_ms: number;
/** ISO-8601 timestamp at which the route entry was created. */
at: string;
}
/** The model's self-assessment, as parsed from a `<reflection>` block. */
export interface AgentReflection {
/** Per-step states claimed by the model; applied to the live step map for known ids. */
step_states?: Array<{
id: string;
status: AgentStepStatus;
evidence_count?: number;
}>;
/** Steps the model considers unresolved (a bare string in the raw block is read as an id). */
unresolved?: Array<{ id: string; reason?: string }>;
/** Model-reported confidence; stored as given (NOTE(review): scale is not enforced here — confirm 0..1). */
confidence?: number;
summary?: string;
}
/** Outcome of `runValidator()`. */
export interface AgentValidator {
/** True when no step had to be downgraded. */
passed: boolean;
/** Steps downgraded from `done` to `failed`, with the first failing reason. */
downgraded: Array<{ id: string; from: AgentStepStatus; reason: string }>;
/** Free-text notes, e.g. which circuit breakers tripped this turn. */
notes: string[];
}
/**
* Serialisable view of an agent run. Built from live state by
* `AgentRunState.toPublic` and from a stored row by `rowToPublic`.
*/
export interface PublicAgentRun {
id: string;
conversation_id: string;
message_id: string;
status: "running" | "complete" | "error";
plan: AgentPlan | null;
steps: AgentStepState[];
tool_routes: AgentToolRoute[];
reflection: AgentReflection | null;
validator: AgentValidator | null;
iterations: number;
input_tokens: number;
output_tokens: number;
/** ISO-8601 timestamps; `finished_at` is null while the run has no finish time. */
started_at: string;
finished_at: string | null;
/**
* Long-term memory usage for this turn. Only populated on freshly
* generated runs (lazy historical loads return `null`).
*/
memory?: AgentMemoryUsage | null;
}
/** One unresolved step carried over to the next turn (see `buildPendingWorkingMemory`). */
export interface AgentMemoryUnresolved {
id: string;
goal: string;
note?: string;
/** Id of the agent run that recorded this step as unresolved. */
source_run_id: string;
/** ISO-8601 timestamp at which the entry was recorded. */
recorded_at: string;
}
/** Working-memory record stored on the conversation (`conversations.agentMemory`). */
export interface AgentMemoryRecord {
/** Unresolved steps; at most 10 are kept when the record is built. */
unresolved_steps: AgentMemoryUnresolved[];
updated_at: string;
}
/** A memory fact auto-extracted from a `<memory_fact>` block this turn. */
export interface LearnedMemoryFact {
/** One of the recognised memory kinds, or "other" when the model's kind is unknown. */
kind: string;
/** Trimmed fact text, truncated to the per-fact character cap. */
content: string;
/** Clamped to [0, 1]; defaults to 0.7 when missing or not a finite number. */
confidence: number;
/** Clamped to [0, 1]; defaults to 0.6 when missing or not a finite number. */
salience: number;
/** NOTE(review): never set in this file — presumably filled in by the caller after persisting; confirm. */
persisted_id?: string | null;
}
/** A single long-term memory fact injected into this turn's prompt. */
export interface InjectedMemoryFact {
id: string;
kind: string;
content: string;
}
/** Per-turn metadata about long-term memory injection + extraction. */
export interface AgentMemoryUsage {
enabled: boolean;
used_fact_ids: string[];
injected: InjectedMemoryFact[];
injected_tokens: number;
injected_count: number;
learned: LearnedMemoryFact[];
persisted_count: number;
}
// ----------------------------------------------------------------- constants
/**
* Number of transient failures from one source (see `circuitKey`) after
* which that source's circuit is opened for the rest of the turn.
*/
export const CIRCUIT_BREAKER_THRESHOLD = 2;
/**
* `error_code` values that count as transient for the circuit breaker.
* Errors with any other code never advance the failure counter.
*/
const TRANSIENT_ERROR_CODES = new Set([
"rate_limited",
"upstream_unavailable",
"network_error",
"timeout",
]);
// ----------------------------------------------------------------- AgentRunState
/** Callback that delivers one named event and its JSON payload to the client stream. */
type EmitFn = (ev: { event: string; data: Record<string, unknown> }) => void;
/**
 * Per-message supervisor state for the Plan/Act/Observe/Reflect protocol.
 *
 * One instance is created per assistant message. It
 *  - strips inline `<plan>` / `<step_update>` / `<reflection>` /
 *    `<memory_fact>` blocks out of the streamed text and applies them,
 *  - tracks tool calls (route log, per-turn result cache, per-source
 *    circuit breaker),
 *  - validates model-claimed step states against recorded evidence, and
 *  - persists the run and the cross-turn working memory.
 */
export class AgentRunState {
  /** Evidence ref types that count as a source link for the validator. */
  private static readonly SOURCE_LINK_TYPES: ReadonlySet<string> = new Set([
    "url",
    "pmid",
    "doi",
    "uniprot",
    "ot",
    "id",
  ]);

  /** Unique id of this run, generated via `newId("arun")`. */
  readonly id: string;
  readonly conversationId: string;
  readonly userId: string;
  readonly messageId: string;
  /** Wall-clock time at which this state object was constructed. */
  readonly startedAt: Date;
  /** First plan accepted this turn (later `<plan>` blocks are ignored). */
  plan: AgentPlan | null = null;
  /** Step state keyed by step id. */
  steps: Map<string, AgentStepState> = new Map();
  /** Append-only log of every recorded tool call outcome. */
  toolRoutes: AgentToolRoute[] = [];
  /** Most recently parsed `<reflection>` block, if any. */
  reflection: AgentReflection | null = null;
  /** Result of the last `runValidator()` call, if any. */
  validator: AgentValidator | null = null;
  /** Number of times `noteIteration()` has been called. */
  iterations = 0;
  /** Auto-extracted memory facts emitted via `<memory_fact>` this turn. */
  learnedFacts: LearnedMemoryFact[] = [];
  /** Set by messages.ts when memory is enabled — gates auto-extraction. */
  memoryAutoExtractEnabled = false;

  // per-turn safety nets
  private failureCounts: Map<string, number> = new Map();
  private openCircuits: Set<string> = new Set();
  private resultCache: Map<string, unknown> = new Map();

  // streaming-tag parser state
  private pendingLine = "";
  private inZone: { tag: TagName; buffer: string } | null = null;

  // currently active step (set on the most recent step_update->running)
  private activeStepId: string | null = null;
  private emit: EmitFn;

  constructor(args: {
    conversationId: string;
    userId: string;
    messageId: string;
    emit: EmitFn;
  }) {
    this.id = newId("arun");
    this.conversationId = args.conversationId;
    this.userId = args.userId;
    this.messageId = args.messageId;
    this.startedAt = new Date();
    this.emit = args.emit;
  }

  // ------------------------------------------------------ tag streaming

  /**
   * Feed a raw text delta from the LLM stream. Returns the substring that
   * is safe to forward to the user (with `<plan>` / `<step_update>` /
   * `<reflection>` / `<memory_fact>` blocks removed). Side effects: parses
   * any closed tag blocks and emits the matching agent events.
   *
   * Output is line-buffered: text after the last newline is held back
   * until the next delta (or `flushPending`) completes the line.
   *
   * Tag blocks are expected on their own lines, e.g.
   *
   *   <plan>{"steps":[...]}</plan>
   */
  ingestTextDelta(delta: string): string {
    if (!delta) return "";
    const combined = this.pendingLine + delta;
    let visible = "";
    let cursor = 0;
    for (;;) {
      const nl = combined.indexOf("\n", cursor);
      if (nl < 0) {
        // partial last line — keep buffered for next ingest
        this.pendingLine = combined.slice(cursor);
        break;
      }
      const out = this.processLine(combined.slice(cursor, nl), /* isComplete */ true);
      cursor = nl + 1;
      if (out !== null) visible += out + "\n";
    }
    return visible;
  }

  /**
   * Drain any partial buffered line at the end of an LLM iteration.
   *
   * NOTE(review): a tag zone that was opened but never closed stays open
   * across this call, so following text keeps being swallowed until a
   * close tag arrives — confirm whether the zone should be reset here.
   */
  flushPending(): string {
    if (!this.pendingLine) return "";
    const out = this.processLine(this.pendingLine, /* isComplete */ false);
    this.pendingLine = "";
    return out ?? "";
  }

  /**
   * Classify one line of model text.
   *
   * @returns the text to show the user, or `null` when the line was
   *   consumed entirely by a tag block.
   */
  private processLine(line: string, isComplete: boolean): string | null {
    // Already inside a structured zone — keep accumulating until close tag
    if (this.inZone) {
      const close = `</${this.inZone.tag}>`;
      const idx = line.indexOf(close);
      if (idx < 0) {
        this.inZone.buffer += line + "\n";
        return null;
      }
      this.inZone.buffer += line.slice(0, idx);
      const tag = this.inZone.tag;
      const json = this.inZone.buffer.trim();
      this.inZone = null;
      this.handleTagJson(tag, json);
      // Tail after close tag on the same line — treat as a fresh line so
      // recursion handles a same-line opening of another tag.
      const tail = line.slice(idx + close.length);
      return tail ? this.processLine(tail, isComplete) : null;
    }

    const trimmed = line.trim();
    // Detect a tag opener. Allow opening + closing on the same line.
    for (const tag of ALL_TAGS) {
      const open = `<${tag}>`;
      if (!trimmed.startsWith(open)) continue;
      const close = `</${tag}>`;
      const restAfterOpen = trimmed.slice(open.length);
      const closeIdx = restAfterOpen.indexOf(close);
      if (closeIdx < 0) {
        // Tag opened but not closed on this line — enter zone
        this.inZone = { tag, buffer: restAfterOpen + "\n" };
        return null;
      }
      this.handleTagJson(tag, restAfterOpen.slice(0, closeIdx).trim());
      // Text after a same-line close tag is ordinary output (or another
      // tag); process it like the in-zone branch does instead of silently
      // dropping user-visible text.
      const tail = restAfterOpen.slice(closeIdx + close.length);
      return tail ? this.processLine(tail, isComplete) : null;
    }
    return line;
  }

  /** Parse the JSON payload of a closed tag block and dispatch it by tag. */
  private handleTagJson(tag: TagName, json: string) {
    let parsed: unknown;
    try {
      parsed = JSON.parse(json);
    } catch (err) {
      // Malformed blocks are dropped; the text stays hidden from the user.
      logger.debug({ tag, jsonLen: json.length, err }, "agent tag parse failed");
      return;
    }
    switch (tag) {
      case "plan":
        this.applyPlan(parsed);
        break;
      case "step_update":
        this.applyStepUpdate(parsed);
        break;
      case "reflection":
        this.applyReflection(parsed);
        break;
      case "memory_fact":
        this.applyMemoryFact(parsed);
        break;
    }
  }

  /**
   * Record one auto-extracted memory fact and emit `agent_memory_learned`.
   * Ignored when auto-extraction is disabled, the per-turn cap is reached,
   * or the block has no non-empty `content`.
   */
  private applyMemoryFact(raw: unknown) {
    if (!this.memoryAutoExtractEnabled) return;
    if (this.learnedFacts.length >= MAX_LEARNED_FACTS_PER_TURN) return;
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const content = typeof r.content === "string" ? r.content.trim() : "";
    if (!content) return;
    const kindRaw = typeof r.kind === "string" ? r.kind : "other";
    const kind = VALID_MEMORY_KINDS.has(kindRaw) ? kindRaw : "other";
    const clamp01 = (value: unknown, fallback: number): number =>
      typeof value === "number" && Number.isFinite(value)
        ? Math.max(0, Math.min(1, value))
        : fallback;

    let clipped = content;
    if (content.length > MAX_FACT_CONTENT_CHARS) {
      let end = MAX_FACT_CONTENT_CHARS;
      // Never cut between the two halves of a surrogate pair: a lone
      // surrogate is not valid text for downstream JSON storage.
      const last = content.charCodeAt(end - 1);
      if (last >= 0xd800 && last <= 0xdbff) end -= 1;
      clipped = content.slice(0, end);
    }

    const fact: LearnedMemoryFact = {
      kind,
      content: clipped,
      confidence: clamp01(r.confidence, 0.7),
      salience: clamp01(r.salience, 0.6),
    };
    this.learnedFacts.push(fact);
    this.emit({
      event: "agent_memory_learned",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        fact,
      },
    });
  }

  /**
   * Accept the first well-formed plan of the turn, seed the step map with
   * `pending` steps and emit `agent_plan`. Steps without a goal are
   * skipped; step ids are kept unique so `plan.steps` and the step map
   * always describe the same set of steps.
   */
  private applyPlan(raw: unknown) {
    // Only accept the first plan emitted in a turn — subsequent plans
    // would invalidate already-recorded step state.
    if (this.plan) return;
    if (!raw || typeof raw !== "object") return;
    const stepsRaw = (raw as { steps?: unknown }).steps;
    if (!Array.isArray(stepsRaw)) return;

    const steps: AgentPlanStep[] = [];
    const usedIds = new Set<string>();
    for (const s of stepsRaw) {
      if (!s || typeof s !== "object") continue;
      const r = s as Record<string, unknown>;
      const goal = typeof r.goal === "string" ? r.goal : "";
      if (!goal) continue;

      let id: string;
      if (typeof r.id === "string" && r.id) {
        id = r.id;
        if (usedIds.has(id)) {
          // A repeated id would overwrite the earlier step in the map while
          // both stayed in the plan; keep the first occurrence only.
          logger.debug({ stepId: id }, "agent plan: duplicate step id ignored");
          continue;
        }
      } else {
        // Generate `s<N>`, skipping ids the model already used explicitly.
        let n = steps.length + 1;
        while (usedIds.has(`s${n}`)) n += 1;
        id = `s${n}`;
      }
      usedIds.add(id);

      steps.push({
        id,
        goal,
        candidate_tools: Array.isArray(r.candidate_tools)
          ? (r.candidate_tools.filter((x) => typeof x === "string") as string[])
          : undefined,
        success_criteria:
          typeof r.success_criteria === "string" ? r.success_criteria : undefined,
      });
    }
    if (!steps.length) return;

    this.plan = { steps };
    const now = new Date().toISOString();
    for (const s of steps) {
      this.steps.set(s.id, {
        id: s.id,
        goal: s.goal,
        status: "pending",
        candidate_tools: s.candidate_tools,
        success_criteria: s.success_criteria,
        evidence_count: 0,
        updated_at: now,
      });
    }
    this.emit({
      event: "agent_plan",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        plan: this.plan,
        steps: Array.from(this.steps.values()),
      },
    });
  }

  /**
   * Apply a `<step_update>` block: update status/note of the referenced
   * step (creating a synthetic step for unknown ids) and emit `agent_step`.
   * A step set to `running` becomes the active step for tool attribution.
   */
  private applyStepUpdate(raw: unknown) {
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const id = typeof r.id === "string" ? r.id : "";
    if (!id) return;
    let step = this.steps.get(id);
    if (!step) {
      // Late step introduced after plan — accept it as a synthetic step
      step = {
        id,
        goal: typeof r.goal === "string" ? r.goal : id,
        status: "pending",
        evidence_count: 0,
        updated_at: new Date().toISOString(),
      };
      this.steps.set(id, step);
    }
    const status = typeof r.status === "string" ? (r.status as AgentStepStatus) : null;
    if (status && VALID_STATUSES.has(status)) {
      step.status = status;
      if (status === "running") this.activeStepId = id;
    }
    if (typeof r.note === "string") step.note = r.note;
    step.updated_at = new Date().toISOString();
    this.emit({
      event: "agent_step",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        step: { ...step },
      },
    });
  }

  /**
   * Apply a `<reflection>` block: store the sanitised reflection, copy its
   * per-step states onto known steps and emit `agent_reflection`.
   */
  private applyReflection(raw: unknown) {
    if (!raw || typeof raw !== "object") return;
    const r = raw as Record<string, unknown>;
    const reflection: AgentReflection = {};

    if (Array.isArray(r.step_states)) {
      const states: NonNullable<AgentReflection["step_states"]> = [];
      for (const s of r.step_states as unknown[]) {
        if (!s || typeof s !== "object") continue;
        const x = s as Record<string, unknown>;
        if (typeof x.id !== "string") continue;
        // A missing status is read as "pending"; an unknown one drops the entry.
        const status =
          typeof x.status === "string" ? (x.status as AgentStepStatus) : "pending";
        if (!VALID_STATUSES.has(status)) continue;
        states.push({
          id: x.id,
          status,
          evidence_count:
            typeof x.evidence_count === "number" && Number.isFinite(x.evidence_count)
              ? x.evidence_count
              : undefined,
        });
      }
      reflection.step_states = states;
    }

    if (Array.isArray(r.unresolved)) {
      const unresolved: NonNullable<AgentReflection["unresolved"]> = [];
      for (const s of r.unresolved as unknown[]) {
        if (typeof s === "string") {
          unresolved.push({ id: s });
          continue;
        }
        if (!s || typeof s !== "object") continue;
        const x = s as Record<string, unknown>;
        if (typeof x.id !== "string") continue;
        unresolved.push({
          id: x.id,
          reason: typeof x.reason === "string" ? x.reason : undefined,
        });
      }
      reflection.unresolved = unresolved;
    }

    // JSON such as `1e999` parses to Infinity; only keep finite numbers.
    if (typeof r.confidence === "number" && Number.isFinite(r.confidence)) {
      reflection.confidence = r.confidence;
    }
    if (typeof r.summary === "string") reflection.summary = r.summary;
    this.reflection = reflection;

    // Apply step_states to the live map so the panel reflects the model's
    // own self-assessment alongside any in-flight `step_update`s.
    for (const s of reflection.step_states ?? []) {
      const step = this.steps.get(s.id);
      if (!step) continue;
      step.status = s.status;
      if (typeof s.evidence_count === "number") step.evidence_count = s.evidence_count;
      step.updated_at = new Date().toISOString();
    }

    this.emit({
      event: "agent_reflection",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        reflection,
        steps: Array.from(this.steps.values()),
      },
    });
  }

  // ---------------------------------------------------- tool dispatch

  /** Count one more iteration of the tool loop (stamped onto tool routes). */
  noteIteration() {
    this.iterations += 1;
  }

  /**
   * Cache key for de-duplicating identical tool calls within a turn.
   *
   * Object keys are sorted at every nesting level so two calls with the
   * same arguments in a different key order share one cache entry.
   * Returns `null` when the arguments cannot be serialised (e.g. a
   * cycle); such calls are simply not cached, because a key built from
   * argument names alone could hand back a result for different values.
   */
  private cacheKey(toolName: string, args: Record<string, unknown>): string | null {
    try {
      const argStr = JSON.stringify(args, (_key: string, value: unknown) => {
        if (value === null || typeof value !== "object" || Array.isArray(value)) {
          return value;
        }
        const source = value as Record<string, unknown>;
        // Object.fromEntries defines own properties, so a "__proto__" key
        // coming from parsed JSON stays a plain data key.
        return Object.fromEntries(
          Object.keys(source)
            .sort()
            .map((k): [string, unknown] => [k, source[k]]),
        );
      });
      return `${toolName}::${argStr}`;
    } catch {
      return null;
    }
  }

  /** Returns a cached successful result if one exists for this call. */
  cacheLookup(toolName: string, args: Record<string, unknown>): unknown | undefined {
    const key = this.cacheKey(toolName, args);
    return key === null ? undefined : this.resultCache.get(key);
  }

  /**
   * Compute the per-source key used by the circuit breaker. We include the
   * concrete upstream identifier (research-engine `task_mode`, an explicit
   * `source` argument, or the underlying engine endpoint) so a single
   * misbehaving source does not trip the breaker for every other variant
   * of the same tool. Falls back to the bare tool name when no
   * disambiguator is available.
   */
  circuitKey(toolName: string, args: Record<string, unknown>): string {
    // Checked in priority order; the first non-empty string wins.
    const discriminators: Array<[string, unknown]> = [
      ["task_mode", args.task_mode],
      ["source", args.source],
      ["endpoint", args.endpoint],
      ["db", args.database],
    ];
    for (const [prefix, value] of discriminators) {
      if (typeof value === "string" && value) return `${toolName}#${prefix}:${value}`;
    }
    return toolName;
  }

  /**
   * Check whether the circuit for this call's source is open.
   *
   * @returns `null` when the call may proceed, otherwise a structured
   *   `circuit_open` error payload to use in place of the tool result.
   */
  circuitCheck(
    toolName: string,
    args: Record<string, unknown> = {},
  ): null | {
    error: string;
    error_code: "circuit_open";
    retryable: false;
    source: string;
    suggestion: string;
  } {
    const key = this.circuitKey(toolName, args);
    if (!this.openCircuits.has(key)) return null;
    return {
      error: `Source ${key} disabled for this turn after repeated transient failures.`,
      error_code: "circuit_open",
      retryable: false,
      source: key,
      suggestion: alternativeSuggestion(toolName),
    };
  }

  /**
   * Record the outcome of a tool call — update circuit-breaker counters,
   * cache successful results, and append a tool route entry (also emitted
   * as `agent_tool_route`).
   */
  recordToolOutcome(args: {
    toolCallId: string;
    toolName: string;
    args: Record<string, unknown>;
    result: unknown;
    isError: boolean;
    durationMs: number;
    fromCache?: boolean;
    fromCircuit?: boolean;
  }) {
    const errCode = errorCodeOf(args.result, args.isError);
    const transient = errCode ? TRANSIENT_ERROR_CODES.has(errCode) : false;
    const sourceKey = this.circuitKey(args.toolName, args.args);

    if (transient) {
      const next = (this.failureCounts.get(sourceKey) ?? 0) + 1;
      this.failureCounts.set(sourceKey, next);
      if (next >= CIRCUIT_BREAKER_THRESHOLD) {
        this.openCircuits.add(sourceKey);
      }
    } else if (!args.isError && !args.fromCircuit) {
      // Cache successful results for the rest of the turn. A breaker
      // short-circuit payload is never a result worth caching or counting.
      const key = this.cacheKey(args.toolName, args.args);
      if (key !== null) this.resultCache.set(key, args.result);
      // Successful call also bumps evidence count for the active step and
      // extracts any source-linked refs (URLs / PMIDs / accessions) so the
      // validator can later check that "done" steps actually have evidence.
      const step = this.activeStepId ? this.steps.get(this.activeStepId) : undefined;
      if (step) {
        step.evidence_count += 1;
        const refs = extractEvidenceRefs(args.result, args.toolName);
        if (refs.length) {
          // Merge without duplicates so repeated or cached calls do not
          // crowd distinct refs out of the 25-entry cap.
          const merged = [...(step.evidence_refs ?? [])];
          const known = new Set(merged.map((e) => `${e.type}:${e.ref}`));
          for (const ref of refs) {
            const refKey = `${ref.type}:${ref.ref}`;
            if (known.has(refKey)) continue;
            known.add(refKey);
            merged.push(ref);
          }
          step.evidence_refs = merged.slice(0, 25);
        }
        step.updated_at = new Date().toISOString();
      }
    }

    const resultFields =
      args.result && typeof args.result === "object"
        ? (args.result as Record<string, unknown>)
        : null;

    let outcome: AgentToolRoute["outcome"];
    if (args.fromCircuit) outcome = "circuit_open";
    else if (args.fromCache) outcome = "cache_hit";
    else if (args.isError) outcome = "error";
    else outcome = "success";

    const route: AgentToolRoute = {
      iteration: this.iterations,
      step_id: this.activeStepId,
      tool_call_id: args.toolCallId,
      tool_name: args.toolName,
      args_summary: summariseArgs(args.args),
      outcome,
      error_code: errCode ?? undefined,
      retryable:
        args.isError && resultFields ? Boolean(resultFields.retryable) : undefined,
      attempts:
        resultFields && typeof resultFields.attempts === "number"
          ? resultFields.attempts
          : undefined,
      duration_ms: args.durationMs,
      at: new Date().toISOString(),
    };
    this.toolRoutes.push(route);
    this.emit({
      event: "agent_tool_route",
      data: {
        agent_run_id: this.id,
        message_id: this.messageId,
        route,
      },
    });
  }

  // ---------------------------------------------------- validation + persist

  /**
   * Reconcile model-claimed step states with the actual tool route log
   * and the evidence collected from successful tool results. A `done`
   * step is downgraded to `failed` when *any* of these machine-checkable
   * conditions hold:
   *
   * - no successful tool route was recorded for the step
   *   (`reason: "no_evidence"`),
   * - no source-linked evidence reference (URL / PMID / DOI / accession
   *   / structured id) is present on the step
   *   (`reason: "no_source_link"`),
   * - the step's `success_criteria` calls out a numeric quantity but
   *   none of the captured evidence carries a numeric ref
   *   (`reason: "missing_numeric_evidence"`),
   * - the model's own reflection lists this step under `unresolved`
   *   while the plan still claims `done`
   *   (`reason: "reflection_inconsistent"`).
   *
   * The validator never *upgrades* — it only downgrades and records
   * notes. Recovery (one extra tool round) is the caller's job and is
   * driven by `needsRecoveryRound()`.
   *
   * Each call replaces `this.validator`; steps downgraded by an earlier
   * call are already `failed` and are not reported again.
   */
  runValidator(): AgentValidator {
    const downgraded: AgentValidator["downgraded"] = [];
    const notes: string[] = [];

    // Count successful (or cache-served) tool routes per attributed step.
    const successByStep = new Map<string, number>();
    for (const r of this.toolRoutes) {
      if (r.outcome !== "success" && r.outcome !== "cache_hit") continue;
      if (!r.step_id) continue;
      successByStep.set(r.step_id, (successByStep.get(r.step_id) ?? 0) + 1);
    }
    const reflectionUnresolved = new Set(
      (this.reflection?.unresolved ?? []).map((u) => u.id),
    );

    for (const step of this.steps.values()) {
      if (step.status !== "done") continue;
      const reasons: string[] = [];
      const success = successByStep.get(step.id) ?? 0;
      if (success === 0) reasons.push("no_evidence");
      const refs = step.evidence_refs ?? [];
      const hasSourceLink = refs.some((r) => AgentRunState.SOURCE_LINK_TYPES.has(r.type));
      if (success > 0 && !hasSourceLink) reasons.push("no_source_link");
      if (
        mentionsNumericClaim(step.success_criteria, step.goal) &&
        !refs.some((r) => r.type === "numeric")
      ) {
        reasons.push("missing_numeric_evidence");
      }
      if (reflectionUnresolved.has(step.id)) {
        reasons.push("reflection_inconsistent");
      }
      if (reasons.length === 0) continue;

      // The structured record carries the first reason; the note lists all.
      const reason = reasons[0]!;
      downgraded.push({ id: step.id, from: step.status, reason });
      step.status = "failed";
      const noteSuffix = `downgraded: ${reasons.join(", ")}`;
      step.note = step.note ? `${step.note} | ${noteSuffix}` : noteSuffix;
      step.updated_at = new Date().toISOString();
    }

    if (this.openCircuits.size) {
      notes.push(
        `Circuit breaker tripped for: ${Array.from(this.openCircuits).join(", ")}`,
      );
    }
    const passed = downgraded.length === 0;
    this.validator = { passed, downgraded, notes };
    return this.validator;
  }

  /** True when at least one step needs another tool call to satisfy. */
  needsRecoveryRound(): boolean {
    if (!this.validator) return false;
    return this.validator.downgraded.length > 0;
  }

  /**
   * Emit a consolidated `agent_run_snapshot` SSE event reflecting the
   * current plan / step / tool-route state. Called after every iteration
   * of the runWithTools loop so an in-chat panel that drops a delta (or
   * a client that reconnects mid-run) can rehydrate from a single payload
   * instead of having to replay every individual delta.
   */
  emitSnapshot(args: {
    status: PublicAgentRun["status"];
    finishedAt?: Date | null;
    inputTokens: number;
    outputTokens: number;
    memory?: AgentMemoryUsage | null;
  }): void {
    const snapshot = this.toPublic(
      args.status,
      args.finishedAt ?? null,
      { input: args.inputTokens, output: args.outputTokens },
      args.memory ?? null,
    );
    this.emit({
      event: "agent_run_snapshot",
      data: { agent_run: snapshot },
    });
  }

  /**
   * Build the serialisable view of this run.
   *
   * The returned object references the live step objects and the live
   * tool-route array; serialise it before mutating the run further.
   */
  toPublic(
    status: PublicAgentRun["status"],
    finishedAt: Date | null,
    tokens: { input: number; output: number },
    memory: AgentMemoryUsage | null = null,
  ): PublicAgentRun {
    return {
      id: this.id,
      conversation_id: this.conversationId,
      message_id: this.messageId,
      status,
      plan: this.plan,
      steps: Array.from(this.steps.values()),
      tool_routes: this.toolRoutes,
      reflection: this.reflection,
      validator: this.validator,
      iterations: this.iterations,
      input_tokens: tokens.input,
      output_tokens: tokens.output,
      started_at: this.startedAt.toISOString(),
      finished_at: finishedAt ? finishedAt.toISOString() : null,
      memory,
    };
  }

  /**
   * Upsert the agent_run row. Called after each iteration so the panel
   * survives client refreshes mid-turn, and again at the end with the
   * final terminal state.
   *
   * Best-effort: a database failure is logged and swallowed.
   */
  async persist(args: {
    status: PublicAgentRun["status"];
    finishedAt: Date | null;
    inputTokens: number;
    outputTokens: number;
  }): Promise<void> {
    const payload = {
      id: this.id,
      conversationId: this.conversationId,
      userId: this.userId,
      messageId: this.messageId,
      status: args.status,
      plan: this.plan as unknown,
      steps: Array.from(this.steps.values()) as unknown,
      toolRoutes: this.toolRoutes as unknown,
      reflection: this.reflection as unknown,
      validator: this.validator as unknown,
      inputTokens: args.inputTokens,
      outputTokens: args.outputTokens,
      iterations: this.iterations,
      startedAt: this.startedAt,
      finishedAt: args.finishedAt,
    };
    try {
      await db
        .insert(agentRuns)
        .values(payload as never)
        .onConflictDoUpdate({
          target: agentRuns.id,
          // Identity columns and startedAt are fixed at first insert.
          set: {
            status: payload.status,
            plan: payload.plan as never,
            steps: payload.steps as never,
            toolRoutes: payload.toolRoutes as never,
            reflection: payload.reflection as never,
            validator: payload.validator as never,
            inputTokens: payload.inputTokens,
            outputTokens: payload.outputTokens,
            iterations: payload.iterations,
            finishedAt: payload.finishedAt,
          },
        });
    } catch (err) {
      logger.warn({ err, agentRunId: this.id }, "agent_run persist failed");
    }
  }

  /**
   * Build the working-memory record that would be persisted for the next
   * turn. Returns null when there is nothing left unresolved (callers are
   * expected to clear stored memory in that case). Pure / DB-free so the
   * cross-turn carry-over is testable without spinning up Postgres.
   */
  buildPendingWorkingMemory(): AgentMemoryRecord | null {
    const reflectionUnresolved = new Set(
      (this.reflection?.unresolved ?? []).map((u) => u.id),
    );
    const unresolved: AgentMemoryUnresolved[] = [];
    for (const step of this.steps.values()) {
      const isLeftover =
        step.status === "failed" ||
        step.status === "skipped" ||
        reflectionUnresolved.has(step.id);
      if (!isLeftover) continue;
      unresolved.push({
        id: step.id,
        goal: step.goal,
        note: step.note,
        source_run_id: this.id,
        recorded_at: new Date().toISOString(),
      });
    }
    if (!unresolved.length) return null;
    return {
      // Keep the carry-over prompt short: at most 10 entries.
      unresolved_steps: unresolved.slice(0, 10),
      updated_at: new Date().toISOString(),
    };
  }

  /**
   * Persist unresolved/failed steps into the conversation's working
   * memory so the next turn's system prompt can carry them forward.
   * Stale memory is cleared (set to null) once nothing is unresolved.
   *
   * Best-effort: a database failure is logged and swallowed.
   */
  async writeWorkingMemory(): Promise<void> {
    const record = this.buildPendingWorkingMemory();
    try {
      await db
        .update(conversations)
        .set({ agentMemory: record as unknown as object })
        .where(eq(conversations.id, this.conversationId));
    } catch (err) {
      logger.warn(
        { err },
        record ? "write agent_memory failed" : "clear agent_memory failed",
      );
    }
  }
}
// ----------------------------------------------------------------- helpers
/** Step statuses accepted from model-emitted `<step_update>` / `<reflection>` blocks. */
const VALID_STATUSES: Set<AgentStepStatus> = new Set([
"pending",
"running",
"done",
"failed",
"skipped",
]);
/** Names of the inline tags the streaming parser recognises and hides from the user. */
type TagName = "plan" | "step_update" | "reflection" | "memory_fact";
/** Every recognised tag, in the order the parser tests a line against them. */
const ALL_TAGS: TagName[] = ["plan", "step_update", "reflection", "memory_fact"];
/** Memory kinds accepted as-is; any other kind is stored as "other". */
const VALID_MEMORY_KINDS: ReadonlySet<string> = new Set([
"preference",
"fact",
"interest",
"domain",
"terminology",
"summary",
]);
/** Hard cap on auto-extracted memory facts per turn. */
export const MAX_LEARNED_FACTS_PER_TURN = 5;
/** Maximum stored length of a learned fact's content; longer text is truncated. */
const MAX_FACT_CONTENT_CHARS = 500;
/**
 * Read the `error_code` of a failed tool result.
 *
 * @param result  Raw tool result; only non-null objects are inspected.
 * @param isError Whether the call was reported as an error.
 * @returns the string `error_code`, or `null` for successful calls,
 *   non-object results, and results whose `error_code` is not a string.
 */
function errorCodeOf(result: unknown, isError: boolean): string | null {
  const inspectable = isError && typeof result === "object" && result !== null;
  if (!inspectable) return null;
  const { error_code: code } = result as Record<string, unknown>;
  return typeof code === "string" ? code : null;
}
/**
 * Render tool-call arguments as a compact JSON string for the route log.
 *
 * Output longer than 160 characters is cut and suffixed with "...". The
 * cut never lands between the two halves of a UTF-16 surrogate pair, so
 * the summary cannot end in a lone surrogate, which is not well-formed
 * text and can be rejected when the summary is stored as JSON.
 *
 * @param args Arguments of the tool call.
 * @returns the (possibly truncated) JSON text, or the comma-joined
 *   argument names when the arguments cannot be serialised.
 */
function summariseArgs(args: Record<string, unknown>): string {
  const maxLength = 160;
  const ellipsis = "...";
  try {
    const json = JSON.stringify(args);
    if (json.length <= maxLength) return json;
    let cut = maxLength - ellipsis.length;
    // A high surrogate at the cut position means its partner was cut off.
    const last = json.charCodeAt(cut - 1);
    if (last >= 0xd800 && last <= 0xdbff) cut -= 1;
    return json.slice(0, cut) + ellipsis;
  } catch {
    // e.g. cyclic structures — fall back to the argument names only
    return Object.keys(args).join(",");
  }
}
/**
 * Decide whether a step's success criteria or goal makes a numeric claim.
 *
 * This is a heuristic: it fires on any digit, on a percent sign, and on
 * English / Chinese quantity keywords ("count", "number", "how many",
 * "n=", "总数", "数量", "个数", "条数", "篇数", "多少"). The validator uses it to
 * require numeric evidence for such steps.
 *
 * @param criteria The step's `success_criteria`, if any.
 * @param goal     The step's goal, if any.
 * @returns `true` when either text looks like it asks for a quantity.
 */
function mentionsNumericClaim(
  criteria: string | undefined,
  goal: string | undefined,
): boolean {
  const haystack = [criteria ?? "", goal ?? ""].join("\n").toLowerCase();
  if (haystack.trim() === "") return false;
  const signals: RegExp[] = [
    /\d/,
    /%/,
    /(count|number|how many|n=|总数|数量|个数|条数|篇数|多少)/i,
  ];
  return signals.some((signal) => signal.test(haystack));
}
// Patterns used to mine evidence references out of free text. All are
// global regexes and therefore stateful (`lastIndex`); they are only
// used through `extractEvidenceRefs`, which resets them before each scan.
const URL_REGEX = /https?:\/\/[^\s"'<>)]+/gi;
const PMID_REGEX = /\b(?:pmid|pubmed[_\s-]*id)[:\s]*([0-9]{4,9})\b/gi;
const DOI_REGEX = /\b10\.\d{4,9}\/[\w./()\-:;]+/gi;
const UNIPROT_REGEX = /\b[OPQ][0-9][A-Z0-9]{3}[0-9]\b|\b[A-NR-Z][0-9](?:[A-Z][A-Z0-9]{2}[0-9]){1,2}\b/g;
const OT_REGEX = /\b(?:ENSG\d{6,}|EFO_\d{4,}|MONDO_\d{4,})\b/g;

/**
 * Pull source-linked evidence refs out of a successful tool result. We
 * do a depth-limited traversal so a deeply nested object (e.g. a full
 * PubMed hit list) still yields useful refs without blowing the stack
 * or copying megabytes of payload.
 *
 * Limits: at most 25 refs are returned, nesting deeper than 4 levels is
 * ignored, and only the first 20 items of any array are visited.
 *
 * Refs come from two places:
 *  - free text, scanned for URLs, PMIDs, DOIs, UniProt accessions and
 *    Open Targets style ids (trailing sentence punctuation is stripped
 *    from URL / DOI matches);
 *  - well-known object keys (`pmid`, `doi`, `url` / `*_url`,
 *    `accession` / `uniprot_id`, `id` / `*_id`, and count-like numeric
 *    keys), whose values are taken verbatim after trimming.
 *
 * Empty or whitespace-only values never become refs, so a blank `url` or
 * `id` field cannot satisfy the validator's source-link check.
 *
 * @param result   Tool result; only strings and objects are inspected.
 * @param toolName Tool name recorded on every extracted ref.
 * @returns de-duplicated refs in discovery order.
 */
function extractEvidenceRefs(
  result: unknown,
  toolName: string,
): AgentEvidenceRef[] {
  if (!result || (typeof result !== "object" && typeof result !== "string")) {
    return [];
  }
  const maxRefs = 25;
  const maxDepth = 4;
  const maxArrayItems = 20;
  const refs: AgentEvidenceRef[] = [];
  const seen = new Set<string>();

  const push = (type: string, rawRef: string): void => {
    // Enforce the cap here so a single string or object cannot overshoot it.
    if (refs.length >= maxRefs) return;
    const ref = rawRef.trim();
    if (!ref) return;
    const key = `${type}:${ref}`;
    if (seen.has(key)) return;
    seen.add(key);
    refs.push({ type, ref, tool: toolName });
  };

  // "see https://x.org/a." or "(doi: 10.1/abc)." — drop the sentence
  // punctuation and any unbalanced closing paren the regex swept up.
  const stripTrailingPunctuation = (match: string): string => {
    let out = match.replace(/[.,;:!?]+$/, "");
    while (out.endsWith(")") && out.split(")").length > out.split("(").length) {
      out = out.slice(0, -1).replace(/[.,;:!?]+$/, "");
    }
    return out;
  };

  const eachMatch = (
    pattern: RegExp,
    text: string,
    onMatch: (m: RegExpExecArray) => void,
  ): void => {
    pattern.lastIndex = 0; // shared global regex — always restart the scan
    let m: RegExpExecArray | null;
    while ((m = pattern.exec(text)) !== null) onMatch(m);
  };

  const scanText = (text: string): void => {
    eachMatch(URL_REGEX, text, (m) => push("url", stripTrailingPunctuation(m[0])));
    eachMatch(PMID_REGEX, text, (m) => {
      if (m[1]) push("pmid", m[1]);
    });
    eachMatch(DOI_REGEX, text, (m) => push("doi", stripTrailingPunctuation(m[0])));
    eachMatch(UNIPROT_REGEX, text, (m) => push("uniprot", m[0]));
    eachMatch(OT_REGEX, text, (m) => push("ot", m[0]));
  };

  const visit = (node: unknown, depth: number): void => {
    if (refs.length >= maxRefs) return;
    if (depth > maxDepth || node == null) return;
    if (typeof node === "string") {
      scanText(node);
      return;
    }
    if (typeof node === "number") {
      if (Number.isFinite(node)) push("numeric", String(node));
      return;
    }
    if (Array.isArray(node)) {
      for (const item of node.slice(0, maxArrayItems)) visit(item, depth + 1);
      return;
    }
    if (typeof node !== "object") return;

    for (const [k, v] of Object.entries(node as Record<string, unknown>)) {
      if (refs.length >= maxRefs) return;
      const lk = k.toLowerCase();
      if (typeof v === "string") {
        if (lk === "pmid") push("pmid", v);
        else if (lk === "doi") push("doi", v);
        else if (lk === "url" || lk.endsWith("_url")) push("url", v);
        else if (lk === "accession" || lk === "uniprot_id") push("uniprot", v);
        else if (lk === "id" || lk.endsWith("_id")) push("id", v);
      } else if (typeof v === "number" && Number.isFinite(v)) {
        // A PMID delivered as a JSON number is still a PMID.
        if (lk === "pmid") push("pmid", String(v));
        if (lk === "count" || lk === "n" || lk === "total" || lk.endsWith("_count")) {
          push("numeric", `${k}=${v}`);
        }
      }
      visit(v, depth + 1);
    }
  };

  visit(result, 0);
  return refs;
}
/**
 * Hint text attached to a `circuit_open` payload, pointing the model at
 * other tools it can use instead of the disabled one.
 *
 * @param toolName Name of the tool whose source was disabled.
 * @returns a tool-specific suggestion, or a generic one for other tools.
 */
function alternativeSuggestion(toolName: string): string {
  if (toolName === "search_pubmed") {
    return "Try query_opentargets for target↔disease evidence or lookup_uniprot for protein metadata.";
  }
  if (toolName === "lookup_uniprot") {
    return "Try search_pubmed for the same protein name, or query_opentargets if you need disease links.";
  }
  if (toolName === "query_opentargets") {
    return "Try search_pubmed with a clinical/mechanistic query, or create_research_task for a structured pipeline.";
  }
  return "Switch to an alternative tool or rephrase the query for a different source.";
}
/**
 * Render the system-prompt section that hands unresolved steps from the
 * previous turn to the next one.
 *
 * Each step becomes one bullet of the form `- <id>: <goal>`, followed by
 * ` (<note>)` when the step has a note. When there is nothing to carry
 * forward the result is the empty string, so callers can write
 * `if (text) sysParts.push(text)`.
 *
 * Exported so tests can check the cross-turn working-memory payload
 * without running the full /messages/stream pipeline.
 *
 * @param memory Stored working memory, or `null` when none exists.
 * @returns the prompt section, or `""`.
 */
export function formatCarryOverPrompt(
  memory: AgentMemoryRecord | null,
): string {
  if (!memory) return "";
  const pending = memory.unresolved_steps;
  if (pending.length === 0) return "";

  const bullets: string[] = [];
  for (const item of pending) {
    const suffix = item.note ? ` (${item.note})` : "";
    bullets.push(`- ${item.id}: ${item.goal}${suffix}`);
  }

  const header = [
    "Carry-over from the previous turn — these subgoals were left ",
    "unresolved or failed. If the user has not redirected you, ",
    "pick them up; otherwise acknowledge briefly and proceed:",
  ].join("");
  return `${header}\n${bullets.join("\n")}`;
}
// ----------------------------------------------------------------- public lookup
/**
 * Load the stored agent run attached to an assistant message.
 *
 * @param messageId Id of the message the run belongs to.
 * @param userId    Id of the requesting user.
 * @returns the public view of the run, or `null` when no run exists for
 *   the message or the stored run belongs to a different user.
 */
export async function loadAgentRunByMessage(
  messageId: string,
  userId: string,
): Promise<PublicAgentRun | null> {
  const [match] = await db
    .select()
    .from(agentRuns)
    .where(eq(agentRuns.messageId, messageId))
    .limit(1);
  // A run owned by someone else is reported exactly like a missing one.
  if (!match || match.userId !== userId) return null;
  return rowToPublic(match);
}
/**
 * Convert a stored `agent_runs` row into its public, serialisable view.
 *
 * Missing or malformed JSON columns are normalised: a nullish status
 * becomes `"complete"`, non-array `steps` / `toolRoutes` become empty
 * arrays, and nullish plan / reflection / validator become `null`.
 * `memory` is always `null` for rows loaded from storage.
 *
 * @param row Row as returned by the database layer.
 * @returns the public representation of the run.
 */
export function rowToPublic(row: AgentRunRow): PublicAgentRun {
  const status = (row.status as PublicAgentRun["status"]) ?? "complete";
  const plan = (row.plan as AgentPlan | null) ?? null;
  const steps = Array.isArray(row.steps) ? (row.steps as AgentStepState[]) : [];
  const toolRoutes = Array.isArray(row.toolRoutes)
    ? (row.toolRoutes as AgentToolRoute[])
    : [];
  const reflection = (row.reflection as AgentReflection | null) ?? null;
  const validator = (row.validator as AgentValidator | null) ?? null;
  const finishedAt = row.finishedAt ? row.finishedAt.toISOString() : null;

  return {
    id: row.id,
    conversation_id: row.conversationId,
    message_id: row.messageId,
    status,
    plan,
    steps,
    tool_routes: toolRoutes,
    reflection,
    validator,
    iterations: row.iterations,
    input_tokens: row.inputTokens,
    output_tokens: row.outputTokens,
    started_at: row.startedAt.toISOString(),
    finished_at: finishedAt,
    memory: null,
  };
}
/**
 * Load the working-memory record stored on a conversation.
 *
 * @param conversationId Id of the conversation to read.
 * @returns the stored record, or `null` when the conversation is not
 *   found, has no memory, or its memory lists no unresolved steps.
 */
export async function loadConversationAgentMemory(
  conversationId: string,
): Promise<AgentMemoryRecord | null> {
  const [row] = await db
    .select({ agentMemory: conversations.agentMemory })
    .from(conversations)
    .where(eq(conversations.id, conversationId))
    .limit(1);
  const stored = row?.agentMemory as AgentMemoryRecord | null | undefined;
  if (!stored) return null;
  // The column is free-form JSON, so check the shape before trusting it.
  const pending = stored.unresolved_steps;
  if (!Array.isArray(pending) || pending.length === 0) return null;
  return stored;
}