// Source path: doatlas-2/artifacts/api-server/src/lib/research-data.ts
// Upload metadata: Iostream-Li — "Add files using upload-large-folder tool" (5871090, verified)
import { createReadStream, readFileSync, existsSync } from "node:fs";
import { createInterface } from "node:readline";
import { join } from "node:path";
import { logger } from "./logger";
// All research inputs are resolved beneath `<process working directory>/data`,
// evaluated once when this module is loaded.
const DATA_ROOT = join(process.cwd(), "data");
/** Resolves a named sub-directory of DATA_ROOT. */
const dataSubdir = (name: string): string => join(DATA_ROOT, name);
// Literature store: buckets / edges / papers / audit_issues JSONL files.
const LIT = dataSubdir("literature_v1");
// M11 workbench export holding workbench_data.json.
const M11 = dataSubdir("m11_workbench");
/**
 * One aggregated bucket of literature edges, read from
 * `literature_v1/buckets.jsonl` by `ensureLoaded`.
 *
 * At load time the `*_json` columns of each row are decoded into the string
 * arrays below, and numeric counts default to 0 when missing or non-numeric.
 * The tuple (population_bucket, x_norm, y_norm, equation_type, measure_type,
 * time_horizon) is the fingerprint used to attach edges to a bucket.
 */
export interface Bucket {
  bucket_key: string;
  population_bucket: string;
  x_norm: string;
  y_norm: string;
  equation_type: string;
  measure_type: string;
  measure_scale: string;
  time_index: string;
  time_horizon: string;
  outcome_type: string;
  edge_count: number;
  paper_count: number;
  accepted_edge_count: number;
  review_required_edge_count: number;
  usable_effect_count: number;
  missing_effect_count: number;
  blocked_effect_count: number;
  has_direction_conflict: boolean;
  // Copied from the JSONL row as-is; no numeric coercion is applied at load time.
  theta_min: number | null;
  theta_max: number | null;
  // Decoded from supporting_edge_ids_json.
  supporting_edge_ids: string[];
  // Decoded from supporting_paper_keys_json.
  supporting_paper_keys: string[];
  // Decoded from theta_signs_json.
  theta_signs: string[];
  // Pooled literature estimate from PostgresCausalNetwork (NIL).
  // Optional because the legacy JSONL store doesn't carry it; only
  // buckets sourced from `literature_buckets` populate this field.
  pooled_estimate?: {
    theta_hat: number | null;
    ci: [number | null, number | null];
    method: "fixed_effect" | "random_effect" | "raw_mean" | "none";
    heterogeneity_i2: number | null;
  } | null;
}
/**
 * One extracted causal edge (an X -> Y effect reported by a paper), read from
 * `literature_v1/edges.jsonl` by `ensureLoaded`.
 *
 * The `*_json` columns of each row are decoded into the string arrays, and
 * counts and rates default to 0. `getEdgeById` can also return edges built
 * from the workbench snapshot indexes by a separate mapping (`getAevEdgeById`).
 */
export interface Edge {
  edge_id: string;
  paper_key: string;
  edge_order: number;
  equation_type: string;
  paper_title: string;
  paper_abstract: string;
  first_author: string;
  year: number | null;
  doi: string | null;
  short_title: string;
  // Variable labels as written in the paper (raw) and after normalization (norm).
  x_raw: string;
  y_raw: string;
  x_norm: string;
  y_norm: string;
  population_bucket: string;
  outcome_type: string;
  time_index: string;
  time_horizon: string;
  measure_family: string;
  measure_type: string;
  measure_scale: string;
  intervention_name: string | null;
  id_strategy: string;
  id_status: string;
  // Decoded from assumptions_json.
  assumptions: string[];
  // Effect estimate fields are copied from the row without numeric coercion.
  theta_hat: number | null;
  ci_low: number | null;
  ci_high: number | null;
  theta_sign: string | null;
  p_value: number | null;
  n: number | null;
  design: string | null;
  grade: string | null;
  model: string | null;
  // Decoded from adjustment_set_json.
  adjustment_set: string[];
  record_status: "accepted" | "review_required" | "excluded";
  effect_status: string;
  placeholder_flag: boolean;
  review_valid: boolean;
  review_semantically_valid: boolean;
  review_fill_rate: number;
  review_issue_count: number;
  // Decoded from mapping_statuses_json.
  mapping_statuses: string[];
  has_missing_mapping: boolean;
  has_tentative_mapping: boolean;
  audit_error_count: number;
  audit_warning_count: number;
  // Decoded from audit_checks_json / audit_actions_json.
  audit_checks: string[];
  audit_actions: string[];
  pmid: string | null;
  pmcid: string | null;
  // Not present in edges.jsonl: filled in at load time by matching this edge's
  // fingerprint fields against the loaded buckets; null when nothing matches.
  bucket_key: string | null;
}
/**
 * One source paper, read from `literature_v1/papers.jsonl` by `ensureLoaded`.
 * Per-paper edge counts default to 0 when missing or non-numeric.
 */
