/**
 * Fast Neo4j Sync - Parallel batch sync with incremental support
 *
 * Much faster than sequential sync by:
 * 1. Using larger batches (500 instead of 100)
 * 2. Running multiple batches in parallel
 * 3. Using UNWIND for bulk operations
 * 4. Supporting incremental sync based on timestamps
 */

import neo4j, { Driver, Session } from 'neo4j-driver';
import { config } from 'dotenv';
import { resolve } from 'path';

// Load .env.production first, then .env. Both files are read; by default
// dotenv does not override variables that are already set, so .env only fills
// in values that .env.production (or the real environment) did not define.
config({ path: resolve(process.cwd(), '.env.production') });
config({ path: resolve(process.cwd(), '.env') });

/** Connection and tuning settings for one sync run. */
interface SyncConfig {
  localUri: string;
  localUser: string;
  localPassword: string;
  cloudUri: string;
  cloudUser: string;
  cloudPassword: string;
  /** Number of rows sent to the cloud database per UNWIND statement. */
  batchSize: number;
  /** Number of batches fetched per page (and imported concurrently for nodes). */
  parallelBatches: number;
  /** When true, only rows changed since the stored watermark are synced. */
  incrementalOnly: boolean;
  lastSyncTime?: Date;
}

/** Counters reported at the end of a run. `duration` is in milliseconds. */
interface SyncStats {
  nodesCreated: number;
  nodesUpdated: number;
  relationshipsCreated: number;
  relationshipsUpdated: number;
  duration: number;
}

/** A node read from the local database, ready to be written to the cloud. */
interface NodeRow {
  properties: Record<string, unknown>;
  labels: string[];
  elementId: string;
}

/** A relationship read from the local database, keyed by its end nodes' element ids. */
interface RelationshipRow {
  type: string;
  properties: Record<string, unknown>;
  startId: string;
  endId: string;
}

// "Changed since $since" predicates, shared by the count and fetch queries so
// both always select the same rows. `IS NULL` is used instead of the
// `exists(n.prop)` function: the queries below rely on elementId(), which
// requires Neo4j 5, and the property form of exists() was removed in Neo4j 5.
const NODE_CHANGED_PREDICATE =
  'n.updatedAt > $since OR n.createdAt > $since OR n.updatedAt IS NULL';

// For relationships, we sync if either end node was updated.
const REL_CHANGED_PREDICATE =
  'a.updatedAt > $since OR b.updatedAt > $since OR ' +
  'a.createdAt > $since OR b.createdAt > $since OR a.updatedAt IS NULL';

/**
 * Quotes a label, relationship type, property key or schema name for direct
 * interpolation into Cypher text (these positions cannot be parameterized).
 * Embedded backticks are doubled, which is Cypher's escape for them.
 */
