doatlas-2 / artifacts /api-server /src /lib /causal-store.ts
Iostream-Li's picture
Add files using upload-large-folder tool
5871090 verified
// Singleton wrapper around the persisted PostgresCausalNetwork from
// @workspace/networks. The chat agent's evidence drawer and the graph
// browser's bucket listing route through these helpers instead of
// constructing an InMemoryCausalNetwork from JSONL on every request.
//
// Schema is ensured lazily on the first call; if DATABASE_URL is not
// configured (or the underlying pool throws), the helpers degrade
// silently and return undefined / empty arrays so callers can fall
// back to the legacy literature_v1 JSONL store.
import { pool } from "@workspace/db";
import {
PostgresCausalNetwork,
type CausalEdge,
type LiteratureBucket,
type LocalCausalNet,
type PgQueryable,
} from "@workspace/networks";
import { logger } from "./logger";
let _store: PostgresCausalNetwork | null = null;
let _ready: Promise<void> | null = null;
let _disabled = false;
// pg.Pool satisfies the structural PgQueryable + connect() contract
// PostgresCausalNetwork ducktypes; expose it via that interface so the
// constructor sees a typed value instead of an `any` cast.
const pgQueryable: PgQueryable = pool;
function instance(): PostgresCausalNetwork {
if (!_store) _store = new PostgresCausalNetwork(pgQueryable);
return _store;
}
/** Lazily ensure the persistence schema. Idempotent. Disables the
* helpers permanently on first failure so a missing DB doesn't spam
* the logs on every request. */
export async function ensureCausalStoreReady(): Promise<boolean> {
if (_disabled) return false;
if (!_ready) {
_ready = instance()
.ensureSchema()
.catch((err) => {
_disabled = true;
logger.warn(
{ err: err?.message ?? String(err) },
"PostgresCausalNetwork unavailable; falling back to JSONL store",
);
});
}
await _ready;
return !_disabled;
}
export async function causalGetEdge(
edge_id: string,
): Promise<CausalEdge | undefined> {
if (!(await ensureCausalStoreReady())) return undefined;
try {
return await instance().getEdge(edge_id);
} catch (err) {
logger.warn({ err, edge_id }, "PostgresCausalNetwork.getEdge failed");
return undefined;
}
}
export async function causalEdgesForPaper(
paper_id: string,
): Promise<CausalEdge[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().edgesForPaper(paper_id);
} catch (err) {
logger.warn({ err, paper_id }, "PostgresCausalNetwork.edgesForPaper failed");
return [];
}
}
export async function causalSubgraphLocalNets(
paper_ids: string[],
): Promise<LocalCausalNet[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
const sub = await instance().subgraph(paper_ids);
return sub.listPaperIds().map((pid) => sub.getLocalNet(pid)!).filter(Boolean);
} catch (err) {
logger.warn({ err, paper_ids }, "PostgresCausalNetwork.subgraph failed");
return [];
}
}
export async function causalListBuckets(): Promise<LiteratureBucket[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().listBuckets();
} catch (err) {
logger.warn({ err }, "PostgresCausalNetwork.listBuckets failed");
return [];
}
}
export async function causalEdgesInBucket(
bucket_id: string,
): Promise<CausalEdge[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().edgesInBucket(bucket_id);
} catch (err) {
logger.warn({ err, bucket_id }, "PostgresCausalNetwork.edgesInBucket failed");
return [];
}
}
export async function causalGetBucket(
bucket_id: string,
): Promise<LiteratureBucket | undefined> {
if (!(await ensureCausalStoreReady())) return undefined;
try {
return await instance().getBucket(bucket_id);
} catch (err) {
logger.warn({ err, bucket_id }, "PostgresCausalNetwork.getBucket failed");
return undefined;
}
}
/** Direct accessor for callers (e.g. backfill scripts) that need to
* ingest LocalCausalNets. Returns undefined when the store is
* disabled. */
export function getCausalStore(): PostgresCausalNetwork | null {
return _disabled ? null : instance();
}