export interface Paper {
  paper_key: string;
  first_author: string;
  year: number | null;
  doi: string | null;
  short_title: string;
  paper_title: string;
  paper_abstract: string;
  primary_category: string;
  // Decoded from secondary_tags_json.
  secondary_tags: string[];
  classification_confidence: string;
  pipeline_status: string;
  edge_count_final: number;
  accepted_edge_count: number;
  review_required_edge_count: number;
  excluded_edge_count: number;
  usable_effect_edge_count: number;
  blocked_effect_edge_count: number;
  pmid: string | null;
  pmcid: string | null;
}
/**
 * One audit finding attached to an edge, read from
 * `literature_v1/audit_issues.jsonl` by `ensureLoaded` and indexed by edge_id.
 */
export interface AuditIssue {
  paper_key: string;
  edge_id: string;
  edge_index: number;
  check: string;
  // Defaults to "info" when the row has no severity. The value is cast,
  // not validated, so other strings are passed through unchanged.
  severity: "error" | "warning" | "info";
  field: string;
  action: string | null;
  message: string;
  // Arbitrary JSON value from the row; its shape presumably depends on `check` — TODO confirm.
  value: any;
}
/**
 * The types below describe `m11_workbench/workbench_data.json` as this module
 * expects it. The file is JSON.parse'd and cast to WorkbenchData without
 * runtime validation, so real data may deviate from these shapes.
 */

/** One published snapshot of the literature database plus its summary counts. */
export interface SnapshotEntry {
  snapshot_id: string;
  snapshot_label: string;
  snapshot_db_path: string;
  created_at: string;
  stats: {
    snapshot_id: string;
    accepted_edge_count: number;
    review_required_edge_count: number;
    excluded_edge_count: number;
    bucket_count: number;
    graph_edge_count: number;
    created_at: string;
  };
}

/** A recorded task run against a snapshot, with its input, plan and result objects. */
export interface TaskRecord {
  task_run_id: string;
  snapshot_id: string;
  task_mode: string;
  task_input: Record<string, any>;
  task_plan: Record<string, any>;
  result_object_type: string;
  result_object_id: string;
  result: Record<string, any>;
  task_file_relative: string;
  summary?: any;
}

/** A predefined test case (query + task input) shipped with the workbench export. */
export interface TestCase {
  case_id: string;
  label: string;
  description: string;
  task_mode: string;
  query: Record<string, any>;
  task_input: Record<string, any>;
  tags: string[];
}

/** Root object of workbench_data.json. */
export interface WorkbenchData {
  generated_at: string;
  title: string;
  active_snapshot: {
    snapshot_id: string;
    snapshot_label: string;
    created_at: string;
    published_at: string;
    source: string;
  };
  snapshots: SnapshotEntry[];
  test_cases: TestCase[];
  tasks: TaskRecord[];
  // Keyed by snapshot_id. getAevEdgeById reads `edges_by_edge_id` from each
  // entry; other contents are not interpreted in this module.
  snapshot_indexes: Record<string, any>;
}
// -------------------------------------------------------------- in-memory store
/**
 * Everything `ensureLoaded` reads from disk, plus lookup indexes built from it.
 * All research state lives in the single module-level `store` object below.
 */
