import { createReadStream, readFileSync, existsSync } from "node:fs";
import { createInterface } from "node:readline";
import { join } from "node:path";
import { logger } from "./logger";

// All research data is read from `<cwd>/data`.
const DATA_ROOT = join(process.cwd(), "data");
const LIT = join(DATA_ROOT, "literature_v1");
const M11 = join(DATA_ROOT, "m11_workbench");

/** One row of `buckets.jsonl`, plus an optional pooled estimate. */
export interface Bucket {
  bucket_key: string;
  population_bucket: string;
  x_norm: string;
  y_norm: string;
  equation_type: string;
  measure_type: string;
  measure_scale: string;
  time_index: string;
  time_horizon: string;
  outcome_type: string;
  edge_count: number;
  paper_count: number;
  accepted_edge_count: number;
  review_required_edge_count: number;
  usable_effect_count: number;
  missing_effect_count: number;
  blocked_effect_count: number;
  has_direction_conflict: boolean;
  theta_min: number | null;
  theta_max: number | null;
  supporting_edge_ids: string[];
  supporting_paper_keys: string[];
  theta_signs: string[];
  // Pooled literature estimate from PostgresCausalNetwork (NIL).
  // Optional because the legacy JSONL store doesn't carry it; only
  // buckets sourced from `literature_buckets` populate this field.
  pooled_estimate?: {
    theta_hat: number | null;
    ci: [number | null, number | null];
    method: "fixed_effect" | "random_effect" | "raw_mean" | "none";
    heterogeneity_i2: number | null;
  } | null;
}

/** One row of `edges.jsonl`; `bucket_key` is resolved at load time. */
export interface Edge {
  edge_id: string;
  paper_key: string;
  edge_order: number;
  equation_type: string;
  paper_title: string;
  paper_abstract: string;
  first_author: string;
  year: number | null;
  doi: string | null;
  short_title: string;
  x_raw: string;
  y_raw: string;
  x_norm: string;
  y_norm: string;
  population_bucket: string;
  outcome_type: string;
  time_index: string;
  time_horizon: string;
  measure_family: string;
  measure_type: string;
  measure_scale: string;
  intervention_name: string | null;
  id_strategy: string;
  id_status: string;
  assumptions: string[];
  theta_hat: number | null;
  ci_low: number | null;
  ci_high: number | null;
  theta_sign: string | null;
  p_value: number | null;
  n: number | null;
  design: string | null;
  grade: string | null;
  model: string | null;
  adjustment_set: string[];
  record_status: "accepted" | "review_required" | "excluded";
  effect_status: string;
  placeholder_flag: boolean;
  review_valid: boolean;
  review_semantically_valid: boolean;
  review_fill_rate: number;
  review_issue_count: number;
  mapping_statuses: string[];
  has_missing_mapping: boolean;
  has_tentative_mapping: boolean;
  audit_error_count: number;
  audit_warning_count: number;
  audit_checks: string[];
  audit_actions: string[];
  pmid: string | null;
  pmcid: string | null;
  bucket_key: string | null;
}

/** One row of `papers.jsonl`. */
export interface Paper {
  paper_key: string;
  first_author: string;
  year: number | null;
  doi: string | null;
  short_title: string;
  paper_title: string;
  paper_abstract: string;
  primary_category: string;
  secondary_tags: string[];
  classification_confidence: string;
  pipeline_status: string;
  edge_count_final: number;
  accepted_edge_count: number;
  review_required_edge_count: number;
  excluded_edge_count: number;
  usable_effect_edge_count: number;
  blocked_effect_edge_count: number;
  pmid: string | null;
  pmcid: string | null;
}

/** One row of `audit_issues.jsonl`. */
export interface AuditIssue {
  paper_key: string;
  edge_id: string;
  edge_index: number;
  check: string;
  severity: "error" | "warning" | "info";
  field: string;
  action: string | null;
  message: string;
  value: any;
}

export interface SnapshotEntry {
  snapshot_id: string;
  snapshot_label: string;
  snapshot_db_path: string;
  created_at: string;
  stats: {
    snapshot_id: string;
    accepted_edge_count: number;
    review_required_edge_count: number;
    excluded_edge_count: number;
    bucket_count: number;
    graph_edge_count: number;
    created_at: string;
  };
}

