File size: 4,122 Bytes
5871090
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Singleton wrapper around the persisted PostgresCausalNetwork from
// @workspace/networks. The chat agent's evidence drawer and the graph
// browser's bucket listing route through these helpers instead of
// constructing an InMemoryCausalNetwork from JSONL on every request.
//
// Schema is ensured lazily on the first call; if DATABASE_URL is not
// configured (or the underlying pool throws), the helpers degrade
// silently and return undefined / empty arrays so callers can fall
// back to the legacy literature_v1 JSONL store.

import { pool } from "@workspace/db";
import {
  PostgresCausalNetwork,
  type CausalEdge,
  type LiteratureBucket,
  type LocalCausalNet,
  type PgQueryable,
} from "@workspace/networks";
import { logger } from "./logger";

let _store: PostgresCausalNetwork | null = null;
let _ready: Promise<void> | null = null;
let _disabled = false;

// pg.Pool satisfies the structural PgQueryable + connect() contract
// PostgresCausalNetwork ducktypes; expose it via that interface so the
// constructor sees a typed value instead of an `any` cast.
const pgQueryable: PgQueryable = pool;

function instance(): PostgresCausalNetwork {
  if (!_store) _store = new PostgresCausalNetwork(pgQueryable);
  return _store;
}

/** Lazily ensure the persistence schema. Idempotent. Disables the
 *  helpers permanently on first failure so a missing DB doesn't spam
 *  the logs on every request. */
export async function ensureCausalStoreReady(): Promise<boolean> {
  if (_disabled) return false;
  if (!_ready) {
    _ready = instance()
      .ensureSchema()
      .catch((err) => {
        _disabled = true;
        logger.warn(
          { err: err?.message ?? String(err) },
          "PostgresCausalNetwork unavailable; falling back to JSONL store",
        );
      });
  }
  await _ready;
  return !_disabled;
}

export async function causalGetEdge(
  edge_id: string,
): Promise<CausalEdge | undefined> {
  if (!(await ensureCausalStoreReady())) return undefined;
  try {
    return await instance().getEdge(edge_id);
  } catch (err) {
    logger.warn({ err, edge_id }, "PostgresCausalNetwork.getEdge failed");
    return undefined;
  }
}

export async function causalEdgesForPaper(
  paper_id: string,
): Promise<CausalEdge[]> {
  if (!(await ensureCausalStoreReady())) return [];
  try {
    return await instance().edgesForPaper(paper_id);
  } catch (err) {
    logger.warn({ err, paper_id }, "PostgresCausalNetwork.edgesForPaper failed");
    return [];
  }
}

export async function causalSubgraphLocalNets(
  paper_ids: string[],
): Promise<LocalCausalNet[]> {
  if (!(await ensureCausalStoreReady())) return [];
  try {
    const sub = await instance().subgraph(paper_ids);
    return sub.listPaperIds().map((pid) => sub.getLocalNet(pid)!).filter(Boolean);
  } catch (err) {
    logger.warn({ err, paper_ids }, "PostgresCausalNetwork.subgraph failed");
    return [];
  }
}

export async function causalListBuckets(): Promise<LiteratureBucket[]> {
  if (!(await ensureCausalStoreReady())) return [];
  try {
    return await instance().listBuckets();
  } catch (err) {
    logger.warn({ err }, "PostgresCausalNetwork.listBuckets failed");
    return [];
  }
}

export async function causalEdgesInBucket(
  bucket_id: string,
): Promise<CausalEdge[]> {
  if (!(await ensureCausalStoreReady())) return [];
  try {
    return await instance().edgesInBucket(bucket_id);
  } catch (err) {
    logger.warn({ err, bucket_id }, "PostgresCausalNetwork.edgesInBucket failed");
    return [];
  }
}

export async function causalGetBucket(
  bucket_id: string,
): Promise<LiteratureBucket | undefined> {
  if (!(await ensureCausalStoreReady())) return undefined;
  try {
    return await instance().getBucket(bucket_id);
  } catch (err) {
    logger.warn({ err, bucket_id }, "PostgresCausalNetwork.getBucket failed");
    return undefined;
  }
}

/** Direct accessor for callers (e.g. backfill scripts) that need to
 *  ingest LocalCausalNets. Returns undefined when the store is
 *  disabled. */
export function getCausalStore(): PostgresCausalNetwork | null {
  return _disabled ? null : instance();
}