interface Store {
  buckets: Bucket[];
  // bucket_key -> bucket
  bucketsByKey: Map<string, Bucket>;
  edges: Edge[];
  // edge_id -> edge (literature edges only; getEdgeById falls back to the workbench separately)
  edgesById: Map<string, Edge>;
  // paper_key -> that paper's edges, in edges.jsonl order
  edgesByPaper: Map<string, Edge[]>;
  // bucket_key -> edges whose fingerprint matched that bucket at load time
  edgesByBucket: Map<string, Edge[]>;
  papers: Paper[];
  // paper_key -> paper
  papersByKey: Map<string, Paper>;
  audit: AuditIssue[];
  // edge_id -> audit issues referencing that edge
  auditByEdge: Map<string, AuditIssue[]>;
  // Parsed workbench_data.json; stays null if the file is absent or unreadable.
  workbench: WorkbenchData | null;
  // Becomes true once a load has completed.
  loaded: boolean;
  // Date.now() at load completion; 0 until then.
  loadedAt: number;
}
const store: Store = {
  buckets: [],
  bucketsByKey: new Map(),
  edges: [],
  edgesById: new Map(),
  edgesByPaper: new Map(),
  edgesByBucket: new Map(),
  papers: [],
  papersByKey: new Map(),
  audit: [],
  auditByEdge: new Map(),
  workbench: null,
  loaded: false,
  loadedAt: 0,
};
/**
 * Coerces a value that may be a JSON-encoded array (the `*_json` columns of
 * the JSONL exports) into an array.
 *
 * Arrays are returned as-is (same reference). Non-empty strings are parsed and
 * kept only if they decode to an array. Anything else, including unparsable
 * text, yields a fresh empty array. Element types are not checked.
 */
function safeJsonArr(s: any): string[] {
  if (Array.isArray(s)) return s as string[];
  let decoded: unknown;
  if (typeof s === "string" && s.length > 0) {
    try {
      decoded = JSON.parse(s);
    } catch {
      decoded = undefined;
    }
  }
  return Array.isArray(decoded) ? (decoded as string[]) : [];
}
/**
 * Streams a JSON Lines file and maps each non-blank line through `mapRow`.
 *
 * A missing file is not an error: a warning is logged and `[]` is returned.
 * Lines that fail `JSON.parse`, or that make `mapRow` throw, are skipped (best
 * effort). The number skipped and the first offending line number are logged,
 * so a corrupt input does not silently shrink the dataset.
 *
 * @param path Path of the `.jsonl` file.
 * @param mapRow Converts one parsed JSON value into `T`; may throw to reject the row.
 * @returns Mapped rows in file order.
 */
async function loadJsonl<T>(path: string, mapRow: (row: any) => T): Promise<T[]> {
  if (!existsSync(path)) {
    logger.warn({ path }, "research data file not found, skipping");
    return [];
  }
  const out: T[] = [];
  let lineNo = 0;
  let skipped = 0;
  let firstSkippedLine: number | null = null;
  const rl = createInterface({ input: createReadStream(path), crlfDelay: Infinity });
  for await (const line of rl) {
    lineNo += 1;
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      out.push(mapRow(JSON.parse(trimmed)));
    } catch {
      // Keep loading, but account for the dropped row instead of losing it silently.
      skipped += 1;
      if (firstSkippedLine === null) firstSkippedLine = lineNo;
    }
  }
  if (skipped > 0) {
    logger.warn({ path, skipped, firstSkippedLine }, "skipped malformed research data rows");
  }
  return out;
}
/**
 * Fields that together fingerprint a bucket. Both `Bucket` and `Edge` carry
 * them. This is how edges, whose JSONL rows have no `bucket_key`, are attached
 * to buckets at load time.
 */
interface BucketFingerprint {
  population_bucket: string;
  x_norm: string;
  y_norm: string;
  equation_type: string;
  measure_type: string;
  time_horizon: string;
}

/** Fingerprint fields in key order. Edge keys and bucket keys must use this same list. */
const BUCKET_FINGERPRINT_FIELDS: readonly (keyof BucketFingerprint)[] = [
  "population_bucket",
  "x_norm",
  "y_norm",
  "equation_type",
  "measure_type",
  "time_horizon",
];

/**
 * Joins the fingerprint fields with "||".
 *
 * Missing (`null`/`undefined`) values become empty strings, as with
 * `Array.prototype.join`. The key is not escaped, so a field value that itself
 * contains "||" could in principle collide with another fingerprint.
 */
function bucketFingerprintKey(r: BucketFingerprint): string {
  return BUCKET_FINGERPRINT_FIELDS.map((field) => r[field]).join("||");
}

/**
 * Recomputes an edge's bucket fingerprint.
 *
 * edges.jsonl does not carry `bucket_key`, so the loader looks this key up
 * among the fingerprints produced by `buildBucketCompositeKey`.
 */