function quoteIdentifier(name: string): string {
  return '`' + name.replace(/`/g, '``') + '`';
}

/** Extracts a printable message from an unknown thrown value. */
function errorMessage(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

/**
 * Copies schema, nodes and relationships from a local Neo4j instance to a
 * cloud instance. Nodes are matched on the cloud side by a `_syncId` property
 * holding the local elementId.
 */
class FastNeo4jSync {
  private localDriver: Driver;
  private cloudDriver: Driver;
  private config: SyncConfig;
  private stats: SyncStats = {
    nodesCreated: 0,
    nodesUpdated: 0,
    relationshipsCreated: 0,
    relationshipsUpdated: 0,
    duration: 0
  };

  constructor(config: SyncConfig) {
    this.config = config;
    this.localDriver = neo4j.driver(
      config.localUri,
      neo4j.auth.basic(config.localUser, config.localPassword)
    );
    this.cloudDriver = neo4j.driver(
      config.cloudUri,
      neo4j.auth.basic(config.cloudUser, config.cloudPassword)
    );
  }

  /** Verifies both databases are reachable by running `RETURN 1` on each. */
  async connect(): Promise<void> {
    console.log('šŸ“” Connecting to databases...');

    const localSession = this.localDriver.session();
    const cloudSession = this.cloudDriver.session();

    try {
      await localSession.run('RETURN 1');
      console.log(' āœ“ Local Neo4j connected');

      await cloudSession.run('RETURN 1');
      console.log(' āœ“ Cloud AuraDB connected');
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  /**
   * Reads the watermark stored on the cloud `_SyncMetadata` node.
   *
   * @returns The stored timestamp, or null when there is none, it cannot be
   *   parsed, or the lookup fails (the caller then performs a full sync).
   */
  async getLastSyncTime(): Promise<Date | null> {
    const cloudSession = this.cloudDriver.session();
    try {
      const result = await cloudSession.run(`
        MATCH (s:_SyncMetadata {id: 'last_sync'})
        RETURN s.timestamp as timestamp
      `);
      if (result.records.length === 0) {
        return null;
      }
      const ts = result.records[0].get('timestamp');
      if (!ts) {
        return null;
      }
      // Reject unparseable values here; an Invalid Date would otherwise throw
      // later when it is serialized with toISOString().
      const parsed = new Date(ts);
      return Number.isNaN(parsed.getTime()) ? null : parsed;
    } catch (e: unknown) {
      // Best-effort: fall back to a full sync, but say why.
      console.log(` ⚠ Could not read last sync time: ${errorMessage(e).slice(0, 50)}`);
      return null;
    } finally {
      await cloudSession.close();
    }
  }

  /** Stores `timestamp` (as an ISO-8601 string) on the cloud `_SyncMetadata` node. */
  async setLastSyncTime(timestamp: Date): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      await cloudSession.run(`
        MERGE (s:_SyncMetadata {id: 'last_sync'})
        SET s.timestamp = $timestamp
      `, { timestamp: timestamp.toISOString() });
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Counts local nodes to sync.
   *
   * @param since When given, only nodes matching the incremental predicate
   *   are counted; otherwise all nodes.
   */
  async countNodesToSync(since?: Date): Promise<number> {
    const localSession = this.localDriver.session();
    try {
      let query: string;
      let params: Record<string, unknown> = {};

      if (since) {
        query = `
          MATCH (n)
          WHERE ${NODE_CHANGED_PREDICATE}
          RETURN count(n) as count
        `;
        params = { since: since.toISOString() };
      } else {
        query = 'MATCH (n) RETURN count(n) as count';
      }

      const result = await localSession.run(query, params);
      return result.records[0].get('count').toNumber();
    } finally {
      await localSession.close();
    }
  }

  /**
   * Counts local relationships to sync.
   *
   * @param since When given, only relationships matching the incremental
   *   predicate (based on their end nodes) are counted; otherwise all.
   */
  async countRelationshipsToSync(since?: Date): Promise<number> {
    const localSession = this.localDriver.session();
    try {
      let query: string;
      let params: Record<string, unknown> = {};

      if (since) {
        query = `
          MATCH (a)-[r]->(b)
          WHERE ${REL_CHANGED_PREDICATE}
          RETURN count(r) as count
        `;
        params = { since: since.toISOString() };
      } else {
        query = 'MATCH ()-[r]->() RETURN count(r) as count';
      }

      const result = await localSession.run(query, params);
      return result.records[0].get('count').toNumber();
    } finally {
      await localSession.close();
    }
  }

  /**
   * Pages through local nodes and imports each page to the cloud, running the
   * page's batches concurrently.
   *
   * @param since When given, only nodes matching the incremental predicate.
   */
  async syncNodesInParallel(since?: Date): Promise<void> {
    const totalNodes = await this.countNodesToSync(since);

    console.log(`\nšŸ“¦ Syncing ${totalNodes} nodes (batch: ${this.config.batchSize}, parallel: ${this.config.parallelBatches})...`);

    if (totalNodes === 0) {
      console.log(' āœ“ No nodes to sync');
      return;
    }

    // Fetch multiple batches worth of nodes per page.
    const fetchSize = this.config.batchSize * this.config.parallelBatches;
    // ORDER BY makes SKIP/LIMIT pages stable; without it the result order is
    // unspecified and pages may overlap or leave gaps.
    const query = `
      MATCH (n)
      ${since ? `WHERE ${NODE_CHANGED_PREDICATE}` : ''}
      RETURN n, labels(n) as labels, elementId(n) as elementId
      ORDER BY elementId(n)
      SKIP $skip LIMIT $limit
    `;

    let offset = 0;
    let processed = 0;
    const startTime = Date.now();

    // The session is opened only once there is work to do, and always inside
    // the try/finally, so it cannot leak on the early return above.
    const localSession = this.localDriver.session();
    try {
      while (offset < totalNodes) {
        const params: Record<string, unknown> = {
          skip: neo4j.int(offset),
          limit: neo4j.int(fetchSize)
        };
        if (since) {
          params.since = since.toISOString();
        }

        const result = await localSession.run(query, params);
        const nodes: NodeRow[] = result.records.map(r => ({
          properties: r.get('n').properties,
          labels: r.get('labels') as string[],
          elementId: r.get('elementId') as string
        }));

        if (nodes.length === 0) break;

        // Split into batches and process in parallel
        const batches: NodeRow[][] = [];
        for (let i = 0; i < nodes.length; i += this.config.batchSize) {
          batches.push(nodes.slice(i, i + this.config.batchSize));
        }
        await Promise.all(batches.map(batch => this.importNodeBatch(batch)));

        processed += nodes.length;
        offset += fetchSize;

        this.logProgress(processed, totalNodes, startTime, 'nodes');
      }

      this.stats.nodesCreated = processed;
      console.log(` āœ“ Synced ${processed} nodes in ${((Date.now() - startTime) / 1000).toFixed(1)}s`);
    } finally {
      await localSession.close();
    }
  }

  /**
   * Writes one batch of nodes to the cloud, one UNWIND/MERGE statement per
   * distinct label combination. Nodes without labels are written as `:Node`.
   * `SET n = nodeData` replaces all properties of an existing node.
   */
  private async importNodeBatch(nodes: Array<{ properties: Record<string, unknown>; labels: string[]; elementId: string }>): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      // Group nodes by label combination for efficient UNWIND
      const groups = new Map<string, { labels: string[]; rows: Array<Record<string, unknown>> }>();

      for (const node of nodes) {
        // Sort a copy: Array.prototype.sort works in place and would otherwise
        // reorder the caller's labels array.
        const labels = [...node.labels].sort();
        const key = JSON.stringify(labels);
        let group = groups.get(key);
        if (!group) {
          group = { labels, rows: [] };
          groups.set(key, group);
        }
        group.rows.push({ ...node.properties, _syncId: node.elementId });
      }

      // Import each label group
      for (const { labels, rows } of groups.values()) {
        const labelExpr = (labels.length > 0 ? labels : ['Node'])
          .map(quoteIdentifier)
          .join(':');

        // Use MERGE to handle both create and update
        await cloudSession.run(`
          UNWIND $nodes as nodeData
          MERGE (n:${labelExpr} {_syncId: nodeData._syncId})
          SET n = nodeData
        `, { nodes: rows });
      }
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Pages through local relationships and imports each page to the cloud.
   * Batches within a page are imported one after another.
   *
   * @param since When given, only relationships matching the incremental predicate.
   */
  async syncRelationshipsInParallel(since?: Date): Promise<void> {
    const totalRels = await this.countRelationshipsToSync(since);

    console.log(`\nšŸ”— Syncing ${totalRels} relationships...`);

    if (totalRels === 0) {
      console.log(' āœ“ No relationships to sync');
      return;
    }

    const fetchSize = this.config.batchSize * this.config.parallelBatches;
    // ORDER BY keeps SKIP/LIMIT pages stable (see syncNodesInParallel).
    const query = `
      MATCH (a)-[r]->(b)
      ${since ? `WHERE ${REL_CHANGED_PREDICATE}` : ''}
      RETURN type(r) as type, properties(r) as props,
             elementId(a) as startId, elementId(b) as endId
      ORDER BY elementId(r)
      SKIP $skip LIMIT $limit
    `;

    let offset = 0;
    let processed = 0;
    const startTime = Date.now();

    // Opened after the early return and inside try/finally so it cannot leak.
    const localSession = this.localDriver.session();
    try {
      while (offset < totalRels) {
        const params: Record<string, unknown> = {
          skip: neo4j.int(offset),
          limit: neo4j.int(fetchSize)
        };
        if (since) {
          params.since = since.toISOString();
        }

        const result = await localSession.run(query, params);
        const rels: RelationshipRow[] = result.records.map(r => ({
          type: r.get('type') as string,
          properties: r.get('props') as Record<string, unknown>,
          startId: r.get('startId') as string,
          endId: r.get('endId') as string
        }));

        if (rels.length === 0) break;

        const batches: RelationshipRow[][] = [];
        for (let i = 0; i < rels.length; i += this.config.batchSize) {
          batches.push(rels.slice(i, i + this.config.batchSize));
        }

        // Process batches sequentially to avoid deadlocks
        for (const batch of batches) {
          await this.importRelationshipBatch(batch);
        }

        processed += rels.length;
        offset += fetchSize;

        this.logProgress(processed, totalRels, startTime, 'rels');
      }

      this.stats.relationshipsCreated = processed;
      console.log(` āœ“ Synced ${processed} relationships in ${((Date.now() - startTime) / 1000).toFixed(1)}s`);
    } finally {
      await localSession.close();
    }
  }

  /**
   * Writes one batch of relationships to the cloud, one UNWIND/MERGE statement
   * per relationship type. End nodes are looked up by `_syncId`.
   *
   * NOTE(review): the end-node MATCH has no label, while the only `_syncId`
   * index this script creates is on `:Node`, so these lookups are unlikely to
   * be index-backed. Also, MERGE on (a)-[r:TYPE]->(b) keeps a single
   * relationship per type between two nodes, so parallel relationships of the
   * same type collapse into one.
   */
  private async importRelationshipBatch(rels: Array<{ type: string; properties: Record<string, unknown>; startId: string; endId: string }>): Promise<void> {
    const cloudSession = this.cloudDriver.session();
    try {
      // Group by relationship type
      const relsByType = new Map<string, Array<{ startId: string; endId: string; props: Record<string, unknown> }>>();

      for (const rel of rels) {
        let rows = relsByType.get(rel.type);
        if (!rows) {
          rows = [];
          relsByType.set(rel.type, rows);
        }
        rows.push({ startId: rel.startId, endId: rel.endId, props: rel.properties });
      }

      // Import each type
      for (const [relType, relData] of relsByType) {
        const typeExpr = quoteIdentifier(relType);
        await cloudSession.run(`
          UNWIND $rels as relData
          MATCH (a {_syncId: relData.startId})
          MATCH (b {_syncId: relData.endId})
          MERGE (a)-[r:${typeExpr}]->(b)
          SET r = relData.props
        `, { rels: relData });
      }
    } finally {
      await cloudSession.close();
    }
  }

  /**
   * Recreates node uniqueness / node-key constraints and RANGE node indexes
   * from the local database on the cloud database, then ensures the
   * `_syncId` index on `:Node` exists. Failures are logged and do not abort
   * the sync.
   */
  async syncSchema(): Promise<void> {
    console.log('\nšŸ“‹ Syncing schema...');

    const localSession = this.localDriver.session();
    const cloudSession = this.cloudDriver.session();

    try {
      // Get constraints from local - build CREATE statements from available fields
      const constraintsResult = await localSession.run('SHOW CONSTRAINTS');

      for (const record of constraintsResult.records) {
        const name = record.get('name');
        const constraintType = record.get('type');
        const entityType = record.get('entityType');
        const labelsOrTypes = record.get('labelsOrTypes') as string[];
        const properties = record.get('properties') as string[];

        if (!name || name.startsWith('_')) continue;

        // Build CREATE CONSTRAINT statement
        const label = quoteIdentifier(labelsOrTypes?.[0] || 'Node');
        const props = properties?.length ? properties : ['id'];
        const propRefs = props.map(p => `n.${quoteIdentifier(p)}`);
        // Composite constraints need every property, in a parenthesized list.
        const propExpr = propRefs.length === 1 ? propRefs[0] : `(${propRefs.join(', ')})`;
        const quotedName = quoteIdentifier(name);

        let createStatement = '';
        if (constraintType === 'UNIQUENESS' && entityType === 'NODE') {
          createStatement = `CREATE CONSTRAINT ${quotedName} IF NOT EXISTS FOR (n:${label}) REQUIRE ${propExpr} IS UNIQUE`;
        } else if (constraintType === 'NODE_KEY') {
          createStatement = `CREATE CONSTRAINT ${quotedName} IF NOT EXISTS FOR (n:${label}) REQUIRE ${propExpr} IS NODE KEY`;
        }

        if (createStatement) {
          try {
            await cloudSession.run(createStatement);
            console.log(` āœ“ Constraint: ${name}`);
          } catch (e: unknown) {
            const message = errorMessage(e);
            if (!message.includes('already exists') && !message.includes('equivalent')) {
              console.log(` ⚠ Constraint ${name}: ${message.slice(0, 50)}`);
            }
          }
        }
      }

      // Get indexes - exclude LOOKUP and internal indexes
      const indexesResult = await localSession.run('SHOW INDEXES');

      for (const record of indexesResult.records) {
        const name = record.get('name');
        const indexType = record.get('type');
        const entityType = record.get('entityType');
        const labelsOrTypes = record.get('labelsOrTypes') as string[];
        const properties = record.get('properties') as string[];

        if (!name || name.startsWith('_') || indexType === 'LOOKUP') continue;

        const label = quoteIdentifier(labelsOrTypes?.[0] || 'Node');
        const props = properties || [];

        let createStatement = '';
        if (indexType === 'RANGE' && entityType === 'NODE' && props.length > 0) {
          const propList = props.map(p => `n.${quoteIdentifier(p)}`).join(', ');
          createStatement = `CREATE INDEX ${quoteIdentifier(name)} IF NOT EXISTS FOR (n:${label}) ON (${propList})`;
        } else if (indexType === 'VECTOR' && props.length > 0) {
          // Skip vector indexes - they need special handling
          console.log(` ā­ Skipping vector index: ${name}`);
          continue;
        }

        if (createStatement) {
          try {
            await cloudSession.run(createStatement);
            console.log(` āœ“ Index: ${name} (${indexType})`);
          } catch (e: unknown) {
            const message = errorMessage(e);
            if (!message.includes('already exists') && !message.includes('equivalent')) {
              console.log(` ⚠ Index ${name}: ${message.slice(0, 50)}`);
            }
          }
        }
      }

      // Ensure _syncId index exists for fast lookups
      try {
        await cloudSession.run('CREATE INDEX sync_id_index IF NOT EXISTS FOR (n:Node) ON (n._syncId)');
      } catch {
        // Ignore: the index is an optimization only.
      }
    } finally {
      await localSession.close();
      await cloudSession.close();
    }
  }

  /**
   * Runs a complete sync: connectivity check, schema, nodes, relationships,
   * then stores the new watermark.
   *
   * @returns The accumulated statistics for this run.
   */
  async run(): Promise<SyncStats> {
    const startTime = Date.now();

    console.log('═'.repeat(60));
    console.log('⚔ Fast Neo4j Sync');
    console.log('═'.repeat(60));
    console.log(`\nLocal: ${this.config.localUri}`);
    console.log(`Cloud: ${this.config.cloudUri}`);
    console.log(`Mode: ${this.config.incrementalOnly ? 'Incremental' : 'Full'}`);
    console.log(`Batch: ${this.config.batchSize} nodes Ɨ ${this.config.parallelBatches} parallel`);

    await this.connect();

    let since: Date | undefined;
    if (this.config.incrementalOnly) {
      const lastSync = await this.getLastSyncTime();
      if (lastSync) {
        since = lastSync;
        console.log(`\nšŸ“… Last sync: ${lastSync.toISOString()}`);
        console.log(' Only syncing changes since then...');
      } else {
        console.log('\nāš ļø No previous sync found, doing full sync...');
      }
    }

    // Take the watermark BEFORE reading any data. Recording the completion
    // time instead would make the next incremental run miss rows that were
    // modified locally while this run was in progress.
    const syncStartedAt = new Date();

    await this.syncSchema();
    await this.syncNodesInParallel(since);
    await this.syncRelationshipsInParallel(since);

    // Record sync time
    await this.setLastSyncTime(syncStartedAt);

    this.stats.duration = Date.now() - startTime;

    const totalItems = this.stats.nodesCreated + this.stats.relationshipsCreated;
    const seconds = this.stats.duration / 1000;
    // Avoid printing Infinity/NaN for a run that finished within the clock resolution.
    const itemsPerSecond = seconds > 0 ? Math.round(totalItems / seconds) : 0;

    console.log('\n' + '═'.repeat(60));
    console.log('āœ… Sync Complete!');
    console.log('═'.repeat(60));
    console.log(` Nodes: ${this.stats.nodesCreated}`);
    console.log(` Relationships: ${this.stats.relationshipsCreated}`);
    console.log(` Duration: ${seconds.toFixed(1)}s`);
    console.log(` Rate: ${itemsPerSecond} items/sec`);

    return this.stats;
  }

  /** Closes both drivers. */
  async close(): Promise<void> {
    await this.localDriver.close();
    await this.cloudDriver.close();
  }

  /** Logs throughput and a remaining-time estimate, guarding against division by zero. */
  private logProgress(processed: number, total: number, startTime: number, unit: string): void {
    const elapsed = (Date.now() - startTime) / 1000;
    const rate = elapsed > 0 ? Math.round(processed / elapsed) : 0;
    const eta = rate > 0 ? Math.max(0, Math.round((total - processed) / rate)) : 0;
    console.log(` Progress: ${processed}/${total} (${rate} ${unit}/sec, ETA: ${eta}s)`);
  }
}

// CLI
/**
 * Parses command-line flags, builds the configuration from the environment
 * and runs one sync. Exits with status 1 when no cloud URI is configured.
 */
async function main(): Promise<void> {
  const args = process.argv.slice(2);
  const incremental = args.includes('--incremental') || args.includes('-i');
  const help = args.includes('--help') || args.includes('-h');

  if (help) {
    console.log(`
Fast Neo4j Sync - Parallel batch sync with incremental support

Usage:
  npx tsx src/scripts/fast-neo4j-sync.ts [options]

Options:
  --incremental, -i   Only sync changes since last sync
  --help, -h          Show this help

Environment variables:
  NEO4J_LOCAL_URI        Local Neo4j URI (default: bolt://localhost:7687)
  NEO4J_LOCAL_USER       Local Neo4j user (default: neo4j)
  NEO4J_LOCAL_PASSWORD   Local Neo4j password
  NEO4J_CLOUD_URI        Cloud AuraDB URI (falls back to NEO4J_URI)
  NEO4J_CLOUD_USER       Cloud user (falls back to NEO4J_USER, default: neo4j)
  NEO4J_CLOUD_PASSWORD   Cloud password (falls back to NEO4J_PASSWORD)
`);
    process.exit(0);
  }

  // Cloud settings: prefer the NEO4J_CLOUD_* names documented in the help
  // text, falling back to NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD as found in
  // .env.production. `||` is intentional so empty strings also fall through.
  const cloudUri = process.env.NEO4J_CLOUD_URI || process.env.NEO4J_URI || '';
  const cloudUser = process.env.NEO4J_CLOUD_USER || process.env.NEO4J_USER || 'neo4j';
  const cloudPassword = process.env.NEO4J_CLOUD_PASSWORD || process.env.NEO4J_PASSWORD || '';

  const syncConfig: SyncConfig = {
    localUri: process.env.NEO4J_LOCAL_URI || 'bolt://localhost:7687',
    localUser: process.env.NEO4J_LOCAL_USER || 'neo4j',
    localPassword: process.env.NEO4J_LOCAL_PASSWORD || 'password', // Local default
    cloudUri,
    cloudUser,
    cloudPassword,
    batchSize: 500, // Larger batches
    parallelBatches: 4, // 4 parallel operations
    incrementalOnly: incremental
  };

  if (!syncConfig.cloudUri) {
    console.error('āŒ NEO4J_CLOUD_URI (or NEO4J_URI) is required in .env.production');
    process.exit(1);
  }

  const sync = new FastNeo4jSync(syncConfig);

  try {
    await sync.run();
  } finally {
    await sync.close();
  }
}

main().catch((error: unknown) => {
  console.error(error);
  // Report failure to the calling shell / CI instead of exiting with status 0.
  process.exitCode = 1;
});