export interface TaskRecord {
  task_run_id: string;
  snapshot_id: string;
  task_mode: string;
  task_input: Record<string, any>;
  task_plan: Record<string, any>;
  result_object_type: string;
  result_object_id: string;
  result: Record<string, any>;
  task_file_relative: string;
  summary?: any;
}

export interface TestCase {
  case_id: string;
  label: string;
  description: string;
  task_mode: string;
  query: Record<string, any>;
  task_input: Record<string, any>;
  tags: string[];
}

/** Shape that `workbench_data.json` is cast to (the file is not validated). */
export interface WorkbenchData {
  generated_at: string;
  title: string;
  active_snapshot: {
    snapshot_id: string;
    snapshot_label: string;
    created_at: string;
    published_at: string;
    source: string;
  };
  snapshots: SnapshotEntry[];
  test_cases: TestCase[];
  tasks: TaskRecord[];
  snapshot_indexes: Record<string, any>;
}

// -------------------------------------------------------------- in-memory store

interface Store {
  buckets: Bucket[];
  bucketsByKey: Map<string, Bucket>;
  edges: Edge[];
  edgesById: Map<string, Edge>;
  edgesByPaper: Map<string, Edge[]>;
  edgesByBucket: Map<string, Edge[]>;
  papers: Paper[];
  papersByKey: Map<string, Paper>;
  audit: AuditIssue[];
  auditByEdge: Map<string, AuditIssue[]>;
  workbench: WorkbenchData | null;
  loaded: boolean;
  loadedAt: number;
}

const store: Store = {
  buckets: [],
  bucketsByKey: new Map(),
  edges: [],
  edgesById: new Map(),
  edgesByPaper: new Map(),
  edgesByBucket: new Map(),
  papers: [],
  papersByKey: new Map(),
  audit: [],
  auditByEdge: new Map(),
  workbench: null,
  loaded: false,
  loadedAt: 0,
};

/**
 * Coerces a value to an array: arrays pass through unchanged, non-empty
 * strings are parsed as JSON and kept only if they decode to an array, and
 * anything else (including unparseable JSON) yields `[]`.
 */
function safeJsonArr(s: unknown): string[] {
  if (Array.isArray(s)) return s as string[];
  if (typeof s !== "string" || !s) return [];
  try {
    const parsed = JSON.parse(s);
    return Array.isArray(parsed) ? parsed : [];
  } catch {
    return [];
  }
}

/**
 * Reads a JSONL file line by line and maps every parsed row through `mapRow`.
 *
 * A missing file is logged and yields `[]`. Blank lines are ignored. Lines
 * that fail to parse (or whose `mapRow` call throws) are skipped; the number
 * of skipped lines is logged once per file instead of being dropped silently.
 *
 * @param path   File to read.
 * @param mapRow Converts one parsed JSON value into the stored row shape.
 * @returns The mapped rows, in file order.
 */
async function loadJsonl<T>(path: string, mapRow: (row: any) => T): Promise<T[]> {
  if (!existsSync(path)) {
    logger.warn({ path }, "research data file not found, skipping");
    return [];
  }
  const out: T[] = [];
  let skipped = 0;
  const rl = createInterface({ input: createReadStream(path), crlfDelay: Infinity });
  for await (const line of rl) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      out.push(mapRow(JSON.parse(trimmed)));
    } catch {
      // Best-effort load: one bad row must not abort the whole file.
      skipped += 1;
    }
  }
  if (skipped > 0) {
    logger.warn({ path, skipped }, "skipped malformed research data rows");
  }
  return out;
}

/** Fields shared by edges and buckets that make up the composite lookup key. */
type BucketFingerprint = Pick<
  Bucket,
  "population_bucket" | "x_norm" | "y_norm" | "equation_type" | "measure_type" | "time_horizon"
>;

/** Joins the fingerprint fields with `||` in a fixed order. */
function compositeKey(f: BucketFingerprint): string {
  return [
    f.population_bucket,
    f.x_norm,
    f.y_norm,
    f.equation_type,
    f.measure_type,
    f.time_horizon,
  ].join("||");
}

function buildBucketKeyForEdge(e: Edge): string {
  // Best-effort recompute: the edge mapper does not read bucket_key from
  // edges.jsonl, so the key is rebuilt from population_bucket + x_norm +
  // y_norm + equation_type + measure_type + time_horizon and looked up in an
  // index of buckets keyed by the same fields.
  return compositeKey(e);
}