function buildBucketKeyForEdge(e: BucketFingerprint): string {
  return bucketFingerprintKey(e);
}

/** Fingerprint of a loaded bucket; equals `buildBucketKeyForEdge` for the edges that belong to it. */
function buildBucketCompositeKey(b: BucketFingerprint): string {
  return bucketFingerprintKey(b);
}
/**
 * Shared in-flight load for concurrent `ensureLoaded` callers.
 *
 * Reset to null when a load attempt fails, so a later call retries instead of
 * receiving the cached rejection forever.
 */
let loadingPromise: Promise<void> | null = null;

/**
 * Empties every collection in `store` in place and marks the store unloaded.
 *
 * Maps are cleared rather than replaced, so references obtained via
 * `getStore()` stay valid. Used after a failed load: without it, a retry would
 * append duplicate entries to the existing per-paper/per-bucket arrays.
 */
function resetResearchStore(): void {
  store.buckets = [];
  store.bucketsByKey.clear();
  store.edges = [];
  store.edgesById.clear();
  store.edgesByPaper.clear();
  store.edgesByBucket.clear();
  store.papers = [];
  store.papersByKey.clear();
  store.audit = [];
  store.auditByEdge.clear();
  store.workbench = null;
  store.loaded = false;
  store.loadedAt = 0;
}

/**
 * Loads literature_v1/buckets.jsonl into `store` and returns a map from each
 * bucket's composite fingerprint to its bucket_key.
 *
 * When several buckets share a fingerprint, the last one wins (Map.set
 * overwrites). Such collisions are counted and logged.
 */
async function loadBucketsIntoStore(): Promise<Map<string, string>> {
  store.buckets = await loadJsonl<Bucket>(join(LIT, "buckets.jsonl"), (r) => ({
    bucket_key: r.bucket_key,
    population_bucket: r.population_bucket,
    x_norm: r.x_norm,
    y_norm: r.y_norm,
    equation_type: r.equation_type,
    measure_type: r.measure_type,
    measure_scale: r.measure_scale,
    time_index: r.time_index,
    time_horizon: r.time_horizon,
    outcome_type: r.outcome_type,
    edge_count: Number(r.edge_count) || 0,
    paper_count: Number(r.paper_count) || 0,
    accepted_edge_count: Number(r.accepted_edge_count) || 0,
    review_required_edge_count: Number(r.review_required_edge_count) || 0,
    usable_effect_count: Number(r.usable_effect_count) || 0,
    missing_effect_count: Number(r.missing_effect_count) || 0,
    blocked_effect_count: Number(r.blocked_effect_count) || 0,
    has_direction_conflict: Boolean(r.has_direction_conflict),
    theta_min: r.theta_min,
    theta_max: r.theta_max,
    supporting_edge_ids: safeJsonArr(r.supporting_edge_ids_json),
    supporting_paper_keys: safeJsonArr(r.supporting_paper_keys_json),
    theta_signs: safeJsonArr(r.theta_signs_json),
  }));
  const compositeToBucketKey = new Map<string, string>();
  let sharedFingerprints = 0;
  for (const b of store.buckets) {
    store.bucketsByKey.set(b.bucket_key, b);
    const composite = buildBucketCompositeKey(b);
    if (compositeToBucketKey.has(composite)) sharedFingerprints += 1;
    compositeToBucketKey.set(composite, b.bucket_key);
  }
  if (sharedFingerprints > 0) {
    logger.warn(
      { sharedFingerprints },
      "buckets share a composite fingerprint; edges attach to the last such bucket",
    );
  }
  return compositeToBucketKey;
}

