File size: 2,755 Bytes
6d1fe92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env node
// Backfill PostgresCausalNetwork from the legacy casual_part snapshots
// shipped under reference/doatlas-handoff/doatlas-backend-py. Run once
// per environment to give the new readers (chat agent evidence drawer,
// graph browser bucket listing) data to serve.
//
// Usage:
//   DATABASE_URL=... node scripts/backfill-causal-store.mjs [--root <path>] [--limit N]
//
// Defaults to scanning the in-repo reference handoff.

import { existsSync, readdirSync, statSync } from "node:fs";
import { join, resolve } from "node:path";
import pg from "pg";
import {
  PostgresCausalNetwork,
  loadLocalCausalNetFromCasualPart,
} from "@workspace/networks";

/**
 * Parse the CLI flags understood by this script.
 *
 * Recognised flags:
 *   --root <path>   directory to scan instead of the default
 *   --limit <N>     stop after N successfully ingested papers
 *
 * Unknown arguments are ignored.
 *
 * @param {string[]} argv - Arguments after the script name.
 * @returns {{ root: string | null | undefined, limit: number }}
 *   `root` stays `null` when the flag is absent; `limit` is `Infinity`
 *   when the flag is absent or its value is not a positive number.
 */
function parseArgs(argv) {
  const out = { root: null, limit: Infinity };
  for (let i = 0; i < argv.length; i++) {
    const a = argv[i];
    if (a === "--root") {
      out.root = argv[++i];
    } else if (a === "--limit") {
      const parsed = Number(argv[++i]);
      // Zero, NaN and a missing value already meant "no limit". Negative
      // values must be treated the same way: a negative limit would satisfy
      // an `ingested >= limit` check after the very first ingest.
      out.limit = Number.isFinite(parsed) && parsed > 0 ? parsed : Infinity;
    }
  }
  return out;
}

/**
 * Run the backfill.
 *
 * Steps:
 *   1. Parse CLI flags and require DATABASE_URL (exits with code 1 if unset).
 *   2. Resolve the scan root (`--root`, or the in-repo reference handoff) and
 *      require a `casual_part` directory beneath it (exits with code 1 if
 *      missing).
 *   3. Walk `casual_part/<bucket>/<paper>` in sorted order; every paper
 *      directory containing `edges.json` is loaded and passed to
 *      `store.ingestLocalNet`. A failure on one paper is logged and counted,
 *      and the walk continues.
 *   4. Print a summary with ingested / failed counts and the number of
 *      buckets reported by `store.listBuckets()`.
 *
 * The connection pool is always closed, including when a step rejects.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const args = parseArgs(process.argv.slice(2));
  if (!process.env.DATABASE_URL) {
    console.error("DATABASE_URL is required");
    process.exit(1);
  }
  const repoRoot = resolve(import.meta.dirname, "..", "..", "..");
  const root =
    args.root ?? join(repoRoot, "reference", "doatlas-handoff", "doatlas-backend-py");
  const casualPart = join(root, "casual_part");
  if (!existsSync(casualPart)) {
    console.error(`casual_part directory not found at ${casualPart}`);
    process.exit(1);
  }

  const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
  let ingested = 0;
  let failed = 0;
  let buckets;
  try {
    const store = new PostgresCausalNetwork(pool);
    await store.ensureSchema();

    outer: for (const bucket of readdirSync(casualPart).sort()) {
      const bDir = join(casualPart, bucket);
      if (!statSync(bDir).isDirectory()) continue;
      for (const paper of readdirSync(bDir).sort()) {
        const pDir = join(bDir, paper);
        if (!statSync(pDir).isDirectory()) continue;
        // Only directories that carry an edges.json are treated as papers.
        if (!existsSync(join(pDir, "edges.json"))) continue;
        try {
          const net = loadLocalCausalNetFromCasualPart({ root }, bucket, paper);
          await store.ingestLocalNet(net);
          ingested++;
          if (ingested % 25 === 0) {
            console.log(`  ingested ${ingested} papers...`);
          }
          // The limit counts successful ingests only; failures do not use it up.
          if (ingested >= args.limit) break outer;
        } catch (err) {
          failed++;
          console.warn(
            `  skip ${bucket}/${paper}: ${err?.message ?? String(err)}`,
          );
        }
      }
    }

    buckets = await store.listBuckets();
  } finally {
    // Release the pool even when schema setup, the walk, or the bucket
    // listing rejects, so connections are closed cleanly.
    await pool.end();
  }
  console.log(
    `\nBackfill complete: ingested=${ingested}, failed=${failed}, distinct_buckets=${buckets.length}`,
  );
}

// Script entry point: start the backfill; on rejection, print the error and
// exit with a non-zero status.
const reportFatalAndExit = (error) => {
  console.error(error);
  process.exit(1);
};

main().catch(reportFatalAndExit);