File size: 4,122 Bytes
5871090 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | // Singleton wrapper around the persisted PostgresCausalNetwork from
// @workspace/networks. The chat agent's evidence drawer and the graph
// browser's bucket listing route through these helpers instead of
// constructing an InMemoryCausalNetwork from JSONL on every request.
//
// Schema is ensured lazily on the first call; if DATABASE_URL is not
// configured (or the underlying pool throws), the helpers degrade
// silently and return undefined / empty arrays so callers can fall
// back to the legacy literature_v1 JSONL store.
import { pool } from "@workspace/db";
import {
PostgresCausalNetwork,
type CausalEdge,
type LiteratureBucket,
type LocalCausalNet,
type PgQueryable,
} from "@workspace/networks";
import { logger } from "./logger";
let _store: PostgresCausalNetwork | null = null;
let _ready: Promise<void> | null = null;
let _disabled = false;
// pg.Pool satisfies the structural PgQueryable + connect() contract
// PostgresCausalNetwork ducktypes; expose it via that interface so the
// constructor sees a typed value instead of an `any` cast.
const pgQueryable: PgQueryable = pool;
function instance(): PostgresCausalNetwork {
if (!_store) _store = new PostgresCausalNetwork(pgQueryable);
return _store;
}
/** Lazily ensure the persistence schema. Idempotent. Disables the
* helpers permanently on first failure so a missing DB doesn't spam
* the logs on every request. */
export async function ensureCausalStoreReady(): Promise<boolean> {
if (_disabled) return false;
if (!_ready) {
_ready = instance()
.ensureSchema()
.catch((err) => {
_disabled = true;
logger.warn(
{ err: err?.message ?? String(err) },
"PostgresCausalNetwork unavailable; falling back to JSONL store",
);
});
}
await _ready;
return !_disabled;
}
export async function causalGetEdge(
edge_id: string,
): Promise<CausalEdge | undefined> {
if (!(await ensureCausalStoreReady())) return undefined;
try {
return await instance().getEdge(edge_id);
} catch (err) {
logger.warn({ err, edge_id }, "PostgresCausalNetwork.getEdge failed");
return undefined;
}
}
export async function causalEdgesForPaper(
paper_id: string,
): Promise<CausalEdge[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().edgesForPaper(paper_id);
} catch (err) {
logger.warn({ err, paper_id }, "PostgresCausalNetwork.edgesForPaper failed");
return [];
}
}
export async function causalSubgraphLocalNets(
paper_ids: string[],
): Promise<LocalCausalNet[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
const sub = await instance().subgraph(paper_ids);
return sub.listPaperIds().map((pid) => sub.getLocalNet(pid)!).filter(Boolean);
} catch (err) {
logger.warn({ err, paper_ids }, "PostgresCausalNetwork.subgraph failed");
return [];
}
}
export async function causalListBuckets(): Promise<LiteratureBucket[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().listBuckets();
} catch (err) {
logger.warn({ err }, "PostgresCausalNetwork.listBuckets failed");
return [];
}
}
export async function causalEdgesInBucket(
bucket_id: string,
): Promise<CausalEdge[]> {
if (!(await ensureCausalStoreReady())) return [];
try {
return await instance().edgesInBucket(bucket_id);
} catch (err) {
logger.warn({ err, bucket_id }, "PostgresCausalNetwork.edgesInBucket failed");
return [];
}
}
export async function causalGetBucket(
bucket_id: string,
): Promise<LiteratureBucket | undefined> {
if (!(await ensureCausalStoreReady())) return undefined;
try {
return await instance().getBucket(bucket_id);
} catch (err) {
logger.warn({ err, bucket_id }, "PostgresCausalNetwork.getBucket failed");
return undefined;
}
}
/** Direct accessor for callers (e.g. backfill scripts) that need to
* ingest LocalCausalNets. Returns undefined when the store is
* disabled. */
export function getCausalStore(): PostgresCausalNetwork | null {
return _disabled ? null : instance();
}
|