/** Loads literature_v1/edges.jsonl, assigns each edge its bucket_key, and builds the edge indexes. */
async function loadEdgesIntoStore(compositeToBucketKey: Map<string, string>): Promise<void> {
  store.edges = await loadJsonl<Edge>(join(LIT, "edges.jsonl"), (r) => ({
    edge_id: r.edge_id,
    paper_key: r.paper_key,
    edge_order: Number(r.edge_order) || 0,
    equation_type: r.equation_type,
    paper_title: r.paper_title,
    paper_abstract: r.paper_abstract,
    first_author: r.first_author,
    year: r.year ?? null,
    doi: r.doi ?? null,
    short_title: r.short_title,
    x_raw: r.x_raw,
    y_raw: r.y_raw,
    x_norm: r.x_norm,
    y_norm: r.y_norm,
    population_bucket: r.population_bucket,
    outcome_type: r.outcome_type,
    time_index: r.time_index,
    time_horizon: r.time_horizon,
    measure_family: r.measure_family,
    measure_type: r.measure_type,
    measure_scale: r.measure_scale,
    intervention_name: r.intervention_name ?? null,
    id_strategy: r.id_strategy,
    id_status: r.id_status,
    assumptions: safeJsonArr(r.assumptions_json),
    theta_hat: r.theta_hat,
    ci_low: r.ci_low,
    ci_high: r.ci_high,
    theta_sign: r.theta_sign,
    p_value: r.p_value,
    n: r.n,
    design: r.design,
    grade: r.grade,
    model: r.model,
    adjustment_set: safeJsonArr(r.adjustment_set_json),
    record_status: r.record_status,
    effect_status: r.effect_status,
    placeholder_flag: Boolean(r.placeholder_flag),
    review_valid: Boolean(r.review_valid),
    review_semantically_valid: Boolean(r.review_semantically_valid),
    review_fill_rate: Number(r.review_fill_rate) || 0,
    review_issue_count: Number(r.review_issue_count) || 0,
    mapping_statuses: safeJsonArr(r.mapping_statuses_json),
    has_missing_mapping: Boolean(r.has_missing_mapping),
    has_tentative_mapping: Boolean(r.has_tentative_mapping),
    audit_error_count: Number(r.audit_error_count) || 0,
    audit_warning_count: Number(r.audit_warning_count) || 0,
    audit_checks: safeJsonArr(r.audit_checks_json),
    audit_actions: safeJsonArr(r.audit_actions_json),
    pmid: r.pmid ?? null,
    pmcid: r.pmcid ?? null,
    // edges.jsonl carries no bucket_key; it is resolved from the fingerprint below.
    bucket_key: null,
  }));
  for (const e of store.edges) {
    e.bucket_key = compositeToBucketKey.get(buildBucketKeyForEdge(e)) ?? null;
    store.edgesById.set(e.edge_id, e);
    const byPaper = store.edgesByPaper.get(e.paper_key) ?? [];
    byPaper.push(e);
    store.edgesByPaper.set(e.paper_key, byPaper);
    if (e.bucket_key) {
      const byBucket = store.edgesByBucket.get(e.bucket_key) ?? [];
      byBucket.push(e);
      store.edgesByBucket.set(e.bucket_key, byBucket);
    }
  }
}

/** Loads literature_v1/papers.jsonl and indexes papers by paper_key. */
async function loadPapersIntoStore(): Promise<void> {
  store.papers = await loadJsonl<Paper>(join(LIT, "papers.jsonl"), (r) => ({
    paper_key: r.paper_key,
    first_author: r.first_author,
    year: r.year ?? null,
    doi: r.doi ?? null,
    short_title: r.short_title,
    paper_title: r.paper_title,
    paper_abstract: r.paper_abstract,
    primary_category: r.primary_category,
    secondary_tags: safeJsonArr(r.secondary_tags_json),
    classification_confidence: r.classification_confidence,
    pipeline_status: r.pipeline_status,
    edge_count_final: Number(r.edge_count_final) || 0,
    accepted_edge_count: Number(r.accepted_edge_count) || 0,
    review_required_edge_count: Number(r.review_required_edge_count) || 0,
    excluded_edge_count: Number(r.excluded_edge_count) || 0,
    usable_effect_edge_count: Number(r.usable_effect_edge_count) || 0,
    blocked_effect_edge_count: Number(r.blocked_effect_edge_count) || 0,
    pmid: r.pmid ?? null,
    pmcid: r.pmcid ?? null,
  }));
  for (const p of store.papers) store.papersByKey.set(p.paper_key, p);
}

/** Loads literature_v1/audit_issues.jsonl and indexes the issues by edge_id. */
async function loadAuditIntoStore(): Promise<void> {
  store.audit = await loadJsonl<AuditIssue>(join(LIT, "audit_issues.jsonl"), (r) => ({
    paper_key: r.paper_key,
    edge_id: r.edge_id,
    edge_index: Number(r.edge_index) || 0,
    check: r.check,
    severity: (r.severity as AuditIssue["severity"]) || "info",
    field: r.field,
    action: r.action ?? null,
    message: r.message,
    value: r.value,
  }));
  for (const a of store.audit) {
    const byEdge = store.auditByEdge.get(a.edge_id) ?? [];
    byEdge.push(a);
    store.auditByEdge.set(a.edge_id, byEdge);
  }
}