function buildBucketCompositeKey(b: Bucket): string {
  return compositeKey(b);
}

/** Appends `value` to the array stored under `key`, creating it if needed. */
function appendToGroup<V>(index: Map<string, V[]>, key: string, value: V): void {
  const group = index.get(key);
  if (group) {
    group.push(value);
  } else {
    index.set(key, [value]);
  }
}

/** Maps one parsed `buckets.jsonl` row to a `Bucket`. */
function toBucket(r: any): Bucket {
  return {
    bucket_key: r.bucket_key,
    population_bucket: r.population_bucket,
    x_norm: r.x_norm,
    y_norm: r.y_norm,
    equation_type: r.equation_type,
    measure_type: r.measure_type,
    measure_scale: r.measure_scale,
    time_index: r.time_index,
    time_horizon: r.time_horizon,
    outcome_type: r.outcome_type,
    edge_count: Number(r.edge_count) || 0,
    paper_count: Number(r.paper_count) || 0,
    accepted_edge_count: Number(r.accepted_edge_count) || 0,
    review_required_edge_count: Number(r.review_required_edge_count) || 0,
    usable_effect_count: Number(r.usable_effect_count) || 0,
    missing_effect_count: Number(r.missing_effect_count) || 0,
    blocked_effect_count: Number(r.blocked_effect_count) || 0,
    has_direction_conflict: Boolean(r.has_direction_conflict),
    // `?? null` keeps absent values inside the declared `number | null`.
    theta_min: r.theta_min ?? null,
    theta_max: r.theta_max ?? null,
    supporting_edge_ids: safeJsonArr(r.supporting_edge_ids_json),
    supporting_paper_keys: safeJsonArr(r.supporting_paper_keys_json),
    theta_signs: safeJsonArr(r.theta_signs_json),
  };
}

/** Maps one parsed `edges.jsonl` row to an `Edge` (bucket_key filled later). */
function toEdge(r: any): Edge {
  return {
    edge_id: r.edge_id,
    paper_key: r.paper_key,
    edge_order: Number(r.edge_order) || 0,
    equation_type: r.equation_type,
    paper_title: r.paper_title,
    paper_abstract: r.paper_abstract,
    first_author: r.first_author,
    year: r.year ?? null,
    doi: r.doi ?? null,
    short_title: r.short_title,
    x_raw: r.x_raw,
    y_raw: r.y_raw,
    x_norm: r.x_norm,
    y_norm: r.y_norm,
    population_bucket: r.population_bucket,
    outcome_type: r.outcome_type,
    time_index: r.time_index,
    time_horizon: r.time_horizon,
    measure_family: r.measure_family,
    measure_type: r.measure_type,
    measure_scale: r.measure_scale,
    intervention_name: r.intervention_name ?? null,
    id_strategy: r.id_strategy,
    id_status: r.id_status,
    assumptions: safeJsonArr(r.assumptions_json),
    theta_hat: r.theta_hat ?? null,
    ci_low: r.ci_low ?? null,
    ci_high: r.ci_high ?? null,
    theta_sign: r.theta_sign ?? null,
    p_value: r.p_value ?? null,
    n: r.n ?? null,
    design: r.design ?? null,
    grade: r.grade ?? null,
    model: r.model ?? null,
    adjustment_set: safeJsonArr(r.adjustment_set_json),
    record_status: r.record_status,
    effect_status: r.effect_status,
    placeholder_flag: Boolean(r.placeholder_flag),
    review_valid: Boolean(r.review_valid),
    review_semantically_valid: Boolean(r.review_semantically_valid),
    review_fill_rate: Number(r.review_fill_rate) || 0,
    review_issue_count: Number(r.review_issue_count) || 0,
    mapping_statuses: safeJsonArr(r.mapping_statuses_json),
    has_missing_mapping: Boolean(r.has_missing_mapping),
    has_tentative_mapping: Boolean(r.has_tentative_mapping),
    audit_error_count: Number(r.audit_error_count) || 0,
    audit_warning_count: Number(r.audit_warning_count) || 0,
    audit_checks: safeJsonArr(r.audit_checks_json),
    audit_actions: safeJsonArr(r.audit_actions_json),
    pmid: r.pmid ?? null,
    pmcid: r.pmcid ?? null,
    bucket_key: null,
  };
}

