// Deployed via GitHub Actions 2025-12-16_03-24-50 (commit 33f2414, verified)
/**
* Fast Neo4j Sync - Parallel batch sync with incremental support
*
* Much faster than sequential sync by:
* 1. Using larger batches (500 instead of 100)
* 2. Running multiple batches in parallel
* 3. Using UNWIND for bulk operations
* 4. Supporting incremental sync based on timestamps
*/
import neo4j, { Driver, Session } from 'neo4j-driver';
import { config } from 'dotenv';
import { resolve } from 'path';
// Load .env.production if it exists, otherwise fall back to .env.
// NOTE(review): this relies on dotenv's default behavior of NOT overriding
// variables that are already set — so keys from .env.production win over
// .env, and real environment variables win over both. Do not reorder.
config({ path: resolve(process.cwd(), '.env.production') });
config({ path: resolve(process.cwd(), '.env') });
/** Connection details and tuning knobs for one sync run. */
interface SyncConfig {
  localUri: string;        // bolt/neo4j URI of the local (source) database
  localUser: string;       // local database username
  localPassword: string;   // local database password
  cloudUri: string;        // URI of the cloud AuraDB (target) database
  cloudUser: string;       // cloud database username
  cloudPassword: string;   // cloud database password
  batchSize: number;       // rows per UNWIND write batch
  parallelBatches: number; // how many batches are fetched/written per round
  incrementalOnly: boolean; // when true, only sync changes since last sync
  lastSyncTime?: Date;     // optional externally-supplied last-sync timestamp
}
/** Counters accumulated over a sync run. */
interface SyncStats {
  nodesCreated: number;          // total nodes written (MERGE: created or updated)
  nodesUpdated: number;          // NOTE(review): never incremented by the sync code
  relationshipsCreated: number;  // total relationships written (created or updated)
  relationshipsUpdated: number;  // NOTE(review): never incremented by the sync code
  duration: number;              // wall-clock duration of run() in milliseconds
}
/**
 * Fast parallel sync of a local Neo4j database into a cloud AuraDB instance.
 *
 * Nodes and relationships are copied with large UNWIND batches. Cross-database
 * node identity is tracked via a `_syncId` property holding the local
 * elementId. Incremental mode relies on a `_SyncMetadata` marker node in the
 * cloud database that records the last successful sync timestamp.
 *
 * Fixes over the previous revision:
 *  - Cypher `exists(n.prop)` (deprecated in 4.x, removed in Neo4j 5) replaced
 *    with `n.prop IS NULL`, which works on both 4.x and 5.x.
 *  - Labels, relationship types and schema identifiers are backtick-escaped
 *    before string interpolation into Cypher (they cannot be parameters).
 *  - Progress ETA no longer divides by a zero rate (Infinity/NaN in logs).
 *  - Label grouping no longer mutates the fetched labels array in place.
 */
class FastNeo4jSync {
  private localDriver: Driver;
  private cloudDriver: Driver;
  private config: SyncConfig;
  private stats: SyncStats = {
    nodesCreated: 0,
    nodesUpdated: 0,
    relationshipsCreated: 0,
    relationshipsUpdated: 0,
    duration: 0
  };

  constructor(config: SyncConfig) {
    this.config = config;
    this.localDriver = neo4j.driver(
      config.localUri,
      neo4j.auth.basic(config.localUser, config.localPassword)
    );
    this.cloudDriver = neo4j.driver(
      config.cloudUri,
      neo4j.auth.basic(config.cloudUser, config.cloudPassword)
    );
  }