/**
 * Reads m11_workbench/workbench_data.json if it exists.
 *
 * Read or parse failures are logged and leave `store.workbench` unchanged.
 * The parsed JSON is cast to WorkbenchData without validation.
 */
function loadWorkbenchIntoStore(): void {
  const wbPath = join(M11, "workbench_data.json");
  if (!existsSync(wbPath)) return;
  try {
    store.workbench = JSON.parse(readFileSync(wbPath, "utf8")) as WorkbenchData;
  } catch (err) {
    logger.error({ err }, "failed to parse workbench_data.json");
  }
}

/** Runs every loading step in order, then marks the store loaded and logs a summary. */
async function loadResearchData(): Promise<void> {
  const t0 = Date.now();
  logger.info({ root: DATA_ROOT }, "loading research data");
  const compositeToBucketKey = await loadBucketsIntoStore();
  await loadEdgesIntoStore(compositeToBucketKey);
  await loadPapersIntoStore();
  await loadAuditIntoStore();
  loadWorkbenchIntoStore();
  store.loaded = true;
  store.loadedAt = Date.now();
  logger.info(
    {
      ms: Date.now() - t0,
      buckets: store.buckets.length,
      edges: store.edges.length,
      papers: store.papers.length,
      audit: store.audit.length,
      // workbench_data.json is cast, not validated: a file without these arrays
      // must not turn a successful load into a TypeError here.
      snapshots: store.workbench?.snapshots?.length ?? 0,
      tasks: store.workbench?.tasks?.length ?? 0,
    },
    "research data loaded",
  );
}

/**
 * Loads all research data into the module-level store, once.
 *
 * - Resolves immediately if a load has already completed.
 * - Concurrent callers share a single in-flight promise.
 * - If a load fails, the partial state is discarded and the error is logged
 *   and re-thrown to every waiting caller. The next call then starts a fresh
 *   attempt instead of re-receiving the old rejection.
 */
export async function ensureLoaded(): Promise<void> {
  if (store.loaded) return;
  if (loadingPromise) return loadingPromise;
  // The .catch callback always runs asynchronously, i.e. after this assignment,
  // so clearing loadingPromise inside it cannot be overwritten by the assignment.
  loadingPromise = loadResearchData().catch((err: unknown) => {
    resetResearchStore();
    loadingPromise = null;
    logger.error({ err }, "failed to load research data");
    throw err;
  });
  return loadingPromise;
}
// -------------------------------------------------------------- public getters
/**
 * Exposes the module-level store itself, not a copy. Callers read (and can
 * mutate) the same arrays and maps the loader populated.
 */
export function getStore() {
  return store;
}

/** Bucket with the given `bucket_key`, or `undefined` if none was loaded. */
export function getBucketByKey(k: string): Bucket | undefined {
  const bucket = store.bucketsByKey.get(k);
  return bucket;
}

/**
 * Edge by id. The literature edges store is consulted first; on a miss, the
 * workbench snapshot indexes are searched via `getAevEdgeById`.
 */
export function getEdgeById(id: string): Edge | undefined {
  return store.edgesById.get(id) ?? getAevEdgeById(id);
}
/**
 * Maps a free-form record status onto the three values `Edge.record_status`
 * allows. Used when mapping workbench snapshot edges.
 *
 * Matching is case-insensitive:
 * - Exact matches pass through.
 * - Otherwise, a value containing "accept" becomes "accepted"; this is checked
 *   first, so it takes precedence.
 * - Otherwise, a value containing "exclude" or "reject" becomes "excluded".
 * - Everything else, including non-strings and "", falls back to
 *   "review_required".
 *
 * @param s Raw status value; not assumed to be a string.
 * @returns One of "accepted" | "review_required" | "excluded".
 */
