Iostream-Li's picture
Add files using upload-large-folder tool
ff78003 verified
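/**
 * drug.adapter — queries the real PG node tables drug_nodes / target_nodes /
 * outcome_nodes through the edge tables drug_targets_protein and
 * drug_affects_outcome. (drug_tested_in_trial is not queried by this adapter,
 * so no trial edges are returned.)
 *
 * Supported filter params (at least one is required):
 * - drug_id    drug → related targets and outcomes
 * - target_id  target → related drugs, plus the outcomes of those drugs (2-hop)
 * - outcome_id outcome → related drugs
 *
 * Each returned KnowledgeItem.payload carries the association strength
 * (confidence) and source_db.
 */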
import { eq, inArray } from "drizzle-orm";
import {
drugNodes,
targetNodes,
outcomeNodes,
drugTargetsProtein,
drugAffectsOutcome,
db,
} from "@workspace/db";
import { enforceBudget } from "./budget-guard.ts";
import { fingerprintParams, recordCall } from "./telemetry.ts";
import type {
KnowledgeAdapter,
KnowledgeItem,
KnowledgeQuery,
KnowledgeResult,
} from "./types.ts";
/**
 * Filter parameters understood by the drug-network adapter.
 * Every field is optional here; the adapter's `query` rejects a request in
 * which none of them is set.
 */
type DrugParams = {
  /** Look up the target and outcome edges of this drug. */
  drug_id?: string;
  /** Look up the drugs acting on this target (and those drugs' outcomes). */
  target_id?: string;
  /** Look up the drugs linked to this outcome. */
  outcome_id?: string;
};
/**
 * Extracts the three supported filter ids from an untyped params bag.
 *
 * @param p Raw query params as received from the caller.
 * @returns An object that always has the keys `drug_id`, `target_id` and
 *   `outcome_id`; a key holds the original value when it is a string and
 *   `undefined` for anything else (missing, number, object, ...).
 */
function readParams(p: Record<string, unknown>): DrugParams {
  // Only string values are accepted; nothing is coerced or trimmed.
  const stringOrUndefined = (key: string): string | undefined => {
    const raw = p[key];
    if (typeof raw === "string") {
      return raw;
    }
    return undefined;
  };

  const drug_id = stringOrUndefined("drug_id");
  const target_id = stringOrUndefined("target_id");
  const outcome_id = stringOrUndefined("outcome_id");
  return { drug_id, target_id, outcome_id };
}
/**
 * Normalizes the caller-supplied row limit before it is used as SQL `LIMIT`
 * and as the final slice length.
 *
 * - missing (`null` / `undefined`) or NaN → 50
 * - fractional values are truncated toward zero
 * - the result is clamped to the range [0, 500]
 */
function clampDrugQueryLimit(requested: number | null | undefined): number {
  const fallback = 50;
  const ceiling = 500;
  const wanted = requested ?? fallback;
  if (Number.isNaN(wanted)) {
    return fallback;
  }
  return Math.min(Math.max(Math.trunc(wanted), 0), ceiling);
}

/** drug → targets: drug_targets_protein edges of one drug, left-joined with target_nodes. */
async function fetchTargetEdgesForDrug(
  drugId: string,
  limit: number,
): Promise<KnowledgeItem[]> {
  const rows = await db
    .select({
      drugId: drugTargetsProtein.drugId,
      targetId: drugTargetsProtein.targetId,
      sourceDb: drugTargetsProtein.sourceDb,
      confidence: drugTargetsProtein.confidence,
      target: targetNodes,
    })
    .from(drugTargetsProtein)
    .leftJoin(targetNodes, eq(drugTargetsProtein.targetId, targetNodes.targetId))
    .where(eq(drugTargetsProtein.drugId, drugId))
    .limit(limit);
  const edges: KnowledgeItem[] = [];
  for (const row of rows) {
    edges.push({
      id: `${row.drugId}::${row.targetId}::${row.sourceDb}`,
      score: row.confidence,
      payload: {
        edge_kind: "drug_targets_protein",
        drug_id: row.drugId,
        target_id: row.targetId,
        source_db: row.sourceDb,
        confidence: row.confidence,
        target: row.target,
      },
      origin: "pg:drug_targets_protein",
    });
  }
  return edges;
}

/** drug → outcomes: drug_affects_outcome edges of one drug, left-joined with outcome_nodes. */
async function fetchOutcomeEdgesForDrug(
  drugId: string,
  limit: number,
): Promise<KnowledgeItem[]> {
  const rows = await db
    .select({
      drugId: drugAffectsOutcome.drugId,
      outcomeId: drugAffectsOutcome.outcomeId,
      sourceDb: drugAffectsOutcome.sourceDb,
      confidence: drugAffectsOutcome.confidence,
      outcome: outcomeNodes,
    })
    .from(drugAffectsOutcome)
    .leftJoin(outcomeNodes, eq(drugAffectsOutcome.outcomeId, outcomeNodes.outcomeId))
    .where(eq(drugAffectsOutcome.drugId, drugId))
    .limit(limit);
  const edges: KnowledgeItem[] = [];
  for (const row of rows) {
    edges.push({
      id: `${row.drugId}::${row.outcomeId}::${row.sourceDb}`,
      score: row.confidence,
      payload: {
        edge_kind: "drug_affects_outcome",
        drug_id: row.drugId,
        outcome_id: row.outcomeId,
        source_db: row.sourceDb,
        confidence: row.confidence,
        outcome: row.outcome,
      },
      origin: "pg:drug_affects_outcome",
    });
  }
  return edges;
}

/**
 * target → drugs: drug_targets_protein edges of one target, left-joined with
 * drug_nodes. Also returns the distinct non-empty drug ids found, which seed
 * the 2-hop outcome lookup.
 */
async function fetchDrugEdgesForTarget(
  targetId: string,
  limit: number,
): Promise<{ items: KnowledgeItem[]; tracedDrugIds: string[] }> {
  const rows = await db
    .select({
      drugId: drugTargetsProtein.drugId,
      targetId: drugTargetsProtein.targetId,
      sourceDb: drugTargetsProtein.sourceDb,
      confidence: drugTargetsProtein.confidence,
      drug: drugNodes,
    })
    .from(drugTargetsProtein)
    .leftJoin(drugNodes, eq(drugTargetsProtein.drugId, drugNodes.drugId))
    .where(eq(drugTargetsProtein.targetId, targetId))
    .limit(limit);
  const edges: KnowledgeItem[] = [];
  for (const row of rows) {
    edges.push({
      id: `${row.drugId}::${row.targetId}::${row.sourceDb}`,
      score: row.confidence,
      payload: {
        edge_kind: "target_targeted_by_drug",
        drug_id: row.drugId,
        target_id: row.targetId,
        source_db: row.sourceDb,
        confidence: row.confidence,
        drug: row.drug,
      },
      origin: "pg:drug_targets_protein",
    });
  }
  const tracedDrugIds = Array.from(
    new Set(rows.map((r) => r.drugId).filter((x): x is string => !!x)),
  );
  return { items: edges, tracedDrugIds };
}

/**
 * 2-hop target → drugs → outcomes: drug_affects_outcome edges of the given
 * drugs, tagged with the target they were reached through. The IN filter on
 * the already-traced drug ids keeps the query from scanning the whole table.
 */
async function fetchOutcomeEdgesViaDrugs(
  targetId: string,
  drugIds: string[],
  limit: number,
): Promise<KnowledgeItem[]> {
  const rows = await db
    .select({
      drugId: drugAffectsOutcome.drugId,
      outcomeId: drugAffectsOutcome.outcomeId,
      sourceDb: drugAffectsOutcome.sourceDb,
      confidence: drugAffectsOutcome.confidence,
      outcome: outcomeNodes,
    })
    .from(drugAffectsOutcome)
    .leftJoin(outcomeNodes, eq(drugAffectsOutcome.outcomeId, outcomeNodes.outcomeId))
    .where(inArray(drugAffectsOutcome.drugId, drugIds))
    .limit(limit);
  const edges: KnowledgeItem[] = [];
  for (const row of rows) {
    edges.push({
      id: `${row.drugId}::${row.outcomeId}::${row.sourceDb}::via_target`,
      score: row.confidence,
      payload: {
        edge_kind: "target_to_outcome_via_drug",
        target_id: targetId,
        drug_id: row.drugId,
        outcome_id: row.outcomeId,
        source_db: row.sourceDb,
        confidence: row.confidence,
        outcome: row.outcome,
      },
      origin: "pg:drug_affects_outcome",
    });
  }
  return edges;
}

/** outcome → drugs: drug_affects_outcome edges of one outcome, left-joined with drug_nodes. */
async function fetchDrugEdgesForOutcome(
  outcomeId: string,
  limit: number,
): Promise<KnowledgeItem[]> {
  const rows = await db
    .select({
      drugId: drugAffectsOutcome.drugId,
      outcomeId: drugAffectsOutcome.outcomeId,
      sourceDb: drugAffectsOutcome.sourceDb,
      confidence: drugAffectsOutcome.confidence,
      drug: drugNodes,
    })
    .from(drugAffectsOutcome)
    .leftJoin(drugNodes, eq(drugAffectsOutcome.drugId, drugNodes.drugId))
    .where(eq(drugAffectsOutcome.outcomeId, outcomeId))
    .limit(limit);
  const edges: KnowledgeItem[] = [];
  for (const row of rows) {
    edges.push({
      id: `${row.drugId}::${row.outcomeId}::${row.sourceDb}`,
      score: row.confidence,
      payload: {
        edge_kind: "outcome_caused_by_drug",
        drug_id: row.drugId,
        outcome_id: row.outcomeId,
        source_db: row.sourceDb,
        confidence: row.confidence,
        drug: row.drug,
      },
      origin: "pg:drug_affects_outcome",
    });
  }
  return edges;
}

/**
 * Knowledge adapter for the drug network stored in Postgres.
 *
 * `query` accepts `drug_id`, `target_id` and/or `outcome_id` in `q.params`
 * and returns one KnowledgeItem per matching edge, in this order:
 * drug → targets, drug → outcomes, target → drugs, target → outcomes via
 * drugs (2-hop), outcome → drugs. Each individual SQL query is capped at
 * `limit` rows and the concatenated list is cut to `limit` items again.
 *
 * When `target_id` is used, `sourceMetadata.tracedDrugIds` lists the drug ids
 * the 2-hop outcome lookup was based on, so callers can reproduce it.
 *
 * @throws Error when none of the three filter ids is supplied; errors from
 *   the database are rethrown unchanged. Every failure inside the guarded
 *   section is reported to telemetry first.
 */
export const drugAdapter: KnowledgeAdapter = {
  kind: "drug_network",
  status: "real",
  async query(q: KnowledgeQuery): Promise<KnowledgeResult> {
    // Runs before the guarded section, so anything it throws is not
    // reported through recordCall.
    enforceBudget(q.capabilityId, "drug_network");
    const params = readParams(q.params ?? {});
    // Sanitized so a negative / fractional / NaN value never reaches SQL LIMIT.
    const limit = clampDrugQueryLimit(q.limit);
    const fp = fingerprintParams({ ...params, limit });
    const t0 = Date.now();
    let error: string | null = null;
    const items: KnowledgeItem[] = [];
    // Stays null unless the target_id branch ran.
    let tracedDrugIds: string[] | null = null;
    try {
      if (!params.drug_id && !params.target_id && !params.outcome_id) {
        // Refuse unfiltered queries: they would pull an entire table, which
        // is expensive and meaningless. Throwing explicitly (instead of
        // returning an empty result) keeps the failure visible to the caller.
        throw new Error(
          "drug.adapter requires at least one of drug_id / target_id / outcome_id",
        );
      }

      // NOTE(review): when several filters are combined, the same edge row can
      // be emitted by two branches with the same `id` but a different
      // `edge_kind` (e.g. drug_id + target_id). Ids are left as-is because
      // callers may depend on their format.

      // drug_id → related target edges, then related outcome edges.
      if (params.drug_id) {
        items.push(...(await fetchTargetEdgesForDrug(params.drug_id, limit)));
        items.push(...(await fetchOutcomeEdgesForDrug(params.drug_id, limit)));
      }

      // target_id → drugs acting on the target, then a 2-hop lookup of the
      // outcomes those drugs affect.
      if (params.target_id) {
        const viaTarget = await fetchDrugEdgesForTarget(params.target_id, limit);
        items.push(...viaTarget.items);
        tracedDrugIds = viaTarget.tracedDrugIds;
        if (viaTarget.tracedDrugIds.length > 0) {
          items.push(
            ...(await fetchOutcomeEdgesViaDrugs(
              params.target_id,
              viaTarget.tracedDrugIds,
              limit,
            )),
          );
        }
      }

      // outcome_id → drugs linked to the outcome.
      if (params.outcome_id) {
        items.push(...(await fetchDrugEdgesForOutcome(params.outcome_id, limit)));
      }
    } catch (err) {
      error = err instanceof Error ? err.message : String(err);
      throw err;
    } finally {
      // Fire-and-forget telemetry; hitCount is the number of items collected
      // so far (before the final slice), also on the failure path.
      void recordCall({
        adapter: "drug_network",
        capabilityId: q.capabilityId,
        latencyMs: Date.now() - t0,
        hitCount: items.length,
        cacheHit: null,
        error,
        paramsFingerprint: fp,
      });
    }
    return {
      kind: "drug_network",
      items: items.slice(0, limit),
      cursor: null,
      sourceMetadata: {
        primarySource: "pg:drug_network",
        limit,
        filters: params,
        ...(tracedDrugIds !== null ? { tracedDrugIds } : {}),
      },
    };
  },
};