  /**
   * Quote a label, relationship type, property or constraint/index name for
   * interpolation into a Cypher string (these cannot be query parameters).
   * Embedded backticks are doubled per Cypher escaping rules.
   */
  private static escapeIdentifier(name: string): string {
    return '`' + name.replace(/`/g, '``') + '`';
  }

  /** Verify both databases are reachable with a trivial round-trip query. */
  async connect(): Promise<void> {
    console.log('📡 Connecting to databases...');
    const localSession = this.localDriver.session();
    const cloudSession = this.cloudDriver.session();
    try {
      await localSession.run('RETURN 1');
      console.log(' ✓ Local Neo4j connected');
      await cloudSession.run('RETURN 1');
      console.log(' ✓ Cloud AuraDB connected');
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  /**
   * Read the last sync timestamp from the cloud `_SyncMetadata` marker node.
   * Returns null when no previous sync exists or the query fails — failure is
   * deliberately treated as "no previous sync" so a full sync runs instead.
   */
  async getLastSyncTime(): Promise<Date | null> {
    const cloudSession = this.cloudDriver.session();
    try {
      const result = await cloudSession.run(`
        MATCH (s:_SyncMetadata {id: 'last_sync'})
        RETURN s.timestamp as timestamp
      `);
      if (result.records.length > 0) {
        const ts = result.records[0].get('timestamp');
        return ts ? new Date(ts) : null;
      }
      return null;
    } catch {
      // Best-effort: missing label/apoc/permissions just means full sync.
      return null;
    } finally {
      await cloudSession.close();
    }
  }

  /** Persist the last-sync timestamp marker in the cloud database. */
  async setLastSyncTime(timestamp: Date): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      await cloudSession.run(`
        MERGE (s:_SyncMetadata {id: 'last_sync'})
        SET s.timestamp = $timestamp
      `, { timestamp: timestamp.toISOString() });
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Count nodes that need syncing. With `since`, counts nodes updated or
   * created after that time, plus nodes with no updatedAt stamp at all.
   */
  async countNodesToSync(since?: Date): Promise<number> {
    const localSession = this.localDriver.session();
    try {
      let query: string;
      let params: Record<string, unknown> = {};
      if (since) {
        // `IS NULL` replaces the deprecated exists(prop) (removed in Neo4j 5).
        query = `
          MATCH (n)
          WHERE n.updatedAt > $since OR n.createdAt > $since OR n.updatedAt IS NULL
          RETURN count(n) as count
        `;
        params = { since: since.toISOString() };
      } else {
        query = 'MATCH (n) RETURN count(n) as count';
      }
      const result = await localSession.run(query, params);
      return result.records[0].get('count').toNumber();
    } finally {
      await localSession.close();
    }
  }

  /**
   * Count relationships that need syncing. A relationship is included when
   * either endpoint node changed since `since` (or carries no updatedAt).
   */
  async countRelationshipsToSync(since?: Date): Promise<number> {
    const localSession = this.localDriver.session();
    try {
      let query: string;
      let params: Record<string, unknown> = {};
      if (since) {
        // For relationships, we sync if either end node was updated.
        query = `
          MATCH (a)-[r]->(b)
          WHERE a.updatedAt > $since OR b.updatedAt > $since
            OR a.createdAt > $since OR b.createdAt > $since
            OR a.updatedAt IS NULL
          RETURN count(r) as count
        `;
        params = { since: since.toISOString() };
      } else {
        query = 'MATCH ()-[r]->() RETURN count(r) as count';
      }
      const result = await localSession.run(query, params);
      return result.records[0].get('count').toNumber();
    } finally {
      await localSession.close();
    }
  }

  /**
   * Sync all (or changed) nodes. Fetches `batchSize * parallelBatches` nodes
   * per round via SKIP/LIMIT from the local DB, then writes the batches to
   * the cloud in parallel. Assumes the local DB is not mutated during sync
   * (SKIP/LIMIT paging is only stable over an unchanging result set).
   */
  async syncNodesInParallel(since?: Date): Promise<void> {
    const localSession = this.localDriver.session();
    const totalNodes = await this.countNodesToSync(since);
    console.log(`\n📦 Syncing ${totalNodes} nodes (batch: ${this.config.batchSize}, parallel: ${this.config.parallelBatches})...`);
    if (totalNodes === 0) {
      console.log(' ✓ No nodes to sync');
      return;
    }
    let offset = 0;
    let processed = 0;
    const startTime = Date.now();
    try {
      while (offset < totalNodes) {
        // Fetch multiple batches worth of nodes in a single round-trip.
        const fetchSize = this.config.batchSize * this.config.parallelBatches;
        let query: string;
        const params: Record<string, unknown> = { skip: neo4j.int(offset), limit: neo4j.int(fetchSize) };
        if (since) {
          // Same predicate as countNodesToSync so the total matches the pages.
          query = `
            MATCH (n)
            WHERE n.updatedAt > $since OR n.createdAt > $since OR n.updatedAt IS NULL
            RETURN n, labels(n) as labels, elementId(n) as elementId
            SKIP $skip LIMIT $limit
          `;
          params.since = since.toISOString();
        } else {
          query = `
            MATCH (n)
            RETURN n, labels(n) as labels, elementId(n) as elementId
            SKIP $skip LIMIT $limit
          `;
        }
        const result = await localSession.run(query, params);
        const nodes = result.records.map(r => ({
          properties: r.get('n').properties,
          labels: r.get('labels') as string[],
          elementId: r.get('elementId') as string
        }));
        if (nodes.length === 0) break;
        // Split into batches and process in parallel.
        const batches: typeof nodes[] = [];
        for (let i = 0; i < nodes.length; i += this.config.batchSize) {
          batches.push(nodes.slice(i, i + this.config.batchSize));
        }
        await Promise.all(batches.map(batch => this.importNodeBatch(batch)));
        processed += nodes.length;
        offset += fetchSize;
        const elapsed = (Date.now() - startTime) / 1000;
        // Guard against a zero rate (first instant) — avoids Infinity/NaN ETA.
        const rate = elapsed > 0 ? Math.round(processed / elapsed) : processed;
        const eta = rate > 0 ? Math.round((totalNodes - processed) / rate) : 0;
        console.log(` Progress: ${processed}/${totalNodes} (${rate} nodes/sec, ETA: ${eta}s)`);
      }
      // MERGE cannot distinguish created from updated, so everything lands here.
      this.stats.nodesCreated = processed;
      console.log(` ✓ Synced ${processed} nodes in ${((Date.now() - startTime) / 1000).toFixed(1)}s`);
    } finally {
      await localSession.close();
    }
  }

  /**
   * Write one batch of nodes to the cloud DB. Nodes are grouped by label
   * combination so each group is one UNWIND + MERGE statement keyed on
   * `_syncId` (the local elementId).
   */
  private async importNodeBatch(nodes: Array<{ properties: Record<string, unknown>; labels: string[]; elementId: string }>): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      const nodesByLabels = new Map<string, typeof nodes>();
      for (const node of nodes) {
        // Sort a copy — Array.prototype.sort mutates in place.
        const labelKey = [...node.labels].sort().join(':');
        if (!nodesByLabels.has(labelKey)) {
          nodesByLabels.set(labelKey, []);
        }
        nodesByLabels.get(labelKey)!.push(node);
      }
      for (const [, labelNodes] of nodesByLabels) {
        // Backtick-escape each label; label-less nodes fall back to :Node.
        const rawLabels = labelNodes[0].labels.length > 0
          ? [...labelNodes[0].labels].sort()
          : ['Node'];
        const labelExpr = rawLabels.map(FastNeo4jSync.escapeIdentifier).join(':');
        const nodeData = labelNodes.map(n => ({
          ...n.properties,
          _syncId: n.elementId
        }));
        // MERGE handles both create and update; SET n = replaces all props.
        await cloudSession.run(`
          UNWIND $nodes as nodeData
          MERGE (n:${labelExpr} {_syncId: nodeData._syncId})
          SET n = nodeData
        `, { nodes: nodeData });
      }
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Sync all (or changed) relationships. Endpoints are resolved in the cloud
   * DB via `_syncId`. Batches are written sequentially to avoid lock
   * contention/deadlocks on shared endpoint nodes.
   */
  async syncRelationshipsInParallel(since?: Date): Promise<void> {
    const localSession = this.localDriver.session();
    const totalRels = await this.countRelationshipsToSync(since);
    console.log(`\n🔗 Syncing ${totalRels} relationships...`);
    if (totalRels === 0) {
      console.log(' ✓ No relationships to sync');
      return;
    }
    let offset = 0;
    let processed = 0;
    const startTime = Date.now();
    try {
      while (offset < totalRels) {
        const fetchSize = this.config.batchSize * this.config.parallelBatches;
        let query: string;
        const params: Record<string, unknown> = { skip: neo4j.int(offset), limit: neo4j.int(fetchSize) };
        if (since) {
          // Same predicate as countRelationshipsToSync (IS NULL, not exists()).
          query = `
            MATCH (a)-[r]->(b)
            WHERE a.updatedAt > $since OR b.updatedAt > $since
              OR a.createdAt > $since OR b.createdAt > $since
              OR a.updatedAt IS NULL
            RETURN type(r) as type, properties(r) as props,
                   elementId(a) as startId, elementId(b) as endId
            SKIP $skip LIMIT $limit
          `;
          params.since = since.toISOString();
        } else {
          query = `
            MATCH (a)-[r]->(b)
            RETURN type(r) as type, properties(r) as props,
                   elementId(a) as startId, elementId(b) as endId
            SKIP $skip LIMIT $limit
          `;
        }
        const result = await localSession.run(query, params);
        const rels = result.records.map(r => ({
          type: r.get('type') as string,
          properties: r.get('props') as Record<string, unknown>,
          startId: r.get('startId') as string,
          endId: r.get('endId') as string
        }));
        if (rels.length === 0) break;
        const batches: typeof rels[] = [];
        for (let i = 0; i < rels.length; i += this.config.batchSize) {
          batches.push(rels.slice(i, i + this.config.batchSize));
        }
        // Process batches sequentially to avoid deadlocks on endpoint locks.
        for (const batch of batches) {
          await this.importRelationshipBatch(batch);
        }
        processed += rels.length;
        offset += fetchSize;
        const elapsed = (Date.now() - startTime) / 1000;
        const rate = elapsed > 0 ? Math.round(processed / elapsed) : processed;
        const eta = rate > 0 ? Math.round((totalRels - processed) / rate) : 0;
        console.log(` Progress: ${processed}/${totalRels} (${rate} rels/sec, ETA: ${eta}s)`);
      }
      this.stats.relationshipsCreated = processed;
      console.log(` ✓ Synced ${processed} relationships in ${((Date.now() - startTime) / 1000).toFixed(1)}s`);
    } finally {
      await localSession.close();
    }
  }

  /**
   * Write one batch of relationships to the cloud DB, grouped by relationship
   * type (the type cannot be parameterized, so one UNWIND per type).
   * NOTE(review): the endpoint MATCHes carry no label, so the `_syncId` index
   * (created on :Node only) may not be used — acceptable for now.
   */
  private async importRelationshipBatch(rels: Array<{ type: string; properties: Record<string, unknown>; startId: string; endId: string }>): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      const relsByType = new Map<string, typeof rels>();
      for (const rel of rels) {
        if (!relsByType.has(rel.type)) {
          relsByType.set(rel.type, []);
        }
        relsByType.get(rel.type)!.push(rel);
      }
      for (const [relType, typeRels] of relsByType) {
        const safeType = FastNeo4jSync.escapeIdentifier(relType);
        const relData = typeRels.map(r => ({
          startId: r.startId,
          endId: r.endId,
          props: r.properties
        }));
        await cloudSession.run(`
          UNWIND $rels as relData
          MATCH (a {_syncId: relData.startId})
          MATCH (b {_syncId: relData.endId})
          MERGE (a)-[r:${safeType}]->(b)
          SET r = relData.props
        `, { rels: relData });
      }
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Mirror local uniqueness/node-key constraints and RANGE indexes to the
   * cloud DB, then ensure the `_syncId` lookup index exists. Vector and
   * LOOKUP indexes are skipped; "already exists" errors are ignored.
   */
  async syncSchema(): Promise<void> {
    console.log('\n📋 Syncing schema...');
    const localSession = this.localDriver.session();
    const cloudSession = this.cloudDriver.session();
    const esc = FastNeo4jSync.escapeIdentifier;
    try {
      // Constraints: rebuild CREATE statements from SHOW CONSTRAINTS fields.
      const constraintsResult = await localSession.run('SHOW CONSTRAINTS');
      for (const record of constraintsResult.records) {
        const name = record.get('name');
        const constraintType = record.get('type');
        const entityType = record.get('entityType');
        const labelsOrTypes = record.get('labelsOrTypes') as string[];
        const properties = record.get('properties') as string[];
        if (!name || name.startsWith('_')) continue;
        let createStatement = '';
        const label = labelsOrTypes?.[0] || 'Node';
        const prop = properties?.[0] || 'id';
        if (constraintType === 'UNIQUENESS' && entityType === 'NODE') {
          createStatement = `CREATE CONSTRAINT ${esc(name)} IF NOT EXISTS FOR (n:${esc(label)}) REQUIRE n.${esc(prop)} IS UNIQUE`;
        } else if (constraintType === 'NODE_KEY') {
          createStatement = `CREATE CONSTRAINT ${esc(name)} IF NOT EXISTS FOR (n:${esc(label)}) REQUIRE n.${esc(prop)} IS NODE KEY`;
        }
        if (createStatement) {
          try {
            await cloudSession.run(createStatement);
            console.log(` ✓ Constraint: ${name}`);
          } catch (e: unknown) {
            const error = e as Error;
            if (!error.message?.includes('already exists') && !error.message?.includes('equivalent')) {
              console.log(` ⚠ Constraint ${name}: ${error.message?.slice(0, 50)}`);
            }
          }
        }
      }
      // Indexes: exclude LOOKUP (token) and internal indexes.
      const indexesResult = await localSession.run('SHOW INDEXES');
      for (const record of indexesResult.records) {
        const name = record.get('name');
        const indexType = record.get('type');
        const entityType = record.get('entityType');
        const labelsOrTypes = record.get('labelsOrTypes') as string[];
        const properties = record.get('properties') as string[];
        if (!name || name.startsWith('_') || indexType === 'LOOKUP') continue;
        const label = labelsOrTypes?.[0] || 'Node';
        const props = properties || [];
        let createStatement = '';
        if (indexType === 'RANGE' && entityType === 'NODE' && props.length > 0) {
          const propList = props.map(p => `n.${esc(p)}`).join(', ');
          createStatement = `CREATE INDEX ${esc(name)} IF NOT EXISTS FOR (n:${esc(label)}) ON (${propList})`;
        } else if (indexType === 'VECTOR' && props.length > 0) {
          // Vector indexes need dimension/similarity options — skip them.
          console.log(` ⏭ Skipping vector index: ${name}`);
          continue;
        }
        if (createStatement) {
          try {
            await cloudSession.run(createStatement);
            console.log(` ✓ Index: ${name} (${indexType})`);
          } catch (e: unknown) {
            const error = e as Error;
            if (!error.message?.includes('already exists') && !error.message?.includes('equivalent')) {
              console.log(` ⚠ Index ${name}: ${error.message?.slice(0, 50)}`);
            }
          }
        }
      }
      // Ensure _syncId index exists for fast node lookups during rel import.
      try {
        await cloudSession.run('CREATE INDEX sync_id_index IF NOT EXISTS FOR (n:Node) ON (n._syncId)');
      } catch {
        // Ignore — index is a best-effort optimization.
      }
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  /**
   * Run the full pipeline: connect → (optionally) resolve incremental window
   * → schema → nodes → relationships → record sync time. Returns stats.
   */
  async run(): Promise<SyncStats> {
    const startTime = Date.now();
    console.log('═'.repeat(60));
    console.log('⚡ Fast Neo4j Sync');
    console.log('═'.repeat(60));
    console.log(`\nLocal: ${this.config.localUri}`);
    console.log(`Cloud: ${this.config.cloudUri}`);
    console.log(`Mode: ${this.config.incrementalOnly ? 'Incremental' : 'Full'}`);
    console.log(`Batch: ${this.config.batchSize} nodes × ${this.config.parallelBatches} parallel`);
    await this.connect();
    let since: Date | undefined;
    if (this.config.incrementalOnly) {
      const lastSync = await this.getLastSyncTime();
      if (lastSync) {
        since = lastSync;
        console.log(`\n📅 Last sync: ${lastSync.toISOString()}`);
        console.log(' Only syncing changes since then...');
      } else {
        console.log('\n⚠️ No previous sync found, doing full sync...');
      }
    }
    await this.syncSchema();
    await this.syncNodesInParallel(since);
    await this.syncRelationshipsInParallel(since);
    // Record sync time only after everything above succeeded.
    await this.setLastSyncTime(new Date());
    this.stats.duration = Date.now() - startTime;
    console.log('\n' + '═'.repeat(60));
    console.log('✅ Sync Complete!');
    console.log('═'.repeat(60));
    console.log(` Nodes: ${this.stats.nodesCreated}`);
    console.log(` Relationships: ${this.stats.relationshipsCreated}`);
    console.log(` Duration: ${(this.stats.duration / 1000).toFixed(1)}s`);
    console.log(` Rate: ${Math.round((this.stats.nodesCreated + this.stats.relationshipsCreated) / (this.stats.duration / 1000))} items/sec`);
    return this.stats;
  }

  /** Release both driver connection pools. */
  async close(): Promise<void> {
    await this.localDriver.close();
    await this.cloudDriver.close();
  }
}
// CLI entry point: parse flags, build config from environment, run the sync.
async function main(): Promise<void> {
  const args = process.argv.slice(2);
  const incremental = args.includes('--incremental') || args.includes('-i');
  const help = args.includes('--help') || args.includes('-h');
  if (help) {
    console.log(`
Fast Neo4j Sync - Parallel batch sync with incremental support
Usage: npx tsx src/scripts/fast-neo4j-sync.ts [options]
Options:
--incremental, -i Only sync changes since last sync
--help, -h Show this help
Environment variables:
NEO4J_LOCAL_URI Local Neo4j URI (default: bolt://localhost:7687)
NEO4J_LOCAL_USER Local Neo4j user (default: neo4j)
NEO4J_LOCAL_PASSWORD Local Neo4j password
NEO4J_CLOUD_URI Cloud AuraDB URI
NEO4J_CLOUD_USER Cloud user (default: neo4j)
NEO4J_CLOUD_PASSWORD Cloud password
`);
    process.exit(0);
  }
  // Cloud URI from .env.production (NEO4J_URI contains the cloud AuraDB).
  const cloudUri = process.env.NEO4J_URI || '';
  const cloudPassword = process.env.NEO4J_PASSWORD || '';
  const syncConfig: SyncConfig = {
    localUri: process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
    localUser: process.env.NEO4J_LOCAL_USER || 'neo4j',
    localPassword: process.env.NEO4J_LOCAL_PASSWORD || 'password', // Local default
    cloudUri: cloudUri,
    cloudUser: process.env.NEO4J_USER || 'neo4j',
    cloudPassword: cloudPassword,
    batchSize: 500, // Larger batches
    parallelBatches: 4, // 4 parallel operations
    incrementalOnly: incremental
  };
  if (!syncConfig.cloudUri) {
    console.error('❌ NEO4J_URI (cloud) is required in .env.production');
    process.exit(1);
  }
  const sync = new FastNeo4jSync(syncConfig);
  try {
    await sync.run();
  } finally {
    // Always release the driver pools, even when the sync throws.
    await sync.close();
  }
}
main().catch((err: unknown) => {
  console.error(err);
  // Previously the error was only logged and the process exited 0, so
  // CI/deploy wrappers could not detect a failed sync. Signal failure.
  process.exitCode = 1;
});