function normalizeRecordStatus(s: unknown): "accepted" | "review_required" | "excluded" {
  if (typeof s !== "string" || !s) return "review_required";
  const k = s.toLowerCase();
  // The equality checks narrow `k` to the literal union, so no cast is needed.
  if (k === "accepted" || k === "review_required" || k === "excluded") return k;
  if (k.includes("accept")) return "accepted";
  if (k.includes("exclude") || k.includes("reject")) return "excluded";
  return "review_required";
}
/**
 * Looks up an edge in the workbench snapshot indexes (`workbench_data.json`).
 *
 * Search order: the active snapshot first, then the remaining snapshots in
 * `Object.keys` order; the first match wins. When an index entry is an array,
 * its first element is used.
 *
 * The raw record is spread into the result, and each `Edge` field is then
 * filled from these sources in turn:
 * - the `Edge` field name;
 * - the workbench's alternative column names (`paper_id`, `*_raw`, `*_norm`,
 *   `*_norm_text`, ...);
 * - a neutral default (`""`, `null`, `0`, `false`, `[]`).
 *
 * The result therefore never has undefined `Edge` fields. Array-valued fields
 * also accept JSON-encoded strings.
 *
 * Only own keys of the parsed JSON objects are consulted. Otherwise an id such
 * as "constructor" would resolve to an inherited `Object.prototype` member and
 * yield a bogus edge instead of `undefined`.
 *
 * @param id Edge id to look up.
 * @returns The mapped edge, or `undefined` if no snapshot index contains it.
 */
function getAevEdgeById(id: string): Edge | undefined {
  const wb = store.workbench;
  if (!wb || !wb.snapshot_indexes) return undefined;
  const hasOwn = (obj: unknown, key: string): boolean =>
    typeof obj === "object" && obj !== null && Object.prototype.hasOwnProperty.call(obj, key);
  const activeSid = wb.active_snapshot?.snapshot_id;
  const sids = activeSid
    ? [activeSid, ...Object.keys(wb.snapshot_indexes).filter((s) => s !== activeSid)]
    : Object.keys(wb.snapshot_indexes);
  for (const sid of sids) {
    const idx = hasOwn(wb.snapshot_indexes, sid) ? wb.snapshot_indexes[sid] : undefined;
    const byId = idx?.edges_by_edge_id;
    if (!hasOwn(byId, id)) continue;
    const entry = byId[id];
    const raw = Array.isArray(entry) ? entry[0] : entry;
    if (!raw || typeof raw !== "object") continue;
    const edge: Edge = {
      ...raw,
      edge_id: raw.edge_id ?? id,
      paper_key: raw.paper_id ?? raw.paper_key ?? "",
      edge_order: raw.edge_order ?? 0,
      equation_type: raw.equation_type ?? "",
      paper_title: raw.paper_title ?? "",
      paper_abstract: raw.paper_abstract ?? "",
      first_author: raw.first_author ?? "",
      year: raw.year ?? null,
      doi: raw.doi ?? null,
      short_title: raw.short_title ?? raw.paper_title ?? "",
      x_raw: raw.x_raw ?? raw.x_norm_text ?? "",
      y_raw: raw.y_raw ?? raw.y_norm_text ?? "",
      x_norm: raw.x_norm ?? raw.x_norm_text ?? raw.x_raw ?? "",
      y_norm: raw.y_norm ?? raw.y_norm_text ?? raw.y_raw ?? "",
      population_bucket: raw.population_bucket ?? raw.population_bucket_raw ?? "",
      outcome_type: raw.outcome_type ?? raw.outcome_type_raw ?? "",
      time_index: raw.time_index ?? raw.time_index_raw ?? "",
      time_horizon: raw.time_horizon ?? raw.time_horizon_raw ?? "",
      measure_family: raw.measure_family ?? raw.measure_family_raw ?? "",
      measure_type: raw.measure_type ?? raw.measure_type_raw ?? "",
      measure_scale: raw.measure_scale ?? raw.measure_scale_raw ?? "",
      intervention_name: raw.intervention_name ?? null,
      id_strategy: raw.id_strategy ?? "",
      id_status: raw.id_status ?? "",
      assumptions: safeJsonArr(raw.assumptions ?? raw.assumptions_json),
      theta_hat: raw.theta_hat ?? raw.theta_hat_norm ?? raw.theta_hat_raw ?? null,
      ci_low: raw.ci_low ?? raw.ci_low_norm ?? raw.ci_low_raw ?? null,
      ci_high: raw.ci_high ?? raw.ci_high_norm ?? raw.ci_high_raw ?? null,
      theta_sign: raw.theta_sign ?? raw.effect_direction ?? null,
      p_value: raw.p_value ?? raw.p_value_raw ?? null,
      n: raw.n ?? raw.n_raw ?? null,
      design: raw.design ?? raw.study_design_raw ?? null,
      grade: raw.grade ?? null,
      // Fields below were previously left undefined behind an `as Edge` cast;
      // default them so consumers can rely on the declared Edge shape.
      model: raw.model ?? null,
      adjustment_set: safeJsonArr(raw.adjustment_set ?? raw.adjustment_set_json),
      effect_status: raw.effect_status ?? "",
      placeholder_flag: Boolean(raw.placeholder_flag),
      review_valid: Boolean(raw.review_valid),
      review_semantically_valid: Boolean(raw.review_semantically_valid),
      review_fill_rate: Number(raw.review_fill_rate) || 0,
      review_issue_count: Number(raw.review_issue_count) || 0,
      mapping_statuses: safeJsonArr(raw.mapping_statuses ?? raw.mapping_statuses_json),
      has_missing_mapping: Boolean(raw.has_missing_mapping),
      has_tentative_mapping: Boolean(raw.has_tentative_mapping),
      audit_error_count: Number(raw.audit_error_count) || 0,
      audit_warning_count: Number(raw.audit_warning_count) || 0,
      audit_checks: safeJsonArr(raw.audit_checks ?? raw.audit_checks_json),
      audit_actions: safeJsonArr(raw.audit_actions ?? raw.audit_actions_json),
      pmid: raw.pmid ?? null,
      pmcid: raw.pmcid ?? null,
      bucket_key: raw.bucket_key ?? null,
      record_status: normalizeRecordStatus(raw.record_status) as Edge["record_status"],
    };
    return edge;
  }
  return undefined;
}
/** Paper with the given `paper_key`, or `undefined` when it was not loaded. */
export function getPaperByKey(k: string): Paper | undefined {
  const paper = store.papersByKey.get(k);
  return paper;
}

