doatlas-2 / artifacts /api-server /scripts /backfill-causal-store.mjs
Iostream-Li's picture
Add files using upload-large-folder tool
6d1fe92 verified
#!/usr/bin/env node
// Backfill PostgresCausalNetwork from the legacy casual_part snapshots
// ("casual_part" is the on-disk directory name) shipped under
// reference/doatlas-handoff/doatlas-backend-py. Run once per environment
// to give the new readers (chat agent evidence drawer, graph browser
// bucket listing) data to serve.
//
// Usage:
//   DATABASE_URL=... node scripts/backfill-causal-store.mjs [--root <path>] [--limit N]
//
//   --root <path>  directory that contains casual_part/; defaults to the
//                  in-repo reference handoff.
//   --limit N      stop after N papers have been ingested successfully
//                  (omitted or 0: no limit).
import { existsSync, readdirSync, statSync } from "node:fs";
import { join, resolve } from "node:path";
import pg from "pg";
import {
PostgresCausalNetwork,
loadLocalCausalNetFromCasualPart,
} from "@workspace/networks";
/**
 * Parse the backfill script's command-line flags.
 *
 * Recognised flags:
 *   --root <path>  directory containing casual_part/ (default: null, which
 *                  the caller maps to the in-repo reference handoff)
 *   --limit <N>    stop after N successfully ingested papers; 0 means no limit
 *
 * Unrecognised arguments are ignored.
 *
 * @param {string[]} argv - command-line arguments after the script path
 * @returns {{ root: string | null, limit: number }} parsed options; `limit`
 *   is Infinity when no limit applies
 * @throws {Error} when a flag is missing its value, or when --limit is not a
 *   non-negative integer. This is strict on purpose, so that a typo cannot
 *   silently turn into an unlimited backfill.
 */
function parseArgs(argv) {
  const out = { root: null, limit: Infinity };

  // Reject a missing value, and refuse to swallow the next flag as a value
  // (e.g. `--root --limit 5`).
  const requireValue = (flag, value) => {
    if (value === undefined || value.startsWith("--")) {
      throw new Error(`${flag} requires a value`);
    }
    return value;
  };

  for (let i = 0; i < argv.length; i++) {
    const a = argv[i];
    if (a === "--root") {
      out.root = requireValue(a, argv[++i]);
    } else if (a === "--limit") {
      const raw = requireValue(a, argv[++i]);
      const n = Number(raw);
      if (!Number.isInteger(n) || n < 0) {
        throw new Error(`--limit expects a non-negative integer, got "${raw}"`);
      }
      // 0 keeps its historical meaning of "no limit".
      out.limit = n === 0 ? Infinity : n;
    }
  }
  return out;
}
/**
 * Yield `{ bucket, paper }` for every casual_part/<bucket>/<paper>/ directory
 * that contains an edges.json. Buckets and papers come out in sorted name
 * order. Non-directory entries are skipped.
 *
 * @param {string} casualPart - path to the casual_part directory
 * @returns {Generator<{ bucket: string, paper: string }>}
 */
function* paperDirs(casualPart) {
  for (const bucket of readdirSync(casualPart).sort()) {
    const bucketDir = join(casualPart, bucket);
    if (!statSync(bucketDir).isDirectory()) continue;
    for (const paper of readdirSync(bucketDir).sort()) {
      const paperDir = join(bucketDir, paper);
      if (!statSync(paperDir).isDirectory()) continue;
      if (!existsSync(join(paperDir, "edges.json"))) continue;
      yield { bucket, paper };
    }
  }
}

/**
 * Script entry point. Ingests every paper under <root>/casual_part into the
 * PostgresCausalNetwork store, then prints a summary.
 *
 * A paper that fails to load or ingest is logged, counted as failed and
 * skipped. It does not abort the run. The process exits with status 1 when
 * DATABASE_URL is unset or when the casual_part directory is missing. The
 * pg pool is always closed, even when a store call throws.
 */
async function main() {
  const args = parseArgs(process.argv.slice(2));
  if (!process.env.DATABASE_URL) {
    console.error("DATABASE_URL is required");
    process.exit(1);
  }

  const repoRoot = resolve(import.meta.dirname, "..", "..", "..");
  // Resolve a user-supplied --root to an absolute path, so that log messages
  // and the loader both see the same location regardless of cwd.
  const root =
    args.root != null
      ? resolve(args.root)
      : join(repoRoot, "reference", "doatlas-handoff", "doatlas-backend-py");
  const casualPart = join(root, "casual_part");
  if (!existsSync(casualPart)) {
    console.error(`casual_part directory not found at ${casualPart}`);
    process.exit(1);
  }

  const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
  try {
    const store = new PostgresCausalNetwork(pool);
    await store.ensureSchema();

    let ingested = 0;
    let failed = 0;
    for (const { bucket, paper } of paperDirs(casualPart)) {
      // Check the limit before ingesting, so that the limit is never
      // overshot (even by a non-positive value).
      if (ingested >= args.limit) break;
      try {
        const net = loadLocalCausalNetFromCasualPart({ root }, bucket, paper);
        await store.ingestLocalNet(net);
        ingested++;
        if (ingested % 25 === 0) {
          console.log(` ingested ${ingested} papers...`);
        }
      } catch (err) {
        failed++;
        console.warn(
          ` skip ${bucket}/${paper}: ${err?.message ?? String(err)}`,
        );
      }
    }

    const buckets = await store.listBuckets();
    console.log(
      `\nBackfill complete: ingested=${ingested}, failed=${failed}, distinct_buckets=${buckets.length}`,
    );
  } finally {
    // Release connections on both the success and the error path.
    await pool.end();
  }
}
// Any error that escapes main() is fatal: print it and exit with status 1.
const reportFatal = (error) => {
  console.error(error);
  process.exit(1);
};

main().catch(reportFatal);