/** Maps one parsed `papers.jsonl` row to a `Paper`. */
function toPaper(r: any): Paper {
  return {
    paper_key: r.paper_key,
    first_author: r.first_author,
    year: r.year ?? null,
    doi: r.doi ?? null,
    short_title: r.short_title,
    paper_title: r.paper_title,
    paper_abstract: r.paper_abstract,
    primary_category: r.primary_category,
    secondary_tags: safeJsonArr(r.secondary_tags_json),
    classification_confidence: r.classification_confidence,
    pipeline_status: r.pipeline_status,
    edge_count_final: Number(r.edge_count_final) || 0,
    accepted_edge_count: Number(r.accepted_edge_count) || 0,
    review_required_edge_count: Number(r.review_required_edge_count) || 0,
    excluded_edge_count: Number(r.excluded_edge_count) || 0,
    usable_effect_edge_count: Number(r.usable_effect_edge_count) || 0,
    blocked_effect_edge_count: Number(r.blocked_effect_edge_count) || 0,
    pmid: r.pmid ?? null,
    pmcid: r.pmcid ?? null,
  };
}

/** Maps one parsed `audit_issues.jsonl` row to an `AuditIssue`. */
function toAuditIssue(r: any): AuditIssue {
  return {
    paper_key: r.paper_key,
    edge_id: r.edge_id,
    edge_index: Number(r.edge_index) || 0,
    check: r.check,
    severity: (r.severity as AuditIssue["severity"]) || "info",
    field: r.field,
    action: r.action ?? null,
    message: r.message,
    value: r.value,
  };
}

/** Empties every index so a retried load cannot accumulate duplicates. */
function resetIndexes(): void {
  store.bucketsByKey.clear();
  store.edgesById.clear();
  store.edgesByPaper.clear();
  store.edgesByBucket.clear();
  store.papersByKey.clear();
  store.auditByEdge.clear();
  store.workbench = null;
}

/** Performs one full load of all data files into `store`. */
async function loadAll(): Promise<void> {
  const t0 = Date.now();
  logger.info({ root: DATA_ROOT }, "loading research data");
  resetIndexes();

  // ---- buckets
  store.buckets = await loadJsonl(join(LIT, "buckets.jsonl"), toBucket);
  // If several buckets share a composite key, the last one read wins.
  const compositeToBucketKey = new Map<string, string>();
  for (const b of store.buckets) {
    store.bucketsByKey.set(b.bucket_key, b);
    compositeToBucketKey.set(buildBucketCompositeKey(b), b.bucket_key);
  }

  // ---- edges
  store.edges = await loadJsonl(join(LIT, "edges.jsonl"), toEdge);
  for (const e of store.edges) {
    e.bucket_key = compositeToBucketKey.get(buildBucketKeyForEdge(e)) ?? null;
    store.edgesById.set(e.edge_id, e);
    appendToGroup(store.edgesByPaper, e.paper_key, e);
    if (e.bucket_key) {
      appendToGroup(store.edgesByBucket, e.bucket_key, e);
    }
  }

  // ---- papers
  store.papers = await loadJsonl(join(LIT, "papers.jsonl"), toPaper);
  for (const p of store.papers) store.papersByKey.set(p.paper_key, p);

  // ---- audit issues
  store.audit = await loadJsonl(join(LIT, "audit_issues.jsonl"), toAuditIssue);
  for (const a of store.audit) {
    appendToGroup(store.auditByEdge, a.edge_id, a);
  }

  // ---- workbench json
  const wbPath = join(M11, "workbench_data.json");
  if (existsSync(wbPath)) {
    try {
      store.workbench = JSON.parse(readFileSync(wbPath, "utf8")) as WorkbenchData;
    } catch (err) {
      logger.error({ err }, "failed to parse workbench_data.json");
    }
  }

  store.loaded = true;
  store.loadedAt = Date.now();
  logger.info(
    {
      ms: Date.now() - t0,
      buckets: store.buckets.length,
      edges: store.edges.length,
      papers: store.papers.length,
      audit: store.audit.length,
      // The workbench file is cast, not validated, so these arrays may be
      // absent; guard the access so the summary log can never throw.
      snapshots: store.workbench?.snapshots?.length ?? 0,
      tasks: store.workbench?.tasks?.length ?? 0,
    },
    "research data loaded",
  );
}