/**
 * Audit issues recorded against an edge id. Returns the stored array itself,
 * or a fresh empty array when the edge has none.
 */
export function getAuditForEdge(id: string): AuditIssue[] {
  const issues = store.auditByEdge.get(id);
  return issues === undefined ? [] : issues;
}

/** Literature edges of a paper in load order, or a fresh empty array. */
export function getEdgesForPaper(k: string): Edge[] {
  const edges = store.edgesByPaper.get(k);
  return edges === undefined ? [] : edges;
}

/** Literature edges attached to a bucket at load time, or a fresh empty array. */
export function getEdgesForBucket(k: string): Edge[] {
  const edges = store.edgesByBucket.get(k);
  return edges === undefined ? [] : edges;
}

/** Parsed workbench_data.json, or null if it has not been (or could not be) loaded. */
export function getWorkbench(): WorkbenchData | null {
  const { workbench } = store;
  return workbench;
}
// -------------------------------------------------------------- review decisions
/**
 * A reviewer's verdict on a single edge.
 * NOTE(review): `patched_fields` presumably carries the edited values when
 * `decision` is "edit" — confirm against the route that records decisions.
 */
export interface ReviewDecision {
  edge_id: string;
  decision: "accept" | "reject" | "edit";
  note: string | null;
  patched_fields: Record<string, any> | null;
  user_id: string;
  decided_at: string;
}

// Decisions live only in process memory and nothing here persists them. Two
// structures are kept: the full history in arrival order, and the most recent
// decision per edge.
const reviewDecisions: ReviewDecision[] = [];
const decisionsByEdge = new Map<string, ReviewDecision>();

/** Appends `d` to the history, makes it the latest decision for its edge, and logs it. */
export function recordDecision(d: ReviewDecision): void {
  const { edge_id, decision, user_id } = d;
  reviewDecisions.push(d);
  decisionsByEdge.set(edge_id, d);
  logger.info({ edge_id, decision, by: user_id }, "review decision recorded");
}

/** Most recently recorded decision for an edge, if any. */
export function getDecision(edgeId: string): ReviewDecision | undefined {
  const latest = decisionsByEdge.get(edgeId);
  return latest;
}

/** Shallow copy of the decision history, oldest first. */
export function getAllDecisions(): ReviewDecision[] {
  return [...reviewDecisions];
}