let loadingPromise: Promise<void> | null = null;

/**
 * Loads all research data into the in-memory store exactly once.
 *
 * Calls made while a load is in flight share the same promise. If the load
 * fails, the cached promise is discarded so that a later call retries instead
 * of re-throwing the first failure forever.
 */
export async function ensureLoaded(): Promise<void> {
  if (store.loaded) return;
  if (loadingPromise) return loadingPromise;
  loadingPromise = loadAll().catch((err: unknown) => {
    loadingPromise = null;
    throw err;
  });
  return loadingPromise;
}

// -------------------------------------------------------------- public getters

/** Returns the live (mutable) store object. */
export function getStore() {
  return store;
}

export function getBucketByKey(k: string): Bucket | undefined {
  return store.bucketsByKey.get(k);
}

/**
 * Looks an edge up in the JSONL-backed index first and falls back to the
 * workbench snapshot indexes when it is not found there.
 */
export function getEdgeById(id: string): Edge | undefined {
  const hit = store.edgesById.get(id);
  if (hit) return hit;
  return getAevEdgeById(id);
}

/**
 * Maps a free-form status onto the three statuses an `Edge` allows.
 * Matching is case-insensitive; non-strings, empty strings and anything
 * unrecognised become `"review_required"`.
 */
function normalizeRecordStatus(s: unknown): Edge["record_status"] {
  if (typeof s !== "string" || !s) return "review_required";
  const k = s.toLowerCase();
  if (k === "accepted") return "accepted";
  if (k === "review_required") return "review_required";
  if (k === "excluded") return "excluded";
  if (k.includes("accept")) return "accepted";
  if (k.includes("exclude") || k.includes("reject")) return "excluded";
  return "review_required";
}

/**
 * Resolves an edge from the workbench's `snapshot_indexes`, trying the active
 * snapshot before the others, and adapts the first matching entry to the
 * `Edge` shape. The entry's own properties are kept; the known fields then
 * fall back through their alternate names (`*_raw`, `*_norm`, ...) and
 * finally to a default, so no declared `Edge` field is left undefined.
 */
function getAevEdgeById(id: string): Edge | undefined {
  const wb = store.workbench;
  if (!wb || !wb.snapshot_indexes) return undefined;

  const allSids = Object.keys(wb.snapshot_indexes);
  const activeSid = wb.active_snapshot?.snapshot_id;
  const sids = activeSid ? [activeSid, ...allSids.filter((s) => s !== activeSid)] : allSids;

  for (const sid of sids) {
    const entry = wb.snapshot_indexes[sid]?.edges_by_edge_id?.[id];
    if (!entry) continue;
    const raw = Array.isArray(entry) ? entry[0] : entry;
    // Skip non-object entries; spreading them would not yield an edge.
    if (!raw || typeof raw !== "object") continue;

    const edge: Edge = {
      ...raw,
      edge_id: raw.edge_id ?? id,
      paper_key: raw.paper_id ?? raw.paper_key ?? "",
      edge_order: raw.edge_order ?? 0,
      equation_type: raw.equation_type ?? "",
      paper_title: raw.paper_title ?? "",
      paper_abstract: raw.paper_abstract ?? "",
      first_author: raw.first_author ?? "",
      year: raw.year ?? null,
      doi: raw.doi ?? null,
      short_title: raw.short_title ?? raw.paper_title ?? "",
      x_raw: raw.x_raw ?? raw.x_norm_text ?? "",
      y_raw: raw.y_raw ?? raw.y_norm_text ?? "",
      x_norm: raw.x_norm ?? raw.x_norm_text ?? raw.x_raw ?? "",
      y_norm: raw.y_norm ?? raw.y_norm_text ?? raw.y_raw ?? "",
      population_bucket: raw.population_bucket ?? raw.population_bucket_raw ?? "",
      outcome_type: raw.outcome_type ?? raw.outcome_type_raw ?? "",
      time_index: raw.time_index ?? raw.time_index_raw ?? "",
      time_horizon: raw.time_horizon ?? raw.time_horizon_raw ?? "",
      measure_family: raw.measure_family ?? raw.measure_family_raw ?? "",
      measure_type: raw.measure_type ?? raw.measure_type_raw ?? "",
      measure_scale: raw.measure_scale ?? raw.measure_scale_raw ?? "",
      intervention_name: raw.intervention_name ?? null,
      id_strategy: raw.id_strategy ?? "",
      id_status: raw.id_status ?? "",
      assumptions: raw.assumptions ?? [],
      theta_hat: raw.theta_hat ?? raw.theta_hat_norm ?? raw.theta_hat_raw ?? null,
      ci_low: raw.ci_low ?? raw.ci_low_norm ?? raw.ci_low_raw ?? null,
      ci_high: raw.ci_high ?? raw.ci_high_norm ?? raw.ci_high_raw ?? null,
      theta_sign: raw.theta_sign ?? raw.effect_direction ?? null,
      p_value: raw.p_value ?? raw.p_value_raw ?? null,
      n: raw.n ?? raw.n_raw ?? null,
      design: raw.design ?? raw.study_design_raw ?? null,
      grade: raw.grade ?? null,
      pmid: raw.pmid ?? null,
      pmcid: raw.pmcid ?? null,
      record_status: normalizeRecordStatus(raw.record_status),
      // Defaults for Edge fields the snapshot entry may not carry, so callers
      // can rely on the declared types (arrays are arrays, flags are booleans).
      model: raw.model ?? null,
      adjustment_set: raw.adjustment_set ?? [],
      effect_status: raw.effect_status ?? "",
      placeholder_flag: raw.placeholder_flag ?? false,
      review_valid: raw.review_valid ?? false,
      review_semantically_valid: raw.review_semantically_valid ?? false,
      review_fill_rate: raw.review_fill_rate ?? 0,
      review_issue_count: raw.review_issue_count ?? 0,
      mapping_statuses: raw.mapping_statuses ?? [],
      has_missing_mapping: raw.has_missing_mapping ?? false,
      has_tentative_mapping: raw.has_tentative_mapping ?? false,
      audit_error_count: raw.audit_error_count ?? 0,
      audit_warning_count: raw.audit_warning_count ?? 0,
      audit_checks: raw.audit_checks ?? [],
      audit_actions: raw.audit_actions ?? [],
      bucket_key: raw.bucket_key ?? null,
    };
    return edge;
  }
  return undefined;
}

export function getPaperByKey(k: string): Paper | undefined {
  return store.papersByKey.get(k);
}

/** Audit issues recorded for an edge, or `[]` when there are none. */
export function getAuditForEdge(id: string): AuditIssue[] {
  return store.auditByEdge.get(id) ?? [];
}

/** Edges belonging to a paper, or `[]` when the paper key is unknown. */
export function getEdgesForPaper(k: string): Edge[] {
  return store.edgesByPaper.get(k) ?? [];
}

/** Edges resolved to a bucket, or `[]` when the bucket key is unknown. */
export function getEdgesForBucket(k: string): Edge[] {
  return store.edgesByBucket.get(k) ?? [];
}

export function getWorkbench(): WorkbenchData | null {
  return store.workbench;
}

// -------------------------------------------------------------- review decisions

export interface ReviewDecision {
  edge_id: string;
  decision: "accept" | "reject" | "edit";
  note: string | null;
  patched_fields: Record<string, any> | null;
  user_id: string;
  decided_at: string;
}

// Decisions are held in memory only: the array keeps every decision in the
// order recorded, the map keeps the most recent decision per edge.
const reviewDecisions: ReviewDecision[] = [];
const decisionsByEdge = new Map<string, ReviewDecision>();

/** Appends a decision to the history and makes it the latest for its edge. */
export function recordDecision(d: ReviewDecision): void {
  reviewDecisions.push(d);
  decisionsByEdge.set(d.edge_id, d);
  logger.info(
    { edge_id: d.edge_id, decision: d.decision, by: d.user_id },
    "review decision recorded",
  );
}

/** Most recently recorded decision for the edge, if any. */
export function getDecision(edgeId: string): ReviewDecision | undefined {
  return decisionsByEdge.get(edgeId);
}

/** A shallow copy of the full decision history, in recording order. */
export function getAllDecisions(): ReviewDecision[] {
  return reviewDecisions.slice();
}